Data pipelines and data storage for AIRBODS air measurement experiments. The primary data pipeline downloads all the raw data available on a single Datacake "workspace" (specified by its unique identifier) and copies those data onto a database. It then transforms that data to make a "clean" data set available for research use.
This document contains some descriptions of how the system is built and how to administer and maintain it.
The data are described in the Metadata section below.
Code examples are contained the the examples
directory that can be used to retrieve data in various languages.#
This repository contains an Ansible project as described below.
For an overview, see this blog post "Supporting airborne infection research with data engineering" by the Research & Innovation IT team in IT Services at the University of Sheffield.
The prototype of this system is in rcgsheffield/ventilation-for-hospitality.
There is an overview of the system in the architecture diagram.
flowchart LR
s1[Sensor]
s2[Sensor]
s3[Sensor]
dc("Cloud data system (Datacake)")
wf["Workflow orchestrator (Airflow)"]
sql[("SQL Database (PostgreSQL)")]
gd["Metadata registry (Google Sheets)"]
s1 --> dc
s2 --> dc
s3 --> dc
dc --> wf
gd --> wf
wf --> sql
The system comprises two major parts:
- Workflow Orchestrator: The data pipelines are implemented using a workflow orchestrator called Apache Airflow.
- Database: The data are stored in a relational database management system implemented using PostgreSQL.
- A web-based database administration interface is available to manage this database.
The workflow orchestrator comprises the subcomponents inside the black dotted line. The user roles are represented by red person icons. The cloud services are shown as blue clouds. The services are represented as green rectangles. The databases are shown as yellow cylinders.
There are three typical user roles:
- End user (researcher) will access the SQL database using a certain security role.
- Data engineer will access the workflow orchestrator using its web portal.
- A system administrator will access the virtual machine(s) on which the system runs.
This is a more detailed diagram of the different services and the infrastructure they run on.
flowchart LR
sheets
datacake
subgraph "Campus network"
subgraph "Virtual machine 1"
web["Administration tool"]
sched["Scheduler"]
backend[("Result backend")]
broker[("Message broker")]
end
subgraph "Virtual machine 2"
worker["Worker(s)"]
end
subgraph "Virtual machine 3"
db[("PostgreSQL database")]
dbadmin["Database administration tool"]
end
end
web <--> sched
sched <--> broker
broker <--> worker
backend <--> worker
worker --> db
dbadmin <--> db
sheets --> worker
datacake --> worker
The data pipelines are defined using three directed acyclic graphs (DAGs) that are defined in airflow/dags/*.py
.
datacake
downloads, stores and transforms data retrieved via the Datacake API.datacake_devices
retrieves and stores metadata about sensors from the Datacake APIdeployments
retrieves and stores information about sensors and their deployments at certain locations from a Google Sheets spreadsheet.
The configuration for these is stored in airflow/variables.json
.
There is a custom operator used to access GraphQL APIs via HTTP in airflow/plugins/operators/graphql.py
that is used in the workflows that interact with Datacake, which exposes this kind of API.
The RDMS service contains three databases:
airbods
contains the research data and is accessible by the end users. The tables are described in the metadata section below.airflow
contains the metadata and system data for the workflow orchestrator. This may be accessed by the user account (role) namedairflow
.postgres
contains system data for the database service itself
The database settings are specified in the Ansible playbook and the configuration files postgresql.conf
and pg_hba.conf
as discussed in the PostgreSQL 12 documentation.
The system comprises multiple subsystems as described above that are installed using an automated deployment tool to facilitate continuous integration/continuous deployment (CICD).
Automated deployment is implemented using Ansible, which defines a workflow comprising a series of tasks that install software and configure services. This workflow is executed on one or more remote machines. See their docs: Executing playbooks for troubleshooting. Most of the service configuration files are in the files
directory (Ansible will automatically search this directory for files to upload.) Variables are defined in the group_vars/all
YAML file.
You may need to install Ansible in a Python virtual environment if you need to use a more up-to-date version of the software, which is written in Python. To use the tool you'll need to activate the virtual environment with a command similar to the one below, assuming the environment was created in the directory ~/ansible
:
source ~/ansible/bin/activate
The secret passwords used by each subsystem are defined in local files in the ./secrets
directory, which isn't included in this repository for security reasons.
The secrets/variables.json
file contains the fernet key used for encryption of passwords by Airflow. This file looks like this, where the Fernet key may be generated using the Python code snippet below.
{
"fernet_key": "********************"
}
To generate a Fernet key:
import cryptography.fernet
fernet_key = cryptography.fernet.Fernet.generate_key()
print(fernet_key.decode())
Security keys, certificate requests and certificates may be generated using openssl
. Certificate-authority-signed certificates were retrieved via ITS Helpdesk. This is particularly important for the SQL database because this service will be exposed to the risks associated with access via the public internet.
Generate key and certificate:
openssl req -nodes -x509 -newkey rsa:4096 -keyout secrets/webserver_key.pem -out secrets/webserver.pem -days 365
-nodes
means "No DES" or an unencrypted private key which has no password.
The deployment script installs the necessary software subsystems on a remote machine. The script should be idempotent so it can be run repeatedly without causing problems.
The private key must be installed and configured on the target machine so that the control node may connect using secure shell (SSH). See this tutorial: How To Configure SSH Key-Based Authentication on a Linux Server. The ssh-copy-id
tool is useful here to install your key information on a remote host.
ssh-copy-id $USER@airbods.my-domain.com
ssh-copy-id $USER@airbods01.my-domain.com
ssh-copy-id $USER@airbods02.my-domain.com
You can view the installed files like so:
ssh $USER@airbods.my-domain.com -i ~/.ssh/id_rsa "ls -la ~/.ssh/"
The ida_rsa
file is that user's private key. The authorized_keys
file is used to list the public keys that can automatically connect. These files would be stored in the directory ~/.ssh
for the user you use to connect.
You can test the connection like so:
ssh $USER@airbods.my-domain.com -i ~/.ssh/id_rsa "echo OK"
The tool ssh-agent
is useful to save time by storing the password for encrypted private keys so you don't have to repeatedly type it in. See the Ansible docs connection info.
ssh-agent bash
ssh-add ~/.ssh/id_rsa
It's assumed that you log into the remote system as a user (with username $USER
) that is a member of the system administrator's group airbodsadmins
. Contact the ITS Unix team with queries about this.
You'll also need to be able to run executables as other users. To test this, try to run the following commands (which will only work after the relevant users have been created.)
sudo -u postgres id
sudo -u airflow id
The variable $USER
is used to specify which username to use to connect to the remote host.
USER=<sa_username>
Use the shell variable $INVENTORY
to select the target environment:
INVENTORY=hosts-dev.yaml
#INVENTORY=hosts-prod.yaml
Check Ansible is working. (You probably need to use ssh-agent
as described above.)
# View Ansible package version
ansible --version
# View inventory
ansible --inventory $INVENTORY --list-hosts all
# Ping all remote hosts
ansible --inventory $INVENTORY --user $USER all -m ping
# Run a custom command on each host
ansible --inventory $INVENTORY --user $USER -a "echo OK" all
To run the deployment script, we need to use the deployment script which is defined as an Ansible "playbook" using the ansible-playbook
command (see ansible-playbook CLI docs).
The --inventory
option determines which inventory of hosts will be targeted, where hosts.yaml
contains the development environment and hosts-prod.yaml
points to the production environment. You need to use a different inventory file which will point the deployment script to a different machine using the --inventory
argument of the ansible-playbook
command.
# Check that the Ansible Playbook command is working
ansible-playbook --version
# Check a playbook
ansible-playbook --inventory $INVENTORY --user $USER --ask-become-pass airbods.yaml --check
Install services (inside the selected environment) which assumes that all the secret password files have been created in the ./secrets
directory.
ansible-playbook --inventory $INVENTORY --user $USER --ask-become-pass airbods.yaml
Run automated tests:
ansible-playbook --inventory $INVENTORY --user $USER --ask-become-pass test.yaml
If problems occur, check the logs and try the following steps:
- Re-run the deployment script
- Check service status (see the Monitoring section below) and logs.
- Check that each service can connect to the other relevant services.
- Ensure that the Ansible notify handler feature is enabled for any changes you may have have made.
- Restart the services on the remote host (or perhaps reboot the entire remote system manually)
- Use Ansible's verbose mode and other debugging features
- If you encounter a "file not found" error, it may be due to a missing secrets file which should be saved in the
./secrets/
directory and contain a random password.
Currently this deployment is designed to run a single machine, assuming that it's a small-scale instance. To scale this up to multiple machines, the hosts and deployment script will need to be adapted so that each machine performs a different role.
Routine maintenance must be performed on the server operating system and the various services that make up the system. There is a script called maintenance.yaml
to automate some of this. This can be run as follows:
ansible-playbook --inventory hosts-prod.yaml --user $USER --ask-become-pass maintenance.yaml
This will clear out various aspects of the system but isn't a substitute for a proper maintenance routine.
There are several ways to interact with and control each part of the system.
The research data are contained in a Structured Query Language (SQL) database. There are several ways to connect to this SQL database, either using software packages or programming languages. See the examples directory for a list of available tools.
To connect to the database machine, use the connection string is as follows:
postgresql://$USERNAME:$PASSWORD@airbods.my-domain.com/airbods
You can connect to the database from your local computer with psql as follows:
psql --host=airbods.my-domain.com --dbname=airbods --username=$USERNAME
You may need to change the username to something else. You can enter the password manually or use a pgpass file.
You can run a command by using the shell or as follows:
psql --host=airbods.my-domain.com --dbname=airbods --username=$USERNAME --command "SELECT now();"
See PostgreSQL docs Chapter 25. Backup and Restore.
First, log into the database machine as an administrator, and run the following pg_dump command as the postgres
user:
# Log in with database service account
sudo su - postgres --shell /bin/bash
pg_dump airbods --verbose --file "/tmp/airbods-`date -I`.sql"
You can use the Airflow Using the Command Line Interface to view, control and administer the workflow orchestrator. You should run this tool as the service user airflow
or as root
.
# Log in as system user (or use sudo -u airflow <command>)
sudo su - airflow
/opt/airflow/bin/airflow --help
# List users
/opt/airflow/bin/airflow users list
# List DAGs
/opt/airflow/bin/airflow dags list
See Testing a DAG.
To check that the DAG code may be imported:
python dags/datacake.py
Test a specific task:
airflow task test <dag_id> <task_id> <date>
airflow tasks test datacake all_devices_history 2021-06-01
Run unit tests:
python -m unittest --failfast
The database service may be controlled using systemd
:
# Restart PostgreSQL service
sudo systemctl restart postgresql
To connect to the SQL database, see code examples.
The pgAdmin tool is available by opening airbods.my-domain.com using a web browser.
The connection to the database must be configured using the Server Dialogue.
- If no servers are configured, click "Create Server"
- On the "General" tab, populate these fields:
- Name:
Airbods
- Shared:
Yes
- Name:
- On the "Connection" tab, populate these fields:
- Host name:
localhost
- Username:
pgadmin
- Password:
***************
(this must be kept secret) - Save password:
Yes
- Host name:
- Click "Save"
These user accounts are used to access pgAdmin only (not the SQL database itself.)
- Once logged in, to open the User Management panel, click on the arrow in the top-right corner of the screen.
- Click "Users"
- Click the "+" icon on the top-right of the "User Management" window
- Fill in the form fields
- Role = Administrator
- hit "Enter"
Follow the steps below to create a new login (user credentials) to access the SQL database.
- Log into the pgAdmin tool
- On the browser (left pane) select the Airbods server
- Right-click on the server and click on "Create" -> "Login/Group Role"
- On the first tab called "General" in the name field, enter a username (don't use special characters)
- On the second tab, "Definition," enter a unique password
- On the "Privileges" tab, set "Can login?" to "Yes"
- On the "Membership" tab, next to "Member of" click on the "Select roles" box and select the "researcher" role.
- Click "Save"
As a check, the SQL tab should show something like this:
CREATE ROLE joebloggs WITH
LOGIN
NOSUPERUSER
NOCREATEDB
NOCREATEROLE
INHERIT
NOREPLICATION
CONNECTION LIMIT -1
PASSWORD 'xxxxxx';
GRANT researcher TO joebloggs;
The PostgreSQL database can be administered using psql.
sudo -u postgres psql
# List databases
sudo -u postgres psql -c "\l"
# List users
sudo -u postgres psql -c "\du"
A database role exists for end users called researcher
.
Create new user credentials using the createuser shell command:
createuser --pwprompt --role=researcher
You could also do this using CREATE ROLE:
CREATE USER joebloggs LOGIN PASSWORD 'xxxxxx' IN ROLE researcher;
Role membership can also be managed for existing users.
The data pipelines are managed using Apache Airflow. See the Airflow tutorial.
The is an Airflow GUI available via the webserver service which is available via SSH tunnel. To test that this is available using curl
using the command below, the --insecure
flag is used to prevent the certificate being checked (because the certificate used is not signed by a certificate authority (CA) and is self-signed.)
# Create SSH tunnel
ssh -L 4443:127.0.0.1:4443 $USER@airbods01.my-domain.com
This can be opened at this URL: https://localhost:4443. To view this interface in your web browser you'll need to skip any security warning messages about this certificate issue.
You can check it's worked by running this command from your local machine:
# Check the web page is accessible
curl --insecure --head https://localhost:4443
The DAG calendar view is useful to give an overview of the entire history of the workflow.
To run these commands, you must log in to a worker machine and run the following commands as the user airflow
(that is, prefix the commands with sudo -u airflow
to run them as that service user.)
# Log in as Airflow service user
sudo su - airflow --shell /bin/bash
# Add the Airflow binary directory to the shell context ($PATH)
export PATH="/opt/airflow/bin:$PATH"
To check it's working:
# Check Airflow CLI is working as expected
airflow version
# Get environment information
airflow info
# Check the metadata database connection
airflow db check
To show the available workflows:
airflow dags list
The state of failed tasks may be cleared using the GUI under Browse > DAG Runs. You can also use the CLI with the tasks clear command. This may be applied to an entire DAG run, or a subset of tasks, for a specified time range.
# Clear *all* tasks within a time range
#airflow tasks clear --start-date "YYYY-MM-DD" --end-date "YYYY-MM-DD" datacake
To only clear failed tasks:
airflow tasks clear datacake --only-failed
Using the Airflow CLI, use the backfill command (see CLI backfill docs) to run historic data pipelines:
# airflow dags backfill $DAG_ID -s $START_DATE -e $END_DATE -t <task_regex>
airflow dags backfill datacake --start-date "$START_DATE" --end-date "$(date -I)"
The workflows use the Datacake GraphQL API, which communicates over authenticated HTTPS. Please read those documents for an introduction to using this. This is implemented using an Airflow custom operator called GraphQLHttpOperator
which is a subclass of the built-in SimpleHttpOperator
. There is a browser-based interface at https://api.datacake.co/graphql/ where you need to input the access token and you can run test GraphQL queries.
The raw data are stored on the virtual machine in the directory /home/airflow/data
as JSON files, one for each hourly batch operation.
The following are the items in the database. There are two types of object: tables and views. Tables contain rows of data and views are predefined SQL queries that display, merge or process that data in a certain way.
The SQL DDL used to define and create this schema is contained in SQL files in the directory ansible/playbooks/files/database and is run by the deployment script.
raw
contains a copy of the original data retrieved from Datacake. It unadvisable to use this data for research analysis, useclean
or one of the views instead. Each row corresponds to a data capture reading by a sensor at a certain time. It has a timestamp column, several columns describing the sensor used and one column per metric.clean
contains the transformed data, taken fromraw
, that is ready for usedevice
contains one row per sensor and has the following columns:device_id
is a unique identifier created by Datacake, one per sensor entry on that platform. You can view the information about a device using its URL:https://app.datacake.de/airbods/devices/d/<device_id>
serial_number
is the serial number taken from the physical device. This is the best way to uniquely identify a sensor because it won't change.verbose_name
is a human-readable label applied to each device which change over time.object
contains a JSON object with all the additional device information stored on Datacake.
deployment
contains one row for each time a sensor is moved to a new location for a period of time. There may be multiple rows per device, one for each combination of location and time period. Theserial_number
column maps to the column of the same name on thedevice
table. Thestart_time
is the time when the device was deployed. Theend_time
is when this deployment ended, or is blank if this deployment continues. The other columns describe the location of the deployment.
reading
merges the tablesclean
,device
anddeployment
to present the context for each clean data row, in addition to calculating some aggregated statistics. The deployment information is taken from the relevant deployment for that sensor at the time the reading was recorded. It contains the following columns:serial_number
is the sensor identifier.time_utc
andtime_europe_london
is the time of each reading, displayed in each time zonecity
,site
,area
etc. are columns from thedeployment
table describing the sensor location.air_quality
,co2
and other physical measurementsco2_zone_min
,humidity_area_mean
,temperature_zone_max
and other similar columns contain the aggregate statistics for each metric, partitioned over the location and day. The aggregate functions are the minimummin
, averagemean
and maximummax
for that deployment. The location may be the zone and area where the sensor was deployed. The day is the calendar date in UTC (GMT).
device_deployment
merges the tablesdevice
anddeployment
but contains one row per sensor for latest deployment. The columns are the same as those on the two source tables.
There are a number of ways to check the validity and integrity of the data in the database.
To check the time range of the raw data:
SELECT DISTINCT left(raw.time_, 10) AS day
FROM airbods.public.raw
ORDER BY 1;
Similarly for the days in the clean data:
SELECT DISTINCT clean.time_::DATE AS day
FROM airbods.public.clean
ORDER BY 1;
The system comprises several services which are managed using systemd
.
You can view the status of the services using the following commands:
systemctl status airflow-webserver
systemctl status airflow-scheduler
systemctl status airflow-worker
systemctl status redis
# RabbitMQ
sudo systemctl status rabbitmq-server
sudo -u rabbitmq rabbitmq-diagnostics status
# View PostgreSQL cluster status
systemctl status postgresql
pg_lsclusters
# pgAdmin4 web server
sudo systemctl status apache2
# Airflow
sudo -u airflow /opt/airflow/bin/airflow info
Service logs are available using the journalctl
command. The ---since
option is used to filter by time.
# Airflow service logs
sudo journalctl -u airflow-worker --reverse # most recent first
sudo journalctl -u airflow-webserver --since "$(date -I)" # current day
sudo journalctl -u airflow-scheduler --since "$(date -I) 12:00" # this afternoon
PostgreSQL database service logs:
sudo systemctl status postgresql
# List available log files (they're rotated regularly)
sudo ls -l /var/log/postgresql
sudo tail /var/log/postgresql/postgresql-12-main.log
pgAdmin logs:
sudo journalctl -u gunicorn --since "1 hour ago"
RabbitMQ message broker service logs:
sudo journalctl -u rabbitmq-server --since "1 hour ago"
# View logs for a particular node
sudo -u rabbitmq tail /var/log/rabbitmq/rabbit@localhost.log
# List RabbitMQ log files
ls -lt /var/log/rabbitmq/
# List RabbitMQ crash log files
sudo ls -lt /var/log/rabbitmq/log/
Redis database service logs:
sudo journalctl -u redis --since "$(date -I)"
Task summary
top
Processor-related statistics:
mpstat
Memory usage in MB:
free --mega
Memory usage by process:
htop --sort-key PERCENT_MEM
Storage space usage
df -h
There are several tools to analyse disk usage, such as ncdu
:
sudo apt install ncdu
sudo ncdu /home
You can look at the workers and tasks using Flower, a celery monitoring tool. (Also see Airflow Flower docs.) This can be accessed using an SSH tunnel for port 5555:
ssh -L 5555:127.0.0.1:5555 $USER@airbods01.my-domain.com
Then open http://localhost:5555 in a web browser on your computer.
See: RabbitMQ rabbitmqctl
and Management Command Line Tool. Examples:
# Check it's installed
sudo -u rabbitmq rabbitmqctl version
# View node status
sudo -u rabbitmq rabbitmqctl status
# List users
sudo -u rabbitmq rabbitmqctl list_users
SSH tunnel port 15672 on the remote machine 127.0.0.1:15672 using the ssh
command or putty
.
ssh -L 15672:127.0.0.1:15672 $USER@airbods01.my-domain.com
Then open http://localhost:15672 on your local machine.
See RabbitMQ docs Troubleshooting Guidance.
If there is a problem, try restarting the service:
sudo systemctl restart rabbitmq-server
Check that private key fingerprint matches that of the certificate:
openssl rsa -noout -modulus -in secrets/airbods_my_domain_com.key
openssl req -noout -modulus -in secrets/airbods_my_domain_com.csr
openssl x509 -noout -modulus -in files/airbods_my_domain_com_cert.cer
Check certificate expiration dates:
openssl x509 -noout -dates -in secrets/webserver.pem
openssl x509 -noout -dates -in files/airbods_my_domain_com_cert.cer
The ITS load balancer airbods-lb.my-domain.com
points to https://airbods.my-domain.com:443.