Skip to content

Quix Platform API

quixstreams.platforms.quix.api

QuixPortalApiService

class QuixPortalApiService()

[VIEW SOURCE]

A light wrapper around the Quix Portal Api. If used in the Quix Platform, it will use that workspaces auth token and portal endpoint, else you must provide it.

Function names closely reflect the respective API endpoint, each starting with the method [GET, POST, etc.] followed by the endpoint path.

Results will be returned in the form of request's Response.json(), unless something else is required. Non-200's will raise exceptions.

See the swagger documentation for more info about the endpoints.



QuixPortalApiService.get_workspace_certificate

def get_workspace_certificate(
        workspace_id: Optional[str] = None) -> Optional[bytes]

[VIEW SOURCE]

Get a workspace TLS certificate if available.

Returns None if certificate is not specified.


Arguments:

  • workspace_id: workspace id, optional


Returns:

certificate as bytes if present, or None

quixstreams.platforms.quix.config

TopicCreationConfigs

@dataclasses.dataclass
class TopicCreationConfigs()

[VIEW SOURCE]



name

Required when not created by a Quix App.



strip_workspace_id_prefix

def strip_workspace_id_prefix(workspace_id: str, s: str) -> str

[VIEW SOURCE]

Remove the workspace ID from a given string if it starts with it,

typically a topic or consumer group id


Arguments:

  • workspace_id: the workspace id
  • s: the string to append to


Returns:

the string with workspace_id prefix removed



prepend_workspace_id

def prepend_workspace_id(workspace_id: str, s: str) -> str

[VIEW SOURCE]

Add the workspace ID as a prefix to a given string if it does not have it,

typically a topic or consumer group it


Arguments:

  • workspace_id: the workspace id
  • s: the string to append to


Returns:

the string with workspace_id prepended

QuixKafkaConfigsBuilder

class QuixKafkaConfigsBuilder()

[VIEW SOURCE]

Retrieves all the necessary information from the Quix API and builds all the objects required to connect a confluent-kafka client to the Quix Platform.

If not executed within the Quix platform directly, you must provide a Quix "streaming" (aka "sdk") token, or Personal Access Token.

Ideally you also know your workspace name or id. If not, you can search for it using a known topic name, but note the search space is limited to the access level of your token.

It also currently handles the app_auto_create_topics setting for Application.Quix.



QuixKafkaConfigsBuilder.__init__

def __init__(quix_portal_api_service: Optional[QuixPortalApiService] = None,
             workspace_id: Optional[str] = None,
             workspace_cert_path: Optional[str] = None)

[VIEW SOURCE]


Arguments:

  • quix_portal_api_service: A QuixPortalApiService instance (else generated)
  • workspace_id: A valid Quix Workspace ID (else searched for)
  • workspace_cert_path: path to an existing workspace cert (else retrieved)



QuixKafkaConfigsBuilder.strip_workspace_id_prefix

def strip_workspace_id_prefix(s: str) -> str

[VIEW SOURCE]

Remove the workspace ID from a given string if it starts with it,

typically a topic or consumer group id


Arguments:

  • s: the string to append to


Returns:

the string with workspace_id prefix removed



QuixKafkaConfigsBuilder.prepend_workspace_id

def prepend_workspace_id(s: str) -> str

[VIEW SOURCE]

Add the workspace ID as a prefix to a given string if it does not have it,

typically a topic or consumer group it


Arguments:

  • s: the string to append to


Returns:

the string with workspace_id prepended



QuixKafkaConfigsBuilder.search_for_workspace

def search_for_workspace(
        workspace_name_or_id: Optional[str] = None) -> Optional[dict]

[VIEW SOURCE]

Search for a workspace given an expected workspace name or id.


Arguments:

  • workspace_name_or_id: the expected name or id of a workspace


Returns:

the workspace data dict if search success, else None



QuixKafkaConfigsBuilder.get_workspace_info

def get_workspace_info(known_workspace_topic: Optional[str] = None)

[VIEW SOURCE]

Queries for workspace data from the Quix API, regardless of instance cache,

and updates instance attributes from query result.


Arguments:

  • known_workspace_topic: a topic you know to exist in some workspace



QuixKafkaConfigsBuilder.search_workspace_for_topic

def search_workspace_for_topic(workspace_id: str, topic: str) -> Optional[str]

[VIEW SOURCE]

