Google Cloud Pub/Sub Source
Info
This is a Community connector. Test it before using in production.
To learn more about differences between Core and Community connectors, see the Community and Core Connectors page.
This source enables reading from a Google Cloud Pub/Sub topic and producing its messages to a Kafka topic, applying any desired StreamingDataFrame-based transformations along the way.
How To Install
To use the Pub/Sub source, you need to install the required dependencies:
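Assuming the package's pubsub extra (check your quixstreams version for the exact extra name), that is typically:

pip install quixstreams[pubsub]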
How It Works
PubSubSource subscribes to a Pub/Sub subscription and produces its messages to a Kafka topic.
Messages are read in a streaming fashion and committed intermittently, offering at-least-once guarantees.
You can learn more about read ordering, the Kafka message format, and message keys below.
How To Use
To use the Pub/Sub Source, hand a PubSubSource to app.dataframe().
For more details around various settings, see Configuration below.
from os import environ

from quixstreams import Application
from quixstreams.sources.community.pubsub import PubSubSource

source = PubSubSource(
    # Suggested: pass JSON-formatted credentials from an environment variable.
    service_account_json=environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",  # NOTE: NOT the full /x/y/z path!
    subscription_id="<subscription ID>",  # NOTE: NOT the full /x/y/z path!
    create_subscription=True,
)

app = Application(
    broker_address="localhost:9092",
    auto_offset_reset="earliest",
    consumer_group="gcp",
    loglevel="INFO",
)

sdf = app.dataframe(source=source).print(metadata=True)

if __name__ == "__main__":
    app.run()
Configuration
Here are some important configurations to be aware of (see PubSub Source API for all parameters).
Required:
- project_id: a Google Cloud project ID.
- topic_id: a Pub/Sub topic ID (NOT the full path).
- subscription_id: a Pub/Sub subscription ID (NOT the full path).
- service_account_json: Google Cloud Credentials JSON as a string. Though "optional", you MUST either use this OR set one of the following environment variables (which have different behavior):
  - GOOGLE_APPLICATION_CREDENTIALS set to a JSON filepath, i.e. /x/y/z.json
  - PUBSUB_EMULATOR_HOST set to a URL if using an emulated Pub/Sub
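As a sketch of the environment-variable alternative, you could point Google's clients at a credentials file and omit service_account_json entirely (the file path below is a placeholder):

import os

from quixstreams.sources.community.pubsub import PubSubSource

# Google's client libraries read this variable at client creation time,
# so set it before the source connects (placeholder path).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/x/y/z.json"

source = PubSubSource(
    project_id="<project ID>",
    topic_id="<topic ID>",
    subscription_id="<subscription ID>",
)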
Optional:
- create_subscription: whether to attempt to create a subscription at startup; if it already exists, it instead logs its details at DEBUG level. Default: False.
- enable_message_ordering: when creating a Pub/Sub subscription, whether to allow message ordering. NOTE: does NOT affect existing subscriptions! Default: False.
Message Keys
If a Pub/Sub message was published as an ordered message, it will contain a message key; otherwise, the key will be an empty string.
Pub/Sub message keys are used as the Kafka message key regardless of the subscription's enable_message_ordering setting; only the message read order depends on that setting.
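For context, here is a minimal sketch of publishing an ordered message with the google-cloud-pubsub library; its ordering_key (an example value below) is what this source surfaces as the Kafka message key:

from google.cloud import pubsub_v1

# Ordering must be enabled on the publisher for ordering_key to be honored.
publisher = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
)
topic_path = publisher.topic_path("<project ID>", "<topic ID>")

# "customer-123" is an example ordering key; messages sharing a key are
# delivered in publish order (given an ordering-enabled subscription).
future = publisher.publish(topic_path, b"some data", ordering_key="customer-123")
future.result()  # blocks until the publish is confirmed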
Message Read Ordering
Message read order depends on:
- a Pub/Sub message being published as an ordered message (has a key)
- a Pub/Sub subscription enabling ordered messages (set at subscription creation)
Pub/Sub Source cannot change any existing subscription settings, including ones generated by itself!
Assuming proper permissions, new subscriptions can be generated using create_subscription=True, with ordering enabled via enable_message_ordering=True (they must be set simultaneously, else ordering won't work, due to the above).
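As a sketch, a source that creates an ordering-enabled subscription at startup (placeholder IDs, as in the earlier example) sets both flags together:

from os import environ

from quixstreams.sources.community.pubsub import PubSubSource

source = PubSubSource(
    service_account_json=environ["PUBSUB_SERVICE_ACCOUNT_JSON"],
    project_id="<project ID>",
    topic_id="<topic ID>",
    subscription_id="<new subscription ID>",
    # Must be set together: ordering only applies at subscription creation.
    create_subscription=True,
    enable_message_ordering=True,
)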
Message Data Format/Schema
This is the default format of messages handled by Application:
- Message keys will be the "ordering_key" string (non-ordered messages will have empty strings).
- Message values will be the "data" in bytes, so transform accordingly with your SDF as desired.
- Message timestamps will reflect the original Pub/Sub message "publish_time" (in milliseconds).
- Message headers will contain all "attribute" (metadata) fields.
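For example, assuming your Pub/Sub payloads are JSON, a minimal value transformation on the earlier example's dataframe might be:

import json

# Values arrive as raw bytes; decode them into Python objects
# (this assumes every payload is valid JSON).
sdf = app.dataframe(source=source).apply(lambda value: json.loads(value))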
Processing/Delivery Guarantees
The Pub/Sub Source offers "at-least-once" guarantees: the source does not confirm that message acknowledgements to the Pub/Sub Subscriber succeeded.
As such, in rare circumstances where an acknowledgement fails, messages may be processed (produced) more than once (and additionally, out of their original order, regardless of ordering settings).
Topic
The default topic name the Application dumps to is gcp-pubsub_{subscription_name}_{topic_name}.
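If you'd rather produce to an explicitly named topic, a sketch of the usual pattern (assuming app.dataframe() accepts a Topic alongside a source; "pubsub-events" is a hypothetical name):

# Override the generated default with an explicit topic.
topic = app.topic("pubsub-events")
sdf = app.dataframe(topic=topic, source=source)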
Testing Locally
Rather than connecting to Google Cloud, you can alternatively test your application using a local "emulated" Pub/Sub host via Docker:
- DO NOT pass a service_account_json to PubSubSource; instead, set the environment variable: PUBSUB_EMULATOR_HOST=localhost:8085
- Execute in terminal:
  docker run -d --name pubsub-emulator -p 8085:8085 gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
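With the emulator running, you can then launch your application as usual, for example (the script name is a placeholder):

export PUBSUB_EMULATOR_HOST=localhost:8085
python main.py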