# Configuring Quix Streams Applications

The `quixstreams.Application` class is the main place to configure Quix Streams applications.

You can configure various aspects of your app, like:

- Kafka settings and credentials.
- State directory and RocksDB parameters.
- Error callbacks.
- Logging.
- and more.

For the full list of parameters, see the Application API docs.
## Main Configuration Parameters

- `broker_address` - **(REQUIRED)** Kafka connection settings, as either:
    - a broker URL string (e.g. `"localhost:9092"`), good for local development, OR
    - a `ConnectionConfig` object (see Authentication below for more details).
- `consumer_group` - Kafka consumer group name.
  The consumer group is also used in the state directory path and as a prefix for changelog topics, to ensure that applications from different consumer groups don't access the same state.
  Default - `"quixstreams-default"`.
- `commit_interval` - how often to commit the processed offsets and state, in seconds.
  Default - `5.0`.
  See the Checkpointing page for more information about the `commit_interval` parameter.
- `commit_every` - commit the checkpoint after processing N offsets.
  Default - `0`.
  See the Checkpointing page for more information about the `commit_every` parameter.
- `auto_offset_reset` - the Consumer `auto.offset.reset` setting.
  It determines where the consumer should start reading messages from.
  See more about `auto.offset.reset` in this article.
  Options - `"latest"`, `"earliest"`.
  Default - `"latest"`.
- `processing_guarantee` - use `"at-least-once"` or `"exactly-once"` processing guarantees.
  See Processing Guarantees below for more information.
  Options - `"at-least-once"` or `"exactly-once"`.
  Default - `"at-least-once"`.
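
As an example, here is a minimal sketch combining several of these parameters; the broker address and consumer group name are illustrative:

```python
from quixstreams import Application

# A minimal sketch of the parameters described above.
app = Application(
    broker_address="localhost:9092",       # local development broker
    consumer_group="my-consumer-group",    # illustrative group name
    commit_interval=5.0,                   # commit offsets/state every 5 seconds
    auto_offset_reset="earliest",          # read from the start if no offset is committed
    processing_guarantee="at-least-once",  # the default guarantee
)
```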
## Authentication

If you need to provide authentication settings for your broker, you can do so with the `broker_address` argument by passing it a `ConnectionConfig` object (instead of a string), like so:
```python
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

connection = ConnectionConfig(
    bootstrap_servers="my_url",
    security_protocol="sasl_plaintext",
    sasl_mechanism="PLAIN",
    sasl_username="my_user",
    sasl_password="my_pass",
)

app = Application(broker_address=connection)
```
### Using a librdkafka Config (Alternative)

A `ConnectionConfig` can also be created from an already valid librdkafka dictionary:
```python
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

librdkafka_config = {
    "bootstrap.servers": "my.url",
    "security.protocol": "sasl_plaintext",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "my_user",
    "sasl.password": "my_pass",
}

# NOTE: use the class directly (ConnectionConfig, NOT ConnectionConfig())
app = Application(
    broker_address=ConnectionConfig.from_librdkafka_dict(librdkafka_config)
)
```
#### Ignoring Irrelevant librdkafka Settings

`ConnectionConfig.from_librdkafka_dict(config, ignore_extras=True)` will additionally ignore irrelevant settings (but you will lose some validation checks).
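
For example, a sketch building on the dictionary above, with a hypothetical extra key that `ConnectionConfig` would otherwise reject:

```python
# "some.unrelated.setting" is a hypothetical key used only for illustration.
librdkafka_config["some.unrelated.setting"] = "value"

# With ignore_extras=True, unrecognized keys are dropped
# instead of failing validation.
connection = ConnectionConfig.from_librdkafka_dict(
    librdkafka_config, ignore_extras=True
)
```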
## Processing Guarantees

This section concerns the `processing_guarantee` setting.

### What are "processing guarantees"?

Kafka broadly has three guarantee levels/semantics associated with handling messages.
From weakest to strongest: `at-most-once`, `at-least-once`, and `exactly-once`.
Stronger guarantees generally have larger overhead, and thus reduced speed.

These guarantees can be read literally: when consuming Kafka messages, you can guarantee each will be processed `X` times:

- `at-most-once` - a message will be processed not more than once.
  If the processing fails, the message will not be processed again.
- `at-least-once` - a message may be processed more than once.
  This may lead to duplicates in the output topics, but the message won't be lost and is guaranteed to be processed.
- `exactly-once` - a message will be processed exactly once.
  The message is guaranteed to be processed without duplicated outputs, but at the cost of higher latency and complexity.
### What options does Quix Streams offer?

Currently, Quix Streams offers `at-least-once` and `exactly-once`.
The default guarantee is `at-least-once`.
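
Enabling a guarantee is a single parameter on the `Application`; a minimal sketch (broker address is illustrative):

```python
from quixstreams import Application

# Opt into exactly-once processing; the default is "at-least-once".
app = Application(
    broker_address="localhost:9092",
    processing_guarantee="exactly-once",
)
```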
### Performance comparison

It is difficult to offer hard numbers for the speed difference between `exactly-once` and `at-least-once` because many factors are at play, though latency is perhaps the main contributor since `exactly-once` requires more communication with the broker.

With default `Application` settings, `exactly-once` will likely perform at worst 10% slower than `at-least-once`, but given the nature of infrastructure/architecture, even this is not a guarantee.

You can minimize the impact of latency somewhat by adjusting the `commit_interval` of the `Application`, but be aware of the performance considerations around `commit_interval`.
### Exactly-Once producer buffering

Because `exactly-once` messages do not fully commit or produce until the checkpoint successfully completes, any resulting produced messages are not immediately readable, as they would be with `at-least-once`.

Basically, the `commit_interval` of an `Application` can be interpreted as the maximum amount of time before a produced message can be read (depending on how far into the checkpoint a message is processed). If you adjust it, be aware of other performance considerations around `commit_interval`.

NOTE: `groupby` doubles this effect since it produces to another topic under the hood.
### What processing guarantee is right for me?

To pick the right guarantee, you need to weigh an increase in speed against the potential to double-process a result, which may require other infrastructural considerations to handle appropriately.

In general, you may consider using `at-least-once` guarantees when:

- Latency and processing speed are the primary concerns.
- The downstream consumers can gracefully handle duplicated messages.

You may consider using `exactly-once` instead when:

- Consistency and correctness of the outputs are critical.
- Downstream consumers of the output topics cannot handle duplicated data.
- Note: you may want to pick a smaller value for the `commit_interval` to commit checkpoints more often and reduce the latency, as sketched below.

For more information about tuning the `commit_interval`, see the "Configuring the Checkpointing" page.
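
A sketch of that combination; the values are illustrative, not recommendations:

```python
from quixstreams import Application

# With exactly-once, a smaller commit_interval commits checkpoints more
# often, shortening the time before produced messages become readable.
app = Application(
    broker_address="localhost:9092",
    processing_guarantee="exactly-once",
    commit_interval=1.0,  # smaller than the 5.0s default
)
```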
## State

- `state_dir` - path to the application state directory.
  This directory contains data for each state store, separated by consumer group.
  Default - `"state"`.
- `rocksdb_options` - options to be used for RocksDB instances.
  The default configuration is described in the class `quixstreams.state.rocksdb.options.RocksDBOptions`.
  To override it, pass an instance of `quixstreams.state.rocksdb.options.RocksDBOptions`.
  Default - `None` (i.e. use a default config).
- `use_changelog_topics` - whether the application should use changelog topics to back stateful operations.
  See the Stateful Processing page to learn more about changelog topics.
  Default - `True`.
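
A minimal sketch of these parameters together; the state directory path is illustrative:

```python
from quixstreams import Application
from quixstreams.state.rocksdb.options import RocksDBOptions

app = Application(
    broker_address="localhost:9092",
    state_dir="my-app-state",          # where state stores are kept on disk
    rocksdb_options=RocksDBOptions(),  # defaults; construct with overrides if needed
    use_changelog_topics=True,         # back state stores with changelog topics
)
```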
## Logging

- `loglevel` - a log level to use for the "quixstreams" logger.
  Options - `"DEBUG"`, `"INFO"`, `"ERROR"`, `"WARNING"`, `"CRITICAL"`, `"NOTSET"`, `None`.
  If `None` is passed, no logging will be configured.
  You may pass `None` and configure the "quixstreams" logger externally using the `logging` library.
  Default - `"INFO"`.
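
For example, a sketch of configuring the "quixstreams" logger externally instead:

```python
import logging

from quixstreams import Application

# Configure the "quixstreams" logger with the standard logging library
# and pass loglevel=None so the Application leaves it untouched.
logging.basicConfig(level=logging.INFO)
logging.getLogger("quixstreams").setLevel(logging.DEBUG)

app = Application(broker_address="localhost:9092", loglevel=None)
```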
## Error callbacks

To handle errors flexibly, the `Application` class accepts callbacks triggered when exceptions occur at different stages of stream processing.
If a callback returns `True`, the exception will be ignored.
Otherwise, the exception will be propagated and the processing will eventually stop.

By default, the `Application` will propagate all exceptions as usual.

NOTE: Use with caution. The `Application` will commit the message offset if an error callback decides to ignore the exception.

- `on_consumer_error` - triggered when the internal Consumer fails to poll Kafka or cannot deserialize a message. Example:
```python
def on_consumer_error(exc: Exception, message, logger) -> bool:
    """
    Handle the consumer exception and ignore it.
    """
    logger.error("Ignore consumer exception exc=%s offset=%s", exc, message.offset())
    return True
```
- `on_processing_error` - triggered when an exception is raised during processing, after the message is deserialized. Example:
```python
def on_processing_error(exc: Exception, row, logger) -> bool:
    """
    Handle the processing exception and ignore it.
    """
    logger.error("Ignore processing exception exc=%s row=%s", exc, row)
    return True
```
- `on_producer_error` - triggered when the internal Producer fails to serialize or to produce a message to Kafka. Example:
```python
def on_producer_error(exc: Exception, row, logger) -> bool:
    """
    Handle the producer exception and ignore it.
    """
    logger.error("Ignore producer exception exc=%s row=%s", exc, row)
    return True
```
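
These callbacks are wired in through the corresponding `Application` parameters; a sketch using the three handlers above (broker address is illustrative):

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",
    on_consumer_error=on_consumer_error,
    on_processing_error=on_processing_error,
    on_producer_error=on_producer_error,
)
```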
## Processing Callbacks

- `on_message_processed` - a callback triggered when a message is successfully processed.
  You may use it for debugging purposes (e.g. to count the total processed messages).
  Another use case is to stop the Application after processing a certain number of messages.
  Example:
```python
from quixstreams import Application

TOTAL_PROCESSED = 0

def on_message_processed(topic: str, partition: int, offset: int):
    """
    Stop the application after processing 100 messages.
    """
    global TOTAL_PROCESSED
    TOTAL_PROCESSED += 1
    if TOTAL_PROCESSED == 100:
        app.stop()

app = Application(
    broker_address="localhost:9092",
    on_message_processed=on_message_processed,
)
```
## Topic Management

- `auto_create_topics` - whether to create all defined topics automatically when starting the app.
  Only topics made using the `Application.topic()` call are tracked.
  Default - `True`.
- `topic_create_timeout` - timeout in seconds to wait for newly created topics to finalize.
  Default - `60.0`.
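
A short sketch of how these settings interact with `Application.topic()`; the topic name is illustrative:

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",
    auto_create_topics=True,    # the default: create tracked topics on startup
    topic_create_timeout=60.0,  # wait up to 60s for creation to finalize
)

# Topics declared via Application.topic() are tracked and auto-created
# if they don't already exist.
input_topic = app.topic("my-input-topic")
```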
## Advanced Kafka Configuration

- `producer_extra_config` - a dictionary with additional Producer options in the librdkafka format.
  Values in this dictionary cannot override settings already defined by other parameters, like `broker_address`.
- `consumer_extra_config` - a dictionary with additional Consumer options in the librdkafka format.
  Values in this dictionary cannot override settings already defined by other parameters, like `broker_address`, `auto_offset_reset`, and `consumer_group`.
- `consumer_poll_timeout` - a timeout in seconds for the internal Consumer `poll()`.
  Default - `1.0`.
- `producer_poll_timeout` - a timeout in seconds for the internal Producer.
  Default - `0.0`.
- `request_timeout` - request timeout in seconds for any API-related calls, mostly around topic management.
  Default - `30.0`.
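
A sketch of passing extra librdkafka options; the specific option values are illustrative, not recommendations:

```python
from quixstreams import Application

app = Application(
    broker_address="localhost:9092",
    producer_extra_config={"linger.ms": 10},       # extra librdkafka Producer option
    consumer_extra_config={"fetch.min.bytes": 1},  # extra librdkafka Consumer option
    consumer_poll_timeout=1.0,
    producer_poll_timeout=0.0,
    request_timeout=30.0,
)
```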