In database terminology, the primary key is the column of a table that is intended to be the primary way of identifying rows. Each table must have exactly one, and its values must be unique. It is usually some kind of unique identifier associated with the objects represented by the table or, if no such identifier exists, simply a running ID number that is incremented automatically.
Implementing API Clients¶
In this final exercise we're visiting the other side of the API puzzle: the clients. This exercise material has two parts. In the first we make an automated machine client (a Python script). In the second we'll go over another means of communication between services: task queues with RabbitMQ.
API Clients with Python¶
Our first client is a map script that reads map representations from local files and uploads their contents to the API server, or updates a map on the server if it already exists but differs from the local version.
Learning goals: using Python's Requests library to make HTTP requests.
Preparations¶
Another exercise, another set of Python modules. We're using Requests for making API calls. Another module, used in the latter half of the exercise, is Pika, a RabbitMQ library for Python. Install them into your virtual environment as usual. We'll also install python-dotenv (imported as dotenv), since it is useful for configuring things.
pip install python-dotenv
pip install requests
pip install pika
In order to test the examples in this material, you will need to run a new API. This API is a rather simple grid-based map service that could be used to store e.g. video game maps. For the purposes of this course it only has three concepts: maps, observers, and obstacles. The API can be grabbed from GitHub. You may also want to run it with some basic data for testing:
pip install -e .
flask --app=gridmap init-db
flask --app=gridmap testgen
flask --app=gridmap run
You can also see the documentation at the Flasgger default location: localhost:5000/apidocs/.
Using Requests¶
The basic use of Requests is quite simple and very similar to using Flask's test client. The biggest obvious difference is that now we're actually making an HTTP request. Like the test client, Requests has a function for each HTTP method. These functions also take similar arguments: the URL as a mandatory first argument, then keyword arguments such as headers, params and data (for headers, query parameters and request body respectively). There is also a json keyword argument as a shortcut for sending a Python dictionary as JSON. For example, to get the maps collection:
In [1]: import requests
In [2]: SERVER_URL = "http://localhost:5000"
In [3]: resp = requests.get(SERVER_URL + "/api/maps/")
In [4]: body = resp.json()
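If you want to see what these keyword arguments turn into without a server running, Requests can build a request without sending it: requests.Request(...).prepare() returns a prepared request whose url, headers and body show the final result. This is only a small sketch for exploring; the page query parameter is made up for illustration and is not necessarily supported by the gridmap API.

```python
import json

import requests

SERVER_URL = "http://localhost:5000"

# Build (but do not send) a GET request with a query parameter and a header.
# "page" is a hypothetical parameter used only to show how params are encoded.
get_req = requests.Request(
    "GET",
    SERVER_URL + "/api/maps/",
    params={"page": 2},
    headers={"Accept": "application/json"},
).prepare()
print(get_req.url)  # http://localhost:5000/api/maps/?page=2

# Build a POST request whose body is a Python dict serialized as JSON
post_req = requests.Request(
    "POST",
    SERVER_URL + "/api/maps/",
    json={"name": "Fancy Map", "height": 6, "width": 12},
).prepare()
print(post_req.headers["Content-Type"])  # application/json
print(json.loads(post_req.body))
```

Note that the json argument also sets the Content-Type header for you, which is easy to forget when sending a JSON string through data instead.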
For another example, here is how to send a POST request, and read the Location header afterward:
In [5]: data = {"name": "Fancy Map", "height": 6, "width": 12}
In [6]: resp = requests.post(SERVER_URL + "/api/maps/", json=data)
In [7]: resp.headers["Location"]
Out[7]: '/api/maps/fancy-map/'
Using Requests Sessions¶
Our intended client is expected to call the API a lot. Requests offers sessions, which can improve the performance of the client by reusing TCP connections. A session can also set persistent headers, which is helpful for sending the Accept header, as well as authentication tokens for APIs that use them. Sessions should be used as context managers with the with statement to ensure that the session is closed.
In [1]: import requests
In [2]: SERVER_URL = "http://localhost:5000"
In [3]: with requests.Session() as s:
   ...:     s.headers.update({"Accept": "application/json"})
   ...:     resp = s.get(SERVER_URL + "/api/maps/")
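The way session headers combine with per-request headers can be checked without a server: Session.prepare_request merges the session's settings into a request without sending it. In this sketch the X-Client header is a made-up name used only to show that session headers are carried along.

```python
import requests

SERVER_URL = "http://localhost:5000"

with requests.Session() as s:
    # Persistent headers sent with every request made through this session.
    # X-Client is a hypothetical header for demonstration only.
    s.headers.update({"Accept": "application/json", "X-Client": "map-sync"})
    # A per-request header that conflicts with the session's Accept header
    req = requests.Request(
        "GET",
        SERVER_URL + "/api/maps/",
        headers={"Accept": "text/html"},
    )
    # Merge session settings into the request; nothing goes over the network
    prepared = s.prepare_request(req)

print(prepared.headers["Accept"])    # text/html (per-request value wins)
print(prepared.headers["X-Client"])  # map-sync (session header still included)
```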
With this setup, when using the session object to send HTTP requests, all the session headers are automatically included. Any headers defined in the request method call are added on top of the session headers (taking precedence in case of conflict).
Basic Client Actions¶
Now that we know the basics of using Requests, we can look at the example properly. We're not going to show the full code here, only the parts that actually interact with the API (but you can download the full code later). We are going to use a custom map representation for local files that uses the symbol "." for an empty tile and "#" for an obstacle. Any remaining characters denote observers, and their details are listed after the map itself. The map name is always on the first line, e.g.:
Fancy Map
............
.a...##...b.
.....#......
........####
.c........d.
....##......
a,Fancy Observer A,6
b,Fancy Observer B,6
c,Fancy Observer C,6
d,Fancy Observer D,6
Admittedly this data format is not very good, and we would very likely be much better off just storing locally the same JSON that the API holds. We chose this data format mostly to demonstrate the idea of converting between two different representations of data when interacting with an API. That, and because we don't really want to implement an editor for the JSON data, whereas this format is at least somewhat editable with just a text editor.
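The full example code has its own parser in LocalMap; the following is only a minimal sketch of how this text format could be turned into something close to the API's representation. The function name parse_map_text, the observer keys "name", "x", "y" and "range", and the convention that x is the column and y the row (both from 0) are assumptions for illustration, not the gridmap API's actual field names.

```python
def parse_map_text(text):
    """Parse the local map format into a dict (hypothetical helper).

    Assumptions: x is the column and y is the row, both counted from 0;
    observer lines look like "symbol,name,range".
    """
    lines = [line for line in text.splitlines() if line.strip()]
    name = lines[0]
    # Grid rows never contain commas; observer detail lines always do
    grid = [line for line in lines[1:] if "," not in line]
    details = [line for line in lines[1:] if "," in line]

    obstacles = set()
    positions = {}
    for y, row in enumerate(grid):
        for x, symbol in enumerate(row):
            if symbol == "#":
                obstacles.add((x, y))
            elif symbol != ".":
                positions[symbol] = (x, y)

    observers = {}
    for line in details:
        symbol, rest = line.split(",", 1)
        # rsplit keeps any commas that appear inside the observer name
        obs_name, vision = rest.rsplit(",", 1)
        x, y = positions[symbol]
        observers[symbol] = {"name": obs_name, "x": x, "y": y, "range": int(vision)}

    return {
        "name": name,
        "width": max(len(row) for row in grid),
        "height": len(grid),
        "observers": observers,
        "obstacles": obstacles,
    }


SAMPLE = """Fancy Map
............
.a...##...b.
.....#......
........####
.c........d.
....##......
a,Fancy Observer A,6
b,Fancy Observer B,6
c,Fancy Observer C,6
d,Fancy Observer D,6
"""

parsed = parse_map_text(SAMPLE)
print(parsed["width"], parsed["height"], len(parsed["obstacles"]))  # 12 6 9
```

Observers are keyed by their map symbol here; the real client would still need to map them to the slugs the API uses.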
Client Workflow¶
The client workflow is roughly as follows:
- GET the full map collection from the API
- Loop through local map files for
- new maps
- maps to update
- For each new map:
- Send a POST request to create the map
- Loop through the map and send a POST request to create each observer and obstacle
- For each existing map:
- GET the map's details from the API
- Calculate map dimensions from local file and compare to the API data, updating with PUT if different
- Loop through the map and compare each observer and obstacle to existing data
- exists locally, absent on API: create with POST
- absent locally, exists on API: remove with DELETE
- exists on both: check for differences, update with PUT if needed
- Optionally: download all maps that we don't have locally.
If we wanted to be fancier, we would add a modification date to maps on the API, and the client could then see which side has newer information by comparing the timestamp from the API to the local file's last modified timestamp. If the API had newer information, the client would simply reverse the above steps.
Because this is not a hypermedia client, there is a chance that API changes break the client. To avoid further complications, the client will include a check at the beginning that compares the server's API version to a local constant indicating which API version the client was implemented for. If these differ, the user should review the API changes, then update the code and the API version constant accordingly to make sure the changes do not cause errors.
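A minimal sketch of such a version check follows. The constant value "1.0", the exception class, and where server_version comes from are all assumptions: the API would need to expose its version somewhere (for example in an entry point document), which this material does not specify.

```python
# Hypothetical: the API version this client was written against
API_VERSION = "1.0"


class APIVersionMismatch(Exception):
    """Raised when the server reports a different API version."""


def check_api_version(server_version, expected=API_VERSION):
    """Stop before syncing if the API may have changed under the client."""
    if server_version != expected:
        raise APIVersionMismatch(
            f"Client was written for API {expected}, server reports "
            f"{server_version}; review the API changes before syncing"
        )


check_api_version("1.0")  # matching versions: passes silently
try:
    check_api_version("1.1")
except APIVersionMismatch as exc:
    print(exc)
```

Raising an exception rather than printing a warning makes sure no requests are sent when the versions disagree.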
Client Architecture¶
In the material we are going to focus on the part of the client that actually interacts with the API. In order to properly do so, we are going to decouple it as much as possible from the rest of the application logic. This is generally good practice as it allows you to change backends without having to modify everything about the client. The chosen structure for the code is to use a handful of classes to organize things:
- APIDataSource - our main point of interest, the class that communicates with the API
- Map - a common parent for local and remote maps that defines behaviors they both should have
- LocalMap - a subclass that loads its data from a local file
- RemoteMap - a subclass that loads its data from the API
Knowledge about the API is split as follows: the APIDataSource class knows the URIs used by the API, and the Map class has an internal data format that is compatible with the API's resource representations. The subclasses of Map are mostly there to separate the loading logic into two classes that can each have their own __init__, for convenience. We could also just have two different load methods in the Map class instead. Another design would be to have two different data source classes that both output instances of Map. Either way, there needs to be an agreement somewhere about how data in the client is mapped to the resource representations used by the API.
The program flow is going to be defined by a handful of functions that utilize these classes.
Making a Generic Client Class¶
An easy starting point is to make a generic client class that works with any REST API. This is a relatively simple matter of creating a class with a constructor that creates a session and sets the necessary headers and other information, plus methods for sending the basic request types. To initialize a client, at minimum a host address is needed, and possibly other information for authentication and TLS. We are going to leave header-based authentication as homework, but TLS with self-signed certificates will be supported by passing a file path.
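import requests
from urllib.parse import urljoin

class APIDataSource:

    def __init__(self, host, ca_cert=None):
        # Sanity check: the scheme must be part of the host address
        # (a bare "http" prefix would also accept e.g. "httpbin.org")
        assert host.startswith(("http://", "https://")), "No protocol in host address"
        self.host = host
        self.session = requests.Session()
        if ca_cert:
            # Verify the server's certificate against this CA certificate file
            self.session.verify = ca_cert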
It's best to set the host address at the beginning. If the client needs to communicate with multiple APIs, it's a good idea to create a separate instance of the client class for each of them, as they might also have different certificates or authentication. The assert at the beginning, which ensures that either "http" or "https" is included in the host address, is a sanity check that prevents errors later. If we wanted to be fancier, the client could also add the scheme automatically by guessing which one is used if it's missing (e.g. if ca_cert has been given, the connection is most likely going to use TLS). Setting a session's verify attribute makes Requests check the server's certificate against the specified CA certificate.
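The "fancier" option mentioned above could look roughly like this hypothetical helper, which a constructor could call instead of asserting. It checks for the full "http://" or "https://" prefix, so a bare host that merely starts with the letters "http" (such as httpbin.org) is not mistaken for one that already has a scheme.

```python
def normalize_host(host, ca_cert=None):
    """Add a missing scheme to a host address (hypothetical helper).

    Guess: if a CA certificate was given, the server most likely uses TLS.
    """
    if host.startswith(("http://", "https://")):
        return host
    scheme = "https" if ca_cert else "http"
    return f"{scheme}://{host}"


print(normalize_host("localhost:5000"))                    # http://localhost:5000
print(normalize_host("api.example.com", ca_cert="ca.crt"))  # https://api.example.com
print(normalize_host("httpbin.org"))                       # http://httpbin.org
```

Guessing is convenient but can hide configuration mistakes, so a strict assert is a perfectly reasonable alternative.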
Basically all that's left is to add methods for each of the HTTP methods so that everything will work in a unified manner. Just like before, we are using assert statements to make checks, handling them will be someone else's problem.
    def _get(self, uri):
        response = self.session.get(urljoin(self.host, uri))
        assert response.status_code == 200
        return response.json()

    def _post(self, uri, data):
        response = self.session.post(urljoin(self.host, uri), json=data)
        assert response.status_code == 201
        # Address of the newly created resource
        return response.headers.get("Location")

    def _put(self, uri, data):
        response = self.session.put(urljoin(self.host, uri), json=data)
        assert response.status_code == 204

    def _delete(self, uri):
        response = self.session.delete(urljoin(self.host, uri))
        assert response.status_code == 204
These are intended to be internal methods of the client class. As we discussed earlier, the client class should know the URIs so that if the API changes, all of them can be updated in the same place. That place will be a set of convenience methods that each either get data from the API or send data to it. For example, there will be a method called get_maps which makes a GET request to the API's /api/maps/ URI. When another part of the client code needs all of the maps, it will call this method, not the _get method directly. This way those parts of the code do not need to know the URI. The convenience method would look simply like:
    def get_maps(self):
        return self._get("/api/maps/")
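To try out the convenience-method idea without a running API, here is a sketch of a stand-in class whose internal methods record requests instead of sending them. In the real client the convenience methods would live in APIDataSource next to _get and friends. The single-map URI follows the Location header seen earlier (/api/maps/fancy-map/), but the observers URI is an assumption; check the actual paths at /apidocs/.

```python
class RecordingDataSource:
    """Stand-in for APIDataSource that records requests instead of sending them."""

    def __init__(self):
        self.calls = []

    # Internal methods with the same signatures as in APIDataSource
    def _get(self, uri):
        self.calls.append(("GET", uri))
        return {}

    def _post(self, uri, data):
        self.calls.append(("POST", uri))
        return None

    def _put(self, uri, data):
        self.calls.append(("PUT", uri))

    def _delete(self, uri):
        self.calls.append(("DELETE", uri))

    # Convenience methods: the only place where URIs are spelled out
    def get_maps(self):
        return self._get("/api/maps/")

    def get_map(self, slug):
        return self._get(f"/api/maps/{slug}/")

    def create_map(self, data):
        return self._post("/api/maps/", data)

    def update_map(self, slug, data):
        self._put(f"/api/maps/{slug}/", data)

    def create_observer(self, map_slug, data):
        # Hypothetical URI; verify against the API documentation
        return self._post(f"/api/maps/{map_slug}/observers/", data)


api = RecordingDataSource()
api.get_maps()
api.update_map("fancy-map", {"name": "Fancy Map", "width": 12, "height": 6})
print(api.calls)  # [('GET', '/api/maps/'), ('PUT', '/api/maps/fancy-map/')]
```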
Finally, one point of interest in the client class is how the session is managed. Unlike in the earlier examples, the session is not used directly in a with statement, because we want to keep it around for multiple different methods. Instead, it is now necessary to close the session explicitly when the connection is no longer needed. One could write a close method for the client class that closes the session, and then just remember to call it at the end. However, since the client class serves a purpose similar to a session, why not make the class itself compatible with context management? Making a class compatible with the with statement is quite simple: it needs to define two special methods, __enter__ and __exit__. The first returns the object that is bound to the with statement's variable (the name after as), and the second defines what happens when the block ends. For our simple client class:
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Runs when the with block ends, even if it ended with an exception
        self.session.close()
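A quick way to convince yourself that __exit__ runs even when something fails inside the with block is the following self-contained demo. FakeSession is a made-up stand-in for requests.Session that only tracks whether close() was called.

```python
class FakeSession:
    """Stands in for requests.Session; only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class DemoClient:
    """Same context manager methods as APIDataSource, with a fake session."""

    def __init__(self):
        self.session = FakeSession()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.session.close()
        # Returning None (a falsy value) lets the exception propagate


client = None
try:
    with DemoClient() as client:
        raise RuntimeError("simulated failure in the middle of a sync")
except RuntimeError as exc:
    print("caught:", exc)

print(client.session.closed)  # True
```

If __exit__ returned a truthy value, the exception would be suppressed instead, which is rarely what a client class should do.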
Now it's possible to write code like with APIDataSource(hostname) as api: to get an instance of the class and have the session always closed properly, regardless of what happens inside the with block.
Comparing Data¶
The section above really covers all you need to know about writing REST API clients with Requests. This last section mostly aims to give you some idea of how to go about comparing data from two sources, one of which is an API. For these examples, we are assuming local map data has been read into instances of LocalMap as described earlier. If you want to see how these are parsed, refer to the full example code. Let's start with the basics of the Map class:
class Map:

    def __init__(self):
        self.name = ""
        self.slug = ""
        self.width = 0
        self.height = 0
        self.observers = {}
        self.obstacles = set()

    def __hash__(self):
        return hash(self.slug)

    def __eq__(self, other):
        return self.slug == other.slug

    def __str__(self):
        return self.name
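To see what __hash__ and __eq__ buy us, here is a trimmed-down, self-contained version of the class with only name and slug. One small addition that is not in the class above: __eq__ returns NotImplemented when compared with a non-Map object, instead of failing with an AttributeError on other.slug.

```python
class Map:
    """Trimmed-down Map with only the attributes the comparison needs."""

    def __init__(self, name, slug):
        self.name = name
        self.slug = slug

    def __hash__(self):
        return hash(self.slug)

    def __eq__(self, other):
        # Let Python handle comparisons with non-Map objects
        if not isinstance(other, Map):
            return NotImplemented
        return self.slug == other.slug


local = [Map("Fancy Map", "fancy-map"), Map("New Map", "new-map")]
remote = [Map("Fancy Map (old name)", "fancy-map")]

print(local[0] in remote)        # True: slugs match, names differ
print(remote.index(local[0]))    # 0
only_local = set(local) - set(remote)
print([m.slug for m in only_local])  # ['new-map']
```

Both the in operator and list.index use __eq__, while set operations need __hash__ as well, which is why the two methods are defined together.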
The maps are initialized empty, and the corresponding __init__ in each subclass takes care of setting the basic attributes. For API-side maps we don't actually populate the observers and obstacles unless needed. If you check the documentation of /api/maps/, it only contains the basic information of each map, not its contents. As the contents are only relevant when a map exists on both sides, we don't need to GET that information for the other two cases. There's also some class magic once again. By defining the __hash__ and __eq__ methods we are basically saying that an instance of the class is equal to another instance if their uniquely identifying string attribute, slug, is the same. This makes writing comparison expressions easier.
Since our client's purpose is to only synchronize data one way, from local to remote, it is sufficient to check which maps exist locally, and which of them also exist remotely. For observers on a map we still need to do a two-way check, so we are going to actually cover both scenarios. For maps we first need a list of Map instances from each source. Since /api/maps/ returns the entire map collection from the API, we just need to create a RemoteMap instance for each object in the JSON document. If you look at this class definition, all that is needed is to pass the map object to the constructor:
class RemoteMap(Map):

    def __init__(self, document):
        super().__init__()
        self._read_map_info(document)

    def _read_map_info(self, document):
        # Map's attribute names match the keys in the API's map objects
        for key, value in document.items():
            setattr(self, key, value)

def read_remote_maps(api):
    maps = []
    document = api.get_maps()
    for map_data in document["maps"]:
        maps.append(RemoteMap(map_data))
    return maps
Using setattr here to streamline setting attributes is possible because we decided in the design stage that Map will have attributes that are compatible with the API data, i.e. they have the exact same names. Again, refer to the full example if you want to see how this is done for local map files. Once we have a list of Map instances from each source, they can be compared quite simply. This comparison takes advantage of the fact that two different instances of the class compare equal when their slugs match:
def compare_maps(api, local, remote):
    for map_obj in local:
        if map_obj in remote:
            # The same slug exists on the API: fetch its contents and sync them
            remote_obj = remote[remote.index(map_obj)]
            remote_obj.update_from_api(api)
            if map_obj.attribute_difference(remote_obj):
                api.update_map(map_obj.slug, map_obj)
            compare_observers(api, map_obj, remote_obj)
            compare_obstacles(api, map_obj, remote_obj)
        else:
            # The map only exists locally: create it along with its contents
            api.create_map(map_obj.serialize_json())
            for observer_data in map_obj.serialize_observers():
                api.create_observer(map_obj.slug, observer_data)
            for x, y in map_obj.serialize_obstacles():
                api.create_obstacle(map_obj.slug, x, y)
As discussed earlier there are only two branches here: either we check for differences between local map and remote map, and update accordingly, or create a new map on the API side if a map we have locally was not found there. The latter case is obviously much more straightforward, as all we need to do is to serialize the data and do the necessary API calls with the data.
The update branch is a little more interesting because now we have two instances of the same map, and need to figure out what changes are needed to make them equal. We start by adding the data about observers and obstacles to the remote map, using its update_from_api method. Looking at the API documentation, we can see that there is no way to simply submit a list of observers and obstacles with the PUT request that updates the basic information of a map. This means the individual API calls need to be figured out at the client end. We are going to look at the observer update process more carefully. It is more interesting because there are four possible outcomes for each observer:
- it's new, i.e. only exists on the local map;
- it has been moved or its vision range has changed, API side needs to be updated;
- it has been removed, i.e. it only exists on the remote map;
- it's the same, no action needed
To find these, the Map class has a method for comparing two instances. When reading the code it's good to recall that we are comparing two dictionaries where observer slugs are used as keys, and the entire data of an observer as the value. We made it a method of Map instead of a function that would compare two instances of Map largely because this way functions that are outside of Map don't need to know how its internals work.
    def observer_difference(self, other):
        # Both maps store observers in dictionaries keyed by observer slug
        new = []
        changed = []
        gone = []
        for slug, observer_data in self.observers.items():
            if slug in other.observers:
                remote_data = other.observers[slug]
                if any(observer_data[key] != remote_data.get(key) for key in observer_data):
                    changed.append(observer_data)
            else:
                new.append(observer_data)
        for slug in other.observers:
            if slug not in self.observers:
                gone.append(slug)
        return new, changed, gone
Once we have the three lists returned by this method, it's rather simple to call the API for each element to POST, PUT, and DELETE accordingly. For the last list it's sufficient to only have the observer slug, as no data is needed when deleting. As the obstacles are simply coordinate pairs in sets, a similar difference comparison for them is very simple. The - operator is shorthand for set.difference.
    def obstacle_difference(self, other):
        # Obstacles are sets of (x, y) tuples
        new = self.obstacles - other.obstacles
        gone = other.obstacles - self.obstacles
        return new, gone
The functions compare_observers and compare_obstacles simply submit the lists returned by these methods to the API. You can find the boring details in the full code example below.
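For a feel of how those functions could look, here is a self-contained sketch with just enough of Map to run, and a RecordingAPI stand-in that records calls instead of sending them. create_observer and create_obstacle use the call shapes seen in compare_maps; update_observer, delete_observer and delete_obstacle are hypothetical method names, and each observer's data is assumed to contain its "slug".

```python
class Map:
    """Just enough of the Map class to run the comparison."""

    def __init__(self, slug, observers=None, obstacles=None):
        self.slug = slug
        self.observers = observers or {}
        self.obstacles = obstacles or set()

    def observer_difference(self, other):
        new, changed, gone = [], [], []
        for slug, data in self.observers.items():
            if slug in other.observers:
                remote_data = other.observers[slug]
                if any(data[key] != remote_data.get(key) for key in data):
                    changed.append(data)
            else:
                new.append(data)
        gone = [slug for slug in other.observers if slug not in self.observers]
        return new, changed, gone

    def obstacle_difference(self, other):
        return self.obstacles - other.obstacles, other.obstacles - self.obstacles


class RecordingAPI:
    """Stand-in for APIDataSource; records the calls it would make."""

    def __init__(self):
        self.calls = []

    def create_observer(self, map_slug, data):
        self.calls.append(("POST", map_slug, data["slug"]))

    def update_observer(self, map_slug, observer_slug, data):
        self.calls.append(("PUT", map_slug, observer_slug))

    def delete_observer(self, map_slug, observer_slug):
        self.calls.append(("DELETE", map_slug, observer_slug))

    def create_obstacle(self, map_slug, x, y):
        self.calls.append(("POST", map_slug, (x, y)))

    def delete_obstacle(self, map_slug, x, y):
        self.calls.append(("DELETE", map_slug, (x, y)))


def compare_observers(api, local_map, remote_map):
    new, changed, gone = local_map.observer_difference(remote_map)
    for data in new:
        api.create_observer(local_map.slug, data)
    for data in changed:
        api.update_observer(local_map.slug, data["slug"], data)
    for observer_slug in gone:
        api.delete_observer(local_map.slug, observer_slug)


def compare_obstacles(api, local_map, remote_map):
    new, gone = local_map.obstacle_difference(remote_map)
    for x, y in sorted(new):
        api.create_obstacle(local_map.slug, x, y)
    for x, y in sorted(gone):
        api.delete_obstacle(local_map.slug, x, y)


local = Map("fancy-map", observers={
    "a": {"slug": "a", "x": 1, "y": 1, "range": 6},
    "b": {"slug": "b", "x": 10, "y": 1, "range": 6},
    "c": {"slug": "c", "x": 1, "y": 4, "range": 6},
}, obstacles={(5, 1), (6, 1)})
remote = Map("fancy-map", observers={
    "a": {"slug": "a", "x": 1, "y": 1, "range": 6},
    "b": {"slug": "b", "x": 9, "y": 1, "range": 6},
    "z": {"slug": "z", "x": 0, "y": 0, "range": 3},
}, obstacles={(5, 1), (0, 0)})

api = RecordingAPI()
compare_observers(api, local, remote)
compare_obstacles(api, local, remote)
for call in api.calls:
    print(call)
```

Observer "a" is identical on both sides and produces no call, "b" has moved and is updated, "c" is new, and "z" only exists on the API.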
TLS Authentication¶
From this point on you will need to communicate with a RabbitMQ server. If you want to run the examples as they are, you need to have it running on your own computer. Simply installing and using default settings will be fine. However, we are also providing a live RabbitMQ server in the CSC cloud that you can use. Since it is publicly available, some measures are in place to increase security. First you will need some keys.
Using TLS¶
TLS (Transport Layer Security) is a rather important topic in the internet world, as it is what allows secrets like API keys to be sent encrypted. As there are other courses that will give you a much better understanding of what TLS involves, we will cover it rather briefly and mostly focus on what you need to know to access the course servers that use TLS. We are only covering this subject because without TLS, all traffic, including keys and passwords, would be sent as plain text.
Generally speaking, when you connect to a server using TLS, you obtain its certificate, which serves two purposes: 1) it can be used to verify that the server is who it says it is; 2) it is used during the handshake to set up encrypted communication between your client and the server, which holds the corresponding private key. For the purposes of this course we are mostly interested in the latter part, in order to keep things a little bit simpler.
The API server will just be using a certificate obtained with Certbot. This means you don't need to take any action to connect to it, as those certificates are usually trusted automatically. However, in order to do peer verification, we actually need to create our own certificates. The file below is our private certificate authority's certificate. You will need it when configuring your client later, so grab it now.
TLS Peer Verification¶
In addition to encryption, our RabbitMQ server also uses TLS peer verification. In order to connect to the server, your client must have its own certificate that is signed by the same CA as the server's own certificate. As we are also using passwords, this is not strictly needed, but it doesn't hurt either. Most importantly, it is a good opportunity to introduce yet another concept that you might find useful when making real life deployments. Just like your client needs the server to prove its identity in order to trust it, peer verification does the same from the server's side: only clients with valid certificates are allowed to connect.
Once again export your group name into an environment variable with
export PWPGROUP=<group_name>
In order to obtain a client certificate (or a server certificate, it's more or less the same process), you must first generate a private key. Do not use the same private key you used for the SSH key! Generate a new key file with:
openssl ecparam -name prime256v1 -genkey -noout -out client.key
As usual, this key file must be treated with the utmost care. Ideally it should never leave the machine where it was generated. Once you have your key, you can generate a Certificate Signing Request (CSR). This is a file that you send to your Certificate Authority for signing in order to obtain your certificate. Whether you are dealing with our course's CA or a real one, you will always need to do this. The generation will ask you a bunch of questions. We do not care too much about what you put there, but a real CA might, so please pay attention to any instructions. The most important one is the Common Name, where you would normally put your fully qualified domain name (FQDN). Anyway, to start the process:
openssl req -new -sha256 -key client.key -out $PWPGROUP.csr -addext "extendedKeyUsage=clientAuth"
This will output the .csr file. Send this file to the course email list as part of the next task. In return you will get a certificate file with the same name and the .crt extension. This is the certificate file that you will use when setting up your client to communicate with our RabbitMQ server.
TLS with Pika¶
In order to configure Pika to connect with TLS, you need a little bit more work and the help of the built-in ssl module. Pika needs to be given an SSL context which you can build with the ssl module. In our case we are simply going to build a context that uses the CA certificate to verify the receiving server. These lines will prepare the context. The last one specifies the client certificate to send for peer verification.
import ssl
context = ssl.create_default_context(cafile="/path/to/ca/file")
context.verify_mode = ssl.CERT_REQUIRED
context.load_cert_chain(
"/path/to/client/cert",
"/path/to/client/key",
)
Once you have your context, it is simply passed to the Pika connection along with your credentials. The following example assumes you have username, password, host, port, and vhost defined somewhere earlier in the file.
import pika

credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host,
    port,
    vhost,
    credentials,
    # The server hostname is needed because the default context checks hostnames
    ssl_options=pika.SSLOptions(context, host)
))
Apart from this change in how the connection is formed, the rest of the examples work just as if you were using a default RabbitMQ installation without credentials and TLS.
Managing Secrets¶
Before moving on, we need to discuss how to make secrets available to your code. As code these days tends to end up on GitHub (particularly your project's code), you should never include any credentials in your code. Not only is this incredibly insecure, it also makes deployment harder. Generally speaking, there are two primary ways to convey secrets to an application:
- Configuration files
- Environment variables
In some scenarios command line arguments and interactive prompts can also be considered, but usually only for human users. When making automated machine clients, picking one of the two primary methods is recommended. In the case of Flask, using a configuration file as explained in the Flask API project example is the recommended way. For clients that simply use Requests, you can make your own configuration file solution with e.g. Python's configparser, or use environment variables.
Environment variables are set with the set or export command, depending on your operating system. Python accesses them through the environ dictionary in the os module, or the getenv function if you want to use a default value instead of getting an error when a variable is not found:
import os

username = os.environ["PWP_RABBIT_USER"]
password = os.getenv("PWP_RABBIT_PASSWD", "notsafe")
You can also remove the variable after retrieving it, provided that unsetenv is supported on your system. This is done just like retrieving and removing a key from any dictionary, with the pop method:
username = os.environ.pop("PWP_RABBIT_USER")
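When a client needs several settings, it can help to collect them in one place and fail early if something required is missing. A minimal sketch: PWP_RABBIT_USER and PWP_RABBIT_PASSWD are the names used above, while the host, port and vhost variable names and their defaults are hypothetical. The function accepts any mapping so it can be tried with a plain dict.

```python
import os


def load_rabbit_settings(environ=os.environ):
    """Collect RabbitMQ connection settings from environment variables.

    PWP_RABBIT_USER and PWP_RABBIT_PASSWD come from the examples above;
    the host, port and vhost variable names are hypothetical.
    """
    required = ("PWP_RABBIT_USER", "PWP_RABBIT_PASSWD")
    missing = [name for name in required if name not in environ]
    if missing:
        # Fail loudly rather than silently falling back to a weak default
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "username": environ["PWP_RABBIT_USER"],
        "password": environ["PWP_RABBIT_PASSWD"],
        "host": environ.get("PWP_RABBIT_HOST", "localhost"),
        # 5672 is RabbitMQ's default non-TLS port, matching a local install
        "port": int(environ.get("PWP_RABBIT_PORT", "5672")),
        "vhost": environ.get("PWP_RABBIT_VHOST", "/"),
    }


# A plain dict works for trying it out; by default os.environ is used
settings = load_rabbit_settings({"PWP_RABBIT_USER": "yourgroup", "PWP_RABBIT_PASSWD": "secret"})
print(settings["host"], settings["port"], settings["vhost"])  # localhost 5672 /
```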
While the variable will still exist in the shell after your application exits, it will not be available to any subprocesses etc. during the application's runtime. So if you are doing something fun like, I don't know, running student code in order to check whether it's correct, it is probably a good idea to not let those processes access any important environment variables.
In general, configuration files are easier to manage, because with environment variables you still need to write them into the environment somehow, and this usually happens from a file, so you might as well use a configuration file at that point. The most important thing about configuration files is to make sure they are only readable by the user your application runs as. While full web server setups usually use configuration files, in container deployments it can be more straightforward to configure the container launch to write secrets into environment variables. Choose an approach that suits your needs. Please see the extra part of exercise 3 for more information on your options when using containers.
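For a Requests-based client, a configuration file read with Python's configparser could look like the sketch below. The section and option names are hypothetical, and the values mirror the placeholders used elsewhere in this material. On Linux, running chmod 600 on the real file keeps it readable by its owner only.

```python
import configparser

# Example file contents; the section and option names are hypothetical
EXAMPLE = """
[rabbitmq]
host = fip-x-x-x-x.kaj.poutavm.fi
port = 5671
vhost = yourgroup-vhost
user = yourgroup
password = password
use_tls = yes
"""


def load_rabbit_config(text):
    # interpolation=None so that a "%" in a password is read literally
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)  # use parser.read(path) for a real file
    section = parser["rabbitmq"]
    return {
        "host": section["host"],
        "port": section.getint("port"),
        "vhost": section["vhost"],
        "user": section["user"],
        "password": section["password"],
        "use_tls": section.getboolean("use_tls", fallback=False),
    }


config = load_rabbit_config(EXAMPLE)
print(config["host"], config["port"], config["use_tls"])
```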
Task Queues and Publish/Subscribe¶
A major part of the course has involved direct communication between the API and client, where the client accesses the API through HTTP requests and gets mostly immediate responses. By that we mean the client expects to get whatever it was looking for as a direct response to its request. However, this is certainly not the only way of communication between services and clients or other services. In this last segment we are taking a peek at task queues and publish/subscribe, two means of communication that are more asynchronous in nature.
This section will use RabbitMQ, a message broker commonly used to implement communication between services. We have provided you with a running instance of RabbitMQ that you can access with credentials. You will be informed via course channels about how to get access. Feel free to use this instance to test examples, complete tasks, and implement these messaging protocols in your project if you want to go that way.
Our way of interacting with RabbitMQ is the Pika module which offers a Python interface for connecting to RabbitMQ.
Task Queues¶
Task queues are used to "place orders" for computational tasks that will be run by separate workers. Unlike API calls that would be targeted to a specific service by its address, tasks are sent to the message broker. As it is a queue, the first eligible worker to notice the message will undertake the task, marking it as being processed. They are generally used when you have lots of heavy tasks that can be run independently of each other, or tasks that need to be run in isolation. Worker pools are usually easy to scale as all a worker needs to do is connect to the message broker and it's ready to go. Usually workers don't have their own persistent storage.
For a real life example, Lovelace uses task queues when checking code submissions. This is a use case where task queues fit like a glove:
- Each submission needs to be checked exactly once
- Each check is an independent operation
- Checking the submission is an expensive operation
- It also needs heavy isolation as we are running unknown code
- There is no need for persistent storage for the individual workers
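The core idea of a task queue, where each task is picked up by exactly one of several interchangeable workers, can be illustrated in-process with Python's queue.Queue and threads. This is only an analogy: queue.Queue plays the role of the broker here, and it is not how RabbitMQ or Pika work internally. The mean calculation stands in for an expensive job.

```python
import queue
import threading


def run_workers(tasks, worker_count=3):
    """Process (task_id, values) tasks with a pool of worker threads.

    In-process analogy only: queue.Queue plays the role of the broker.
    """
    task_queue = queue.Queue()
    results = {}
    results_lock = threading.Lock()

    def worker(worker_id):
        while True:
            task = task_queue.get()  # blocks until a task is available
            if task is None:  # sentinel: no more work for this worker
                break
            task_id, values = task
            # Stand-in for an expensive job: mean of the values
            with results_lock:
                results[task_id] = (worker_id, sum(values) / len(values))

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(worker_count)]
    for thread in threads:
        thread.start()
    for task in tasks:
        task_queue.put(task)
    # One sentinel per worker; they queue up behind all real tasks
    for _ in threads:
        task_queue.put(None)
    for thread in threads:
        thread.join()
    return results


results = run_workers([(i, [i, i + 2]) for i in range(10)])
print(len(results), results[3][1])  # 10 4.0
```

Adding workers only requires starting more threads that read from the same queue, which mirrors how easily worker pools connected to a broker can be scaled.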
Sensorhub Statistics¶
As to our code example, we're going to implement a feature to get statistics from a sensor. Because calculating statistics can be time-consuming, they will be calculated on separate workers. For the purposes of this example we are only calculating the mean value of measurements, but you could add all sorts of statistics without changes to the communication protocol. In our simple plan, only one set of stats will exist for a sensor at any given time. Here is a summary of how this will be implemented:
- We will have a new database model for sensor statistics on the API server
- We will add a new route, /api/sensors/<sensor:sensor>/stats/, which supports GET, PUT, and DELETE
- When a client sends a GET request to this resource, one of two things will happen:
- If there are existing statistics, they will be returned to the client (200)
- If there are no existing statistics, measurement data will be sent to the task queue for processing, and the client receives a 202 (Accepted) response to indicate that this resource is not yet available but will be in the future.
- When the worker has finished calculating the statistics, it will send a PUT request with the statistics to the main API
- If a client wants to generate the stats again, they will first send DELETE to the stats resource, and then send a new GET request.
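From a client's point of view, the 200/202 protocol above suggests polling: keep asking until the statistics exist. A minimal sketch follows, with fetch left as any callable returning (status_code, body) so it can be tested without a server. In a real client it could wrap a session GET on the stats resource. The attempt count and delay are arbitrary assumptions. Keep in mind that with the resource as designed, every GET that finds no stats queues another task, so a generous delay between attempts is kinder to the workers.

```python
import time


def wait_for_stats(fetch, attempts=5, delay=2.0, sleep=time.sleep):
    """Poll the stats resource until it answers 200 (hypothetical helper).

    fetch is any callable returning (status_code, body).
    """
    for _ in range(attempts):
        status, body = fetch()
        if status == 200:
            return body
        if status != 202:
            raise RuntimeError(f"Unexpected status code {status}")
        sleep(delay)
    raise TimeoutError("Statistics were not ready in time")


# Simulated server: not ready twice, then ready
responses = iter([(202, None), (202, None), (200, {"mean": 21.5})])
stats = wait_for_stats(lambda: next(responses), sleep=lambda seconds: None)
print(stats)  # {'mean': 21.5}
```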
The full API can be found on GitHub. We made a separate fork of the one used in the previous exercise to keep them separate. Grab the example repository with git, and switch to branch ex4-stats.
The full system that we are implementing in this material is presented by the diagram below. Details about each component can be found in the sections below. Our system will have one API server and one RabbitMQ server, but can have any number of workers and clients connected to it.
API Configuration¶
Unless you are installing RabbitMQ on your local computer, you need to configure the API with your credentials and the RabbitMQ server address. Copy the following lines into config.py in your application's instance folder, then edit the values to match your credentials from the Decryptify task.
CLIENT_KEY = "/path/to/client.key"
CLIENT_CERT = "/path/to/client.crt"
CA_CERT = "/path/to/ca.crt"
RABBITMQ_USE_TLS = True
RABBITMQ_HOST = "fip-x-x-x-x.kaj.poutavm.fi"
RABBITMQ_PORT = 5671
RABBITMQ_VHOST = "yourgroup-vhost"
RABBITMQ_USER = "yourgroup"
RABBITMQ_PASS = "password"
Statistics Model¶
The stats model doesn't have anything that wasn't covered in Exercise 1, but it's shown here to give you an idea of what we are working with.
class Stats(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    generated = db.Column(db.DateTime, nullable=False)
    mean = db.Column(db.Float, nullable=False)
    # unique=True: a sensor can have at most one set of stats
    sensor_id = db.Column(
        db.Integer,
        db.ForeignKey("sensor.id"),
        unique=True, nullable=False
    )

    sensor = db.relationship("Sensor", back_populates="stats")
It also has serialize, deserialize, and json_schema methods that are not shown here. A relationship pointing to this model will also be added to the Sensor model.
stats = db.relationship("Stats", back_populates="sensor", uselist=False)
Statistics Resource¶
The statistics resource will look like this.
class SensorStats(Resource):

    def get(self, sensor):
        if sensor.stats:
            body = sensor.stats.serialize()
            return Response(json.dumps(body), 200, mimetype=JSON)
        else:
            # No stats yet: queue a calculation task and tell the client to come back later
            self._send_task(sensor)
            return Response(status=202)

    def put(self, sensor):
        if not request.json:
            raise UnsupportedMediaType

        try:
            validate(
                request.json,
                Stats.json_schema(),
                format_checker=draft7_format_checker
            )
        except ValidationError as e:
            print(e)
            raise BadRequest(description=str(e))

        # Sets the stats whether or not they existed before
        stats = Stats()
        stats.deserialize(request.json)
        sensor.stats = stats
        db.session.add(sensor)
        db.session.commit()
        return Response(status=204)

    def delete(self, sensor):
        # Nothing to delete: respond 404 (NotFound is from werkzeug.exceptions)
        if not sensor.stats:
            raise NotFound
        db.session.delete(sensor.stats)
        db.session.commit()
        return Response(status=204)
There are two main things to note here. First, GET handling depends on whether stats currently exist: the mystery method _send_task is called when stats are not available. Second, we use PUT to set the stats for the sensor regardless of whether they existed previously. This deals with the situation where multiple GET requests are sent to the stats resource before the calculation service has finished processing. The stats will still be calculated multiple times, but only one result (the latest one) remains at the end. This issue could be addressed by adding suitable model fields and logic to mark a sensor's stats as being processed.
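The "being processed" idea mentioned above can be illustrated without a database. This is a minimal sketch, not part of the example API: StatsTracker is a hypothetical in-memory stand-in for the Stats model, its processing set stands in for an extra model field, and appending to sent_tasks stands in for _send_task. Only the first GET for a sensor sends a task; later GETs return 202 until the worker PUTs the result.

```python
from dataclasses import dataclass, field


@dataclass
class StatsTracker:
    """Hypothetical in-memory stand-in for the stats resource and model."""
    stats: dict = field(default_factory=dict)       # sensor name -> stats dict
    processing: set = field(default_factory=set)    # sensors with a task in flight
    sent_tasks: list = field(default_factory=list)  # stands in for _send_task

    def get(self, sensor):
        if sensor in self.stats:
            return 200, self.stats[sensor]
        # only send a new task if one is not already being processed
        if sensor not in self.processing:
            self.processing.add(sensor)
            self.sent_tasks.append(sensor)
        return 202, None

    def put(self, sensor, new_stats):
        # PUT replaces whatever was there and clears the marker
        self.stats[sensor] = new_stats
        self.processing.discard(sensor)
        return 204

    def delete(self, sensor):
        self.stats.pop(sensor, None)
        return 204


if __name__ == "__main__":
    tracker = StatsTracker()
    tracker.get("uo-test")
    tracker.get("uo-test")           # second GET does not send another task
    print(tracker.sent_tasks)
```

One caveat of this design: if the worker fails and never sends a PUT, the marker stays set and no new task is ever sent, so a real implementation would probably need a timeout on the marker or a way for the worker to report failure.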
Sending Tasks¶
The process of sending a task has the following steps:
- Gather the data to be sent
- Form a connection to the RabbitMQ broker and get a channel object
- Declare a queue
- Send the task to the declared queue
The worker needs two pieces of information to complete its task: a list of measurement values, and the sensor's unique identifier so it knows where to send the results. The data can be obtained easily by forming a list of measurement values from the sensor's measurements relationship (this could be done in a more optimized manner, but we're keeping it simple).
def _send_task(self, sensor):
    # get sensor measurements, values only
    body = {
        "data": [meas.value for meas in sensor.measurements],
        "sensor": sensor.name
    }
    # form a connection, open channel and declare a queue
    connection = get_rabbit_connection()
    channel = connection.channel()
    channel.queue_declare(queue="stats")
    # publish message (task) to the default exchange
    channel.basic_publish(
        exchange="",
        routing_key="stats",
        body=json.dumps(body)
    )
    connection.close()
We are using the default exchange here to keep the example simple: it routes each message to the queue whose name matches the routing key, so we can declare queues freely without setting up exchanges. The queue name "stats" is given as the routing_key argument to basic_publish. This publishes the message to the "stats" queue so that it can be picked up by workers consuming that queue. To use connections more efficiently, the connection setup in the middle of the function could be done only once at module level.
With this our API side is ready. It will now send requests for stats to the queue. You can already run the server and visit any stats URL. However, as there are no consumers for the queue, the task will simply sit there until a consumer shows up.
Statistic Worker¶
The next step is implementing the worker that consumes the task queue. To keep the example simple, this will be a Python program that runs from the command line and keeps serving until it is terminated with Ctrl-C. The workflow is roughly:
- Define a handler function for received tasks
- Connect to the RabbitMQ server and get a channel
- Declare the "stats" queue
- Enter the consuming loop to serve tasks
Once a task comes in, the following will occur:
- Check that the task has everything we need (the measurement data and the sensor identifier)
- Calculate the stats and add a timestamp
- Add the sensor's unique identifier to predefined URI and send the stats there with a PUT request
- Regardless of result, acknowledge the task as completed
In the first implementation round we won't implement handling for errors. All tasks will be acknowledged regardless of result because we don't want them to go back to the queue. For instance, if we were to receive a task without an existing sensor identifier, the task simply cannot be completed, and returning it to the queue would be pointless. In the second implementation round we will add a mechanism to notify other parts of the system that there was a failure in completing the task.
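The check in the first step can be separated into its own function. This is a minimal sketch, assuming the message format produced by _send_task; parse_task and InvalidTask are hypothetical names, not part of the example code. Turning every kind of malformed message into one exception type lets the caller log the problem and still acknowledge the task, as described above.

```python
import json


class InvalidTask(ValueError):
    """Hypothetical exception for task messages the worker cannot use."""


def parse_task(body):
    """Return (data, sensor) from a task message body (bytes or str).

    Assumes the format sent by _send_task: {"data": [...], "sensor": "..."}.
    """
    try:
        task = json.loads(body)
        data = task["data"]
        sensor = task["sensor"]
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        # not JSON, missing keys, or JSON that is not an object
        raise InvalidTask(f"malformed task: {e!r}") from e
    if not isinstance(data, list) or not isinstance(sensor, str):
        raise InvalidTask("'data' must be a list and 'sensor' a string")
    return data, sensor


if __name__ == "__main__":
    print(parse_task(b'{"data": [1, 2.5], "sensor": "uo-test"}'))
```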
The full code is available below. Note that it includes the second implementation round additions as well; the example snippets below only have prints in these spots.
Worker Configuration¶
The worker doesn't believe in configuration files. Sort of. It uses dotenv to read environment variables from a file. This approach is flexible because it allows you to define your environment variables either in the file it loads, or in your run environment by some other means. Here is what should go into your environment file, which is usually named .env and placed outside your repository.
PWP_RABBIT_URI=fip-x-x-x-x.kaj.poutavm.fi:5671
PWP_RABBIT_USER=yourgroup
PWP_RABBIT_PASSWD=password
PWP_RABBIT_VHOST=yourgroup-vhost
CA_CERT=/path/to/ca.crt
PWP_RABBIT_CLIENT_CERT=/path/to/client.crt
PWP_RABBIT_CLIENT_KEY=/path/to/client.key
API_SERVER=http://localhost:5000
The configuration is mostly read by this class that we will be using for the worker as well as the client later.
import ssl
import pika
class RabbitBackend(object):
    def __init__(self, broker, user, passwd, ca=None, client_cert=None, client_key=None):
        host, port = broker.split(":")
        self.host = host
        self.port = int(port)
        self.user = user
        self.passwd = passwd
        self.credentials = pika.PlainCredentials(self.user, self.passwd)
        self.ca = ca
        self.client_cert = client_cert
        self.client_key = client_key
    def get_connection(self, vhost="/"):
        if self.ca:
            # verify the broker's certificate against our CA
            context = ssl.create_default_context(cafile=self.ca)
            context.verify_mode = ssl.CERT_REQUIRED
        else:
            # no CA given: encrypt, but skip certificate verification
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        if self.client_cert and self.client_key:
            context.load_cert_chain(certfile=self.client_cert, keyfile=self.client_key)
        return pika.BlockingConnection(pika.ConnectionParameters(
            self.host,
            self.port,
            vhost,
            self.credentials,
            # check_hostname is on by default, so the hostname must be given
            ssl_options=pika.SSLOptions(context, server_hostname=self.host)
        ))
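The material doesn't show how the worker turns the environment variables into a RabbitBackend. A minimal sketch, assuming the variable names from the .env file above; backend_kwargs_from_env is a hypothetical helper, and load_dotenv (shown only in comments) comes from the python-dotenv package.

```python
import os


def backend_kwargs_from_env(env=os.environ):
    """Map the worker's environment variables onto RabbitBackend arguments.

    Hypothetical helper: required variables raise KeyError if missing,
    certificate paths are optional and default to None.
    """
    return {
        "broker": env["PWP_RABBIT_URI"],
        "user": env["PWP_RABBIT_USER"],
        "passwd": env["PWP_RABBIT_PASSWD"],
        "ca": env.get("CA_CERT"),
        "client_cert": env.get("PWP_RABBIT_CLIENT_CERT"),
        "client_key": env.get("PWP_RABBIT_CLIENT_KEY"),
    }


# Possible startup code for the worker (assumes python-dotenv is installed):
# from dotenv import load_dotenv
# load_dotenv()  # copies values from .env into os.environ
# backend = RabbitBackend(**backend_kwargs_from_env())
# main(backend)
```

Because load_dotenv fills os.environ in place, the same code also works when the variables are set by some other means and no .env file exists.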
Setting Up and Running¶
First we are going to look at the main function that will connect to the message broker, and start consuming the "stats" queue. Most of this is quite similar to what we did on the sending end in the API side. Prints in this and other functions are simply there to make it easier to follow what is going on. In real life these should be replaced by using the logging module or other logging facilities.
def main(backend):
    connection = backend.get_connection(os.environ["PWP_RABBIT_VHOST"])
    channel = connection.channel()
    channel.queue_declare(queue="stats")
    channel.basic_consume(queue="stats", on_message_callback=handle_task)
    print("Service started")
    channel.start_consuming()
The call to basic_consume is where we configure what queue will be consumed and how this worker will handle tasks. Note that we are not using auto_ack because we want to be sure our tasks are not lost in the void if the worker dies or falls off the network in the middle of processing. This means the task handler needs to acknowledge the message when it has finished processing. With this setup we are ready to implement the actual logic for handling tasks.
Task Handler¶
The task handler in this case is the function that is responsible for parsing the task from the received message, calculating the statistics, and sending the response back in the designated manner (a PUT request to a predefined address). If our workers handled more than one type of task, this function's role would be to parse the task and call the appropriate processing function. The code is presented below with comments for handling steps.
def handle_task(channel, method, properties, body):
    print("Handling task")
    try:
        # try to parse data and return address from the message body
        task = json.loads(body)
        data = task["data"]
        sensor = task["sensor"]
        href = API_SERVER + f"/api/sensors/{sensor}/stats/"
    except (KeyError, json.JSONDecodeError) as e:
        # log error
        print(e)
    else:
        # calculate stats
        stats = calculate_stats(data)
        stats["generated"] = datetime.now().isoformat()
        # send the results back to the API (href already includes API_SERVER)
        with requests.Session() as session:
            resp = session.put(
                href,
                json=stats
            )
            if resp.status_code != 204:
                # log error
                print("Failed PUT")
            else:
                print("Stats updated")
    finally:
        # acknowledge the task regardless of outcome
        print("Task handled")
        channel.basic_ack(delivery_tag=method.delivery_tag)
The statistics function returns a dictionary of whatever stats it was able to calculate, and we add a timestamp afterward. The data is sent back to the API with a PUT request as we designed. Also, as discussed earlier, the finally clause in the try structure always acknowledges the message, regardless of what happens.
With this we have a fully operational system where clients can request stats from the API, and those stats will be generated by the worker. However, if the calculations are heavy and take a long time, how would the client know when to check back? Similarly, how will the rest of the system know if something went wrong so that developers can start working on a fix?
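The calculate_stats function called by the handler isn't shown in this material. A minimal sketch, assuming the Stats model above where mean is the only statistic; the name matches the call in handle_task, but the body is hypothetical. It skips values that aren't numbers and leaves the dictionary empty when there is nothing to average.

```python
from datetime import datetime
from statistics import fmean


def calculate_stats(data):
    """Return a dict of whatever statistics can be computed from data.

    Hypothetical sketch: only "mean" is computed, matching the Stats model.
    """
    stats = {}
    # keep real numbers only (bool is a subclass of int, so exclude it)
    values = [
        v for v in data
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    ]
    if values:
        stats["mean"] = fmean(values)
    return stats


if __name__ == "__main__":
    # same post-processing as in handle_task
    stats = calculate_stats([20.5, 21.0, 22.5])
    stats["generated"] = datetime.now().isoformat()
    print(stats)
```

If the dictionary stays empty, the PUT would most likely be rejected by the API's schema validation (mean is a non-nullable column), and the handler would report a failed PUT.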
Publish / Subscribe¶
The other topic of this material is broadcasting with publish / subscribe. Whereas with task queues only one recipient consumes each message, in publish / subscribe the message is delivered to all consumers. This communication method is useful for sending events to the system when we do not know the exact recipient, or when there are several of them. In our example we can use it in two ways:
- We can broadcast an event when statistics calculation has been completed, and this can be picked up by some notification service to let the user know that the statistics they ordered are ready.
- We can broadcast log events so that system admins can notice problems
We can achieve this by creating two exchanges with the fanout type: one for notifications and one for logs. Services that are interested in those can then listen on the exchange of their choice.
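The difference between a task queue and publish / subscribe comes down to routing. The toy in-memory broker below is not RabbitMQ and ignores acknowledgements, persistence, and networking; ToyBroker and its methods are hypothetical names that only loosely mirror the pika calls used in this material. With the default exchange (empty exchange name), a message goes to the queue named by the routing key, and each message is taken by exactly one consumer. With a fanout exchange, the routing key is ignored and every bound queue gets its own copy. In both cases a message with no matching queue is dropped.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Toy model of message routing only (hypothetical, not RabbitMQ)."""

    def __init__(self):
        self.queues = {}                   # queue name -> deque of messages
        self.bindings = defaultdict(list)  # fanout exchange -> bound queue names

    def queue_declare(self, name):
        # idempotent, like the real declaration
        self.queues.setdefault(name, deque())

    def queue_bind(self, exchange, queue):
        if queue not in self.queues:
            raise KeyError(f"no queue named {queue!r}")
        self.bindings[exchange].append(queue)

    def publish(self, exchange, routing_key, body):
        if exchange == "":
            # default exchange: the routing key is the queue name
            targets = [routing_key] if routing_key in self.queues else []
        else:
            # fanout exchange: routing key ignored, copy to every bound queue
            targets = self.bindings[exchange]
        for name in targets:
            self.queues[name].append(body)
        return len(targets)  # 0 means the message was dropped

    def consume(self, queue):
        """Take the next message from a queue, or None if there is none."""
        q = self.queues.get(queue)
        return q.popleft() if q else None
```

The dropping rule matters for the client later in this material: a notification published before the client has bound its queue to the exchange would simply be lost.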
Changes to Worker¶
The worker will need a couple of changes to fulfill these new requirements. The file provided earlier has these changes already included.
Setting Up Exchanges¶
First we need to introduce the exchanges in our main function. We also need to make the channel globally available. We're just going to be lazy and use the global keyword to achieve this, but generally it would be better to e.g. turn the worker into a class. The changed main function is shown below.
def main(backend):
    connection = backend.get_connection(os.environ["PWP_RABBIT_VHOST"])
    channel = connection.channel()
    channel.exchange_declare(
        exchange="notifications",
        exchange_type="fanout"
    )
    channel.exchange_declare(
        exchange="logs",
        exchange_type="fanout"
    )
    channel.queue_declare(queue="stats")
    channel.basic_consume(queue="stats", on_message_callback=handle_task)
    print("Service started")
    channel.start_consuming()
Besides adding the exchanges, the rest is the same. Declaring these new exchanges doesn't interfere with the previous functionality in any way. Also, just like queue declarations, exchange declarations are idempotent, so exchanges should be declared at both ends.
Broadcast Functions¶
Once we have the exchanges, we can write functions that publish to them: one for notifications, and another one for logs. The same basic_publish method that was used on the API side to send the task is also used here, but this time we provide the exchange name and leave routing_key empty, since fanout exchanges ignore the routing key.
def send_notification(channel, sensor):
    channel.basic_publish(
        exchange="notifications",
        routing_key="",
        body=json.dumps({
            "task": "statistics",
            "sensor": sensor,
        })
    )
def log_error(channel, message):
    channel.basic_publish(
        exchange="logs",
        routing_key="",
        body=json.dumps({
            "timestamp": datetime.now().isoformat(),
            "content": message
        })
    )
After these functions are in place, all that's left is to replace some of the prints with calls to these functions, which you can see in the final code.
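A service interested in the logs exchange would bind an exclusive queue to "logs" in the same way the client later does for notifications, and handle each message in a callback. The sketch below shows only the message-handling part as a pure function; format_log_event is a hypothetical name, and it assumes the timestamp/content format produced by log_error.

```python
import json


def format_log_event(body):
    """Turn a message from the "logs" exchange into a readable line.

    Assumes the {"timestamp": ..., "content": ...} format used by log_error.
    """
    try:
        event = json.loads(body)
        return f"[{event['timestamp']}] {event['content']}"
    except (json.JSONDecodeError, KeyError, TypeError):
        # keep the monitoring service running even on unexpected messages
        return f"[unparseable log event] {body!r}"


if __name__ == "__main__":
    print(format_log_event(b'{"timestamp": "2024-01-01T00:00:00", "content": "Failed PUT"}'))
```

Inside a pika callback this would be used as print(format_log_event(body)). Returning a placeholder line instead of raising keeps the listener alive when it receives something unexpected.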
Client Example¶
For the client we are going to take one important shortcut: the client will be allowed to connect and listen on the notifications exchange directly. In a real setup we probably should not do this, but we are already running three Python programs by the end, and don't want to add any more intermediate steps to this example. The client itself is very simple: it takes a sensor's name as a command line argument, requests the sensor's stats, and prints them out when they are ready.
Requesting Stats¶
The core function is the one that requests stats from the API. This function can do one of two things when it receives a successful response (i.e. in the 200 range):
- on 200: print out the statistics
- on 202: open a connection to the message broker and wait for a notification
class StopListening(Exception):
    """Raised by the notification handler when we are done listening."""
def request_stats(backend, sensor):
    with requests.Session() as session:
        stats_url = API_SERVER + f"/api/sensors/{sensor}/stats/"
        resp = session.get(
            stats_url
        )
        if resp.status_code == 200:
            print(resp.json())
        elif resp.status_code == 202:
            try:
                listen_notifications(backend)
            except StopListening:
                sys.exit(0)
        else:
            print(f"Response {resp.status_code}")
Under the elif branch we go into listen mode by calling a function which we'll introduce next. This is also done under a try statement that catches a custom exception, which we can raise inside the handling logic to signal that we are done listening.
Listening In¶
In order to listen to notifications, the client needs to go through a setup process that is quite similar to the one done by the stats worker. There are just a couple of key differences because we are listening on a fanout type exchange:
- We need to declare the exchange
- We want to create a new queue that is exclusively used by this client
This is almost directly from the third RabbitMQ tutorial:
def listen_notifications(backend):
    connection = backend.get_connection(os.environ["PWP_RABBIT_VHOST"])
    channel = connection.channel()
    channel.exchange_declare(
        exchange="notifications",
        exchange_type="fanout"
    )
    result = channel.queue_declare(queue="", exclusive=True)
    channel.queue_bind(
        exchange="notifications",
        queue=result.method.queue
    )
    channel.basic_consume(
        queue=result.method.queue,
        on_message_callback=notification_handler,
        auto_ack=True
    )
    channel.start_consuming()
When we declare a queue without a name, RabbitMQ generates a name for us. Setting it as exclusive also means the queue is used only by this connection and is deleted once we disconnect. The generated name is available through the response we get from queue_declare, and we use it to bind the queue to the "notifications" exchange. After that we just need to start consuming, and pass any notifications in the channel to our handler.
Handling Notifications¶
When a notification is received, the client needs to check that it's valid, and then determine whether it is the notification we are looking for. In our current design, notifications for all sensors are sent through the same exchange with no routing key, so it's up to the consumer to figure out whether it is interested in a notification. Once we get the notification for the sensor we requested stats for, the client can send a new GET request to the API to retrieve the stats. The notification handler code is shown below:
def notification_handler(ch, method, properties, body):
    try:
        data = json.loads(body)
        href = API_SERVER + f"/api/sensors/{sensor}/stats/"
        notification_for = data["sensor"]
    except (KeyError, json.JSONDecodeError) as e:
        print(e)
        raise StopListening
    else:
        if notification_for == sensor:
            with requests.Session() as session:
                resp = session.get(href)
                if resp.status_code == 200:
                    print(resp.json())
                else:
                    print(f"Response {resp.status_code}")
            raise StopListening
The sensor variable comes from the module level: it was set when we read its value from sys.argv. As discussed previously, this function raises the StopListening exception either when an error is encountered, or when we have successfully obtained the stats that were initially requested.