Aggregation (N → 1 Fan-In)
MAPS Aggregation provides windowed fan-in (N→1) stream aggregation across multiple input topics.
An Aggregator instance:
- Subscribes to multiple input topics.
- Buffers events in arrival-time windows.
- Applies optional per-input selectors and transformer chains.
- Applies a contribution policy (FIRST or LAST).
- Emits a single aggregated envelope event to an output topic.
- Optionally applies a final output transformer chain before publish.
Stage 1 scope:
Fan-in time-bucket aggregation only.
No correlation keys, no DLQ, no persistence, no policies.
Conceptual Model
Incoming Topics → Selector → Transformer → Window → Aggregated Envelope → Output Transformers → Output Topic
Aggregation is arrival-time based, not event-time based.
Execution Model
For each aggregator:
- A window opens.
- Events from configured input topics are received.
- For each input:
- Optional JMS selector is evaluated.
- If matched, optional transformer chain is applied.
- Contribution mode determines how events are retained.
- Window closes when:
windowDurationMselapses, ortimeoutMselapses (hard cap).
- An aggregated envelope is emitted.
- Optional
outputTransformersare applied. - Event is published to
outputTopic.
Aggregator Configuration Fields
name
Unique identifier for this aggregator.
Pattern: ^[A-Za-z0-9_.-]+$
enabled
Enable or disable this instance.
Default: true
inputs
List of input topic configurations (minimum 1).
Each input supports:
topicName-- Topic to subscribe to\selector-- Optional JMS selector\transformer-- Optional transformer chain\contributionMode-- FIRST or LAST
outputTopic
Topic where the aggregated envelope is published.
windowDurationMs
Arrival-time window size (milliseconds).
Min: 1
Max: 3,600,000
timeoutMs
Hard timeout. Window closes even if not all inputs have contributed.
maxEventsPerTopic
Maximum buffered events per input within a window.
Stage 1 default: 1
outputTransformers
Optional transformer chain applied to the aggregated envelope before publish.
Contribution Modes
FIRST
Keep the first event received for the window and drop subsequent events.
LAST (Default)
Keep only the last event received for the window, replacing previous ones.
Default: LAST
Per-Input Processing Order
For each event:
- Topic match
- Selector evaluation (if defined)
- Transformer chain execution (if defined)
- Contribution mode applied
- Stored in window buffer
Selectors are evaluated before transformation.
Aggregated Envelope Structure
Before outputTransformers run, the aggregated envelope has this
structure:
{
"aggregatorName": "sensor-aggregator-1",
"windowStart": 1700000000000,
"windowEnd": 1700000001000,
"inputs": {
"maps/sensors/temp": {
"payload": { ... }
},
"maps/sensors/humidity": {
"payload": { ... }
}
}
}
If an input does not produce data during the window: - It is absent from
the inputs map.
Performance & Scaling
Managed by AggregatorManager:
stripeCount-- Concurrency partitioning\maxBatchPerAggregator-- Processing batch size\idleSleepMs-- Idle backoff\mailboxCapacity-- Internal queue capacity\maxAggregators-- Upper bound
Aggregation is: - Stream-oriented - Memory-resident - Inline with event flow - Non-persistent (Stage 1)
When to Use Aggregation
Use aggregation when you need:
- Multi-topic sensor fusion
- Periodic snapshot events
- Join-like behavior across topics
- Controlled windowed fan-in
- Pre-processing before statistics or routing
Aggregation is a stream-join primitive, not a database or correlation engine.