Full Reference

quixstreams

quixstreams.logging

configure_logging

def configure_logging(loglevel: Optional[Union[int, LogLevel]],
                      name: str = LOGGER_NAME,
                      pid: bool = False) -> bool

[VIEW SOURCE]

Configure "quixstreams" logger.

NOTE: If the "quixstreams" logger already has pre-defined handlers (e.g. logging has already been configured via the standard logging module, or the function is called twice), it will skip configuration and return False.

Arguments:

  • loglevel: a valid log level as a string or an int, or None. If None is passed, this function is a no-op and no logging will be configured.
  • name: the log name included in the output
  • pid: if True include the process PID in the logs

Returns:

True if logging config has been updated, otherwise False.
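The skip-if-already-configured behaviour in the note above can be pictured with the standard library alone. This is a minimal sketch, not the quixstreams implementation: `configure_once`, the logger names, and the log format are hypothetical and chosen only for illustration.

```python
import logging
import sys
from typing import Optional, Union


def configure_once(loglevel: Optional[Union[int, str]],
                   name: str = "demo.quixstreams") -> bool:
    """Attach a stderr handler to logger `name` unless it is already configured."""
    if loglevel is None:
        # Mirrors the documented no-op: nothing is configured.
        return False
    logger = logging.getLogger(name)
    if logger.handlers:
        # Handlers already exist (earlier call or user configuration): skip.
        return False
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(
        logging.Formatter("[%(asctime)s] [%(levelname)s] [%(name)s] : %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return True


first = configure_once("INFO")     # handler attached, returns True
second = configure_once("DEBUG")   # skipped, handler already present
skipped = configure_once(None, name="demo.other")  # no-op for None
```

Because the second call is skipped, the level stays at the value set by the first call.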

quixstreams.error_callbacks

quixstreams.platforms

quixstreams.platforms.quix.config

strip_workspace_id_prefix

def strip_workspace_id_prefix(workspace_id: str, s: str) -> str

[VIEW SOURCE]

Remove the workspace ID from a given string if it starts with it.

Only used for consumer groups.

Arguments:

  • workspace_id: the workspace id
  • s: the string to strip the prefix from

Returns:

the string with workspace_id prefix removed

prepend_workspace_id

def prepend_workspace_id(workspace_id: str, s: str) -> str

[VIEW SOURCE]

Add the workspace ID as a prefix to a given string if it does not have it.

Only used for consumer groups.

Arguments:

  • workspace_id: the workspace id
  • s: the string to prepend to

Returns:

the string with workspace_id prepended
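The two helpers above are inverses of each other and both are idempotent. The following is a sketch of that behaviour under one assumption not stated in this reference: that the workspace ID and the rest of the string are joined with a "-" separator. `strip_prefix` and `prepend_prefix` are hypothetical stand-ins, not the library functions.

```python
def strip_prefix(workspace_id: str, s: str, sep: str = "-") -> str:
    """Drop `workspace_id` + `sep` from the front of `s` if present."""
    prefix = f"{workspace_id}{sep}"
    return s[len(prefix):] if s.startswith(prefix) else s


def prepend_prefix(workspace_id: str, s: str, sep: str = "-") -> str:
    """Add `workspace_id` + `sep` to the front of `s` unless already there."""
    prefix = f"{workspace_id}{sep}"
    return s if s.startswith(prefix) else f"{prefix}{s}"


group = prepend_prefix("my-workspace", "my-group")   # prefix added
same = prepend_prefix("my-workspace", group)          # already prefixed, unchanged
bare = strip_prefix("my-workspace", group)            # prefix removed again
```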

QuixApplicationConfig

@dataclasses.dataclass
class QuixApplicationConfig()

[VIEW SOURCE]

A convenience container class for Quix Application configs.

QuixKafkaConfigsBuilder

class QuixKafkaConfigsBuilder()

[VIEW SOURCE]

Retrieves all the necessary information from the Quix API and builds all the objects required to connect a confluent-kafka client to the Quix Platform.

If not executed within the Quix platform directly, you must provide a Quix "streaming" (aka "sdk") token, or Personal Access Token.

Ideally you also know your workspace name or id. If not, you can search for it using a known topic name, but note the search space is limited to the access level of your token.

It also currently handles the app_auto_create_topics setting for Quix Applications.

QuixKafkaConfigsBuilder.__init__

def __init__(quix_sdk_token: Optional[str] = None,
             workspace_id: Optional[str] = None,
             quix_portal_api_service: Optional[QuixPortalApiService] = None,
             timeout: float = 30,
             topic_create_timeout: float = 60)

[VIEW SOURCE]

Arguments:

  • quix_portal_api_service: A QuixPortalApiService instance (else generated)
  • workspace_id: A valid Quix Workspace ID (else searched for)

QuixKafkaConfigsBuilder.convert_topic_response

@classmethod
def convert_topic_response(cls, api_response: dict) -> Topic

[VIEW SOURCE]

Converts a GET or POST ("create") topic API response to a Topic object

Arguments:

  • api_response: the dict response from a get or create topic call

Returns:

a corresponding Topic object

QuixKafkaConfigsBuilder.strip_workspace_id_prefix

def strip_workspace_id_prefix(s: str) -> str

[VIEW SOURCE]

Remove the workspace ID from a given string if it starts with it.

Only used for consumer groups.

Arguments:

  • s: the string to strip the prefix from

Returns:

the string with workspace_id prefix removed

QuixKafkaConfigsBuilder.prepend_workspace_id

def prepend_workspace_id(s: str) -> str

[VIEW SOURCE]

Add the workspace ID as a prefix to a given string if it does not have it.

Only used for consumer groups.

Arguments:

  • s: the string to prepend to

Returns:

the string with workspace_id prepended

QuixKafkaConfigsBuilder.search_for_workspace

def search_for_workspace(workspace_name_or_id: Optional[str] = None,
                         timeout: Optional[float] = None) -> Optional[dict]

[VIEW SOURCE]

Search for a workspace given an expected workspace name or id.

Arguments:

  • workspace_name_or_id: the expected name or id of a workspace
  • timeout: response timeout (seconds); Default 30

Returns:

the workspace data dict if search success, else None

QuixKafkaConfigsBuilder.get_workspace_info

def get_workspace_info(known_workspace_topic: Optional[str] = None,
                       timeout: Optional[float] = None) -> dict

[VIEW SOURCE]

Queries for workspace data from the Quix API, regardless of instance cache, and updates instance attributes from the query result.

Arguments:

  • known_workspace_topic: a topic you know to exist in some workspace
  • timeout: response timeout (seconds); Default 30

QuixKafkaConfigsBuilder.search_workspace_for_topic

def search_workspace_for_topic(
        workspace_id: str,
        topic: str,
        timeout: Optional[float] = None) -> Optional[str]

[VIEW SOURCE]

Search through all the topics in the given workspace id to see if there is a match with the provided topic.

Arguments:

  • workspace_id: the workspace to search in
  • topic: the topic to search for
  • timeout: response timeout (seconds); Default 30

Returns:

the workspace_id if success, else None

QuixKafkaConfigsBuilder.search_for_topic_workspace

def search_for_topic_workspace(topic: str,
                               timeout: Optional[float] = None
                               ) -> Optional[dict]

[VIEW SOURCE]

Find what workspace a topic belongs to.

If there is only one workspace altogether, it is assumed to be the workspace. More than one means each workspace will be searched until the first hit.

Arguments:

  • topic: the topic to search for
  • timeout: response timeout (seconds); Default 30

Returns:

workspace data dict if topic search success, else None
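The search rule described above (a single workspace is assumed to be the match, otherwise each workspace is checked until the first hit) can be sketched with in-memory data. Everything here is illustrative: `find_topic_workspace`, the `"workspaceId"` key, and the lookup table stand in for API calls and are not part of the library.

```python
from typing import Dict, List, Optional


def find_topic_workspace(workspaces: List[dict],
                         topics_by_workspace: Dict[str, List[str]],
                         topic: str) -> Optional[dict]:
    """Return the workspace dict that owns `topic`, or None if not found."""
    if len(workspaces) == 1:
        # Only one candidate: assume it is the workspace, no topic search needed.
        return workspaces[0]
    for workspace in workspaces:
        if topic in topics_by_workspace.get(workspace["workspaceId"], []):
            return workspace  # first hit wins
    return None


workspaces = [{"workspaceId": "ws-a"}, {"workspaceId": "ws-b"}]
topics = {"ws-a": ["orders"], "ws-b": ["payments", "orders"]}

hit = find_topic_workspace(workspaces, topics, "payments")
first_hit = find_topic_workspace(workspaces, topics, "orders")
miss = find_topic_workspace(workspaces, topics, "unknown")
only = find_topic_workspace([{"workspaceId": "ws-z"}], {}, "anything")
```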

QuixKafkaConfigsBuilder.create_topic

def create_topic(topic: Topic, timeout: Optional[float] = None)

[VIEW SOURCE]

The actual API call to create the topic.

Arguments:

  • topic: a Topic instance
  • timeout: response timeout (seconds); Default 30

QuixKafkaConfigsBuilder.get_or_create_topic

def get_or_create_topic(topic: Topic, timeout: Optional[float] = None) -> dict

[VIEW SOURCE]

Get or create topics in a Quix cluster as part of initializing the Topic object to obtain the true topic name.

Arguments:

  • topic: a Topic object
  • timeout: response timeout (seconds); Default 30

QuixKafkaConfigsBuilder.wait_for_topic_ready_statuses

def wait_for_topic_ready_statuses(topics: List[Topic],
                                  timeout: Optional[float] = None,
                                  finalize_timeout: Optional[float] = None)

[VIEW SOURCE]

After the broker acknowledges topics for creation, they will be in a "Creating" status; they are not usable until they are set to a status of "Ready".

This blocks until all topics are marked as "Ready" or the timeout is hit.

Arguments:

  • topics: a list of Topic objects
  • timeout: response timeout (seconds); Default 30
  • finalize_timeout: topic finalization timeout (seconds); Default 60
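The blocking behaviour described above amounts to a poll-until-ready loop with a deadline. Below is a minimal sketch of that pattern, assuming a caller-supplied status function; `wait_until_ready`, `fetch_statuses`, and `poll_interval` are hypothetical names, and the real method talks to the Quix API instead.

```python
import time
from typing import Callable, Dict, Iterable


def wait_until_ready(names: Iterable[str],
                     fetch_statuses: Callable[[], Dict[str, str]],
                     finalize_timeout: float = 60.0,
                     poll_interval: float = 0.01) -> None:
    """Block until every topic in `names` reports "Ready" or the timeout is hit."""
    pending = set(names)
    deadline = time.monotonic() + finalize_timeout
    while pending:
        statuses = fetch_statuses()
        # Keep only the topics that are still not "Ready".
        pending = {name for name in pending if statuses.get(name) != "Ready"}
        if not pending:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"topics not ready in time: {sorted(pending)}")
        time.sleep(poll_interval)


polls = {"count": 0}


def fake_statuses() -> Dict[str, str]:
    """Pretend API: "orders" becomes Ready on the third poll."""
    polls["count"] += 1
    status = "Ready" if polls["count"] >= 3 else "Creating"
    return {"orders": status, "payments": "Ready"}


wait_until_ready(["orders", "payments"], fake_statuses)
```

Using `time.monotonic()` for the deadline keeps the timeout unaffected by wall-clock adjustments.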

QuixKafkaConfigsBuilder.get_topic

def get_topic(topic_name: str, timeout: Optional[float] = None) -> dict

[VIEW SOURCE]

return the topic ID (the actual cluster topic name) if it exists, else raise

Arguments:

  • topic_name: name of the topic
  • timeout: response timeout (seconds); Default 30

Raises:

  • QuixApiRequestFailure: when topic does not exist

Returns:

response dict of the topic info if the topic is found; otherwise QuixApiRequestFailure is raised

QuixKafkaConfigsBuilder.get_application_config

def get_application_config(consumer_group_id: str) -> QuixApplicationConfig

[VIEW SOURCE]

Get all the necessary attributes for an Application to run on Quix Cloud.

Arguments:

  • consumer_group_id: consumer group id, if needed

Returns:

a QuixApplicationConfig instance

quixstreams.platforms.quix.env

QuixEnvironment

class QuixEnvironment()

[VIEW SOURCE]

Class to access various Quix platform environment settings

SDK_TOKEN

QuixEnvironment.state_management_enabled

@property
def state_management_enabled() -> bool

[VIEW SOURCE]

Check whether "State management" is enabled for the current deployment

Returns:

True if state management is enabled, otherwise False

QuixEnvironment.deployment_id

@property
def deployment_id() -> Optional[str]

[VIEW SOURCE]

Return current Quix deployment id.

This variable is meant to be set only by Quix Platform and only when the application is deployed.

Returns:

deployment id or None

QuixEnvironment.workspace_id

@property
def workspace_id() -> Optional[str]

[VIEW SOURCE]

Return Quix workspace id if set

Returns:

workspace id or None

QuixEnvironment.portal_api

@property
def portal_api() -> Optional[str]

[VIEW SOURCE]

Return Quix Portal API url if set

Returns:

portal API URL or None

QuixEnvironment.state_dir

@property
def state_dir() -> str

[VIEW SOURCE]

Return application state directory on Quix.

Returns:

path to state dir

quixstreams.platforms.quix.checks

check_state_management_enabled

def check_state_management_enabled()

[VIEW SOURCE]

Check if the State Management feature is enabled for the current deployment on the Quix platform. If it is disabled, an exception will be raised.

check_state_dir

def check_state_dir(state_dir: Path)

[VIEW SOURCE]

Check if Application "state_dir" matches the state dir on Quix platform.

If it doesn't match, a warning will be logged.

Arguments:

  • state_dir: application state_dir path

quixstreams.platforms.quix

quixstreams.platforms.quix.api

QuixPortalApiService

class QuixPortalApiService()

[VIEW SOURCE]

A light wrapper around the Quix Portal API. If used in the Quix Platform, it will use that workspace's auth token and portal endpoint, else you must provide them.

Function names closely reflect the respective API endpoint, each starting with the method [GET, POST, etc.] followed by the endpoint path.

Results will be returned in the form of requests' Response.json(), unless something else is required. Non-200 responses will raise exceptions.

See the swagger documentation for more info about the endpoints.

QuixPortalApiService.get_workspace_certificate

def get_workspace_certificate(workspace_id: Optional[str] = None,
                              timeout: float = 30) -> Optional[bytes]

[VIEW SOURCE]

Get a workspace TLS certificate if available.

Returns None if certificate is not specified.

Arguments:

  • workspace_id: workspace id, optional
  • timeout: request timeout; Default 30

Returns:

certificate as bytes if present, or None

quixstreams.platforms.quix.exceptions

quixstreams.platforms.quix.topic_manager

QuixTopicManager

class QuixTopicManager(TopicManager)

[VIEW SOURCE]

The source of all topic management with quixstreams.

This is specifically for Applications using the Quix Cloud.

Generally initialized and managed automatically by a Quix Application, but allows a user to work with it directly when needed, such as using it alongside a plain Producer to create its topics.

See methods for details.

QuixTopicManager.__init__

def __init__(topic_admin: TopicAdmin,
             consumer_group: str,
             quix_config_builder: QuixKafkaConfigsBuilder,
             timeout: float = 30,
             create_timeout: float = 60,
             auto_create_topics: bool = True)

[VIEW SOURCE]

Arguments:

  • topic_admin: a TopicAdmin instance
  • quix_config_builder: A QuixKafkaConfigsBuilder instance, else one is generated for you.
  • timeout: response timeout (seconds)
  • create_timeout: timeout for topic creation

quixstreams.dataframe.registry

DataframeRegistry

class DataframeRegistry()

[VIEW SOURCE]

Helps manage multiple StreamingDataFrames (multi-topic Applications) and their respective repartitions.

SDFs are registered by storing their topic and current Stream.

DataframeRegistry.consumer_topics

@property
def consumer_topics() -> List[Topic]

[VIEW SOURCE]

Returns:

a list of Topics a consumer should subscribe to.

DataframeRegistry.register_root

def register_root(new_sdf: "StreamingDataFrame")

[VIEW SOURCE]

Register a "root" SDF, or the start of a topic's processing.

Arguments:

  • new_sdf: the new SDF.

DataframeRegistry.register_groupby

def register_groupby(source_sdf: "StreamingDataFrame",
                     new_sdf: "StreamingDataFrame")

[VIEW SOURCE]

Register a "groupby" SDF, which is one generated with SDF.group_by().

Arguments:

  • source_sdf: the SDF used by sdf.group_by()
  • new_sdf: the SDF generated by sdf.group_by().

DataframeRegistry.compose_all

def compose_all(
    sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]

[VIEW SOURCE]

Composes all the Streams and returns them in a dict, where key is its topic.

Arguments:

  • sink: callable to accumulate the results of the execution, optional.

Returns:

a {topic_name: composed} dict, where composed is a callable

quixstreams.dataframe.dataframe

StreamingDataFrame

class StreamingDataFrame(BaseStreaming)

[VIEW SOURCE]

StreamingDataFrame is the main object you will use for ETL work.

Typically created with an app = quixstreams.app.Application() instance, via sdf = app.dataframe().

What it Does:

  • Builds a data processing pipeline, declaratively (not executed immediately)
    • Executes this pipeline on inputs at runtime (Kafka message values)
  • Provides functions/interface similar to Pandas Dataframes/Series
  • Enables stateful processing (and manages everything related to it)

How to Use:

Define various operations while continuously reassigning to itself (or new fields).

These operations will generally transform your data, access/update state, or produce to kafka topics.

We recommend that your data structure be "columnar" (aka a dict/JSON) in nature so that it works with the entire interface, but simple types like int, str, etc. are also supported.

See the various methods and classes for more specifics, or for a deep dive into usage, see streamingdataframe.md under the docs/ folder.

NOTE: column referencing like sdf["a_column"] and various methods often create other object types (typically quixstreams.dataframe.StreamingSeries), which is expected; type hinting should alert you to any issues should you attempt invalid operations with said objects (however, we cannot infer whether an operation is valid with respect to your data!).

Example Snippet:

sdf = StreamingDataFrame()
sdf = sdf.apply(a_func)
sdf = sdf.filter(another_func)
sdf = sdf.to_topic(topic_obj)

StreamingDataFrame.apply

def apply(func: Union[
    ApplyCallback,
    ApplyCallbackStateful,
    ApplyWithMetadataCallback,
    ApplyWithMetadataCallbackStateful,
],
          *,
          stateful: bool = False,
          expand: bool = False,
          metadata: bool = False) -> Self

[VIEW SOURCE]

Apply a function to transform the value and return a new value.

The result will be passed downstream as an input value.

Example Snippet:

# This stores a string in state and uppercases every column with a string value.
# A second apply then keeps only the string value columns (shows non-stateful).
def func(d: dict, state: State):
    value = d["store_field"]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return {k: v.upper() if isinstance(v, str) else v for k, v in d.items()}

sdf = StreamingDataFrame()
sdf = sdf.apply(func, stateful=True)
sdf = sdf.apply(lambda d: {k: v for k,v in d.items() if isinstance(v, str)})

Arguments:

  • func: a function to apply
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • expand: if True, expand the returned iterable into individual values downstream. If returned value is not iterable, TypeError will be raised. Default - False.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.
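The effect of expand can be shown without any Kafka machinery. This is a simplified sketch of the documented semantics only; `run_apply` is a hypothetical helper that feeds plain values through a function and is not part of quixstreams.

```python
from typing import Any, Callable, Iterable, List


def run_apply(values: Iterable[Any],
              func: Callable[[Any], Any],
              expand: bool = False) -> List[Any]:
    """Collect what would be passed downstream for each input value."""
    downstream: List[Any] = []
    for value in values:
        result = func(value)
        if expand:
            # iter() raises TypeError for non-iterables, matching the
            # documented failure mode for expand=True.
            downstream.extend(iter(result))
        else:
            downstream.append(result)
    return downstream


words = run_apply(["a b", "c"], str.split, expand=True)  # one value per word
lists = run_apply(["a b", "c"], str.split)               # one list per input
```

With expand=True each element of the returned iterable becomes its own downstream value; without it, the iterable travels downstream as a single value.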

StreamingDataFrame.update

def update(func: Union[
    UpdateCallback,
    UpdateCallbackStateful,
    UpdateWithMetadataCallback,
    UpdateWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

[VIEW SOURCE]

Apply a function to mutate value in-place or to perform a side effect (e.g., printing a value to the console).

The result of the function will be ignored, and the original value will be passed downstream.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

# Stores a value and mutates a list by appending a new item to it.
# Also prints to console.

def func(values: list, state: State):
    value = values[0]
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    values.append("new_item")

sdf = StreamingDataFrame()
sdf = sdf.update(func, stateful=True)
# does not require reassigning
sdf.update(lambda v: v.append(1))

Arguments:

  • func: function to update value
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).

StreamingDataFrame.filter

def filter(func: Union[
    FilterCallback,
    FilterCallbackStateful,
    FilterWithMetadataCallback,
    FilterWithMetadataCallbackStateful,
],
           *,
           stateful: bool = False,
           metadata: bool = False) -> Self

[VIEW SOURCE]

Filter value using provided function.

If the function returns a truthy value, the original value will be passed downstream. Otherwise, the value is dropped.

Example Snippet:

# Stores a value and allows further processing only if the value is greater than
# what was previously stored.

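def func(d: dict, state: State):
    value = d["my_value"]
    previous = state.get("my_store_key")
    # Pass the first value through; afterwards require a strictly greater one
    if previous is None or value > previous:
        state.set("my_store_key", value)
        return True
    return False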

sdf = StreamingDataFrame()
sdf = sdf.filter(func, stateful=True)

Arguments:

  • func: function to filter value
  • stateful: if True, the function will be provided with a second argument of type State to perform stateful operations.
  • metadata: if True, the callback will receive key, timestamp and headers along with the value. Default - False.
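The difference between apply, update, and filter comes down to what happens to the callback's return value. The sketch below models that with plain Python under simplifying assumptions (no state, no metadata, no Kafka); `run_pipeline` and the step tuples are hypothetical and only mirror the documented semantics.

```python
from typing import Any, Callable, Iterable, List, Tuple

Step = Tuple[str, Callable[[Any], Any]]


def run_pipeline(values: Iterable[Any], steps: List[Step]) -> List[Any]:
    """Run each value through the steps and collect the survivors."""
    output: List[Any] = []
    for value in values:
        for kind, func in steps:
            if kind == "apply":
                value = func(value)   # result replaces the value
            elif kind == "update":
                func(value)           # result ignored; value may be mutated
            elif kind == "filter":
                if not func(value):   # falsy result drops the value
                    break
            else:
                raise ValueError(f"unknown step kind: {kind}")
        else:
            # Reached only when no filter dropped the value.
            output.append(value)
    return output


steps: List[Step] = [
    ("apply", lambda d: {**d, "total": d["price"] * d["qty"]}),
    ("update", lambda d: d.setdefault("seen", True)),
    ("filter", lambda d: d["total"] > 10),
]
result = run_pipeline([{"price": 5, "qty": 3}, {"price": 2, "qty": 1}], steps)
```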

StreamingDataFrame.group_by

def group_by(key: Union[str, Callable[[Any], Any]],
             name: Optional[str] = None,
             value_deserializer: Optional[DeserializerType] = "json",
             key_deserializer: Optional[DeserializerType] = "json",
             value_serializer: Optional[SerializerType] = "json",
             key_serializer: Optional[SerializerType] = "json") -> Self

[VIEW SOURCE]

"Groups" messages by re-keying them via the provided group_by operation on their message values.

This enables things like aggregations on messages with non-matching keys.

You can provide a column name (uses the column's value) or a custom function to generate this new key.

.group_by() can only be performed once per StreamingDataFrame instance.

NOTE: group_by generates a topic that copies the original topic's settings.

Example Snippet:

# We have customer purchase events where the message key is the "store_id",
# but we want to calculate sales per customer (by "customer_account_id").

def func(d: dict, state: State):
    current_total = state.get("customer_sum", 0)
    new_total = current_total + d["customer_spent"]
    state.set("customer_sum", new_total)
    d["customer_total"] = new_total
    return d

sdf = StreamingDataFrame()
sdf = sdf.group_by("customer_account_id")
sdf = sdf.apply(func, stateful=True)

Arguments:

  • key: how the new key should be generated from the message value; requires a column name (string) or a callable that takes the message value.
  • name: a name for the op (must be unique per group-by), required if key is a custom callable.
  • value_deserializer: a deserializer type for values; default - JSON
  • key_deserializer: a deserializer type for keys; default - JSON
  • value_serializer: a serializer type for values; default - JSON
  • key_serializer: a serializer type for keys; default - JSON

Returns:

a clone with this operation added (assign to keep its effect).
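Conceptually, re-keying replaces each message key with a value derived from the message body, so that later per-key aggregations group by the new key. The following sketch imitates that on an in-memory list; `rekey` and the sample data are hypothetical, and the internal topic that group_by produces to is not modelled.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple, Union

Message = Tuple[Any, dict]  # (key, value)


def rekey(messages: Iterable[Message],
          key: Union[str, Callable[[dict], Any]]) -> List[Message]:
    """Return the messages with keys derived from a column name or a callable."""
    get_key = key if callable(key) else (lambda value: value[key])
    return [(get_key(value), value) for _, value in messages]


purchases: List[Message] = [
    ("store_1", {"customer_account_id": "alice", "customer_spent": 10}),
    ("store_2", {"customer_account_id": "bob", "customer_spent": 5}),
    ("store_2", {"customer_account_id": "alice", "customer_spent": 7}),
]

# Aggregate per customer instead of per store.
totals: Dict[str, int] = defaultdict(int)
for customer, value in rekey(purchases, "customer_account_id"):
    totals[customer] += value["customer_spent"]
```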

StreamingDataFrame.contains

def contains(key: str) -> StreamingSeries

[VIEW SOURCE]

Check if the key is present in the Row value.

Example Snippet:

# Add new column 'has_column' which contains a boolean indicating
# the presence of 'column_x'

sdf = StreamingDataFrame()
sdf['has_column'] = sdf.contains('column_x')

Arguments:

  • key: a column name to check.

Returns:

a StreamingSeries object that evaluates to True if the key is present or False otherwise.

StreamingDataFrame.to_topic

def to_topic(topic: Topic, key: Optional[Callable[[Any], Any]] = None) -> Self

[VIEW SOURCE]

Produce current value to a topic. You can optionally specify a new key.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

from quixstreams import Application

# Produce to two different topics, changing the key for one of them.

app = Application()
input_topic = app.topic("input_x")
output_topic_0 = app.topic("output_a")
output_topic_1 = app.topic("output_b")

sdf = app.dataframe(input_topic)
sdf = sdf.to_topic(output_topic_0)
# does not require reassigning
sdf.to_topic(output_topic_1, key=lambda data: data["a_field"])

Arguments:

  • topic: instance of Topic
  • key: a callable to generate a new message key, optional. If passed, the return type of this callable must be serializable by key_serializer defined for this Topic object. By default, the current message key will be used.

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).

StreamingDataFrame.set_timestamp

def set_timestamp(func: Callable[[Any, Any, int, Any], int]) -> Self

[VIEW SOURCE]

Set a new timestamp based on the current message value and its metadata.

The new timestamp will be used in windowed aggregations and when producing messages to the output topics.

The new timestamp must be in milliseconds to conform to Kafka requirements.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's timestamp based on the value
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: value['new_timestamp'])

Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new timestamp as integer in milliseconds.

Returns:

a new StreamingDataFrame instance
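A callback for set_timestamp often has to convert a payload field into epoch milliseconds. Here is one possible callback, assuming the value carries an ISO-8601 string in a field called "event_time" (a hypothetical field name) and that naive strings are in UTC.

```python
from datetime import datetime, timezone
from typing import Any


def event_time_ms(value: dict, key: Any, timestamp: int, headers: Any) -> int:
    """Return the payload's event time in epoch milliseconds.

    Falls back to the current message timestamp when the field is missing.
    """
    raw = value.get("event_time")
    if raw is None:
        return timestamp
    parsed = datetime.fromisoformat(raw)
    if parsed.tzinfo is None:
        # Assumption: naive timestamps are expressed in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return round(parsed.timestamp() * 1000)


from_payload = event_time_ms(
    {"event_time": "2024-01-01T00:00:00+00:00"}, "k", 1, None
)
fallback = event_time_ms({}, "k", 1234, None)
```

A function with this signature could be passed as the func argument, for example `sdf.set_timestamp(event_time_ms)`.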

StreamingDataFrame.set_headers

def set_headers(
        func: Callable[
            [Any, Any, int, HeadersTuples],
            HeadersTuples,
        ]) -> Self

[VIEW SOURCE]

Set new message headers based on the current message value and metadata.

The new headers will be used when producing messages to the output topics.

The provided callback must accept value, key, timestamp, and headers, and return a new collection of (header, value) tuples.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
# Updating the record's headers based on the value and metadata
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [('id', value['id'])])

Arguments:

  • func: callable accepting the current value, key, timestamp, and headers. It's expected to return a new set of headers as a collection of (header, value) tuples.

Returns:

a new StreamingDataFrame instance

StreamingDataFrame.print

def print(pretty: bool = True, metadata: bool = False) -> Self

[VIEW SOURCE]

Print out the current message value (and optionally, the message metadata) to stdout (console) (like the built-in print function).

Can also output a more dict-friendly format with pretty=True.

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

NOTE: prints the current (edited) values, not the original values.

Example Snippet:

from quixstreams import Application


app = Application()
input_topic = app.topic("data")

sdf = app.dataframe(input_topic)
sdf["edited_col"] = sdf["orig_col"] + "edited"
# print the updated message value with the newly added column
sdf.print()

Arguments:

  • pretty: Whether to use "pprint" formatting, which uses new-lines and indents for easier console reading (but might be worse for log parsing).
  • metadata: Whether to additionally print the key, timestamp, and headers

Returns:

the updated StreamingDataFrame instance (reassignment NOT required).

StreamingDataFrame.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any], None]] = None
) -> Dict[str, VoidExecutor]

[VIEW SOURCE]

Compose all functions of this StreamingDataFrame into one big closure.

Closures are more performant than calling all the functions in the StreamingDataFrame one-by-one.

Generally not required by users; the quixstreams.app.Application class will do this automatically.

Example Snippet:

from quixstreams import Application

app = Application()
sdf = app.dataframe()
sdf = sdf.apply(apply_func)
sdf = sdf.filter(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})

Arguments:

  • sink: callable to accumulate the results of the execution, optional.

Returns:

a function that accepts "value" and returns a result of StreamingDataFrame
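The idea of folding a chain of functions into a single closure, with an optional sink collecting results, can be illustrated independently of the library. This is a generic sketch of the technique, not the quixstreams internals; `compose_pipeline` and `_wrap` are hypothetical names.

```python
from typing import Any, Callable, List


def _wrap(func: Callable[[Any], Any],
          downstream: Callable[[Any], None]) -> Callable[[Any], None]:
    """Build one link of the chain: call `func`, pass its result on."""
    def step(value: Any) -> None:
        downstream(func(value))
    return step


def compose_pipeline(funcs: List[Callable[[Any], Any]],
                     sink: Callable[[Any], None]) -> Callable[[Any], None]:
    """Fold `funcs` into a single closure that ends in `sink`."""
    composed = sink
    # Wrap from the last function backwards so the first one runs first.
    for func in reversed(funcs):
        composed = _wrap(func, composed)
    return composed


results: List[Any] = []
pipeline = compose_pipeline([lambda v: v + 1, lambda v: v * 2], results.append)
pipeline(3)
pipeline(0)
```

The chain is built once up front, so processing a value is a sequence of direct calls with no per-message loop over a list of functions.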

StreamingDataFrame.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None,
         topic: Optional[Topic] = None) -> List[Any]

[VIEW SOURCE]

A shorthand to test StreamingDataFrame with provided value and MessageContext.

Arguments:

  • value: value to pass through StreamingDataFrame
  • key: key to pass through StreamingDataFrame
  • timestamp: timestamp to pass through StreamingDataFrame
  • ctx: instance of MessageContext, optional. Provide it if the StreamingDataFrame instance calls to_topic(), has stateful functions or windows. Default - None.
  • topic: optionally, a topic branch to test with

Returns:

result of StreamingDataFrame

StreamingDataFrame.tumbling_window

def tumbling_window(duration_ms: Union[int, timedelta],
                    grace_ms: Union[int, timedelta] = 0,
                    name: Optional[str] = None) -> TumblingWindowDefinition

[VIEW SOURCE]

Create a tumbling window transformation on this StreamingDataFrame.

Tumbling windows divide time into fixed-sized, non-overlapping windows.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.

Example Snippet:

from datetime import timedelta

from quixstreams import Application

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a tumbling window of 60s and grace period of 10s
    sdf.tumbling_window(
        duration_ms=timedelta(seconds=60), grace_ms=timedelta(seconds=10.0)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "current()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .current()
)

Arguments:

  • duration_ms: The length of each window. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either an int representing milliseconds or as a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.

Returns:

TumblingWindowDefinition instance representing the tumbling window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.
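Since tumbling windows are fixed-size and non-overlapping, every timestamp falls into exactly one window whose start is the timestamp rounded down to a multiple of the duration. The sketch below shows that arithmetic along with the timedelta-to-milliseconds conversion; `to_ms` and `tumbling_window_bounds` are hypothetical helpers, and treating the window as half-open [start, end) is an assumption.

```python
from datetime import timedelta
from typing import Tuple, Union


def to_ms(duration: Union[int, timedelta]) -> int:
    """Convert an int (already ms) or a timedelta to whole milliseconds."""
    if isinstance(duration, timedelta):
        return round(duration.total_seconds() * 1000)
    return duration


def tumbling_window_bounds(timestamp_ms: int,
                           duration: Union[int, timedelta]) -> Tuple[int, int]:
    """Return (start, end) of the single window containing `timestamp_ms`."""
    duration_ms = to_ms(duration)
    start = timestamp_ms - (timestamp_ms % duration_ms)
    return start, start + duration_ms


bounds = tumbling_window_bounds(125_000, timedelta(seconds=60))
edge = tumbling_window_bounds(120_000, 60_000)
```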

StreamingDataFrame.hopping_window

def hopping_window(duration_ms: Union[int, timedelta],
                   step_ms: Union[int, timedelta],
                   grace_ms: Union[int, timedelta] = 0,
                   name: Optional[str] = None) -> HoppingWindowDefinition

[VIEW SOURCE]

Create a hopping window transformation on this StreamingDataFrame.

Hopping windows divide the data stream into overlapping windows based on time. The overlap is controlled by the step_ms parameter.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.

Example Snippet:

from datetime import timedelta

from quixstreams import Application

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a hopping window of 60s with step 30s and grace period of 10s
    sdf.hopping_window(
        duration_ms=timedelta(seconds=60),
        step_ms=timedelta(seconds=30),
        grace_ms=timedelta(seconds=10)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "current()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .current()
)

Arguments:

  • duration_ms: The length of each window. It defines the time span for which each window aggregates data. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • step_ms: The step size for the window. It determines how much each successive window moves forward in time. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data to be included in the window, even if it arrives after the window has theoretically moved forward. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.

Returns:

HoppingWindowDefinition instance representing the hopping window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.
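With hopping windows a single timestamp can belong to several overlapping windows, one for each step-aligned start within the last duration_ms. The sketch below enumerates them; `hopping_windows` is a hypothetical helper, and half-open [start, end) windows aligned to multiples of the step are assumptions.

```python
from typing import List, Tuple


def hopping_windows(timestamp_ms: int,
                    duration_ms: int,
                    step_ms: int) -> List[Tuple[int, int]]:
    """Return every (start, end) window that contains `timestamp_ms`."""
    windows: List[Tuple[int, int]] = []
    # Latest step-aligned start that is not after the timestamp.
    start = timestamp_ms - (timestamp_ms % step_ms)
    # Walk backwards while the window would still cover the timestamp.
    while start > timestamp_ms - duration_ms and start >= 0:
        windows.append((start, start + duration_ms))
        start -= step_ms
    return list(reversed(windows))


overlapping = hopping_windows(125_000, duration_ms=60_000, step_ms=30_000)
early = hopping_windows(10_000, duration_ms=60_000, step_ms=30_000)
```

When step_ms equals duration_ms the result always holds a single window, which is the tumbling case.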

StreamingDataFrame.sliding_window

def sliding_window(duration_ms: Union[int, timedelta],
                   grace_ms: Union[int, timedelta] = 0,
                   name: Optional[str] = None) -> SlidingWindowDefinition

[VIEW SOURCE]

Create a sliding window transformation on this StreamingDataFrame.

Sliding windows continuously evaluate the stream with a fixed step of 1 ms allowing for overlapping, but not redundant windows of a fixed size.

Sliding windows are similar to hopping windows with step_ms set to 1, but are significantly more performant.

They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

Notes:

  • The timestamp of the aggregation result is set to the window start timestamp.
  • Every window is grouped by the current Kafka message key.
  • Messages with None key will be ignored.
  • The time windows always use the current event time.
  • Windows are inclusive on both the start and end time.
  • Every window contains a distinct aggregation.

Example Snippet:

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a sliding window of 60s with a grace period of 10s
    sdf.sliding_window(
        duration_ms=timedelta(seconds=60),
        grace_ms=timedelta(seconds=10)
    )

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "current()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .current()
)

Arguments:

  • duration_ms: The length of each window. Can be specified as either an int representing milliseconds or a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • grace_ms: The grace period for data arrival. It allows late-arriving data (data arriving after the window has theoretically closed) to be included in the window. Can be specified as either an int representing milliseconds or as a timedelta object.

    NOTE: timedelta objects will be rounded to the closest millisecond value.

  • name: The unique identifier for the window. If not provided, it will be automatically generated based on the window's properties.

Returns:

SlidingWindowDefinition instance representing the sliding window configuration. This object can be further configured with aggregation functions like sum, count, etc. and applied to the StreamingDataFrame.

StreamingDataFrame.drop

def drop(columns: Union[str, List[str]],
         errors: Literal["ignore", "raise"] = "raise") -> Self

[VIEW SOURCE]

Drop column(s) from the message value (value must support del, like a dict).

This operation occurs in-place, meaning reassignment is entirely OPTIONAL: the original StreamingDataFrame is returned for chaining (sdf.update().print()).

Example Snippet:

# Remove columns "x" and "y" from the value.
# This would transform {"x": 1, "y": 2, "z": 3} to {"z": 3}

sdf = StreamingDataFrame()
sdf.drop(["x", "y"])

Arguments:

  • columns: a single column name or a list of names, where names are str
  • errors: If "ignore", suppress error and only existing labels are dropped. Default - "raise".

Returns:

a new StreamingDataFrame instance

StreamingDataFrame.sink

def sink(sink: BaseSink)

[VIEW SOURCE]

Sink the processed data to the specified destination.

Internally, each processed record is added to a sink, and the sinks are flushed on each checkpoint. The offset will be committed only if all the sinks for all topic partitions are flushed successfully.

Additionally, Sinks may signal the backpressure to the application (e.g., when the destination is rate-limited). When this happens, the application will pause the corresponding topic partition and resume again after the timeout. The backpressure handling and timeouts are defined by the specific sinks.

Note: sink() is a terminal operation - it cannot receive any additional operations, but branches can still be generated from its originating SDF.

quixstreams.dataframe.series

StreamingSeries

class StreamingSeries(BaseStreaming)

[VIEW SOURCE]

StreamingSeries are typically generated by StreamingDataframes when getting elements from, or performing certain operations on, a StreamingDataframe, thus acting as a representation of "column" value.

They share some operations with the StreamingDataframe, but also provide some additional functionality.

Most column value operations are handled by this class, and StreamingSeries can generate other StreamingSeries as a result of said operations.

What it Does:

  • Allows ways to do simple operations with dataframe "column"/dictionary values:
    • Basic ops like add, subtract, modulo, etc.
  • Enables comparisons/inequalities:
    • Greater than, equals, etc.
    • and/or, is/not operations
  • Can check for existence of columns in StreamingDataFrames
  • Enables chaining of various operations together
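The capabilities listed above can be illustrated with a toy model. This is only a sketch and not the library's implementation: `LazyColumn`, `from_key`, and `test` are hypothetical names, used here to show how operator overloading can build a deferred computation over a dictionary "column".

```python
import operator
from typing import Any, Callable


class LazyColumn:
    """Hypothetical stand-in for a series: wraps a function of the record."""

    def __init__(self, func: Callable[[dict], Any]):
        self._func = func

    @classmethod
    def from_key(cls, key: str) -> "LazyColumn":
        # Reading a "column" is just a deferred dict lookup.
        return cls(lambda record: record[key])

    def _binary(self, other: Any, op: Callable[[Any, Any], Any]) -> "LazyColumn":
        # `other` may be another LazyColumn or a plain value.
        if isinstance(other, LazyColumn):
            return LazyColumn(lambda r: op(self._func(r), other._func(r)))
        return LazyColumn(lambda r: op(self._func(r), other))

    def __add__(self, other: Any) -> "LazyColumn":
        return self._binary(other, operator.add)

    def __ge__(self, other: Any) -> "LazyColumn":
        return self._binary(other, operator.ge)

    def __and__(self, other: Any) -> "LazyColumn":
        # Logical "and" of the two runtime values.
        return self._binary(other, lambda a, b: a and b)

    def test(self, record: dict) -> Any:
        # Evaluate the accumulated chain against a single record.
        return self._func(record)


total = LazyColumn.from_key("c") + LazyColumn.from_key("d") + 2
check = (total >= 10) & LazyColumn.from_key("flag")

record = {"c": 3, "d": 6, "flag": True}
print(total.test(record))  # 11
print(check.test(record))  # True
```

Nothing is computed until a record is passed in, which is why such objects can be chained freely before any data arrives.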

How to Use:

For the most part, you may not even notice this class exists! They will naturally be created as a result of typical StreamingDataFrame use.

Auto-complete should help you with valid methods and type-checking should alert you to invalid operations between StreamingSeries.

In general, any typical Pandas dataframe operation between columns should be valid with StreamingSeries, and you shouldn't have to think about them explicitly.

Example Snippet:

# Random methods for example purposes. More detailed explanations found under
# various methods or in the docs folder.

sdf = StreamingDataFrame()
sdf = sdf["column_a"].apply(a_func).apply(diff_func, stateful=True)
sdf["my_new_bool_field"] = sdf["column_b"].contains("this_string")
sdf["new_sum_field"] = sdf["column_c"] + sdf["column_d"] + 2
sdf = sdf[sdf["column_a"] & (sdf["new_sum_field"] >= 10)]

StreamingSeries.from_apply_callback

@classmethod
def from_apply_callback(cls, func: ApplyWithMetadataCallback,
                        sdf_id: int) -> Self

[VIEW SOURCE]

Create a StreamingSeries from a function.

The provided function will be wrapped into Apply

Arguments:

  • func: a function to apply
  • sdf_id: the id of the calling SDF.

Returns:

instance of StreamingSeries

StreamingSeries.apply

def apply(func: ApplyCallback) -> Self

[VIEW SOURCE]

Add a callable to the execution list for this series.

The provided callable should accept a single argument, which will be its input. It should similarly return one output, or None.

They can be chained together or included with other operations.

Example Snippet:

# The `StreamingSeries` are generated when `sdf["COLUMN_NAME"]` is called.
# This stores a string in state and capitalizes the column value; the result is
# assigned to a new column.
#  Another apply converts a str column to an int, assigning it to a new column.

def func(value: str, state: State):
    if value != state.get("my_store_key"):
        state.set("my_store_key", value)
    return value.upper()

sdf = StreamingDataFrame()
sdf["new_col"] = sdf["a_column"]["nested_dict_key"].apply(func, stateful=True)
sdf["new_col_2"] = sdf["str_col"].apply(lambda v: int(v)) + sdf["str_col2"] + 2

Arguments:

  • func: a callable with one argument and one output

Returns:

a new StreamingSeries with the new callable added

StreamingSeries.compose_returning

def compose_returning() -> ReturningExecutor

[VIEW SOURCE]

Compose a list of functions from this StreamingSeries and its parents into one big closure that always returns the transformed record.

This closure is to be used to execute the functions in the stream and to get the result of the transformations.

Stream may only contain simple "apply" functions to be able to compose itself into a returning function.

Returns:

a callable accepting value, key and timestamp and returning a tuple "(value, key, timestamp)"

StreamingSeries.compose

def compose(
    sink: Optional[Callable[[Any, Any, int, Any],
                            None]] = None) -> VoidExecutor

[VIEW SOURCE]

Compose all functions of this StreamingSeries into one big closure.

Generally not required by users; the quixstreams.app.Application class will do this automatically.

Example Snippet:

from quixstreams import Application

app = Application(...)

sdf = app.dataframe()
sdf = sdf["column_a"].apply(apply_func)
sdf = sdf["column_b"].contains(filter_func)
sdf = sdf.compose()

result_0 = sdf({"my": "record"})
result_1 = sdf({"other": "record"})

Arguments:

  • sink: callable to accumulate the results of the execution.

Raises:

  • ValueError: if disallowed functions are present in the tree of underlying Stream.

Returns:

a callable accepting value, key and timestamp and returning None

StreamingSeries.test

def test(value: Any,
         key: Any,
         timestamp: int,
         headers: Optional[Any] = None,
         ctx: Optional[MessageContext] = None) -> Any

[VIEW SOURCE]

A shorthand to test StreamingSeries with provided value and MessageContext.

Arguments:

  • value: value to pass through StreamingSeries
  • ctx: instance of MessageContext, optional. Provide it if the StreamingSeries instance has functions calling get_current_key(). Default - None.

Returns:

result of StreamingSeries

StreamingSeries.isin

def isin(other: Container) -> Self

[VIEW SOURCE]

Check if series value is in "other".

Same as "StreamingSeries in other".

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "str_column" is contained in a column with a list of strings and
# assign the resulting `bool` to a new column: "has_my_str".

sdf = app.dataframe()
sdf["has_my_str"] = sdf["str_column"].isin(sdf["column_with_list_of_strs"])

Arguments:

  • other: a container to check

Returns:

new StreamingSeries

StreamingSeries.contains

def contains(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value contains "other"

Same as "other in StreamingSeries".

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" contains "my_substring" and assign the resulting
# `bool` to a new column: "has_my_substr"

sdf = app.dataframe()
sdf["has_my_substr"] = sdf["column_a"].contains("my_substring")

Arguments:

  • other: object to check

Returns:

new StreamingSeries

StreamingSeries.is_

def is_(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value refers to the same object as other

Runtime result will be a bool.

Example Snippet:

# Check if "column_a" is the same as "column_b" and assign the resulting `bool`
#  to a new column: "is_same"

from quixstreams import Application
sdf = app.dataframe()
sdf["is_same"] = sdf["column_a"].is_(sdf["column_b"])

Arguments:

  • other: object to check for "is"

Returns:

new StreamingSeries

StreamingSeries.isnot

def isnot(other: Union[Self, object]) -> Self

[VIEW SOURCE]

Check if series value does not refer to the same object as other

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is not the same as "column_b" and assign the resulting `bool`
# to a new column: "is_not_same"

sdf = app.dataframe()
sdf["is_not_same"] = sdf["column_a"].isnot(sdf["column_b"])

Arguments:

  • other: object to check for "is_not"

Returns:

new StreamingSeries

StreamingSeries.isnull

def isnull() -> Self

[VIEW SOURCE]

Check if series value is None.

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is null and assign the resulting `bool` to a new column:
# "is_null"

sdf = app.dataframe()
sdf["is_null"] = sdf["column_a"].isnull()

Returns:

new StreamingSeries

StreamingSeries.notnull

def notnull() -> Self

[VIEW SOURCE]

Check if series value is not None.

Runtime result will be a bool.

Example Snippet:

from quixstreams import Application

# Check if "column_a" is not null and assign the resulting `bool` to a new column:
# "is_not_null"

sdf = app.dataframe()
sdf["is_not_null"] = sdf["column_a"].notnull()

Returns:

new StreamingSeries

StreamingSeries.abs

def abs() -> Self

[VIEW SOURCE]

Get absolute value of the series value.

Example Snippet:

from quixstreams import Application

# Get absolute value of "int_col" and add it to "other_int_col".
# Finally, assign the result to a new column: "abs_col_sum".

sdf = app.dataframe()
sdf["abs_col_sum"] = sdf["int_col"].abs() + sdf["other_int_col"]

Returns:

new StreamingSeries

quixstreams.dataframe

quixstreams.dataframe.utils

ensure_milliseconds

def ensure_milliseconds(delta: Union[int, timedelta]) -> int

[VIEW SOURCE]

Convert timedelta to milliseconds.

This function will also round the value to the closest millisecond in case of higher precision.

Arguments:

  • delta: timedelta object

Returns:

timedelta value in milliseconds as int
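The conversion can be sketched as follows. This is an illustration only, not the library's code: `to_milliseconds` is a hypothetical name, the handling of plain ints (returned unchanged) is an assumption based on the signature, and rounding ties upward is a choice made for this sketch.

```python
from datetime import timedelta
from typing import Union


def to_milliseconds(delta: Union[int, timedelta]) -> int:
    """Hypothetical helper: return `delta` as whole milliseconds."""
    if isinstance(delta, int):
        # Assumption: plain ints are already expressed in milliseconds.
        return delta
    # Work in integer microseconds to avoid floating-point error.
    total_us = delta // timedelta(microseconds=1)
    # Round to the closest millisecond (ties round up in this sketch).
    return (total_us + 500) // 1000


print(to_milliseconds(250))                                     # 250
print(to_milliseconds(timedelta(seconds=1, microseconds=600)))  # 1001
```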

quixstreams.dataframe.exceptions

quixstreams.dataframe.windows.sliding

SlidingWindow

class SlidingWindow(FixedTimeWindow)

[VIEW SOURCE]

SlidingWindow.process_window

def process_window(
    value: Any, timestamp_ms: int, state: WindowedState
) -> tuple[Iterable[WindowResult], Iterable[WindowResult]]

[VIEW SOURCE]

The algorithm is based on the concept that each message is associated with a left and a right window.

Left Window:

  • Begins at: message timestamp - window size
  • Ends at: message timestamp

Right Window:

  • Begins at: message timestamp + 1 ms
  • Ends at: message timestamp + 1 ms + window size

For example, for a window size of 10 and a message A arriving at timestamp 26:

    0        10        20        30        40        50        60
----|---------|---------|---------|---------|---------|---------|--->
                              A
     left window -> |---------||---------| <- right window
                    16      26  27      37

The algorithm scans backward through the window store:

  • Starting at: start_time = message timestamp + 1 ms (the right window's start time)
  • Ending at: start_time = message timestamp - 2 * window size

During this traversal, the algorithm performs the following actions:

  1. Determine if the right window should be created. If yes, locate the existing aggregation to copy to the new window.
  2. Determine if the right window of the previous record should be created. If yes, locate the existing aggregation and combine it with the incoming message.
  3. Locate and update the left window if it exists.
  4. If the left window does not exist, create it. Locate the existing aggregation and combine it with the incoming message.
  5. Locate and update all existing windows to which the new message belongs.
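The window arithmetic described above can be checked with a small sketch. It only computes the bounds and the scan range; it does not implement the five steps of the algorithm. The names `WindowBounds`, `left_and_right_windows`, and `scan_bounds` are hypothetical and not part of the library.

```python
from typing import NamedTuple, Tuple


class WindowBounds(NamedTuple):
    start: int
    end: int


def left_and_right_windows(
    timestamp_ms: int, duration_ms: int
) -> Tuple[WindowBounds, WindowBounds]:
    """Return the (left, right) window bounds associated with one message."""
    left = WindowBounds(timestamp_ms - duration_ms, timestamp_ms)
    right = WindowBounds(timestamp_ms + 1, timestamp_ms + 1 + duration_ms)
    return left, right


def scan_bounds(timestamp_ms: int, duration_ms: int) -> Tuple[int, int]:
    """Return the (first, last) window start times visited by the backward scan."""
    return timestamp_ms + 1, timestamp_ms - 2 * duration_ms


left, right = left_and_right_windows(timestamp_ms=26, duration_ms=10)
print(left)                 # WindowBounds(start=16, end=26)
print(right)                # WindowBounds(start=27, end=37)
print(scan_bounds(26, 10))  # (27, 6)
```

For message A at timestamp 26 this reproduces the values in the diagram: a left window of 16 to 26 and a right window of 27 to 37.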

quixstreams.dataframe.windows.definitions

FixedTimeWindowDefinition

class FixedTimeWindowDefinition(abc.ABC)

[VIEW SOURCE]

FixedTimeWindowDefinition.sum

def sum() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by summing up values within each window period.

Returns:

an instance of FixedTimeWindow configured to perform sum aggregation.

FixedTimeWindowDefinition.count

def count() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by counting the number of values within each window period.

Returns:

an instance of FixedTimeWindow configured to perform record count.

FixedTimeWindowDefinition.mean

def mean() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to aggregate data by calculating the mean of the values within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the mean of the values.

FixedTimeWindowDefinition.reduce

def reduce(reducer: Callable[[Any, Any], Any],
           initializer: Callable[[Any], Any]) -> "FixedTimeWindow"

[VIEW SOURCE]

Configure the window to perform a custom aggregation using reducer and initializer functions.

Example Snippet:

sdf = StreamingDataFrame(...)

# Using "reduce()" to calculate multiple aggregates at once
def reducer(agg: dict, current: int):
    aggregated = {
        'min': min(agg['min'], current),
        'max': max(agg['max'], current),
        'count': agg['count'] + 1
    }
    return aggregated

def initializer(current) -> dict:
    return {'min': current, 'max': current, 'count': 1}

window = (
    sdf.tumbling_window(duration_ms=1000)
    .reduce(reducer=reducer, initializer=initializer)
    .final()
)

Arguments:

  • reducer: A function that takes two arguments (the accumulated value and a new value) and returns a single value. The returned value will be saved to the state store and sent downstream.
  • initializer: A function to call for every first element of the window. This function is used to initialize the aggregation within a window.

Returns:

A window configured to perform custom reduce aggregation on the data.
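To make the roles of the two callbacks concrete, the sketch below folds the values of a single window in plain Python, without any Kafka or state store involved. `fold_window` is a hypothetical helper written for this illustration; it assumes the initializer handles the first value of the window and the reducer handles every later one, as the argument descriptions above state.

```python
from typing import Any, Callable, Iterable


def reducer(agg: dict, current: int) -> dict:
    # Combine the accumulated aggregate with the new value.
    return {
        "min": min(agg["min"], current),
        "max": max(agg["max"], current),
        "count": agg["count"] + 1,
    }


def initializer(current: int) -> dict:
    # Build the aggregate from the first value of the window.
    return {"min": current, "max": current, "count": 1}


def fold_window(
    values: Iterable[Any],
    reducer: Callable[[Any, Any], Any],
    initializer: Callable[[Any], Any],
) -> Any:
    """Hypothetical helper: aggregate one window's values in arrival order."""
    aggregated = None
    first = True
    for value in values:
        if first:
            aggregated = initializer(value)
            first = False
        else:
            aggregated = reducer(aggregated, value)
    return aggregated


print(fold_window([5, 2, 9], reducer, initializer))
# {'min': 2, 'max': 9, 'count': 3}
```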

FixedTimeWindowDefinition.max

def max() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure a window to aggregate the maximum value within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the maximum value within each window period.

FixedTimeWindowDefinition.min

def min() -> "FixedTimeWindow"

[VIEW SOURCE]

Configure a window to aggregate the minimum value within each window period.

Returns:

an instance of FixedTimeWindow configured to calculate the minimum value within each window period.

quixstreams.dataframe.windows

quixstreams.dataframe.windows.time_based

FixedTimeWindow

class FixedTimeWindow()

[VIEW SOURCE]

FixedTimeWindow.final

def final() -> "StreamingDataFrame"

[VIEW SOURCE]

Apply the window aggregation and return results only when the windows are closed.

The format of returned windows:

{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}

The individual window is closed when the event time (the maximum observed timestamp across the partition) passes its end timestamp + grace period. The closed windows cannot receive updates anymore and are considered final.

NOTE: Windows can be closed only within the same message key. If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until a message with the same key is received.

FixedTimeWindow.current

def current() -> "StreamingDataFrame"

[VIEW SOURCE]

Apply the window transformation to the StreamingDataFrame to return results for each updated window.

The format of returned windows:

{
    "start": <window start time in milliseconds>,
    "end": <window end time in milliseconds>,
    "value": <aggregated window value>,
}

This method processes streaming data and returns results as they come, regardless of whether the window is closed or not.

quixstreams.dataframe.windows.base

get_window_ranges

def get_window_ranges(timestamp_ms: int,
                      duration_ms: int,
                      step_ms: Optional[int] = None) -> Deque[Tuple[int, int]]

[VIEW SOURCE]

Get a list of window ranges for the given timestamp.

Arguments:

  • timestamp_ms: timestamp in milliseconds
  • duration_ms: window duration in milliseconds
  • step_ms: window step in milliseconds for hopping windows, optional.

Returns:

a list of (start, end) tuples
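One way such ranges could be computed is sketched below. This is not presented as the library's implementation: `window_ranges_sketch` is a hypothetical name, and the sketch assumes that windows are aligned to multiples of the step, that a timestamp belongs to a window when start <= timestamp < end, and that windows never start before 0.

```python
from collections import deque
from typing import Deque, Optional, Tuple


def window_ranges_sketch(
    timestamp_ms: int, duration_ms: int, step_ms: Optional[int] = None
) -> Deque[Tuple[int, int]]:
    """Return every (start, end) window that contains `timestamp_ms`."""
    if not step_ms:
        # Without a step, windows do not overlap (tumbling windows).
        step_ms = duration_ms

    ranges: Deque[Tuple[int, int]] = deque()
    # Latest window start that is aligned to the step and not after the timestamp.
    start = timestamp_ms - (timestamp_ms % step_ms)
    # Walk backward while the window still covers the timestamp.
    while start > timestamp_ms - duration_ms and start >= 0:
        ranges.appendleft((start, start + duration_ms))
        start -= step_ms
    return ranges


print(window_ranges_sketch(26, duration_ms=10))             # deque([(20, 30)])
print(window_ranges_sketch(26, duration_ms=10, step_ms=5))  # deque([(20, 30), (25, 35)])
```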

quixstreams.dataframe.base

quixstreams.rowproducer

RowProducer

class RowProducer()

[VIEW SOURCE]

A producer class that is capable of serializing Rows to bytes and sending them to Kafka.

The serialization is performed according to the Topic serialization settings.

Arguments:

  • broker_address: Connection settings for Kafka. Accepts string with Kafka broker host and port formatted as <host>:<port>, or a ConnectionConfig object if authentication is required.
  • extra_config: A dictionary with additional options that will be passed to confluent_kafka.Producer as is. Note: values passed as arguments override values in extra_config.
  • on_error: a callback triggered when RowProducer.produce_row() or RowProducer.poll() fail. If the producer fails and the callback returns True, the exception will be logged but not propagated. The default callback logs an exception and returns False.
  • flush_timeout: The time the producer is waiting for all messages to be delivered.
  • transactional: whether to use Kafka transactions or not. Note this changes which underlying Producer class is used.

RowProducer.produce_row

def produce_row(row: Row,
                topic: Topic,
                key: Optional[Any] = _KEY_UNSET,
                partition: Optional[int] = None,
                timestamp: Optional[int] = None)

[VIEW SOURCE]

Serialize Row to bytes according to the Topic serialization settings and produce it to Kafka.

If this method fails, it will trigger the provided "on_error" callback.

Arguments:

  • row: Row object
  • topic: Topic object
  • key: message key, optional
  • partition: partition number, optional
  • timestamp: timestamp in milliseconds, optional

RowProducer.poll

def poll(timeout: float = 0)

[VIEW SOURCE]

Polls the producer for events and calls on_delivery callbacks.

If poll() fails, it will trigger the provided "on_error" callback

Arguments:

  • timeout: timeout in seconds

RowProducer.abort_transaction

def abort_transaction(timeout: Optional[float] = None)

[VIEW SOURCE]

Attempt an abort if a transaction is active.

Otherwise, skip the abort, since it throws an exception if at least one transaction was successfully completed at some point.

This avoids polluting the stack trace in the case where a transaction was not active as expected (because of some other exception already raised) and a cleanup abort is attempted.

NOTE: under normal circumstances a transaction will be open due to how the Checkpoint inits another immediately after committing.

quixstreams.core.stream

quixstreams.core.stream.stream

Stream

class Stream()

[VIEW SOURCE]

Stream.__init__

def __init__(func: Optional[StreamFunction] = None,
             parent: Optional[Self] = None)

[VIEW SOURCE]

A base class for all streaming operations.

Stream is an abstraction of a function pipeline. Each Stream has a function and a parent (None by default). When adding new function to the stream, it creates a new Stream object and sets "parent" to the previous Stream to maintain an order of execution.

Stream supports four types of functions:

  • "Apply" - generate new values based on a previous one. The result of an Apply function is passed downstream to the next functions. If "expand=True" is passed and the function returns an Iterable, each item of it will be treated as a separate value downstream.
  • "Update" - update values in-place. The result of an Update function is always ignored, and its input is passed downstream.
  • "Filter" - to filter values from the Stream. The result of a Filter function is interpreted as boolean. If it's True, the input will be passed downstream. If it's False, the record will be filtered from the stream.
  • "Transform" - to transform keys and timestamps along with the values. "Transform" functions may change the keys and should be used with caution. The result of the Transform function is passed downstream to the next functions. If "expand=True" is passed and the function returns an Iterable, each item of it will be treated as a separate value downstream.

To execute the functions on the Stream, call .compose() method, and it will return a closure to execute all the functions accumulated in the Stream and its parents.

Arguments:

  • func: a function to be called on the stream. It is expected to be wrapped into one of "Apply", "Filter", "Update" or "Transform" from quixstreams.core.stream.functions package. Default - "ApplyFunction(lambda value: value)".
  • parent: a parent Stream
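The semantics of the function types can be sketched with a small, self-contained composer. It is a simplified illustration, not the library's code: `compose_steps` and the `(kind, func)` step tuples are hypothetical, keys and timestamps (and therefore "Transform") are left out, and filtered values are simply dropped rather than signalled with an exception.

```python
from typing import Any, Callable, List, Tuple

Step = Tuple[str, Callable[[Any], Any]]


def compose_steps(
    steps: List[Step], sink: Callable[[Any], None]
) -> Callable[[Any], None]:
    """Hypothetical composer: fold (kind, func) steps into one closure."""

    def run(value: Any, index: int) -> None:
        if index == len(steps):
            sink(value)  # end of the pipeline: hand the result to the sink
            return
        kind, func = steps[index]
        if kind == "apply":
            run(func(value), index + 1)  # the result replaces the value
        elif kind == "expand":
            for item in func(value):  # each item continues separately
                run(item, index + 1)
        elif kind == "update":
            func(value)  # the return value is ignored
            run(value, index + 1)
        elif kind == "filter":
            if func(value):  # falsy results drop the value
                run(value, index + 1)
        else:
            raise ValueError(f"Unknown step kind: {kind}")

    return lambda value: run(value, 0)


steps: List[Step] = [
    ("apply", lambda v: {**v, "total": v["a"] + v["b"]}),
    ("update", lambda v: v.update(seen=True)),
    ("filter", lambda v: v["total"] >= 10),
    ("expand", lambda v: [v["total"], v["total"] * 2]),
]

results: list = []
execute = compose_steps(steps, sink=results.append)
execute({"a": 4, "b": 7})  # passes the filter and expands into two values
execute({"a": 1, "b": 2})  # dropped by the filter
print(results)  # [11, 22]
```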

Stream.add_filter

def add_filter(func: Union[FilterCallback, FilterWithMetadataCallback],
               *,
               metadata: bool = False) -> Self

[VIEW SOURCE]

Add a function to filter values from the Stream.

The return value of the function will be interpreted as bool. If the function returns a False-like result, the Stream will raise a Filtered exception during execution.

Arguments:

  • func: a function to filter values from the stream
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_apply

def add_apply(func: Union[
    ApplyCallback,
    ApplyExpandedCallback,
    ApplyWithMetadataCallback,
    ApplyWithMetadataExpandedCallback,
],
              *,
              expand: bool = False,
              metadata: bool = False) -> Self

[VIEW SOURCE]

Add an "apply" function to the Stream.

The function is supposed to return a new value, which will be passed further during execution.

Arguments:

  • func: a function to generate a new value
  • expand: if True, expand the returned iterable into individual values downstream. If returned value is not iterable, TypeError will be raised. Default - False.
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_update

def add_update(func: Union[UpdateCallback, UpdateWithMetadataCallback],
               *,
               metadata: bool = False) -> Self

[VIEW SOURCE]

Add an "update" function to the Stream, that will mutate the input value.

The return of this function will be ignored and its input will be passed downstream.

Arguments:

  • func: a function to mutate the value
  • metadata: if True, the callback will receive key and timestamp along with the value. Default - False.

Returns:

a new Stream derived from the current one

Stream.add_transform

def add_transform(func: Union[TransformCallback, TransformExpandedCallback],
                  *,
                  expand: bool = False) -> Self

[VIEW SOURCE]

Add a "transform" function to the Stream, that will mutate the input value.

The callback must accept a value, a key, and a timestamp. It's expected to return a new value, new key and new timestamp.

The result of the callback will be passed downstream during execution.

Arguments:

  • func: a function to mutate the value
  • expand: if True, expand the returned iterable into individual items downstream. If returned value is not iterable, TypeError will be raised. Default - False.

Returns:

a new Stream derived from the current one

Stream.diff

def diff(other: "Stream") -> Self

[VIEW SOURCE]

Takes the difference between Streams self and other based on their last common parent, and returns a new, independent Stream that includes only this difference (the start of the "diff" will have no parent).

It's impossible to calculate a diff when:

  • Streams don't have a common parent.
  • The self Stream already includes all the nodes from the other Stream, and the resulting diff is empty.

Arguments:

  • other: a Stream to take a diff from.

Raises:

  • ValueError: if Streams don't have a common parent, if the diff is empty, or pruning failed.

Returns:

a new independent Stream instance whose root begins at the diff
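The idea of a diff from the last common parent can be sketched with a minimal linked structure. This is an illustration under assumptions, not the library's code: `Node` is a hypothetical stand-in for Stream, and `diff` returns the names of the differing nodes instead of building a new, independent Stream.

```python
from typing import List, Optional


class Node:
    """Hypothetical stand-in for a Stream: a name plus an optional parent."""

    def __init__(self, name: str, parent: Optional["Node"] = None):
        self.name = name
        self.parent = parent

    def root_path(self) -> List["Node"]:
        # Walk up to the root, then reverse so the current node comes last.
        path = []
        node: Optional["Node"] = self
        while node is not None:
            path.append(node)
            node = node.parent
        return path[::-1]

    def diff(self, other: "Node") -> List[str]:
        own_ids = {id(node) for node in self.root_path()}
        theirs = other.root_path()
        # Nodes of `other` that are also ancestors of (or equal to) `self`.
        common = [node for node in theirs if id(node) in own_ids]
        if not common:
            raise ValueError("Streams don't have a common parent")
        # Everything in `other` after the last common parent is the diff.
        tail = theirs[theirs.index(common[-1]) + 1:]
        if not tail:
            raise ValueError("The diff is empty")
        return [node.name for node in tail]


root = Node("root")
a = Node("a", parent=root)
b = Node("b", parent=a)  # branch 1: root -> a -> b
c = Node("c", parent=a)
d = Node("d", parent=c)  # branch 2: root -> a -> c -> d

print(b.diff(d))  # ['c', 'd']
```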

Stream.root_path

def root_path(allow_splits=True) -> List[Self]

[VIEW SOURCE]

Return a list of all parent Streams including the node itself.

Can optionally stop at a first encountered split with allow_splits=False

The tree is ordered from parent to child (current node comes last).

Returns:

a list of Stream objects

Stream.full_tree

def full_tree() -> List[Self]

[VIEW SOURCE]

Starts at tree root and finds every Stream in the tree (including splits).

Returns:

The collection of all Streams interconnected to this one

Stream.compose

def compose(
    allow_filters=True,
    allow_expands=True,
    allow_updates=True,
    allow_transforms=True,
    sink: Optional[Callable[[Any, Any, int, Any],
                            None]] = None) -> VoidExecutor

[VIEW SOURCE]

Generate an "executor" closure by mapping all relatives of this Stream and composing their functions together.

The resulting "executor" can be called with a given value, key, timestamp, and headers (i.e. a Kafka message).

By default, executor doesn't return the result of the execution. To accumulate the results, pass the sink parameter.

Arguments:

  • allow_filters: If False, this function will fail with ValueError if the stream has filter functions in the tree. Default - True.
  • allow_updates: If False, this function will fail with ValueError if the stream has update functions in the tree. Default - True.
  • allow_expands: If False, this function will fail with ValueError if the stream has functions with "expand=True" in the tree. Default - True.
  • allow_transforms: If False, this function will fail with ValueError if the stream has transform functions in the tree. Default - True.
  • sink: callable to accumulate the results of the execution, optional.

Stream.compose_returning

def compose_returning() -> ReturningExecutor

[VIEW SOURCE]

Compose a list of functions from this Stream and its parents into one big closure that always returns the transformed record.

This closure is to be used to execute the functions in the stream and to get the result of the transformations.

Stream may only contain simple "apply" functions to be able to compose itself into a returning function.

quixstreams.core.stream.functions.update

UpdateFunction

class UpdateFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Update" function.

The provided function must accept a value, and it's expected to mutate it or to perform some side effect.

The result of the callback is always ignored, and the original input is passed downstream.

UpdateWithMetadataFunction

class UpdateWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Update" function.

The provided function must accept a value, a key, and a timestamp. The callback is expected to mutate the value or to perform some side effect with it.

The result of the callback is always ignored, and the original input is passed downstream.

quixstreams.core.stream.functions

quixstreams.core.stream.functions.types

quixstreams.core.stream.functions.utils

pickle_copier

def pickle_copier(obj: T) -> Callable[[], T]

[VIEW SOURCE]

A utility function to copy objects using a "pickle" library.

On average, it's faster than "copy.deepcopy". It accepts an object and returns a callable creating copies of this object.

Arguments:

  • obj: an object to copy
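A minimal sketch of this technique, assuming the object is serialized once up front and deserialized on every call, looks like the following. `make_pickle_copier` is a hypothetical name used for the illustration.

```python
import pickle
from typing import Callable, TypeVar

T = TypeVar("T")


def make_pickle_copier(obj: T) -> Callable[[], T]:
    """Return a callable that produces a fresh deep copy of `obj` on each call."""
    # Serialize once; every copy only pays the cost of deserialization.
    serialized = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)

    def copy() -> T:
        return pickle.loads(serialized)

    return copy


copier = make_pickle_copier({"a": [1, 2]})
first = copier()
first["a"].append(3)  # mutating one copy does not affect later copies
print(copier())       # {'a': [1, 2]}
```

The object must be picklable for this approach to work, which is the trade-off compared with copy.deepcopy.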

quixstreams.core.stream.functions.transform

TransformFunction

class TransformFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into a "Transform" function.

The provided callback must accept a value, a key and a timestamp. It's expected to return a new value, new key and new timestamp.

This function must be used with caution, because it can technically change the key. It's supposed to be used by the library internals and not be a part of the public API.

The result of the callback will always be passed downstream.

quixstreams.core.stream.functions.filter

FilterFunction

class FilterFunction(StreamFunction)

[VIEW SOURCE]

Wraps a function into a "Filter" function. The result of a Filter function is interpreted as boolean. If it's True, the input will be returned downstream. If it's False, the Filtered exception will be raised to signal that the value is filtered out.

FilterWithMetadataFunction

class FilterWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wraps a function into a "Filter" function.

The passed callback must accept value, key, and timestamp, and it's expected to return a boolean-like result.

If the result is True, the input will be passed downstream. Otherwise, the value will be filtered out.

quixstreams.core.stream.functions.base

StreamFunction

class StreamFunction(abc.ABC)

[VIEW SOURCE]

A base class for all the streaming operations in Quix Streams.

It provides a get_executor method to return a closure to be called with the input values.

StreamFunction.get_executor

@abc.abstractmethod
def get_executor(*child_executors: VoidExecutor) -> VoidExecutor

[VIEW SOURCE]

Returns a wrapper to be called on a value, key, timestamp and headers.

quixstreams.core.stream.functions.apply

ApplyFunction

class ApplyFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Apply" function.

The provided callback is expected to return a new value based on input, and its result will always be passed downstream.

ApplyWithMetadataFunction

class ApplyWithMetadataFunction(StreamFunction)

[VIEW SOURCE]

Wrap a function into an "Apply" function.

The provided function is expected to accept value, key, and timestamp and return a new value based on input, and its result will always be passed downstream.

quixstreams.core

quixstreams.processing

quixstreams.processing.context

ProcessingContext

@dataclasses.dataclass
class ProcessingContext()

[VIEW SOURCE]

A class to share processing-related objects between Application and StreamingDataFrame instances.

ProcessingContext.store_offset

def store_offset(topic: str, partition: int, offset: int)

[VIEW SOURCE]

Store the offset of the processed message to the checkpoint.

Arguments:

  • topic: topic name
  • partition: partition number
  • offset: message offset

ProcessingContext.init_checkpoint

def init_checkpoint()

[VIEW SOURCE]

Initialize a new checkpoint

ProcessingContext.commit_checkpoint

def commit_checkpoint(force: bool = False)

[VIEW SOURCE]

Attempts to finalize the current Checkpoint only if the Checkpoint is "expired" or force=True is passed; otherwise, does nothing.

To finalize: the Checkpoint will be committed if it has any stored offsets, else just close it. A new Checkpoint is then created.

Arguments:

  • force: if True, commit the Checkpoint before its expiration deadline.
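
The finalize-or-skip decision can be sketched as follows. `CheckpointSketch` and `ContextSketch` are hypothetical classes written for this example, the expiry is modelled with a simple monotonic deadline (an assumption), and the boolean return value exists only to make the outcome visible.

```python
import time
from typing import Dict, Tuple


class CheckpointSketch:
    """Hypothetical checkpoint that expires after `commit_interval` seconds."""

    def __init__(self, commit_interval: float):
        self._expires_at = time.monotonic() + commit_interval
        self.offsets: Dict[Tuple[str, int], int] = {}
        self.committed = False

    def expired(self) -> bool:
        return time.monotonic() >= self._expires_at

    def store_offset(self, topic: str, partition: int, offset: int) -> None:
        self.offsets[(topic, partition)] = offset

    def commit(self) -> None:
        self.committed = True


class ContextSketch:
    def __init__(self, commit_interval: float):
        self.commit_interval = commit_interval
        self.checkpoint = CheckpointSketch(commit_interval)

    def commit_checkpoint(self, force: bool = False) -> bool:
        """Return True if the current checkpoint was finalized (demo only)."""
        if not (force or self.checkpoint.expired()):
            return False  # not expired and not forced: do nothing
        if self.checkpoint.offsets:
            self.checkpoint.commit()  # commit only if offsets were stored
        # In either case, start a fresh checkpoint
        self.checkpoint = CheckpointSketch(self.commit_interval)
        return True


ctx = ContextSketch(commit_interval=60.0)
ctx.checkpoint.store_offset("orders", 0, 42)
first = ctx.checkpoint

skipped = ctx.commit_checkpoint()           # not expired yet
forced = ctx.commit_checkpoint(force=True)  # finalized regardless of expiry
print(skipped, forced, first.committed)     # False True True
```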

quixstreams.processing.pausing

PausingManager

class PausingManager()

[VIEW SOURCE]

A class to temporarily pause topic partitions and resume them after the timeout has elapsed.

PausingManager.pause

def pause(topic: str, partition: int, offset_to_seek: int,
          resume_after: float)

[VIEW SOURCE]

Pause the topic-partition for a certain period of time.

This method is supposed to be called in case of backpressure from Sinks.

PausingManager.is_paused

def is_paused(topic: str, partition: int) -> bool

[VIEW SOURCE]

Check if the topic-partition is already paused

PausingManager.resume_if_ready

def resume_if_ready()

[VIEW SOURCE]

Resume consuming from topic-partitions after the wait period has elapsed.

PausingManager.revoke

def revoke(topic: str, partition: int)

[VIEW SOURCE]

Remove partition from the list of paused TPs if it's revoked
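
As a rough illustration of the bookkeeping these methods imply, the sketch below tracks a resume deadline per topic-partition. `PausingSketch` is hypothetical: it leaves out the consumer interaction (actually pausing the partition and seeking to `offset_to_seek`), and the injectable clock is an assumption added to keep the example deterministic.

```python
import time
from typing import Callable, Dict, List, Tuple


class PausingSketch:
    """Hypothetical tracker of paused topic-partitions and their resume times."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._paused: Dict[Tuple[str, int], float] = {}

    def pause(self, topic: str, partition: int, resume_after: float) -> None:
        self._paused[(topic, partition)] = self._clock() + resume_after

    def is_paused(self, topic: str, partition: int) -> bool:
        return (topic, partition) in self._paused

    def resume_if_ready(self) -> List[Tuple[str, int]]:
        now = self._clock()
        ready = [tp for tp, resume_at in self._paused.items() if resume_at <= now]
        for tp in ready:
            del self._paused[tp]
        return ready

    def revoke(self, topic: str, partition: int) -> None:
        self._paused.pop((topic, partition), None)


# A fake clock keeps the example deterministic
now = [0.0]
manager = PausingSketch(clock=lambda: now[0])
manager.pause("orders", 0, resume_after=5.0)
manager.pause("orders", 1, resume_after=30.0)

now[0] = 10.0
resumed = manager.resume_if_ready()  # only partition 0 has waited long enough
manager.revoke("orders", 1)          # partition 1 is dropped without resuming
print(resumed)  # [('orders', 0)]
```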

quixstreams.sinks.core.influxdb3

InfluxDB3Sink

class InfluxDB3Sink(BatchingSink)

[VIEW SOURCE]

InfluxDB3Sink.__init__

def __init__(token: str,
             host: str,
             organization_id: str,
             database: str,
             measurement: str,
             fields_keys: Iterable[str] = (),
             tags_keys: Iterable[str] = (),
             time_key: Optional[str] = None,
             time_precision: WritePrecision = WritePrecision.MS,
             include_metadata_tags: bool = False,
             batch_size: int = 1000,
             enable_gzip: bool = True,
             request_timeout_ms: int = 10_000,
             debug: bool = False)

[VIEW SOURCE]

A connector to sink processed data to InfluxDB v3.

It batches the processed records in memory per topic partition, converts them to the InfluxDB format, and flushes them to InfluxDB at the checkpoint.

The InfluxDB sink transparently handles backpressure if the destination instance cannot accept more data at the moment (e.g., when InfluxDB returns an HTTP 429 error with the "retry_after" header set). When this happens, the sink will notify the Application to pause consuming from the backpressured topic partition until the "retry_after" timeout elapses.

NOTE: InfluxDB3Sink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.

Arguments:

  • token: InfluxDB access token
  • host: InfluxDB host in format "https://"
  • organization_id: InfluxDB organization_id
  • database: database name
  • fields_keys: a list of keys to be used as "fields" when writing to InfluxDB. If present, it must not overlap with "tags_keys". If empty, the whole record value will be used.

    NOTE: The fields' values can only be strings, floats, integers, or booleans. Default - ().

  • tags_keys: a list of keys to be used as "tags" when writing to InfluxDB. If present, it must not overlap with "fields_keys". These keys will be popped from the value dictionary automatically because InfluxDB doesn't allow the same keys to be in both tags and fields. If empty, no tags will be sent.

    NOTE: InfluxDB client always converts tag values to strings. Default - ().

  • time_key: a key to be used as "time" when writing to InfluxDB. By default, the record timestamp will be used with "ms" time precision. When using a custom key, you may need to adjust the time_precision setting to match.
  • time_precision: a time precision to use when writing to InfluxDB.
  • include_metadata_tags: if True, includes record's key, topic, and partition as tags. Default - False.
  • batch_size: how many records to write to InfluxDB in one request. Note that it only affects the size of one write request, and not the number of records flushed on each checkpoint. Default - 1000.
  • enable_gzip: if True, enables gzip compression for writes. Default - True.
  • request_timeout_ms: an HTTP request timeout in milliseconds. Default - 10000.
  • debug: if True, print debug logs from InfluxDB client. Default - False.
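
The interplay between "fields_keys" and "tags_keys" described above can be sketched with plain dictionaries. The `split_fields_and_tags` helper is hypothetical and only mirrors the documented rules (no overlap, tag keys removed from the value, tag values converted to strings, whole value used when "fields_keys" is empty); it is not the sink's actual code.

```python
from typing import Any, Dict, Iterable, Tuple


def split_fields_and_tags(
    value: Dict[str, Any],
    fields_keys: Iterable[str] = (),
    tags_keys: Iterable[str] = (),
) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Hypothetical helper: split one record into InfluxDB fields and tags."""
    fields_keys, tags_keys = list(fields_keys), list(tags_keys)
    overlap = set(fields_keys) & set(tags_keys)
    if overlap:
        raise ValueError(f"Keys used as both fields and tags: {sorted(overlap)}")

    remaining = dict(value)  # copy so the caller's dict is not mutated
    # Tag keys are removed from the value; tag values become strings
    tags = {k: str(remaining.pop(k)) for k in tags_keys if k in remaining}
    if fields_keys:
        fields = {k: remaining[k] for k in fields_keys if k in remaining}
    else:
        fields = remaining  # empty fields_keys: use the whole remaining value
    return fields, tags


record = {"sensor": "s1", "temperature": 21.5, "humidity": 40}
fields, tags = split_fields_and_tags(record, tags_keys=["sensor"])
print(fields)  # {'temperature': 21.5, 'humidity': 40}
print(tags)    # {'sensor': 's1'}
```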

quixstreams.sinks.core

quixstreams.sinks.core.csv

CSVSink

class CSVSink(BatchingSink)

[VIEW SOURCE]

CSVSink.__init__

def __init__(path: str,
             dialect: str = "excel",
             key_serializer: Callable[[Any], str] = str,
             value_serializer: Callable[[Any], str] = json.dumps)

[VIEW SOURCE]

A base CSV sink that writes data from all assigned partitions to a single file.

It is best used for local debugging.

Column format: (key, value, timestamp, topic, partition, offset)

Arguments:

  • path: a path to CSV file
  • dialect: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default - "excel".
  • key_serializer: a callable to convert keys to strings. Default - str.
  • value_serializer: a callable to convert values to strings. Default - json.dumps.
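
To show what the documented column format and the "excel" dialect produce, here is a sketch that uses only the standard `csv` module. The sample records are made up, and an in-memory buffer stands in for the file at `path`; this is not the sink's own code.

```python
import csv
import io
import json

# Column order as documented: key, value, timestamp, topic, partition, offset
records = [
    (b"user-1", {"clicks": 3}, 1700000000000, "events", 0, 10),
    (b"user-2", {"clicks": 7}, 1700000000500, "events", 0, 11),
]

buffer = io.StringIO()  # stands in for the file opened at `path`
writer = csv.writer(buffer, dialect="excel")
for key, value, timestamp, topic, partition, offset in records:
    # A custom key serializer that decodes bytes, instead of the default `str`
    writer.writerow(
        [key.decode(), json.dumps(value), timestamp, topic, partition, offset]
    )

lines = buffer.getvalue().splitlines()
print(lines[0])
```

With the default `str` key serializer, a bytes key would be written as its Python representation (for example `b'user-1'`), which is why a decoding serializer is used in this sketch.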

quixstreams.sinks

quixstreams.sinks.community.postgresql

PostgreSQLSink

class PostgreSQLSink(BatchingSink)

[VIEW SOURCE]

PostgreSQLSink.__init__

def __init__(host: str,
             port: int,
             dbname: str,
             user: str,
             password: str,
             table_name: str,
             schema_auto_update: bool = True,
             **kwargs)

[VIEW SOURCE]

A connector to sink topic data to PostgreSQL.

Arguments:

  • host: PostgreSQL server address.
  • port: PostgreSQL server port.
  • dbname: PostgreSQL database name.
  • user: Database user name.
  • password: Database user password.
  • table_name: PostgreSQL table name.
  • schema_auto_update: Automatically update the schema when new columns are detected.
  • ddl_timeout: Timeout for DDL operations such as table creation or schema updates.
  • kwargs: Additional parameters for psycopg2.connect.

quixstreams.sinks.community.file.formats.parquet

ParquetFormat

class ParquetFormat(Format)

[VIEW SOURCE]

Serializes batches of messages into Parquet format.

This class provides functionality to serialize a SinkBatch into bytes in Parquet format using PyArrow. It allows setting the file extension and compression algorithm used for the Parquet files.

This format does not support appending to existing files.

ParquetFormat.__init__

def __init__(file_extension: str = ".parquet",
             compression: Compression = "snappy") -> None

[VIEW SOURCE]

Initializes the ParquetFormat.

Arguments:

  • file_extension: The file extension to use for output files. Defaults to ".parquet".
  • compression: The compression algorithm to use for Parquet files. Allowed values are "none", "snappy", "gzip", "brotli", "lz4", or "zstd". Defaults to "snappy".

ParquetFormat.file_extension

@property
def file_extension() -> str

[VIEW SOURCE]

Returns the file extension used for output files.

Returns:

The file extension as a string.

ParquetFormat.serialize

def serialize(batch: SinkBatch) -> bytes

[VIEW SOURCE]

Serializes a SinkBatch into bytes in Parquet format.

Each item in the batch is converted into a dictionary with "_timestamp", "_key", and the keys from the message value. If the message key is in bytes, it is decoded to a string.

Missing fields in messages are filled with None to ensure all rows have the same columns.

Arguments:

  • batch: The SinkBatch to serialize.

Returns:

The serialized batch as bytes in Parquet format.
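
The row-building step described above can be sketched without PyArrow. The `normalize_rows` helper is hypothetical, and the final conversion of the rows into Parquet bytes is deliberately left out.

```python
from typing import Any, Dict, List, Tuple


def normalize_rows(
    items: List[Tuple[Any, Dict[str, Any], int]]
) -> List[Dict[str, Any]]:
    """Hypothetical helper: build uniform rows from (key, value, timestamp) items."""
    rows = []
    for key, value, timestamp in items:
        if isinstance(key, bytes):
            key = key.decode()
        rows.append({"_timestamp": timestamp, "_key": key, **value})

    # Collect every column seen in the batch, preserving first-seen order
    columns: Dict[str, None] = {}
    for row in rows:
        columns.update(dict.fromkeys(row))

    # Fill the gaps with None so that all rows share the same columns
    return [{col: row.get(col) for col in columns} for row in rows]


rows = normalize_rows([
    (b"a", {"x": 1}, 100),
    ("b", {"y": 2}, 200),
])
print(rows[0])  # {'_timestamp': 100, '_key': 'a', 'x': 1, 'y': None}
print(rows[1])  # {'_timestamp': 200, '_key': 'b', 'x': None, 'y': 2}
```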

quixstreams.sinks.community.file.formats

InvalidFormatError

class InvalidFormatError(Exception)

[VIEW SOURCE]

Raised when the format is specified incorrectly.

resolve_format

def resolve_format(format: Union[FormatName, Format]) -> Format

[VIEW SOURCE]

Resolves the format into a Format instance.

Arguments:

  • format: The format to resolve, either a format name ("json", "parquet") or a Format instance.

Raises:

  • InvalidFormatError: If the format name is invalid.

Returns:

An instance of Format corresponding to the specified format.

quixstreams.sinks.community.file.formats.json

JSONFormat

class JSONFormat(Format)

[VIEW SOURCE]

Serializes batches of messages into JSON Lines format with optional gzip compression.

This class provides functionality to serialize a SinkBatch into bytes in JSON Lines format. It supports optional gzip compression and allows for custom JSON serialization through the dumps parameter.

This format supports appending to existing files.

JSONFormat.__init__

def __init__(file_extension: str = ".jsonl",
             compress: bool = False,
             dumps: Optional[Callable[[Any], str]] = None) -> None

[VIEW SOURCE]

Initializes the JSONFormat.

Arguments:

  • file_extension: The file extension to use for output files. Defaults to ".jsonl".
  • compress: If True, compresses the output using gzip and appends ".gz" to the file extension. Defaults to False.
  • dumps: A custom function to serialize objects to JSON-formatted strings. If provided, the compact option is ignored.

JSONFormat.file_extension

@property
def file_extension() -> str

[VIEW SOURCE]

Returns the file extension used for output files.

Returns:

The file extension as a string.

JSONFormat.serialize

def serialize(batch: SinkBatch) -> bytes

[VIEW SOURCE]

Serializes a SinkBatch into bytes in JSON Lines format.

Each item in the batch is converted into a JSON object with "_timestamp", "_key", and "_value" fields. If the message key is in bytes, it is decoded to a string.

Arguments:

  • batch: The SinkBatch to serialize.

Returns:

The serialized batch in JSON Lines format, optionally compressed with gzip.
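
A standard-library sketch of the serialization described above follows. The `to_json_lines` helper is hypothetical and uses the built-in `json` and `gzip` modules; the library may use a different JSON encoder.

```python
import gzip
import json
from typing import Any, List, Tuple


def to_json_lines(items: List[Tuple[Any, Any, int]], compress: bool = False) -> bytes:
    """Hypothetical helper: serialize (key, value, timestamp) items as JSON Lines."""
    lines = []
    for key, value, timestamp in items:
        if isinstance(key, bytes):
            key = key.decode()
        obj = {"_timestamp": timestamp, "_key": key, "_value": value}
        lines.append(json.dumps(obj))
    # One JSON object per line, each terminated by a newline
    data = ("\n".join(lines) + "\n").encode()
    return gzip.compress(data) if compress else data


items = [(b"a", {"x": 1}, 100), ("b", [1, 2], 200)]
plain = to_json_lines(items)
packed = to_json_lines(items, compress=True)

print(plain.decode(), end="")
```

Because every record sits on its own line, new records can be appended to an existing uncompressed file without rewriting it.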

quixstreams.sinks.community.file.formats.base

Format

class Format(ABC)

[VIEW SOURCE]

Base class for formatting batches in file sinks.

This abstract base class defines the interface for batch formatting in file sinks. Subclasses should implement the file_extension property and the serialize method to define how batches are formatted and saved.

Format.file_extension

@property
@abstractmethod
def file_extension() -> str

[VIEW SOURCE]

Returns the file extension used for output files.

Returns:

The file extension as a string.

Format.supports_append

@property
@abstractmethod
def supports_append() -> bool

[VIEW SOURCE]

Indicates if the format supports appending data to an existing file.

Returns:

True if appending is supported, otherwise False.

Format.serialize

@abstractmethod
def serialize(batch: SinkBatch) -> bytes

[VIEW SOURCE]

Serializes a batch of messages into bytes.

Arguments:

  • batch: The batch of messages to serialize.

Returns:

The serialized batch as bytes.

quixstreams.sinks.community.file.sink

FileSink

class FileSink(BatchingSink)

[VIEW SOURCE]

A sink that writes data batches to files using configurable formats and destinations.

The sink groups messages by their topic and partition, ensuring data from the same source is stored together. Each batch is serialized using the specified format (e.g., JSON, Parquet) before being written to the configured destination.

The destination determines the storage location and write behavior. By default, it uses LocalDestination for writing to the local filesystem, but can be configured to use other storage backends (e.g., cloud storage).

FileSink.__init__

def __init__(directory: str = "",
             format: Union[FormatName, Format] = "json",
             destination: Optional[Destination] = None) -> None

[VIEW SOURCE]

Initialize the FileSink with the specified configuration.

Arguments:

  • directory: Base directory path for storing files. Defaults to current directory.
  • format: Data serialization format, either as a string ("json", "parquet") or a Format instance.
  • destination: Storage destination handler. Defaults to LocalDestination if not specified.

FileSink.write

def write(batch: SinkBatch) -> None

[VIEW SOURCE]

Write a batch of data using the configured format and destination.

The method performs the following steps:

  1. Serializes the batch data using the configured format
  2. Writes the serialized data to the destination
  3. Handles any write failures by raising a backpressure error

Arguments:

  • batch: The batch of data to write.

Raises:

  • SinkBackpressureError: If the write operation fails, indicating that the sink needs backpressure with a 5-second retry delay.

quixstreams.sinks.community.file.destinations.local

LocalDestination

class LocalDestination(Destination)

[VIEW SOURCE]

A destination that writes data to the local filesystem.

Handles writing data to local files with support for both creating new files and appending to existing ones.

LocalDestination.__init__

def __init__(append: bool = False) -> None

[VIEW SOURCE]

Initialize the local destination.

Arguments:

  • append: If True, append to existing files instead of creating new ones. Defaults to False.

LocalDestination.set_extension

def set_extension(format: Format) -> None

[VIEW SOURCE]

Set the file extension and validate append mode compatibility.

Arguments:

  • format: The Format instance that defines the file extension.

Raises:

  • ValueError: If append mode is enabled but the format doesn't support appending.

LocalDestination.write

def write(data: bytes, batch: SinkBatch) -> None

[VIEW SOURCE]

Write data to a local file.

Arguments:

  • data: The serialized data to write.
  • batch: The batch information containing topic and partition details.

quixstreams.sinks.community.file.destinations

quixstreams.sinks.community.file.destinations.s3

S3BucketNotFoundError

class S3BucketNotFoundError(Exception)

[VIEW SOURCE]

Raised when the specified S3 bucket does not exist.

S3BucketAccessDeniedError

class S3BucketAccessDeniedError(Exception)

[VIEW SOURCE]

Raised when the specified S3 bucket access is denied.

S3Destination

class S3Destination(Destination)

[VIEW SOURCE]

A destination that writes data to Amazon S3.

Handles writing data to S3 buckets using the AWS SDK. Credentials can be provided directly or via environment variables.

S3Destination.__init__

def __init__(bucket: str,
             aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
             aws_secret_access_key: Optional[str] = getenv(
                 "AWS_SECRET_ACCESS_KEY"),
             region_name: Optional[str] = getenv("AWS_REGION",
                                                 getenv("AWS_DEFAULT_REGION")),
             **kwargs) -> None

[VIEW SOURCE]

Initialize the S3 destination.

Arguments:

  • bucket: Name of the S3 bucket to write to.
  • aws_access_key_id: AWS access key ID. Defaults to AWS_ACCESS_KEY_ID environment variable.
  • aws_secret_access_key: AWS secret access key. Defaults to AWS_SECRET_ACCESS_KEY environment variable.
  • region_name: AWS region name. Defaults to AWS_REGION or AWS_DEFAULT_REGION environment variable.
  • kwargs: Additional keyword arguments passed to boto3.client.

Raises:

  • S3BucketNotFoundError: If the specified bucket doesn't exist.
  • S3BucketAccessDeniedError: If access to the bucket is denied.

S3Destination.write

def write(data: bytes, batch: SinkBatch) -> None

[VIEW SOURCE]

Write data to S3.

Arguments:

  • data: The serialized data to write.
  • batch: The batch information containing topic and partition details.

quixstreams.sinks.community.file.destinations.base

Destination

class Destination(ABC)

[VIEW SOURCE]

Abstract base class for defining where and how data should be stored.

Destinations handle the storage of serialized data, whether that's to local disk, cloud storage, or other locations. They manage the physical writing of data while maintaining a consistent directory/path structure based on topics and partitions.

Destination.set_directory

def set_directory(directory: str) -> None

[VIEW SOURCE]

Configure the base directory for storing files.

Arguments:

  • directory: The base directory path where files will be stored.

Raises:

  • ValueError: If the directory path contains invalid characters. Only alphanumeric characters (a-zA-Z0-9), spaces, dots, and underscores are allowed.

Destination.set_extension

def set_extension(format: Format) -> None

[VIEW SOURCE]

Set the file extension based on the format.

Arguments:

  • format: The Format instance that defines the file extension.

Destination.write

@abstractmethod
def write(data: bytes, batch: SinkBatch) -> None

[VIEW SOURCE]

Write the serialized data to storage.

Arguments:

  • data: The serialized data to write.
  • batch: The batch information containing topic, partition and offset details.

quixstreams.sinks.community.file

quixstreams.sinks.community.bigquery

BigQuerySink

class BigQuerySink(BatchingSink)

[VIEW SOURCE]

BigQuerySink.__init__

def __init__(project_id: str,
             location: str,
             dataset_id: str,
             table_name: str,
             service_account_json: Optional[str] = None,
             schema_auto_update: bool = True,
             ddl_timeout: float = 10.0,
             insert_timeout: float = 10.0,
             retry_timeout: float = 30.0,
             **kwargs)

[VIEW SOURCE]

A connector to sink processed data to Google Cloud BigQuery.

It batches the processed records in memory per topic partition, and flushes them to BigQuery at the checkpoint.

NOTE: BigQuerySink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.

The column names and types are inferred from individual records. Each key in the record's dictionary will be inserted as a column to the resulting BigQuery table.

If the column is not present in the schema, the sink will try to add new nullable columns on the fly with types inferred from individual values. The existing columns will not be affected. To disable this behavior, pass schema_auto_update=False and define the necessary schema upfront. The minimal schema must define two columns: "timestamp" of type TIMESTAMP, and "__key" with a type of the expected message key.

Arguments:

  • project_id: a Google project id.
  • location: a BigQuery location.
  • dataset_id: a BigQuery dataset id. If the dataset does not exist, the sink will try to create it.
  • table_name: BigQuery table name. If the table does not exist, the sink will try to create it with a default schema.
  • service_account_json: an optional JSON string with service account credentials to connect to BigQuery. The internal google.cloud.bigquery.Client will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default - None.
  • schema_auto_update: if True, the sink will try to create a dataset and a table if they don't exist. It will also add missing columns on the fly with types inferred from individual values.
  • ddl_timeout: a timeout for a single DDL operation (adding tables, columns, etc.). Default - 10s.
  • insert_timeout: a timeout for a single INSERT operation. Default - 10s.
  • retry_timeout: a total timeout for each request to BigQuery API. During this timeout, a request can be retried according to the client's default retrying policy.
  • kwargs: Additional keyword arguments passed to bigquery.Client.

quixstreams.sinks.community.kinesis

KinesisStreamNotFoundError

class KinesisStreamNotFoundError(Exception)

[VIEW SOURCE]

Raised when the specified Kinesis stream does not exist.

KinesisSink

class KinesisSink(BaseSink)

[VIEW SOURCE]

KinesisSink.__init__

def __init__(stream_name: str,
             aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
             aws_secret_access_key: Optional[str] = getenv(
                 "AWS_SECRET_ACCESS_KEY"),
             region_name: Optional[str] = getenv("AWS_REGION",
                                                 getenv("AWS_DEFAULT_REGION")),
             value_serializer: Callable[[Any], str] = json.dumps,
             key_serializer: Callable[[Any], str] = bytes.decode,
             **kwargs) -> None

[VIEW SOURCE]

Initialize the KinesisSink.

Arguments:

  • stream_name: Kinesis stream name.
  • aws_access_key_id: AWS access key ID.
  • aws_secret_access_key: AWS secret access key.
  • region_name: AWS region name (e.g., 'us-east-1').
  • value_serializer: Function to serialize the value to string (defaults to json.dumps).
  • key_serializer: Function to serialize the key to string (defaults to bytes.decode).
  • kwargs: Additional keyword arguments passed to boto3.client.

KinesisSink.add

def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int) -> None

[VIEW SOURCE]

Buffer a record for the Kinesis stream.

Records are buffered until the batch size reaches 500, at which point they are sent immediately. If the batch size is less than 500, records will be sent when the flush method is called.

KinesisSink.flush

def flush(topic: str, partition: int) -> None

[VIEW SOURCE]

Flush all buffered records for a given topic-partition.

This method sends any outstanding records that have not yet been sent because the batch size was less than 500. It waits for all futures to complete, ensuring that all records are successfully sent to the Kinesis stream.
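
The buffering behavior of add() and flush() can be sketched as below. `BufferingSketch` is a hypothetical class: it replaces the network call with a counter and does not model the futures that the real flush() waits on.

```python
from typing import Any, Dict, List, Tuple

BATCH_LIMIT = 500  # batch size mentioned in the text above


class BufferingSketch:
    """Hypothetical sink that sends full batches eagerly and the rest on flush."""

    def __init__(self) -> None:
        self._buffers: Dict[Tuple[str, int], List[Any]] = {}
        self.sent_batches: List[int] = []  # sizes of the batches "sent" so far

    def _send(self, records: List[Any]) -> None:
        # Stand-in for the real network call
        self.sent_batches.append(len(records))

    def add(self, value: Any, topic: str, partition: int) -> None:
        buffer = self._buffers.setdefault((topic, partition), [])
        buffer.append(value)
        if len(buffer) >= BATCH_LIMIT:
            self._send(buffer)
            self._buffers[(topic, partition)] = []

    def flush(self, topic: str, partition: int) -> None:
        buffer = self._buffers.pop((topic, partition), [])
        if buffer:
            self._send(buffer)


sink = BufferingSketch()
for i in range(1200):
    sink.add(i, "events", 0)
sink.flush("events", 0)
print(sink.sent_batches)  # [500, 500, 200]
```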

quixstreams.sinks.community

This module contains Sinks developed and maintained by the members of Quix Streams community.

quixstreams.sinks.community.redis

RedisSink

class RedisSink(BatchingSink)

[VIEW SOURCE]

RedisSink.__init__

def __init__(host: str,
             port: int,
             db: int,
             value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps,
             key_serializer: Optional[Callable[[Any, Any], Union[bytes,
                                                                 str]]] = None,
             password: Optional[str] = None,
             socket_timeout: float = 30.0,
             **kwargs) -> None

[VIEW SOURCE]

A connector to sink processed data to Redis.

It batches the processed records in memory per topic partition, and flushes them to Redis at the checkpoint.

Arguments:

  • host: Redis host.
  • port: Redis port.
  • db: Redis DB number.
  • value_serializer: a callable to serialize the value to string or bytes (defaults to json.dumps).
  • key_serializer: an optional callable to serialize the key to string or bytes. If not provided, the Kafka message key will be used as is.
  • password: Redis password, optional.
  • socket_timeout: Redis socket timeout. Default - 30s.
  • kwargs: Additional keyword arguments passed to the redis.Redis instance.

quixstreams.sinks.community.iceberg

AWSIcebergConfig

class AWSIcebergConfig(BaseIcebergConfig)

[VIEW SOURCE]

AWSIcebergConfig.__init__

def __init__(aws_s3_uri: str,
             aws_region: Optional[str] = None,
             aws_access_key_id: Optional[str] = None,
             aws_secret_access_key: Optional[str] = None,
             aws_session_token: Optional[str] = None)

[VIEW SOURCE]

Configure IcebergSink to work with AWS Glue.

Arguments:

  • aws_s3_uri: The S3 URI where the table data will be stored (e.g., 's3://your-bucket/warehouse/').
  • aws_region: The AWS region for the S3 bucket and Glue catalog.
  • aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable when using AWS Glue.
  • aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable when using AWS Glue.
  • aws_session_token: a session token (or will be generated for you). NOTE: can alternatively set the AWS_SESSION_TOKEN environment variable when using AWS Glue.

IcebergSink

class IcebergSink(BatchingSink)

[VIEW SOURCE]

IcebergSink writes batches of data to an Apache Iceberg table.

The data will by default include the Kafka message key, value, and timestamp.

It serializes incoming data batches into Parquet format and appends them to the Iceberg table, updating the table schema as necessary.

Currently supports Apache Iceberg hosted in:

  • AWS

Supported data catalogs:

  • AWS Glue

Arguments:

  • table_name: The name of the Iceberg table.
  • config: An IcebergConfig with all the various connection parameters.
  • data_catalog_spec: the data catalog to use (e.g., "aws_glue" for AWS Glue).
  • schema: The Iceberg table schema. If None, a default schema is used.
  • partition_spec: The partition specification for the table. If None, a default is used.

Example setup using an AWS-hosted Iceberg with AWS Glue:

from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
    table_name="glue.sink-test",
    config=iceberg_config,
    data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)


if __name__ == "__main__":
    # Start the application
    app.run()

IcebergSink.write

def write(batch: SinkBatch)

[VIEW SOURCE]

Writes a batch of data to the Iceberg table.

Implements retry logic to handle concurrent write conflicts.

Arguments:

  • batch: The batch of data to write.

quixstreams.sinks.community.pubsub

PubSubTopicNotFoundError

class PubSubTopicNotFoundError(Exception)

[VIEW SOURCE]

Raised when the specified topic does not exist.

PubSubSink

class PubSubSink(BaseSink)

[VIEW SOURCE]

A sink that publishes messages to Google Cloud Pub/Sub.

PubSubSink.__init__

def __init__(project_id: str,
             topic_id: str,
             service_account_json: Optional[str] = None,
             value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps,
             key_serializer: Callable[[Any], str] = bytes.decode,
             flush_timeout: int = 5,
             **kwargs) -> None

[VIEW SOURCE]

Initialize the PubSubSink.

Arguments:

  • project_id: GCP project ID.
  • topic_id: Pub/Sub topic ID.
  • service_account_json: an optional JSON string with service account credentials to connect to Pub/Sub. The internal PublisherClient will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default - None.
  • value_serializer: Function to serialize the value to string or bytes (defaults to json.dumps).
  • key_serializer: Function to serialize the key to string (defaults to bytes.decode).
  • kwargs: Additional keyword arguments passed to PublisherClient.

PubSubSink.add

def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int) -> None

[VIEW SOURCE]

Publish a message to Pub/Sub.

PubSubSink.flush

def flush(topic: str, partition: int) -> None

[VIEW SOURCE]

Wait for all publish operations to complete successfully.

quixstreams.sinks.base.sink

BaseSink

class BaseSink(abc.ABC)

[VIEW SOURCE]

This is a base class for all sinks.

Subclass it and implement its methods to create your own sink.

Note that Sinks are currently in beta, and their design may change over time.

BaseSink.flush

@abc.abstractmethod
def flush(topic: str, partition: int)

[VIEW SOURCE]

This method is triggered by the Checkpoint class when it commits.

You can use flush() to write the batched data to the destination (in case of a batching sink), or confirm the delivery of the previously sent messages (in case of a streaming sink).

If flush() fails, the checkpoint will be aborted.

BaseSink.add

@abc.abstractmethod
def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int)

[VIEW SOURCE]

This method is triggered on every new processed record being sent to this sink.

You can use it to accumulate batches of data before sending them outside, or to send results right away in a streaming manner and confirm a delivery later on flush().

BaseSink.on_paused

def on_paused(topic: str, partition: int)

[VIEW SOURCE]

This method is triggered when the sink is paused due to backpressure, when the SinkBackpressureError is raised.

Here you can react to the backpressure events.

BatchingSink

class BatchingSink(BaseSink)

[VIEW SOURCE]

A base class for batching sinks that need to accumulate the data first before sending it to the external destinations.

Examples: databases, object stores, and other destinations where writing every message is not optimal.

It automatically handles batching, keeping batches in memory per topic-partition.

You may subclass it and override the write() method to implement a custom batching sink.

BatchingSink.write

@abc.abstractmethod
def write(batch: SinkBatch)

[VIEW SOURCE]

This method implements actual writing to the external destination.

It may also raise SinkBackpressureError if the destination cannot accept new writes at the moment. When this happens, the accumulated batch is dropped and the app pauses the corresponding topic partition.

BatchingSink.add

def add(value: Any, key: Any, timestamp: int, headers: HeadersTuples,
        topic: str, partition: int, offset: int)

[VIEW SOURCE]

Add a new record to the in-memory batch.

BatchingSink.flush

def flush(topic: str, partition: int)

[VIEW SOURCE]

Flush an accumulated batch to the destination and drop it afterward.

BatchingSink.on_paused

def on_paused(topic: str, partition: int)

[VIEW SOURCE]

When the destination is already backpressured, drop the accumulated batch.

quixstreams.sinks.base.batch

SinkBatch

class SinkBatch()

[VIEW SOURCE]

A batch to accumulate processed data by BatchingSink between the checkpoints.

Batches are created automatically by the implementations of BatchingSink.

Arguments:

  • topic: a topic name
  • partition: a partition number

SinkBatch.iter_chunks

def iter_chunks(n: int) -> Iterable[Iterable[SinkItem]]

[VIEW SOURCE]

Iterate over batch data in chunks of length n. The last chunk may be shorter.
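
One common way to implement this kind of chunking is with `itertools.islice`. The function below is a standalone sketch that works on any iterable; it is not the method's actual source.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_chunks(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield successive chunks of at most `n` items."""
    if n < 1:
        raise ValueError("n must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, n))
        if not chunk:
            return
        yield chunk


print(list(iter_chunks(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Chunking like this lets a sink cap the size of each write request while still flushing the whole batch.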

quixstreams.sinks.base

quixstreams.sinks.base.exceptions

SinkBackpressureError

class SinkBackpressureError(QuixException)

[VIEW SOURCE]

An exception to be raised by Sinks during flush() call to signal a backpressure event to the application.

When raised, the app will drop the accumulated sink batch, pause the corresponding topic partition for a timeout specified in retry_after, and resume it when it's elapsed.

Arguments:

  • retry_after: a timeout in seconds to pause for
  • topic: a topic name to pause
  • partition: a partition number to pause
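
The sequence described above (raise during flush, drop the batch, pause for `retry_after`) can be sketched as follows. `BackpressureSketch` is a hypothetical exception that only mirrors the three documented arguments, and the module-level dictionaries stand in for the application's internal state.

```python
from typing import Dict, List, Tuple


class BackpressureSketch(Exception):
    """Hypothetical stand-in that carries the three documented arguments."""

    def __init__(self, retry_after: float, topic: str, partition: int):
        super().__init__(f"retry {topic}[{partition}] after {retry_after}s")
        self.retry_after = retry_after
        self.topic = topic
        self.partition = partition


batches: Dict[Tuple[str, int], List[int]] = {("events", 0): [1, 2, 3]}
paused_for: Dict[Tuple[str, int], float] = {}


def write(topic: str, partition: int, batch: List[int]) -> None:
    # Pretend the destination is overloaded and asks us to retry in 5 seconds
    raise BackpressureSketch(retry_after=5.0, topic=topic, partition=partition)


def flush(topic: str, partition: int) -> None:
    batch = batches.get((topic, partition), [])
    try:
        write(topic, partition, batch)
    except BackpressureSketch as exc:
        # Drop the accumulated batch and remember how long to pause
        batches.pop((exc.topic, exc.partition), None)
        paused_for[(exc.topic, exc.partition)] = exc.retry_after


flush("events", 0)
print(batches, paused_for)  # {} {('events', 0): 5.0}
```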

quixstreams.sinks.base.manager

quixstreams.sinks.base.item

quixstreams.utils

quixstreams.utils.settings

BaseSettings

class BaseSettings(_BaseSettings)

[VIEW SOURCE]

BaseSettings.as_dict

def as_dict(plaintext_secrets: bool = False,
            include: Optional[Set[str]] = None) -> dict

[VIEW SOURCE]

Dump any non-empty config values as a dictionary.

Arguments:

  • plaintext_secrets: whether secret values are plaintext or obscured (***)
  • include: optional set of fields to be included in the dictionary

Returns:

a dictionary

quixstreams.utils.dicts

dict_values

def dict_values(d: object) -> List

[VIEW SOURCE]

Recursively unpacks a set of nested dicts to get a flattened list of leaves, where a "leaf" is the first non-dict item encountered.

For example, {"a": {"b": {"c": 1}, "d": 2}, "e": 3} becomes [1, 2, 3].

Arguments:

  • d: initially, a dict (with potentially nested dicts)

Returns:

a list with all the leaves of the various contained dicts
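A minimal sketch of such a recursive flattening is shown below. It is written independently of the library; `flatten_leaves` is a hypothetical name, and the depth-first, insertion-order traversal is an assumption that happens to reproduce the example given above.

```python
from typing import Any, List


def flatten_leaves(d: Any) -> List[Any]:
    """Collect non-dict leaves depth-first, in insertion order (illustrative)."""
    if isinstance(d, dict):
        leaves: List[Any] = []
        for value in d.values():
            # Recurse into nested dicts; non-dict values come back as [value].
            leaves.extend(flatten_leaves(value))
        return leaves
    return [d]


result = flatten_leaves({"a": {"b": {"c": 1}, "d": 2}, "e": 3})
print(result)  # [1, 2, 3]
```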

quixstreams.utils.json

dumps

def dumps(value: Any) -> bytes

[VIEW SOURCE]

Serialize to JSON using orjson package.

Arguments:

  • value: value to serialize to JSON

Returns:

bytes

loads

def loads(value: bytes) -> Any

[VIEW SOURCE]

Deserialize from JSON using orjson package.

Main differences:

  • It returns bytes
  • It doesn't allow non-str keys in dictionaries

Arguments:

  • value: value to deserialize from

Returns:

object

quixstreams.types

quixstreams.models.timestamps

TimestampType

class TimestampType(enum.IntEnum)

[VIEW SOURCE]

TIMESTAMP_NOT_AVAILABLE

timestamps not supported by broker

TIMESTAMP_CREATE_TIME

message creation time (or source / producer time)

TIMESTAMP_LOG_APPEND_TIME

broker receive time

MessageTimestamp

class MessageTimestamp()

[VIEW SOURCE]

Represents a timestamp of an incoming Kafka message.

It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.

MessageTimestamp.create

@classmethod
def create(cls, timestamp_type: int, milliseconds: int) -> Self

[VIEW SOURCE]

Create a Timestamp object based on data from confluent_kafka.Message.timestamp().

If the timestamp type is "TIMESTAMP_NOT_AVAILABLE", the milliseconds are set to None.

Arguments:

  • timestamp_type: a timestamp type represented as a number. Can be one of:
      • "0" - TIMESTAMP_NOT_AVAILABLE, timestamps not supported by broker.
      • "1" - TIMESTAMP_CREATE_TIME, message creation time (or source / producer time).
      • "2" - TIMESTAMP_LOG_APPEND_TIME, broker receive time.
  • milliseconds: the number of milliseconds since the epoch (UTC).

Returns:

Timestamp object
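The behavior described above can be sketched with a small stand-in class. `TimestampSketch` is hypothetical and is not the library's MessageTimestamp; the enum is redefined locally with the values listed in the arguments so that the example runs without any dependencies.

```python
import enum
from dataclasses import dataclass
from typing import Optional


class TimestampType(enum.IntEnum):
    TIMESTAMP_NOT_AVAILABLE = 0
    TIMESTAMP_CREATE_TIME = 1
    TIMESTAMP_LOG_APPEND_TIME = 2


@dataclass(frozen=True)
class TimestampSketch:
    """Hypothetical stand-in for MessageTimestamp (frozen, so read-only)."""

    type: TimestampType
    milliseconds: Optional[int]

    @classmethod
    def create(cls, timestamp_type: int, milliseconds: int) -> "TimestampSketch":
        ts_type = TimestampType(timestamp_type)
        if ts_type is TimestampType.TIMESTAMP_NOT_AVAILABLE:
            # No usable timestamp: discard whatever number was supplied.
            return cls(ts_type, None)
        return cls(ts_type, milliseconds)


# confluent_kafka.Message.timestamp() returns a (timestamp_type, timestamp)
# tuple, so it can be unpacked straight into create().
created = TimestampSketch.create(*(1, 1692703362840))
unavailable = TimestampSketch.create(*(0, 0))
print(created.milliseconds, unavailable.milliseconds)  # 1692703362840 None
```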

quixstreams.models

quixstreams.models.messagecontext

MessageContext

class MessageContext()

[VIEW SOURCE]

An object with Kafka message properties.

It is made pseudo-immutable (i.e. public attributes don't have setters), and it should not be mutated during message processing.

quixstreams.models.types

ConfluentKafkaMessageProto

class ConfluentKafkaMessageProto(Protocol)

[VIEW SOURCE]

An interface of confluent_kafka.Message.

Use it to not depend on exact implementation and simplify testing.

Instances of confluent_kafka.Message cannot be directly created from Python, see https://github.com/confluentinc/confluent-kafka-python/issues/1535.

quixstreams.models.serializers.avro

AvroSerializer

class AvroSerializer(Serializer)

[VIEW SOURCE]

AvroSerializer.__init__

def __init__(
    schema: Schema,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

[VIEW SOURCE]

Serializer that returns data in Avro format.

For more information see fastavro schemaless_writer method.

Arguments:

  • schema: The avro schema.
  • strict: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states. Default - False
  • strict_allow_default: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states unless it is a missing field that has a default value in the schema. Default - False
  • disable_tuple_notation: If set to True, tuples will not be treated as a special case. Therefore, using a tuple to indicate the type of a record will not work. Default - False
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's AvroSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's AvroSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.

AvroDeserializer

class AvroDeserializer(Deserializer)

[VIEW SOURCE]

AvroDeserializer.__init__

def __init__(
    schema: Optional[Schema] = None,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    return_named_type: bool = False,
    return_named_type_override: bool = False,
    handle_unicode_errors: str = "strict",
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)

[VIEW SOURCE]

Deserializer that parses data from Avro.

For more information see fastavro schemaless_reader method.

Arguments:

  • schema: The Avro schema.
  • reader_schema: If the schema has changed since being written then the new schema can be given to allow for schema migration. Default - None
  • return_record_name: If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Default - False
  • return_record_name_override: If true, this will modify the behavior of return_record_name so that the record name is only returned for unions where there is more than one record. For unions that only have one record, this option will make it so that the record is returned by itself, not a tuple with the name. Default - False
  • return_named_type: If true, when reading a union of named types, the result will be a tuple where the first value is the name of the type and the second value is the record itself. NOTE: Using this option will ignore return_record_name and return_record_name_override. Default - False
  • return_named_type_override: If true, this will modify the behavior of return_named_type so that the named type is only returned for unions where there is more than one named type. For unions that only have one named type, this option will make it so that the named type is returned by itself, not a tuple with the name. Default - False
  • handle_unicode_errors: Should be set to a valid string that can be used in the errors argument of the string decode() function. Default - "strict"
  • schema_registry_client_config: If provided, deserialization is offloaded to Confluent's AvroDeserializer. Default - None

quixstreams.models.serializers.schema_registry

SchemaRegistryClientConfig

class SchemaRegistryClientConfig(BaseSettings)

[VIEW SOURCE]

Configuration required to establish the connection with a Schema Registry.

Arguments:

  • url: Schema Registry URL.
  • ssl_ca_location: Path to CA certificate file used to verify the Schema Registry's private key.
  • ssl_key_location: Path to the client's private key (PEM) used for authentication.

    NOTE: ssl_certificate_location must also be set.

  • ssl_certificate_location: Path to the client's public key (PEM) used for authentication.

    NOTE: May be set without ssl_key_location if the private key is stored within the PEM as well.

  • basic_auth_user_info: Client HTTP credentials in the form of username:password.

    NOTE: By default, userinfo is extracted from the URL if present.

SchemaRegistrySerializationConfig

class SchemaRegistrySerializationConfig(BaseSettings)

[VIEW SOURCE]

Configuration that instructs Serializer how to handle communication with a Schema Registry.

Arguments:

  • auto_register_schemas: If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.
  • normalize_schemas: Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
  • use_latest_version: Whether to use the latest subject version for serialization.

    NOTE: There is no check that the latest schema is backwards compatible with the object being serialized. Defaults to False.

  • subject_name_strategy: Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.
  • skip_known_types: Whether or not to skip known types when resolving schema dependencies. Defaults to False.
  • reference_subject_name_strategy: Defines how Schema Registry subject names for schema references are constructed. Defaults to reference_subject_name_strategy.
  • use_deprecated_format: Specifies whether the Protobuf serializer should serialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If the consumers of the topic being produced to are using confluent-kafka-python <1.8, then this property must be set to True until all old consumers have been upgraded.

quixstreams.models.serializers

quixstreams.models.serializers.exceptions

IgnoreMessage

class IgnoreMessage(exceptions.QuixException)

[VIEW SOURCE]

Raise this exception from Deserializer.__call__ in order to ignore the processing of the particular message.
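A minimal sketch of how such an exception might be used is shown below. Everything here is a local stand-in: `IgnoreMessage` is redefined so the example runs on its own, and the `deserialize` and `consume` functions, as well as the rule of skipping empty payloads, are hypothetical.

```python
import json
from typing import Any, Iterable, List


class IgnoreMessage(Exception):
    """Local stand-in for the library's IgnoreMessage (assumption)."""


def deserialize(value: bytes) -> Any:
    """Hypothetical deserializer call that skips empty payloads."""
    if not value:
        raise IgnoreMessage("empty payload")
    return json.loads(value)


def consume(raw_messages: Iterable[bytes]) -> List[Any]:
    """Hypothetical processing loop that drops ignored messages."""
    rows = []
    for raw in raw_messages:
        try:
            rows.append(deserialize(raw))
        except IgnoreMessage:
            # The message is skipped without failing the whole loop.
            continue
    return rows


rows = consume([b'{"a": 1}', b"", b'{"a": 2}'])
print(rows)  # [{'a': 1}, {'a': 2}]
```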

quixstreams.models.serializers.quix

QuixDeserializer

class QuixDeserializer(JSONDeserializer)

[VIEW SOURCE]

Handles Deserialization for any Quix-formatted topic.

Parses JSON data from either TimeseriesData or EventData (ignores the rest).

QuixDeserializer.__init__

def __init__(loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)

[VIEW SOURCE]

Arguments:

  • loads: function to parse json from bytes. Default - quixstreams.utils.json.loads.

QuixDeserializer.split_values

@property
def split_values() -> bool

[VIEW SOURCE]

Each Quix message might contain data for multiple Rows. This property informs the downstream processors about that, so they can expect an Iterable instead of Mapping.

QuixDeserializer.deserialize

def deserialize(model_key: str, value: Union[List[Mapping],
                                             Mapping]) -> Iterable[Mapping]

[VIEW SOURCE]

Deserialization function for particular data types (Timeseries or EventData).

Arguments:

  • model_key: value of "__Q_ModelKey" message header
  • value: deserialized JSON value of the message, list or dict

Returns:

Iterable of dicts

QuixSerializer

class QuixSerializer(JSONSerializer)

[VIEW SOURCE]

QuixSerializer.__init__

def __init__(as_legacy: bool = True,
             dumps: Callable[[Any], Union[str, bytes]] = default_dumps)

[VIEW SOURCE]

Serializer that returns data in json format.

Arguments:

  • as_legacy: parse as the legacy format. Default - True
  • dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps

QuixTimeseriesSerializer

class QuixTimeseriesSerializer(QuixSerializer)

[VIEW SOURCE]

Serialize data to JSON formatted according to Quix Timeseries format.

The serializable object must be a dictionary, and each item must be of str, int, float, bytes or bytearray type. Otherwise, a SerializationError will be raised.

Input:

{'a': 1, 'b': 1.1, 'c': "string", 'd': b'bytes', 'Tags': {'tag1': 'tag'}}

Output:

{
    "Timestamps": [123123123],
    "NumericValues": {"a": [1], "b": [1.1]},
    "StringValues": {"c": ["string"]},
    "BinaryValues": {"d": ["Ynl0ZXM="]},
    "TagValues": {"tag1": ["tag"]}
}
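The mapping from the input above to the output above can be sketched with the standard library alone. This is an illustration of the format, not the serializer's actual code: `to_timeseries` and its `timestamp` parameter are hypothetical, the binary value is assumed to be Base64-encoded (which matches "Ynl0ZXM=" for b'bytes'), and TypeError stands in for the library's SerializationError.

```python
import base64
import json
from typing import Any, Dict, Mapping


def to_timeseries(row: Mapping[str, Any], timestamp: int) -> Dict[str, Any]:
    """Group the values of one row by type (illustrative sketch)."""
    result: Dict[str, Any] = {
        "Timestamps": [timestamp],
        "NumericValues": {},
        "StringValues": {},
        "BinaryValues": {},
        "TagValues": {},
    }
    for key, value in row.items():
        if key == "Tags":
            for tag, tag_value in value.items():
                result["TagValues"][tag] = [tag_value]
        elif isinstance(value, (int, float)):
            result["NumericValues"][key] = [value]
        elif isinstance(value, str):
            result["StringValues"][key] = [value]
        elif isinstance(value, (bytes, bytearray)):
            # Binary values are not valid JSON, so encode them as Base64 text.
            encoded = base64.b64encode(value).decode("ascii")
            result["BinaryValues"][key] = [encoded]
        else:
            # Stand-in for the SerializationError mentioned above.
            raise TypeError(f"unsupported type for {key!r}: {type(value).__name__}")
    return result


payload = to_timeseries(
    {"a": 1, "b": 1.1, "c": "string", "d": b"bytes", "Tags": {"tag1": "tag"}},
    timestamp=123123123,
)
print(json.dumps(payload, indent=4))
```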

QuixEventsSerializer

class QuixEventsSerializer(QuixSerializer)

[VIEW SOURCE]

Serialize data to JSON formatted according to Quix EventData format.

The input value is expected to be a dictionary with the following keys:

  • "Id" (type str, default - "")
  • "Value" (type str, default - "")
  • "Tags" (type dict, default - {})

NOTE: All the other fields will be ignored.

Input:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"}
}

Output:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"},
    "Timestamp": 1692703362840389000
}
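The key handling described above (documented defaults, extra fields ignored) can be sketched as a plain function. `to_event_data` and its `timestamp` parameter are hypothetical names; how the real serializer obtains the timestamp is not covered here.

```python
from typing import Any, Dict, Mapping


def to_event_data(value: Mapping[str, Any], timestamp: int) -> Dict[str, Any]:
    """Keep only the documented keys, applying the documented defaults."""
    return {
        "Id": value.get("Id", ""),
        "Value": value.get("Value", ""),
        "Tags": value.get("Tags", {}),
        "Timestamp": timestamp,
    }


event = to_event_data(
    {"Id": "an_event", "Value": "any_string", "Tags": {"tag1": "tag"}, "extra": 1},
    timestamp=1692703362840389000,
)
print(event)
```

Note that the "extra" key in the input does not appear in the result, in line with the note that all other fields are ignored.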

quixstreams.models.serializers.simple_types

BytesDeserializer

class BytesDeserializer(Deserializer)

[VIEW SOURCE]

A deserializer that passes bytes through without any changes.

BytesSerializer

class BytesSerializer(Serializer)

[VIEW SOURCE]

A serializer that passes bytes through without any changes.

StringDeserializer

class StringDeserializer(Deserializer)

[VIEW SOURCE]

StringDeserializer.__init__

def __init__(codec: str = "utf_8")

[VIEW SOURCE]

Deserializes bytes to strings using the specified encoding.

Arguments:

  • codec: string encoding

A wrapper around confluent_kafka.serialization.StringDeserializer.

IntegerDeserializer

class IntegerDeserializer(Deserializer)

[VIEW SOURCE]

Deserializes bytes to integers.

A wrapper around confluent_kafka.serialization.IntegerDeserializer.

DoubleDeserializer

class DoubleDeserializer(Deserializer)

[VIEW SOURCE]

Deserializes IEEE 754 binary64 bytes to floats.

A wrapper around confluent_kafka.serialization.DoubleDeserializer.

StringSerializer

class StringSerializer(Serializer)

[VIEW SOURCE]

StringSerializer.__init__

def __init__(codec: str = "utf_8")

[VIEW SOURCE]

Serializes strings to bytes using the specified encoding.

Arguments:

  • codec: string encoding

IntegerSerializer

class IntegerSerializer(Serializer)

[VIEW SOURCE]

Serializes integers to bytes

DoubleSerializer

class DoubleSerializer(Serializer)

[VIEW SOURCE]

Serializes floats to bytes
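To make the byte-level round trips concrete, the sketch below uses only the standard library. The wire layouts are assumptions made for illustration and are not stated in this reference: a big-endian 32-bit signed integer for integers and a big-endian IEEE 754 binary64 for floats. Only the "utf_8" codec name comes from the signatures above.

```python
import struct

# Strings: encode and decode with a named codec ("utf_8" is the default above).
text_bytes = "héllo".encode("utf_8")
text = text_bytes.decode("utf_8")

# Integers: assumed layout is a big-endian 32-bit signed integer.
int_bytes = struct.pack(">i", 42)
(number,) = struct.unpack(">i", int_bytes)

# Floats: assumed layout is a big-endian IEEE 754 binary64 (8 bytes).
double_bytes = struct.pack(">d", 1.5)
(value,) = struct.unpack(">d", double_bytes)

print(text, number, value)  # héllo 42 1.5
print(len(text_bytes), len(int_bytes), len(double_bytes))  # 6 4 8
```

If the actual wire format matters for interoperability, it is worth checking it against the confluent_kafka.serialization documentation rather than relying on this sketch.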

quixstreams.models.serializers.protobuf

ProtobufSerializer

class ProtobufSerializer(Serializer)

[VIEW SOURCE]

ProtobufSerializer.__init__

def __init__(
    msg_type: Message,
    deterministic: bool = False,
    ignore_unknown_fields: bool = False,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

[VIEW SOURCE]

Serializer that returns data in protobuf format.

Serialisation from a python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the msg_type class.

Arguments:

  • msg_type: protobuf message class.
  • deterministic: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys. Default - False
  • ignore_unknown_fields: If True, do not raise errors for unknown fields. Default - False
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's ProtobufSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's ProtobufSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.

ProtobufDeserializer

class ProtobufDeserializer(Deserializer)

[VIEW SOURCE]

ProtobufDeserializer.__init__

def __init__(
    msg_type: Message,
    use_integers_for_enums: bool = False,
    preserving_proto_field_name: bool = False,
    to_dict: bool = True,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

[VIEW SOURCE]

Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.

Deserialisation to a python dictionary can have a significant performance impact. You can disable this behavior using to_dict, in that case the protobuf message will be used as the StreamingDataframe row value.

Arguments:

  • msg_type: protobuf message class.
  • use_integers_for_enums: If true, use integers instead of enum names. Default - False
  • preserving_proto_field_name: If True, use the original proto field names as defined in the .proto file. If False, convert the field names to lowerCamelCase. Default - False
  • to_dict: If false, return the protobuf message instead of a dict. Default - True
  • schema_registry_client_config: If provided, deserialization is offloaded to Confluent's ProtobufDeserializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's ProtobufDeserializer. Default - None

    NOTE: schema_registry_client_config must also be set.

quixstreams.models.serializers.json

JSONSerializer

class JSONSerializer(Serializer)

[VIEW SOURCE]

JSONSerializer.__init__

def __init__(
    dumps: Callable[[Any], Union[str, bytes]] = default_dumps,
    schema: Optional[Mapping] = None,
    validator: Optional[Validator] = None,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

[VIEW SOURCE]

Serializer that returns data in json format.

Arguments:

  • dumps: a function to serialize objects to json. Default - quixstreams.utils.json.dumps
  • schema: A schema used to validate the data using jsonschema.Draft202012Validator. Default - None
  • validator: A jsonschema validator used to validate the data. Takes precedence over the schema. Default - None
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's JSONSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's JSONSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.

JSONDeserializer

class JSONDeserializer(Deserializer)

[VIEW SOURCE]

JSONDeserializer.__init__

def __init__(
    loads: Callable[[Union[bytes, bytearray]], Any] = default_loads,
    schema: Optional[Mapping] = None,
    validator: Optional[Validator] = None,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)

[VIEW SOURCE]

Deserializer that parses data from JSON

Arguments:

  • loads: function to parse json from bytes. Default -