2. Event Detection
Danger
This tutorial is out of date. Please check the tutorials overview for our latest tutorials.
In this part of the tutorial you build the event detection service. This service runs an ML model to detect if a vehicle has been involved in an accident.
For the purposes of this tutorial, the ML model you use was trained to detect the difference between a phone shaking and it being used normally.
Using an ML model is not the only way to implement event detection: you could detect a change in speed, or use speed combined with another parameter, to determine whether an event has occurred.
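For example, a simple threshold-based alternative might flag an event whenever the combined g-force exceeds a limit. The sketch below is not part of the original tutorial: the threshold value is invented, and the column names are the ones used by the phone data later in this tutorial.

```python
import pandas as pd

# Illustrative threshold; a real value would need to be tuned against actual data.
G_FORCE_THRESHOLD = 4.0

def detect_crash(df: pd.DataFrame) -> bool:
    # Combine the absolute g-force on each axis and flag any sample over the limit.
    total = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()
    return bool((total > G_FORCE_THRESHOLD).any())
```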
The starter template
Follow these steps to create the event detection service:
1. Navigate to Code Samples and search for `Starter transformation`.
2. Ensure you have located the Python starter transformation and click Preview code.
3. Click Edit code.
4. Change the name to `Crash event detection`.
5. Enter `phone-data` into the input field.
6. Enter `phone-out` into the output field.
7. Click Save as Application.
You now have the basic template for the service saved to your environment.
Test the template
At this stage you should test the code to make sure it passes some basic functional tests:
1. Ensure that the data source you deployed earlier is running.
2. Click the Run button in the top right of your browser.
3. Explore the Console and Messages tabs and verify that data is arriving in the `phone-data` topic.
4. Stop the code from running once you are finished investigating the tabs.
Next you add code to detect the crash events, making use of some code snippets and Python libraries.
Modify requirements.txt
You need to modify the `requirements.txt` file to make sure the required packages are installed. Follow these steps:
1. Open the `requirements.txt` file and add the lines needed to ensure all the packages are installed (see the note after this list).
2. Save the `requirements.txt` file.
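The exact package lines are not preserved in this version of the tutorial. Based on the code added to `main.py` later, which unpickles an XGBoost model and calls its `predict` method on a pandas DataFrame, a reasonable assumption is:

```
xgboost
scikit-learn
```

Treat this list as an assumption rather than the tutorial's original content; the starter transformation's own `requirements.txt` should already include `quixstreams` and `pandas`.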
Modify dockerfile
The standard `dockerfile` can't be used in this case. You need one that includes `libgomp1`.
1. Update the `dockerfile` in the `build` folder to make sure `libgomp1` is included. Under the line with `COPY --from=git /project .`, add the following:

    ```
    RUN apt-get -y install libgomp1 ca-certificates
    ```

    This will install `libgomp1`, which is a requirement of XGBoost.

    The completed `dockerfile` should look like this:

    ```
    FROM quixpythonbaseimage
    ENV DEBIAN_FRONTEND="noninteractive"
    ENV PYTHONUNBUFFERED=1
    ENV PYTHONIOENCODING=UTF-8
    ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt
    WORKDIR /app
    COPY --from=git /project .
    RUN apt-get -y install libgomp1 ca-certificates
    RUN find | grep requirements.txt | xargs -I '{}' python3 -m pip install -i http://pip-cache.pip-cache.svc.cluster.local/simple --trusted-host pip-cache.pip-cache.svc.cluster.local -r '{}' --extra-index-url https://pypi.org/simple --extra-index-url https://pkgs.dev.azure.com/quix-analytics/53f7fe95-59fe-4307-b479-2473b96de6d1/_packaging/public/pypi/simple/
    ENTRYPOINT python3 main.py
    ```

2. Save the `dockerfile`.
Modify main.py
1. Open `main.py` and add these lines to the existing imports:

    ```python
    import pickle
    from urllib import request
    ```

2. After the import statements, add the following lines:

    ```python
    # download the model with urllib
    f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/XGB_model.pkl")
    with open("XGB_model.pkl", "wb") as model_file:
        model_file.write(f.read())

    # load it with pickle
    loaded_model = pickle.load(open("XGB_model.pkl", 'rb'))
    ```

    This code downloads the `.pkl` file (the pretrained model) from the Quix storage account and loads the model into memory.

3. Modify the `on_dataframe_received_handler` function as follows:

    ```python
    def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
        # Transform data frame here in this method. You can filter data or add new features.
        # Pass modified data frame to output stream using stream producer.
        # Set the output stream id to the same as the input stream or change it,
        # if you grouped or merged data with different key.
        stream_producer = topic_producer.get_or_create_stream(stream_id = stream_consumer.stream_id)

        if "gForceX" in df:
            df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()
            df["shaking"] = loaded_model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]])

            if df["shaking"].max() == 1:
                print("Crash detected.")
                stream_producer.events.add_timestamp_nanoseconds(df.iloc[0]["timestamp"]) \
                    .add_value("crash", "Crash detected.") \
                    .publish()
    ```

    This code opens the output stream, and then checks that the required data is in the DataFrame. It then uses the ML model with the g-force data to determine if shaking has occurred. If shaking is detected, an event is generated and published to the output topic.

4. Save `main.py`.
The completed `main.py` should look like this:

```python
import quixstreams as qx
import os
import pandas as pd
import pickle
from urllib import request

# download the model with urllib
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/XGB_model.pkl")
with open("XGB_model.pkl", "wb") as model_file:
    model_file.write(f.read())

# load it with pickle
loaded_model = pickle.load(open("XGB_model.pkl", 'rb'))

client = qx.QuixStreamingClient()

topic_consumer = client.get_topic_consumer(os.environ["input"], consumer_group = "empty-transformation")
topic_producer = client.get_topic_producer(os.environ["output"])

def on_dataframe_received_handler(stream_consumer: qx.StreamConsumer, df: pd.DataFrame):
    # Transform data frame here in this method. You can filter data or add new features.
    # Pass modified data frame to output stream using stream producer.
    # Set the output stream id to the same as the input stream or change it,
    # if you grouped or merged data with different key.
    stream_producer = topic_producer.get_or_create_stream(stream_id = stream_consumer.stream_id)

    if "gForceX" in df:
        df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()
        df["shaking"] = loaded_model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]])

        if df["shaking"].max() == 1:
            print("Crash detected.")
            stream_producer.events.add_timestamp_nanoseconds(df.iloc[0]["timestamp"]) \
                .add_value("crash", "Crash detected.") \
                .publish()

# Handle event data from samples that emit event data
def on_event_data_received_handler(stream_consumer: qx.StreamConsumer, data: qx.EventData):
    print(data)
    # handle your event data here

def on_stream_received_handler(stream_consumer: qx.StreamConsumer):
    # subscribe to new DataFrames being received
    # if you aren't familiar with DataFrames there are other callbacks available
    stream_consumer.events.on_data_received = on_event_data_received_handler  # register the event data callback
    stream_consumer.timeseries.on_dataframe_received = on_dataframe_received_handler

# subscribe to new streams being received
topic_consumer.on_stream_received = on_stream_received_handler

print("Listening to streams. Press CTRL-C to exit.")

# Handle termination signals and provide a graceful exit
qx.App.run()
```
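If you want to sanity-check the downloaded model outside Quix before testing the full pipeline, a minimal local sketch is shown below. This is not part of the original tutorial: the g-force values are invented, and whether any given row classifies as shaking depends entirely on the model.

```python
import pickle
import pandas as pd
from urllib import request

# Download and unpickle the tutorial model (same URL as in main.py).
f = request.urlopen("https://quixtutorials.blob.core.windows.net/tutorials/event-detection/XGB_model.pkl")
with open("XGB_model.pkl", "wb") as model_file:
    model_file.write(f.read())
model = pickle.load(open("XGB_model.pkl", "rb"))

# Build a small frame with the same four features the handler derives,
# keeping the column order used in on_dataframe_received_handler.
df = pd.DataFrame({"gForceX": [0.1, 4.8], "gForceY": [0.0, 3.9], "gForceZ": [1.0, 5.2]})
df["gForceTotal"] = df["gForceX"].abs() + df["gForceY"].abs() + df["gForceZ"].abs()

# A prediction of 1 means the model classified that row as shaking.
print(model.predict(df[["gForceZ", "gForceY", "gForceX", "gForceTotal"]]))
```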
Test again
You can once again run the code in the development environment to test the functionality:
1. Ensure that the data source you deployed earlier is running.
2. Click Run in the top right of the browser to run the event detection code.
3. If you chose to stream live data, gently shake your phone.
4. If you chose to use CSV data, wait for a crash event to be streamed from the data set, or stop and start the service in another browser tab.
5. Observe the Console tab. You should see a message saying "Crash detected."
6. On the Messages tab, select `output : phone-out` from the first dropdown.
7. Gently shake your phone, or wait for another crash event from the CSV data, and observe that crash events are streamed to the output topic. You can click these rows to investigate the event data; an illustrative example is shown after this list.
8. Stop the code.
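Based on the handler code above, each crash event carries the ID `crash`, the value `Crash detected.`, and the timestamp of the first row of the triggering DataFrame. A sketch of a single event's fields (the timestamp value here is invented, not captured output):

```python
# Illustrative shape of one crash event as published by the handler.
{
    "timestamp": 1687516800000000000,  # nanoseconds, from the first row of the DataFrame
    "id": "crash",
    "value": "Crash detected."
}
```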
Success
The crash detection service is working as expected and can now be deployed.
Deploy crash detection
Now that you have verified the service is working, you can go ahead and deploy it:
1. Tag the code by clicking the add tag icon at the top of the code panel.
2. Enter a tag such as `crash-v1`.
3. Click the Deploy button near the top right of the code panel.
4. From the Version tag dropdown, select the tag you created.
5. Click Deploy.
Success
You now have a data source and the crash detection service running in your environment.
Next you'll deploy a real-time UI to visualize the route being taken, the location of any crash events, and some of the sensor data.