Tutorial: Word Counts
We will build a simple word counter, which is a great introduction to Quix Streams and Kafka!
What You Will Learn
This example will show how to use a Quix Streams `Application` to:
- Ingest a non-Kafka data source
- Do simple event alterations
- Generate multiple events from a single event
- Filter any undesired events
- Create a Kafka topic
- Produce results, with new Kafka keys, to a topic
Outline of the Problem
Imagine we are a company with various products.
We want to process text reviews for these products in real time and see which words are most popular across all of them.
We need an application that will split reviews into individual words, and then send the count of each word downstream individually for further processing.
Our Example
We will use a Quix Streams `Source` to generate text to be processed by our new Word Counter `Application`.
Note
Our example uses JSON formatting for Kafka message values.
Event Expansion
The most important concept we want to explore here is how you can "expand" a single event/message into multiple new ones.
More than that: each new event generated via expansion is processed individually through the remainder of your pipeline, allowing you to write ALL your operations like they are handling a single event...because they are!
NOTE: Expanding often includes adjusting outgoing Kafka keys as well, so we additionally showcase that.
Before Getting Started
- You will see links scattered throughout this tutorial.
    - Tutorial code links are marked >>> LIKE THIS <<<.
    - All other links provided are completely optional; they are great ways to learn more about various concepts if you need it!
- This tutorial uses a `Source` rather than a Kafka `Topic` to ingest data.
    - `Source` connectors enable reading data from a non-Kafka origin (typically to get it into Kafka).
    - This approach spares users from having to run a producer alongside the `Application`.
    - A `Source` is easily replaced with an actual Kafka topic (just pass a `Topic` instead of a `Source`).
Generating Text Data
Our >>> Word Counter Application <<< uses a `Source` called `ReviewGenerator` that generates a static set of "reviews", which are simply strings, where the key is the product name.
The incoming Kafka messages look like:
```python
# ...
{kafka_key: 'product_one', kafka_value: "WOW I love product_one. It is the best."}
{kafka_key: 'product_two', kafka_value: "Fire the creator of product_two please."}
# etc...
```
Word Counter Application
Now let's go over the `main()` portion of our >>> Word Counter Application <<< in detail!
Create an Application
Create a Quix Streams `Application`, which is our constructor for everything!
We provide it our connection settings, consumer group (ideally unique per `Application`), and where the consumer group should start from on the (internal) `Source` topic.
Tip
Once you are more familiar with Kafka, we recommend learning more about `auto_offset_reset`.
Our Application
```python
import os

from quixstreams import Application

app = Application(
    broker_address=os.getenv("BROKER_ADDRESS", "localhost:9092"),
    consumer_group="product_review_word_counter",
    auto_offset_reset="earliest",
)
```
Specify Topics
`Application.topic()` returns `Topic` objects which are used by `StreamingDataFrame`.
Create one for each topic used by your `Application`.
Note
Any missing topics will be automatically created for you upon running an `Application`.
Our Topics
We have one output topic, named `product_review_word_counts`:
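A minimal sketch of such a topic definition (the variable name `product_review_word_counts_topic` is our choice for illustration):

```python
# Define our output topic (variable name is illustrative)
product_review_word_counts_topic = app.topic(name="product_review_word_counts")
```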
The StreamingDataFrame (SDF)
Now for the fun part: building our StreamingDataFrame, often shorthanded to "SDF".
SDF allows manipulating the message value in a dataframe-like fashion using various operations.
After initializing with either a `Topic` or `Source`, we continue reassigning to the same `sdf` variable as we add operations.
Note
A few `StreamingDataFrame` operations are "in-place", like `.print()`.
Initializing our SDF
First, we initialize our SDF with our `ReviewGenerator` `Source`, which means we will be consuming data from a non-Kafka origin.
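A minimal sketch of that initialization, assuming `ReviewGenerator` is defined in the tutorial file:

```python
# Initialize the SDF from our non-Kafka Source
# (ReviewGenerator comes from the tutorial code)
sdf = app.dataframe(source=ReviewGenerator())
```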
Tip
You can consume from a Kafka topic instead by passing a `Topic` object with `app.dataframe(topic=<Topic>)`.
Let's go over the SDF operations in this example in detail.
Tokenizing Text
```python
from collections import Counter

def tokenize_and_count(text):
    # Normalize the text, then count occurrences of each word,
    # returning an iterable of (word, count) pairs
    words = Counter(text.lower().replace(".", " ").split()).items()
    return words

sdf = sdf.apply(tokenize_and_count, expand=True)
```
This is where most of the magic happens!
We alter our text data with `SDF.apply(F)` (`F` should take your current message value as an argument, and return your new message value): our `F` here is `tokenize_and_count`.
Basically, we do some fairly typical string normalization and count the words, resulting in (word, count) pairs.
This effectively turns this:
>>> "Bob likes bananas and Frank likes apples."
to this:
>>> [('bob', 1), ('likes', 2), ('bananas', 1), ('and', 1), ('frank', 1), ('apples', 1)]
`expand=True` argument details:
- It tells SDF "hey, this `.apply()` returns multiple independent events!"
- The `F` returns a `list` (or a non-dict iterable of some kind), hence the "expand"!
Filtering Expanded Results
```python
def should_keep(word_count_pair):
    # Keep a (word, count) pair only when the word is not a "filler" word
    word, count = word_count_pair
    return word not in ['i', 'a', 'we', 'it', 'is', 'and', 'or', 'the']

sdf = sdf.filter(should_keep)
```
Now we filter out some "filler" words using `SDF.filter(F)`, where `F` is our `should_keep` function.
For `SDF.filter(F)`, if the (boolean-ed) return value of `F` is:
- `True` -> continue processing this event
- `False` -> stop ALL further processing of this event (including produces!)
Remember that each word is an independent event now due to our previous expand, so our `F` expects only a single (word, count) pair.
With this filter applied, our "and" event is removed:
```python
>>> [('bob', 1), ('likes', 2), ('bananas', 1), ('frank', 1), ('apples', 1)]
```
Producing Events With New Keys
Finally, we produce each event downstream (they will be independent messages) via `SDF.to_topic(T)`, where `T` is our previously defined `Topic` (not the topic name!).
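A minimal sketch of that produce call, reusing the `product_review_word_counts_topic` variable from our earlier topic sketch:

```python
# Produce each (word, count) pair downstream, re-keying each
# outgoing message on the word itself
sdf = sdf.to_topic(
    product_review_word_counts_topic,
    key=lambda word_count_pair: word_count_pair[0],
)
```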
Notice here the optional `key` argument, which allows you to provide a custom key generator.
While it's fairly common to maintain the input event's key (SDF's default behavior), there are also many reasons why you might adjust it, so we showcase an example of that here!
Why change the key to the word being counted?
This key change would enable calculating total word counts over time from this topic without additional data transformations (a more advanced operation).
Even though we won't do that in this example, you can imagine doing so in a downstream `Application`!
In the end we would produce 5 messages in total, like so:
```python
# two shown here...
{kafka_key: "bob", kafka_value: ["bob", 1]}
{kafka_key: "likes", kafka_value: ["likes", 2]}
# etc...
```
Note
This is a user-friendly representation of how a message key/value in the Kafka topic `product_review_word_counts` would appear.
Running the Application
Running a `Source`-based `Application` requires calling `Application.run()` within an `if __name__ == "__main__"` block.
Our Application Run Block
Our entire `Application` (and all its spawned objects) resides within a `main()` function, executed as required:
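A minimal sketch of that structure (the definitions above all live inside `main()`, which ends by starting the `Application`):

```python
def main():
    # Application, topic, and StreamingDataFrame definitions from above...
    app.run()

# Run the Application only when this file is executed as a script
if __name__ == "__main__":
    main()
```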
Try it Yourself!
1. Run Kafka
First, have a running Kafka cluster.
To easily run a broker locally with Docker, just run this simple one-liner.
2. Download files
3. Install Quix Streams
In your desired Python environment, execute: `pip install quixstreams`
4. Run the application
In your desired Python environment, execute: `python tutorial_app.py`
5. Check out the results!
...but wait, I don't see any `Application` processing output... Is it working???
One thing to keep in mind is that Quix Streams does not log/print any message processing operations by default.
To get visual outputs around message processing, you can either:
- use an `SDF.print()` operation in your pipeline (as mentioned above)
- use `DEBUG` mode via `Application(loglevel="DEBUG")`
Danger
You should NOT run your applications in `DEBUG` mode in production.
Related topics - Data Aggregation
If you were interested in learning how to aggregate across events as we hinted at with our key changes, check out how easy it is with either SDF's stateful functions or windowing, depending on your use case!
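As a taste of that, here is a minimal sketch of a stateful running total in a hypothetical downstream `Application` (illustrative only; it relies on messages being keyed by the word, as produced above, so state is scoped per word):

```python
# Hypothetical downstream processing: a running total per word.
# Because each message is keyed by the word, state is kept per word.
def running_total(word_count_pair, state):
    word, count = word_count_pair
    total = state.get("total", 0) + count
    state.set("total", total)
    return [word, total]

sdf = sdf.apply(running_total, stateful=True)
```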