Add a CPU load threshold detection transform
You now add a transform to detect when the CPU load exceeds a threshold. Click Add new on the output of your external source, and add the Starter transformation.
You can use the defaults, or rename your transform to something like CPU Threshold.
Then click Edit code. You can rename the output topic to cpu-threshold-transform.
Replace the code in main.py with the following:
```python
import os

from quixstreams import Application

app = Application()

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])

sdf = app.dataframe(input_topic)

# Keep only the rows where the CPU load is over 20.
sdf = sdf.filter(lambda row: row["cpu_load"] > 20)

# Build an alert payload from each remaining row.
sdf = sdf.apply(lambda row: {
    "summary": "CPU overload",
    "source": "custom_event",
    "severity": "critical",
    "custom_details": {
        "timestamp": row["timestamp"],
        "message": "CPU value is " + str(row["cpu_load"])
    }
})

# Publish the alerts to the output topic.
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
```
Here, a simple filter checks whether each inbound row reports a CPU load above a fixed limit (set to 20 here to make testing easy). Only rows over the threshold pass the filter; all others are dropped, and each remaining row is turned into an alert payload.
You can test that the application works by running some CPU-intensive apps on your laptop. When the threshold is exceeded, the transform sends a message in the following format to the output topic:
```json
{
  "summary": "CPU overload",
  "source": "custom_event",
  "severity": "critical",
  "custom_details": {
    "timestamp": 1710947291392758000,
    "message": "CPU value is 25.1"
  }
}
```
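To check the filter and payload logic without a broker, you can run the same two steps over a handful of sample rows in plain Python. This is a minimal sketch, not Quix Streams code: the helper names `is_overloaded` and `build_alert` and the sample rows are hypothetical, and it assumes each inbound row is a dict with `cpu_load` and `timestamp` keys, as the `main.py` code above expects.

```python
# Hypothetical offline check of the threshold transform; no Kafka or Quix Streams needed.
CPU_THRESHOLD = 20  # same fixed limit as in main.py


def is_overloaded(row: dict) -> bool:
    """Mirror of the sdf.filter(...) step: keep rows above the threshold."""
    return row["cpu_load"] > CPU_THRESHOLD


def build_alert(row: dict) -> dict:
    """Mirror of the sdf.apply(...) step: build the alert payload."""
    return {
        "summary": "CPU overload",
        "source": "custom_event",
        "severity": "critical",
        "custom_details": {
            "timestamp": row["timestamp"],
            "message": "CPU value is " + str(row["cpu_load"]),
        },
    }


# Hypothetical sample input rows.
rows = [
    {"timestamp": 1710947291392758000, "cpu_load": 12.4},
    {"timestamp": 1710947292392758000, "cpu_load": 25.1},
    {"timestamp": 1710947293392758000, "cpu_load": 20.0},  # equal to the limit, so dropped
]

alerts = [build_alert(row) for row in rows if is_overloaded(row)]
for alert in alerts:
    print(alert["custom_details"]["message"])
```

The comparison is strict, so a load of exactly 20 does not raise an alert.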
Windowing
While short CPU spikes might be acceptable, high load sustained over a longer period is more concerning. To detect that condition, you can aggregate the data over a tumbling window, that is, over fixed-size, non-overlapping time intervals. For example, to raise an alert when the average CPU load over each 10-second window exceeds a threshold, you could use a time-based windowing function, as in the following code example:
```python
import os
from datetime import timedelta

from quixstreams import Application

app = Application()

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])

sdf = app.dataframe(input_topic)

# Average the CPU load over 10-second tumbling windows and emit one row per
# closed window. Each windowed row has "start" and "end" (milliseconds) and
# "value" (the mean) keys, so the original "cpu_load" key is no longer present.
sdf = (
    sdf.apply(lambda row: row["cpu_load"])
    .tumbling_window(timedelta(seconds=10))
    .mean()
    .final()
)

# Keep only the windows where the average CPU load is over 20.
sdf = sdf.filter(lambda row: row["value"] > 20)

# Window length in seconds.
sdf["window_duration_s"] = (sdf["end"] - sdf["start"]) / 1000

# Produce message payload with alert.
# Single quotes inside the f-string keep it valid on Python versions before 3.12.
sdf = sdf.apply(lambda row: {
    "summary": "Windowed CPU overload",
    "source": "custom_event",
    "severity": "critical",
    "custom_details": {
        "timestamp": row["end"],
        "message": f"CPU {row['value']} for duration of {row['window_duration_s']} seconds."
    }
})

sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
```
Replace the code in main.py with the windowing code if you want to test it out.
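To see what the windowed pipeline computes, the following plain-Python sketch groups readings into fixed, non-overlapping 10-second windows, averages each window, and keeps only the windows whose mean exceeds 20. It is an approximation for illustration, not Quix Streams internals: it assumes millisecond timestamps and windows aligned to multiples of the window duration (our understanding of how Quix Streams aligns tumbling windows), and it processes a finished list rather than a live stream, so it ignores late or out-of-order data. The function name `tumbling_means` and the sample readings are hypothetical.

```python
from collections import defaultdict

WINDOW_MS = 10_000  # 10-second tumbling window, as in timedelta(seconds=10)
CPU_THRESHOLD = 20


def tumbling_means(readings):
    """Group (timestamp_ms, cpu_load) pairs into tumbling windows and average each one.

    Returns a list of dicts shaped like the windowed rows: start, end, value.
    """
    buckets = defaultdict(list)
    for ts_ms, cpu_load in readings:
        start = ts_ms - ts_ms % WINDOW_MS  # align each reading to its window start
        buckets[start].append(cpu_load)
    return [
        {"start": start, "end": start + WINDOW_MS, "value": sum(vals) / len(vals)}
        for start, vals in sorted(buckets.items())
    ]


# Hypothetical readings: a short spike in the first window, sustained load in the second.
readings = [
    (1_000, 5.0), (4_000, 50.0), (8_000, 5.0),       # window [0, 10000): mean 20.0
    (11_000, 30.0), (15_000, 26.0), (19_000, 28.0),  # window [10000, 20000): mean 28.0
]

windows = tumbling_means(readings)
for w in windows:
    if w["value"] > CPU_THRESHOLD:
        duration_s = (w["end"] - w["start"]) / 1000
        print(f"CPU {w['value']} for duration of {duration_s} seconds.")
```

The single spike to 50 in the first window is averaged away, which is the point of windowing: only the sustained load in the second window produces an alert.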
Advanced version
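The following version sends an alert only once per overload: it records in state that an alert has been sent, and clears that flag when the windowed average is no longer above the threshold, so the next overload triggers a new alert.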
```python
import os
from datetime import timedelta

from quixstreams import Application, State

app = Application()

input_topic = app.topic(os.environ["input"])
output_topic = app.topic(os.environ["output"])

sdf = app.dataframe(input_topic)

# Average the CPU load over 10-second tumbling windows; windowed rows
# have "start", "end" and "value" (the mean) keys.
sdf = (
    sdf.apply(lambda row: row["cpu_load"])
    .tumbling_window(timedelta(seconds=10))
    .mean()
    .final()
)

sdf["window_duration_s"] = (sdf["end"] - sdf["start"]) / 1000


def is_alert(row: dict, state: State) -> bool:
    """Return True only for the first overloaded window in a run of overloaded windows."""
    is_alert_sent = state.get("is_alert_sent", False)
    if row["value"] > 20:
        if not is_alert_sent:
            # First window over the threshold: remember that an alert went out.
            state.set("is_alert_sent", True)
            return True
        # Alert already sent for this overload: suppress duplicates.
        return False
    # Load is no longer over the threshold: re-arm the alert for the next overload.
    state.set("is_alert_sent", False)
    return False


sdf = sdf.filter(is_alert, stateful=True)

# Produce message payload with alert.
sdf = sdf.apply(lambda row: {
    "summary": "CPU overload",
    "source": "custom_event",
    "severity": "critical",
    "custom_details": {
        "timestamp": row["end"],
        "message": f"CPU {row['value']} for duration of {row['window_duration_s']} seconds."
    }
})

sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)
```
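The once-only logic in `is_alert` is a small two-state machine. The plain-Python sketch below replays it over a hypothetical sequence of window averages. It swaps in a hypothetical `DictState` class, backed by an ordinary dict, for the Quix Streams `State` object; this is an assumption for illustration only, since in the application the state is managed and persisted by Quix Streams.

```python
CPU_THRESHOLD = 20


class DictState:
    """Hypothetical stand-in for quixstreams' State, offering only the get/set calls is_alert uses."""

    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


def is_alert(row: dict, state) -> bool:
    """Same logic as is_alert in main.py: alert on the first overloaded window only."""
    if row["value"] > CPU_THRESHOLD:
        if not state.get("is_alert_sent", False):
            state.set("is_alert_sent", True)
            return True
        return False
    # Load is no longer over the threshold: re-arm the alert.
    state.set("is_alert_sent", False)
    return False


state = DictState()
window_means = [15.0, 25.0, 30.0, 27.0, 10.0, 22.0]  # hypothetical per-window averages
fired = [is_alert({"value": v}, state) for v in window_means]
print(fired)
```

Three consecutive overloaded windows (25, 30, 27) produce a single alert, and a new alert fires only after the load has dropped to the threshold or below and then risen above it again (22).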