Full Reference
quixstreams
quixstreams.logging
configure_logging
def configure_logging(loglevel: Optional[Union[int, LogLevel]],
name: str = LOGGER_NAME,
pid: bool = False) -> bool
Configure "quixstreams" logger.
NOTE: If "quixstreams" logger already has pre-defined handlers (e.g. logging has already been configured via
logging
, or the function is called twice), it will skip configuration and returnFalse
.
Arguments:
loglevel
: a valid log level as a string or None. If None passed, this function is no-op and no logging will be configured.name
: the log name included in the outputpid
: if True include the process PID in the logs
Returns:
True if logging config has been updated, otherwise False.
quixstreams.error_callbacks
quixstreams.platforms
quixstreams.platforms.quix.config
strip_workspace_id_prefix
Remove the workspace ID from a given string if it starts with it.
Only used for consumer groups.
Arguments:
workspace_id
: the workspace ids
: the string to append to
Returns:
the string with workspace_id prefix removed
prepend_workspace_id
Add the workspace ID as a prefix to a given string if it does not have it.
Only used for consumer groups.
Arguments:
workspace_id
: the workspace ids
: the string to append to
Returns:
the string with workspace_id prepended
QuixApplicationConfig
A convenience container class for Quix Application configs.
QuixKafkaConfigsBuilder
Retrieves all the necessary information from the Quix API and builds all the objects required to connect a confluent-kafka client to the Quix Platform.
If not executed within the Quix platform directly, you must provide a Quix "streaming" (aka "sdk") token, or Personal Access Token.
Ideally you also know your workspace name or id. If not, you can search for it using a known topic name, but note the search space is limited to the access level of your token.
It also currently handles the app_auto_create_topics setting for Quix Applications.
QuixKafkaConfigsBuilder.__init__
def __init__(quix_sdk_token: Optional[str] = None,
workspace_id: Optional[str] = None,
quix_portal_api_service: Optional[QuixPortalApiService] = None,
timeout: float = 30,
topic_create_timeout: float = 60)
Arguments:
quix_portal_api_service
: A QuixPortalApiService instance (else generated)workspace_id
: A valid Quix Workspace ID (else searched for)
QuixKafkaConfigsBuilder.convert_topic_response
Converts a GET or POST ("create") topic API response to a Topic object
Arguments:
api_response
: the dict response from a get or create topic call
Returns:
a corresponding Topic object
QuixKafkaConfigsBuilder.strip_workspace_id_prefix
Remove the workspace ID from a given string if it starts with it.
Only used for consumer groups.
Arguments:
s
: the string to append to
Returns:
the string with workspace_id prefix removed
QuixKafkaConfigsBuilder.prepend_workspace_id
Add the workspace ID as a prefix to a given string if it does not have it.
Only used for consumer groups.
Arguments:
s
: the string to append to
Returns:
the string with workspace_id prepended
QuixKafkaConfigsBuilder.search_for_workspace
def search_for_workspace(workspace_name_or_id: Optional[str] = None,
timeout: Optional[float] = None) -> Optional[dict]
Search for a workspace given an expected workspace name or id.
Arguments:
workspace_name_or_id
: the expected name or id of a workspacetimeout
: response timeout (seconds); Default 30
Returns:
the workspace data dict if search success, else None
QuixKafkaConfigsBuilder.get_workspace_info
def get_workspace_info(known_workspace_topic: Optional[str] = None,
timeout: Optional[float] = None) -> dict
Queries for workspace data from the Quix API, regardless of instance cache,
and updates instance attributes from query result.
Arguments:
known_workspace_topic
: a topic you know to exist in some workspacetimeout
: response timeout (seconds); Default 30
QuixKafkaConfigsBuilder.search_workspace_for_topic
def search_workspace_for_topic(
workspace_id: str,
topic: str,
timeout: Optional[float] = None) -> Optional[str]
Search through all the topics in the given workspace id to see if there is a
match with the provided topic.
Arguments:
workspace_id
: the workspace to search intopic
: the topic to search fortimeout
: response timeout (seconds); Default 30
Returns:
the workspace_id if success, else None
QuixKafkaConfigsBuilder.search_for_topic_workspace
Find what workspace a topic belongs to.
If there is only one workspace altogether, it is assumed to be the workspace. More than one means each workspace will be searched until the first hit.
Arguments:
topic
: the topic to search fortimeout
: response timeout (seconds); Default 30
Returns:
workspace data dict if topic search success, else None
QuixKafkaConfigsBuilder.create_topic
The actual API call to create the topic.
Arguments:
topic
: a Topic instancetimeout
: response timeout (seconds); Default 30
QuixKafkaConfigsBuilder.get_or_create_topic
Get or create topics in a Quix cluster as part of initializing the Topic
object to obtain the true topic name.
Arguments:
topic
: aTopic
objecttimeout
: response timeout (seconds); Default 30 marked as "Ready" (and thus ready to produce to/consume from).
QuixKafkaConfigsBuilder.wait_for_topic_ready_statuses
def wait_for_topic_ready_statuses(topics: List[Topic],
timeout: Optional[float] = None,
finalize_timeout: Optional[float] = None)
After the broker acknowledges topics for creation, they will be in a
"Creating" status; they not usable until they are set to a status of "Ready".
This blocks until all topics are marked as "Ready" or the timeout is hit.
Arguments:
topics
: a list ofTopic
objectstimeout
: response timeout (seconds); Default 30finalize_timeout
: topic finalization timeout (seconds); Default 60 marked as "Ready" (and thus ready to produce to/consume from).
QuixKafkaConfigsBuilder.get_topic
return the topic ID (the actual cluster topic name) if it exists, else raise
Arguments:
topic_name
: name of the topictimeout
: response timeout (seconds); Default 30
Raises:
QuixApiRequestFailure
: when topic does not exist
Returns:
response dict of the topic info if topic found, else None
QuixKafkaConfigsBuilder.get_application_config
Get all the necessary attributes for an Application to run on Quix Cloud.
Arguments:
consumer_group_id
: consumer group id, if needed
Returns:
a QuixApplicationConfig instance
quixstreams.platforms.quix.env
QuixEnvironment
Class to access various Quix platform environment settings
SDK_TOKEN
noqa: S105
QuixEnvironment.state_management_enabled
Check whether "State management" is enabled for the current deployment
Returns:
True if state management is enabled, otherwise False
QuixEnvironment.deployment_id
Return current Quix deployment id.
This variable is meant to be set only by Quix Platform and only when the application is deployed.
Returns:
deployment id or None
QuixEnvironment.workspace_id
Return Quix workspace id if set
Returns:
workspace id or None
QuixEnvironment.portal_api
Return Quix Portal API url if set
Returns:
portal API URL or None
QuixEnvironment.state_dir
Return application state directory on Quix.
Returns:
path to state dir
quixstreams.platforms.quix.checks
check_state_management_enabled
Check if State Management feature is enabled for the current deployment on Quix platform. If it's disabled, the exception will be raised.
check_state_dir
Check if Application "state_dir" matches the state dir on Quix platform.
If it doesn't match, the warning will be logged.
Arguments:
state_dir
: application state_dir path
quixstreams.platforms.quix
quixstreams.platforms.quix.api
QuixPortalApiService
A light wrapper around the Quix Portal Api. If used in the Quix Platform, it will use that workspaces auth token and portal endpoint, else you must provide it.
Function names closely reflect the respective API endpoint, each starting with the method [GET, POST, etc.] followed by the endpoint path.
Results will be returned in the form of request's Response.json(), unless something else is required. Non-200's will raise exceptions.
See the swagger documentation for more info about the endpoints.
QuixPortalApiService.get_workspace_certificate
def get_workspace_certificate(workspace_id: Optional[str] = None,
timeout: float = 30) -> Optional[bytes]
Get a workspace TLS certificate if available.
Returns None
if certificate is not specified.
Arguments:
workspace_id
: workspace id, optionaltimeout
: request timeout; Default 30
Returns:
certificate as bytes if present, or None
quixstreams.platforms.quix.exceptions
quixstreams.platforms.quix.topic_manager
QuixTopicManager
The source of all topic management with quixstreams.
This is specifically for Applications using the Quix Cloud.
Generally initialized and managed automatically by a Quix Application,
but allows a user to work with it directly when needed, such as using it alongside
a plain Producer
to create its topics.
See methods for details.
QuixTopicManager.__init__
def __init__(topic_admin: TopicAdmin,
consumer_group: str,
quix_config_builder: QuixKafkaConfigsBuilder,
timeout: float = 30,
create_timeout: float = 60,
auto_create_topics: bool = True)
Arguments:
topic_admin
: anAdmin
instancequix_config_builder
: A QuixKafkaConfigsBuilder instance, else one is generated for you.timeout
: response timeout (seconds)create_timeout
: timeout for topic creation
quixstreams.dataframe.registry
DataframeRegistry
Helps manage multiple StreamingDataFrames
(multi-topic Applications
)
and their respective repartitions.
SDF
s are registered by storing their topic and current Stream.
DataframeRegistry.consumer_topics
Returns:
a list of Topics a consumer should subscribe to.
DataframeRegistry.register_root
Register a "root" SDF, or the start of a topic's processing.
Arguments:
new_sdf
: the new SDF.
DataframeRegistry.register_groupby
Register a "groupby" SDF, which is one generated with SDF.group_by()
.
Arguments:
source_sdf
: the SDF used bysdf.group_by()
new_sdf
: the SDF generated bysdf.group_by()
.
DataframeRegistry.compose_all
def compose_all(
sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]
Composes all the Streams and returns them in a dict, where key is its topic.
Arguments:
sink
: callable to accumulate the results of the execution, optional.
Returns:
a {topic_name: composed} dict, where composed is a callable
quixstreams.dataframe.dataframe
StreamingDataFrame
StreamingDataFrame
is the main object you will use for ETL work.
Typically created with an app = quixstreams.app.Application()
instance,
via sdf = app.dataframe()
.
What it Does:
- Builds a data processing pipeline, declaratively (not executed immediately)
- Executes this pipeline on inputs at runtime (Kafka message values)
- Provides functions/interface similar to Pandas Dataframes/Series
- Enables stateful processing (and manages everything related to it)
How to Use:
Define various operations while continuously reassigning to itself (or new fields).
These operations will generally transform your data, access/update state, or produce to kafka topics.
We recommend your data structure to be "columnar" (aka a dict/JSON) in nature so
that it works with the entire interface, but simple types like ints
, str
, etc.
are also supported.
See the various methods and classes for more specifics, or for a deep dive into
usage, see streamingdataframe.md
under the docs/
folder.
NOTE: column referencing like
sdf["a_column"]
and various methods often create other object types (typicallyquixstreams.dataframe.StreamingSeries
), which is expected; type hinting should alert you to any issues should you attempt invalid operations with said objects (however, we cannot infer whether an operation is valid with respect to your data!).
Example Snippet:
sdf = StreamingDataFrame()
sdf = sdf.apply(a_func)
sdf = sdf.filter(another_func)
sdf = sdf.to_topic(topic_obj)
StreamingDataFrame.apply
def apply(func: Union[
ApplyCallback,
ApplyCallbackStateful,
ApplyWithMetadataCallback,
ApplyWithMetadataCallbackStateful,
],
*,
stateful: bool = False,
expand: bool = False,
metadata: bool = False) -> Self
Apply a function to transform the value and return a new value.
The result will be passed downstream as an input value.
Example Snippet:
# This stores a string in state and capitalizes every column with a string value.
# A second apply then keeps only the string value columns (shows non-stateful).
def func(d: dict, state: State):
value = d["store_field"]
if value != state.get("my_store_key"):
state.set("my_store_key") = value
return {k: v.upper() if isinstance(v, str) else v for k, v in d.items()}
sdf = StreamingDataFrame()
sdf = sdf.apply(func, stateful=True)
sdf = sdf.apply(lambda d: {k: v for k,v in d.items() if isinstance(v, str)})
Arguments:
func
: a function to applystateful
: ifTrue
, the function will be provided with a second argument of typeState
to perform stateful operations.expand
: if True, expand the returned iterable into individual values downstream. If returned value is not iterable,TypeError
will be raised. Default -False
.metadata
: if True, the callback will receive key, timestamp and headers along with the value. Default -False
.
StreamingDataFrame.update
def update(func: Union[
UpdateCallback,
UpdateCallbackStateful,
UpdateWithMetadataCallback,
UpdateWithMetadataCallbackStateful,
],
*,
stateful: bool = False,
metadata: bool = False) -> Self
Apply a function to mutate value in-place or to perform a side effect
(e.g., printing a value to the console).
The result of the function will be ignored, and the original value will be passed downstream.
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
original StreamingDataFrame
is returned for chaining (sdf.update().print()
).
Example Snippet:
# Stores a value and mutates a list by appending a new item to it.
# Also prints to console.
def func(values: list, state: State):
value = values[0]
if value != state.get("my_store_key"):
state.set("my_store_key") = value
values.append("new_item")
sdf = StreamingDataFrame()
sdf = sdf.update(func, stateful=True)
# does not require reassigning
sdf.update(lambda v: v.append(1))
Arguments:
func
: function to update valuestateful
: ifTrue
, the function will be provided with a second argument of typeState
to perform stateful operations.metadata
: if True, the callback will receive key, timestamp and headers along with the value. Default -False
.
Returns:
the updated StreamingDataFrame instance (reassignment NOT required).
StreamingDataFrame.filter
def filter(func: Union[
FilterCallback,
FilterCallbackStateful,
FilterWithMetadataCallback,
FilterWithMetadataCallbackStateful,
],
*,
stateful: bool = False,
metadata: bool = False) -> Self
Filter value using provided function.
If the function returns True-like value, the original value will be passed downstream.
Example Snippet:
# Stores a value and allows further processing only if the value is greater than
# what was previously stored.
def func(d: dict, state: State):
value = d["my_value"]
if value > state.get("my_store_key"):
state.set("my_store_key") = value
return True
return False
sdf = StreamingDataFrame()
sdf = sdf.filter(func, stateful=True)
Arguments:
func
: function to filter valuestateful
: ifTrue
, the function will be provided with second argument of typeState
to perform stateful operations.metadata
: if True, the callback will receive key, timestamp and headers along with the value. Default -False
.
StreamingDataFrame.group_by
def group_by(key: Union[str, Callable[[Any], Any]],
name: Optional[str] = None,
value_deserializer: Optional[DeserializerType] = "json",
key_deserializer: Optional[DeserializerType] = "json",
value_serializer: Optional[SerializerType] = "json",
key_serializer: Optional[SerializerType] = "json") -> Self
"Groups" messages by re-keying them via the provided group_by operation
on their message values.
This enables things like aggregations on messages with non-matching keys.
You can provide a column name (uses the column's value) or a custom function to generate this new key.
.groupby()
can only be performed once per StreamingDataFrame
instance.
NOTE: group_by generates a topic that copies the original topic's settings.
Example Snippet:
# We have customer purchase events where the message key is the "store_id",
# but we want to calculate sales per customer (by "customer_account_id").
def func(d: dict, state: State):
current_total = state.get("customer_sum", 0)
new_total = current_total + d["customer_spent"]
state.set("customer_sum", new_total)
d["customer_total"] = new_total
return d
sdf = StreamingDataFrame()
sdf = sdf.group_by("customer_account_id")
sdf = sdf.apply(func, stateful=True)
Arguments:
key
: how the new key should be generated from the message value; requires a column name (string) or a callable that takes the message value.name
: a name for the op (must be unique per group-by), required ifkey
is a custom callable.value_deserializer
: a deserializer type for values; default - JSONkey_deserializer
: a deserializer type for keys; default - JSONvalue_serializer
: a serializer type for values; default - JSONkey_serializer
: a serializer type for keys; default - JSON
Returns:
a clone with this operation added (assign to keep its effect).
StreamingDataFrame.contains
Check if the key is present in the Row value.
Example Snippet:
# Add new column 'has_column' which contains a boolean indicating
# the presence of 'column_x'
sdf = StreamingDataFrame()
sdf['has_column'] = sdf.contains('column_x')
Arguments:
key
: a column name to check.
Returns:
a Column object that evaluates to True if the key is present or False otherwise.
StreamingDataFrame.to_topic
Produce current value to a topic. You can optionally specify a new key.
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
original StreamingDataFrame
is returned for chaining (sdf.update().print()
).
Example Snippet:
from quixstreams import Application
# Produce to two different topics, changing the key for one of them.
app = Application()
input_topic = app.topic("input_x")
output_topic_0 = app.topic("output_a")
output_topic_1 = app.topic("output_b")
sdf = app.dataframe(input_topic)
sdf = sdf.to_topic(output_topic_0)
# does not require reassigning
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])
Arguments:
topic
: instance ofTopic
key
: a callable to generate a new message key, optional. If passed, the return type of this callable must be serializable bykey_serializer
defined for this Topic object. By default, the current message key will be used.
Returns:
the updated StreamingDataFrame instance (reassignment NOT required).
StreamingDataFrame.set_timestamp
Set a new timestamp based on the current message value and its metadata.
The new timestamp will be used in windowed aggregations and when producing messages to the output topics.
The new timestamp must be in milliseconds to conform Kafka requirements.
Example Snippet:
from quixstreams import Application
app = Application()
input_topic = app.topic("data")
sdf = app.dataframe(input_topic)
# Updating the record's timestamp based on the value
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: value['new_timestamp'])
Arguments:
func
: callable accepting the current value, key, timestamp, and headers. It's expected to return a new timestamp as integer in milliseconds.
Returns:
a new StreamingDataFrame instance
StreamingDataFrame.set_headers
def set_headers(
func: Callable[
[Any, Any, int, List[Tuple[str, HeaderValue]]],
Collection[Tuple[str, HeaderValue]],
]
) -> Self
Set new message headers based on the current message value and metadata.
The new headers will be used when producing messages to the output topics.
The provided callback must accept value, key, timestamp, and headers, and return a new collection of (header, value) tuples.
Example Snippet:
from quixstreams import Application
app = Application()
input_topic = app.topic("data")
sdf = app.dataframe(input_topic)
# Updating the record's headers based on the value and metadata
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [('id', value['id'])])
Arguments:
func
: callable accepting the current value, key, timestamp, and headers. It's expected to return a new set of headers as a collection of (header, value) tuples.
Returns:
a new StreamingDataFrame instance
StreamingDataFrame.print
Print out the current message value (and optionally, the message metadata) to
stdout (console) (like the built-in print
function).
Can also output a more dict-friendly format with pretty=True
.
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
original StreamingDataFrame
is returned for chaining (sdf.update().print()
).
NOTE: prints the current (edited) values, not the original values.
Example Snippet:
from quixstreams import Application
app = Application()
input_topic = app.topic("data")
sdf = app.dataframe(input_topic)
sdf["edited_col"] = sdf["orig_col"] + "edited"
# print the updated message value with the newly added column
sdf.print()
Arguments:
pretty
: Whether to use "pprint" formatting, which uses new-lines and indents for easier console reading (but might be worse for log parsing).metadata
: Whether to additionally print the key, timestamp, and headers
Returns:
the updated StreamingDataFrame instance (reassignment NOT required).
StreamingDataFrame.compose
def compose(
sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]
Compose all functions of this StreamingDataFrame into one big closure.
Closures are more performant than calling all the functions in the
StreamingDataFrame
one-by-one.
Generally not required by users; the quixstreams.app.Application
class will
do this automatically.
Example Snippet:
from quixstreams import Application
sdf = app.dataframe()
sdf = sdf.apply(apply_func)
sdf = sdf.filter(filter_func)
sdf = sdf.compose()
result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})
Arguments:
sink
: callable to accumulate the results of the execution, optional.
Returns:
a function that accepts "value" and returns a result of StreamingDataFrame
StreamingDataFrame.test
def test(value: Any,
key: Any,
timestamp: int,
headers: Optional[Any] = None,
ctx: Optional[MessageContext] = None,
topic: Optional[Topic] = None) -> List[Any]
A shorthand to test StreamingDataFrame
with provided value
and MessageContext
.
Arguments:
value
: value to pass throughStreamingDataFrame
key
: key to pass throughStreamingDataFrame
timestamp
: timestamp to pass throughStreamingDataFrame
ctx
: instance ofMessageContext
, optional. Provide it if the StreamingDataFrame instance callsto_topic()
, has stateful functions or windows. Default -None
.topic
: optionally, a topic branch to test with
Returns:
result of StreamingDataFrame
StreamingDataFrame.tumbling_window
def tumbling_window(duration_ms: Union[int, timedelta],
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None) -> TumblingWindowDefinition
Create a tumbling window transformation on this StreamingDataFrame.
Tumbling windows divide time into fixed-sized, non-overlapping windows.
They allow performing stateful aggregations like sum
, reduce
, etc.
on top of the data and emit results downstream.
Notes:
- The timestamp of the aggregation result is set to the window start timestamp.
- Every window is grouped by the current Kafka message key.
- Messages with
None
key will be ignored. - The time windows always use the current event time.
Example Snippet:
app = Application()
sdf = app.dataframe(...)
sdf = (
# Define a tumbling window of 60s and grace period of 10s
sdf.tumbling_window(
duration_ms=timedelta(seconds=60), grace_ms=timedelta(seconds=10.0)
)
# Specify the aggregation function
.sum()
# Specify how the results should be emitted downstream.
# "current()" will emit results as they come for each updated window,
# possibly producing multiple messages per key-window pair
# "final()" will emit windows only when they are closed and cannot
# receive any updates anymore.
.current()
)
Arguments:
duration_ms
: The length of each window. Can be specified as either anint
representing milliseconds or atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.grace_ms
: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either anint
representing milliseconds or as atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.name
: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
Returns:
TumblingWindowDefinition
instance representing the tumbling window
configuration.
This object can be further configured with aggregation functions
like sum
, count
, etc. applied to the StreamingDataFrame.
StreamingDataFrame.hopping_window
def hopping_window(duration_ms: Union[int, timedelta],
step_ms: Union[int, timedelta],
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None) -> HoppingWindowDefinition
Create a hopping window transformation on this StreamingDataFrame.
Hopping windows divide the data stream into overlapping windows based on time.
The overlap is controlled by the step_ms
parameter.
They allow performing stateful aggregations like sum
, reduce
, etc.
on top of the data and emit results downstream.
Notes:
- The timestamp of the aggregation result is set to the window start timestamp.
- Every window is grouped by the current Kafka message key.
- Messages with
None
key will be ignored. - The time windows always use the current event time.
Example Snippet:
app = Application()
sdf = app.dataframe(...)
sdf = (
# Define a hopping window of 60s with step 30s and grace period of 10s
sdf.hopping_window(
duration_ms=timedelta(seconds=60),
step_ms=timedelta(seconds=30),
grace_ms=timedelta(seconds=10)
)
# Specify the aggregation function
.sum()
# Specify how the results should be emitted downstream.
# "current()" will emit results as they come for each updated window,
# possibly producing multiple messages per key-window pair
# "final()" will emit windows only when they are closed and cannot
# receive any updates anymore.
.current()
)
Arguments:
duration_ms
: The length of each window. It defines the time span for which each window aggregates data. Can be specified as either anint
representing milliseconds or atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.step_ms
: The step size for the window. It determines how much each successive window moves forward in time. Can be specified as either anint
representing milliseconds or atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.grace_ms
: The grace period for data arrival. It allows late-arriving data to be included in the window, even if it arrives after the window has theoretically moved forward. Can be specified as either anint
representing milliseconds or atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.name
: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
Returns:
HoppingWindowDefinition
instance representing the hopping
window configuration.
This object can be further configured with aggregation functions
like sum
, count
, etc. and applied to the StreamingDataFrame.
StreamingDataFrame.sliding_window
def sliding_window(duration_ms: Union[int, timedelta],
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None) -> SlidingWindowDefinition
Create a sliding window transformation on this StreamingDataFrame.
Sliding windows continuously evaluate the stream with a fixed step of 1 ms allowing for overlapping, but not redundant windows of a fixed size.
Sliding windows are similar to hopping windows with step_ms set to 1, but are siginificantly more perforant.
They allow performing stateful aggregations like sum
, reduce
, etc.
on top of the data and emit results downstream.
Notes:
- The timestamp of the aggregation result is set to the window start timestamp.
- Every window is grouped by the current Kafka message key.
- Messages with
None
key will be ignored. - The time windows always use the current event time.
- Windows are inclusive on both the start end end time.
- Every window contains a distinct aggregation.
Example Snippet:
app = Application()
sdf = app.dataframe(...)
sdf = (
# Define a sliding window of 60s with a grace period of 10s
sdf.sliding_window(
duration_ms=timedelta(seconds=60),
grace_ms=timedelta(seconds=10)
)
# Specify the aggregation function
.sum()
# Specify how the results should be emitted downstream.
# "current()" will emit results as they come for each updated window,
# possibly producing multiple messages per key-window pair
# "final()" will emit windows only when they are closed and cannot
# receive any updates anymore.
.current()
)
Arguments:
duration_ms
: The length of each window. Can be specified as either anint
representing milliseconds or atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.grace_ms
: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either anint
representing milliseconds or as atimedelta
object.NOTE:
timedelta
objects will be rounded to the closest millisecond value.name
: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.
Returns:
SlidingWindowDefinition
instance representing the sliding window
configuration.
This object can be further configured with aggregation functions
like sum
, count
, etc. applied to the StreamingDataFrame.
StreamingDataFrame.drop
Drop column(s) from the message value (value must support del
, like a dict).
This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the
original StreamingDataFrame
is returned for chaining (sdf.update().print()
).
Example Snippet:
# Remove columns "x" and "y" from the value.
# This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}
sdf = StreamingDataFrame()
sdf.drop(["x", "y"])
Arguments:
columns
: a single column name or a list of names, where names arestr
errors
: If "ignore", suppress error and only existing labels are dropped. Default -"raise"
.
Returns:
a new StreamingDataFrame instance
StreamingDataFrame.sink
Sink the processed data to the specified destination.
Internally, each processed record is added to a sink, and the sinks are flushed on each checkpoint. The offset will be committed only if all the sinks for all topic partitions are flushed successfully.
Additionally, Sinks may signal the backpressure to the application (e.g., when the destination is rate-limited). When this happens, the application will pause the corresponding topic partition and resume again after the timeout. The backpressure handling and timeouts are defined by the specific sinks.
Note: sink()
is a terminal operation - it cannot receive any additional
operations, but branches can still be generated from its originating SDF.
quixstreams.dataframe.series
StreamingSeries
StreamingSeries
are typically generated by StreamingDataframes
when getting
elements from, or performing certain operations on, a StreamingDataframe
,
thus acting as a representation of "column" value.
They share some operations with the StreamingDataframe
, but also provide some
additional functionality.
Most column value operations are handled by this class, and StreamingSeries
can
generate other StreamingSeries
as a result of said operations.
What it Does:
- Allows ways to do simple operations with dataframe "column"/dictionary values:
- Basic ops like add, subtract, modulo, etc.
- Enables comparisons/inequalities:
- Greater than, equals, etc.
- and/or, is/not operations
- Can check for existence of columns in
StreamingDataFrames
- Enables chaining of various operations together
How to Use:
For the most part, you may not even notice this class exists!
They will naturally be created as a result of typical StreamingDataFrame
use.
Auto-complete should help you with valid methods and type-checking should alert
you to invalid operations between StreamingSeries
.
In general, any typical Pands dataframe operation between columns should be valid
with StreamingSeries
, and you shouldn't have to think about them explicitly.
Example Snippet:
# Random methods for example purposes. More detailed explanations found under
# various methods or in the docs folder.
sdf = StreamingDataFrame()
sdf = sdf["column_a"].apply(a_func).apply(diff_func, stateful=True)
sdf["my_new_bool_field"] = sdf["column_b"].contains("this_string")
sdf["new_sum_field"] = sdf["column_c"] + sdf["column_d"] + 2
sdf = sdf[["column_a"] & (sdf["new_sum_field"] >= 10)]
StreamingSeries.from_apply_callback
Create a StreamingSeries from a function.
The provided function will be wrapped into Apply
Arguments:
func
: a function to applysdf_id
: the id of the callingSDF
.
Returns:
instance of StreamingSeries
StreamingSeries.apply
Add a callable to the execution list for this series.
The provided callable should accept a single argument, which will be its input. The provided callable should similarly return one output, or None
They can be chained together or included with other operations.
Example Snippet:
# The `StreamingSeries` are generated when `sdf["COLUMN_NAME"]` is called.
# This stores a string in state and capitalizes the column value; the result is
# assigned to a new column.
# Another apply converts a str column to an int, assigning it to a new column.
def func(value: str, state: State):
if value != state.get("my_store_key"):
state.set("my_store_key") = value
return v.upper()
sdf = StreamingDataFrame()
sdf["new_col"] = sdf["a_column"]["nested_dict_key"].apply(func, stateful=True)
sdf["new_col_2"] = sdf["str_col"].apply(lambda v: int(v)) + sdf["str_col2"] + 2
Arguments:
func
: a callable with one argument and one output
Returns:
a new StreamingSeries
with the new callable added
StreamingSeries.compose_returning
Compose a list of functions from this StreamingSeries and its parents into one
big closure that always returns the transformed record.
This closure is to be used to execute the functions in the stream and to get the result of the transformations.
Stream may only contain simple "apply" functions to be able to compose itself into a returning function.
Returns:
a callable accepting value, key and timestamp and returning a tuple "(value, key, timestamp)
StreamingSeries.compose
Compose all functions of this StreamingSeries into one big closure.
Generally not required by users; the quixstreams.app.Application
class will
do this automatically.
Example Snippet:
from quixstreams import Application
app = Application(...)
sdf = app.dataframe()
sdf = sdf["column_a"].apply(apply_func)
sdf = sdf["column_b"].contains(filter_func)
sdf = sdf.compose()
result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})
Arguments:
sink
: callable to accumulate the results of the execution.
Raises:
ValueError
: if disallowed functions are present in the tree of underlyingStream
.
Returns:
a callable accepting value, key and timestamp and returning None
StreamingSeries.test
def test(value: Any,
key: Any,
timestamp: int,
headers: Optional[Any] = None,
ctx: Optional[MessageContext] = None) -> Any
A shorthand to test StreamingSeries
with provided value
and MessageContext
.
Arguments:
value
: value to pass throughStreamingSeries
ctx
: instance ofMessageContext
, optional. Provide it if the StreamingSeries instance has functions callingget_current_key()
. Default -None
.
Returns:
result of StreamingSeries
StreamingSeries.isin
Check if series value is in "other".
Same as "StreamingSeries in other".
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "str_column" is contained in a column with a list of strings and
# assign the resulting `bool` to a new column: "has_my_str".
sdf = app.dataframe()
sdf["has_my_str"] = sdf["str_column"].isin(sdf["column_with_list_of_strs"])
Arguments:
other
: a container to check
Returns:
new StreamingSeries
StreamingSeries.contains
Check if series value contains "other"
Same as "other in StreamingSeries".
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" contains "my_substring" and assign the resulting
# `bool` to a new column: "has_my_substr"
sdf = app.dataframe()
sdf["has_my_substr"] = sdf["column_a"].contains("my_substring")
Arguments:
other
: object to check
Returns:
new StreamingSeries
StreamingSeries.is_
Check if series value refers to the same object as other
Runtime result will be a bool
.
Example Snippet:
# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
# to a new column: "is_same"
from quixstreams import Application
sdf = app.dataframe()
sdf["is_same"] = sdf["column_a"].is_(sdf["column_b"])
Arguments:
other
: object to check for "is"
Returns:
new StreamingSeries
StreamingSeries.isnot
Check if series value does not refer to the same object as other
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
# to a new column: "is_not_same"
sdf = app.dataframe()
sdf["is_not_same"] = sdf["column_a"].isnot(sdf["column_b"])
Arguments:
other
: object to check for "is_not"
Returns:
new StreamingSeries
StreamingSeries.isnull
Check if series value is None.
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is null and assign the resulting `bool` to a new column:
# "is_null"
sdf = app.dataframe()
sdf["is_null"] = sdf["column_a"].isnull()
Returns:
new StreamingSeries
StreamingSeries.notnull
Check if series value is not None.
Runtime result will be a bool
.
Example Snippet:
from quixstreams import Application
# Check if "column_a" is not null and assign the resulting `bool` to a new column:
# "is_not_null"
sdf = app.dataframe()
sdf["is_not_null"] = sdf["column_a"].notnull()
Returns:
new StreamingSeries
StreamingSeries.abs
Get absolute value of the series value.
Example Snippet:
from quixstreams import Application
# Get absolute value of "int_col" and add it to "other_int_col".
# Finally, assign the result to a new column: "abs_col_sum".
sdf = app.dataframe()
sdf["abs_col_sum"] = sdf["int_col"].abs() + sdf["other_int_col"]
Returns:
new StreamingSeries
quixstreams.dataframe
quixstreams.dataframe.utils
ensure_milliseconds
Convert timedelta to milliseconds.
If the delta
is not
This function will also round the value to the closest milliseconds in case of
higher precision.
Arguments:
delta
:timedelta
object
Returns:
timedelta value in milliseconds as int
quixstreams.dataframe.exceptions
quixstreams.dataframe.windows.sliding
SlidingWindow
SlidingWindow.process_window
def process_window(
value: Any, timestamp_ms: int, state: WindowedState
) -> tuple[Iterable[WindowResult], Iterable[WindowResult]]
The algorithm is based on the concept that each message is associated with a left and a right window.
Left Window: - Begins at message timestamp - window size - Ends at message timestamp
Right Window: - Begins at message timestamp + 1 ms - Ends at message timestamp + 1 ms + window size
For example, for a window size of 10 and a message A arriving at timestamp 26:
0 10 20 30 40 50 60
----|---------|---------|---------|---------|---------|---------|---> A left window -> |---------||---------| <- right window 16 26 27 37
The algorithm scans backward through the window store: - Starting at: start_time = message timestamp + 1 ms (the right window's start time) - Ending at: start_time = message timestamp - 2 * window size
During this traversal, the algorithm performs the following actions:
- Determine if the right window should be created. If yes, locate the existing aggregation to copy to the new window.
- Determine if the right window of the previous record should be created. If yes, locate the existing aggregation and combine it with the incoming message.
- Locate and update the left window if it exists.
- If the left window does not exist, create it. Locate the existing aggregation and combine it with the incoming message.
- Locate and update all existing windows to which the new message belongs.
quixstreams.dataframe.windows.definitions
FixedTimeWindowDefinition
FixedTimeWindowDefinition.sum
Configure the window to aggregate data by summing up values within
each window period.
Returns:
an instance of FixedTimeWindow
configured to perform sum aggregation.
FixedTimeWindowDefinition.count
Configure the window to aggregate data by counting the number of values
within each window period.
Returns:
an instance of FixedTimeWindow
configured to perform record count.
FixedTimeWindowDefinition.mean
Configure the window to aggregate data by calculating the mean of the values
within each window period.
Returns:
an instance of FixedTimeWindow
configured to calculate the mean
of the values.
FixedTimeWindowDefinition.reduce
def reduce(reducer: Callable[[Any, Any], Any],
initializer: Callable[[Any], Any]) -> "FixedTimeWindow"
Configure the window to perform a custom aggregation using reducer
and initializer
functions.
Example Snippet:
sdf = StreamingDataFrame(...)
# Using "reduce()" to calculate multiple aggregates at once
def reducer(agg: dict, current: int):
aggregated = {
'min': min(agg['min'], current),
'max': max(agg['max'], current),
'count': agg['count'] + 1
}
return aggregated
def initializer(current) -> dict:
return {'min': current, 'max': current, 'count': 1}
window = (
sdf.tumbling_window(duration_ms=1000)
.reduce(reducer=reducer, initializer=initializer)
.final()
)
Arguments:
reducer
: A function that takes two arguments (the accumulated value and a new value) and returns a single value. The returned value will be saved to the state store and sent downstream.initializer
: A function to call for every first element of the window. This function is used to initialize the aggregation within a window.
Returns:
A window configured to perform custom reduce aggregation on the data.
FixedTimeWindowDefinition.max
Configure a window to aggregate the maximum value within each window period.
Returns:
an instance of FixedTimeWindow
configured to calculate the maximum
value within each window period.
FixedTimeWindowDefinition.min
Configure a window to aggregate the minimum value within each window period.
Returns:
an instance of FixedTimeWindow
configured to calculate the maximum
value within each window period.
quixstreams.dataframe.windows
quixstreams.dataframe.windows.time_based
FixedTimeWindow
FixedTimeWindow.final
Apply the window aggregation and return results only when the windows are closed.
The format of returned windows:
{
"start": <window start time in milliseconds>,
"end": <window end time in milliseconds>,
"value: <aggregated window value>,
}
The individual window is closed when the event time (the maximum observed timestamp across the partition) passes its end timestamp + grace period. The closed windows cannot receive updates anymore and are considered final.
NOTE: Windows can be closed only within the same message key. If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until the message the same key is received.
FixedTimeWindow.current
Apply the window transformation to the StreamingDataFrame to return results for each updated window.
The format of returned windows:
{
"start": <window start time in milliseconds>,
"end": <window end time in milliseconds>,
"value: <aggregated window value>,
}
This method processes streaming data and returns results as they come, regardless of whether the window is closed or not.
quixstreams.dataframe.windows.base
get_window_ranges
def get_window_ranges(timestamp_ms: int,
duration_ms: int,
step_ms: Optional[int] = None) -> Deque[Tuple[int, int]]
Get a list of window ranges for the given timestamp.
Arguments:
timestamp_ms
: timestamp in millisecondsduration_ms
: window duration in millisecondsstep_ms
: window step in milliseconds for hopping windows, optional.
Returns:
a list of (
quixstreams.dataframe.base
quixstreams.rowproducer
RowProducer
A producer class that is capable of serializing Rows to bytes and send them to Kafka.
The serialization is performed according to the Topic serialization settings.
Arguments:
broker_address
: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required.extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Producer
as is. Note: values passed as arguments override values inextra_config
.on_error
: a callback triggered whenRowProducer.produce_row()
orRowProducer.poll()
fail. If producer fails and the callback returns
True, the exception will be logged but not propagated. The default callback logs an exception and returns
False`.flush_timeout
: The time the producer is waiting for all messages to be delivered.transactional
: whether to use Kafka transactions or not. Note this changes which underlyingProducer
class is used.
RowProducer.produce_row
def produce_row(row: Row,
topic: Topic,
key: Optional[Any] = _KEY_UNSET,
partition: Optional[int] = None,
timestamp: Optional[int] = None)
Serialize Row to bytes according to the Topic serialization settings
and produce it to Kafka
If this method fails, it will trigger the provided "on_error" callback.
Arguments:
row
: Row objecttopic
: Topic objectkey
: message key, optionalpartition
: partition number, optionaltimestamp
: timestamp in milliseconds, optional
RowProducer.poll
Polls the producer for events and calls on_delivery
callbacks.
If poll()
fails, it will trigger the provided "on_error" callback
Arguments:
timeout
: timeout in seconds
RowProducer.abort_transaction
Attempt an abort if an active transaction.
Else, skip since it throws an exception if at least one transaction was successfully completed at some point.
This avoids polluting the stack trace in the case where a transaction was not active as expected (because of some other exception already raised) and a cleanup abort is attempted.
NOTE: under normal circumstances a transaction will be open due to how the Checkpoint inits another immediately after committing.
quixstreams.core.stream
quixstreams.core.stream.stream
Stream
Stream.__init__
A base class for all streaming operations.
Stream
is an abstraction of a function pipeline.
Each Stream has a function and a parent (None by default).
When adding new function to the stream, it creates a new Stream
object and
sets "parent" to the previous Stream
to maintain an order of execution.
Streams supports four types of functions:
- "Apply" - generate new values based on a previous one.
The result of an Apply function is passed downstream to the next functions.
If "expand=True" is passed and the function returns an
Iterable
, each item of it will be treated as a separate value downstream. - "Update" - update values in-place. The result of an Update function is always ignored, and its input is passed downstream.
- "Filter" - to filter values from the Stream.
The result of a Filter function is interpreted as boolean.
If it's
True
, the input will be passed downstream. If it'sFalse
, the record will be filtered from the stream. - "Transform" - to transform keys and timestamps along with the values.
"Transform" functions may change the keys and should be used with caution.
The result of the Transform function is passed downstream to the next
functions.
If "expand=True" is passed and the function returns an
Iterable
, each item of it will be treated as a separate value downstream.
To execute the functions on the Stream
, call .compose()
method, and
it will return a closure to execute all the functions accumulated in the Stream
and its parents.
Arguments:
func
: a function to be called on the stream. It is expected to be wrapped into one of "Apply", "Filter", "Update" or "Trasform" fromquixstreams.core.stream.functions
package. Default - "ApplyFunction(lambda value: value)".parent
: a parentStream
Stream.add_filter
def add_filter(func: Union[FilterCallback, FilterWithMetadataCallback],
*,
metadata: bool = False) -> Self
Add a function to filter values from the Stream.
The return value of the function will be interpreted as bool
.
If the function returns False
-like result, the Stream will raise Filtered
exception during execution.
Arguments:
func
: a function to filter values from the streammetadata
: if True, the callback will receive key and timestamp along with the value. Default -False
.
Returns:
a new Stream
derived from the current one
Stream.add_apply
def add_apply(func: Union[
ApplyCallback,
ApplyExpandedCallback,
ApplyWithMetadataCallback,
ApplyWithMetadataExpandedCallback,
],
*,
expand: bool = False,
metadata: bool = False) -> Self
Add an "apply" function to the Stream.
The function is supposed to return a new value, which will be passed further during execution.
Arguments:
func
: a function to generate a new valueexpand
: if True, expand the returned iterable into individual values downstream. If returned value is not iterable,TypeError
will be raised. Default -False
.metadata
: if True, the callback will receive key and timestamp along with the value. Default -False
.
Returns:
a new Stream
derived from the current one
Stream.add_update
def add_update(func: Union[UpdateCallback, UpdateWithMetadataCallback],
*,
metadata: bool = False) -> Self
Add an "update" function to the Stream, that will mutate the input value.
The return of this function will be ignored and its input will be passed downstream.
Arguments:
func
: a function to mutate the valuemetadata
: if True, the callback will receive key and timestamp along with the value. Default -False
.
Returns:
a new Stream derived from the current one
Stream.add_transform
def add_transform(func: Union[TransformCallback, TransformExpandedCallback],
*,
expand: bool = False) -> Self
Add a "transform" function to the Stream, that will mutate the input value.
The callback must accept a value, a key, and a timestamp. It's expected to return a new value, new key and new timestamp.
The result of the callback which will be passed downstream during execution.
Arguments:
func
: a function to mutate the valueexpand
: if True, expand the returned iterable into individual items downstream. If returned value is not iterable,TypeError
will be raised. Default -False
.
Returns:
a new Stream derived from the current one
Stream.diff
Takes the difference between Streams self
and other
based on their last
common parent, and returns a new, independent Stream
that includes only
this difference (the start of the "diff" will have no parent).
It's impossible to calculate a diff when:
- Streams don't have a common parent.
- When the self
Stream already includes all the nodes from
the other
Stream, and the resulting diff is empty.
Arguments:
other
: aStream
to take a diff from.
Raises:
ValueError
: if Streams don't have a common parent, if the diff is empty, or pruning failed.
Returns:
a new independent Stream
instance whose root begins at the diff
Stream.root_path
Return a list of all parent Streams including the node itself.
Can optionally stop at a first encountered split with allow_splits=False
The tree is ordered from parent to child (current node comes last).
Returns:
a list of Stream
objects
Stream.full_tree
Starts at tree root and finds every Stream in the tree (including splits).
Returns:
The collection of all Streams interconnected to this one
Stream.compose
def compose(
allow_filters=True,
allow_expands=True,
allow_updates=True,
allow_transforms=True,
sink: Optional[Callable[[Any, Any, int, Any],
None]] = None) -> VoidExecutor
Generate an "executor" closure by mapping all relatives of this Stream
and
composing their functions together.
The resulting "executor" can be called with a given value, key, timestamp, and headers (i.e. a Kafka message).
By default, executor doesn't return the result of the execution.
To accumulate the results, pass the sink
parameter.
Arguments:
allow_filters
: If False, this function will fail withValueError
if the stream has filter functions in the tree. Default - True.allow_updates
: If False, this function will fail withValueError
if the stream has update functions in the tree. Default - True.allow_expands
: If False, this function will fail withValueError
if the stream has functions with "expand=True" in the tree. Default - True.allow_transforms
: If False, this function will fail withValueError
if the stream has transform functions in the tree. Default - True.sink
: callable to accumulate the results of the execution, optional.
Stream.compose_returning
Compose a list of functions from this Stream
and its parents into one
big closure that always returns the transformed record.
This closure is to be used to execute the functions in the stream and to get the result of the transformations.
Stream may only contain simple "apply" functions to be able to compose itself into a returning function.
quixstreams.core.stream.functions.update
UpdateFunction
Wrap a function into an "Update" function.
The provided function must accept a value, and it's expected to mutate it or to perform some side effect.
The result of the callback is always ignored, and the original input is passed downstream.
UpdateWithMetadataFunction
Wrap a function into an "Update" function.
The provided function must accept a value, a key, and a timestamp. The callback is expected to mutate the value or to perform some side effect with it.
The result of the callback is always ignored, and the original input is passed downstream.
quixstreams.core.stream.functions
quixstreams.core.stream.functions.types
quixstreams.core.stream.functions.utils
pickle_copier
A utility function to copy objects using a "pickle" library.
On average, it's faster than "copy.deepcopy". It accepts an object and returns a callable creating copies of this object.
Arguments:
obj
: an object to copy
quixstreams.core.stream.functions.transform
TransformFunction
Wrap a function into a "Transform" function.
The provided callback must accept a value, a key and a timestamp. It's expected to return a new value, new key and new timestamp.
This function must be used with caution, because it can technically change the key. It's supposed to be used by the library internals and not be a part of the public API.
The result of the callback will always be passed downstream.
quixstreams.core.stream.functions.filter
FilterFunction
Wraps a function into a "Filter" function.
The result of a Filter function is interpreted as boolean.
If it's True
, the input will be return downstream.
If it's False
, the Filtered
exception will be raised to signal that the
value is filtered out.
FilterWithMetadataFunction
Wraps a function into a "Filter" function.
The passed callback must accept value, key, and timestamp, and it's expected to return a boolean-like result.
If the result is True
, the input will be passed downstream.
Otherwise, the value will be filtered out.
quixstreams.core.stream.functions.base
StreamFunction
A base class for all the streaming operations in Quix Streams.
It provides a get_executor
method to return a closure to be called with the input
values.
StreamFunction.get_executor
Returns a wrapper to be called on a value, key, timestamp and headers.
quixstreams.core.stream.functions.apply
ApplyFunction
Wrap a function into "Apply" function.
The provided callback is expected to return a new value based on input, and its result will always be passed downstream.
ApplyWithMetadataFunction
Wrap a function into "Apply" function.
The provided function is expected to accept value, and timestamp and return a new value based on input, and its result will always be passed downstream.
quixstreams.core
quixstreams.processing
quixstreams.processing.context
ProcessingContext
A class to share processing-related objects
between Application
and StreamingDataFrame
instances.
ProcessingContext.store_offset
Store the offset of the processed message to the checkpoint.
Arguments:
topic
: topic namepartition
: partition numberoffset
: message offset
ProcessingContext.init_checkpoint
Initialize a new checkpoint
ProcessingContext.commit_checkpoint
Attempts finalizing the current Checkpoint only if the Checkpoint is "expired",
or force=True
is passed, otherwise do nothing.
To finalize: the Checkpoint will be committed if it has any stored offsets, else just close it. A new Checkpoint is then created.
Arguments:
force
: ifTrue
, commit the Checkpoint before its expiration deadline.
quixstreams.processing.pausing
PausingManager
A class to temporarily pause topic partitions and resume them after the timeout is elapsed.
PausingManager.pause
Pause the topic-partition for a certain period of time.
This method is supposed to be called in case of backpressure from Sinks.
PausingManager.is_paused
Check if the topic-partition is already paused
PausingManager.resume_if_ready
Resume consuming from topic-partitions after the wait period has elapsed.
PausingManager.revoke
Remove partition from the list of paused TPs if it's revoked
quixstreams.sinks.core.influxdb3
InfluxDB3Sink
InfluxDB3Sink.__init__
def __init__(token: str,
host: str,
organization_id: str,
database: str,
measurement: str,
fields_keys: Iterable[str] = (),
tags_keys: Iterable[str] = (),
time_key: Optional[str] = None,
time_precision: WritePrecision = WritePrecision.MS,
include_metadata_tags: bool = False,
batch_size: int = 1000,
enable_gzip: bool = True,
request_timeout_ms: int = 10_000,
debug: bool = False)
A connector to sink processed data to InfluxDB v3.
It batches the processed records in memory per topic partition, converts them to the InfluxDB format, and flushes them to InfluxDB at the checkpoint.
The InfluxDB sink transparently handles backpressure if the destination instance cannot accept more data at the moment (e.g., when InfluxDB returns an HTTP 429 error with the "retry_after" header set). When this happens, the sink will notify the Application to pause consuming from the backpressured topic partition until the "retry_after" timeout elapses.
NOTE: InfluxDB3Sink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.
Arguments:
token
: InfluxDB access tokenhost
: InfluxDB host in format "https://" organization_id
: InfluxDB organization_iddatabase
: database namefields_keys
: a list of keys to be used as "fields" when writing to InfluxDB. If present, it must not overlap with "tags_keys". If empty, the whole record value will be used.NOTE The fields' values can only be strings, floats, integers, or booleans. Default -
()
.tags_keys
: a list of keys to be used as "tags" when writing to InfluxDB. If present, it must not overlap with "fields_keys". These keys will be popped from the value dictionary automatically because InfluxDB doesn't allow the same keys be both in tags and fields. If empty, no tags will be sent.NOTE: InfluxDB client always converts tag values to strings. Default -
()
.time_key
: a key to be used as "time" when writing to InfluxDB. By default, the record timestamp will be used with "ms" time precision. When using a custom key, you may need to adjust thetime_precision
setting to match.time_precision
: a time precision to use when writing to InfluxDB.include_metadata_tags
: if True, includes record's key, topic, and partition as tags. Default -False
.batch_size
: how many records to write to InfluxDB in one request. Note that it only affects the size of one write request, and not the number of records flushed on each checkpoint. Default -1000
.enable_gzip
: if True, enables gzip compression for writes. Default -True
.request_timeout_ms
: an HTTP request timeout in milliseconds. Default -10000
.debug
: if True, print debug logs from InfluxDB client. Default -False
.
quixstreams.sinks.core
quixstreams.sinks.core.csv
CSVSink
CSVSink.__init__
def __init__(path: str,
dialect: str = "excel",
key_serializer: Callable[[Any], str] = str,
value_serializer: Callable[[Any], str] = json.dumps)
A base CSV sink that writes data from all assigned partitions to a single file.
It's best to be used for local debugging.
Column format: (key, value, timestamp, topic, partition, offset)
Arguments:
path
: a path to CSV filedialect
: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default -"excel"
.key_serializer
: a callable to convert keys to strings. Default -str
.value_serializer
: a callable to convert values to strings. Default -json.dumps
.
quixstreams.sinks
quixstreams.sinks.community.file.formats.parquet
ParquetFormat
Serializes batches of messages into Parquet format.
This class provides functionality to serialize a SinkBatch
into bytes
in Parquet format using PyArrow. It allows setting the file extension
and compression algorithm used for the Parquet files.
This format does not support appending to existing files.
ParquetFormat.__init__
Initializes the ParquetFormat.
Arguments:
file_extension
: The file extension to use for output files. Defaults to ".parquet".compression
: The compression algorithm to use for Parquet files. Allowed values are "none", "snappy", "gzip", "brotli", "lz4", or "zstd". Defaults to "snappy".
ParquetFormat.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
ParquetFormat.serialize
Serializes a SinkBatch
into bytes in Parquet format.
Each item in the batch is converted into a dictionary with "_timestamp", "_key", and the keys from the message value. If the message key is in bytes, it is decoded to a string.
Missing fields in messages are filled with None
to ensure all rows
have the same columns.
Arguments:
batch
: TheSinkBatch
to serialize.
Returns:
The serialized batch as bytes in Parquet format.
quixstreams.sinks.community.file.formats
quixstreams.sinks.community.file.formats.json
JSONFormat
Serializes batches of messages into JSON Lines format with optional gzip compression.
This class provides functionality to serialize a SinkBatch
into bytes
in JSON Lines format. It supports optional gzip compression and allows
for custom JSON serialization through the dumps
parameter.
This format supports appending to existing files.
JSONFormat.__init__
def __init__(file_extension: str = ".jsonl",
compress: bool = False,
dumps: Optional[Callable[[Any], str]] = None) -> None
Initializes the JSONFormat.
Arguments:
file_extension
: The file extension to use for output files. Defaults to ".jsonl".compress
: IfTrue
, compresses the output using gzip and appends ".gz" to the file extension. Defaults toFalse
.dumps
: A custom function to serialize objects to JSON-formatted strings. If provided, thecompact
option is ignored.
JSONFormat.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
JSONFormat.serialize
Serializes a SinkBatch
into bytes in JSON Lines format.
Each item in the batch is converted into a JSON object with "_timestamp", "_key", and "_value" fields. If the message key is in bytes, it is decoded to a string.
Arguments:
batch
: TheSinkBatch
to serialize.
Returns:
The serialized batch in JSON Lines format, optionally compressed with gzip.
quixstreams.sinks.community.file.formats.base
Format
Base class for formatting batches in file sinks.
This abstract base class defines the interface for batch formatting
in file sinks. Subclasses should implement the file_extension
property and the serialize
method to define how batches are
formatted and saved.
Format.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
Format.supports_append
Indicates if the format supports appending data to an existing file.
Returns:
True if appending is supported, otherwise False.
Format.serialize
Serializes a batch of messages into bytes.
Arguments:
batch
: The batch of messages to serialize.
Returns:
The serialized batch as bytes.
quixstreams.sinks.community.file.sink
InvalidFormatError
Raised when the format is specified incorrectly.
FileSink
Writes batches of data to files on disk using specified formats.
Messages are grouped by their topic and partition. Data from messages with the same topic and partition are saved in the same directory. Each batch of messages is serialized and saved to a file within that directory. Files are named using the batch's starting offset to ensure uniqueness and order.
If append
is set to True
, the sink will attempt to append data to an
existing file rather than creating a new one. This is only supported for
formats that allow appending.
FileSink.__init__
Initializes the FileSink.
Arguments:
output_dir
: The directory where files will be written.format
: The data serialization format to use. This can be either a format name ("json", "parquet") or an instance of aFormat
subclass.append
: IfTrue
, data will be appended to existing files when possible. Note that not all formats support appending. Defaults toFalse
.
Raises:
ValueError
: Ifappend
isTrue
but the specified format does not support appending.
FileSink.write
Writes a batch of data to files on disk, grouping data by topic and partition.
If append
is True
and an existing file is found, data will be appended to
the last file. Otherwise, a new file is created based on the batch's starting
offset.
Arguments:
batch
: The batch of data to write.
quixstreams.sinks.community.file
quixstreams.sinks.community.bigquery
BigQuerySink
BigQuerySink.__init__
def __init__(project_id: str,
location: str,
dataset_id: str,
table_name: str,
service_account_json: Optional[str] = None,
schema_auto_update: bool = True,
ddl_timeout: float = 10.0,
insert_timeout: float = 10.0,
retry_timeout: float = 30.0,
**kwargs)
A connector to sink processed data to Google Cloud BigQuery.
It batches the processed records in memory per topic partition, and flushes them to BigQuery at the checkpoint.
NOTE: BigQuerySink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.
The column names and types are inferred from individual records. Each key in the record's dictionary will be inserted as a column to the resulting BigQuery table.
If the column is not present in the schema, the sink will try to add new nullable columns on the fly with types inferred from individual values.
The existing columns will not be affected.
To disable this behavior, pass schema_auto_update=False
and define the necessary schema upfront.
The minimal schema must define two columns: "timestamp" of type TIMESTAMP, and "__key" with a type of the expected message key.
Arguments:
project_id
: a Google project id.location
: a BigQuery location.dataset_id
: a BigQuery dataset id. If the dataset does not exist, the sink will try to create it.table_name
: BigQuery table name. If the table does not exist, the sink will try to create it with a default schema.service_account_json
: an optional JSON string with service account credentials to connect to BigQuery. The internalgoogle.cloud.bigquery.Client
will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default -None
.schema_auto_update
: if True, the sink will try to create a dataset and a table if they don't exist. It will also add missing columns on the fly with types inferred from individual values.ddl_timeout
: a timeout for a single DDL operation (adding tables, columns, etc.). Default - 10s.insert_timeout
: a timeout for a single INSERT operation. Default - 10s.retry_timeout
: a total timeout for each request to BigQuery API. During this timeout, a request can be retried according to the client's default retrying policy.kwargs
: Additional keyword arguments passed tobigquery.Client
.
quixstreams.sinks.community
This module contains Sinks developed and maintained by the members of Quix Streams community.
quixstreams.sinks.community.iceberg
AWSIcebergConfig
AWSIcebergConfig.__init__
def __init__(aws_s3_uri: str,
aws_region: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None)
Configure IcebergSink to work with AWS Glue.
Arguments:
aws_s3_uri
: The S3 URI where the table data will be stored (e.g., 's3://your-bucket/warehouse/').aws_region
: The AWS region for the S3 bucket and Glue catalog.aws_access_key_id
: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable when using AWS Glue.aws_secret_access_key
: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable when using AWS Glue.aws_session_token
: a session token (or will be generated for you). NOTE: can alternatively set the AWS_SESSION_TOKEN environment variable when using AWS Glue.
IcebergSink
IcebergSink writes batches of data to an Apache Iceberg table.
The data will by default include the kafka message key, value, and timestamp.
It serializes incoming data batches into Parquet format and appends them to the Iceberg table, updating the table schema as necessary.
Currently, supports Apache Iceberg hosted in:
- AWS
Supported data catalogs:
- AWS Glue
Arguments:
table_name
: The name of the Iceberg table.config
: An IcebergConfig with all the various connection parameters.data_catalog_spec
: data cataloger to use (ex. for AWS Glue, "aws_glue").schema
: The Iceberg table schema. If None, a default schema is used.partition_spec
: The partition specification for the table. If None, a default is used.
Example setup using an AWS-hosted Iceberg with AWS Glue:
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig
# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)
# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
table_name="glue.sink-test",
config=iceberg_config,
data_catalog_spec="aws_glue",
)
app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')
# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)
# Sink results to the IcebergSink
sdf.sink(iceberg_sink)
if __name__ == "__main__":
# Start the application
app.run()
IcebergSink.write
Writes a batch of data to the Iceberg table.
Implements retry logic to handle concurrent write conflicts.
Arguments:
batch
: The batch of data to write.
quixstreams.sinks.community.pubsub
PubSubTopicNotFoundError
Raised when the specified topic does not exist.
PubSubSink
A sink that publishes messages to Google Cloud Pub/Sub.
PubSubSink.__init__
def __init__(project_id: str,
topic_id: str,
service_account_json: Optional[str] = None,
value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps,
key_serializer: Callable[[Any], str] = bytes.decode,
flush_timeout: int = 5,
**kwargs) -> None
Initialize the PubSubSink.
Arguments:
project_id
: GCP project ID.topic_id
: Pub/Sub topic ID.service_account_json
: an optional JSON string with service account credentials to connect to Pub/Sub. The internalPublisherClient
will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default -None
.value_serializer
: Function to serialize the value to string or bytes (defaults to json.dumps).key_serializer
: Function to serialize the key to string (defaults to bytes.decode).kwargs
: Additional keyword arguments passed to PublisherClient.
PubSubSink.add
def add(value: Any, key: Any, timestamp: int,
headers: list[tuple[str, HeaderValue]], topic: str, partition: int,
offset: int) -> None
Publish a message to Pub/Sub.
PubSubSink.flush
Wait for all publish operations to complete successfully.
quixstreams.sinks.base.sink
BaseSink
This is a base class for all sinks.
Subclass it and implement its methods to create your own sink.
Note that Sinks are currently in beta, and their design may change over time.
BaseSink.flush
This method is triggered by the Checkpoint class when it commits.
You can use flush()
to write the batched data to the destination (in case of
a batching sink), or confirm the delivery of the previously sent messages
(in case of a streaming sink).
If flush() fails, the checkpoint will be aborted.
BaseSink.add
@abc.abstractmethod
def add(value: Any, key: Any, timestamp: int,
headers: List[Tuple[str, HeaderValue]], topic: str, partition: int,
offset: int)
This method is triggered on every new processed record being sent to this sink.
You can use it to accumulate batches of data before sending them outside, or to send results right away in a streaming manner and confirm a delivery later on flush().
BaseSink.on_paused
This method is triggered when the sink is paused due to backpressure, when
the SinkBackpressureError
is raised.
Here you can react to the backpressure events.
BatchingSink
A base class for batching sinks, that need to accumulate the data first before sending it to the external destinatios.
Examples: databases, objects stores, and other destinations where writing every message is not optimal.
It automatically handles batching, keeping batches in memory per topic-partition.
You may subclass it and override the write()
method to implement a custom
batching sink.
BatchingSink.write
This method implements actual writing to the external destination.
It may also raise SinkBackpressureError
if the destination cannot accept new
writes at the moment.
When this happens, the accumulated batch is dropped and the app pauses the
corresponding topic partition.
BatchingSink.add
def add(value: Any, key: Any, timestamp: int,
headers: List[Tuple[str, HeaderValue]], topic: str, partition: int,
offset: int)
Add a new record to in-memory batch.
BatchingSink.flush
Flush an accumulated batch to the destination and drop it afterward.
BatchingSink.on_paused
When the destination is already backpressure, drop the accumulated batch.
quixstreams.sinks.base.batch
SinkBatch
A batch to accumulate processed data by BatchingSink
between the checkpoints.
Batches are created automatically by the implementations of BatchingSink
.
Arguments:
topic
: a topic namepartition
: a partition number
SinkBatch.iter_chunks
Iterate over batch data in chunks of length n. The last batch may be shorter.
quixstreams.sinks.base
quixstreams.sinks.base.exceptions
SinkBackpressureError
An exception to be raised by Sinks during flush() call
to signal a backpressure event to the application.
When raised, the app will drop the accumulated sink batch,
pause the corresponding topic partition for
a timeout specified in retry_after
, and resume it when it's elapsed.
Arguments:
retry_after
: a timeout in seconds to pause fortopic
: a topic name to pausepartition
: a partition number to pause
quixstreams.sinks.base.manager
quixstreams.sinks.base.item
quixstreams.utils
quixstreams.utils.settings
BaseSettings
BaseSettings.as_dict
Dump any non-empty config values as a dictionary.
Arguments:
plaintext_secrets
: whether secret values are plaintext or obscured (***)include
: optional list of fields to be included in the dictionary
Returns:
a dictionary
quixstreams.utils.dicts
dict_values
Recursively unpacks a set of nested dicts to get a flattened list of leaves,
where "leaves" are the first non-dict item.
i.e {"a": {"b": {"c": 1}, "d": 2}, "e": 3} becomes [1, 2, 3]
Arguments:
d
: initially, a dict (with potentially nested dicts)
Returns:
a list with all the leaves of the various contained dicts
quixstreams.utils.json
dumps
Serialize to JSON using orjson
package.
Arguments:
value
: value to serialize to JSON
Returns:
bytes
loads
Deserialize from JSON using orjson
package.
Main differences:
- It returns bytes
- It doesn't allow non-str keys in dictionaries
Arguments:
value
: value to deserialize from
Returns:
object
quixstreams.types
quixstreams.models.timestamps
TimestampType
TIMESTAMP_NOT_AVAILABLE
timestamps not supported by broker
TIMESTAMP_CREATE_TIME
message creation time (or source / producer time)
TIMESTAMP_LOG_APPEND_TIME
broker receive time
MessageTimestamp
Represents a timestamp of incoming Kafka message.
It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.
MessageTimestamp.create
Create a Timestamp object based on data
from confluent_kafka.Message.timestamp()
.
If timestamp type is "TIMESTAMP_NOT_AVAILABLE", the milliseconds are set to None
Arguments:
timestamp_type
: a timestamp type represented as a number Can be one of:- "0" - TIMESTAMP_NOT_AVAILABLE, timestamps not supported by broker.
- "1" - TIMESTAMP_CREATE_TIME, message creation time (or source / producer time).
- "2" - TIMESTAMP_LOG_APPEND_TIME, broker receive time.
milliseconds
: the number of milliseconds since the epoch (UTC).
Returns:
Timestamp object
quixstreams.models
quixstreams.models.messagecontext
MessageContext
An object with Kafka message properties.
It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.
quixstreams.models.types
ConfluentKafkaMessageProto
An interface of confluent_kafka.Message
.
Use it to not depend on exact implementation and simplify testing.
Instances of confluent_kafka.Message
cannot be directly created from Python,
see https://github.com/confluentinc/confluent-kafka-python/issues/1535.
quixstreams.models.serializers.avro
AvroSerializer
AvroSerializer.__init__
def __init__(
schema: Schema,
strict: bool = False,
strict_allow_default: bool = False,
disable_tuple_notation: bool = False,
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
schema_registry_serialization_config: Optional[
SchemaRegistrySerializationConfig] = None)
Serializer that returns data in Avro format.
For more information see fastavro schemaless_writer method.
Arguments:
schema
: The avro schema.strict
: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states. Default -False
strict_allow_default
: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states unless it is a missing field that has a default value in the schema. Default -False
disable_tuple_notation
: If set to True, tuples will not be treated as a special case. Therefore, using a tuple to indicate the type of a record will not work. Default -False
schema_registry_client_config
: If provided, serialization is offloaded to Confluent's AvroSerializer. Default -None
schema_registry_serialization_config
: Additional configuration for Confluent's AvroSerializer. Default -None
NOTE:
schema_registry_client_config
must also be set.
AvroDeserializer
AvroDeserializer.__init__
def __init__(
schema: Optional[Schema] = None,
reader_schema: Optional[Schema] = None,
return_record_name: bool = False,
return_record_name_override: bool = False,
return_named_type: bool = False,
return_named_type_override: bool = False,
handle_unicode_errors: str = "strict",
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)
Deserializer that parses data from Avro.
For more information see fastavro schemaless_reader method.
Arguments:
schema
: The Avro schema.reader_schema
: If the schema has changed since being written then the new schema can be given to allow for schema migration. Default -None
return_record_name
: If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Default -False
return_record_name_override
: If true, this will modify the behavior of return_record_name so that the record name is only returned for unions where there is more than one record. For unions that only have one record, this option will make it so that the record is returned by itself, not a tuple with the name. Default -False
return_named_type
: If true, when reading a union of named types, the result will be a tuple where the first value is the name of the type and the second value is the record itself NOTE: Using this option will ignore return_record_name and return_record_name_override. Default -False
return_named_type_override
: If true, this will modify the behavior of return_named_type so that the named type is only returned for unions where there is more than one named type. For unions that only have one named type, this option will make it so that the named type is returned by itself, not a tuple with the name. Default -False
handle_unicode_errors
: Should be set to a valid string that can be used in the errors argument of the string decode() function. Default -"strict"
schema_registry_client_config
: If provided, deserialization is offloaded to Confluent's AvroDeserializer. Default -None
quixstreams.models.serializers.schema_registry
SchemaRegistryClientConfig
Configuration required to establish the connection with a Schema Registry.
Arguments:
url
: Schema Registry URL.ssl_ca_location
: Path to CA certificate file used to verify the Schema Registry's private key.ssl_key_location
: Path to the client's private key (PEM) used for authentication.NOTE:
ssl_certificate_location
must also be set.ssl_certificate_location
: Path to the client's public key (PEM) used for authentication.NOTE: May be set without
ssl_key_location
if the private key is stored within the PEM as well.basic_auth_user_info
: Client HTTP credentials in the form ofusername:password
.NOTE: By default, userinfo is extracted from the URL if present.
SchemaRegistrySerializationConfig
Configuration that instructs Serializer how to handle communication with a
Schema Registry.
Arguments:
auto_register_schemas
: If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.normalize_schemas
: Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.use_latest_version
: Whether to use the latest subject version for serialization.NOTE: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.
subject_name_strategy
: Callable(SerializationContext, str) -> str Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.skip_known_types
: Whether or not to skip known types when resolving schema dependencies. Defaults to False.reference_subject_name_strategy
: Defines how Schema Registry subject names for schema references are constructed. Defaults to reference_subject_name_strategy.use_deprecated_format
: Specifies whether the Protobuf serializer should serialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If the consumers of the topic being produced to are using confluent-kafka-python <1.8, then this property must be set to True until all old consumers have been upgraded.
quixstreams.models.serializers
quixstreams.models.serializers.exceptions
IgnoreMessage
Raise this exception from Deserializer.call in order to ignore the processing of the particular message.
quixstreams.models.serializers.quix
QuixDeserializer
Handles Deserialization for any Quix-formatted topic.
Parses JSON data from either TimeseriesData
and EventData
(ignores the rest).
QuixDeserializer.__init__
Arguments:
loads
: function to parse json from bytes. Default - func:quixstreams.utils.json.loads
.
QuixDeserializer.split_values
Each Quix message might contain data for multiple Rows. This property informs the downstream processors about that, so they can expect an Iterable instead of Mapping.
QuixDeserializer.deserialize
Deserialization function for particular data types (Timeseries or EventData).
Arguments:
model_key
: value of "__Q_ModelKey" message headervalue
: deserialized JSON value of the message, list or dict
Returns:
Iterable of dicts
QuixSerializer
QuixSerializer.__init__
Serializer that returns data in json format.
Arguments:
as_legacy
: parse as the legacy format; Default = Truedumps
: a function to serialize objects to json. Default - func:quixstreams.utils.json.dumps
QuixTimeseriesSerializer
Serialize data to JSON formatted according to Quix Timeseries format.
The serializable object must be dictionary, and each item must be of str
, int
,
float
, bytes
or bytearray
type.
Otherwise, the SerializationError
will be raised.
Input:
Output:
{
"Timestamps": [123123123],
"NumericValues": {"a": [1], "b": [1.1]},
"StringValues": {"c": ["string"]},
"BinaryValues": {"d": ["Ynl0ZXM="]},
"TagValues": {"tag1": ["tag"]}
}
QuixEventsSerializer
Serialize data to JSON formatted according to Quix EventData format.
The input value is expected to be a dictionary with the following keys:
- "Id" (type str
, default - "")
- "Value" (type str
, default - ""),
- "Tags" (type dict
, default - {})
NOTE: All the other fields will be ignored.
Input:
Output:
{
"Id": "an_event",
"Value": "any_string",
"Tags": {"tag1": "tag"}},
"Timestamp":1692703362840389000
}
quixstreams.models.serializers.simple_types
BytesDeserializer
A deserializer to bypass bytes without any changes
BytesSerializer
A serializer to bypass bytes without any changes
StringDeserializer
StringDeserializer.__init__
Deserializes bytes to strings using the specified encoding.
Arguments:
codec
: string encoding A wrapper aroundconfluent_kafka.serialization.StringDeserializer
.
IntegerDeserializer
Deserializes bytes to integers.
A wrapper around confluent_kafka.serialization.IntegerDeserializer
.
DoubleDeserializer
Deserializes float to IEEE 764 binary64.
A wrapper around confluent_kafka.serialization.DoubleDeserializer
.
StringSerializer
StringSerializer.__init__
Serializes strings to bytes using the specified encoding.
Arguments:
codec
: string encoding
IntegerSerializer
Serializes integers to bytes
DoubleSerializer
Serializes floats to bytes
quixstreams.models.serializers.protobuf
ProtobufSerializer
ProtobufSerializer.__init__
def __init__(
msg_type: Message,
deterministic: bool = False,
ignore_unknown_fields: bool = False,
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
schema_registry_serialization_config: Optional[
SchemaRegistrySerializationConfig] = None)
Serializer that returns data in protobuf format.
Serialisation from a python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the msg_type
class.
Arguments:
msg_type
: protobuf message class.deterministic
: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys Default -False
ignore_unknown_fields
: If True, do not raise errors for unknown fields. Default -False
schema_registry_client_config
: If provided, serialization is offloaded to Confluent's ProtobufSerializer. Default -None
schema_registry_serialization_config
: Additional configuration for Confluent's ProtobufSerializer. Default -None
NOTE:
schema_registry_client_config
must also be set.
ProtobufDeserializer
ProtobufDeserializer.__init__
def __init__(
msg_type: Message,
use_integers_for_enums: bool = False,
preserving_proto_field_name: bool = False,
to_dict: bool = True,
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
schema_registry_serialization_config: Optional[
SchemaRegistrySerializationConfig] = None)
Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.
Deserialisation to a python dictionary can have a significant performance impact. You can disable this behavior using to_dict
, in that case the protobuf message will be used as the StreamingDataframe row value.
Arguments:
msg_type
: protobuf message class.use_integers_for_enums
: If true, use integers instead of enum names. Default -False
preserving_proto_field_name
: If True, use the original proto field names as defined in the .proto file. If False, convert the field names to lowerCamelCase. Default -False
to_dict
: If false, return the protobuf message instead of a dict. Default -True
schema_registry_client_config
: If provided, deserialization is offloaded to Confluent's ProtobufDeserializer. Default -None
schema_registry_serialization_config
: Additional configuration for Confluent's ProtobufDeserializer. Default -None
NOTE:
schema_registry_client_config
must also be set.
quixstreams.models.serializers.json
JSONSerializer
JSONSerializer.__init__
def __init__(
dumps: Callable[[Any], Union[str, bytes]] = default_dumps,
schema: Optional[Mapping] = None,
validator: Optional[Validator] = None,
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
schema_registry_serialization_config: Optional[
SchemaRegistrySerializationConfig] = None)
Serializer that returns data in json format.
Arguments:
dumps
: a function to serialize objects to json. Default - func:quixstreams.utils.json.dumps
schema
: A schema used to validate the data usingjsonschema.Draft202012Validator
. Default -None
validator
: A jsonschema validator used to validate the data. Takes precedences over the schema. Default -None
schema_registry_client_config
: If provided, serialization is offloaded to Confluent's JSONSerializer. Default -None
schema_registry_serialization_config
: Additional configuration for Confluent's JSONSerializer. Default -None
NOTE:
schema_registry_client_config
must also be set.
JSONDeserializer
JSONDeserializer.__init__
def __init__(
loads: Callable[[Union[bytes, bytearray]], Any] = default_loads,
schema: Optional[Mapping] = None,
validator: Optional[Validator] = None,
schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)
Deserializer that parses data from JSON
Arguments:
loads
: function to parse json from bytes. Default - func:quixstreams.utils.json.loads
.schema
: A schema used to validate the data usingjsonschema.Draft202012Validator
. Default -None
validator
: A jsonschema validator used to validate the data. Takes precedences over the schema. Default -None
schema_registry_client_config
: If provided, deserialization is offloaded to Confluent's JSONDeserializer. Default -None
quixstreams.models.serializers.base
SerializationContext
Provides additional context for message serialization/deserialization.
Every Serializer
and Deserializer
receives an instance of SerializationContext
Deserializer
Deserializer.__init__
A base class for all Deserializers
Deserializer.split_values
Return True if the deserialized message should be considered as Iterable and each item in it should be processed as a separate message.
Serializer
A base class for all Serializers
Serializer.extra_headers
Informs producer to set additional headers
for the message it will be serializing
Must return a dictionary with headers. Keys must be strings, and values must be strings, bytes or None.
Returns:
dict with headers
quixstreams.models.messages
quixstreams.models.rows
quixstreams.models.topics
quixstreams.models.topics.admin
convert_topic_list
Converts Topic
s to ConfluentTopic
s as required for Confluent's
AdminClient.create_topic()
.
Arguments:
topics
: list ofTopic
s
Returns:
list of confluent_kafka ConfluentTopic
s
TopicAdmin
For performing "admin"-level operations on a Kafka cluster, mostly around topics.
Primarily used to create and inspect topic configurations.
TopicAdmin.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
logger: logging.Logger = logger,
extra_config: Optional[Mapping] = None)
Arguments:
broker_address
: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required.logger
: a Logger instance to attach librdkafka logging toextra_config
: optional configs (generally accepts producer configs)
TopicAdmin.list_topics
Get a list of topics and their metadata from a Kafka cluster
Arguments:
timeout
: response timeout (seconds); Default infinite (-1)
Returns:
a dict of topic names and their metadata objects
TopicAdmin.inspect_topics
A simplified way of getting the topic configurations of the provided topics
from the cluster (if they exist).
Arguments:
topic_names
: a list of topic namestimeout
: response timeout (seconds)NOTE:
timeout
must be >0 here (expects non-neg, and 0 != inf).
Returns:
a dict with topic names and their respective TopicConfig
TopicAdmin.create_topics
Create the given list of topics and confirm they are ready.
Also raises an exception with detailed printout should the creation fail (it ignores issues for a topic already existing).
Arguments:
topics
: a list ofTopic
timeout
: creation acknowledge timeout (seconds)finalize_timeout
: topic finalization timeout (seconds)NOTE:
timeout
must be >0 here (expects non-neg, and 0 != inf).
quixstreams.models.topics.utils
merge_headers
def merge_headers(original: Optional[MessageHeadersTuples],
other: MessageHeadersMapping) -> MessageHeadersTuples
Merge two sets of Kafka message headers, overwriting headers in "origin"
by the values from "other".
Arguments:
original
: original headers as a list of (key, value) tuples.other
: headers to merge as a dictionary.
Returns:
a list of (key, value) tuples.
quixstreams.models.topics.topic
TopicConfig
Represents all kafka-level configuration for a kafka topic.
Generally used by Topic and any topic creation procedures.
Topic
A definition of a Kafka topic.
Typically created with an app = quixstreams.app.Application()
instance via
app.topic()
, and used by quixstreams.dataframe.StreamingDataFrame
instance.
Topic.__init__
def __init__(
name: str,
config: Optional[TopicConfig] = None,
value_deserializer: Optional[DeserializerType] = None,
key_deserializer: Optional[DeserializerType] = BytesDeserializer(),
value_serializer: Optional[SerializerType] = None,
key_serializer: Optional[SerializerType] = BytesSerializer(),
timestamp_extractor: Optional[TimestampExtractor] = None)
Arguments:
name
: topic nameconfig
: topic configs viaTopicConfig
(creation/validation)value_deserializer
: a deserializer type for valueskey_deserializer
: a deserializer type for keysvalue_serializer
: a serializer type for valueskey_serializer
: a serializer type for keystimestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
Topic.row_serialize
Serialize Row to a Kafka message structure
Arguments:
row
: Row to serializekey
: message key to serialize
Returns:
KafkaMessage object with serialized values
Topic.row_deserialize
Deserialize incoming Kafka message to a Row.
Arguments:
message
: an object with interface ofconfluent_kafka.Message
Returns:
Row, list of Rows or None if the message is ignored.
quixstreams.models.topics.exceptions
quixstreams.models.topics.manager
affirm_ready_for_create
Validate a list of topics is ready for creation attempt
Arguments:
topics
: list ofTopic
s
TopicManager
The source of all topic management for a Quix Streams Application.
Intended only for internal use by Application.
To create a Topic, use Application.topic() or generate them directly.
TopicManager.__init__
def __init__(topic_admin: TopicAdmin,
consumer_group: str,
timeout: float = 30,
create_timeout: float = 60,
auto_create_topics: bool = True)
Arguments:
topic_admin
: anAdmin
instance (required for some functionality)consumer_group
: the consumer group (of theApplication
)timeout
: response timeout (seconds)create_timeout
: timeout for topic creation
TopicManager.changelog_topics
Note: Topic
s are the changelogs.
returns: the changelog topic dict, {topic_name: {suffix: Topic}}
TopicManager.all_topics
Every registered topic name mapped to its respective Topic
.
returns: full topic dict, {topic_name: Topic}
TopicManager.topic_config
def topic_config(num_partitions: Optional[int] = None,
replication_factor: Optional[int] = None,
extra_config: Optional[dict] = None) -> TopicConfig
Convenience method for generating a TopicConfig
with default settings
Arguments:
num_partitions
: the number of topic partitionsreplication_factor
: the topic replication factorextra_config
: other optional configuration settings
Returns:
a TopicConfig object
TopicManager.topic
def topic(name: str,
value_deserializer: Optional[DeserializerType] = None,
key_deserializer: Optional[DeserializerType] = "bytes",
value_serializer: Optional[SerializerType] = None,
key_serializer: Optional[SerializerType] = "bytes",
config: Optional[TopicConfig] = None,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
A convenience method for generating a Topic
. Will use default config options
as dictated by the TopicManager.
Arguments:
name
: topic namevalue_deserializer
: a deserializer type for valueskey_deserializer
: a deserializer type for keysvalue_serializer
: a serializer type for valueskey_serializer
: a serializer type for keysconfig
: optional topic configurations (for creation/validation)timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message.
Returns:
Topic object with creation configs
TopicManager.register
Register an already generated :class:quixstreams.models.topics.Topic
to the topic manager.
The topic name and config can be updated by the topic manager.
Arguments:
topic
: The topic to register
TopicManager.repartition_topic
def repartition_topic(operation: str,
topic_name: str,
value_deserializer: Optional[DeserializerType] = "json",
key_deserializer: Optional[DeserializerType] = "json",
value_serializer: Optional[SerializerType] = "json",
key_serializer: Optional[SerializerType] = "json",
timeout: Optional[float] = None) -> Topic
Create an internal repartition topic.
Arguments:
operation
: name of the GroupBy operation (column name or user-defined).topic_name
: name of the topic the GroupBy is sourced from.value_deserializer
: a deserializer type for values; default - JSONkey_deserializer
: a deserializer type for keys; default - JSONvalue_serializer
: a serializer type for values; default - JSONkey_serializer
: a serializer type for keys; default - JSONtimeout
: config lookup timeout (seconds); Default 30
Returns:
Topic
object (which is also stored on the TopicManager)
TopicManager.changelog_topic
Performs all the logic necessary to generate a changelog topic based on a
"source topic" (aka input/consumed topic).
Its main goal is to ensure partition counts of the to-be generated changelog
match the source topic, and ensure the changelog topic is compacted. Also
enforces the serialization type. All Topic
objects generated with this are
stored on the TopicManager.
If source topic already exists, defers to the existing topic settings, else
uses the settings as defined by the Topic
(and its defaults) as generated
by the TopicManager
.
In general, users should NOT need this; an Application knows when/how to
generate changelog topics. To turn off changelogs, init an Application with
"use_changelog_topics"=False
.
Arguments:
topic_name
: name of consumed topic (app input topic)NOTE: normally contain any prefixes added by TopicManager.topic()
store_name
: name of the store this changelog belongs to (default, rolling10s, etc.)timeout
: config lookup timeout (seconds); Default 30
Returns:
Topic
object (which is also stored on the TopicManager)
TopicManager.create_topics
def create_topics(topics: List[Topic],
timeout: Optional[float] = None,
create_timeout: Optional[float] = None)
Creates topics via an explicit list of provided Topics
.
Exists as a way to manually specify what topics to create; otherwise,
create_all_topics()
is generally simpler.
Arguments:
topics
: list ofTopic
stimeout
: creation acknowledge timeout (seconds); Default 30create_timeout
: topic finalization timeout (seconds); Default 60
TopicManager.create_all_topics
A convenience method to create all Topic objects stored on this TopicManager.
If auto_create_topics
is set to False no topic will be created.
Arguments:
timeout
: creation acknowledge timeout (seconds); Default 30create_timeout
: topic finalization timeout (seconds); Default 60
TopicManager.validate_all_topics
Validates all topics exist and changelogs have correct topic and rep factor.
Issues are pooled and raised as an Exception once inspections are complete.
quixstreams.state.rocksdb.windowed.store
WindowedRocksDBStore
RocksDB-based windowed state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
WindowedRocksDBStore.__init__
def __init__(
name: str,
topic: str,
base_dir: str,
changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
options: Optional[RocksDBOptionsType] = None)
Arguments:
name
: a unique store nametopic
: a topic name for this storebase_dir
: path to a directory with the statechangelog_producer_factory
: a ChangelogProducerFactory instance if using changelogsoptions
: RocksDB options. IfNone
, the default options will be used.
quixstreams.state.rocksdb.windowed.partition
WindowedRocksDBStorePartition
A base class to access windowed state in RocksDB.
It represents a single RocksDB database.
Besides the data, it keeps track of the latest observed timestamp and stores the expiration index to delete expired windows.
Arguments:
path
: an absolute path to the RocksDB folderoptions
: RocksDB options. IfNone
, the default options will be used.
quixstreams.state.rocksdb.windowed.metadata
quixstreams.state.rocksdb.windowed.transaction
WindowedRocksDBPartitionTransaction
WindowedRocksDBPartitionTransaction.expire_windows
def expire_windows(max_start_time: int,
prefix: bytes,
delete: bool = True) -> list[tuple[tuple[int, int], Any]]
Get all expired windows from RocksDB up to the specified max_start_time
timestamp.
This method marks the latest found window as expired in the expiration index, so consecutive calls may yield different results for the same "latest timestamp".
How it works: - First, it checks the expiration cache for the start time of the last expired window for the current prefix. If found, this value helps reduce the search space and prevents returning previously expired windows. - Next, it iterates over window segments and identifies the windows that should be marked as expired. - Finally, it updates the expiration cache with the start time of the latest windows found.
Arguments:
max_start_time
: The timestamp up to which windows are considered expired, inclusive.prefix
: The key prefix for filtering windows.delete
: If True, expired windows will be deleted.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedRocksDBPartitionTransaction.delete_windows
Delete windows from RocksDB up to the specified max_start_time
timestamp.
This method removes all window entries that have a start time less than or equal to the given
max_start_time
. It ensures that expired data is cleaned up efficiently without affecting
unexpired windows.
How it works:
- It retrieves the start time of the last deleted window for the given prefix from the
deletion index. This minimizes redundant scans over already deleted windows.
- It iterates over the windows starting from the last deleted timestamp up to the max_start_time
.
- Each window within this range is deleted from the database.
- After deletion, it updates the deletion index with the start time of the latest window
that was deleted to keep track of progress.
Arguments:
max_start_time
: The timestamp up to which windows should be deleted, inclusive.prefix
: The key prefix used to identify and filter relevant windows.
WindowedRocksDBPartitionTransaction.get_windows
def get_windows(start_from_ms: int,
start_to_ms: int,
prefix: bytes,
backwards: bool = False) -> list[tuple[tuple[int, int], Any]]
Get all windows that start between "start_from_ms" and "start_to_ms"
within the specified prefix.
This function also checks the update cache for any updates not yet committed to RocksDB.
Arguments:
start_from_ms
: The minimal window start time, exclusive.start_to_ms
: The maximum window start time, inclusive.prefix
: The key prefix for filtering windows.backwards
: If True, yields windows in reverse order.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
quixstreams.state.rocksdb.windowed
quixstreams.state.rocksdb.windowed.serialization
parse_window_key
Parse the window key from Rocksdb into (message_key, start, end) structure.
Expected window key format:
Arguments:
key
: a key from Rocksdb
Returns:
a tuple with message key, start timestamp, end timestamp
encode_window_key
Encode window start and end timestamps into bytes of the following format:
<start>|<end>
Encoding window keys this way make them sortable in RocksDB within the same prefix.
Arguments:
start_ms
: window start in millisecondsend_ms
: window end in milliseconds
Returns:
window timestamps as bytes
encode_window_prefix
Encode window prefix and start time to iterate over keys in RocksDB
Format:
<prefix>|<start>
Arguments:
prefix
: transaction prefixstart_ms
: window start time in milliseconds
Returns:
bytes
quixstreams.state.rocksdb.windowed.state
WindowedTransactionState
WindowedTransactionState.__init__
A windowed state to be provided into StreamingDataFrame
window functions.
Arguments:
transaction
: instance ofWindowedRocksDBPartitionTransaction
WindowedTransactionState.get_window
Get the value of the window defined by start
and end
timestamps
if the window is present in the state, else default
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsdefault
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
WindowedTransactionState.update_window
def update_window(start_ms: int,
end_ms: int,
value: Any,
timestamp_ms: int,
window_timestamp_ms: Optional[int] = None) -> None
Set a value for the window.
This method will also update the latest observed timestamp in state partition
using the provided timestamp
.
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsvalue
: value of the windowtimestamp_ms
: current message timestamp in millisecondswindow_timestamp_ms
: arbitrary timestamp stored with the window value
WindowedTransactionState.get_latest_timestamp
Get the latest observed timestamp for the current message key.
Use this timestamp to determine if the arriving event is late and should be discarded from the processing.
Returns:
latest observed event timestamp in milliseconds
WindowedTransactionState.expire_windows
Get all expired windows from RocksDB up to the specified max_start_time
timestamp.
This method marks the latest found window as expired in the expiration index, so consecutive calls may yield different results for the same "latest timestamp".
Arguments:
max_start_time
: The timestamp up to which windows are considered expired, inclusive.delete
: If True, expired windows will be deleted.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedTransactionState.get_windows
def get_windows(start_from_ms: int,
start_to_ms: int,
backwards: bool = False) -> list[tuple[tuple[int, int], Any]]
Get all windows that start between "start_from_ms" and "start_to_ms".
Arguments:
start_from_ms
: The minimal window start time, exclusive.start_to_ms
: The maximum window start time, inclusive.backwards
: If True, yields windows in reverse order.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedTransactionState.delete_windows
Delete windows from RocksDB up to the specified max_start_time
timestamp.
This method removes all window entries that have a start time less than or equal to the given
max_start_time
. It ensures that expired data is cleaned up efficiently without affecting
unexpired windows.
Arguments:
max_start_time
: The timestamp up to which windows should be deleted, inclusive.
quixstreams.state.rocksdb.options
RocksDBOptions
RocksDB database options.
Arguments:
dumps
: function to dump data to JSONloads
: function to load data from JSONopen_max_retries
: number of times to retry opening the database if it's locked by another process. To disable retrying, pass 0open_retry_backoff
: number of seconds to wait between each retry. Please seerocksdict.Options
for a complete description of other options.
RocksDBOptions.to_options
Convert parameters to rocksdict.Options
Returns:
instance of rocksdict.Options
quixstreams.state.rocksdb.store
RocksDBStore
RocksDB-based state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
RocksDBStore.__init__
def __init__(
name: str,
topic: str,
base_dir: str,
changelog_producer_factory: Optional[ChangelogProducerFactory] = None,
options: Optional[options_type] = None)
Arguments:
name
: a unique store nametopic
: a topic name for this storebase_dir
: path to a directory with the statechangelog_producer_factory
: a ChangelogProducerFactory instance if using changelogsoptions
: RocksDB options. IfNone
, the default options will be used.
quixstreams.state.rocksdb.partition
RocksDBStorePartition
A base class to access state in RocksDB.
It represents a single RocksDB database.
Responsibilities: 1. Managing access to the RocksDB instance 2. Creating transactions to interact with data 3. Flushing WriteBatches to the RocksDB
It opens the RocksDB on __init__
. If the db is locked by another process,
it will retry according to open_max_retries
and open_retry_backoff
options.
Arguments:
path
: an absolute path to the RocksDB folderoptions
: RocksDB options. IfNone
, the default options will be used.
RocksDBStorePartition.write
def write(cache: PartitionTransactionCache,
processed_offset: Optional[int],
changelog_offset: Optional[int],
batch: Optional[WriteBatch] = None)
Write data to RocksDB
Arguments:
cache
: The modified dataprocessed_offset
: The offset processed to generate the data.changelog_offset
: The changelog message offset of the data.batch
: prefilledrocksdict.WriteBatch
, optional.
RocksDBStorePartition.get
Get a key from RocksDB.
Arguments:
key
: a key encoded tobytes
default
: a default value to return if the key is not found.cf_name
: rocksdb column family name. Default - "default"
Returns:
a value if the key is present in the DB. Otherwise, default
RocksDBStorePartition.exists
Check if a key is present in the DB.
Arguments:
key
: a key encoded tobytes
.cf_name
: rocksdb column family name. Default - "default"
Returns:
True
if the key is present, False
otherwise.
RocksDBStorePartition.get_processed_offset
Get last processed offset for the given partition
Returns:
offset or None
if there's no processed offset yet
RocksDBStorePartition.get_changelog_offset
Get offset that the changelog is up-to-date with.
Returns:
offset or None
if there's no processed offset yet
RocksDBStorePartition.close
Close the underlying RocksDB
RocksDBStorePartition.path
Absolute path to RocksDB database folder
Returns:
file path
RocksDBStorePartition.destroy
Delete underlying RocksDB database
The database must be closed first.
Arguments:
path
: an absolute path to the RocksDB folder
RocksDBStorePartition.get_column_family_handle
Get a column family handle to pass to it WriteBatch.
This method will cache the CF handle instance to avoid creating them repeatedly.
Arguments:
cf_name
: column family name
Returns:
instance of rocksdict.ColumnFamily
RocksDBStorePartition.get_column_family
Get a column family instance.
This method will cache the CF instance to avoid creating them repeatedly.
Arguments:
cf_name
: column family name
Returns:
instance of rocksdict.Rdict
for the given column family
quixstreams.state.rocksdb.metadata
quixstreams.state.rocksdb
quixstreams.state.rocksdb.types
quixstreams.state.rocksdb.exceptions
quixstreams.state.metadata
quixstreams.state.memory.store
MemoryStore
In-memory state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
Requires a full state recovery for each partition on assignment.
MemoryStore.__init__
def __init__(
name: str,
topic: str,
changelog_producer_factory: Optional[ChangelogProducerFactory] = None
) -> None
Arguments:
name
: a unique store nametopic
: a topic name for this storechangelog_producer_factory
: a ChangelogProducerFactory instance if using changelogs topics.
quixstreams.state.memory.partition
MemoryStorePartition
Class to access in-memory state.
Responsibilities: 1. Recovering from changelog messages 2. Creating transaction to interact with data 3. Track partition state in-memory
MemoryStorePartition.write
@_validate_partition_state()
def write(cache: PartitionTransactionCache, processed_offset: Optional[int],
changelog_offset: Optional[int]) -> None
Write data to the state
Arguments:
cache
: The partition update cacheprocessed_offset
: The offset processed to generate the data.changelog_offset
: The changelog message offset of the data.
MemoryStorePartition.get_processed_offset
Get last processed offset for the given partition
Returns:
offset or None
if there's no processed offset yet
MemoryStorePartition.get_changelog_offset
Get offset that the changelog is up-to-date with.
Returns:
offset or None
if there's no processed offset yet
MemoryStorePartition.get
@_validate_partition_state()
def get(key: bytes,
default: Any = None,
cf_name: str = "default") -> Union[None, bytes, Any]
Get a key from the store
Arguments:
key
: a key encoded tobytes
default
: a default value to return if the key is not found.cf_name
: rocksdb column family name. Default - "default"
Returns:
a value if the key is present in the store. Otherwise, default
MemoryStorePartition.exists
Check if a key is present in the store.
Arguments:
key
: a key encoded tobytes
.cf_name
: rocksdb column family name. Default - "default"
Returns:
True
if the key is present, False
otherwise.
quixstreams.state.memory
quixstreams.state.recovery
RecoveryPartition
A changelog topic partition mapped to a respective StorePartition
with helper
methods to determine its current recovery status.
Since StorePartition
s do recovery directly, it also handles recovery transactions.
RecoveryPartition.offset
Get the changelog offset from the underlying StorePartition
.
Returns:
changelog offset (int)
RecoveryPartition.needs_recovery_check
Determine whether to attempt recovery for underlying StorePartition
.
This does NOT mean that anything actually requires recovering.
RecoveryPartition.has_invalid_offset
Determine if the current changelog offset stored in state is invalid.
RecoveryPartition.recover_from_changelog_message
Recover the StorePartition using a message read from its respective changelog.
Arguments:
changelog_message
: A confluent kafka message (everything as bytes)
RecoveryPartition.set_watermarks
Set the changelog watermarks as gathered from Consumer.get_watermark_offsets()
Arguments:
lowwater
: topic partition lowwaterhighwater
: topic partition highwater
RecoveryPartition.set_recovery_consume_position
Update the recovery partition with the consumer's position (whenever
an empty poll is returned during recovery).
It is possible that it may be set more than once.
Arguments:
offset
: the consumer's current read position of the changelog
ChangelogProducerFactory
Generates ChangelogProducers, which produce changelog messages to a StorePartition.
ChangelogProducerFactory.__init__
Arguments:
changelog_name
: changelog topic nameproducer
: a RowProducer (not shared withApplication
instance)
Returns:
a ChangelogWriter instance
ChangelogProducerFactory.get_partition_producer
Generate a ChangelogProducer for producing to a specific partition number
(and thus StorePartition).
Arguments:
partition_num
: source topic partition number
ChangelogProducer
Generated for a StorePartition
to produce state changes to its respective
kafka changelog partition.
ChangelogProducer.__init__
Arguments:
changelog_name
: A changelog topic namepartition
: source topic partition numberproducer
: a RowProducer (not shared withApplication
instance)
ChangelogProducer.produce
def produce(key: bytes,
value: Optional[bytes] = None,
headers: Optional[MessageHeadersMapping] = None)
Produce a message to a changelog topic partition.
Arguments:
key
: message key (same as state key, including prefixes)value
: message value (same as state value)headers
: message headers (includes column family info)
RecoveryManager
Manages all consumer-related aspects of recovery, including: - assigning/revoking, pausing/resuming topic partitions (especially changelogs) - consuming changelog messages until state is updated fully.
Also tracks/manages RecoveryPartitions
, which are assigned/tracked only if
recovery for that changelog partition is required.
Recovery is attempted from the Application
after any new partition assignment.
RecoveryManager.partitions
Returns a mapping of assigned RecoveryPartitions in the following format:
{
RecoveryManager.has_assignments
Whether the Application has assigned RecoveryPartitions
Returns:
has assignments, as bool
RecoveryManager.recovering
Whether the Application is currently recovering
Returns:
is recovering, as bool
RecoveryManager.register_changelog
Register a changelog Topic with the TopicManager.
Arguments:
topic_name
: source topic namestore_name
: name of the store
RecoveryManager.do_recovery
If there are any active RecoveryPartitions, do a recovery procedure.
After, will resume normal Application
processing.
RecoveryManager.assign_partition
def assign_partition(topic: str, partition: int, committed_offset: int,
store_partitions: Dict[str, StorePartition])
Assigns StorePartition
s (as RecoveryPartition
s) ONLY IF recovery required.
Pauses active consumer partitions as needed.
RecoveryManager.revoke_partition
revoke ALL StorePartitions (across all Stores) for a given partition number
Arguments:
partition_num
: partition number of source topic
quixstreams.state
quixstreams.state.types
WindowedState
A windowed state to be provided into StreamingDataFrame
window functions.
WindowedState.get_window
Get the value of the window defined by start
and end
timestamps
if the window is present in the state, else default
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsdefault
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
WindowedState.update_window
def update_window(start_ms: int,
end_ms: int,
value: Any,
timestamp_ms: int,
window_timestamp_ms: Optional[int] = None)
Set a value for the window.
This method will also update the latest observed timestamp in state partition
using the provided timestamp
.
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsvalue
: value of the windowtimestamp_ms
: current message timestamp in millisecondswindow_timestamp_ms
: arbitrary timestamp stored with the window value
WindowedState.get_latest_timestamp
Get the latest observed timestamp for the current state partition.
Use this timestamp to determine if the arriving event is late and should be discarded from the processing.
Returns:
latest observed event timestamp in milliseconds
WindowedState.expire_windows
Get all expired windows from RocksDB up to the specified max_start_time
timestamp.
This method marks the latest found window as expired in the expiration index, so consecutive calls may yield different results for the same "latest timestamp".
Arguments:
max_start_time
: The timestamp up to which windows are considered expired, inclusive.delete
: If True, expired windows will be deleted.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedState.delete_windows
Delete windows from RocksDB up to the specified max_start_time
timestamp.
This method removes all window entries that have a start time less than or equal to the given
max_start_time
. It ensures that expired data is cleaned up efficiently without affecting
unexpired windows.
Arguments:
max_start_time
: The timestamp up to which windows should be deleted, inclusive.
WindowedState.get_windows
def get_windows(start_from_ms: int,
start_to_ms: int,
backwards: bool = False) -> list[tuple[tuple[int, int], Any]]
Get all windows that start between "start_from_ms" and "start_to_ms".
Arguments:
start_from_ms
: The minimal window start time, exclusive.start_to_ms
: The maximum window start time, inclusive.backwards
: If True, yields windows in reverse order.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedPartitionTransaction
WindowedPartitionTransaction.failed
Return True
if transaction failed to update data at some point.
Failed transactions cannot be re-used.
Returns:
bool
WindowedPartitionTransaction.completed
Return True
if transaction is successfully completed.
Completed transactions cannot be re-used.
Returns:
bool
WindowedPartitionTransaction.prepared
Return True
if transaction is prepared completed.
Prepared transactions cannot receive new updates, but can be flushed.
Returns:
bool
WindowedPartitionTransaction.prepare
Produce changelog messages to the changelog topic for all changes accumulated
in this transaction and prepare transcation to flush its state to the state store.
After successful prepare()
, the transaction status is changed to PREPARED,
and it cannot receive updates anymore.
If changelog is disabled for this application, no updates will be produced to the changelog topic.
Arguments:
processed_offset
: the offset of the latest processed message
WindowedPartitionTransaction.get_window
Get the value of the window defined by start
and end
timestamps
if the window is present in the state, else default
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsprefix
: a key prefixdefault
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
WindowedPartitionTransaction.update_window
Set a value for the window.
This method will also update the latest observed timestamp in state partition
using the provided timestamp
.
Arguments:
start_ms
: start of the window in millisecondsend_ms
: end of the window in millisecondsvalue
: value of the windowtimestamp_ms
: current message timestamp in millisecondsprefix
: a key prefix
WindowedPartitionTransaction.get_latest_timestamp
Get the latest observed timestamp for the current state prefix
(same as message key).
Use this timestamp to determine if the arriving event is late and should be discarded from the processing.
Returns:
latest observed event timestamp in milliseconds
WindowedPartitionTransaction.expire_windows
def expire_windows(max_start_time: int,
prefix: bytes,
delete: bool = True) -> list[tuple[tuple[int, int], Any]]
Get all expired windows from RocksDB up to the specified max_start_time
timestamp.
This method marks the latest found window as expired in the expiration index, so consecutive calls may yield different results for the same "latest timestamp".
Arguments:
max_start_time
: The timestamp up to which windows are considered expired, inclusive.prefix
: The key prefix for filtering windows.delete
: If True, expired windows will be deleted.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedPartitionTransaction.delete_windows
Delete windows from RocksDB up to the specified max_start_time
timestamp.
This method removes all window entries that have a start time less than or equal to the given
max_start_time
. It ensures that expired data is cleaned up efficiently without affecting
unexpired windows.
Arguments:
max_start_time
: The timestamp up to which windows should be deleted, inclusive.prefix
: The key prefix used to identify and filter relevant windows.
WindowedPartitionTransaction.get_windows
def get_windows(start_from_ms: int,
start_to_ms: int,
prefix: bytes,
backwards: bool = False) -> list[tuple[tuple[int, int], Any]]
Get all windows that start between "start_from_ms" and "start_to_ms"
within the specified prefix.
Arguments:
start_from_ms
: The minimal window start time, exclusive.start_to_ms
: The maximum window start time, inclusive.prefix
: The key prefix for filtering windows.backwards
: If True, yields windows in reverse order.
Returns:
A sorted list of tuples in the format ((start, end), value)
.
WindowedPartitionTransaction.flush
Flush the recent updates to the storage.
Arguments:
processed_offset
: offset of the last processed message, optional.changelog_offset
: offset of the last produced changelog message, optional.
WindowedPartitionTransaction.changelog_topic_partition
Return the changelog topic-partition for the StorePartition of this transaction.
Returns None
if changelog_producer is not provided.
Returns:
(topic, partition) or None
PartitionRecoveryTransaction
A class for managing recovery for a StorePartition from a changelog message
PartitionRecoveryTransaction.flush
Flush the recovery update to the storage.
quixstreams.state.exceptions
quixstreams.state.manager
StateStoreManager
Class for managing state stores and partitions.
StateStoreManager is responsible for: - reacting to rebalance callbacks - managing the individual state stores - providing access to store transactions
StateStoreManager.stores
Map of registered state stores
Returns:
dict in format {topic: {store_name: store}}
StateStoreManager.recovery_required
Whether recovery needs to be done.
StateStoreManager.using_changelogs
Whether the StateStoreManager is using changelog topics
Returns:
using changelogs, as bool
StateStoreManager.do_recovery
Perform a state recovery, if necessary.
StateStoreManager.stop_recovery
Stop recovery (called during app shutdown).
StateStoreManager.get_store
Get a store for given name and topic
Arguments:
topic
: topic namestore_name
: store name
Returns:
instance of Store
StateStoreManager.register_store
def register_store(topic_name: str,
store_name: str = DEFAULT_STATE_STORE_NAME,
store_type: Optional[StoreTypes] = None)
Register a state store to be managed by StateStoreManager.
During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.
Each store can be registered only once for each topic.
Arguments:
topic_name
: topic namestore_name
: store namestore_type
: the storage type used for this store. Default to StateStoreManagerdefault_store_type
StateStoreManager.register_windowed_store
Register a windowed state store to be managed by StateStoreManager.
During processing, the StateStoreManager will react to rebalancing callbacks and assign/revoke the partitions for registered stores.
Each window store can be registered only once for each topic.
Arguments:
topic_name
: topic namestore_name
: store name
StateStoreManager.clear_stores
Delete all state stores managed by StateStoreManager.
StateStoreManager.on_partition_assign
Assign store partitions for each registered store for the given TopicPartition
and return a list of assigned StorePartition
objects.
Arguments:
topic
: Kafka topic namepartition
: Kafka topic partitioncommitted_offset
: latest committed offset for the partition
Returns:
list of assigned StorePartition
StateStoreManager.on_partition_revoke
Revoke store partitions for each registered store for the given TopicPartition
Arguments:
topic
: Kafka topic namepartition
: Kafka topic partition
StateStoreManager.init
Initialize StateStoreManager
and create a store directory
StateStoreManager.close
Close all registered stores
quixstreams.state.serialization
quixstreams.state.base.store
Store
Abstract state store.
It keeps track of individual store partitions and provides access to the partitions' transactions.
Store.topic
Topic name
Store.name
Store name
Store.partitions
Mapping of assigned store partitions
Returns:
dict of "{partition:
Store.assign_partition
Assign new store partition
Arguments:
partition
: partition number
Returns:
instance of StorePartition
Store.revoke_partition
Revoke assigned store partition
Arguments:
partition
: partition number
Store.start_partition_transaction
Start a new partition transaction.
PartitionTransaction
is the primary interface for working with data in Stores.
Arguments:
partition
: partition number
Returns:
instance of PartitionTransaction
Store.close
Close store and revoke all store partitions
quixstreams.state.base.partition
StorePartition
A base class to access state in the underlying storage. It represents a single instance of some storage (e.g. a single database for the persistent storage).
StorePartition.get_processed_offset
Get last processed offset for the given partition
Returns:
offset or None
if there's no processed offset yet
StorePartition.get_changelog_offset
Get offset that the changelog is up-to-date with.
Returns:
offset or None
if there's no processed offset yet
StorePartition.write
@abstractmethod
def write(cache: PartitionTransactionCache, processed_offset: Optional[int],
changelog_offset: Optional[int])
Update the state with data from the update cache
Arguments:
cache
: The modified dataprocessed_offset
: The offset processed to generate the data.changelog_offset
: The changelog message offset of the data.
StorePartition.get
@abstractmethod
def get(key: bytes,
default: Any = None,
cf_name: str = "default") -> Union[None, bytes, Any]
Get a key from the store
Arguments:
key
: a key encoded tobytes
default
: a default value to return if the key is not found.cf_name
: rocksdb column family name. Default - "default"
Returns:
a value if the key is present in the store. Otherwise, default
StorePartition.exists
Check if a key is present in the store.
Arguments:
key
: a key encoded tobytes
.cf_name
: rocksdb column family name. Default - "default"
Returns:
True
if the key is present, False
otherwise.
StorePartition.begin
Start a new PartitionTransaction
Using PartitionTransaction
is a recommended way for accessing the data.
StorePartition.recover_from_changelog_message
def recover_from_changelog_message(
changelog_message: ConfluentKafkaMessageProto, committed_offset: int)
Updates state from a given changelog message.
Arguments:
changelog_message
: A raw Confluent message read from a changelog topic.committed_offset
: latest committed offset for the partition
quixstreams.state.base.transaction
PartitionTransactionCache
A cache with the data updated in the current PartitionTransaction. It is used to read-your-own-writes before the transaction is committed to the Store.
Internally, updates and deletes are separated into two separate structures to simplify the querying over them.
PartitionTransactionCache.get
Get a value for the key.
Returns the key value if it has been updated during the transaction.
If the key has already been deleted, returns "DELETED" sentinel (we don't need to check the actual store). If the key is not present in the cache, returns "UNDEFINED sentinel (we need to check the store).
:param: key: key as bytes :param: prefix: key prefix as bytes :param: cf_name: column family name
PartitionTransactionCache.set
Set a value for the key.
:param: key: key as bytes :param: value: value as bytes :param: prefix: key prefix as bytes :param: cf_name: column family name
PartitionTransactionCache.delete
Delete a key.
:param: key: key as bytes :param: value: value as bytes :param: prefix: key prefix as bytes :param: cf_name: column family name
PartitionTransactionCache.is_empty
Return True if any changes have been made (updates or deletes), otherwise return False.
PartitionTransactionCache.get_column_families
Get all update column families.
PartitionTransactionCache.get_updates
Get all updated keys (excluding deleted)
in the format "{
:param: cf_name: column family name
PartitionTransactionCache.get_deletes
Get all deleted keys (excluding updated) as a set.
PartitionTransactionStatus
STARTED
Transaction is started and accepts updates
PREPARED
Transaction is prepared, it can no longer receive updates
COMPLETE
Transaction is fully completed, it cannot be used anymore
FAILED
Transaction is failed, it cannot be used anymore
validate_transaction_status
Check that the status of RocksDBTransaction
is valid before calling a method
PartitionTransaction
A transaction class to perform simple key-value operations like "get", "set", "delete" and "exists" on a single storage partition.
PartitionTransaction.failed
Return True
if transaction failed to update data at some point.
Failed transactions cannot be re-used.
Returns:
bool
PartitionTransaction.completed
Return True
if transaction is successfully completed.
Completed transactions cannot be re-used.
Returns:
bool
PartitionTransaction.prepared
Return True
if transaction is prepared completed.
Prepared transactions cannot receive new updates, but can be flushed.
Returns:
bool
PartitionTransaction.changelog_topic_partition
Return the changelog topic-partition for the StorePartition of this transaction.
Returns None
if changelog_producer is not provided.
Returns:
(topic, partition) or None
PartitionTransaction.as_state
Create an instance implementing the State
protocol to be provided
to StreamingDataFrame
functions.
All operations called on this State object will be prefixed with
the supplied prefix
.
Returns:
an instance implementing the State
protocol
PartitionTransaction.get
@validate_transaction_status(PartitionTransactionStatus.STARTED)
def get(key: Any,
prefix: bytes,
default: Any = None,
cf_name: str = "default") -> Optional[Any]
Get a key from the store.
It returns None
if the key is not found and default
is not provided.
Arguments:
key
: keyprefix
: a key prefixdefault
: default value to return if the key is not foundcf_name
: column family name
Returns:
value or None if the key is not found and default
is not provided
PartitionTransaction.set
@validate_transaction_status(PartitionTransactionStatus.STARTED)
def set(key: Any, value: Any, prefix: bytes, cf_name: str = "default")
Set value for the key.
Arguments:
key
: keyprefix
: a key prefixvalue
: valuecf_name
: column family name
PartitionTransaction.delete
@validate_transaction_status(PartitionTransactionStatus.STARTED)
def delete(key: Any, prefix: bytes, cf_name: str = "default")
Delete value for the key.
This function always returns None
, even if value is not found.
Arguments:
key
: keyprefix
: a key prefixcf_name
: column family name
PartitionTransaction.exists
@validate_transaction_status(PartitionTransactionStatus.STARTED)
def exists(key: Any, prefix: bytes, cf_name: str = "default") -> bool
Check if the key exists in state.
Arguments:
key
: keyprefix
: a key prefixcf_name
: column family name
Returns:
True if key exists, False otherwise
PartitionTransaction.prepare
Produce changelog messages to the changelog topic for all changes accumulated
in this transaction and prepare transaction to flush its state to the state store.
After successful prepare()
, the transaction status is changed to PREPARED,
and it cannot receive updates anymore.
If changelog is disabled for this application, no updates will be produced to the changelog topic.
Arguments:
processed_offset
: the offset of the latest processed message
PartitionTransaction.flush
@validate_transaction_status(PartitionTransactionStatus.STARTED,
PartitionTransactionStatus.PREPARED)
def flush(processed_offset: Optional[int] = None,
changelog_offset: Optional[int] = None)
Flush the recent updates to the database.
It writes the WriteBatch to RocksDB and marks itself as finished.
If writing fails, the transaction is marked as failed and cannot be used anymore.
NOTE: If no keys have been modified during the transaction (i.e. no "set" or "delete" have been called at least once), it will not flush ANY data to the database including the offset to optimize I/O.
Arguments:
processed_offset
: offset of the last processed message, optional.changelog_offset
: offset of the last produced changelog message, optional.
quixstreams.state.base
quixstreams.state.base.state
State
Primary interface for working with key-value state data from StreamingDataFrame
State.get
Get the value for key if key is present in the state, else default
Arguments:
key
: keydefault
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
State.set
Set value for the key.
Arguments:
key
: keyvalue
: value
State.delete
Delete value for the key.
This function always returns None
, even if value is not found.
Arguments:
key
: key
State.exists
Check if the key exists in state.
Arguments:
key
: key
Returns:
True if key exists, False otherwise
TransactionState
TransactionState.__init__
Simple key-value state to be provided into StreamingDataFrame
functions
Arguments:
transaction
: instance ofPartitionTransaction
TransactionState.get
Get the value for key if key is present in the state, else default
Arguments:
key
: keydefault
: default value to return if the key is not found
Returns:
value or None if the key is not found and default
is not provided
TransactionState.set
Set value for the key.
Arguments:
key
: keyvalue
: value
TransactionState.delete
Delete value for the key.
This function always returns None
, even if value is not found.
Arguments:
key
: key
TransactionState.exists
Check if the key exists in state.
Arguments:
key
: key
Returns:
True if key exists, False otherwise
quixstreams.exceptions
quixstreams.exceptions.assignment
PartitionAssignmentError
Error happened during partition rebalancing.
Raised from on_assign
, on_revoke
and on_lost
callbacks
quixstreams.exceptions.base
quixstreams.context
set_message_context
Set a MessageContext for the current message in the given contextvars.Context
NOTE: This is for advanced usage only. If you need to change the message key,
StreamingDataFrame.to_topic()
has an argument for it.
Example Snippet:
from quixstreams import Application, set_message_context, message_context
# Changes the current sdf value based on what the message partition is.
def alter_context(value):
context = message_context()
if value > 1:
context.headers = context.headers + (b"cool_new_header", value.encode())
set_message_context(context)
app = Application()
sdf = app.dataframe()
sdf = sdf.update(lambda value: alter_context(value))
Arguments:
context
: instance ofMessageContext
message_context
Get a MessageContext for the current message, which houses most of the message
metadata, like: - key - timestamp - partition - offset
Example Snippet:
from quixstreams import Application, message_context
# Changes the current sdf value based on what the message partition is.
app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)
Returns:
instance of MessageContext
quixstreams.kafka.configuration
ConnectionConfig
Provides an interface for all librdkafka connection-based configs.
Allows converting to or from a librdkafka dictionary.
Also obscures secrets and handles any case sensitivity issues.
ConnectionConfig.settings_customise_sources
@classmethod
def settings_customise_sources(
cls, settings_cls: Type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource
) -> Tuple[PydanticBaseSettingsSource, ...]
Included to ignore reading/setting values from the environment
ConnectionConfig.from_librdkafka_dict
Create a ConnectionConfig
from a librdkafka config dictionary.
Arguments:
config
: a dict of configs (like {"bootstrap.servers": "url"})ignore_extras
: Ignore non-connection settings (else raise exception)
Returns:
a ConnectionConfig
ConnectionConfig.as_librdkafka_dict
Dump any non-empty config values as a librdkafka dictionary.
NOTE: All secret values will be dumped in PLAINTEXT by default.
Arguments:
plaintext_secrets
: whether secret values are plaintext or obscured (***)
Returns:
a librdkafka-compatible dictionary
quixstreams.kafka
quixstreams.kafka.producer
Producer
Producer.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
logger: logging.Logger = logger,
error_callback: Callable[[KafkaError], None] = _default_error_cb,
extra_config: Optional[dict] = None,
flush_timeout: Optional[int] = None)
A wrapper around confluent_kafka.Producer
.
It initializes confluent_kafka.Producer
on demand
avoiding network calls during __init__
, provides typing info for methods
and some reasonable defaults.
Arguments:
broker_address
: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required.logger
: a Logger instance to attach librdkafka logging toerror_callback
: callback used for producer errorsextra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Producer
as is. Note: values passed as arguments override values inextra_config
.flush_timeout
: The time the producer is waiting for all messages to be delivered.
Producer.produce
def produce(topic: str,
value: Optional[Union[str, bytes]] = None,
key: Optional[Union[str, bytes]] = None,
headers: Optional[Headers] = None,
partition: Optional[int] = None,
timestamp: Optional[int] = None,
poll_timeout: float = 5.0,
buffer_error_max_tries: int = 3,
on_delivery: Optional[DeliveryCallback] = None)
Produce a message to a topic.
It also polls Kafka for callbacks before producing to minimize
the probability of BufferError
.
If BufferError
still happens, the method will poll Kafka with timeout
to free up the buffer and try again.
Arguments:
topic
: topic namevalue
: message valuekey
: message keyheaders
: message headerspartition
: topic partitiontimestamp
: message timestamppoll_timeout
: timeout forpoll()
call in case ofBufferError
buffer_error_max_tries
: max retries forBufferError
. Pass0
to not retry afterBufferError
.on_delivery
: the delivery callback to be triggered onpoll()
for the produced message.
Producer.poll
Polls the producer for events and calls on_delivery
callbacks.
Arguments:
timeout
: poll timeout seconds; Default: 0 (unlike others)NOTE: -1 will hang indefinitely if there are no messages to acknowledge
Producer.flush
Wait for all messages in the Producer queue to be delivered.
Arguments:
timeout
(float
): time to attempt flushing (seconds). None use producer default or -1 is infinite. Default: None
Returns:
number of messages remaining to flush
TransactionalProducer
A separate producer class used only internally for transactions (transactions are only needed when using a consumer).
quixstreams.kafka.consumer
Consumer
Consumer.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
consumer_group: Optional[str],
auto_offset_reset: AutoOffsetReset,
auto_commit_enable: bool = True,
logger: logging.Logger = logger,
error_callback: Callable[[KafkaError], None] = _default_error_cb,
on_commit: Optional[Callable[
[Optional[KafkaError], List[TopicPartition]], None]] = None,
extra_config: Optional[dict] = None)
A wrapper around confluent_kafka.Consumer
.
It initializes confluent_kafka.Consumer
on demand
avoiding network calls during __init__
, provides typing info for methods
and some reasonable defaults.
Arguments:
broker_address
: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required.consumer_group
: Kafka consumer group. Passed asgroup.id
toconfluent_kafka.Consumer
auto_offset_reset
: Consumerauto.offset.reset
setting. Available values:
"earliest" - automatically reset the offset to the smallest offset
"latest" - automatically reset the offset to the largest offset
"error" - trigger an error (ERR__AUTO_OFFSET_RESET
) which is retrieved by consuming messages (used for testing)auto_commit_enable
: If true, periodically commit offset of the last message handed to the application. Default -True
.logger
: a Logger instance to attach librdkafka logging toerror_callback
: callback used for consumer errorson_commit
: Offset commit result propagation callback. Passed as "offset_commit_cb" toconfluent_kafka.Consumer
.extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Consumer
as is. Note: values passed as arguments override values inextra_config
.
Consumer.poll
Consumes a single message, calls callbacks and returns events.
The application must check the returned class:Message
object's func:Message.error()
method to distinguish between proper
messages (error() returns None), or an event or error.
Note: a RebalancingCallback
may be called from this method (
on_assign
, on_revoke
, or on_lost
).
Arguments:
timeout
(float
): Maximum time in seconds to block waiting for message, event or callback. None or -1 is infinite. Default: None.
Raises:
RuntimeError
: if called on a closed consumer
Returns:
Optional[Message]
: A Message
object or None
on timeout
Consumer.subscribe
def subscribe(topics: List[str],
on_assign: Optional[RebalancingCallback] = None,
on_revoke: Optional[RebalancingCallback] = None,
on_lost: Optional[RebalancingCallback] = None)
Set subscription to supplied list of topics
This replaces a previous subscription.
Arguments:
topics
(List[str]
): List of topics (strings) to subscribe to.on_assign
(Optional[RebalancingCallback]
): callback to provide handling of customized offsets on completion of a successful partition re-assignment.on_revoke
(Optional[RebalancingCallback]
): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.on_lost
(Optional[RebalancingCallback]
): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Consumer.unsubscribe
Remove current subscription.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Consumer.store_offsets
def store_offsets(message: Optional[Message] = None,
offsets: Optional[List[TopicPartition]] = None)
Store offsets for a message or a list of offsets.
message
and offsets
are mutually exclusive. The stored offsets
will be committed according to 'auto.commit.interval.ms' or manual
offset-less commit
.
Note that 'enable.auto.offset.store' must be set to False when using this API.
Arguments:
message
(confluent_kafka.Message
): Store message's offset+1.offsets
(List[TopicPartition]
): List of topic+partitions+offsets to store.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Consumer.commit
def commit(message: Optional[Message] = None,
offsets: Optional[List[TopicPartition]] = None,
asynchronous: bool = True) -> Optional[List[TopicPartition]]
Commit a message or a list of offsets.
The message
and offsets
parameters are mutually exclusive.
If neither is set, the current partition assignment's offsets are used instead.
Use this method to commit offsets if you have 'enable.auto.commit' set to False.
Arguments:
message
(Message
): Commit the message's offset+1. Note: By convention, committed offsets reflect the next message to be consumed, not the last message consumed.offsets
(List[TopicPartition]
): List of topic+partitions+offsets to commit.asynchronous
(bool
): If true, asynchronously commit, returning None immediately. If False, the commit() call will block until the commit succeeds or fails and the committed offsets will be returned (on success). Note that specific partitions may have failed and the .err field of each partition should be checked for success.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Consumer.committed
def committed(partitions: List[TopicPartition],
timeout: Optional[float] = None) -> List[TopicPartition]
Retrieve committed offsets for the specified partitions.
Arguments:
partitions
(List[TopicPartition]
): List of topic+partitions to query for stored offsets.timeout
(float
): Request timeout (seconds). None or -1 is infinite. Default: None
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Returns:
List[TopicPartition]
: List of topic+partitions with offset and possibly error set.
Consumer.get_watermark_offsets
def get_watermark_offsets(partition: TopicPartition,
timeout: Optional[float] = None,
cached: bool = False) -> Tuple[int, int]
Retrieve low and high offsets for the specified partition.
Arguments:
partition
(TopicPartition
): Topic+partition to return offsets for.timeout
(float
): Request timeout (seconds). None or -1 is infinite. Ignored if cached=True. Default: Nonecached
(bool
): Instead of querying the broker, use cached information. Cached values: The low offset is updated periodically (if statistics.interval.ms is set) while the high offset is updated on each message fetched from the broker for this partition.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Returns:
Tuple[int, int]
: Tuple of (low,high) on success or None on timeout.
The high offset is the offset of the last message + 1.
Consumer.list_topics
Request metadata from the cluster.
This method provides the same information as listTopics(), describeTopics() and describeCluster() in the Java Admin client.
Arguments:
topic
(str
): If specified, only request information about this topic, else return results for all topics in cluster. Warning: If auto.create.topics.enable is set to true on the broker and an unknown topic is specified, it will be created.timeout
(float
): The maximum response time before timing out None or -1 is infinite. Default: None
Raises:
KafkaException
: if a Kafka-based error occurs
Consumer.memberid
Return this client's broker-assigned group member id.
The member id is assigned by the group coordinator and is propagated to the consumer during rebalance.
Raises:
RuntimeError
: if called on a closed consumer
Returns:
Optional[string]
: Member id string or None
Consumer.offsets_for_times
def offsets_for_times(partitions: List[TopicPartition],
timeout: Optional[float] = None) -> List[TopicPartition]
Look up offsets by timestamp for the specified partitions.
The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned.
Arguments:
partitions
(List[TopicPartition]
): topic+partitions with timestamps in the TopicPartition.offset field.timeout
(float
): The maximum response time before timing out. None or -1 is infinite. Default: None
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Returns:
List[TopicPartition]
: List of topic+partition with offset field set and possibly error set
Consumer.pause
Pause consumption for the provided list of partitions.
Paused partitions must be tracked manually.
Does NOT affect the result of Consumer.assignment()
.
Arguments:
partitions
(List[TopicPartition]
): List of topic+partitions to pause.
Raises:
KafkaException
: if a Kafka-based error occurs
Consumer.resume
Resume consumption for the provided list of partitions.
Arguments:
partitions
(List[TopicPartition]
): List of topic+partitions to resume.
Raises:
KafkaException
: if a Kafka-based error occurs
Consumer.position
Retrieve current positions (offsets) for the specified partitions.
Arguments:
partitions
(List[TopicPartition]
): List of topic+partitions to return current offsets for. The current offset is the offset of the last consumed message + 1.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Returns:
List[TopicPartition]
: List of topic+partitions with offset and possibly error set.
Consumer.seek
Set consume position for partition to offset.
The offset may be an absolute (>=0) or a
logical offset like OFFSET_BEGINNING
.
seek()
may only be used to update the consume offset of an
actively consumed partition (i.e., after Consumer.assign()
),
to set the starting offset of partition not being consumed instead
pass the offset in an assign()
call.
Arguments:
partition
(TopicPartition
): Topic+partition+offset to seek to.
Raises:
KafkaException
: if a Kafka-based error occurs
Consumer.assignment
Returns the current partition assignment.
Raises:
KafkaException
: if a Kafka-based error occursRuntimeError
: if called on a closed consumer
Returns:
List[TopicPartition]
: List of assigned topic+partitions.
Consumer.set_sasl_credentials
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate. This method will not disconnect existing broker connections that have been established with the old credentials. This method is applicable only to SASL PLAIN and SCRAM mechanisms.
Arguments:
username
(str
): your usernamepassword
(str
): your password
Consumer.incremental_assign
Assign new partitions.
Can be called outside the Consumer
on_assign
callback (multiple times).
Partitions immediately show on Consumer.assignment()
.
Any additional partitions besides the ones passed during the Consumer
on_assign
callback will NOT be associated with the consumer group.
Arguments:
partitions
(List[TopicPartition]
): a list of topic partitions
Consumer.incremental_unassign
Revoke partitions.
Can be called outside an on_revoke callback.
Arguments:
partitions
(List[TopicPartition]
): a list of topic partitions
Consumer.close
Close down and terminate the Kafka Consumer.
Actions performed:
- Stops consuming.
- Commits offsets, unless the consumer property 'enable.auto.commit' is set to False.
- Leaves the consumer group.
Registered callbacks may be called from this method,
see poll()
for more info.
Consumer.consumer_group_metadata
Used by the producer during consumer offset sending for an EOS transaction.
quixstreams.kafka.exceptions
quixstreams.app
Application
The main Application class.
Typically, the primary object needed to get a kafka application up and running.
Most functionality is explained the various methods, except for "column assignment".
What it Does:
- On init:
- Provides defaults or helper methods for commonly needed objects
- If
quix_sdk_token
is passed, configures the app to use the Quix Cloud.
- When executed via
.run()
(after setup):- Initializes Topics and StreamingDataFrames
- Facilitates processing of Kafka messages with a
StreamingDataFrame
- Handles all Kafka client consumer/producer responsibilities.
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value))
app.run()
Application.__init__
def __init__(broker_address: Optional[Union[str, ConnectionConfig]] = None,
*,
quix_sdk_token: Optional[str] = None,
consumer_group: Optional[str] = None,
auto_offset_reset: AutoOffsetReset = "latest",
commit_interval: float = 5.0,
commit_every: int = 0,
consumer_extra_config: Optional[dict] = None,
producer_extra_config: Optional[dict] = None,
state_dir: Union[str, Path] = Path("state"),
rocksdb_options: Optional[RocksDBOptionsType] = None,
on_consumer_error: Optional[ConsumerErrorCallback] = None,
on_processing_error: Optional[ProcessingErrorCallback] = None,
on_producer_error: Optional[ProducerErrorCallback] = None,
on_message_processed: Optional[MessageProcessedCallback] = None,
consumer_poll_timeout: float = 1.0,
producer_poll_timeout: float = 0.0,
loglevel: Optional[Union[int, LogLevel]] = "INFO",
auto_create_topics: bool = True,
use_changelog_topics: bool = True,
quix_config_builder: Optional[QuixKafkaConfigsBuilder] = None,
topic_manager: Optional[TopicManager] = None,
request_timeout: float = 30,
topic_create_timeout: float = 60,
processing_guarantee: ProcessingGuarantee = "at-least-once")
Arguments:
broker_address
: Connection settings for Kafka. Used by Producer, Consumer, and Admin clients. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required. Either this ORquix_sdk_token
must be set to useApplication
(not both). Takes priority over quix auto-configuration. Linked Environment Variable:Quix__Broker__Address
. Default:None
quix_sdk_token
: If using the Quix Cloud, the SDK token to connect with. Either this ORbroker_address
must be set to use Application (not both). Linked Environment Variable:Quix__Sdk__Token
. Default: None (if not run on Quix Cloud)NOTE: the environment variable is set for you in the Quix Cloud
consumer_group
: Kafka consumer group. Passed asgroup.id
toconfluent_kafka.Consumer
. Linked Environment Variable:Quix__Consumer__Group
. Default - "quixstreams-default" (set during init)NOTE: Quix Applications will prefix it with the Quix workspace id.
commit_interval
: How often to commit the processed messages in seconds. Default - 5.0.commit_every
: Commit the checkpoint after processing N messages. Use this parameter for more granular control of the commit schedule. If the value is > 0, the application will commit the checkpoint after processing the specified number of messages across all the assigned partitions. If the value is <= 0, only thecommit_interval
will be considered. Default - 0. >NOTE: Only input offsets are counted, and the application > may produce more results than the number of incoming messages.auto_offset_reset
: Consumerauto.offset.reset
settingconsumer_extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Consumer
as is.producer_extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Producer
as is.state_dir
: path to the application state directory. Default -"state"
.rocksdb_options
: RocksDB options. IfNone
, the default options will be used.consumer_poll_timeout
: timeout forRowConsumer.poll()
. Default -1.0
sproducer_poll_timeout
: timeout forRowProducer.poll()
. Default -0
s.on_message_processed
: a callback triggered when message is successfully processed.loglevel
: a log level for "quixstreams" logger. Should be a string or None. IfNone
is passed, no logging will be configured. You may passNone
and configure "quixstreams" logger externally usinglogging
library. Default -"INFO"
.auto_create_topics
: Create allTopic
s made via Application.topic() Default -True
use_changelog_topics
: Use changelog topics to back stateful operations Default -True
topic_manager
: ATopicManager
instancerequest_timeout
: timeout (seconds) for REST-based requeststopic_create_timeout
: timeout (seconds) for topic create finalizationprocessing_guarantee
: Use "exactly-once" or "at-least-once" processing.
Error Handlers
To handle errors,Application
accepts callbacks triggered when exceptions occur on different stages of stream processing. If the callback returnsTrue
, the exception will be ignored. Otherwise, the exception will be propagated and the processing will eventually stop.on_consumer_error
: triggered when internalRowConsumer
fails to poll Kafka or cannot deserialize a message.on_processing_error
: triggered when exception is raised withinStreamingDataFrame.process()
.on_producer_error
: triggered whenRowProducer
fails to serialize or to produce a message to Kafka.
Quix Cloud Parametersquix_config_builder
: instance ofQuixKafkaConfigsBuilder
to be used instead of the default one.NOTE: It is recommended to just use
quix_sdk_token
instead.
Application.Quix
RAISES EXCEPTION: DEPRECATED.
use Application() with "quix_sdk_token" parameter or set the "Quix__Sdk__Token" environment variable.
Application.topic
def topic(name: str,
value_deserializer: DeserializerType = "json",
key_deserializer: DeserializerType = "bytes",
value_serializer: SerializerType = "json",
key_serializer: SerializerType = "bytes",
config: Optional[TopicConfig] = None,
timestamp_extractor: Optional[TimestampExtractor] = None) -> Topic
Create a topic definition.
Allows you to specify serialization that should be used when consuming/producing to the topic in the form of a string name (i.e. "json" for JSON) or a serialization class instance directly, like JSONSerializer().
Example Snippet:
from quixstreams import Application
# Specify an input and output topic for a `StreamingDataFrame` instance,
# where the output topic requires adjusting the key serializer.
app = Application()
input_topic = app.topic("input-topic", value_deserializer="json")
output_topic = app.topic(
"output-topic", key_serializer="str", value_serializer=JSONSerializer()
)
sdf = app.dataframe(input_topic)
sdf.to_topic(output_topic)
Arguments:
name
: topic nameNOTE: If the application is created via
Quix.Application()
, the topic name will be prefixed by Quix workspace id, and it will be<workspace_id>-<name>
value_deserializer
: a deserializer type for values; default="json"key_deserializer
: a deserializer type for keys; default="bytes"value_serializer
: a serializer type for values; default="json"key_serializer
: a serializer type for keys; default="bytes"config
: optional topic configurations (for creation/validation)NOTE: will not create without Application's auto_create_topics set to True (is True by default)
timestamp_extractor
: a callable that returns a timestamp in milliseconds from a deserialized message. Default -None
.
Example Snippet:
app = Application(...)
def custom_ts_extractor(
value: Any,
headers: Optional[List[Tuple[str, bytes]]],
timestamp: float,
timestamp_type: TimestampType,
) -> int:
return value["timestamp"]
topic = app.topic("input-topic", timestamp_extractor=custom_ts_extractor)
Returns:
Topic
object
Application.dataframe
def dataframe(topic: Optional[Topic] = None,
source: Optional[BaseSource] = None) -> StreamingDataFrame
A simple helper method that generates a StreamingDataFrame
, which is used
to define your message processing pipeline.
The topic is what the StreamingDataFrame
will use as its input, unless
a source is provided (topic
is optional when using a source
).
If both topic
AND source
are provided, the source will write to that topic
instead of its default topic (which the StreamingDataFrame
then consumes).
See :class:quixstreams.dataframe.StreamingDataFrame
for more details.
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value)
app.run()
Arguments:
topic
: aquixstreams.models.Topic
instance to be used as an input topic.source
: aquixstreams.sources
"BaseSource" instance
Returns:
StreamingDataFrame
object
Application.stop
Stop the internal poll loop and the message processing.
Only necessary when manually managing the lifecycle of the Application
(
likely through some sort of threading).
To otherwise stop an application, either send a SIGTERM
to the process
(like Kubernetes does) or perform a typical KeyboardInterrupt
(Ctrl+C
).
Arguments:
fail
: if True, signals that application is stopped due to unhandled exception, and it shouldn't commit the current checkpoint.
Application.get_producer
Create and return a pre-configured Producer instance. The Producer is initialized with params passed to Application.
It's useful for producing data to Kafka outside the standard Application processing flow, (e.g. to produce test data into a topic). Using this within the StreamingDataFrame functions is not recommended, as it creates a new Producer instance each time, which is not optimized for repeated use in a streaming pipeline.
Example Snippet:
from quixstreams import Application
app = Application(...)
topic = app.topic("input")
with app.get_producer() as producer:
for i in range(100):
producer.produce(topic=topic.name, key=b"key", value=b"value")
Application.get_consumer
Create and return a pre-configured Consumer instance.
The Consumer is initialized with params passed to Application.
It's useful for consuming data from Kafka outside the standard Application processing flow. (e.g., to consume test data from a topic). Using it within the StreamingDataFrame functions is not recommended, as it creates a new Consumer instance each time, which is not optimized for repeated use in a streaming pipeline.
Note: By default, this consumer does not autocommit the consumed offsets to allow
at-least-once processing.
To store the offset call store_offsets() after processing a message.
If autocommit is necessary set enable.auto.offset.store
to True in
the consumer config when creating the app.
Example Snippet:
from quixstreams import Application
app = Application(...)
topic = app.topic("input")
with app.get_consumer() as consumer:
consumer.subscribe([topic.name])
while True:
msg = consumer.poll(timeout=1.0)
if msg is not None:
# Process message
# Optionally commit the offset
# consumer.store_offsets(msg)
Arguments:
auto_commit_enable
: Enable or disable auto commit Default - True
Application.clear_state
Clear the state of the application.
Application.add_source
Add a source to the application.
Use when no transformations (which requires a StreamingDataFrame
) are needed.
See :class:quixstreams.sources.base.BaseSource
for more details.
Arguments:
source
: a :class:quixstreams.sources.BaseSource
instancetopic
: the :class:quixstreams.models.Topic
instance the source will produce to Default: the source default
Application.run
Start processing data from Kafka using provided StreamingDataFrame
Once started, it can be safely terminated with a SIGTERM
signal
(like Kubernetes does) or a typical KeyboardInterrupt
(Ctrl+C
).
Example Snippet:
from quixstreams import Application
# Set up an `app = Application` and `sdf = StreamingDataFrame`;
# add some operations to `sdf` and then run everything.
app = Application(broker_address='localhost:9092', consumer_group='group')
topic = app.topic('test-topic')
df = app.dataframe(topic)
df.apply(lambda value, context: print('New message', value)
app.run()
Application.setup_topics
Validate and create the topics
ApplicationConfig
Immutable object holding the application configuration
For details see :class:quixstreams.Application
ApplicationConfig.settings_customise_sources
@classmethod
def settings_customise_sources(
cls, settings_cls: Type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource
) -> Tuple[PydanticBaseSettingsSource, ...]
Included to ignore reading/setting values from the environment
ApplicationConfig.copy
Update the application config and return a copy
quixstreams.sources.core
quixstreams.sources.core.kafka.checkpoint
Checkpoint
Checkpoint implementation used by the KafkaReplicatorSource
Checkpoint.close
Perform cleanup (when the checkpoint is empty) instead of committing.
Needed for exactly-once, as Kafka transactions are timeboxed.
Checkpoint.commit
Commit the checkpoint.
This method will: 1. Flush the producer to ensure everything is delivered. 2. Commit topic offsets.
quixstreams.sources.core.kafka
quixstreams.sources.core.kafka.kafka
KafkaReplicatorSource
Source implementation that replicates a topic from a Kafka broker to your application broker.
Running multiple instances of this source is supported.
Example Snippet:
from quixstreams import Application
from quixstreams.sources.kafka import KafkaReplicatorSource
app = Application(
consumer_group="group",
)
source = KafkaReplicatorSource(
name="source-second-kafka",
app_config=app.config,
topic="second-kafka-topic",
broker_address="localhost:9092",
)
sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()
KafkaReplicatorSource.__init__
def __init__(name: str,
app_config: "ApplicationConfig",
topic: str,
broker_address: Union[str, ConnectionConfig],
auto_offset_reset: AutoOffsetReset = "latest",
consumer_extra_config: Optional[dict] = None,
consumer_poll_timeout: Optional[float] = None,
shutdown_timeout: float = 10,
on_consumer_error: Optional[
ConsumerErrorCallback] = default_on_consumer_error,
value_deserializer: DeserializerType = "json",
key_deserializer: DeserializerType = "bytes") -> None
Arguments:
name
: The source unique name. It is used to generate the default topic name and consumer group name on the source broker. Running multiple instances ofKafkaReplicatorSource
with the same name connected to the same broker will make them share the same consumer group.app_config
: The configuration of the application. Used by the source to connect to the application kafka broker.topic
: The topic to replicate.broker_address
: The connection settings for the source Kafka.auto_offset_reset
: Consumerauto.offset.reset
setting. Default - Use the Applicationauto_offset_reset
setting.consumer_extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Consumer
as is. Default -None
consumer_poll_timeout
: timeout forRowConsumer.poll()
Default - Use the Applicationconsumer_poll_timeout
setting.shutdown_timeout
: Time in second the application waits for the source to gracefully shutdown.on_consumer_error
: Triggered when the sourceConsumer
fails to poll Kafka.value_deserializer
: The default topic value deserializer, used by StreamingDataframe connected to the source. Default -json
key_deserializer
: The default topic key deserializer, used by StreamingDataframe connected to the source. Default -json
quixstreams.sources.core.kafka.quix
QuixEnvironmentSource
Source implementation that replicates a topic from a Quix Cloud environment to your application broker. It can copy messages for development and testing without risking producing them back or affecting the consumer groups.
Running multiple instances of this source is supported.
Example Snippet:
from quixstreams import Application
from quixstreams.sources.kafka import QuixEnvironmentSource
app = Application(
consumer_group="group",
)
source = QuixEnvironmentSource(
name="source-quix",
app_config=app.config,
quix_workspace_id="WORKSPACE_ID",
quix_sdk_token="WORKSPACE_SDK_TOKEN",
topic="quix-source-topic",
)
sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()
QuixEnvironmentSource.__init__
def __init__(name: str,
app_config: "ApplicationConfig",
topic: str,
quix_sdk_token: str,
quix_workspace_id: str,
quix_portal_api: Optional[str] = None,
auto_offset_reset: Optional[AutoOffsetReset] = None,
consumer_extra_config: Optional[dict] = None,
consumer_poll_timeout: Optional[float] = None,
shutdown_timeout: float = 10,
on_consumer_error: Optional[
ConsumerErrorCallback] = default_on_consumer_error,
value_deserializer: DeserializerType = "json",
key_deserializer: DeserializerType = "bytes") -> None
Arguments:
quix_workspace_id
: The Quix workspace ID of the source environment.quix_sdk_token
: Quix cloud sdk token used to connect to the source environment.quix_portal_api
: The Quix portal API URL of the source environment. Default -Quix__Portal__Api
environment variable or Quix cloud production URL
For other parameters See quixstreams.sources.kafka.KafkaReplicatorSource
quixstreams.sources.core.csv
CSVSource
CSVSource.__init__
def __init__(path: Union[str, Path],
name: str,
key_extractor: Optional[Callable[[dict], AnyStr]] = None,
timestamp_extractor: Optional[Callable[[dict], int]] = None,
delay: float = 0,
shutdown_timeout: float = 10,
dialect: str = "excel") -> None
A base CSV source that reads data from a CSV file and produces rows
to the Kafka topic in JSON format.
Arguments:
path
: a path to the CSV file.name
: a unique name for the Source. It is used as a part of the default topic name.key_extractor
: an optional callable to extract the message key from the row. It must return eitherstr
orbytes
. If empty, the Kafka messages will be produced without keys. Default -None
.timestamp_extractor
: an optional callable to extract the message timestamp from the row. It must return time in milliseconds asint
. If empty, the current epoch will be used. Default -None
delay
: an optional delay after producing each row for stream simulation. Default -0
.shutdown_timeout
: Time in second the application waits for the source to gracefully shut down.dialect
: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default -"excel"
.
quixstreams.sources
quixstreams.sources.community.file.formats.parquet
quixstreams.sources.community.file.formats
quixstreams.sources.community.file.formats.json
JSONFormat
JSONFormat.__init__
Read a JSON-formatted file (along with decompressing it).
Arguments:
compression
: the compression type used on the fileloads
: A custom function to deserialize objects to the expected dict with {_key: str, _value: dict, _timestamp: int}.
quixstreams.sources.community.file.formats.base
Format
Base class for reading files serialized by the Quix Streams File Sink Connector.
Formats include things like JSON, Parquet, etc.
Also handles different compression types.
Format.__init__
super().init() this for a usable init.
Format.deserialize
Parse a filelike byte stream into a collection of records
using the designated format's deserialization approach.
The opening, decompression, and closing of the byte stream's origin is handled automatically.
The iterable should output dicts with the following data/naming structure: {_key: str, _value: dict, _timestamp: int}.
Arguments:
filestream
: a filelike byte stream (such asf
fromf = open(file)
)
quixstreams.sources.community.file
quixstreams.sources.community.file.file
FileSource
Ingest a set of local files into kafka by iterating through the provided folder and processing all nested files within it.
Expects folder and file structures as generated by the related FileSink connector:
my_topics/
├── topic_a/
│ ├── 0/
│ │ ├── 0000.ext
│ │ └── 0011.ext
│ └── 1/
│ ├── 0003.ext
│ └── 0016.ext
└── topic_b/
└── etc...
Intended to be used with a single topic (ex: topic_a), but will recursively read from whatever entrypoint is passed to it.
File format structure depends on the file format.
See the .formats
and .compressions
modules to see what is supported.
Example Usage:
from quixstreams import Application
from quixstreams.sources.community.file import FileSource
app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
source = FileSource(
filepath="/path/to/my/topic_folder",
file_format="json",
file_compression="gzip",
)
sdf = app.dataframe(source=source).print(metadata=True)
if __name__ == "__main__":
app.run()
FileSource.__init__
def __init__(filepath: Union[str, Path],
file_format: Union[Format, FormatName],
file_compression: Optional[CompressionName] = None,
as_replay: bool = True,
name: Optional[str] = None,
shutdown_timeout: float = 10)
Arguments:
filepath
: a filepath to recursively read through; it is recommended to provide the path to a given topic folder (ex:/path/to/topic_a
).file_format
: what format the message files are in (ex: json, parquet). Optionally, can provide aFormat
instance if more than file_compression is necessary to define (file_compression will then be ignored).file_compression
: what compression is used on the given files, if any.as_replay
: Produce the messages with the original time delay between them. Otherwise, produce the messages as fast as possible. NOTE: Time delay will only be accurate per partition, NOT overall.name
: The name of the Source application (Default: last folder name).shutdown_timeout
: Time in seconds the application waits for the source to gracefully shutdown
FileSource.default_topic
Uses the file structure to generate the desired partition count for the
internal topic.
Returns:
the original default topic, with updated partition count
quixstreams.sources.community.file.compressions.gzip
quixstreams.sources.community.file.compressions
quixstreams.sources.community.file.compressions.base
quixstreams.sources.community
This module contains Sources developed and maintained by the members of Quix Streams community.
quixstreams.sources.community.pubsub
quixstreams.sources.community.pubsub.consumer
PubSubSubscriptionNotFound
Raised when an expected subscription does not exist
PubSubConsumer
PubSubConsumer.poll_and_process
This uses the asynchronous puller to retrieve and handle a message with its assigned callback.
Committing is a separate step.
PubSubConsumer.poll_and_process_batch
Polls and processes until either the max_batch_size or batch_timeout is reached.
PubSubConsumer.subscribe
Asynchronous subscribers require subscribing (synchronous do not).
NOTE: This will not detect whether the subscription exists.
PubSubConsumer.handle_subscription
Handles subscription management in one place.
Subscriptions work similarly to Kafka consumer groups.
-
Each topic can have multiple subscriptions (consumer group ~= subscription).
-
A subscription can have multiple subscribers (similar to consumers in a group).
-
NOTE: exactly-once adds message methods (ack_with_response) when enabled.
quixstreams.sources.community.pubsub.pubsub
PubSubSource
This source enables reading from a Google Cloud Pub/Sub topic, dumping it to a kafka topic using desired SDF-based transformations.
Provides "at-least-once" guarantees.
Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported.
The incoming message value will be in bytes, so transform in your SDF accordingly.
Example Usage:
from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource
from os import environ
source = PubSubSource(
# Suggested: pass JSON-formatted credentials from an environment variable.
service_account_json = environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
project_id="<project ID>",
topic_id="<topic ID>", # NOTE: NOT the full /x/y/z path!
subscription_id="<subscription ID>", # NOTE: NOT the full /x/y/z path!
create_subscription=True,
)
app = Application(
broker_address="localhost:9092",
auto_offset_reset="earliest",
consumer_group="gcp",
loglevel="INFO"
)
sdf = app.dataframe(source=source).print(metadata=True)
if __name__ == "__main__":
app.run()
PubSubSource.__init__
def __init__(project_id: str,
topic_id: str,
subscription_id: str,
service_account_json: Optional[str] = None,
commit_every: int = 100,
commit_interval: float = 5.0,
create_subscription: bool = False,
enable_message_ordering: bool = False,
shutdown_timeout: float = 10.0)
Arguments:
project_id
: a Google Cloud project ID.topic_id
: a Pub/Sub topic ID (NOT the full path).subscription_id
: a Pub/Sub subscription ID (NOT the full path).service_account_json
: a Google Cloud Credentials JSON as a string Can instead use environment variables (which have different behavior):- "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath i.e. /x/y/z.json
- "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub
commit_every
: max records allowed to be processed before committing.commit_interval
: max allowed elapsed time between commits.create_subscription
: whether to attempt to create a subscription at startup; if it already exists, it instead logs its details (DEBUG level).enable_message_ordering
: When creating a Pub/Sub subscription, whether to allow message ordering. NOTE: does NOT affect existing subscriptions!shutdown_timeout
: How long to wait for a graceful shutdown of the source.
quixstreams.sources.base
quixstreams.sources.base.exceptions
SourceException
Raised in the parent process when a source finish with an exception
quixstreams.sources.base.source
BaseSource
This is the base class for all sources.
Sources are executed in a sub-process of the main application.
To create your own source you need to implement:
start
stop
default_topic
BaseSource
is the most basic interface, and the framework expects every
source to implement it.
Use Source
to benefit from a base implementation.
You can connect a source to a StreamingDataframe using the Application.
Example snippet:
class RandomNumbersSource(BaseSource):
def __init__(self):
super().__init__()
self._running = False
def start(self):
self._running = True
while self._running:
number = random.randint(0, 100)
serialized = self._producer_topic.serialize(value=number)
self._producer.produce(
topic=self._producer_topic.name,
key=serialized.key,
value=serialized.value,
)
def stop(self):
self._running = False
def default_topic(self) -> Topic:
return Topic(
name="topic-name",
value_deserializer="json",
value_serializer="json",
)
def main():
app = Application(broker_address="localhost:9092")
source = RandomNumbersSource()
sdf = app.dataframe(source=source)
sdf.print(metadata=True)
app.run()
if __name__ == "__main__":
main()
BaseSource.configure
This method is triggered when the source is registered to the Application.
It configures the source's Kafka producer and the topic it will produce to.
BaseSource.start
This method is triggered in the subprocess when the source is started.
The subprocess will run as long as the start method executes. Use it to fetch data and produce it to Kafka.
BaseSource.stop
This method is triggered when the application is shutting down.
The source must ensure that the run
method is completed soon.
BaseSource.default_topic
This method is triggered when the topic is not provided to the source.
The source must return a default topic configuration.
Source
A base class for custom Sources that provides a basic implementation of BaseSource
interface.
It is recommended to interface to create custom sources.
Subclass it and implement the run
method to fetch data and produce it to Kafka.
Example:
from quixstreams import Application
import random
from quixstreams.sources import Source
class RandomNumbersSource(Source):
def run(self):
while self.running:
number = random.randint(0, 100)
serialized = self._producer_topic.serialize(value=number)
self.produce(key=str(number), value=serialized.value)
def main():
app = Application(broker_address="localhost:9092")
source = RandomNumbersSource(name="random-source")
sdf = app.dataframe(source=source)
sdf.print(metadata=True)
app.run()
if __name__ == "__main__":
main()
Helper methods and properties:
serialize()
produce()
flush()
running
Source.__init__
Arguments:
name
: The source unique name. It is used to generate the topic configuration.shutdown_timeout
: Time in second the application waits for the source to gracefully shutdown.
Source.running
Property indicating if the source is running.
The stop
method will set it to False
. Use it to stop the source gracefully.
Source.cleanup
This method is triggered once the run
method completes.
Use it to clean up the resources and shut down the source gracefully.
It flushes the producer when _run
completes successfully.
Source.stop
This method is triggered when the application is shutting down.
It sets the running
property to False
.
Source.start
This method is triggered in the subprocess when the source is started.
It marks the source as running, execute it's run method and ensure cleanup happens.
Source.run
This method is triggered in the subprocess when the source is started.
The subprocess will run as long as the run method executes. Use it to fetch data and produce it to Kafka.
Source.serialize
def serialize(key: Optional[object] = None,
value: Optional[object] = None,
headers: Optional[Headers] = None,
timestamp_ms: Optional[int] = None) -> KafkaMessage
Serialize data to bytes using the producer topic serializers and return a quixstreams.models.messages.KafkaMessage
.
Returns:
quixstreams.models.messages.KafkaMessage
Source.produce
def produce(value: Optional[Union[str, bytes]] = None,
key: Optional[Union[str, bytes]] = None,
headers: Optional[Headers] = None,
partition: Optional[int] = None,
timestamp: Optional[int] = None,
poll_timeout: float = 5.0,
buffer_error_max_tries: int = 3) -> None
Produce a message to the configured source topic in Kafka.
Source.flush
This method flush the producer.
It ensures all messages are successfully delivered to Kafka.
Arguments:
timeout
(float
): time to attempt flushing (seconds). None use producer default or -1 is infinite. Default: None
Raises:
CheckpointProducerTimeout
: if any message fails to produce before the timeout
Source.default_topic
Return a default topic matching the source name.
The default topic will not be used if the topic has already been provided to the source.
Returns:
quixstreams.models.topics.Topic
quixstreams.sources.base.manager
SourceProcess
An implementation of the Source subprocess.
It manages a source and its subprocess, handles the communication between the child and parent processes, lifecycle, and error handling.
Some methods are designed to be used from the parent process, and others from the child process.
SourceProcess.run
An entrypoint of the child process.
Responsible for:
* Configuring the signal handlers to handle shutdown properly
* Execution of the source run
method
* Reporting the source exceptions to the parent process
SourceProcess.raise_for_error
Raise a quixstreams.sources.manager.SourceException
if the child process was terminated with an exception.
SourceProcess.stop
Handle shutdown of the source and its subprocess.
First, it tries to shut down gracefully by sending a SIGTERM and waiting up to
source.shutdown_timeout
seconds for the process to exit. If the process
is still alive, it will kill it with a SIGKILL.
SourceManager
Class managing the sources registered with the app
Sources run in their separate process pay attention about cross-process communication
SourceManager.register
Register a new source in the manager.
Each source need to already be configured, can't reuse a topic and must be unique
SourceManager.raise_for_error
Raise an exception if any process has stopped with an exception
SourceManager.is_alive
Check if any process is alive
Returns:
True if at least one process is alive
quixstreams.sources.base.multiprocessing
quixstreams.rowconsumer
RowConsumer
RowConsumer.__init__
def __init__(broker_address: Union[str, ConnectionConfig],
consumer_group: str,
auto_offset_reset: AutoOffsetReset,
auto_commit_enable: bool = True,
on_commit: Callable[[Optional[KafkaError], List[TopicPartition]],
None] = None,
extra_config: Optional[dict] = None,
on_error: Optional[ConsumerErrorCallback] = None)
A consumer class that is capable of deserializing Kafka messages to Rows
according to the Topics deserialization settings.
It overrides .subscribe()
method of Consumer class to accept Topic
objects instead of strings.
Arguments:
broker_address
: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as<host>:<port>
, or a ConnectionConfig object if authentication is required.consumer_group
: Kafka consumer group. Passed asgroup.id
toconfluent_kafka.Consumer
auto_offset_reset
: Consumerauto.offset.reset
setting. Available values:- "earliest" - automatically reset the offset to the smallest offset
- "latest" - automatically reset the offset to the largest offset
auto_commit_enable
: If true, periodically commit offset of the last message handed to the application. Default -True
.on_commit
: Offset commit result propagation callback. Passed as "offset_commit_cb" toconfluent_kafka.Consumer
.extra_config
: A dictionary with additional options that will be passed toconfluent_kafka.Consumer
as is. Note: values passed as arguments override values inextra_config
.on_error
: a callback triggered whenRowConsumer.poll_row
fails. If consumer fails and the callback returnsTrue
, the exception will be logged but not propagated. The default callback logs an exception and returnsFalse
.
RowConsumer.subscribe
def subscribe(topics: List[Topic],
on_assign: Optional[RebalancingCallback] = None,
on_revoke: Optional[RebalancingCallback] = None,
on_lost: Optional[RebalancingCallback] = None)
Set subscription to supplied list of topics.
This replaces a previous subscription.
This method also updates the internal mapping with topics that is used to deserialize messages to Rows.
Arguments:
topics
: list ofTopic
instances to subscribe to.on_assign
(callable
): callback to provide handling of customized offsets on completion of a successful partition re-assignment.on_revoke
(callable
): callback to provide handling of offset commits to a customized store on the start of a rebalance operation.on_lost
(callable
): callback to provide handling in the case the partition assignment has been lost. Partitions that have been lost may already be owned by other members in the group and therefore committing offsets, for example, may fail.
RowConsumer.poll_row
Consumes a single message and deserialize it to Row or a list of Rows.
The message is deserialized according to the corresponding Topic.
If deserializer raises IgnoreValue
exception, this method will return None.
If Kafka returns an error, it will be raised as exception.
Arguments:
timeout
: poll timeout seconds
Returns:
single Row, list of Rows or None
quixstreams.checkpointing.checkpoint
BaseCheckpoint
Base class to keep track of state updates and consumer offsets and to checkpoint these updates on schedule.
Two implementations exist: * one for checkpointing the Application in quixstreams/checkpoint/checkpoint.py * one for checkpointing the kafka source in quixstreams/sources/kafka/checkpoint.py
BaseCheckpoint.expired
Returns True
if checkpoint deadline has expired OR
if the total number of processed offsets exceeded the "commit_every" limit
when it's defined.
BaseCheckpoint.empty
Returns True
if checkpoint doesn't have any offsets stored yet.
BaseCheckpoint.store_offset
Store the offset of the processed message to the checkpoint.
Arguments:
topic
: topic namepartition
: partition numberoffset
: message offset
BaseCheckpoint.close
Perform cleanup (when the checkpoint is empty) instead of committing.
Needed for exactly-once, as Kafka transactions are timeboxed.
BaseCheckpoint.commit
Commit the checkpoint.
Checkpoint
Checkpoint implementation used by the application
Checkpoint.get_store_transaction
def get_store_transaction(
topic: str,
partition: int,
store_name: str = DEFAULT_STATE_STORE_NAME) -> PartitionTransaction
Get a PartitionTransaction for the given store, topic and partition.
It will return already started transaction if there's one.
Arguments:
topic
: topic namepartition
: partition numberstore_name
: store name
Returns:
instance of PartitionTransaction
Checkpoint.close
Perform cleanup (when the checkpoint is empty) instead of committing.
Needed for exactly-once, as Kafka transactions are timeboxed.
Checkpoint.commit
Commit the checkpoint.
This method will: 1. Produce the changelogs for each state store 2. Flush the producer to ensure everything is delivered. 3. Commit topic offsets. 4. Flush each state store partition to the disk.