Aggregating Data
Aggregation Types
In Quix Streams, aggregation operations are divided into two groups: Aggregators and Collectors.
Aggregators
Aggregators incrementally combine the current value with the aggregated state and store the result in the state.
Use them when the aggregation can be performed incrementally, such as counting items.
Collectors
Collectors accumulate individual values in the state before performing any aggregation.
They can be used to batch items into a collection, or when the aggregation operation needs the full dataset, like calculating a median.
Collectors are optimized for storing individual values to the state and perform significantly better than Aggregators when you need to accumulate values into a list.
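To picture the difference, here is a plain-Python sketch (not Quix Streams code): an Aggregator keeps only a small running state per window, while a Collector must retain every value until the window closes.

```python
import statistics

# Aggregator-style: keep a small incremental state (here, a running count)
count_state = 0
for event in [3, 1, 4, 1, 5]:
    count_state += 1  # merge each new value into the running state

# Collector-style: store every value, aggregate only when the window closes
collected = []
for event in [3, 1, 4, 1, 5]:
    collected.append(event)

median = statistics.median(collected)  # needs the full dataset

print(count_state)  # 5
print(median)       # 3
```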
Note
The performance benefit comes at a price: Collectors support only the .final() mode. Using .current() is not supported.
Using Aggregations
Info
Currently, aggregations can be performed only over windowed data.
To calculate an aggregation, you need to define a window.
To learn more about windows, see the Windowing page.
When you have a window, call .agg() and pass the configured aggregator or collector as a named parameter.
Example 1. Count items in the window
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Count
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
# Call .agg() and provide an Aggregator or Collector to it.
# Here we use a built-in aggregator "Count".
# The parameter name will be used as a part of the aggregated state and returned in the result.
.agg(count=Count())
# Specify how the windowed results are emitted.
# Here, emit results only for closed windows.
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'count': 9999 - total number of events in the window
# }
Example 2. Accumulating items in the window
Use Collect() to gather all events within each window period into a list.
Collect takes an optional column parameter to limit the collection to one column of the input.
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Collect
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
# Collect events in the window into a list
.agg(events=Collect())
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'events': [event1, event2, event3, ..., eventN] - list of all events in the window
# }
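The optional column parameter controls what gets stored: without it, Collect keeps each full message; with it, only that field is kept. A plain-Python sketch of the resulting difference (the field names below are hypothetical):

```python
# Two hypothetical sensor messages arriving within one window
events = [
    {"temperature": 20, "humidity": 50},
    {"temperature": 22, "humidity": 48},
]

# Collect() with no column keeps the full messages
full_messages = list(events)

# Collect(column="temperature") keeps only that field of each message
temperatures = [event["temperature"] for event in events]

print(temperatures)  # [20, 22]
```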
Aggregating over a single column
Aggregators allow you to select a column using the optional column parameter.
When column is passed, the Aggregator performs the aggregation only over this column; in this case, the value is assumed to be a dictionary.
Otherwise, it uses the whole message.
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Min
app = Application(...)
sdf = app.dataframe(...)
# Input:
# {"temperature" : 9999}
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
# Calculate the Min aggregation over the "temperature" column
.agg(min_temperature=Min(column="temperature"))
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'min_temperature': 9999 - minimum temperature
# }
Multiple Aggregations
It is possible to calculate several different aggregations and collections over the same window.
Collectors are optimized to store each value only once, even when it is shared by several collectors.
You can define a wide range of aggregations, such as:
- Aggregating over multiple message fields at once
- Calculating multiple aggregates for the same value
Example:
Assume you receive the temperature data from the sensor, and you need to calculate these aggregates for each 10-minute tumbling window:
- min temperature
- max temperature
- total count of events
- average temperature
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Min, Max, Count, Mean
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
.agg(
min_temp=Min("temperature"),
max_temp=Max("temperature"),
avg_temp=Mean("temperature"),
total_events=Count(),
)
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'min_temp': 1,
# 'max_temp': 999,
# 'avg_temp': 34.32,
# 'total_events': 999,
# }
Built-in Aggregators and Collectors
Aggregators:
- Count() - to count the number of values within a window.
- Min() - to get the minimum value within a window.
- Max() - to get the maximum value within a window.
- Mean() - to get the mean value within a window.
- Sum() - to sum values within a window.
- Reduce() - to write a custom aggregation (deprecated; use a custom Aggregator instead).
Collectors:
- Collect() - to collect all values within a window into a list.
Custom Aggregators
To implement a custom aggregator, subclass the Aggregator class and implement 3 methods:
- initialize: Called when initializing a new window. Returns the starting value of the aggregation.
- agg: Called for every item added to the window. It should merge the new value with the aggregated state.
- result: Called to generate the result from the aggregated value.
By default, the aggregation state key includes the aggregation class name.
If your aggregation accepts parameters, like a column name, you can override the state_suffix property to include those parameters in the state key.
Whenever the state key changes, the aggregation's state is reset.
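The idea behind state_suffix can be sketched in plain Python; the key format below is illustrative only, not the library's actual implementation:

```python
class SketchedAggregator:
    """Illustrative stand-in showing why parameters belong in the state key."""

    def __init__(self, column=None):
        self.column = column

    @property
    def state_suffix(self):
        # Include the column name so that aggregations over different
        # columns keep separate states in the window store.
        return f"{type(self).__name__}/{self.column}"

print(SketchedAggregator(column="temperature").state_suffix)
# SketchedAggregator/temperature
print(SketchedAggregator(column="pressure").state_suffix)
# SketchedAggregator/pressure
```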
Example 1. Power sum
Calculate the sum of the squares of the incoming values over a 10-minute tumbling window.
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows.aggregations import Aggregator
app = Application(...)
sdf = app.dataframe(...)
class PowerSum(Aggregator):
def initialize(self):
return 0
def agg(self, aggregated, new, timestamp):
if self.column is not None:
new = new[self.column]
return aggregated + (new * new)
def result(self, aggregated):
return aggregated
# Input:
# {"amount" : 2}
# {"amount" : 3}
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
# Aggregate the custom sum
.agg(sum=PowerSum())
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'sum': 13
# }
Example 2. Custom aggregation over multiple message fields
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Aggregator
class TemperatureAggregator(Aggregator):
def initialize(self):
return {
"min_temp": 0,
"max_temp": 0,
"total_events": 0,
"sum_temp": 0,
}
def agg(self, old, new, ts):
if self.column is not None:
new = new[self.column]
old["min_temp"] = min(old["min_temp"], new)
old["max_temp"] = max(old["max_temp"], new)
old["total_events"] += 1
old["sum_temp"] += new
return old
def result(self, stored):
return {
"min_temp": stored["min_temp"],
"max_temp": stored["max_temp"],
"total_events": stored["total_events"]
"avg_temp": stored["sum_temp"] / stored["total_events"]
}
app = Application(...)
sdf = app.dataframe(...)
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
.agg(
value=TemperatureAggregator(column="Temperature")
)
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'value': {
# 'min_temp': 1,
# 'max_temp': 999,
# 'avg_temp': 34.32,
# 'total_events': 999,
# }
# }
Custom Collectors
To implement a custom Collector, subclass the Collector class and implement the result method.
It is called when the window is closed, with an iterable of all the items collected in this window.
By default, Collectors always store the full message.
If you only need a specific column, you can override the column property to specify which column should be stored.
Example:
Collect all events over a 10-minute tumbling window into a reversed order list.
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows.aggregations import Collector
app = Application(...)
sdf = app.dataframe(...)
class ReversedCollect(Collector):
def result(self, items):
# items is the list of all items collected during the window
return list(reversed(items))
sdf = (
# Define a tumbling window of 10 minutes
sdf.tumbling_window(timedelta(minutes=10))
# Collect events in the window into a reversed list
.agg(events=ReversedCollect())
# Emit results only for closed windows
.final()
)
# Output:
# {
# 'start': <window start>,
# 'end': <window end>,
# 'events': [eventN, ..., event3, event2, event1] - reversed list of all events in the window
# }
Reduce
Warning
Reduce is deprecated. Use multiple aggregations and custom Aggregators instead; they provide more control over parameters and better state management.
Reduce() allows you to perform complex aggregations using custom "reducer" and "initializer" functions:
- The "initializer" function receives the first value for the given window, and it must return an initial state for this window. This state will later be passed to the "reducer" function. It is called only once for each window.
- The "reducer" function receives an aggregated state and a current value, and it must combine them and return a new aggregated state. This function should contain the actual aggregation logic. It will be called for each message coming into the window, except the first one.
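The calling pattern described above can be sketched in plain Python (this simulates how a window would invoke the two functions; it is not the library's internal code):

```python
def initializer(value):
    # Receives the first value in the window; returns the initial state
    return {"count": 1, "sum": value}

def reducer(state, value):
    # Receives the aggregated state and the next value; returns a new state
    return {"count": state["count"] + 1, "sum": state["sum"] + value}

# Simulate one window receiving three values
values = [5, 10, 15]
state = initializer(values[0])
for value in values[1:]:
    state = reducer(state, value)

print(state)  # {'count': 3, 'sum': 30}
```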