Process - threshold detection
In this part of the tour you'll learn how to create a threshold detection transform. The transform detects if CPU load exceeds a certain threshold, and if so, sends a message to its output topic.
Create the threshold detection transform
To create the threshold detection transform:
- In your
Develop
environment, click onCode Samples
in the main left-hand navigation. - Select the
Python
,Transformation
, andBasic templates
filters. - For
Starter transformation
clickPreview code
. - Click
Edit code
. - Name the transform "CPU Threshold".
- Select the input topic
cpu-load
. - For the output topic, add a new topic called
cpu-spike
. - In the application view, click on
main.py
to edit it. -
Replace all the code in
main.py
with the following:from quixstreams import Application import os # Create an Application # It will get the SDK token from environment variables to connect to Quix Kafka app = Application() # Define input and output topics input_topic = app.topic(os.environ["input"]) output_topic = app.topic(os.environ["output"]) # Create a StreamingDataFrame to process data sdf = app.dataframe(input_topic) # Filter in all rows where CPU load is over 25. sdf = sdf.filter(lambda row: row["cpu_load"] > 25) # Produce message payload with alert. sdf = sdf.apply(lambda row: "CPU value is " + str(row["cpu_load"])) # Print messages to the console sdf = sdf.update(lambda row: print(row)) # Send messages to the output topic sdf = sdf.to_topic(output_topic) if __name__ == "__main__": # Run the Application app.run(sdf)
-
Tag the project as
process-v1
and deploy as a service. - Monitor the logs for the deployed process.
If CPU load exceeds the threshold the message is published to the output topic, for further processing in the next stage of the pipeline.