Sinks API
quixstreams.sinks.base.sink
BaseSink
This is a base class for all sinks.
Subclass it and implement its methods to create your own sink.
Note that Sinks are currently in beta, and their design may change over time.
BaseSink.flush
This method is triggered by the Checkpoint class when it commits.
You can use flush()
to write the batched data to the destination (in case of
a batching sink), or confirm the delivery of the previously sent messages
(in case of a streaming sink).
If flush() fails, the checkpoint will be aborted.
BaseSink.add
@abc.abstractmethod
def add(value: Any, key: Any, timestamp: int,
headers: List[Tuple[str, HeaderValue]], topic: str, partition: int,
offset: int)
This method is triggered for every new processed record sent to this sink.
You can use it to accumulate batches of data before sending them out, or to send results right away in a streaming manner and confirm delivery later in flush().
BaseSink.on_paused
This method is triggered when the sink is paused due to backpressure, i.e. when
SinkBackpressureError is raised.
Here you can react to backpressure events.
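For illustration, here is a minimal sketch of a streaming-style sink built on BaseSink. It assumes BaseSink is importable from the quixstreams.sinks.base.sink module documented here and that flush() and on_paused() receive the committed topic and partition; check the exact signatures in your installed version.
from typing import Any, List, Tuple

from quixstreams.sinks.base.sink import BaseSink


class PrintSink(BaseSink):
    """A hypothetical sink that sends each record immediately and confirms delivery on flush()."""

    def __init__(self):
        super().__init__()
        self._unconfirmed = 0  # records sent since the last checkpoint

    def add(self, value: Any, key: Any, timestamp: int,
            headers: List[Tuple[str, Any]], topic: str, partition: int,
            offset: int):
        # "Send" the record right away; here we just print it
        print(f"{topic}[{partition}]@{offset}: {key} -> {value}")
        self._unconfirmed += 1

    def flush(self, topic: str, partition: int):
        # Confirm delivery of everything sent since the last checkpoint.
        # Raising an exception here aborts the checkpoint.
        self._unconfirmed = 0

    def on_paused(self, topic: str, partition: int):
        # Nothing is buffered in this streaming sink, so there is nothing to drop
        pass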
BatchingSink
A base class for batching sinks that need to accumulate data before sending it to the external destination.
Examples: databases, object stores, and other destinations where writing every message is not optimal.
It automatically handles batching, keeping batches in memory per topic-partition.
You may subclass it and override the write()
method to implement a custom
batching sink.
BatchingSink.write
This method implements actual writing to the external destination.
It may also raise SinkBackpressureError
if the destination cannot accept new
writes at the moment.
When this happens, the accumulated batch is dropped and the app pauses the
corresponding topic partition.
BatchingSink.add
def add(value: Any, key: Any, timestamp: int,
headers: List[Tuple[str, HeaderValue]], topic: str, partition: int,
offset: int)
Add a new record to the in-memory batch.
BatchingSink.flush
Flush an accumulated batch to the destination and drop it afterward.
BatchingSink.on_paused
When the destination is already backpressured, drop the accumulated batch.
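As a sketch, a custom batching sink only needs to override write(); the database client and its insert_many() call below are hypothetical, and the imports follow the module paths documented on this page. It assumes each batch item exposes key, value, and timestamp attributes.
from quixstreams.sinks.base.sink import BatchingSink
from quixstreams.sinks.base.batch import SinkBatch


class MyDatabaseSink(BatchingSink):
    def __init__(self, db_client):
        super().__init__()
        self._db = db_client  # hypothetical database client

    def write(self, batch: SinkBatch):
        # Called on flush() with the batch accumulated for one topic partition
        for chunk in batch.iter_chunks(n=1000):
            rows = [
                {"key": item.key, "value": item.value, "timestamp": item.timestamp}
                for item in chunk
            ]
            self._db.insert_many(rows)  # hypothetical bulk-insert call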
quixstreams.sinks.base.batch
SinkBatch
A batch to accumulate processed data by BatchingSink
between the checkpoints.
Batches are created automatically by the implementations of BatchingSink.
Arguments:
topic: a topic name
partition: a partition number
SinkBatch.iter_chunks
Iterate over batch data in chunks of length n. The last batch may be shorter.
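For example, a write() implementation might page a large batch through a size-limited API; the chunk size of 500 and the client's send() call below are hypothetical, and each item is assumed to expose a value attribute.
def write(self, batch):
    # Send the accumulated batch as several requests of at most 500 rows each
    for chunk in batch.iter_chunks(n=500):
        payload = [item.value for item in chunk]
        self._client.send(payload)  # hypothetical destination client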
quixstreams.sinks.base.exceptions
SinkBackpressureError
An exception to be raised by Sinks during the flush() call
to signal a backpressure event to the application.
When raised, the app will drop the accumulated sink batch,
pause the corresponding topic partition for the timeout specified in retry_after,
and resume it once the timeout elapses.
Arguments:
retry_after: a timeout in seconds to pause for
topic: a topic name to pause
partition: a partition number to pause
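A sketch of raising it from a batching sink's write(), assuming the batch exposes its topic and partition; the _send() helper and the RateLimitExceeded exception stand in for whatever your destination client provides.
from quixstreams.sinks.base.sink import BatchingSink
from quixstreams.sinks.base.batch import SinkBatch
from quixstreams.sinks.base.exceptions import SinkBackpressureError


class RateLimitExceeded(Exception):
    """Stand-in for the error your destination client raises when overloaded."""


class RateLimitedSink(BatchingSink):
    def write(self, batch: SinkBatch):
        try:
            self._send(batch)  # hypothetical helper that writes the batch
        except RateLimitExceeded as exc:
            # Drop this batch and ask the app to pause the partition for 30 seconds
            raise SinkBackpressureError(
                retry_after=30.0,
                topic=batch.topic,
                partition=batch.partition,
            ) from exc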
quixstreams.sinks.core.influxdb3
InfluxDB3Sink
InfluxDB3Sink.__init__
def __init__(token: str,
host: str,
organization_id: str,
database: str,
measurement: str,
fields_keys: Iterable[str] = (),
tags_keys: Iterable[str] = (),
time_key: Optional[str] = None,
time_precision: WritePrecision = WritePrecision.MS,
include_metadata_tags: bool = False,
batch_size: int = 1000,
enable_gzip: bool = True,
request_timeout_ms: int = 10_000,
debug: bool = False)
A connector to sink processed data to InfluxDB v3.
It batches the processed records in memory per topic partition, converts them to the InfluxDB format, and flushes them to InfluxDB at the checkpoint.
The InfluxDB sink transparently handles backpressure if the destination instance cannot accept more data at the moment (e.g., when InfluxDB returns an HTTP 429 error with the "retry_after" header set). When this happens, the sink will notify the Application to pause consuming from the backpressured topic partition until the "retry_after" timeout elapses.
NOTE: InfluxDB3Sink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.
Arguments:
token: InfluxDB access token
host: InfluxDB host in format "https://<host>"
organization_id: InfluxDB organization_id
database: database name
fields_keys: a list of keys to be used as "fields" when writing to InfluxDB. If present, it must not overlap with "tags_keys". If empty, the whole record value will be used. NOTE: The fields' values can only be strings, floats, integers, or booleans. Default - ().
tags_keys: a list of keys to be used as "tags" when writing to InfluxDB. If present, it must not overlap with "fields_keys". These keys will be popped from the value dictionary automatically because InfluxDB doesn't allow the same keys to be both in tags and fields. If empty, no tags will be sent. NOTE: The InfluxDB client always converts tag values to strings. Default - ().
time_key: a key to be used as "time" when writing to InfluxDB. By default, the record timestamp will be used with "ms" time precision. When using a custom key, you may need to adjust the time_precision setting to match.
time_precision: a time precision to use when writing to InfluxDB.
include_metadata_tags: if True, includes the record's key, topic, and partition as tags. Default - False.
batch_size: how many records to write to InfluxDB in one request. Note that it only affects the size of one write request, not the number of records flushed on each checkpoint. Default - 1000.
enable_gzip: if True, enables gzip compression for writes. Default - True.
request_timeout_ms: an HTTP request timeout in milliseconds. Default - 10000.
debug: if True, print debug logs from the InfluxDB client. Default - False.
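A sketch of wiring the sink into an application. The connection values are placeholders, and the topic, measurement, tag, and field names are hypothetical; remember that record values must already be dictionaries when they reach the sink.
from quixstreams import Application
from quixstreams.sinks.core.influxdb3 import InfluxDB3Sink

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
topic = app.topic("sensor-readings")

influx_sink = InfluxDB3Sink(
    token="<influxdb-token>",
    host="<influxdb-host>",
    organization_id="<org-id>",
    database="<database>",
    measurement="sensor_readings",
    tags_keys=["device_id"],       # written as InfluxDB tags
    fields_keys=["temperature"],   # written as InfluxDB fields
)

sdf = app.dataframe(topic=topic)
sdf.sink(influx_sink)

if __name__ == "__main__":
    app.run()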
quixstreams.sinks.core.csv
CSVSink
CSVSink.__init__
def __init__(path: str,
dialect: str = "excel",
key_serializer: Callable[[Any], str] = str,
value_serializer: Callable[[Any], str] = json.dumps)
A base CSV sink that writes data from all assigned partitions to a single file.
It's best used for local debugging.
Column format: (key, value, timestamp, topic, partition, offset)
Arguments:
path: a path to the CSV file
dialect: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default - "excel".
key_serializer: a callable to convert keys to strings. Default - str.
value_serializer: a callable to convert values to strings. Default - json.dumps.
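For example, to dump processed records to a single local CSV file while debugging (the topic name and output path are placeholders):
from quixstreams import Application
from quixstreams.sinks.core.csv import CSVSink

app = Application(broker_address="localhost:9092")
topic = app.topic("sink_topic")

# One file with columns (key, value, timestamp, topic, partition, offset)
csv_sink = CSVSink(path="output.csv")

sdf = app.dataframe(topic=topic)
sdf.sink(csv_sink)

if __name__ == "__main__":
    app.run()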
quixstreams.sinks.community.iceberg
AWSIcebergConfig
AWSIcebergConfig.__init__
def __init__(aws_s3_uri: str,
aws_region: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None)
Configure IcebergSink to work with AWS Glue.
Arguments:
aws_s3_uri: The S3 URI where the table data will be stored (e.g., 's3://your-bucket/warehouse/').
aws_region: The AWS region for the S3 bucket and Glue catalog.
aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable when using AWS Glue.
aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable when using AWS Glue.
aws_session_token: a session token (or will be generated for you). NOTE: can alternatively set the AWS_SESSION_TOKEN environment variable when using AWS Glue.
IcebergSink
IcebergSink writes batches of data to an Apache Iceberg table.
By default, the data will include the Kafka message key, value, and timestamp.
It serializes incoming data batches into Parquet format and appends them to the Iceberg table, updating the table schema as necessary.
Currently supports Apache Iceberg hosted in:
- AWS
Supported data catalogs:
- AWS Glue
Arguments:
table_name: The name of the Iceberg table.
config: An IcebergConfig with all the various connection parameters.
data_catalog_spec: the data catalog to use (e.g., "aws_glue" for AWS Glue).
schema: The Iceberg table schema. If None, a default schema is used.
partition_spec: The partition specification for the table. If None, a default is used.
Example setup using an AWS-hosted Iceberg with AWS Glue:
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
    table_name="glue.sink-test",
    config=iceberg_config,
    data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)

if __name__ == "__main__":
    # Start the application
    app.run()
IcebergSink.write
Writes a batch of data to the Iceberg table.
Implements retry logic to handle concurrent write conflicts.
Arguments:
batch: The batch of data to write.
quixstreams.sinks.community.bigquery
BigQuerySink
BigQuerySink.__init__
def __init__(project_id: str,
location: str,
dataset_id: str,
table_name: str,
service_account_json: Optional[str] = None,
schema_auto_update: bool = True,
ddl_timeout: float = 10.0,
insert_timeout: float = 10.0,
retry_timeout: float = 30.0,
**kwargs)
A connector to sink processed data to Google Cloud BigQuery.
It batches the processed records in memory per topic partition, and flushes them to BigQuery at the checkpoint.
NOTE: BigQuerySink can accept only dictionaries. If the record values are not dicts, you need to convert them to dicts before sinking.
The column names and types are inferred from individual records. Each key in the record's dictionary will be inserted as a column in the resulting BigQuery table.
If a column is not present in the schema, the sink will try to add new nullable columns on the fly with types inferred from individual values.
The existing columns will not be affected.
To disable this behavior, pass schema_auto_update=False and define the necessary schema upfront.
The minimal schema must define two columns: "timestamp" of type TIMESTAMP, and "__key" with the type of the expected message key.
Arguments:
project_id: a Google project id.
location: a BigQuery location.
dataset_id: a BigQuery dataset id. If the dataset does not exist, the sink will try to create it.
table_name: a BigQuery table name. If the table does not exist, the sink will try to create it with a default schema.
service_account_json: an optional JSON string with service account credentials to connect to BigQuery. The internal google.cloud.bigquery.Client will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default - None.
schema_auto_update: if True, the sink will try to create a dataset and a table if they don't exist. It will also add missing columns on the fly with types inferred from individual values.
ddl_timeout: a timeout for a single DDL operation (adding tables, columns, etc.). Default - 10s.
insert_timeout: a timeout for a single INSERT operation. Default - 10s.
retry_timeout: a total timeout for each request to the BigQuery API. During this timeout, a request can be retried according to the client's default retrying policy.
kwargs: Additional keyword arguments passed to bigquery.Client.
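A sketch of a minimal setup that relies on schema auto-update and Application Default Credentials; the project, dataset, table, and topic names are placeholders.
from quixstreams import Application
from quixstreams.sinks.community.bigquery import BigQuerySink

app = Application(broker_address="localhost:9092")
topic = app.topic("sink_topic")

bigquery_sink = BigQuerySink(
    project_id="<project-id>",
    location="US",
    dataset_id="<dataset-id>",
    table_name="<table-name>",
    schema_auto_update=True,  # create the dataset/table and add columns as needed
)

sdf = app.dataframe(topic=topic)
sdf.sink(bigquery_sink)  # record values must be dictionaries

if __name__ == "__main__":
    app.run()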
quixstreams.sinks.community.file.sink
InvalidFormatError
Raised when the format is specified incorrectly.
FileSink
Writes batches of data to files on disk using specified formats.
Messages are grouped by their topic and partition. Data from messages with the same topic and partition are saved in the same directory. Each batch of messages is serialized and saved to a file within that directory. Files are named using the batch's starting offset to ensure uniqueness and order.
If append
is set to True
, the sink will attempt to append data to an
existing file rather than creating a new one. This is only supported for
formats that allow appending.
FileSink.__init__
Initializes the FileSink.
Arguments:
output_dir: The directory where files will be written.
format: The data serialization format to use. This can be either a format name ("json", "parquet") or an instance of a Format subclass.
append: If True, data will be appended to existing files when possible. Note that not all formats support appending. Defaults to False.
Raises:
ValueError: If append is True but the specified format does not support appending.
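A sketch of a basic setup that writes JSON files grouped by topic and partition under a local directory; the output directory and topic name are placeholders.
from quixstreams import Application
from quixstreams.sinks.community.file.sink import FileSink

app = Application(broker_address="localhost:9092")
topic = app.topic("sink_topic")

# One directory per topic/partition, files named by the batch's starting offset
file_sink = FileSink(output_dir="./output", format="json", append=False)

sdf = app.dataframe(topic=topic)
sdf.sink(file_sink)

if __name__ == "__main__":
    app.run()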
FileSink.write
Writes a batch of data to files on disk, grouping data by topic and partition.
If append
is True
and an existing file is found, data will be appended to
the last file. Otherwise, a new file is created based on the batch's starting
offset.
Arguments:
batch: The batch of data to write.
quixstreams.sinks.community.file.formats.base
Format
Base class for formatting batches in file sinks.
This abstract base class defines the interface for batch formatting
in file sinks. Subclasses should implement the file_extension
property and the serialize
method to define how batches are
formatted and saved.
Format.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
Format.supports_append
Indicates if the format supports appending data to an existing file.
Returns:
True if appending is supported, otherwise False.
Format.serialize
Serializes a batch of messages into bytes.
Arguments:
batch: The batch of messages to serialize.
Returns:
The serialized batch as bytes.
quixstreams.sinks.community.file.formats.json
JSONFormat
Serializes batches of messages into JSON Lines format with optional gzip compression.
This class provides functionality to serialize a SinkBatch
into bytes
in JSON Lines format. It supports optional gzip compression and allows
for custom JSON serialization through the dumps
parameter.
This format supports appending to existing files.
JSONFormat.__init__
def __init__(file_extension: str = ".jsonl",
compress: bool = False,
dumps: Optional[Callable[[Any], str]] = None) -> None
Initializes the JSONFormat.
Arguments:
file_extension: The file extension to use for output files. Defaults to ".jsonl".
compress: If True, compresses the output using gzip and appends ".gz" to the file extension. Defaults to False.
dumps: A custom function to serialize objects to JSON-formatted strings. If provided, the compact option is ignored.
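For example, a FileSink can be configured with an explicit JSONFormat instance instead of the "json" format name; since this format supports appending, append=True is also possible. The output directory is a placeholder.
from quixstreams.sinks.community.file.formats.json import JSONFormat
from quixstreams.sinks.community.file.sink import FileSink

# Plain (uncompressed) JSON Lines files that new batches are appended to
json_format = JSONFormat(file_extension=".jsonl", compress=False)
file_sink = FileSink(output_dir="./output", format=json_format, append=True)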
JSONFormat.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
JSONFormat.serialize
Serializes a SinkBatch
into bytes in JSON Lines format.
Each item in the batch is converted into a JSON object with "_timestamp", "_key", and "_value" fields. If the message key is in bytes, it is decoded to a string.
Arguments:
batch: The SinkBatch to serialize.
Returns:
The serialized batch in JSON Lines format, optionally compressed with gzip.
quixstreams.sinks.community.file.formats.parquet
ParquetFormat
Serializes batches of messages into Parquet format.
This class provides functionality to serialize a SinkBatch
into bytes
in Parquet format using PyArrow. It allows setting the file extension
and compression algorithm used for the Parquet files.
This format does not support appending to existing files.
ParquetFormat.__init__
Initializes the ParquetFormat.
Arguments:
file_extension: The file extension to use for output files. Defaults to ".parquet".
compression: The compression algorithm to use for Parquet files. Allowed values are "none", "snappy", "gzip", "brotli", "lz4", or "zstd". Defaults to "snappy".
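For example, pairing the format with FileSink and a different compression codec (note that Parquet does not support append mode); the output directory is a placeholder.
from quixstreams.sinks.community.file.formats.parquet import ParquetFormat
from quixstreams.sinks.community.file.sink import FileSink

# Parquet files compressed with zstd; append must stay False for this format
parquet_format = ParquetFormat(compression="zstd")
file_sink = FileSink(output_dir="./output", format=parquet_format, append=False)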
ParquetFormat.file_extension
Returns the file extension used for output files.
Returns:
The file extension as a string.
ParquetFormat.serialize
Serializes a SinkBatch
into bytes in Parquet format.
Each item in the batch is converted into a dictionary with "_timestamp", "_key", and the keys from the message value. If the message key is in bytes, it is decoded to a string.
Missing fields in messages are filled with None
to ensure all rows
have the same columns.
Arguments:
batch: The SinkBatch to serialize.
Returns:
The serialized batch as bytes in Parquet format.
quixstreams.sinks.community.pubsub
PubSubTopicNotFoundError
Raised when the specified topic does not exist.
PubSubSink
A sink that publishes messages to Google Cloud Pub/Sub.
PubSubSink.__init__
def __init__(project_id: str,
topic_id: str,
service_account_json: Optional[str] = None,
value_serializer: Callable[[Any], Union[bytes, str]] = json.dumps,
key_serializer: Callable[[Any], str] = bytes.decode,
flush_timeout: int = 5,
**kwargs) -> None
Initialize the PubSubSink.
Arguments:
project_id: GCP project ID.
topic_id: Pub/Sub topic ID.
service_account_json: an optional JSON string with service account credentials to connect to Pub/Sub. The internal PublisherClient will use the Application Default Credentials if not provided. See https://cloud.google.com/docs/authentication/provide-credentials-adc for more info. Default - None.
value_serializer: Function to serialize the value to string or bytes (defaults to json.dumps).
key_serializer: Function to serialize the key to string (defaults to bytes.decode).
kwargs: Additional keyword arguments passed to PublisherClient.
PubSubSink.add
def add(value: Any, key: Any, timestamp: int,
headers: list[tuple[str, HeaderValue]], topic: str, partition: int,
offset: int) -> None
Publish a message to Pub/Sub.
PubSubSink.flush
Wait for all publish operations to complete successfully.
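A sketch of publishing processed records to Pub/Sub using Application Default Credentials; the project, topic, and Kafka topic names are placeholders.
from quixstreams import Application
from quixstreams.sinks.community.pubsub import PubSubSink

app = Application(broker_address="localhost:9092")
topic = app.topic("sink_topic")

pubsub_sink = PubSubSink(
    project_id="<gcp-project-id>",
    topic_id="<pubsub-topic-id>",
    # service_account_json omitted: Application Default Credentials are used
)

sdf = app.dataframe(topic=topic)
sdf.sink(pubsub_sink)

if __name__ == "__main__":
    app.run()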