Creating DE environment 5 - Prefect workflow management


Introduction

Prefect is a modern workflow orchestration tool. It provides scheduling, retries, logging, caching, observability, and notifications in a single, open-source framework. Prefect is one of the alternatives to Airflow, the most popular workflow orchestration tool. The two are similar in many areas, and Airflow would fill my needs perfectly as well. However, there are a few reasons for choosing Prefect. The first one: Prefect seems to be more lightweight. To be honest, Airflow is a big tool, and using it for a hobby project would be taking a sledgehammer to crack a nut. The second reason is that Prefect was designed and built with Dask in mind, and the mini Dask cluster is my primary environment for data processing. Prefect has a dedicated DaskTaskRunner that enables flows to run tasks requiring parallel or distributed execution on Dask, so everything should work smoothly. Prefect also has a shorter learning curve than Airflow: a Prefect task is as simple as a standard Python function, it just requires a dedicated decorator. Prefect can be deployed with the standard Python package installers, but I will go with Docker. Prefect offers a paid, cloud-hosted server and user interface, and a free one. I will self-host Prefect Orion, the open-source API and UI, which gives insight into the flows running within the instance and answers the question: what is the health of my system? I will also deploy a database for Orion (PostgreSQL), a Prefect agent, a CLI, and self-hosted S3 storage (MinIO).
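
To illustrate that last point, a minimal flow is just decorated Python functions. A hedged sketch (not part of the deployment yet, the function names are mine):

from prefect import flow, task

@task
def extract() -> list[int]:
    # an ordinary Python function, promoted to a Prefect task by the decorator
    return [1, 2, 3]

@flow
def mini_pipeline():
    # calling a task inside a flow runs it with retries, logging, etc.
    print(sum(extract()))

if __name__ == "__main__":
    mini_pipeline()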

Let’s get to work!

Docker compose

The docker-compose.yaml will contain 5 services:

  • prefect-postgres - the database for the Orion API,
  • prefect-orion - the Prefect Orion API and UI,
  • prefect-agent - a polling service that runs scheduled flows,
  • prefect-cli - a simple environment for running test flows and deployments locally, debugging, etc.,
  • minio - S3-compatible object storage for storing the flows.

The whole deployment will be based on the example repository from a Prefect Community Engineer.

Dockerfile

To work properly with the remote S3 storage and the Dask cluster, two external Python packages - s3fs and prefect-dask - should be installed. Running pip install from a single Dockerfile and building the services from it in the docker-compose file is, I think, the most convenient way.

FROM prefecthq/prefect:2.7-python3.11

RUN pip install s3fs prefect-dask

PostgreSQL

The Prefect Orion database stores data generated by many Prefect features, for example, flow and task states, run history, logs, deployments, and configurations. Orion supports two databases: PostgreSQL and SQLite. My choice is PostgreSQL because I would like to have a fully containerized environment.

version: "3.8"

networks:
  prefect-network:
    name: prefect-network
  minio-network:
    name: minio-network
  caddy-network:
    external: true
  dask-network:
    external: true

services:
  postgres:
    image: postgres:alpine
    container_name: prefect-postgres
    expose:
      - 5432
    volumes:
      - ./postgres:/data/db
    environment:
      POSTGRES_USER: ${POSTGRES_USERNAME}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DATABASE}
    networks:
      - prefect-network
    restart: unless-stopped
    healthcheck:
      test:
        [
          "CMD",
          "pg_isready",
          "-q",
          "-d",
          "${POSTGRES_DATABASE}",
          "-U",
          "${POSTGRES_USERNAME}"
        ]
      timeout: 45s
      interval: 10s
      retries: 10
    profiles: [ "orion" ]

The service configuration is almost the same as in the previous posts from the series. The only difference is the usage of the profiles parameter, which gives an option for selectively enabling services. It is useful because the prefect-cli service will have to run only in exceptional cases, while the prefect-postgres service should always run with the prefect-orion service. The rest of the settings are pretty standard: volumes for data persistence, an exposed port, environment variables for sensitive data, network attachment, and a ping health check.

Orion

Prefect Orion is an open-source, locally hosted orchestration engine, API server, and UI. It is the central component of the Prefect environment: it keeps track of the state of the workflows and tasks. It is also possible to perform a lot of management activities from the UI, for example: creating and managing deployments, configuring storage, executing flow runs, creating and managing work queues, analyzing logs, and configuring notifications. It is possible to use the paid option, Prefect Cloud, but I think the open-source version is enough for hobby projects.

  orion:
    build: .
    container_name: prefect-orion
    expose:
      - 4200
    volumes:
      - ./prefect:/root/.prefect
    entrypoint:
      [
        "prefect",
        "orion",
        "start",
        "--host",
        "0.0.0.0",
        "--port",
        "4200"
      ]
    environment:
      PREFECT_ORION_API_HOST: https://tasks.brozen.best
      PREFECT_ORION_UI_API_URL: https://tasks.brozen.best/api
      PREFECT_ORION_DATABASE_CONNECTION_URL: ${ORION_DB_CONN_URL}
    networks:
      - prefect-network
      - caddy-network
      - minio-network
      - dask-network
    restart: unless-stopped
    profiles: [ "orion" ]
    depends_on:
      - postgres

The service has port 4200 exposed, and the Orion server is started on that port by the command in the entrypoint. The important things here are the environment variables PREFECT_ORION_API_HOST and PREFECT_ORION_UI_API_URL. They should be set if Prefect will be hosted on the web; otherwise, the web app will try to make API requests to localhost. The PREFECT_ORION_DATABASE_CONNECTION_URL is the standard connection string for communicating with the PostgreSQL database, stored in the .env file.
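
The connection string follows the asyncpg-based format from the Prefect documentation; a hedged sketch of assembling it, with placeholder credentials standing in for the .env values:

# Placeholder values; in practice these live in the .env file
user, password, database = "prefect", "changeme", "orion"

# Orion uses SQLAlchemy with the asyncpg driver; the hostname is the
# container name of the database service on the shared Docker network
conn_url = f"postgresql+asyncpg://{user}:{password}@prefect-postgres:5432/{database}"
print(conn_url)  # the value to store as ORION_DB_CONN_URL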

Agent

The agent process is a lightweight service that picks up work from the queue and executes the flows. Work queues organize the work for agents. It is quite similar to pub/sub topics in message-based systems: producers and consumers are decoupled and connected via the topic. In Prefect, deployments and agents are connected via the work queue. Many agents can be connected to one work queue, and one agent can fetch work from multiple work queues. Users can simply change the agent that will execute a deployment by switching the work queue. Besides being crucial for orchestrating the work, this can be useful for running certain flows quicker in a different environment or for debugging and testing. I will create just one agent for the dask-queue work queue, but the agent service can be replicated if necessary.

  agent:
    build: .
    container_name: prefect-agent
    entrypoint: [ "prefect", "agent", "start", "-q", "dask-queue" ]
    environment:
      PREFECT_API_URL: http://prefect-orion:4200/api
    networks:
      - prefect-network
      - dask-network
      - minio-network
    profiles: [ "agent" ]

The agent service will start the agent instantly and will also create the dask-queue work queue. PREFECT_API_URL must be set in the environment in which the agent is running if it is to communicate with the Orion API server from a remote execution environment. It is also important here to pass the container name - prefect-orion - in the PREFECT_API_URL variable. A dask-worker from another machine wouldn't resolve the URL if given just the service name from the docker-compose file - orion - like in the example repository.
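
Work queues can also be inspected programmatically. A hedged sketch using the Prefect 2 Python client (it assumes PREFECT_API_URL is set as above, and that the agent has already created the queue):

import asyncio
from prefect.client import get_client

async def inspect_queue():
    async with get_client() as client:
        # look up the queue the agent polls; it is created automatically
        # when the agent starts with `-q dask-queue`
        queue = await client.read_work_queue_by_name("dask-queue")
        print(queue.id, queue.name)

asyncio.run(inspect_queue())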

CLI

The prefect-cli service will act as an interactive Bash session for building deployments or running test flows in the local environment. It will share the flows directory with the host machine. It will be connected to the Orion API but will run only when needed.

  cli:
    build: .
    container_name: prefect-cli
    entrypoint: "bash"
    working_dir: "/root/flows"
    volumes:
      - ./flows:/root/flows
    networks:
      - prefect-network
      - dask-network
      - minio-network
    environment:
      PREFECT_API_URL: http://prefect-orion:4200/api
    profiles: [ "cli" ]

The service can be started with

docker-compose run cli

It starts the Bash session directly in the flows directory.
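
Inside the session, a quick way to confirm that the container can reach the Orion API is the client healthcheck; a hedged sketch (api_healthcheck returns None when the server responds, or the exception otherwise):

import asyncio
from prefect.client import get_client

async def check_api():
    async with get_client() as client:
        # None means the API at PREFECT_API_URL answered; otherwise
        # the underlying exception is returned
        print(await client.api_healthcheck())

asyncio.run(check_api())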

MinIO

MinIO is an open-source, S3-compatible object storage. It will act as remote storage for the Prefect deployments and will also be useful for other data engineering projects as a data lake. Using remote file storage for computation with the DaskTaskRunner is also recommended by Prefect: it ensures that tasks executing in Dask have access to task result storage, particularly when accessing a Dask instance outside of your execution environment.

  minio:
    image: minio/minio
    container_name: minio
    expose:
      - 9000
      - 9001
    volumes:
      - ./minio:/data
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT}
      MINIO_ROOT_PASSWORD: ${MINIO_PASSWORD}
      MINIO_BROWSER_REDIRECT_URL: https://storage-console.brozen.best
      MINIO_SERVER_URL: https://storage.brozen.best
    networks:
      - minio-network
      - prefect-network
      - caddy-network
      - dask-network
    entrypoint:
      [
        "minio",
        "server",
        "--address",
        ":9000",
        "--console-address",
        ":9001",
        "/data"
      ]
    profiles: [ "minio" ]

Two ports - 9000 and 9001 - should be exposed. The first one is for the API server, and the second one is for the console, a neat UI for managing MinIO. In the environment variables, MINIO_SERVER_URL specifies the proxy-accessible hostname of the MinIO server so the console can reach the MinIO server API over the TLS certificate, and MINIO_BROWSER_REDIRECT_URL redirects browser requests hitting the server URL to the MinIO console. I would like to have a dedicated host for the storage in the future, as soon as I hunt down some cheap servers with large HDD drives.
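
Once a user and its keys exist, the storage can be exercised directly with s3fs, the same package installed in the Dockerfile; a hedged sketch with hypothetical keys:

import s3fs

# Hypothetical access keys created for a MinIO user in the console
fs = s3fs.S3FileSystem(
    key="YOUR_MINIO_KEY",
    secret="YOUR_MINIO_SECRET",
    client_kwargs={"endpoint_url": "http://minio:9000"},  # in-network endpoint
)

print(fs.ls(""))  # lists the buckets visible to this user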

Deployment

Deploying the services with Ansible requires copying the needed files - Dockerfile, docker-compose.yaml, and .env. It is also possible to copy the whole directory with one task, but I find copying the expected files alone easier to manage, because it is impossible to copy unnecessary things by accident. The last task effectively executes the docker-compose --profile orion --profile agent --profile minio up -d command.

# deploy_prefect.yaml
---
- hosts: data_master_nodes
  vars:
    docker_compose_dir: /docker/prefect/

  tasks:
    - name: Copy compose source template
      template:
        src: ./composes/prefect/docker-compose.yaml
        dest: "{{ docker_compose_dir }}"
        mode: 0600

    - name: Copy .env
      template:
        src: ./composes/prefect/.env
        dest: "{{ docker_compose_dir }}"
        mode: 0600

    - name: Copy Dockerfile
      template:
        src: ./composes/prefect/Dockerfile
        dest: "{{ docker_compose_dir }}"
        mode: 0600

    - name: Build prefect
      community.docker.docker_compose:
        project_src: "{{ docker_compose_dir }}"
        profiles: ["orion", "agent", "minio"]
      register: output

It can be executed in the standard way, with the command:

ansible-playbook deploy_prefect.yaml -u charizard

Now let's add the entries to the Caddyfile. The first one is for Prefect Orion, behind the reverse proxy and Authelia (quite important here, as this service does not have its own authentication and authorization feature).

tasks.brozen.best {
  forward_auth authelia:9091 {
    uri /api/verify?rd=https://whoareyou.brozen.best/
    copy_headers Remote-User Remote-Groups Remote-Name Remote-Email
  }

  reverse_proxy prefect-orion:4200
}

Now the service should be accessible from the web via the provided URL.

[Screenshot: Prefect Orion UI]

The next two entries are for the MinIO server and console. MinIO has its own authentication and authorization, but I will host the console behind Authelia as well. The server is not behind Authelia, because then the UI wouldn't be able to communicate with the API. It redirects all access requests to the console anyway.

storage.brozen.best {
  reverse_proxy minio:9000
}

storage-console.brozen.best {
  forward_auth authelia:9091 {
    uri /api/verify?rd=https://whoareyou.brozen.best/
    copy_headers Remote-User Remote-Groups Remote-Name Remote-Email
  }
  reverse_proxy minio:9001
}

The console UI is accessible from both provided URLs.

[Screenshot: MinIO console]

The MinIO storage should also be connected with Prefect. This requires creating a dedicated bucket in MinIO (for example, named flows) and a user (for example, prefect-user). Both can be done quickly and easily from the user interface. Then, from the Prefect UI, a remote file system block should be created, for example with the name dask-block. The base path should be s3://flows and the settings should contain the following JSON:

{
  "key": "YOUR_MINIO_KEY",
  "secret": "YOUR_MINIO_SECRET",
  "client_kwargs": {
    "endpoint_url": "http://minio:9000"
  }
}

[Screenshot: remote file system block configuration]
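
The same block can be created from code instead of the UI; a hedged sketch using Prefect's RemoteFileSystem block (the keys are the same hypothetical MinIO credentials as above):

from prefect.filesystems import RemoteFileSystem

# Equivalent of filling in the block form in the Orion UI
block = RemoteFileSystem(
    basepath="s3://flows",
    settings={
        "key": "YOUR_MINIO_KEY",        # hypothetical MinIO access key
        "secret": "YOUR_MINIO_SECRET",  # hypothetical MinIO secret key
        "client_kwargs": {"endpoint_url": "http://minio:9000"},
    },
)
block.save("dask-block", overwrite=True)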

Let's also run a simple test of the whole infrastructure. The first thing to do is create a simple test flow in the flows directory. It is mostly copied from the Prefect documentation.

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from prefect.filesystems import RemoteFileSystem

# load the remote storage block created earlier
remote_file_system_block = RemoteFileSystem.load("dask-block")

@task
def say_hello(name):
    print(f"hello {name}")

@task
def say_goodbye(name):
    print(f"goodbye {name}")

# run the tasks on the Dask cluster, addressed via its scheduler
@flow(task_runner=DaskTaskRunner(address="scheduler:8786"))
def greetings(names):
    for name in names:
        # submit() hands the task to the task runner instead of running it inline
        say_hello.submit(name)
        say_goodbye.submit(name)

if __name__ == "__main__":
    greetings(["Wojciech", "Dawid", "Michał", "Adam", "Rafał"])

As you can see, it loads the remote file storage block, and the flow uses the DaskTaskRunner with the address of the Dask scheduler provided. Now let's launch the prefect-cli service, which was created mainly as a testing environment.

docker-compose run cli

The deployment can be created with the following command:

prefect deployment build ./dask-flow.py:greetings --sb "remote-file-system/dask-block" -n "test-deployment" -q "dask-queue"

The --sb flag specifies the storage block to use, -n is just the name of the deployment, and -q is the work queue to use. After running it, the console should give the following output:

Found flow 'greetings'
Deployment YAML created at '/root/flows/greetings-deployment.yaml'.
Successfully uploaded 2 files to s3://flows/

The flow and deployment files should be now in the MinIO bucket. To apply the deployment run:

prefect deployment apply greetings-deployment.yaml

The deployment can be launched manually from the UI, although that is not the only way; this is just a test case, and many more sophisticated options can be specified in the deployment, for example scheduling (see the sketch after the logs below). The logs from the flow should look similar to this:

Created task run 'say_goodbye-0c32b902-24' for task 'say_goodbye'
Submitted task run 'say_goodbye-0c32b902-24' for execution.
Finished in state Completed()

It works! 🐼
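
As for scheduling: a hedged sketch of building the same deployment from Python with a cron schedule. It assumes the flow file is renamed to dask_flow.py so it can be imported, and uses Prefect 2's Deployment.build_from_flow and CronSchedule:

from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem
from prefect.orion.schemas.schedules import CronSchedule

from dask_flow import greetings  # assumes dask-flow.py was renamed to dask_flow.py

deployment = Deployment.build_from_flow(
    flow=greetings,
    name="scheduled-deployment",
    work_queue_name="dask-queue",
    storage=RemoteFileSystem.load("dask-block"),
    schedule=CronSchedule(cron="0 6 * * *"),  # run every day at 06:00
)
deployment.apply()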

Conclusion

Deploying Prefect in connection with the existing Dask cluster and the Caddy web server was not the easiest task. It required broadening my knowledge, so I am satisfied. I hope it will work well for my future projects. This was the last post of the Data Engineering environment series. I guess it was a nice journey. I will keep expanding my server architecture, and if I find something interesting to describe, I will write a "spin-off post", but for now I will get back to the data engineering and data analytics posts and projects. Wish me luck! Cya!

References

  1. https://stories.dask.org/en/latest/prefect-workflows.html#how-dask-helps
  2. https://github.com/PrefectHQ/prefect-dask
  3. https://docs.prefect.io/concepts/work-queues/
  4. https://docs.prefect.io/tutorials/dask-ray-task-runners/
  5. https://github.com/PrefectHQ/prefect/issues/5648
  6. https://medium.com/the-prefect-blog/how-to-self-host-prefect-orion-with-postgres-using-docker-compose-631c41ab8a9f
  7. https://hub.docker.com/r/minio/minio/