Skip to main content

Aggregation (N → 1 Fan-In)

MAPS Aggregation provides windowed fan-in (N→1) stream aggregation across multiple input topics.

An Aggregator instance:

  • Subscribes to multiple input topics.
  • Buffers events in arrival-time windows.
  • Applies optional per-input selectors and transformer chains.
  • Applies a contribution policy (FIRST or LAST).
  • Emits a single aggregated envelope event to an output topic.
  • Optionally applies a final output transformer chain before publish.

Stage 1 scope:
Fan-in time-bucket aggregation only.
No correlation keys, no DLQ, no persistence, no policies.


Conceptual Model

Incoming Topics → Selector → Transformer → Window → Aggregated Envelope → Output Transformers → Output Topic

Aggregation is arrival-time based, not event-time based.


Execution Model

For each aggregator:

  1. A window opens.
  2. Events from configured input topics are received.
  3. For each input:
    • Optional JMS selector is evaluated.
    • If matched, optional transformer chain is applied.
    • Contribution mode determines how events are retained.
  4. Window closes when:
    • windowDurationMs elapses, or
    • timeoutMs elapses (hard cap).
  5. An aggregated envelope is emitted.
  6. Optional outputTransformers are applied.
  7. Event is published to outputTopic.

Aggregator Configuration Fields

name

Unique identifier for this aggregator.
Pattern: ^[A-Za-z0-9_.-]+$

enabled

Enable or disable this instance.
Default: true

inputs

List of input topic configurations (minimum 1).

Each input supports:

  • topicName -- Topic to subscribe to\
  • selector -- Optional JMS selector\
  • transformer -- Optional transformer chain\
  • contributionMode -- FIRST or LAST

outputTopic

Topic where the aggregated envelope is published.

windowDurationMs

Arrival-time window size (milliseconds).
Min: 1
Max: 3,600,000

timeoutMs

Hard timeout. Window closes even if not all inputs have contributed.

maxEventsPerTopic

Maximum buffered events per input within a window.
Stage 1 default: 1

outputTransformers

Optional transformer chain applied to the aggregated envelope before publish.


Contribution Modes

FIRST

Keep the first event received for the window and drop subsequent events.

LAST (Default)

Keep only the last event received for the window, replacing previous ones.

Default: LAST


Per-Input Processing Order

For each event:

  1. Topic match
  2. Selector evaluation (if defined)
  3. Transformer chain execution (if defined)
  4. Contribution mode applied
  5. Stored in window buffer

Selectors are evaluated before transformation.


Aggregated Envelope Structure

Before outputTransformers run, the aggregated envelope has this structure:

{
"aggregatorName": "sensor-aggregator-1",
"windowStart": 1700000000000,
"windowEnd": 1700000001000,
"inputs": {
"maps/sensors/temp": {
"payload": { ... }
},
"maps/sensors/humidity": {
"payload": { ... }
}
}
}

If an input does not produce data during the window: - It is absent from the inputs map.


Performance & Scaling

Managed by AggregatorManager:

  • stripeCount -- Concurrency partitioning\
  • maxBatchPerAggregator -- Processing batch size\
  • idleSleepMs -- Idle backoff\
  • mailboxCapacity -- Internal queue capacity\
  • maxAggregators -- Upper bound

Aggregation is: - Stream-oriented - Memory-resident - Inline with event flow - Non-persistent (Stage 1)


When to Use Aggregation

Use aggregation when you need:

  • Multi-topic sensor fusion
  • Periodic snapshot events
  • Join-like behavior across topics
  • Controlled windowed fan-in
  • Pre-processing before statistics or routing

Aggregation is a stream-join primitive, not a database or correlation engine.