To reduce the amount of boilerplate code added to each of our samples, we developed the App.run() feature.
App.run takes care of several small but important tasks in managing your Python apps. These include:
- Start subscribing to topics
- Handle termination
- Close streams and dispose topics
- Keep alive
Each of these is described in the following sections.
To use App.run() in your code, import App from quixstreams (from quixstreams import App).
To start receiving data from a topic, you need to call the subscribe() method. Your Python code won't receive any data from the broker if you miss this step. It makes sense to add this call near the end of the code, just before your 'keep busy' while loop.
Your code might look something like this:
```python
import pandas as pd
from quixstreams import TopicConsumer, StreamConsumer


def on_stream_received_handler(stream_received: StreamConsumer):
    buffer = stream_received.timeseries.create_buffer()
    buffer.on_dataframe_released = on_dataframe_released_handler


def on_dataframe_released_handler(stream: StreamConsumer, df: pd.DataFrame):
    print(df.to_string())


# ... some code setting up topic_consumer

topic_consumer.on_stream_received = on_stream_received_handler
topic_consumer.subscribe()

while True:  # or other blocking call
    print('running')
```
With App.run() you no longer need the call to subscribe(), because it is called for you.
So your code would look like this instead:
```python
from quixstreams import App, TopicConsumer, StreamConsumer
import pandas as pd


def on_stream_received_handler(stream_received: StreamConsumer):
    buffer = stream_received.timeseries.create_buffer()
    buffer.on_dataframe_released = on_dataframe_released_handler


def on_dataframe_released_handler(stream: StreamConsumer, df: pd.DataFrame):
    print(df.to_string())


# ... some code setting up topic_consumer

topic_consumer.on_stream_received = on_stream_received_handler

App.run()
```
Termination signals such as SIGINT, SIGTERM, and SIGQUIT can be used to break out of the while loop that keeps your code listening for data. To listen for these signals, you'd need to add some code like this:
```python
import threading
import signal
import time

# Handle graceful exit
event = threading.Event()


def signal_handler(sig, frame):
    # set the termination flag
    print('Setting termination flag')
    event.set()


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

while not event.is_set():
    time.sleep(1)

print('Exiting')
```
In this case, when the code runs, it will subscribe to and handle SIGINT (an interrupt signal) and SIGTERM (a termination signal). If either of these signals is observed, the code will call signal_handler, and the while loop will terminate, allowing the code execution to come to an end.
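SIGQUIT, mentioned at the start of this section, is not defined by Python's signal module on every platform (Windows, for example, lacks it), so hand-written handling usually has to check which signals exist before registering for them. The following is a minimal sketch of that check using only the standard library. The helper names available_signals and install_handlers are hypothetical, and this is not how Quix Streams implements its own handling.

```python
import signal
import threading

event = threading.Event()  # termination flag, as in the example above


def signal_handler(sig, frame):
    # Set the termination flag so a waiting loop can exit
    event.set()


def available_signals(names=("SIGINT", "SIGTERM", "SIGQUIT", "SIGBREAK")):
    """Return the signals from `names` that this platform defines."""
    return [getattr(signal, name) for name in names if hasattr(signal, name)]


def install_handlers(handler):
    """Register `handler` for every available signal.

    signal.signal() may only be called from the main thread, so this
    returns an empty list when called from any other thread.
    """
    if threading.current_thread() is not threading.main_thread():
        return []
    installed = []
    for sig in available_signals():
        signal.signal(sig, handler)
        installed.append(sig)
    return installed


installed = install_handlers(signal_handler)

# Exercise the handler directly instead of sending a real signal
signal_handler(signal.SIGTERM, None)
print("termination flag set:", event.is_set())
```

Calling the handler directly, as in the last lines, is a convenient way to test shutdown logic without delivering a real signal to the process.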
App.run has this termination signal handling built in. It works on all popular platforms too.
With App.run, the above code becomes much simpler: the signal handling and the while loop are replaced by a single call to App.run().
Ideally, when your code terminates it will close the streams it opened and tidy up any other resources it was using.
You can ensure the streams are closed by calling the following code just before your process terminates:
```python
# dispose the topic(s) and close the stream(s)
print('Closing streams...')
topic_consumer.dispose()  # Note the order. Producer should be closed after consumer
topic_producer.dispose()  # to avoid receiving data - possibly committing - but not being able to write.
print('Closed streams...')

# or using the with statement, if your code structure is suited for it
with topic_producer, topic_consumer:  # disposed in reverse order
    pass  # do great stuff
# disposed when going out of scope
print('Closed streams...')
```
Here, you dispose of the topic consumer, which stops new data from being received, and then dispose of the topic producer. This code is straightforward; however, it is more boilerplate that you have to remember.
App.run() encapsulates this and handles it for you, so the disposal code above is replaced by a single call to App.run().
When a topic is disposed, it is possible to run additional code by linking the on_disposed event to an appropriate handler. The on_disposed event occurs at the end of the topic disposal process.
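Two points from this section can be illustrated with stand-in objects: the with statement disposes in reverse order (consumer first, then producer), and a handler linked to on_disposed fires at the end of disposal. The sketch below uses a hypothetical FakeTopic class, which is not the Quix Streams topic class; the handler signature (a single argument, the disposed topic) is an assumption made for illustration.

```python
from typing import Callable, List, Optional

events: List[str] = []  # records what happened, in order


class FakeTopic:
    """Stand-in for a topic consumer or producer. NOT the Quix Streams class."""

    def __init__(self, name: str):
        self.name = name
        # Assumed handler shape: receives the topic that was disposed
        self.on_disposed: Optional[Callable[["FakeTopic"], None]] = None

    def dispose(self) -> None:
        events.append(f"dispose {self.name}")
        # The handler runs at the end of the disposal process
        if self.on_disposed is not None:
            self.on_disposed(self)

    def __enter__(self) -> "FakeTopic":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.dispose()


def on_disposed_handler(topic: FakeTopic) -> None:
    events.append(f"on_disposed {topic.name}")


topic_producer = FakeTopic("producer")
topic_consumer = FakeTopic("consumer")
topic_consumer.on_disposed = on_disposed_handler

# Context managers exit in reverse order, so the consumer is disposed first
with topic_producer, topic_consumer:
    events.append("working")

print(events)
```

Listing the producer first in the with statement is what makes it the last to be disposed, which matches the ordering advice in the manual cleanup code.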
Unless you add an infinite loop or similar code, a Python code file will run each code statement sequentially until the end of the file, and then exit.
To continuously handle data in your Python code, you need to prevent the code from terminating. There are several ways this could be achieved. For example, you could use an infinite while loop to enable your code to run continuously. The following code will continuously print "running" until the end_condition has been satisfied:
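A minimal sketch of such a keep-alive loop is shown here. The keep_alive helper and its poll_interval parameter are hypothetical names chosen for illustration, and end_condition is modelled as a callable so that the loop can re-evaluate it on every pass.

```python
import itertools
import time


def keep_alive(end_condition, poll_interval=0.1):
    """Print 'running' until end_condition() returns True.

    Returns the number of times the loop body ran.
    """
    iterations = 0
    while not end_condition():
        print("running")
        iterations += 1
        time.sleep(poll_interval)  # avoid spinning the CPU
    return iterations


# Usage: an end condition that becomes true on its fourth check
ticks = itertools.count()
count = keep_alive(lambda: next(ticks) >= 3, poll_interval=0.01)
print("loop ran", count, "times")
```

In a real service the end condition would typically be a termination flag set by a signal handler rather than a counter.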
We used code similar to this in our Quix Library items for a while. However, once we'd seen the same pattern being used repeatedly, we decided to build this functionality into Quix Streams.
To use it, call App.run() once your handlers are set up; it keeps your code running until a termination signal arrives.
Bring it all together
You have seen how you could start subscribing to streams, handle termination signals, dispose of topic consumers and producers, and keep your code running.
To recap, here is an example of your code without using App.run, all in one snippet:
```python
import threading
import signal
import time

# ... some code setting up topic_consumer and topic_producer

topic_consumer.subscribe()  # initiate read

# Hook up to termination signal (for docker image) and CTRL-C
print('Listening to streams. Press CTRL-C to exit.')

# Below code is to handle graceful exit of the model.
event = threading.Event()


def signal_handler(sig, frame):
    # dispose the topic(s) and close the stream(s)
    print('Closing streams...')
    topic_consumer.dispose()
    topic_producer.dispose()
    print('Setting termination flag')
    event.set()


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

while not event.is_set():
    time.sleep(1)

print('Exiting')
```
If you use App.run(), this is greatly simplified: once topic_consumer and topic_producer are set up, a single call to App.run() replaces everything from the subscribe() call onwards.
It's good practice to make sure that your code cleans up during the shutdown phase. Cleanup includes disposing of any resources you might have used, or indicating to external systems that they need to close or deallocate resources.
If you choose to implement the cleanup of other resources, or simply need to log something immediately before the code ends, you can configure App.run() to call a function before it does all of the other built-in actions.
App.run provides the before_shutdown hook to enable this facility. Pass your function as the before_shutdown keyword argument, for example App.run(before_shutdown=before_shutdown).
The before_shutdown function is called before the app shuts down, that is, before the while loop inside App.run comes to an end. This enables you to close connections, tidy up, and log your last messages before the app terminates.
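The ordering described here (hook first, built-in cleanup afterwards) can be sketched with the standard library alone. The run_until_stopped helper below is hypothetical and is not the Quix Streams implementation of App.run; it only illustrates a hook being called from inside the keep-alive loop before the remaining cleanup runs.

```python
import threading
from typing import Callable, List, Optional

log: List[str] = []  # records the order in which shutdown steps run


def run_until_stopped(
    stop: threading.Event,
    cleanup: Callable[[], None],
    before_shutdown: Optional[Callable[[], None]] = None,
) -> None:
    """Hypothetical helper: block until `stop` is set, then shut down in order."""
    while True:
        # wait() returns True as soon as the event is set
        if stop.wait(timeout=0.1):
            # The hook runs before the loop ends and before any other cleanup
            if before_shutdown is not None:
                before_shutdown()
            break
    cleanup()


stop = threading.Event()
threading.Timer(0.2, stop.set).start()  # request shutdown after 0.2 seconds

run_until_stopped(
    stop,
    cleanup=lambda: log.append("built-in cleanup"),
    before_shutdown=lambda: log.append("before_shutdown"),
)
print(log)
```

Because the hook runs first, anything it needs (open connections, loggers) is still available when it is called.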
Triggering shutdown from code
In some cases you might want to trigger shutdown from code. You can do this using a CancellationTokenSource and passing its token to App.run:
```python
from quixstreams import App, CancellationTokenSource
import threading
import time

cts = CancellationTokenSource()  # used for interrupting the App


# setup shutdown after 5 seconds
def timeout_callback():
    time.sleep(5)
    cts.cancel()


timeout_thread = threading.Thread(target=timeout_callback)
timeout_thread.start()


# Setup some prints and pass the token to App.run
def before_shutdown():
    print('before shutdown')


print('Waiting 5 seconds')
App.run(cts.token, before_shutdown=before_shutdown)
print('exiting')
```