# Using Producer & Consumer
## When to use Producer & Consumer

Quix Streams provides access to the low-level `Producer` and `Consumer` classes.
They are intended for cases when `StreamingDataFrame` is not enough for the given application and you need more flexibility.

For example:
For example:
- To produce data to a Kafka topic from a file or another non-Kafka source.
- To manually commit topic offsets.
- To batch messages in-memory before processing them.
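As an illustration of the batching use case, here is a plain-Python sketch of an in-memory batcher that accumulates messages and flushes them in groups. `MessageBatcher` is a hypothetical helper, not part of the Quix Streams API; in a real application, its `add()` would be called from a consumer polling loop.

```python
from typing import Any, Callable, List


class MessageBatcher:
    """Accumulate messages in memory and process them in fixed-size batches."""

    def __init__(self, batch_size: int, on_batch: Callable[[List[Any]], None]):
        self._batch_size = batch_size
        self._on_batch = on_batch
        self._buffer: List[Any] = []

    def add(self, message: Any) -> None:
        """Buffer a message, flushing when the batch is full."""
        self._buffer.append(message)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Process whatever is currently buffered, if anything."""
        if self._buffer:
            self._on_batch(self._buffer)
            self._buffer = []


batches = []
batcher = MessageBatcher(batch_size=3, on_batch=batches.append)
for value in [b'v1', b'v2', b'v3', b'v4']:
    batcher.add(value)
batcher.flush()  # flush the incomplete tail batch
print(batches)  # → [[b'v1', b'v2', b'v3'], [b'v4']]
```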
`Producer` and `Consumer` are essentially thin wrappers on top of the confluent_kafka library, which Quix Streams uses under the hood.
The wrapping code adds logging, typing, and error handling, while keeping the API of the methods the same.
Using them requires more code than `StreamingDataFrame`, but you can implement any streaming workflow you need.
## Using Consumer
There are two ways to create a `Consumer` object:

- Using the `Application.get_consumer()` method.
  This way, the `Application` will provide an already configured `Consumer` according to the `Application` configs.
  This is the recommended way to create a `Consumer`.
- Creating an instance of `quixstreams.kafka.Consumer` directly.
  This way, you will need to configure the instance yourself.

See the Consumer API Docs for more details and methods.
### Differences from `confluent_kafka.Consumer`

- The `__init__` parameters are expanded into individual parameters, compared to `confluent_kafka.Consumer`, which accepts a dictionary with all the parameters.
- The `"enable.auto.offset.store"` parameter is set to `False` by default in order to provide at-least-once processing guarantees by default.
- The rebalancing callbacks `on_assign`, `on_revoke`, and `on_lost` raise Kafka errors as `PartitionAssignmentError` exceptions when they occur.
- `quixstreams.kafka.Consumer` implements a context manager interface to gracefully close itself.
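The context-manager behavior mentioned above can be illustrated with a plain-Python stand-in (`ClosingResource` is a hypothetical class, not the real `Consumer`, but it follows the same pattern): cleanup is guaranteed even when the body of the `with` block raises.

```python
class ClosingResource:
    """Mimics the context-manager pattern used by the Quix Streams
    Consumer/Producer: __exit__ guarantees cleanup even on errors."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False  # do not suppress the exception

    def close(self):
        self.closed = True


resource = ClosingResource()
try:
    with resource:
        raise RuntimeError('processing failed')
except RuntimeError:
    pass
print(resource.closed)  # → True
```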
Example:

Creating a `Consumer` object using an `Application` instance and starting a polling loop.
```python
from quixstreams import Application

# Configure an Application.
# The config params will be used for the Consumer instance too.
app = Application(
    broker_address='localhost:9092',
    auto_offset_reset='earliest',
    auto_commit_enable=True,
)

# Create a consumer and start a polling loop
with app.get_consumer() as consumer:
    consumer.subscribe(topics=['my-topic'])

    while True:
        msg = consumer.poll(0.1)
        if msg is None:
            continue
        elif msg.error():
            print('Kafka error:', msg.error())
            continue

        value = msg.value()
        # Do some work with the value here ...

        # Store the offset of the processed message on the Consumer
        # for the auto-commit mechanism.
        # It will send it to Kafka in the background.
        # Storing offsets only after the message is processed enables
        # at-least-once delivery guarantees.
        consumer.store_offsets(message=msg)
```
## Using Producer
Similarly to `Consumer`, there are two ways to create a `Producer` object:

- Using the `Application.get_producer()` method.
  This way, the `Application` will provide an already configured `Producer` according to the `Application` configs.
  This is the recommended way to create a `Producer`.
- Creating an instance of `quixstreams.kafka.Producer` directly.
  This way, you will need to configure the instance yourself.

See the Producer API Docs for more details and methods.
### Differences from `confluent_kafka.Producer`

- The `__init__` parameters are expanded into individual parameters, compared to `confluent_kafka.Producer`, which accepts a dictionary with all the parameters.
- The `produce()` method automatically calls `.poll()` to empty the internal buffer of produced messages. `produce()` also retries on `BufferError` in case the internal producer buffer is full.
- `quixstreams.kafka.Producer` implements a context manager interface to gracefully flush itself.
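The `BufferError` retry behavior described above can be sketched in plain Python. This is not the library's implementation, just an illustration of the pattern; `try_produce` is a hypothetical callable standing in for the real buffered produce call.

```python
import itertools


def produce_with_retry(try_produce, max_tries=3):
    """Retry a produce call that raises BufferError when the
    internal buffer is full."""
    for attempt in itertools.count(1):
        try:
            return try_produce()
        except BufferError:
            if attempt >= max_tries:
                raise
            # The real producer calls poll() here to drain the buffer
            # before retrying.


attempts = []

def flaky_produce():
    """Simulates a producer whose buffer is full on the first call."""
    attempts.append(1)
    if len(attempts) < 2:
        raise BufferError('queue full')
    return 'delivered'


print(produce_with_retry(flaky_produce))  # → delivered
```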
Example:

Creating a `Producer` object using an `Application` instance and producing messages.

For a more complete example, you can also read the Quickstart page.
```python
from quixstreams import Application

# Configure an Application.
# The config params will be used for the Producer instance too.
app = Application(broker_address='localhost:9092')

# Create some messages to produce
messages = [
    {'key': b'key1', 'value': b'value1'},
    {'key': b'key2', 'value': b'value2'},
]

# Create a producer and start producing messages
with app.get_producer() as producer:
    for message in messages:
        producer.produce(topic='my-topic', key=message['key'], value=message['value'])
```
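The example above produces raw bytes. When your data is structured (e.g., dicts), you must serialize keys and values to bytes yourself before calling `produce()` on the low-level `Producer`. A broker-free sketch using JSON; `serialize_message` is a hypothetical helper, not part of the Quix Streams API:

```python
import json


def serialize_message(key: str, value: dict) -> dict:
    """Encode a key/value pair into the bytes expected by Producer.produce()."""
    return {
        'key': key.encode('utf-8'),
        'value': json.dumps(value).encode('utf-8'),
    }


msg = serialize_message('sensor-1', {'temperature': 21.5})
print(msg['key'])  # → b'sensor-1'
# msg can now be passed to producer.produce(topic=..., **msg)
```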