Sources API

quixstreams.sources.base.source

BaseSource

class BaseSource(ABC)

[VIEW SOURCE]

This is the base class for all sources.

Sources are executed in a sub-process of the main application.

To create your own source you need to implement:

  • start
  • stop
  • default_topic

BaseSource is the most basic interface, and the framework expects every source to implement it. Use Source to benefit from a base implementation.

You can connect a source to a StreamingDataframe using the Application.

Example snippet:

import random

from quixstreams import Application
from quixstreams.models.topics import Topic
from quixstreams.sources import BaseSource


class RandomNumbersSource(BaseSource):
    def __init__(self):
        super().__init__()
        self._running = False

    def start(self):
        self._running = True

        while self._running:
            number = random.randint(0, 100)
            serialized = self._producer_topic.serialize(value=number)
            self._producer.produce(
                topic=self._producer_topic.name,
                key=serialized.key,
                value=serialized.value,
            )

    def stop(self):
        self._running = False

    def default_topic(self) -> Topic:
        return Topic(
            name="topic-name",
            value_deserializer="json",
            value_serializer="json",
        )


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource()

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()


if __name__ == "__main__":
    main()



BaseSource.configure

def configure(topic: Topic, producer: InternalProducer, **kwargs) -> None

[VIEW SOURCE]

This method is triggered before the source is started.

It configures the source's Kafka producer, the topic it will produce to, and any optional dependencies.



BaseSource.setup

def setup()

[VIEW SOURCE]

When applicable, set up the client here, along with any validation needed to confirm a successful authentication or connection.
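
For example, a custom source wrapping an HTTP API might override setup to validate connectivity before any data is produced. This is only a sketch; the endpoint, payload handling, and polling interval are assumptions:

import time
import urllib.request

from quixstreams.sources import Source


class HttpPollingSource(Source):
    """Illustrative source that polls an HTTP endpoint and produces the raw payload."""

    def __init__(self, name: str, url: str):
        super().__init__(name=name)
        self._url = url

    def setup(self):
        # Fail fast if the endpoint is unreachable or rejects the request;
        # raising here signals a failed connection attempt.
        with urllib.request.urlopen(self._url, timeout=5) as response:
            if response.status >= 400:
                raise ConnectionError(f"Health check failed: HTTP {response.status}")

    def run(self):
        while self.running:
            with urllib.request.urlopen(self._url, timeout=5) as response:
                payload = response.read().decode("utf-8")
            msg = self.serialize(key=self._url, value=payload)
            self.produce(key=msg.key, value=msg.value)
            time.sleep(5)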



BaseSource.start

@abstractmethod
def start() -> None

[VIEW SOURCE]

This method is triggered in the subprocess when the source is started.

The subprocess will run as long as the start method executes. Use it to fetch data and produce it to Kafka.



BaseSource.stop

@abstractmethod
def stop() -> None

[VIEW SOURCE]

This method is triggered when the application is shutting down.

The source must ensure that the start method completes soon afterward.



BaseSource.default_topic

@abstractmethod
def default_topic() -> Topic

[VIEW SOURCE]

This method is triggered when the topic is not provided to the source.

The source must return a default topic configuration.

Note: if the default topic is used, the Application will prefix its name with "source__".

Source

class Source(BaseSource)

[VIEW SOURCE]

A base class for custom Sources that provides a basic implementation of the BaseSource interface. It is the recommended interface for creating custom sources.

Subclass it and implement the run method to fetch data and produce it to Kafka.

Example:

import random
import time

from quixstreams import Application
from quixstreams.sources import Source


class RandomNumbersSource(Source):
    def run(self):
        while self.running:
            number = random.randint(0, 100)
            serialized = self._producer_topic.serialize(value=number)
            self.produce(key=str(number), value=serialized.value)
            time.sleep(0.5)


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource(name="random-source")

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()


if __name__ == "__main__":
    main()

Helper methods and properties:

  • serialize()
  • produce()
  • flush()
  • running



Source.__init__

def __init__(
    name: str,
    shutdown_timeout: float = 10,
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None

[VIEW SOURCE]


Arguments:

  • name: The source unique name. It is used to generate the topic configuration.
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.



Source.running

@property
def running() -> bool

[VIEW SOURCE]

Property indicating if the source is running.

The stop method will set it to False. Use it to stop the source gracefully.



Source.cleanup

def cleanup(failed: bool) -> None

[VIEW SOURCE]

This method is triggered once the run method completes.

Use it to clean up the resources and shut down the source gracefully.

It flushes the producer when _run completes successfully.



Source.stop

def stop() -> None

[VIEW SOURCE]

This method is triggered when the application is shutting down.

It sets the running property to False.



Source.start

def start() -> None

[VIEW SOURCE]

This method is triggered in the subprocess when the source is started.

It marks the source as running, executes its run method, and ensures cleanup happens.



Source.run

@abstractmethod
def run()

[VIEW SOURCE]

This method is triggered in the subprocess when the source is started.

The subprocess will run as long as the run method executes. Use it to fetch data and produce it to Kafka.



Source.serialize

def serialize(key: Optional[object] = None,
              value: Optional[object] = None,
              headers: Optional[Headers] = None,
              timestamp_ms: Optional[int] = None) -> KafkaMessage

[VIEW SOURCE]

Serialize data to bytes using the producer topic serializers and return a quixstreams.models.messages.KafkaMessage.


Returns:

quixstreams.models.messages.KafkaMessage
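
A typical pattern inside a custom run method is to serialize a Python object with the topic serializers and pass the resulting key and value to produce. A minimal sketch (the event fields are illustrative):

import time

from quixstreams.sources import Source


class SensorSource(Source):
    def run(self):
        while self.running:
            event = {"sensor": "sensor-1", "temperature": 21.5}
            # serialize() applies the producer topic's serializers and
            # returns a KafkaMessage with ready-to-produce key/value.
            msg = self.serialize(key=event["sensor"], value=event)
            self.produce(key=msg.key, value=msg.value)
            time.sleep(1)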



Source.produce

def produce(value: Optional[Union[str, bytes]] = None,
            key: Optional[Union[str, bytes]] = None,
            headers: Optional[Headers] = None,
            partition: Optional[int] = None,
            timestamp: Optional[int] = None,
            poll_timeout: float = PRODUCER_POLL_TIMEOUT,
            buffer_error_max_tries: int = PRODUCER_ON_ERROR_RETRIES) -> None

[VIEW SOURCE]

Produce a message to the configured source topic in Kafka.



Source.flush

def flush(timeout: Optional[float] = None) -> None

[VIEW SOURCE]

This method flushes the producer.

It ensures all messages are successfully delivered to Kafka.


Arguments:

  • timeout (float): time to attempt flushing (seconds). None uses the producer default; -1 is infinite. Default: None

Raises:

  • CheckpointProducerTimeout: if any message fails to produce before the timeout



Source.default_topic

def default_topic() -> Topic

[VIEW SOURCE]

Return a default topic matching the source name.

The default topic will not be used if the topic has already been provided to the source.

Note: if the default topic is used, the Application will prefix its name with "source__".


Returns:

quixstreams.models.topics.Topic
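
Subclasses may override it to customize the default topic configuration, for example to switch the value serializers. A sketch (the topic name and serializer choice are assumptions):

import time

from quixstreams.models.topics import Topic
from quixstreams.sources import Source


class StringValuesSource(Source):
    def default_topic(self) -> Topic:
        # Produce and consume plain string values instead of the JSON default.
        return Topic(
            name="string-values-source",
            value_serializer="string",
            value_deserializer="string",
        )

    def run(self):
        while self.running:
            msg = self.serialize(key="greeting", value="hello world")
            self.produce(key=msg.key, value=msg.value)
            time.sleep(1)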

StatefulSource

class StatefulSource(Source)

[VIEW SOURCE]

A Source class for custom Sources that need a state.

Subclasses are responsible for flushing, by calling flush, at reasonable intervals.

Example:

import random
import time

from quixstreams import Application
from quixstreams.sources import StatefulSource


class RandomNumbersSource(StatefulSource):
    def run(self):

        i = 0
        while self.running:
            previous = self.state.get("number", 0)
            current = random.randint(0, 100)
            self.state.set("number", current)

            serialized = self._producer_topic.serialize(value=current + previous)
            self.produce(key=str(current), value=serialized.value)
            time.sleep(0.5)

            # flush the state every 10 messages
            i += 1
            if i % 10 == 0:
                self.flush()


def main():
    app = Application(broker_address="localhost:9092")
    source = RandomNumbersSource(name="random-source")

    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)

    app.run()


if __name__ == "__main__":
    main()



StatefulSource.__init__

def __init__(
    name: str,
    shutdown_timeout: float = 10,
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None

[VIEW SOURCE]


Arguments:

  • name: The source unique name. It is used to generate the topic configuration.
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.



StatefulSource.configure

def configure(topic: Topic,
              producer: InternalProducer,
              *,
              store_partition: Optional[StorePartition] = None,
              **kwargs) -> None

[VIEW SOURCE]

This method is triggered before the source is started.

It configures the source's Kafka producer, the topic it will produce to and the store partition.



StatefulSource.store_partitions_count

@property
def store_partitions_count() -> int

[VIEW SOURCE]

Count of store partitions.

Used to configure the number of partitions in the changelog topic.



StatefulSource.assigned_store_partition

@property
def assigned_store_partition() -> int

[VIEW SOURCE]

The store partition assigned to this instance.



StatefulSource.store_name

@property
def store_name() -> str

[VIEW SOURCE]

The source store name.



StatefulSource.state

@property
def state() -> State

[VIEW SOURCE]

Access the State of the source.

The State lifecycle is tied to the store transaction. A transaction is only valid until the next .flush() call. If no valid transaction exists, a new one is created.

Important: after each .flush() call, a previously returned instance is invalidated and cannot be used; call the property again to get a fresh one.
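
In practice this means re-reading self.state after every flush rather than holding on to a reference. A sketch of a run loop doing so:

import time

from quixstreams.sources import StatefulSource


class CounterSource(StatefulSource):
    def run(self):
        while self.running:
            state = self.state  # valid only until the next flush()
            count = state.get("count", 0) + 1
            state.set("count", count)

            msg = self.serialize(key="counter", value=count)
            self.produce(key=msg.key, value=msg.value)
            time.sleep(1)

            # Commits the state and invalidates `state`;
            # access self.state again on the next iteration.
            self.flush()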



StatefulSource.flush

def flush(timeout: Optional[float] = None) -> None

[VIEW SOURCE]

This method commits the state and flushes the producer.

It ensures the state is published to the changelog topic and all messages are successfully delivered to Kafka.


Arguments:

  • timeout (float): time to attempt flushing (seconds). None uses the producer default; -1 is infinite. Default: None

Raises:

  • CheckpointProducerTimeout: if any message fails to produce before the timeout

quixstreams.sources.core.csv

CSVSource

class CSVSource(Source)

[VIEW SOURCE]



CSVSource.__init__

def __init__(path: Union[str, Path],
             name: str,
             key_extractor: Optional[Callable[[dict], Union[str,
                                                            bytes]]] = None,
             timestamp_extractor: Optional[Callable[[dict], int]] = None,
             delay: float = 0,
             shutdown_timeout: float = 10,
             dialect: str = "excel") -> None

[VIEW SOURCE]

A base CSV source that reads data from a CSV file and produces rows to the Kafka topic in JSON format.


Arguments:

  • path: a path to the CSV file.
  • name: a unique name for the Source. It is used as a part of the default topic name.
  • key_extractor: an optional callable to extract the message key from the row. It must return either str or bytes. If empty, the Kafka messages will be produced without keys. Default - None.
  • timestamp_extractor: an optional callable to extract the message timestamp from the row. It must return time in milliseconds as int. If empty, the current epoch will be used. Default - None
  • delay: an optional delay after producing each row for stream simulation. Default - 0.
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • dialect: a CSV dialect to use. It affects quoting and delimiters. See the "csv" module docs for more info. Default - "excel".
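
For example, a CSV file with order_id and timestamp_ms columns (both column names are assumptions) could be streamed like this:

from quixstreams import Application
from quixstreams.sources.core.csv import CSVSource


def main():
    source = CSVSource(
        path="orders.csv",
        name="orders",
        key_extractor=lambda row: row["order_id"],
        timestamp_extractor=lambda row: int(row["timestamp_ms"]),
        delay=0.1,  # simulate a live stream with a short pause between rows
    )

    app = Application(broker_address="localhost:9092")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()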

quixstreams.sources.core.kafka.kafka

KafkaReplicatorSource

class KafkaReplicatorSource(Source)

[VIEW SOURCE]

Source implementation that replicates a topic from a Kafka broker to your application broker.

Running multiple instances of this source is supported.


Example Snippet:

from quixstreams import Application
from quixstreams.sources.kafka import KafkaReplicatorSource

app = Application(
    consumer_group="group",
)

source = KafkaReplicatorSource(
    name="source-second-kafka",
    app_config=app.config,
    topic="second-kafka-topic",
    broker_address="localhost:9092",
)

sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()



KafkaReplicatorSource.__init__

def __init__(
    name: str,
    app_config: "ApplicationConfig",
    topic: str,
    broker_address: Union[str, ConnectionConfig],
    auto_offset_reset: Optional[AutoOffsetReset] = "latest",
    consumer_extra_config: Optional[dict] = None,
    consumer_poll_timeout: Optional[float] = None,
    shutdown_timeout: float = 10,
    on_consumer_error: ConsumerErrorCallback = default_on_consumer_error,
    value_deserializer: DeserializerType = "json",
    key_deserializer: DeserializerType = "bytes",
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None

[VIEW SOURCE]


Arguments:

  • name: The source unique name. It is used to generate the default topic name and consumer group name on the source broker. Running multiple instances of KafkaReplicatorSource with the same name connected to the same broker will make them share the same consumer group.
  • app_config: The configuration of the application. Used by the source to connect to the application kafka broker.
  • topic: The topic to replicate.
  • broker_address: The connection settings for the source Kafka.
  • auto_offset_reset: Consumer auto.offset.reset setting. Default - Use the Application auto_offset_reset setting.
  • consumer_extra_config: A dictionary with additional options that will be passed to confluent_kafka.Consumer as is. Default - None
  • consumer_poll_timeout: timeout for InternalConsumer.poll(). Default - Use the Application consumer_poll_timeout setting.
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_consumer_error: Triggered when the source Consumer fails to poll Kafka.
  • value_deserializer: The default topic value deserializer, used by StreamingDataframe connected to the source. Default - json
  • key_deserializer: The default topic key deserializer, used by StreamingDataframe connected to the source. Default - bytes
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.

quixstreams.sources.core.kafka.quix

QuixEnvironmentSource

class QuixEnvironmentSource(KafkaReplicatorSource)

[VIEW SOURCE]

Source implementation that replicates a topic from a Quix Cloud environment to your application broker. It can copy messages for development and testing without risking producing them back or affecting the consumer groups.

Running multiple instances of this source is supported.


Example Snippet:

from quixstreams import Application
from quixstreams.sources.kafka import QuixEnvironmentSource

app = Application(
    consumer_group="group",
)

source = QuixEnvironmentSource(
    name="source-quix",
    app_config=app.config,
    quix_workspace_id="WORKSPACE_ID",
    quix_sdk_token="WORKSPACE_SDK_TOKEN",
    topic="quix-source-topic",
)

sdf = app.dataframe(source=source)
sdf = sdf.print()
app.run()



QuixEnvironmentSource.__init__

def __init__(
    name: str,
    app_config: "ApplicationConfig",
    topic: str,
    quix_sdk_token: str,
    quix_workspace_id: str,
    quix_portal_api: Optional[str] = None,
    auto_offset_reset: Optional[AutoOffsetReset] = None,
    consumer_extra_config: Optional[dict] = None,
    consumer_poll_timeout: Optional[float] = None,
    shutdown_timeout: float = 10,
    on_consumer_error: ConsumerErrorCallback = default_on_consumer_error,
    value_deserializer: DeserializerType = "json",
    key_deserializer: DeserializerType = "bytes",
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None

[VIEW SOURCE]


Arguments:

  • quix_workspace_id: The Quix workspace ID of the source environment.
  • quix_sdk_token: Quix cloud sdk token used to connect to the source environment.
  • quix_portal_api: The Quix portal API URL of the source environment. Default - Quix__Portal__Api environment variable or Quix cloud production URL

For other parameters, see quixstreams.sources.kafka.KafkaReplicatorSource.

quixstreams.sources.community.file.azure

AzureFileSource

class AzureFileSource(FileSource)

[VIEW SOURCE]

A source for extracting records stored within files in an Azure Filestore container.

It recursively iterates from the provided path (file or folder) and processes all found files by parsing and producing the given records contained in each file as individual messages to a kafka topic (same topic for all).



AzureFileSource.__init__

def __init__(connection_string: str,
             container: str,
             filepath: Union[str, Path],
             key_setter: Optional[Callable[[object], object]] = None,
             value_setter: Optional[Callable[[object], object]] = None,
             timestamp_setter: Optional[Callable[[object], int]] = None,
             file_format: Union[Format, FormatName] = "json",
             compression: Optional[CompressionName] = None,
             has_partition_folders: bool = False,
             replay_speed: float = 1.0,
             name: Optional[str] = None,
             shutdown_timeout: float = 30,
             on_client_connect_success: Optional[
                 ClientConnectSuccessCallback] = None,
             on_client_connect_failure: Optional[
                 ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • connection_string: Azure client authentication string.
  • container: Azure container name.
  • filepath: folder to recursively iterate from (a file will be used directly).
  • key_setter: sets the kafka message key for a record in the file.
  • value_setter: sets the kafka message value for a record in the file.
  • timestamp_setter: sets the kafka message timestamp for a record in the file.
  • file_format: what format the files are stored as (ex: "json").
  • compression: what compression was used on the files, if any (ex. "gzip").
  • has_partition_folders: whether files are nested within partition folders. If True, FileSource will match the output topic partition count with it. Set this flag to True if Quix Streams FileSink was used to dump data. Note: messages will only align with these partitions if original key is used. Example structure - a 2 partition topic (0, 1): [/topic/0/file_0.ext, /topic/0/file_1.ext, /topic/1/file_0.ext]
  • replay_speed: Produce messages with this speed multiplier, which roughly reflects the time "delay" between the original message producing. Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall.
  • name: The name of the Source application (Default: last folder name).
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.
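
A minimal usage sketch (the connection string, container, and folder path are placeholders):

from quixstreams import Application
from quixstreams.sources.community.file.azure import AzureFileSource


def main():
    source = AzureFileSource(
        connection_string="<your Azure connection string>",
        container="<your container>",
        filepath="my_topics/my_topic_folder",
        file_format="json",
        compression="gzip",
        replay_speed=1.0,  # reproduce the original message pacing (0 = no delay)
    )

    app = Application(broker_address="localhost:9092", consumer_group="azure-file-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()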



AzureFileSource.file_partition_counter

def file_partition_counter() -> int

[VIEW SOURCE]

This is a simplified version of the recommended way to retrieve folder names, based on the Azure SDK docs examples.

quixstreams.sources.community.file.base

FileSource

class FileSource(Source)

[VIEW SOURCE]

An interface for extracting records using a file-based client.

It recursively iterates from a provided path (file or folder) and processes all found files by parsing and producing the given records contained in each file as individual messages to a kafka topic.

Requires defining methods for navigating folders and retrieving/opening raw files for the respective client.

When these abstract methods are defined, a FileSource will be able to:

1. Prepare a list of files to download, and retrieve them sequentially.
2. Retrieve file contents asynchronously by downloading the upcoming file in the background.
3. Decompress and deserialize the current file to loop through its records.
4. Apply a replay delay for each contained record based on the previous record.
5. Serialize and produce the respective messages to Kafka based on the provided setters.



FileSource.__init__

def __init__(filepath: Union[str, Path],
             key_setter: Optional[Callable[[object], object]] = None,
             value_setter: Optional[Callable[[object], object]] = None,
             timestamp_setter: Optional[Callable[[object], int]] = None,
             file_format: Union[Format, FormatName] = "json",
             compression: Optional[CompressionName] = None,
             has_partition_folders: bool = False,
             replay_speed: float = 1.0,
             name: Optional[str] = None,
             shutdown_timeout: float = 30,
             on_client_connect_success: Optional[
                 ClientConnectSuccessCallback] = None,
             on_client_connect_failure: Optional[
                 ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • filepath: folder to recursively iterate from (a file will be used directly).
  • key_setter: sets the kafka message key for a record in the file.
  • value_setter: sets the kafka message value for a record in the file.
  • timestamp_setter: sets the kafka message timestamp for a record in the file.
  • file_format: what format the files are stored as (ex: "json").
  • compression: what compression was used on the files, if any (ex. "gzip").
  • has_partition_folders: whether files are nested within partition folders. If True, FileSource will match the output topic partition count with it. Set this flag to True if Quix Streams FileSink was used to dump data. Note: messages will only align with these partitions if original key is used. Example structure - a 2 partition topic (0, 1): [/topic/0/file_0.ext, /topic/0/file_1.ext, /topic/1/file_0.ext]
  • replay_speed: Produce messages with this speed multiplier, which roughly reflects the time "delay" between the original message producing. Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall.
  • name: The name of the Source application (Default: last folder name).
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.



FileSource.get_file_list

@abstractmethod
def get_file_list(filepath: Path) -> Iterable[Path]

[VIEW SOURCE]

Find all files/"blobs" starting from a root folder.

Each item in the iterable should be a resolvable filepath.


Arguments:

  • filepath: a starting filepath


Returns:

an iterable with all desired files in their desired processing order



FileSource.read_file

@abstractmethod
def read_file(filepath: Path) -> BinaryIO

[VIEW SOURCE]

Returns a filepath as an unaltered, open filestream.

Result should be ready for deserialization (and/or decompression).
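
Putting the two abstract methods together, a minimal subclass for a plain local filesystem might look like the sketch below (LocalFileSource, documented further down, already provides a complete implementation):

from pathlib import Path
from typing import BinaryIO, Iterable

from quixstreams.sources.community.file.base import FileSource


class SimpleLocalFileSource(FileSource):
    """Illustrative subclass; see LocalFileSource for a full implementation."""

    def get_file_list(self, filepath: Path) -> Iterable[Path]:
        # Yield files in a deterministic processing order.
        if filepath.is_file():
            yield filepath
        else:
            yield from sorted(p for p in filepath.rglob("*") if p.is_file())

    def read_file(self, filepath: Path) -> BinaryIO:
        # Return an unaltered, open binary stream, ready for the configured
        # decompression and deserialization steps.
        return open(filepath, "rb")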



FileSource.process_record

def process_record(record: object)

[VIEW SOURCE]

Applies replay delay, serializes the record, and produces it to Kafka.



FileSource.file_partition_counter

def file_partition_counter() -> int

[VIEW SOURCE]

Can optionally define a way of counting folders to intelligently set the "default_topic" partition count to match partition folder count.

If defined, class flag "has_partition_folders" can then be set to employ it.

It is not required since this operation may not be easy to implement, and the file structure may not be used outside Quix Streams FileSink.

Example structure with 2 partitions (0,1):

topic_name/
├── 0/               # partition 0
│   ├── file_a.ext
│   └── file_b.ext
└── 1/               # partition 1
    ├── file_x.ext
    └── file_y.ext



FileSource.default_topic

def default_topic() -> Topic

[VIEW SOURCE]

Optionally allows the file structure to define the partition count for the internal topic with file_partition_counter (instead of the default of 1).


Returns:

the default topic with optionally altered partition count

quixstreams.sources.community.file.local

LocalFileSource

class LocalFileSource(FileSource)

[VIEW SOURCE]

A source for extracting records stored within files in a local filesystem.

It recursively iterates from the provided path (file or folder) and processes all found files by parsing and producing the given records contained in each file as individual messages to a kafka topic (same topic for all).



LocalFileSource.__init__

def __init__(filepath: Union[str, Path],
             key_setter: Optional[Callable[[object], object]] = None,
             value_setter: Optional[Callable[[object], object]] = None,
             timestamp_setter: Optional[Callable[[object], int]] = None,
             file_format: Union[Format, FormatName] = "json",
             compression: Optional[CompressionName] = None,
             has_partition_folders: bool = False,
             replay_speed: float = 1.0,
             name: Optional[str] = None,
             shutdown_timeout: float = 30,
             on_client_connect_success: Optional[
                 ClientConnectSuccessCallback] = None,
             on_client_connect_failure: Optional[
                 ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • filepath: folder to recursively iterate from (a file will be used directly).
  • key_setter: sets the kafka message key for a record in the file.
  • value_setter: sets the kafka message value for a record in the file.
  • timestamp_setter: sets the kafka message timestamp for a record in the file.
  • file_format: what format the files are stored as (ex: "json").
  • compression: what compression was used on the files, if any (ex. "gzip").
  • has_partition_folders: whether files are nested within partition folders. If True, FileSource will match the output topic partition count with it. Set this flag to True if Quix Streams FileSink was used to dump data. Note: messages will only align with these partitions if original key is used. Example structure - a 2 partition topic (0, 1): [/topic/0/file_0.ext, /topic/0/file_1.ext, /topic/1/file_0.ext]
  • replay_speed: Produce messages with this speed multiplier, which roughly reflects the time "delay" between the original message producing. Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall.
  • name: The name of the Source application (Default: last folder name).
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.
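
A minimal usage sketch reading gzip-compressed JSON files from a local folder (the path is a placeholder):

from quixstreams import Application
from quixstreams.sources.community.file.local import LocalFileSource


def main():
    source = LocalFileSource(
        filepath="./my_topics/my_topic_folder",
        file_format="json",
        compression="gzip",
        replay_speed=1.0,  # reproduce the original message pacing (0 = no delay)
    )

    app = Application(broker_address="localhost:9092", consumer_group="local-file-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()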

quixstreams.sources.community.file.s3

S3FileSource

class S3FileSource(FileSource)

[VIEW SOURCE]

A source for extracting records stored within files in an S3 bucket location.

It recursively iterates from the provided path (file or folder) and processes all found files by parsing and producing the given records contained in each file as individual messages to a kafka topic (same topic for all).



S3FileSource.__init__

def __init__(
        filepath: Union[str, Path],
        bucket: str,
        region_name: Optional[str] = getenv("AWS_REGION"),
        aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
        endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_S3"),
        key_setter: Optional[Callable[[object], object]] = None,
        value_setter: Optional[Callable[[object], object]] = None,
        timestamp_setter: Optional[Callable[[object], int]] = None,
        has_partition_folders: bool = False,
        file_format: Union[Format, FormatName] = "json",
        compression: Optional[CompressionName] = None,
        replay_speed: float = 1.0,
        name: Optional[str] = None,
        shutdown_timeout: float = 30,
        on_client_connect_success: Optional[
            ClientConnectSuccessCallback] = None,
        on_client_connect_failure: Optional[
            ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • filepath: folder to recursively iterate from (a file will be used directly).
  • bucket: The S3 bucket name only (ex: 'your-bucket').
  • region_name: The AWS region. NOTE: can alternatively set the AWS_REGION environment variable
  • aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
  • aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
  • endpoint_url: the endpoint URL to use; only required for connecting to a locally hosted S3. NOTE: can alternatively set the AWS_ENDPOINT_URL_S3 environment variable
  • key_setter: sets the kafka message key for a record in the file.
  • value_setter: sets the kafka message value for a record in the file.
  • timestamp_setter: sets the kafka message timestamp for a record in the file.
  • file_format: what format the files are stored as (ex: "json").
  • compression: what compression was used on the files, if any (ex. "gzip").
  • has_partition_folders: whether files are nested within partition folders. If True, FileSource will match the output topic partition count with it. Set this flag to True if Quix Streams FileSink was used to dump data. Note: messages will only align with these partitions if original key is used. Example structure - a 2 partition topic (0, 1): [/topic/0/file_0.ext, /topic/0/file_1.ext, /topic/1/file_0.ext]
  • replay_speed: Produce messages with this speed multiplier, which roughly reflects the time "delay" between the original message producing. Use any float >= 0, where 0 is no delay, and 1 is the original speed. NOTE: Time delay will only be accurate per partition, NOT overall.
  • name: The name of the Source application (Default: last folder name).
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.
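
A minimal usage sketch (bucket, region, and folder path are placeholders; credentials are read from the AWS_* environment variables by default):

from quixstreams import Application
from quixstreams.sources.community.file.s3 import S3FileSource


def main():
    source = S3FileSource(
        filepath="my_topics/my_topic_folder",
        bucket="your-bucket",
        region_name="eu-west-1",  # or set the AWS_REGION environment variable
        file_format="json",
        compression="gzip",
    )

    app = Application(broker_address="localhost:9092", consumer_group="s3-file-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()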

quixstreams.sources.community.file.compressions.gzip

quixstreams.sources.community.file.formats.json

JSONFormat

class JSONFormat(Format)

[VIEW SOURCE]



JSONFormat.__init__

def __init__(compression: Optional[CompressionName],
             loads: Optional[Callable[[str], dict]] = None)

[VIEW SOURCE]

Read a JSON-formatted file (along with decompressing it).


Arguments:

  • compression: the compression type used on the file
  • loads: A custom function to deserialize objects to the expected dict with {_key: str, _value: dict, _timestamp: int}.
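
For example, a pre-configured JSONFormat can be passed to a file source in place of a format name, e.g. to read gzip-compressed files with a custom deserializer. A sketch (the lambda simply wraps the standard library's json.loads; any Callable[[str], dict] returning the expected shape works):

import json

from quixstreams.sources.community.file.formats.json import JSONFormat
from quixstreams.sources.community.file.local import LocalFileSource

file_format = JSONFormat(
    compression="gzip",
    # Must return a dict shaped like {_key: str, _value: dict, _timestamp: int}
    loads=lambda raw: json.loads(raw),
)

source = LocalFileSource(
    filepath="./my_topics/my_topic_folder",
    file_format=file_format,
)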

quixstreams.sources.community.file.formats.parquet

quixstreams.sources.community.kinesis.kinesis

KinesisSource

class KinesisSource(StatefulSource)

[VIEW SOURCE]

NOTE: Requires pip install quixstreams[kinesis] to work.

This source reads data from an Amazon Kinesis stream, dumping it to a kafka topic using desired StreamingDataFrame-based transformations.

Provides "at-least-once" guarantees.

The incoming message value will be in bytes, so transform in your SDF accordingly.

Example Usage:

from quixstreams import Application
from quixstreams.sources.community.kinesis import KinesisSource


kinesis = KinesisSource(
    stream_name="<YOUR STREAM>",
    aws_access_key_id="<YOUR KEY ID>",
    aws_secret_access_key="<YOUR SECRET KEY>",
    aws_region="<YOUR REGION>",
    auto_offset_reset="earliest",  # start from the beginning of the stream (vs end)
)

app = Application(
    broker_address="<YOUR BROKER INFO>",
    consumer_group="<YOUR GROUP>",
)

sdf = app.dataframe(source=kinesis).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()



KinesisSource.__init__

def __init__(
        stream_name: str,
        aws_region: Optional[str] = getenv("AWS_REGION"),
        aws_access_key_id: Optional[str] = getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key: Optional[str] = getenv("AWS_SECRET_ACCESS_KEY"),
        aws_endpoint_url: Optional[str] = getenv("AWS_ENDPOINT_URL_KINESIS"),
        shutdown_timeout: float = 10,
        auto_offset_reset: AutoOffsetResetType = "latest",
        max_records_per_shard: int = 1000,
        commit_interval: float = 5.0,
        retry_backoff_secs: float = 5.0,
        on_client_connect_success: Optional[
            ClientConnectSuccessCallback] = None,
        on_client_connect_failure: Optional[
            ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • stream_name: name of the desired Kinesis stream to consume.
  • aws_region: The AWS region. NOTE: can alternatively set the AWS_REGION environment variable
  • aws_access_key_id: the AWS access key ID. NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
  • aws_secret_access_key: the AWS secret access key. NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
  • aws_endpoint_url: the endpoint URL to use; only required for connecting to a locally hosted Kinesis. NOTE: can alternatively set the AWS_ENDPOINT_URL_KINESIS environment variable
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • auto_offset_reset: When no previous offset has been recorded, whether to start from the beginning ("earliest") or end ("latest") of the stream.
  • max_records_per_shard: During round-robin consumption, how many records to consume per shard (partition) per consume (NOT per-commit).
  • commit_interval: the time between commits
  • retry_backoff_secs: how long to back off from doing HTTP calls for a shard when Kinesis consumer encounters handled/expected errors.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.

quixstreams.sources.community.pubsub.pubsub

PubSubSource

class PubSubSource(Source)

[VIEW SOURCE]

This source enables reading from a Google Cloud Pub/Sub topic, dumping it to a kafka topic using desired SDF-based transformations.

Provides "at-least-once" guarantees.

Currently, forwarding message keys ("ordered messages" in Pub/Sub) is unsupported.

The incoming message value will be in bytes, so transform in your SDF accordingly.

Example Usage:

from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource
from os import environ

source = PubSubSource(
    # Suggested: pass JSON-formatted credentials from an environment variable.
    service_account_json = environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",  # NOTE: NOT the full /x/y/z path!
    subscription_id="<subscription ID>",  # NOTE: NOT the full /x/y/z path!
    create_subscription=True,
)
app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="gcp",
    loglevel="INFO"
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()



PubSubSource.__init__

def __init__(project_id: str,
             topic_id: str,
             subscription_id: str,
             service_account_json: Optional[str] = None,
             commit_every: int = 100,
             commit_interval: float = 5.0,
             create_subscription: bool = False,
             enable_message_ordering: bool = False,
             shutdown_timeout: float = 10.0,
             on_client_connect_success: Optional[
                 ClientConnectSuccessCallback] = None,
             on_client_connect_failure: Optional[
                 ClientConnectFailureCallback] = None)

[VIEW SOURCE]


Arguments:

  • project_id: a Google Cloud project ID.
  • topic_id: a Pub/Sub topic ID (NOT the full path).
  • subscription_id: a Pub/Sub subscription ID (NOT the full path).
  • service_account_json: a Google Cloud Credentials JSON as a string. Can instead use environment variables (which have different behavior):
  • "GOOGLE_APPLICATION_CREDENTIALS" set to a JSON filepath i.e. /x/y/z.json
  • "PUBSUB_EMULATOR_HOST" set to a URL if using an emulated Pub/Sub
  • commit_every: max records allowed to be processed before committing.
  • commit_interval: max allowed elapsed time between commits.
  • create_subscription: whether to attempt to create a subscription at startup; if it already exists, it instead logs its details (DEBUG level).
  • enable_message_ordering: When creating a Pub/Sub subscription, whether to allow message ordering. NOTE: does NOT affect existing subscriptions!
  • shutdown_timeout: How long to wait for a graceful shutdown of the source.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.

quixstreams.sources.community.pandas

PandasDataFrameSource

class PandasDataFrameSource(Source)

[VIEW SOURCE]



PandasDataFrameSource.__init__

def __init__(df: pd.DataFrame,
             key_column: str,
             timestamp_column: str = None,
             delay: float = 0,
             shutdown_timeout: float = 10,
             keep_meta_as_values: bool = True,
             name: str = "pandas-dataframe-source") -> None

[VIEW SOURCE]

A source that reads data from a pandas.DataFrame and produces rows to a Kafka topic in JSON format.


Arguments:

  • df: the pandas.DataFrame object to read data from.
  • key_column: a column name that contains the messages keys. The values in dataframe[key_column] must be either strings or None.
  • timestamp_column: an optional argument to specify a dataframe column that contains the messages timestamps. The values in dataframe[timestamp_column] must be time in milliseconds as int. If empty, the current epoch will be used. Default - None
  • name: a unique name for the Source, used as a part of the default topic name. Default - "pandas-dataframe-source".
  • delay: an optional delay after producing each row for stream simulation. Default - 0.
  • shutdown_timeout: Time in seconds the application waits for the source to gracefully shut down.
  • keep_meta_as_values: Whether to keep metadata (timestamp_column and key_column) as-values data too. If True, timestamp and key columns are passed both as metadata and values in the message. If False, timestamp and key columns are passed only as the message's metadata. Default - True.



PandasDataFrameSource.run

def run()

[VIEW SOURCE]

Produces data from the DataFrame row by row.

quixstreams.sources.community.influxdb3.influxdb3

InfluxDB3Source

class InfluxDB3Source(Source)

[VIEW SOURCE]

InfluxDB3Source extracts data from a specified set of measurements in a database (or all available ones if none are specified).

It processes measurements sequentially by gathering/producing a tumbling "time_delta"-sized window of data, starting from 'start_date' and eventually stopping at 'end_date', completing that measurement.

It then starts the next measurement, continuing until all are complete.

If no 'end_date' is provided, it will run indefinitely for a single measurement (which means no other measurements will be processed!).



InfluxDB3Source.__init__

def __init__(
    host: str,
    token: str,
    organization_id: str,
    database: str,
    key_setter: Optional[Callable[[object], object]] = None,
    timestamp_setter: Optional[Callable[[object], int]] = None,
    start_date: datetime = datetime.now(tz=timezone.utc),
    end_date: Optional[datetime] = None,
    measurements: Optional[Union[str, list[str]]] = None,
    measurement_column_name: str = "_measurement_name",
    sql_query: Optional[str] = None,
    time_delta: str = "5m",
    delay: float = 0,
    max_retries: int = 5,
    name: Optional[str] = None,
    shutdown_timeout: float = 10,
    on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
    on_client_connect_failure: Optional[ClientConnectFailureCallback] = None
) -> None

[VIEW SOURCE]


Arguments:

  • host: Host URL of the InfluxDB instance.
  • token: Authentication token for InfluxDB.
  • organization_id: Organization name in InfluxDB.
  • database: Database name in InfluxDB.
  • key_setter: sets the kafka message key for a measurement record. By default, will set the key to the measurement's name.
  • timestamp_setter: sets the kafka message timestamp for a measurement record. By default, the timestamp will be the Kafka default (Kafka produce time).
  • start_date: The start datetime for querying InfluxDB. Uses current time by default.
  • end_date: The end datetime for querying InfluxDB. If none provided, runs indefinitely for a single measurement.
  • measurements: The measurements to query. If None, all measurements will be processed.
  • measurement_column_name: The column name used for appending the measurement name to the record.
  • sql_query: Custom SQL query for retrieving data. Query expects a {start_time}, {end_time}, and {measurement_name} for later formatting. If provided, it overrides the default window-query logic.
  • time_delta: Time interval for batching queries, e.g., "5m" for 5 minutes.
  • delay: An optional delay between producing batches.
  • name: A unique name for the Source, used as part of the topic name.
  • shutdown_timeout: Time in seconds to wait for graceful shutdown.
  • max_retries: Maximum number of retries for querying or producing. Note that consecutive retries have a multiplicative backoff.
  • on_client_connect_success: An optional callback made after successful client authentication, primarily for additional logging.
  • on_client_connect_failure: An optional callback made after failed client authentication (which should raise an Exception). Callback should accept the raised Exception as an argument. Callback must resolve (or propagate/re-raise) the Exception.
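
A minimal usage sketch replicating two measurements over a bounded time range (host, credentials, and measurement names are placeholders):

from datetime import datetime, timezone

from quixstreams import Application
from quixstreams.sources.community.influxdb3.influxdb3 import InfluxDB3Source


def main():
    source = InfluxDB3Source(
        host="https://<your-influxdb-host>",
        token="<your token>",
        organization_id="<your org>",
        database="<your database>",
        measurements=["sensor_readings", "machine_status"],
        start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end_date=datetime(2024, 1, 2, tzinfo=timezone.utc),
        time_delta="5m",
    )

    app = Application(broker_address="localhost:9092", consumer_group="influxdb3-source")
    sdf = app.dataframe(source=source)
    sdf.print(metadata=True)
    app.run()


if __name__ == "__main__":
    main()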