Search through all the topics in the given workspace id to see if there is a

match with the provided topic.


Arguments:

  • workspace_id: the workspace to search in
  • topic: the topic to search for


Returns:

the workspace_id if success, else None



QuixKafkaConfigsBuilder.search_for_topic_workspace

def search_for_topic_workspace(topic: str) -> Optional[dict]

[VIEW SOURCE]

Find what workspace a topic belongs to.

If there is only one workspace altogether, it is assumed to be the workspace. More than one means each workspace will be searched until the first hit.


Arguments:

  • topic: the topic to search for


Returns:

workspace data dict if topic search success, else None



QuixKafkaConfigsBuilder.get_workspace_ssl_cert

def get_workspace_ssl_cert(
        extract_to_folder: Optional[Path] = None) -> Optional[str]

[VIEW SOURCE]

Gets and extracts zipped certificate from the API to provided folder if the

SSL certificate is specified in broker configuration.

If no path was provided, will dump to /tmp. Expects cert named 'ca.cert'.


Arguments:

  • extract_to_folder: path to folder to dump zipped cert file to


Returns:

full cert filepath as string or None if certificate is not specified



QuixKafkaConfigsBuilder.create_topics

def create_topics(topics: List[Topic],
                  finalize_timeout_seconds: Optional[int] = None)

[VIEW SOURCE]

Create topics in a Quix cluster.


Arguments:

  • topics: a list of Topic objects
  • finalize_timeout_seconds: How long to wait for the topics to be marked as "Ready" (and thus ready to produce to/consume from).



QuixKafkaConfigsBuilder.confirm_topics_exist

def confirm_topics_exist(topics: Union[List[Topic], List[str]])

[VIEW SOURCE]

Confirm whether the desired set of topics exists in the Quix workspace.


Arguments:

  • topics: a list of Topic or topic names



QuixKafkaConfigsBuilder.get_confluent_broker_config

def get_confluent_broker_config(known_topic: Optional[str] = None) -> dict

[VIEW SOURCE]

Get the full client config dictionary required to authenticate a confluent-kafka

client to a Quix platform broker/workspace.

The returned config can be used directly by any confluent-kafka-python consumer/ producer (add your producer/consumer-specific configs afterward).


Arguments:

  • known_topic: a topic known to exist in some workspace


Returns:

a dict of confluent-kafka-python client settings (see librdkafka config for more details)



QuixKafkaConfigsBuilder.get_confluent_client_configs

def get_confluent_client_configs(
    topics: list,
    consumer_group_id: Optional[str] = None
) -> Tuple[dict, List[str], Optional[str]]

[VIEW SOURCE]

Get all the values you need in order to use a confluent_kafka-based client

with a topic on a Quix platform broker/workspace.

The returned config can be used directly by any confluent-kafka-python consumer/ producer (add your producer/consumer-specific configs afterward).

The topics and consumer group are appended with any necessary values.


Arguments:

  • topics: list of topics
  • consumer_group_id: consumer group id, if needed


Returns:

a tuple with configs and altered versions of the topics and consumer group name

quixstreams.platforms.quix.env

QuixEnvironment

class QuixEnvironment()

[VIEW SOURCE]

Class to access various Quix platform environment settings



QuixEnvironment.state_management_enabled

@property
def state_management_enabled() -> bool

[VIEW SOURCE]

Check whether "State management" is enabled for the current deployment


Returns:

True if state management is enabled, otherwise False



QuixEnvironment.deployment_id

@property
def deployment_id() -> Optional[str]

[VIEW SOURCE]

Return current Quix deployment id.

This variable is meant to be set only by Quix Platform and only when the application is deployed.


Returns:

deployment id or None



QuixEnvironment.workspace_id

@property
def workspace_id() -> Optional[str]

[VIEW SOURCE]

Return Quix workspace id if set


Returns:

workspace id or None



QuixEnvironment.portal_api

@property
def portal_api() -> Optional[str]

[VIEW SOURCE]

Return Quix Portal API url if set


Returns:

portal API URL or None



QuixEnvironment.sdk_token

@property
def sdk_token() -> Optional[str]

[VIEW SOURCE]

Return Quix SDK token if set


Returns:

sdk token or None



QuixEnvironment.state_dir

@property
def state_dir() -> str

[VIEW SOURCE]

Return application state directory on Quix.


Returns:

path to state dir