Skip to content

Data Sources

Module: coinjure.data.source

Data sources provide the event stream that drives the trading engine. All sources implement the DataSource ABC.

DataSource

from coinjure.data.source import DataSource
from coinjure.events import Event

class MyDataSource(DataSource):
    async def get_next_event(self) -> Event | None:
        """Return the next event, or None if exhausted/timeout."""
        ...

    async def start(self) -> None:
        """Called once before the engine begins polling."""
        ...

    async def stop(self) -> None:
        """Called when the engine shuts down."""
        ...
Method Description
get_next_event() Abstract. Return next event or None.
start() Lifecycle hook — launch background tasks, open connections.
stop() Lifecycle hook — cancel tasks, close connections.

CompositeDataSource

Merges events from multiple DataSource instances into a single stream via an internal queue.

from coinjure.data.source import CompositeDataSource

source = CompositeDataSource([poly_source, kalshi_source, news_source])
await source.start()  # starts all children + relay tasks
event = await source.get_next_event()  # pulls from merged queue
Method Description
drain_pending_events() Non-blocking drain of all queued events
register_token_ticker(token_id, ticker) Forward token registration to children
watch_token(token_id) Subscribe a token across all children
unwatch_token(token_id) Unsubscribe a token across all children

build_market_source

Factory function that builds a live data source for a given exchange.

from coinjure.data.source import build_market_source

source = build_market_source("polymarket")      # Polymarket CLOB + RSS news
source = build_market_source("kalshi")           # Kalshi REST + RSS news
source = build_market_source("cross_platform")   # Both exchanges + RSS news
Exchange Sources
"polymarket" LivePolyMarketDataSource + LiveRSSNewsDataSource
"kalshi" LiveKalshiDataSource + LiveRSSNewsDataSource
"cross_platform" Both live sources + LiveRSSNewsDataSource

Concrete Implementations

LivePolyMarketDataSource

Module: coinjure.data.live.polymarket

Connects to Polymarket's CLOB WebSocket for real-time order book updates and polls the Gamma API for market discovery.

LiveKalshiDataSource

Module: coinjure.data.live.kalshi

Polls Kalshi's REST API for market snapshots and order book data.

ParquetDataSource

Module: coinjure.data.backtest.parquet

Replays historical orderbook snapshots from Parquet files for backtesting. Supports multi-file concatenation and market ID filtering.

from coinjure.data.backtest.parquet import ParquetDataSource

source = ParquetDataSource(
    parquet_paths=["data/orderbook_2026-03-05T02.parquet"],
    market_ids=["12345", "67890"],  # optional filter
)

Event Types

All data sources emit events from coinjure.events:

Event Fields Description
PriceChangeEvent ticker, bid, ask, timestamp Price update
OrderBookEvent ticker, bids, asks, timestamp Full order book snapshot
NewsEvent headline, source, url, published_at News article