Skip to content

Schema Registry

Serializers and deserializers for JSON Schema, Avro, and Protobuf support integration with a Schema Registry.

The current implementation wraps Confluent's serializers and deserializers, which are tightly coupled with the Schema Registry.

To integrate your existing Schema Registry, pass SchemaRegistryClientConfig to your serializers and deserializers. Additional optional configuration can be provided via SchemaRegistrySerializationConfig.

from quixstreams.models import (
    SchemaRegistryClientConfig,
    SchemaRegistrySerializationConfig,
)

schema_registry_client_config = SchemaRegistryClientConfig(
    url='localhost:8081',
    basic_auth_user_info='username:password',
)

# optional
schema_registry_serialization_config = SchemaRegistrySerializationConfig(
    auto_register_schemas=False,
)

Note: For the full list of available options, refer to the Serializers API.

JSON Schema

For both the serializer and deserializer, a schema must be provided.

from quixstreams.models import JSONDeserializer, JSONSerializer

MY_SCHEMA = {
    "title": "MyObject",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "id": {"type": "number"},
    },
    "required": ["id"],
}

deserializer = JSONDeserializer(
    schema=MY_SCHEMA,
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)
serializer = JSONSerializer(
    schema=MY_SCHEMA,
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)

Avro

The serializer requires a schema, but the deserializer can automatically fetch the required schema from the Schema Registry.

from quixstreams.models.serialize.avro import AvroDeserializer, AvroSerializer

MY_SCHEMA = {
    "type": "record",
    "name": "testschema",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "id", "type": "int", "default": 0},
    ],
}

deserializer = AvroDeserializer(
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)
serializer = AvroSerializer(
    schema=MY_SCHEMA,
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)

Protobuf

For both the serializer and deserializer, msg_type must be provided.

from quixstreams.models.serialize.protobuf import ProtobufDeserializer, ProtobufSerializer

from my_input_models_pb2 import InputProto
from my_output_models_pb2 import OutputProto

deserializer = ProtobufDeserializer(
    msg_type=InputProto,
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)
serializer = ProtobufSerializer(
    msg_type=OutputProto,
    schema_registry_client_config=schema_registry_client_config,
    schema_registry_serialization_config=schema_registry_serialization_config,
)

See the Serialization and Deserialization page to learn more about how to integrate the serializer and deserializer with your application.