F1 Analytics - a simple web app powered by data

Introduction

My first python project after the simple and local console ones was the F1 Analytics - a simple site for Formula 1 historical statistics overview. The project was conducted around one and a half years ago, so it is pretty old and I think it needs a rework. There are various arguments for why it should be upgraded. The site is hosted on Heroku - the third-party platform. The free tier is not so generous, for example, the site falls asleep after 30 minutes without the traffic. I have the private space, so I can host it on my own. The data is stored in Google BigQuery. It is an awesome warehouse, but for such a small database it has more risks than benefits. In one word - it is overkill. I can store the data on my own also. The site is connected directly to the database which also has a lot of drawbacks. Now the data processing will be part of the pipeline and the site will fetch already transformed data from the handy API. The front end also needs to be updated. There were some changes in Formula 1, for example, the points from sprint should be counted also. The project is also quite old and was one of my first coding challenges, so the code is pretty messy. The last thing is just vanity. I do not have any idea for the next project and blog post. Tbh I am quite tired and already waiting for the serious holidays. I am also currently spending a lot of time improving my theoretical knowledge, for example by reading things (I would recommend this amazing book). Refactoring the existing project seems like a fantastic idea for the ’lazy project'.

General architecture

The Ergast provides provide the data in the API and also a database images or CSV files form. This project uses files mainly because I have already created the project about the Ergast API. The second thing is that Ergast provides the data mostly for analytical purposes. It would be quite unfair to use their servers for my web app. To obtain the data files and store them in the SQLite3 database the bash script is running every 12 AM/PM (scheduled with CRON). It also checks if the data is updated, if so it sends the POST request to the webhook, which transforms the required data and upserts it into MongoDB. The data in the optimal form is served via API and accessed by a web app created with Dash. The WSGI Server used is the Gunicorn with 2 workers and 2 threads. The whole project is containerized using Docker with docker-compose. The web server is Caddy, but it won’t be shown in the repository, because the project should be ready to reproduce locally. You can take a look at the architecture in the picture below:

architecture

At the end of the project the directory tree should look like this:

f1_analytics
|   .gitignore
|   .env
|   docker-compose.yaml
|	mongo-init.sh
|___etl
|   |___cron_part
|   |   |   checker.sh
|   |   |   entrypoint,sh
|   |   |   cronjobs
|   |   |   Dockerfile
|   |___webhook_part  
|   |	|	webhook.py
|   |	|	transform_load.py
|   |	|	utils.py
|   |	|	config.py
|	|	|	requirements.txt
|	|	|	Dockerfile
|___app
|	|___api
|	|	|	server.py
|	|	|	config.py
|	|	|	gunicorn_config.py
|	|	|	entrypoint.sh
|	|	|	requirements.txt
|	|	|	Dockerfile
|	|___web
|	|	|	app.py	
|	|	|	index.py
|	|	|	components.py
|	|	|	gunicorn_config.py
|	|	|	entrypoint.sh
|	|	|	requirements.txt
|	|	|	Dockerfile
|	|	|___tabs
|	|	|	|	drivers.py
|	|	|	|	incidents.py
|	|	|	|	races.py
|	|	|	|	seasons.py
|	|	|	|	sidepanel.py

Data Pipeline

Cron part

Shell script

For the first part of the project, I decided to use shell scripting. I do not think it is the best choice, I could code it in python much faster and better. However, I didn’t show that I can write the scripts in bash on this blog yet. The first thing to do obtaining the page with curl and find the last update date with grep:

#!/bin/bash

# get the website with database images
URL=$(curl -sS 'http://ergast.com/mrd/db/')

# find the last update date with regex
CURRENT_DATE=$(echo "$URL" | grep -o -P '(?<=last updated on: ).*?(?=</p>)')

-o flag for showing matches only and -P flag for Perl regular expressions usage.

The obtained variable should be compared with the last saved date to tell the script to know if it should perform actions. For the initial load it doesn’t exist, so let’s create it in the condition block, then read it to the variable:

