Apache Iceberg Sink
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This sink writes batches of data to an Apache Iceberg table.
By default, the data will include the Kafka message key, value, and timestamp.
Currently, this sink supports Apache Iceberg hosted in:
- AWS
Supported data catalogs:
- AWS Glue
How To Install
The dependencies for this sink are not included in the default quixstreams
package.
To install them, run the following command:
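(The `iceberg_aws` extra shown below is an assumption based on recent quixstreams releases; check your installed version's documentation if the extra is named differently.)

```bash
pip install quixstreams[iceberg_aws]
```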
How To Use
Create an instance of IcebergSink and pass it to the StreamingDataFrame.sink() method.
For the full description of expected parameters, see the Iceberg Sink API page.
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
    table_name="glue.sink-test",
    config=iceberg_config,
    data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)

if __name__ == "__main__":
    # Start the application
    app.run()
How It Works
IcebergSink is a batching sink.
It batches processed records in memory per topic partition, serializes incoming data batches into Parquet format, and appends them to the Iceberg table, updating the table schema as necessary.
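To illustrate the batching flow, here is a rough sketch of how a batching sink receives per-partition batches and could serialize them to Parquet. This is not the IcebergSink source; it assumes the BatchingSink and SinkBatch base classes from quixstreams.sinks.base, and the file-naming and record fields used are illustrative only.

```python
import pyarrow as pa
import pyarrow.parquet as pq

from quixstreams.sinks.base import BatchingSink, SinkBatch


class ParquetAppendSink(BatchingSink):
    """Illustrative sketch of a batching sink, not the real IcebergSink."""

    def write(self, batch: SinkBatch):
        # Each batch holds the records accumulated for one topic partition
        rows = [
            {"key": item.key, "value": item.value, "timestamp": item.timestamp}
            for item in batch
        ]
        table = pa.Table.from_pylist(rows)
        # IcebergSink appends the equivalent Parquet data to the Iceberg table
        # and evolves its schema as needed; here we just write a local file.
        pq.write_table(table, f"{batch.topic}-{batch.partition}.parquet")
```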
Retrying Failures
IcebergSink will retry failed commits automatically with a random delay of up to 5 seconds.
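The retry behavior amounts to the following pattern. This is a simplified sketch, not the sink's actual code; the retry limit, exception handling, and function names are assumptions for illustration.

```python
import random
import time


def commit_with_retry(commit, max_attempts=3):
    # Retry a failing commit with a random delay of up to 5 seconds
    # between attempts (illustrative only, not IcebergSink's code).
    for attempt in range(1, max_attempts + 1):
        try:
            return commit()
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(random.uniform(0, 5))
```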
Delivery Guarantees
IcebergSink provides at-least-once guarantees, and the results may contain duplicated rows of data if there were errors during processing.
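If duplicates matter downstream, they can be removed when reading the table. A minimal sketch using pyiceberg and pandas, assuming the catalog and table names from the example above; the catalog configuration and the "key" and "timestamp" column names are assumptions, so adjust them to your setup.

```python
from pyiceberg.catalog import load_catalog

# Load the catalog and the table written by the sink
# (names follow the example above; adjust for your environment).
catalog = load_catalog("glue")
table = catalog.load_table("glue.sink-test")

# Read into pandas and drop rows that share the same key and timestamp
df = table.scan().to_pandas()
deduplicated = df.drop_duplicates(subset=["key", "timestamp"])
```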