
Context API

quixstreams.context



set_message_context

def set_message_context(context: Optional[MessageContext])


Set a MessageContext for the current message in the current contextvars.Context.

NOTE: This is for advanced usage only. If you only need to change the message key, use the key argument of StreamingDataFrame.to_topic() instead.


Example Snippet:

from quixstreams import Application, set_message_context, message_context

# Adds a header to the current message context when the value is greater than 1.
def alter_context(value):
    context = message_context()
    if value > 1:
        # value is numeric here, so convert it to str before encoding to bytes
        context.headers = context.headers + (b"cool_new_header", str(value).encode())
        set_message_context(context)

app = Application()
sdf = app.dataframe()
sdf = sdf.update(lambda value: alter_context(value))


Arguments:

  • context: instance of MessageContext (the signature also accepts None)
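
To make the contextvars mechanism concrete, here is a minimal standard-library sketch of a setter/getter pair built on a ContextVar. It is an illustration only, not the library's implementation: DemoMessageContext, demo_set_message_context, demo_message_context and process are hypothetical names, and the fields are taken from the metadata listed on this page.

```python
import contextvars
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class DemoMessageContext:
    """Hypothetical stand-in for MessageContext; not the library class."""
    key: Any
    partition: int
    offset: int
    timestamp: int


# One ContextVar holds the context of the message currently being processed.
_current = contextvars.ContextVar("demo_message_context", default=None)


def demo_set_message_context(context: Optional[DemoMessageContext]) -> None:
    """Store the context in the active contextvars.Context."""
    _current.set(context)


def demo_message_context() -> Optional[DemoMessageContext]:
    """Return the stored context, or None if nothing has been set."""
    return _current.get()


def process(ctx: DemoMessageContext, value: Any):
    """Set the context, then read it back the way a transformation would."""
    demo_set_message_context(ctx)
    return value, demo_message_context().partition


# Running inside a copied Context keeps the value from leaking to the caller.
ctx = DemoMessageContext(key=b"1", partition=2, offset=10, timestamp=0)
result = contextvars.copy_context().run(process, ctx, "payload")
```

In this sketch the value set inside `process` is visible only within the copied Context, so `demo_message_context()` called afterwards from the outer scope still returns None.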



message_context

def message_context() -> MessageContext


Get a MessageContext for the current message, which houses most of the message metadata, like:

  • key
  • timestamp
  • partition
  • offset


Example Snippet:

from quixstreams import Application, message_context

# Changes the current sdf value based on what the message partition is.

app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)


Returns:

instance of MessageContext
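
When the logic grows beyond a one-line lambda, one option is to build the callable from a factory that receives the context getter as a parameter, so it can be exercised without a running application. The sketch below is an assumption-laden illustration: make_partition_flag and fake_context are hypothetical names, and the fake object only mimics the partition attribute.

```python
from types import SimpleNamespace
from typing import Any, Callable


def make_partition_flag(
    get_context: Callable[[], Any], target_partition: int = 2
) -> Callable[[Any], int]:
    """Build a value -> int function that flags messages from one partition."""

    def flag(value: Any) -> int:
        # The context is looked up on every call, i.e. once per message.
        return 1 if get_context().partition == target_partition else 0

    return flag


# Stand-in for a real context; only the attribute used above is provided.
fake_context = SimpleNamespace(partition=2)
flag = make_partition_flag(lambda: fake_context)
```

With the library, the same factory would presumably be wired in as `sdf.apply(make_partition_flag(message_context))`.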



message_key

def message_key() -> Any


Get the current message's key.


Example Snippet:

from quixstreams import Application, message_key

# Changes the current sdf value based on what the message key is.

app = Application()
sdf = app.dataframe()
sdf = sdf.apply(lambda value: 1 if message_key() == b'1' else 0)


Returns:

a deserialized message key
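
Since the return type is Any, the type of the key likely depends on how keys are deserialized; the snippet above compares against b'1', which assumes the key arrives as raw bytes. A small helper can make the comparison tolerant of both bytes and str keys. This is a sketch, and key_equals is a hypothetical helper, not part of the library.

```python
from typing import Any


def key_equals(key: Any, expected: str, encoding: str = "utf-8") -> bool:
    """Compare a message key to a string, accepting bytes or str keys."""
    if isinstance(key, (bytes, bytearray)):
        try:
            key = bytes(key).decode(encoding)
        except UnicodeDecodeError:
            # Undecodable bytes cannot match a text value.
            return False
    return key == expected
```

It could then be used as `sdf.apply(lambda value: 1 if key_equals(message_key(), "1") else 0)`.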