# condition creating a txt just for the initial load
if [ ! -f ./date.txt ]; then
    echo 'initial load'> ./date.txt
fi 

# read last saved date
LAST_DATE=$(cat ./date.txt)

The whole action starts now. SQLite utility cannot a database in the non-existing directory or I do not know how to do this, so before the fireworks, it should be created:

# create a directory for database (and docker volume) if not exists
mkdir -p db

Everything is ready to create a condition block that will perform a bunch of tasks if the data on the Ergast website is updated. The first thing is downloading the archive, decompressing it, and removing the zip.

if [ "$CURRENT_DATE" != "$LAST_DATE" ]; then
    curl -sS 'http://ergast.com/downloads/f1db_csv.zip' --output f1db_csv.zip && unzip -qq -o f1db_csv.zip -d ./csvs && 
    rm f1db_csv.zip && # download the archive, decompress it and remove zip

-sS and -qqflags are used for silent modes, -o for overwriting the files without prompting.

The next thing is inserting the CSVs into the SQLite database. It can be done in the simple for loop:

    for f in ./csvs/*.csv;
    do
        sqlite3 ./db/f1db.db ".mode csv" ".import $f $(basename $f .csv)" ".exit"
    done &&

The CSV files can be recursively removed:

    rm -r ./csvs && # remove directory with csvs

The date in the text file should be replaced:

    echo "$CURRENT_DATE" > ./date.txt && # replace the date stored in the text file

Now, it is time to send a POST request to the webhook service, also with the curl:

    curl -sS -X POST -H "Content-Type: application/json" -d '{"message": "ready to serve"}' http://f1analytics-webhook:5010/webhook && # send a post request to the web hook

The URL contains the name of the docker container with its port.

At the end of the condition block let’s log the update or no change:

    echo "$(date "+%m-%d-%Y %T") : Data updated" # log the update message
else
    echo "$(date "+%m-%d-%Y %T") : No change" # log the no change message
fi

Cronjobs

Quick part, just set up the cron to run every 12 AM/PM and log output to the file.

0 0,12 * * * cd /etl && sh checker.sh >> log.txt 2>&1

Entrypoint

Here is a little cheat. I would like to run the script immediately, so I will execute it and then run the cron:

#!/bin/bash
sh /etl/checker.sh >> log.txt 2>&1 && cron -f 

-f flag for running the cron in the foreground. Normally it would run in the background, which could be dangerous (for example the failure of the process would be unnoticed).

Dockerfile

Creating the Dockerfile for the one cron scheduled shell script is not the hardest task. The distro image I usually use is the debian:bullseye-slim, it meets my needs perfectly. The following Dockerfile installs curl cron unzip and sqlite3 packages because they are used in the shell script. Both .sh files are copied and granted with executable permission. Next, the cronjobs are copied into the cron.d directory, granted with the proper permissions and added to the crontab. Entrypoint just executes the previously defined command.

FROM debian:bullseye-slim

RUN apt-get update && apt-get install curl cron unzip sqlite3 -y

WORKDIR /etl

COPY checker.sh entrypoint.sh ./

RUN chmod +x checker.sh entrypoint.sh

COPY cronjobs /etc/cron.d/cronjobs

RUN chmod 0644 /etc/cron.d/cronjobs

RUN crontab /etc/cron.d/cronjobs

ENTRYPOINT ["sh", "entrypoint.sh"]

Webhook part

Webhook

Webhook is an endpoint that is triggered when some event occurs. In the data world, webhooks are the easiest form of the subscription model. In my project, the webhook gets notified when the new data is successfully loaded into the SQLite database. It makes transformations using SQL and loads the data in the ready-to-serve for web app form into MongoDB.

import pymongo
from flask import Flask, request

from utils import mongo_config, bulk_insert

from data_wrangle import (
    get_drivers_performance,
    get_drivers_season_data,
    get_constructors_season_data,
    get_races_data,
    get_status_data,
    get_seasons,
    get_drivers,
)

app = Flask(__name__)


@app.route("/webhook", methods=["POST"])
def webhook():
    if request.json["message"] == "ready to serve":
        # get and insert seasons data
        get_seasons()
        # get and insert drivers data
        get_drivers()
        # get and insert drivers performance data
        get_drivers_performance()
        # get and insert drivers seasons data
        get_drivers_season_data()
        # get and insert constructors seasons data
        get_constructors_season_data()
        # get and insert races data
        get_races_data()
        # get and insert incidents data
        get_status_data()

        return "OK\n"

app.run(host="0.0.0.0", port=5010)

The only thing to notice here is specifying the POST method in the route. All of the functions will be described in the rest of this part.

Transform and load

The utils file contains the functions for connecting with databases and upsert into MongoDB collection (update if exists else insert):

import sqlite3

import pymongo

from config import mongo_access

# set connection with sqlite db
def connect_db():
    return sqlite3.connect(r"./db/f1db.db")


# configure mongodb client
def mongo_config():
    client = pymongo.MongoClient(mongo_access)
    db = client[db_name]
    return db


# helper for repeatable inserting
def mongo_upsert(data, collection):
    data = dict(data)
    collection.update_one({"_id": data["_id"]}, {"$set": data}, upsert=True)

Data wrangling for this project contains just simple SQL aggregation. Then I am loading the obtained data into MongoDB in the loop using previously created function. Doing this in this step means that the data is transformed once and the web app will just fetch already transformed data. Previously, processing was done on the application side, which was a terrible design.

import sqlite3

from utils import connect_db, mongo_upsert, mongo_config

# Construct the data for "Grand Prix" page and load it into the races collection
def get_races_data():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
    SELECT ra.raceId AS _id, ra.year AS Season, ra.date AS Date, ra.name AS 'Grand Prix',
    d.forename || d.surname AS Winner, c.name AS Constructor,
    ct.name AS Circuit, ct.lat, ct.lng, ct.location AS Location
    FROM drivers AS d
    LEFT JOIN results AS r
    ON d.driverId = r.driverId
    LEFT JOIN races AS ra
    ON r.raceId = ra.raceId
    LEFT JOIN constructors AS c
    ON r.constructorId = c.constructorId
    LEFT JOIN circuits AS ct
    ON ra.circuitId = ct.circuitId
    WHERE r.position = 1
    ORDER BY ra.date
    """
    )

    for row in cur:
        mongo_upsert(row, db.races)

    conn.close()


# Construct the data for "Incidents" page and load it into incidents collection
def get_status_data():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
    SELECT ct.circuitId || ra.year || d.driverId AS _id, ra.year AS Season, ra.date AS Date, ct.name AS Circuit, 
    d.forename || ' ' || d.surname AS Driver, c.name AS Constructor, st.status AS Incident
    FROM drivers AS d
    LEFT JOIN results AS r
    ON d.driverId = r.driverId
    LEFT JOIN races AS ra
    ON r.raceId = ra.raceId
    LEFT JOIN constructors AS c
    ON r.constructorId = c.constructorId
    LEFT JOIN circuits AS ct
    ON ra.circuitId = ct.circuitId
    LEFT JOIN status as st
    ON r.statusId = st.statusId
    WHERE st.status NOT LIKE '%Lap%' AND st.status != 'Finished'
    ORDER BY ra.date
    """
    )

    for x in cur:
        mongo_upsert(x, db.incidents)

    conn.close()


# Construct the data for "Drivers" page and load it into drivers_performance collection
def get_drivers_performance():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
    SELECT d.driverId || ra.raceId || ra.year || d.surname AS _id, d.forename || ' ' || d.surname AS Driver, ct.name AS Circuit,
    c.name AS Constructor, r.position as Position,
    d_st.points AS Points, ra.round AS Round, ra.year AS Season,
    r.grid AS Grid
    FROM drivers AS d
    LEFT JOIN results AS r
    ON d.driverId = r.driverId
    LEFT JOIN races AS ra
    ON r.raceId = ra.RaceId
    LEFT JOIN constructors AS c
    ON r.constructorId = c.constructorId
    LEFT JOIN circuits AS ct
    ON ra.circuitId = ct.circuitId
    LEFT JOIN driver_standings AS d_st
    ON d.driverId = d_st.driverId AND ra.raceId = d_st.raceId
    ORDER BY ra.year 
    """
    )

    for x in cur:
        mongo_upsert(x, db.drivers_performance)

    conn.close()


# Construct the data for "Seasons" page and loat it into drivers_seasons collection
def get_drivers_season_data():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
        WITH podiums AS (
            SELECT d.driverId, ra.year, 
			COUNT(DISTINCT r.raceId) AS Podiums
            FROM drivers AS d
            LEFT JOIN results AS r
            ON d.driverId = r.driverId
            LEFT JOIN races AS ra
            ON r.raceId = ra.raceId
            WHERE r.position IN (1, 2, 3)
            GROUP BY 1, 2
            )
        SELECT d.driverId || ra.year AS _id, d.forename || ' ' || d.surname AS Driver, 
        d.nationality AS Nationality, ra.year AS Season,
        MAX(CAST(d_st.points AS FLOAT)) AS Points,
        p.Podiums
        FROM drivers AS d
        LEFT JOIN driver_standings AS d_st
        ON d.driverId = d_st.driverId
        LEFT JOIN races AS ra
        ON d_st.raceId = ra.raceId
        LEFT JOIN podiums AS p
        ON d.driverId = p.driverId AND ra.year = p.year
        GROUP BY 1, 2, 3, 4
        ORDER BY 5 DESC
    """
    )

    for x in cur:
        mongo_upsert(x, db.drivers_seasons)

    conn.close()


# Construct the data for "Seasons" page and load it into constructors_seasons collection
def get_constructors_season_data():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
        WITH podiums AS (
            SELECT c.constructorId, ra.year, 
			COUNT(DISTINCT r.raceId) AS Podiums
            FROM constructors AS c
            LEFT JOIN results AS r
            ON c.constructorId = r.constructorId
            LEFT JOIN races AS ra
            ON r.raceId = ra.raceId
            WHERE r.position IN (1, 2, 3)
            GROUP BY 1, 2
            )
        SELECT c.constructorId || ra.year AS _id, c.name AS Constructor, 
        c.nationality AS Nationality, ra.year AS Season,
        MAX(CAST(c_st.points AS FLOAT)) AS Points,
        p.Podiums
        FROM constructors AS c
        LEFT JOIN constructor_standings AS c_st
        ON c.constructorId = c_st.constructorId
        LEFT JOIN races AS ra
        ON c_st.raceId = ra.raceId
        LEFT JOIN podiums AS p
        ON c.constructorId = p.constructorId AND ra.year = p.year
        GROUP BY 1, 2, 3, 4
        ORDER BY 5 DESC
    """
    )

    for x in cur:
        mongo_upsert(x, db.constructors_seasons)

    conn.close()


# Get max year
def get_seasons():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
        SELECT DISTINCT year AS _id, year AS Season
        FROM races
        ORDER BY 1 DESC
    """
    )

    for x in cur:
        mongo_upsert(x, db.seasons)

    conn.close()


# Get drivers list
def get_drivers():
    conn = connect_db()
    db = mongo_config()
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()
    cur.execute(
        """
        SELECT DISTINCT driverId AS _id, driver FROM (
        SELECT driverId, forename || ' ' || surname AS driver
        FROM drivers
        ) AS sub
        ORDER BY 2
    """
    )

    for x in cur:
        mongo_upsert(x, db.drivers)

    conn.close()

As you can see I am using the sqlite3.Row type. The reason is easier to convert the results to the dictionary for loading into the document-type database.

Dockerfile

Easy peasy step, just running the flask app. I am not using any production server here, because it won’t be ever online and always be responsible for processing one certain request at the specified time.

FROM python:3.10-slim-bullseye

WORKDIR /etl

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY config.py utils.py transform_load.py webhook.py ./

ENTRYPOINT ["python", "webhook.py"]

Web

API

Server

It is time to create an endpoint for every previously created table in MongoDB. Serving the whole table at once wouldn’t be reasonable, tables are quite big. Breaking them by year or driver is a good idea because the dashboards will have such a filter:

from flask import Flask, jsonify
from pymongo import MongoClient

from config import mongo_access

app = Flask(__name__)


client = MongoClient(mongo_access)
db = client[db_name]


# create endpoint for seasons data
@app.route("/api/seasons", methods=["GET"])
def seasons():
    table = db.seasons
    return jsonify(list(table.find({}, {"_id": False})))


# create endpoint for drivers data
@app.route("/api/drivers", methods=["GET"])
def drivers():
    table = db.drivers
    return jsonify(list(table.find({}, {"_id": False})))


# create endpoint for drivers seasons data
@app.route("/api/drivers_seasons/<string:season>", methods=["GET"])
def drivers_seasons(season):
    table = db.drivers_seasons
    return jsonify(list(table.find({"Season": season}, {"_id": False})))


# create endpoint for constructors seasons data
@app.route("/api/constructors_seasons/<string:season>", methods=["GET"])
def constructors_seasons(season):
    table = db.constructors_seasons
    return jsonify(list(table.find({"Season": season}, {"_id": False})))


# create endpoint for races data
@app.route("/api/races/<string:season>", methods=["GET"])
def races(season):
    table = db.races
    return jsonify(list(table.find({"Season": season}, {"_id": False})))


# create endpoint for incidents data
@app.route("/api/incidents/<string:season>", methods=["GET"])
def incidents(season):
    table = db.incidents
    return jsonify(list(table.find({"Season": season}, {"_id": False})))


# create endpoint for drivers performance data
@app.route("/api/drivers_performance/<string:driver>", methods=["GET"])
def drivers_performance(driver):
    table = db.drivers_performance
    return jsonify(list(table.find({"Driver": driver}, {"_id": False})))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5011)

As you can see I am ignoring the _id field. It was required to make the aggregation properly in the previous step. Now it would make a mess in the front end step (remember it is created almost two years ago - I do not want to spend a lot of time reconfiguring it).

WSGI

I will run this app in the production environment. That’s why the web part needs WSGI. I can tell that WSGI is required to handle the workload if multiple users are playing with the application. In simple words - the application won’t be extremely slow. You can read more about it here. The first thing to do is to create a config file:

bind = "0.0.0.0:5011"
workers = 2
threads = 2
timeout = 120

Bind parameter is the socket to bind, workers parameter is the number of worker processes for handling requests, threads parameter is the number of worker threads for handling requests, timeout parameter means that workers are silent for more than this many seconds are killed and restarted.

The app can be run with the created config directly from the Dockerfile, but it is always more readable to create the entry point. It is simply one line:

#!/bin/bash
gunicorn --config ./gunicorn_config.py server:app

Dockerfile

Dockerfile is simple as always, the only difference is that the entry point is running the external file:

FROM python:3.10-slim-bullseye

WORKDIR /api

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY config.py server.py gunicorn_config.py entrypoint.sh ./

RUN chmod +x ./entrypoint.sh

ENTRYPOINT ["sh", "entrypoint.sh"]

App

Dash app

I won’t describe the whole frontend part of the application. It is not so small and I created it almost 2 years ago. You can check the code in the repository linked in the conclusion. One thing I would like to mention is the usage of pandas. I can wrap the JSON in the pandas’ data frames with a little cost because it is a Dash framework dependency anyway. It makes life a lot easier because Dash is designed to cooperate with pandas.

WSGI

It is pretty much the same as for the API server. Entrypoint is a bit different. It has a sleep command with 70 seconds value. It is designed for the initial load, so the app won’t be started if the API is not serving any data. It is a very trivial solution and could be done better I think.

#!/bin/bash
sleep 70
gunicorn --config ./gunicorn_config.py index:server

Dockerfile

It is the same like in the API part:

FROM python:3.10-slim-bullseye

WORKDIR /web

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY tabs tabs/

COPY app.py components.py index.py gunicorn_config.py entrypoint.sh ./

RUN chmod +x ./entrypoint.sh

ENTRYPOINT ["sh", "entrypoint.sh"]

Compose

The docker-compose is the hearth of the project:

version: '3.8'
services:
  cron:
    build: ./etl/cron_part
    container_name: f1analytics-cron
    volumes:
      - sqlite-data:/etl/db
    depends_on:
      - "webhook"
  webhook:
    build: ./etl/webhook_part
    container_name: f1analytics-webhook
    ports:
      - 5010:5010
    environment:
      - MONGO_ACCESS=${MONGO_ACCESS}
      - DB_NAME=${DB_NAME}
    volumes:
      - sqlite-data:/etl/db
    depends_on:
      - "mongodb"
  api:
    build: ./app/api
    container_name: f1analytics-api
    ports:
      - 5011:5011
    environment:
      - MONGO_ACCESS=${MONGO_ACCESS}
      - DB_NAME=${DB_NAME}
    depends_on:
      - "mongodb"
  app:
    build: ./app/web
    container_name: f1analytics-app
    ports:
      - 5012:5012
    depends_on:
      - "api"
    restart: unless-stopped
  mongodb:
    image: mongo
    container_name: mongodb
    environment:
      - MONGO_INITDB_ROOT_USERNAME=${ROOT_NAME}
      - MONGO_INITDB_ROOT_PASSWORD=${ROOT_PASSWORD}
      - DB_NAME=${DB_NAME}
      - DB_USER=${DB_USER}
      - DB_PASSWORD=${DB_PASSWORD}
    ports:
      - 27017:27017
    volumes:
      - mongodb-data:/data/db
      - ./mongo-init.sh:/docker-entrypoint-initdb.d/mongo-init.sh
    restart: unless-stopped

volumes:
  mongodb-data:
  sqlite-data:

It builds all of the parts coded previously. It has two volumes - for databases. MongoDB is running first, then webhook and API, then cron and app at the end. Environmental variables are passed from the .env file. As you can see the MongoDB is initiated from the external file:

mongo --eval "db.auth('$MONGO_INITDB_ROOT_USERNAME', '$MONGO_INITDB_ROOT_PASSWORD'); db = db.getSiblingDB('$DB_NAME'); db.createUser({ user: '$DB_USER', pwd: '$DB_PASSWORD', roles: [{ role: 'readWrite', db: '$DB_NAME' }] });"

It creates a new database for the data and a user with reading and writing permissions for this database. The other way for managing MongoDB from external scripts is via javascript, but I prefer shell script, because of the ability to use the environment variables.

Now the project can be run with:

docker-compose up -d

But be careful, the repository is not supposed to be run in production. If you would like to, please take care of the security!

seasons

gp

drivers

incidents

Conclusion

The project was more interesting than I thought. I am happy that I could create it in a few days, including writing a blog post (the hardest thing). You can check the repo here. I plan that the next project will include the streaming tool, probably Apache Kafka. Tbh I would like to create a data engineering project that will contain various tools. Wish me luck! Cya!

References

  1. https://blog.logrocket.com/receive-webhooks-python-flask-or-django/
  2. https://lovethepenguin.com/sqlite3-how-to-import-csv-files-to-tables-from-bash-3bca7daedd31
  3. https://www.airplane.dev/blog/docker-cron-jobs-how-to-run-cron-inside-containers
  4. https://blog.thesparktree.com/cron-in-docker
  5. https://stackoverflow.com/questions/42912755/how-to-create-a-db-for-mongodb-container-on-start-up
  6. https://blog.entirely.digital/docker-gunicorn-and-flask/