Candle Aggregation Guide¶
The tradedesk.marketdata module provides time-bucketing candle aggregation for converting base-period candles into higher timeframes.
Overview¶
CandleAggregator converts fast candles (e.g., 1-minute) into slower timeframes (e.g., 15-minute) using wall-clock time bucketing:
- Time-aligned: Buckets align to UTC time boundaries (not count-based)
- Multi-instrument: One aggregator can handle multiple instruments concurrently
- Gap-tolerant: Missing base candles don't break aggregation
- Memory-efficient: Only stores current bucket state per instrument
Basic Usage¶
from tradedesk.marketdata import CandleAggregator
from tradedesk import Candle
# Create aggregator for 15-minute candles from 5-minute base period
agg = CandleAggregator(target_period="15MINUTE", base_period="5MINUTE")
# Process base candles
base_candles = [
Candle(timestamp="1704067200000", open=1.10, high=1.11, low=1.09, close=1.105), # 00:00:00
Candle(timestamp="1704067500000", open=1.105, high=1.12, low=1.10, close=1.115), # 00:05:00
Candle(timestamp="1704067800000", open=1.115, high=1.13, low=1.11, close=1.125), # 00:10:00
Candle(timestamp="1704068100000", open=1.125, high=1.14, low=1.12, close=1.135), # 00:15:00 (triggers)
]
instrument = "EURUSD"
# Process each base candle
result = agg.update(instrument=instrument, candle=base_candles[0]) # None (accumulating)
result = agg.update(instrument=instrument, candle=base_candles[1]) # None (accumulating)
result = agg.update(instrument=instrument, candle=base_candles[2]) # None (accumulating)
result = agg.update(instrument=instrument, candle=base_candles[3]) # Candle (bucket rolled!)
# result is now the aggregated 15-minute candle
assert result.timestamp == "1704067200000" # Bucket start time
assert result.open == 1.10 # First open
assert result.high == 1.13 # Highest high
assert result.low == 1.09 # Lowest low
assert result.close == 1.125 # Last close
Choosing Base Periods¶
Use choose_base_period() to automatically select an appropriate base period for your broker:
from tradedesk.marketdata import choose_base_period
# Default: Uses common broker periods (SECOND, 1MINUTE, 5MINUTE, HOUR)
base = choose_base_period("15MINUTE") # Returns "1MINUTE"
base = choose_base_period("7MINUTE") # Returns "1MINUTE"
base = choose_base_period("HOUR") # Returns "HOUR"
# Custom broker periods
broker_periods = ["1MINUTE", "5MINUTE", "15MINUTE", "1HOUR"]
base = choose_base_period("30MINUTE", supported_periods=broker_periods) # Returns "15MINUTE"
Selection Logic¶
The function prefers larger base periods when possible: 1. If target is exactly HOUR → use HOUR 2. If target is divisible by 5 minutes and ≥ 5 minutes → use 5MINUTE 3. If target is divisible by 1 minute and ≥ 1 minute → use 1MINUTE 4. Otherwise → use SECOND
Multiple Instruments¶
One aggregator instance can handle many instruments with independent state:
agg = CandleAggregator(target_period="15MINUTE")
# Process different instruments
result_eur = agg.update(instrument="EURUSD", candle=eurusd_candle)
result_gbp = agg.update(instrument="GBPUSD", candle=gbpusd_candle)
# Each instrument has its own bucket state
OHLCV Aggregation Rules¶
When combining base candles into an aggregated candle:
- Open: First open of the bucket
- High: Maximum of all highs
- Low: Minimum of all lows
- Close: Last close of the bucket
- Volume: Sum of all volumes
- Tick Count: Sum of all tick counts
Time Bucket Alignment¶
Buckets are aligned to UTC time boundaries:
# For 15MINUTE target:
# Bucket 1: 00:00:00 - 00:15:00
# Bucket 2: 00:15:00 - 00:30:00
# Bucket 3: 00:30:00 - 00:45:00
# etc.
# A candle at 00:17:23 falls in Bucket 2
# The aggregated candle is emitted when the first candle from Bucket 3 arrives
Advanced Usage¶
Custom Broker Periods¶
# Cryptocurrency exchange with non-standard periods
crypto_periods = ["1MINUTE", "3MINUTE", "5MINUTE", "15MINUTE", "1HOUR", "4HOUR"]
agg = CandleAggregator(
target_period="15MINUTE",
supported_periods=crypto_periods
)
Inspecting Aggregator State¶
agg = CandleAggregator(target_period="15MINUTE", base_period="5MINUTE")
base_period, target_period, factor = agg.describe()
print(f"Aggregating {factor}x {base_period} candles into {target_period}")
# Output: Aggregating 3x 5MINUTE candles into 15MINUTE
Resetting State¶
# Reset aggregation state for a specific instrument
agg.reset(instrument="EURUSD")
# Useful when reconnecting or recovering from errors
Handling Missing Candles¶
The aggregator is gap-tolerant:
# If you receive candles at:
# 00:00:00, 00:05:00, [MISSING: 00:10:00], 00:15:00
# The aggregator will emit a bucket containing only the 2 received candles
# The OHLCV values represent the partial data
Complete Example: Live Aggregation¶
from tradedesk.marketdata import CandleAggregator
class LiveAggregationStrategy:
def __init__(self, target_period: str):
self.aggregator = CandleAggregator(target_period=target_period)
async def on_base_candle(self, instrument: str, candle):
"""Called when a new base-period candle arrives."""
# Try to aggregate
aggregated = self.aggregator.update(instrument=instrument, candle=candle)
if aggregated:
# Bucket rolled! Process the aggregated candle
await self.on_aggregated_candle(instrument, aggregated)
async def on_aggregated_candle(self, instrument: str, candle):
"""Called when a full target-period candle is ready."""
print(f"{instrument} {candle.timestamp}: O={candle.open} H={candle.high} "
f"L={candle.low} C={candle.close}")
Best Practices¶
-
Choose appropriate base periods: Smaller base periods give more granular aggregation but require more frequent updates.
-
Handle None returns:
update()returnsNonewhile accumulating. Always check before processing. -
Align with broker capabilities: Use
choose_base_period()to ensure compatibility with your broker's available periods. -
Test with gaps: Your strategy should handle missing base candles gracefully.
-
Monitor bucket counts: In testing, verify you're getting the expected number of base candles per bucket.
Common Patterns¶
Building Multiple Timeframes¶
# Create aggregators for multiple timeframes
agg_15m = CandleAggregator(target_period="15MINUTE")
agg_1h = CandleAggregator(target_period="HOUR")
async def on_base_candle(instrument: str, candle):
# Feed to both aggregators
c15 = agg_15m.update(instrument=instrument, candle=candle)
c1h = agg_1h.update(instrument=instrument, candle=candle)
if c15:
await process_15min_candle(c15)
if c1h:
await process_1hour_candle(c1h)
Backtesting with Aggregation¶
# Backtest using historical 1-minute candles, strategy trades on 15-minute
agg = CandleAggregator(target_period="15MINUTE", base_period="1MINUTE")
for minute_candle in historical_1min_candles:
aggregated = agg.update(instrument="EURUSD", candle=minute_candle)
if aggregated:
await strategy.on_candle_close(aggregated)
See Also¶
- Strategy Guide - Using aggregated candles in strategies
- Backtesting Guide - Backtesting with multiple timeframes
License¶
Licensed under the Apache License, Version 2.0. See: https://www.apache.org/licenses/LICENSE-2.0
Copyright 2026 Radius Red Ltd. | Contact