Google Cloud Pub/Sub Source

Info

This is a Community connector. Test it before using in production.

To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.

This source enables reading from a Google Cloud Pub/Sub topic and producing its messages to a Kafka topic, with any desired StreamingDataFrame-based transformations applied along the way.

How To Install

To use the Pub/Sub source, you need to install the required dependencies:

pip install quixstreams[pubsub]

How It Works

PubSubSource subscribes to a Pub/Sub subscription and produces its messages to a Kafka topic.

Messages are read in a streaming fashion and committed intermittently, offering at-least-once guarantees.

You can learn more about read ordering, the Kafka message format, and message keys in the sections below.

How To Use

To use the Pub/Sub Source, pass a PubSubSource instance to app.dataframe().

For more details about the various settings, see Configuration.

from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource
from os import environ

source = PubSubSource(
    # Suggested: pass JSON-formatted credentials from an environment variable.
    service_account_json=environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",  # NOTE: NOT the full /x/y/z path!
    subscription_id="<subscription ID>",  # NOTE: NOT the full /x/y/z path!
    create_subscription=True,
)
app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="gcp",
    loglevel="INFO"
)
sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()

Configuration

Here are some important configurations to be aware of (see PubSub Source API for all parameters).

Required:

  • project_id: a Google Cloud project ID.
  • topic_id: a Pub/Sub topic ID (NOT the full path).
  • subscription_id: a Pub/Sub subscription ID (NOT the full path).
  • service_account_json: Google Cloud credentials JSON as a string.
    • Though "optional", you MUST either provide this OR set one of the following environment variables (which behave differently; see the sketch after this list):
      • GOOGLE_APPLICATION_CREDENTIALS set to a JSON filepath, i.e. /x/y/z.json
      • PUBSUB_EMULATOR_HOST set to a URL if using an emulated Pub/Sub
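
As a minimal sketch of the two non-emulator credential options (the file path here is hypothetical, and the PubSubSource arguments are abbreviated to the required IDs):

from os import environ
from pathlib import Path

from quixstreams.sources.community.pubsub import PubSubSource

# Option A: point Google's standard discovery variable at a credentials
# file (hypothetical path) before the source connects.
environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/credentials.json"
source = PubSubSource(
    project_id="<project ID>",
    topic_id="<topic ID>",
    subscription_id="<subscription ID>",
)

# Option B: read the credentials yourself and pass the JSON as a string.
source = PubSubSource(
    service_account_json=Path("/path/to/credentials.json").read_text(),
    project_id="<project ID>",
    topic_id="<topic ID>",
    subscription_id="<subscription ID>",
)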

Optional:

  • create_subscription: whether to attempt to create a subscription at startup; if it already exists, it instead logs its details at DEBUG level.
    default value: False
  • enable_message_ordering: When creating a Pub/Sub subscription, whether to allow message ordering. NOTE: does NOT affect existing subscriptions!
    default value: False

Message Keys

If a Pub/Sub message was published as an ordered message, it will contain a message key; otherwise, the key will be an empty string.

Pub/Sub message keys are used as the Kafka message key regardless of the subscription's enable_message_ordering setting.

Only the message read order depends on that subscription setting (see the next section).
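
For reference, here is a minimal sketch of publishing an ordered message with the google-cloud-pubsub client (installed by the pubsub extra); the IDs and ordering key are placeholders:

from google.cloud import pubsub_v1

# Message ordering must also be enabled on the publisher client itself.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("<project ID>", "<topic ID>")

# The ordering key is what PubSubSource later uses as the Kafka message key.
publisher.publish(topic_path, b"my data", ordering_key="user-123").result()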

Message Read Ordering

Message read order depends on:

  1. a Pub/Sub message being published as an ordered message (has a key)
  2. a Pub/Sub subscription enabling ordered messages (set at subscription creation)

Pub/Sub Source cannot change the settings of any existing subscription, including ones it created itself!

Assuming proper permissions, a new subscription can be created via create_subscription=True, with ordering enabled via enable_message_ordering=True; both must be set together, since (per the above) ordering cannot be added to an existing subscription. See the sketch below.
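
A minimal sketch of creating a new, ordered subscription at startup (IDs are placeholders):

from os import environ

from quixstreams.sources.community.pubsub import PubSubSource

# Set both flags together: the new subscription is created with ordering
# enabled. enable_message_ordering alone does nothing for an existing one.
source = PubSubSource(
    service_account_json=environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",
    subscription_id="<new subscription ID>",
    create_subscription=True,
    enable_message_ordering=True,
)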

Message Data Format/Schema

This is the default format of messages handled by Application:

  • Message keys will be the "ordering_key" string (an empty string for non-ordered messages).
  • Message values will be the "data" as bytes; transform it with your SDF as desired.
  • Message timestamps will reflect the original Pub/Sub message "publish_time" (in milliseconds).
  • Message headers will contain all "attributes" (metadata) fields.
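
For example, if your Pub/Sub payloads happen to be JSON (an assumption; adjust to your own data), a minimal sketch of decoding the raw bytes, continuing the usage example above:

import json

# Values arrive as raw bytes; deserialize them before further transformations.
sdf = app.dataframe(source=source).apply(lambda value: json.loads(value))
sdf.print(metadata=True)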

Processing/Delivery Guarantees

The Pub/Sub Source offers "at-least-once" guarantees: message acknowledgements sent to the Pub/Sub subscriber are not confirmed to have succeeded.

As such, in the rare case where an acknowledgement fails, messages may be processed (produced) more than once, and potentially out of their original order, regardless of ordering settings.

Topic

The default topic name the Application produces to is gcp-pubsub_{subscription_name}_{topic_name}.
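
To produce to a differently named topic instead, a hedged sketch (assuming the standard Sources behavior where app.dataframe() accepts a custom Topic alongside a source; the topic name is a placeholder):

# Continuing the usage example above: app and source are already defined.
topic = app.topic(name="my-pubsub-data")
sdf = app.dataframe(topic=topic, source=source)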

Testing Locally

Rather than connecting to Google Cloud, you can test your application locally against an "emulated" Pub/Sub host running in Docker:

  1. DO NOT pass a service_account_json to PubSubSource; instead, set this environment variable:

    PUBSUB_EMULATOR_HOST=localhost:8085

  2. Execute in a terminal:

    docker run -d --name pubsub-emulator -p 8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
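
Once the emulator is running, a minimal sketch for publishing test messages to it (project and topic IDs are placeholders):

from os import environ

# Must be set before the client is created so it targets the emulator.
environ["PUBSUB_EMULATOR_HOST"] = "localhost:8085"

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<project ID>", "<topic ID>")
publisher.create_topic(name=topic_path)

# PubSubSource (with create_subscription=True) can then read this message.
publisher.publish(topic_path, b'{"hello": "world"}').result()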