
Topics & Serializers

quixstreams.models.topics

quixstreams.models.serializers.quix

QuixDeserializer

class QuixDeserializer(JSONDeserializer)


Handles Deserialization for any Quix-formatted topic.

Parses JSON data from either TimeseriesData or EventData messages (all other message types are ignored).



QuixDeserializer.__init__

def __init__(column_name: Optional[str] = None,
             loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)



Arguments:

  • column_name: if provided, the deserialized value will be wrapped into a dictionary with column_name as the key.
  • loads: function to parse JSON from bytes. Default: quixstreams.utils.json.loads.
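
As a rough illustration of the two arguments, the sketch below defines a custom parser with the same shape as the loads callable and mimics the documented column_name wrapping. The names decimal_loads and wrap_value are hypothetical and are not part of quixstreams; the wrapping logic only mirrors the description above, not the library's code.

```python
import json
from decimal import Decimal
from typing import Any, Optional, Union


def decimal_loads(data: Union[bytes, bytearray]) -> Any:
    """Hypothetical custom parser matching the `loads` callable signature."""
    # json.loads accepts bytes and bytearray directly; floats become Decimal.
    return json.loads(data, parse_float=Decimal)


def wrap_value(value: Any, column_name: Optional[str] = None) -> Any:
    """Mimics the documented `column_name` behaviour (illustration only)."""
    return {column_name: value} if column_name else value


parsed = decimal_loads(b'{"price": 1.10}')
wrapped = wrap_value(parsed, column_name="payload")
```

A callable like decimal_loads could then be passed as loads wherever exact decimal values matter more than parsing speed.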



QuixDeserializer.split_values

@property
def split_values() -> bool


Each Quix message might contain data for multiple Rows. This property tells downstream processors to expect an Iterable of Mappings instead of a single Mapping.



QuixDeserializer.deserialize

def deserialize(model_key: str, value: Union[List[Mapping],
                                             Mapping]) -> Iterable[Mapping]


Deserialization function for particular data types (Timeseries or EventData).


Arguments:

  • model_key: value of "__Q_ModelKey" message header
  • value: deserialized JSON value of the message, list or dict


Returns:

Iterable of dicts
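
This reference does not show the parsing logic itself. To illustrate why a single message can yield several rows, the sketch below inverts the Timeseries layout documented for QuixTimeseriesSerializer. The helper timeseries_to_rows and the "Timestamp" output key are assumptions made for this example; the library's actual output may differ.

```python
from typing import Any, Dict, Iterable, Mapping


def timeseries_to_rows(value: Mapping[str, Any]) -> Iterable[Dict[str, Any]]:
    """Hypothetical helper: yield one dict per entry in "Timestamps"."""
    for i, timestamp in enumerate(value.get("Timestamps", [])):
        # The "Timestamp" key name is an assumption of this sketch.
        row: Dict[str, Any] = {"Timestamp": timestamp}
        for group in ("NumericValues", "StringValues"):
            for column, values in value.get(group, {}).items():
                row[column] = values[i]
        yield row


message = {
    "Timestamps": [100, 200],
    "NumericValues": {"a": [1, 2]},
    "StringValues": {"c": ["x", "y"]},
}
rows = list(timeseries_to_rows(message))
```

Here one message with two timestamps produces two rows, which is the situation split_values signals to downstream processors.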

QuixTimeseriesSerializer

class QuixTimeseriesSerializer(QuixSerializer)


Serialize data to JSON formatted according to Quix Timeseries format.

The object to serialize must be a dictionary, and each value must be of type str, int, float, bytes or bytearray. Otherwise, a SerializationError is raised.

Input:

{'a': 1, 'b': 1.1, 'c': "string", 'd': b'bytes', 'Tags': {'tag1': 'tag'}}

Output:

{
    "Timestamps": [123123123],
    "NumericValues": {"a": [1], "b": [1.1]},
    "StringValues": {"c": ["string"]},
    "BinaryValues": {"d": ["Ynl0ZXM="]},
    "TagValues": {"tag1": ["tag"]}
}
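
The mapping from input to output can be sketched in plain Python. This is a minimal illustration that reproduces the example above, not the library's implementation: to_timeseries is a hypothetical helper, the timestamp is passed in explicitly, and it raises TypeError where the real serializer is documented to raise SerializationError.

```python
import base64
from typing import Any, Dict, Mapping


def to_timeseries(row: Mapping[str, Any], timestamp: int) -> Dict[str, Any]:
    """Hypothetical helper that groups one row's values by type."""
    out: Dict[str, Any] = {
        "Timestamps": [timestamp],
        "NumericValues": {},
        "StringValues": {},
        "BinaryValues": {},
        "TagValues": {},
    }
    for key, value in row.items():
        if key == "Tags":
            for tag, tag_value in value.items():
                out["TagValues"][tag] = [tag_value]
        elif isinstance(value, (int, float)):
            out["NumericValues"][key] = [value]
        elif isinstance(value, str):
            out["StringValues"][key] = [value]
        elif isinstance(value, (bytes, bytearray)):
            # Binary values appear base64-encoded in the documented output.
            out["BinaryValues"][key] = [base64.b64encode(value).decode("ascii")]
        else:
            raise TypeError(f"unsupported type for {key!r}: {type(value).__name__}")
    return out


result = to_timeseries(
    {"a": 1, "b": 1.1, "c": "string", "d": b"bytes", "Tags": {"tag1": "tag"}},
    timestamp=123123123,
)
```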

QuixEventsSerializer

class QuixEventsSerializer(QuixSerializer)


Serialize data to JSON formatted according to Quix EventData format.

The input value is expected to be a dictionary with the following keys:

  • "Id" (type str, default - "")
  • "Value" (type str, default - "")
  • "Tags" (type dict, default - {})

NOTE: All the other fields will be ignored.

Input:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"}
}

Output:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"},
    "Timestamp": 1692703362840389000
}
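
The key selection and defaults described above can be sketched as follows. The helper to_event is hypothetical and is not library code; the nanosecond timestamp unit is an assumption inferred from the magnitude of the example value.

```python
import time
from typing import Any, Dict, Mapping, Optional


def to_event(value: Mapping[str, Any],
             timestamp_ns: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical helper: keep only Id, Value and Tags, applying defaults."""
    return {
        "Id": value.get("Id", ""),
        "Value": value.get("Value", ""),
        "Tags": value.get("Tags", {}),
        # Assumed unit: nanoseconds since the Unix epoch.
        "Timestamp": time.time_ns() if timestamp_ns is None else timestamp_ns,
    }


event = to_event(
    {"Id": "an_event", "Value": "any_string", "Tags": {"tag1": "tag"}, "extra": 1},
    timestamp_ns=1692703362840389000,
)
```

The "extra" key is dropped from the result, matching the note that all other fields are ignored.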

quixstreams.models.serializers.simple_types

BytesDeserializer

class BytesDeserializer(Deserializer)


A deserializer that passes bytes through without any changes.

BytesSerializer

class BytesSerializer(Serializer)


A serializer that passes bytes through without any changes.

StringDeserializer

class StringDeserializer(Deserializer)




StringDeserializer.__init__

def __init__(column_name: Optional[str] = None, codec: str = "utf_8")


Deserializes bytes to strings using the specified encoding.


Arguments:

  • codec: string encoding

A wrapper around confluent_kafka.serialization.StringDeserializer.

IntegerDeserializer

class IntegerDeserializer(Deserializer)


Deserializes bytes to integers.

A wrapper around confluent_kafka.serialization.IntegerDeserializer.

DoubleDeserializer

class DoubleDeserializer(Deserializer)


Deserializes bytes in IEEE 754 binary64 format to floats.

A wrapper around confluent_kafka.serialization.DoubleDeserializer.
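
To make the byte layouts concrete, the sketch below round-trips an integer and a float with the standard struct module. It assumes big-endian 32-bit integers and big-endian binary64 doubles, which is believed to match the confluent_kafka serializers these classes wrap; treat the format strings as an assumption rather than a guarantee.

```python
import struct

# Assumed layout: 4-byte big-endian signed integer.
int_bytes = struct.pack(">i", 42)
(int_value,) = struct.unpack(">i", int_bytes)

# Assumed layout: 8-byte big-endian IEEE 754 binary64.
double_bytes = struct.pack(">d", 1.5)
(double_value,) = struct.unpack(">d", double_bytes)
```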

StringSerializer

class StringSerializer(Serializer)




StringSerializer.__init__

def __init__(codec: str = "utf_8")


Serializes strings to bytes using the specified encoding.


Arguments:

  • codec: string encoding
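
The effect of the codec argument can be illustrated with Python's built-in str.encode and bytes.decode, which is roughly what string serialization and deserialization amount to. This is a plain-Python sketch rather than a call into quixstreams, and it shows why producer and consumer should agree on the codec.

```python
# Encode with UTF-8, as the default codec "utf_8" would.
raw = "caf\u00e9".encode("utf_8")

# Decoding with the same codec restores the original string.
text_utf8 = raw.decode("utf_8")

# Decoding with a different codec silently yields different characters.
text_latin1 = raw.decode("latin_1")
```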

IntegerSerializer

class IntegerSerializer(Serializer)


Serializes integers to bytes.

DoubleSerializer

class DoubleSerializer(Serializer)


Serializes floats to bytes.