Serializers API

quixstreams.models.serializers.simple_types

BytesDeserializer

class BytesDeserializer(Deserializer)

A deserializer that passes bytes through without any changes.

BytesSerializer

class BytesSerializer(Serializer)

A serializer that passes bytes through without any changes.

StringDeserializer

class StringDeserializer(Deserializer)

StringDeserializer.__init__

def __init__(codec: str = "utf_8")

Deserializes bytes to strings using the specified encoding.


Arguments:

  • codec: string encoding

A wrapper around confluent_kafka.serialization.StringDeserializer.

IntegerDeserializer

class IntegerDeserializer(Deserializer)

Deserializes bytes to integers.

A wrapper around confluent_kafka.serialization.IntegerDeserializer.

DoubleDeserializer

class DoubleDeserializer(Deserializer)

Deserializes bytes in IEEE 754 binary64 format to floats.

A wrapper around confluent_kafka.serialization.DoubleDeserializer.

StringSerializer

class StringSerializer(Serializer)

StringSerializer.__init__

def __init__(codec: str = "utf_8")

Serializes strings to bytes using the specified encoding.


Arguments:

  • codec: string encoding
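
As a rough illustration of what the codec argument controls, the sketch below round-trips a string through Python's built-in str.encode and bytes.decode. It uses only the standard library and is not the Quix Streams implementation; the sample text is an assumption made for the example.

```python
# Stdlib-only sketch: what a codec such as "utf_8" does to a string.
text = "café"

# Serialization direction: str -> bytes using the chosen codec.
encoded = text.encode("utf_8")

# Deserialization direction: bytes -> str using the same codec.
decoded = encoded.decode("utf_8")

# A different codec produces different bytes for the same text,
# so producer and consumer have to agree on the codec.
latin1_encoded = text.encode("latin_1")
```

Any codec name accepted by Python's codecs module should be usable here, provided the serializer and the deserializer are configured with the same one.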

IntegerSerializer

class IntegerSerializer(Serializer)

Serializes integers to bytes

DoubleSerializer

class DoubleSerializer(Serializer)

Serializes floats to bytes
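
To make the byte layout concrete, here is a minimal sketch using Python's struct module. It assumes the wire format matches the one used by confluent_kafka's integer and double serializers (a big-endian 32-bit signed integer and a big-endian IEEE 754 binary64 value); that assumption is not confirmed by this page.

```python
import struct

# Assumed layouts, not confirmed by this page:
# ">i" = big-endian signed 32-bit integer, ">d" = big-endian IEEE 754 binary64.
int_bytes = struct.pack(">i", 42)
double_bytes = struct.pack(">d", 1.5)

# Unpacking reverses the operation; struct.unpack returns a 1-tuple.
(int_value,) = struct.unpack(">i", int_bytes)
(double_value,) = struct.unpack(">d", double_bytes)
```

Under this assumption an integer always occupies 4 bytes and a double 8 bytes, and integers outside the 32-bit signed range cannot be packed.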

quixstreams.models.serializers.json

JSONSerializer

class JSONSerializer(Serializer)

JSONSerializer.__init__

def __init__(
    dumps: Callable[[Any], Union[str, bytes]] = default_dumps,
    schema: Optional[Mapping] = None,
    validator: Optional[Validator] = None,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

Serializer that returns data in JSON format.


Arguments:

  • dumps: a function to serialize objects to JSON. Default - quixstreams.utils.json.dumps
  • schema: A schema used to validate the data using jsonschema.Draft202012Validator. Default - None
  • validator: A jsonschema validator used to validate the data. Takes precedence over the schema. Default - None
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's JSONSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's JSONSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.
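
The dumps argument accepts any callable that turns an object into str or bytes. The sketch below defines one such callable with the standard json module; the name compact_dumps and the sample payload are hypothetical, and the commented-out constructor call only restates the signature documented above.

```python
import json
from typing import Any


def compact_dumps(value: Any) -> bytes:
    """Hypothetical dumps callable: compact, key-sorted JSON as UTF-8 bytes."""
    text = json.dumps(value, separators=(",", ":"), sort_keys=True)
    return text.encode("utf-8")


payload = compact_dumps({"b": 1, "a": [1.5, "x"]})

# Intended usage, per the signature above (requires quixstreams):
# serializer = JSONSerializer(dumps=compact_dumps)
```

Sorting keys makes the output byte-for-byte reproducible, which can help when messages are compared or hashed downstream.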

JSONDeserializer

class JSONDeserializer(Deserializer)

JSONDeserializer.__init__

def __init__(
    loads: Callable[[Union[bytes, bytearray]], Any] = default_loads,
    schema: Optional[Mapping] = None,
    validator: Optional[Validator] = None,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)

Deserializer that parses data from JSON


Arguments:

  • loads: function to parse JSON from bytes. Default - quixstreams.utils.json.loads.
  • schema: A schema used to validate the data using jsonschema.Draft202012Validator. Default - None
  • validator: A jsonschema validator used to validate the data. Takes precedence over the schema. Default - None
  • schema_registry_client_config: If provided, deserialization is offloaded to Confluent's JSONDeserializer. Default - None
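
The loads argument accepts any callable that parses bytes or bytearray into a Python object. As a sketch, the hypothetical decimal_loads below uses the standard json module to parse floating-point numbers as Decimal instead of float; the function name and sample message are assumptions made for the example.

```python
import json
from decimal import Decimal
from typing import Any, Union


def decimal_loads(data: Union[bytes, bytearray]) -> Any:
    """Hypothetical loads callable: parse JSON floats as Decimal."""
    # json.loads accepts bytes and bytearray directly.
    return json.loads(data, parse_float=Decimal)


parsed = decimal_loads(b'{"price": 1.10, "qty": 3}')

# Intended usage, per the signature above (requires quixstreams):
# deserializer = JSONDeserializer(loads=decimal_loads)
```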

quixstreams.models.serializers.avro

AvroSerializer

class AvroSerializer(Serializer)

AvroSerializer.__init__

def __init__(
    schema: Schema,
    strict: bool = False,
    strict_allow_default: bool = False,
    disable_tuple_notation: bool = False,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

Serializer that returns data in Avro format.

For more information see fastavro schemaless_writer method.


Arguments:

  • schema: The Avro schema.
  • strict: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states. Default - False
  • strict_allow_default: If set to True, an error will be raised if records do not contain exactly the same fields that the schema states unless it is a missing field that has a default value in the schema. Default - False
  • disable_tuple_notation: If set to True, tuples will not be treated as a special case. Therefore, using a tuple to indicate the type of a record will not work. Default - False
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's AvroSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's AvroSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.

AvroDeserializer

class AvroDeserializer(Deserializer)

AvroDeserializer.__init__

def __init__(
    schema: Optional[Schema] = None,
    reader_schema: Optional[Schema] = None,
    return_record_name: bool = False,
    return_record_name_override: bool = False,
    return_named_type: bool = False,
    return_named_type_override: bool = False,
    handle_unicode_errors: str = "strict",
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None
)

Deserializer that parses data from Avro.

For more information see fastavro schemaless_reader method.


Arguments:

  • schema: The Avro schema.
  • reader_schema: If the schema has changed since being written then the new schema can be given to allow for schema migration. Default - None
  • return_record_name: If true, when reading a union of records, the result will be a tuple where the first value is the name of the record and the second value is the record itself. Default - False
  • return_record_name_override: If true, this will modify the behavior of return_record_name so that the record name is only returned for unions where there is more than one record. For unions that only have one record, this option will make it so that the record is returned by itself, not a tuple with the name. Default - False
  • return_named_type: If true, when reading a union of named types, the result will be a tuple where the first value is the name of the type and the second value is the record itself. Default - False

    NOTE: Using this option will ignore return_record_name and return_record_name_override.

  • return_named_type_override: If true, this will modify the behavior of return_named_type so that the named type is only returned for unions where there is more than one named type. For unions that only have one named type, this option will make it so that the named type is returned by itself, not a tuple with the name. Default - False
  • handle_unicode_errors: Should be set to a valid string that can be used in the errors argument of the string decode() function. Default - "strict"
  • schema_registry_client_config: If provided, deserialization is offloaded to Confluent's AvroDeserializer. Default - None
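
The handle_unicode_errors values are the same strings accepted by the errors argument of bytes.decode(). The sketch below shows how three common values behave on a byte string that is not valid UTF-8. It uses only the standard library and does not call the Avro deserializer; the sample bytes are an assumption.

```python
# b"caf\xe9" is "café" encoded as Latin-1, which is not valid UTF-8.
raw = b"caf\xe9"

# "replace": the invalid byte becomes U+FFFD (the replacement character).
replaced = raw.decode("utf-8", errors="replace")

# "ignore": the invalid byte is silently dropped.
ignored = raw.decode("utf-8", errors="ignore")

# "strict" (the default): decoding raises UnicodeDecodeError.
try:
    raw.decode("utf-8", errors="strict")
    strict_failed = False
except UnicodeDecodeError:
    strict_failed = True
```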

quixstreams.models.serializers.protobuf

ProtobufSerializer

class ProtobufSerializer(Serializer)

ProtobufSerializer.__init__

def __init__(
    msg_type: Message,
    deterministic: bool = False,
    ignore_unknown_fields: bool = False,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

Serializer that returns data in protobuf format.

Serialization from a Python dictionary can have a significant performance impact. An alternative is to pass the serializer an object of the msg_type class.


Arguments:

  • msg_type: protobuf message class.
  • deterministic: If true, requests deterministic serialization of the protobuf, with predictable ordering of map keys. Default - False
  • ignore_unknown_fields: If True, do not raise errors for unknown fields. Default - False
  • schema_registry_client_config: If provided, serialization is offloaded to Confluent's ProtobufSerializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's ProtobufSerializer. Default - None

    NOTE: schema_registry_client_config must also be set.

ProtobufDeserializer

class ProtobufDeserializer(Deserializer)

ProtobufDeserializer.__init__

def __init__(
    msg_type: Message,
    use_integers_for_enums: bool = False,
    preserving_proto_field_name: bool = False,
    to_dict: bool = True,
    schema_registry_client_config: Optional[SchemaRegistryClientConfig] = None,
    schema_registry_serialization_config: Optional[
        SchemaRegistrySerializationConfig] = None)

Deserializer that parses protobuf data into a dictionary suitable for a StreamingDataframe.

Deserialization to a Python dictionary can have a significant performance impact. You can disable this behavior by setting to_dict to False, in which case the protobuf message is used as the StreamingDataframe row value.


Arguments:

  • msg_type: protobuf message class.
  • use_integers_for_enums: If true, use integers instead of enum names. Default - False
  • preserving_proto_field_name: If True, use the original proto field names as defined in the .proto file. If False, convert the field names to lowerCamelCase. Default - False
  • to_dict: If false, return the protobuf message instead of a dict. Default - True
  • schema_registry_client_config: If provided, deserialization is offloaded to Confluent's ProtobufDeserializer. Default - None
  • schema_registry_serialization_config: Additional configuration for Confluent's ProtobufDeserializer. Default - None

    NOTE: schema_registry_client_config must also be set.
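
To illustrate the effect of preserving_proto_field_name, the sketch below approximates the conversion from .proto field names to lowerCamelCase keys. The helper to_lower_camel_case is hypothetical and covers only plain snake_case names; it is not the protobuf library's own conversion routine.

```python
def to_lower_camel_case(field_name: str) -> str:
    """Hypothetical helper: approximate snake_case -> lowerCamelCase mapping."""
    head, *rest = field_name.split("_")
    # Upper-case only the first character of each later part.
    return head + "".join(part[:1].upper() + part[1:] for part in rest)


# With preserving_proto_field_name=False, keys would look like the right-hand side.
renamed = {name: to_lower_camel_case(name) for name in ("user_id", "created_at_ms", "name")}
```

With preserving_proto_field_name=True the dictionary keys stay as written in the .proto file, for example user_id rather than userId.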

quixstreams.models.serializers.schema_registry

SchemaRegistryClientConfig

class SchemaRegistryClientConfig(BaseSettings)

Configuration required to establish the connection with a Schema Registry.


Arguments:

  • url: Schema Registry URL.
  • ssl_ca_location: Path to CA certificate file used to verify the Schema Registry's private key.
  • ssl_key_location: Path to the client's private key (PEM) used for authentication.

    NOTE: ssl_certificate_location must also be set.

  • ssl_certificate_location: Path to the client's public key (PEM) used for authentication.

    NOTE: May be set without ssl_key_location if the private key is stored within the PEM as well.

  • basic_auth_user_info: Client HTTP credentials in the form of username:password.

    NOTE: By default, userinfo is extracted from the URL if present.
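
The note about userinfo refers to credentials embedded in the URL itself. The sketch below shows which part of a URL that is, using urllib.parse from the standard library. The registry URL and credentials are made up, and how the client extracts them internally is not shown here.

```python
from urllib.parse import urlsplit

# Hypothetical registry URL with embedded credentials.
url = "https://alice:s3cret@registry.example.com:8081"

parts = urlsplit(url)

# The userinfo component, in the username:password form that
# basic_auth_user_info expects.
user_info = f"{parts.username}:{parts.password}"
host = parts.hostname
port = parts.port
```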

SchemaRegistrySerializationConfig

class SchemaRegistrySerializationConfig(BaseSettings)

Configuration that instructs Serializer how to handle communication with a Schema Registry.


Arguments:

  • auto_register_schemas: If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy). Defaults to True.
  • normalize_schemas: Whether to normalize schemas, which will transform schemas to have a consistent format, including ordering properties and references.
  • use_latest_version: Whether to use the latest subject version for serialization. Defaults to False.

    NOTE: There is no check that the latest schema is backwards compatible with the object being serialized.

  • subject_name_strategy: Callable(SerializationContext, str) -> str. Defines how Schema Registry subject names are constructed. Standard naming strategies are defined in the confluent_kafka.schema_registry namespace. Defaults to topic_subject_name_strategy.
  • skip_known_types: Whether or not to skip known types when resolving schema dependencies. Defaults to False.
  • reference_subject_name_strategy: Defines how Schema Registry subject names for schema references are constructed. Defaults to reference_subject_name_strategy.
  • use_deprecated_format: Specifies whether the Protobuf serializer should serialize message indexes without zig-zag encoding. This option must be explicitly configured as older and newer Protobuf producers are incompatible. If the consumers of the topic being produced to are using confluent-kafka-python <1.8, then this property must be set to True until all old consumers have been upgraded.
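
A subject name strategy is a callable that receives the serialization context and the record name and returns the subject string. The sketch below imitates that shape with a stand-in context class so it runs without confluent_kafka. FakeContext and both function names are hypothetical, and the "<topic>-<field>" result for the topic strategy is an assumption about the default behavior.

```python
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Stand-in for SerializationContext, reduced to topic and field."""
    topic: str
    field: str  # "key" or "value"


def topic_strategy(ctx: FakeContext, record_name: str) -> str:
    """Assumed shape of the topic strategy: '<topic>-<field>'."""
    return f"{ctx.topic}-{ctx.field}"


def topic_record_strategy(ctx: FakeContext, record_name: str) -> str:
    """Alternative: one subject per record type within a topic."""
    return f"{ctx.topic}-{record_name}"


ctx = FakeContext(topic="orders", field="value")
default_subject = topic_strategy(ctx, "com.example.Order")
per_record_subject = topic_record_strategy(ctx, "com.example.Order")
```

A per-record strategy can be useful when one topic carries several message types, since each type then evolves under its own subject.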

quixstreams.models.serializers.quix

QuixDeserializer

class QuixDeserializer(JSONDeserializer)

Handles Deserialization for any Quix-formatted topic.

Parses JSON data from either TimeseriesData or EventData (ignores the rest).



QuixDeserializer.__init__

def __init__(loads: Callable[[Union[bytes, bytearray]], Any] = default_loads)


Arguments:

  • loads: function to parse JSON from bytes. Default - quixstreams.utils.json.loads.



QuixDeserializer.split_values

@property
def split_values() -> bool

Each Quix message might contain data for multiple Rows. This property informs the downstream processors about that, so they can expect an Iterable instead of Mapping.



QuixDeserializer.deserialize

def deserialize(model_key: str, value: Union[List[Mapping],
                                             Mapping]) -> Iterable[Mapping]

Deserialization function for particular data types (Timeseries or EventData).


Arguments:

  • model_key: value of "__Q_ModelKey" message header
  • value: deserialized JSON value of the message, list or dict


Returns:

Iterable of dicts

QuixTimeseriesSerializer

class QuixTimeseriesSerializer(QuixSerializer)

Serialize data to JSON formatted according to Quix Timeseries format.

The serializable object must be a dictionary, and each item must be of type str, int, float, bytes or bytearray. Otherwise, a SerializationError is raised.

Input:

{'a': 1, 'b': 1.1, 'c': "string", 'd': b'bytes', 'Tags': {'tag1': 'tag'}}

Output:

{
    "Timestamps": [123123123],
    "NumericValues": {"a": [1], "b": [1.1]},
    "StringValues": {"c": ["string"]},
    "BinaryValues": {"d": ["Ynl0ZXM="]},
    "TagValues": {"tag1": ["tag"]}
}
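
The mapping from the input to the output above can be sketched in plain Python. The function to_timeseries below is a hypothetical re-creation based only on the example shown; it is not the library's code. It takes the timestamp as an explicit argument and raises TypeError where the real serializer is documented to raise SerializationError.

```python
import base64
from typing import Any, Dict, Mapping


def to_timeseries(row: Mapping[str, Any], timestamp: int) -> Dict[str, Any]:
    """Hypothetical re-creation of the input/output mapping shown above."""
    result: Dict[str, Any] = {
        "Timestamps": [timestamp],
        "NumericValues": {},
        "StringValues": {},
        "BinaryValues": {},
        "TagValues": {},
    }
    for key, value in row.items():
        if key == "Tags":
            for tag, tag_value in value.items():
                result["TagValues"][tag] = [tag_value]
        elif isinstance(value, (int, float)):
            result["NumericValues"][key] = [value]
        elif isinstance(value, str):
            result["StringValues"][key] = [value]
        elif isinstance(value, (bytes, bytearray)):
            # Binary values appear base64-encoded in the example output.
            encoded = base64.b64encode(value).decode("ascii")
            result["BinaryValues"][key] = [encoded]
        else:
            # Stand-in for the documented SerializationError.
            raise TypeError(f"Unsupported type for {key!r}: {type(value).__name__}")
    return result


row = {"a": 1, "b": 1.1, "c": "string", "d": b"bytes", "Tags": {"tag1": "tag"}}
message = to_timeseries(row, timestamp=123123123)
```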

QuixEventsSerializer

class QuixEventsSerializer(QuixSerializer)

Serialize data to JSON formatted according to Quix EventData format.

The input value is expected to be a dictionary with the following keys:

  • "Id" (type str, default - "")
  • "Value" (type str, default - "")
  • "Tags" (type dict, default - {})

NOTE: All the other fields will be ignored.

Input:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"}
}

Output:

{
    "Id": "an_event",
    "Value": "any_string",
    "Tags": {"tag1": "tag"},
    "Timestamp": 1692703362840389000
}
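
The key handling described above (three known keys with defaults, everything else ignored) can be sketched as follows. The function to_event_data is hypothetical and takes the timestamp as an explicit argument; it illustrates the documented mapping and is not the library's implementation.

```python
from typing import Any, Dict, Mapping


def to_event_data(value: Mapping[str, Any], timestamp: int) -> Dict[str, Any]:
    """Hypothetical re-creation of the EventData mapping described above."""
    return {
        "Id": value.get("Id", ""),
        "Value": value.get("Value", ""),
        "Tags": value.get("Tags", {}),
        "Timestamp": timestamp,
    }


event = to_event_data(
    {"Id": "an_event", "Value": "any_string", "Tags": {"tag1": "tag"}, "extra": 1},
    timestamp=1692703362840389000,
)

# Missing keys fall back to the documented defaults.
empty_event = to_event_data({}, timestamp=0)
```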