Warsaw Open Data - building a simple asynchronous API wrapper

Introduction

Warsaw is the capital of Poland - the country I live in. The City Hall created an open data platform with a lot of valuable information. To be honest, it is an amazing job and quite unique on a national scale. However, the website and documentation are not so good: their readability is low, and there is no English version. I would like to build some data projects on top of Warsaw Open Data, so I decided to write a simple wrapper to make that easier. It will be asynchronous, because some of the data is real-time and the responses are sometimes quite big. I will mostly use it with streaming tools such as Kafka, so an asynchronous design makes sense. I also already have one blocking I/O API wrapper in my portfolio, so I would rather not repeat it. In this post I won’t show the whole code of the package, because there are a lot of endpoints to cover. The goals of the project are to be:

  • Asynchronous: designed to work with asyncio
  • Extensive: cover as many endpoints as possible
  • Approachable: all Polish fields translated to English
  • Responsible: minimize the load on the API server
    • implemented caching
  • Simple: easy to use and customize:
    • object-oriented design
    • use of data transfer objects
    • type hinting
    • convert adequate fields from strings to the more suitable types
  • Lightweight: minimal usage of the third-party packages

The list is quite general but I guess it is fine for illustrative purposes. The post will be rather concise. I just want to quickly show you the whole work process.

Let’s get to work!

pywarsaw

Dependencies

Let’s start the project with the poetry new pywarsaw command. It creates the basic directory structure.

Now add the external packages that will be used in the library.

poetry add "attrs>=22.2.0" aiohttp aiohttp-client-cache aiosqlite

The development dependencies should also be added, this time with the --dev flag:

poetry add --dev pytest aioresponses pytest-asyncio Sphinx sphinx-rtd-theme

The purpose of every package will be described in further sections.

DTOs

A data transfer object is a structure mostly used for carrying data between processes. A DTO provides easy serialization, validation, readability, etc. Python has built-in DTOs - data classes. I use an external package, attrs, because I know it well and it provides the converters feature, which I really appreciate. It converts types in a way that is compatible with static type checkers.

The first DTO I would like to create is a parent class. It has four methods for converting an object to other popular structures:

@define
class DTO:
    """Parent class for all DTOs"""

    def to_dict(self) -> dict:
        """Convert object to a dictionary"""
        return asdict(self)

    def to_flat_dict(self) -> dict:
        """Convert object to a flat dictionary"""
        d = asdict(self)
        return flat_dict(d)

    def to_tuple(self) -> tuple:
        """Convert object to a tuple"""
        return astuple(self)

    def to_json(self) -> str:
        """Convert object to a json string"""
        return JSONEncoder().encode(self)

The custom JSON encoder that can deal with DTOs and datetime objects is defined like this:

class JSONEncoder(json.JSONEncoder):
    """Custom json encoder to deal with dates, times and DTOs"""

    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, DTO):
            return obj.to_dict()
        return json.JSONEncoder.default(self, obj)
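
To see how overriding default plays out, here is a minimal standalone sketch that handles only dates and times (the DTO branch is left out so the snippet runs without attrs). The class name DateTimeEncoder and the sample record are made up for illustration.

```python
import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    """Hypothetical, reduced version of the encoder: dates and times only."""

    def default(self, obj):
        # json calls default() only for objects it cannot serialize by itself
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        return super().default(obj)


# Made-up record resembling an already converted API response
record = {"bikes": 7, "update_date": datetime.datetime(2023, 1, 15, 14, 30)}

encoded = json.dumps(record, cls=DateTimeEncoder)
print(encoded)  # {"bikes": 7, "update_date": "2023-01-15T14:30:00"}
```

Because datetime.datetime is a subclass of datetime.date, the single isinstance check covers full timestamps as well.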

The function used to flatten the dictionary looks like this:

def flat_dict(d, parent_key="", sep="_", index=None):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if index is not None:
            new_key = "{}_{}".format(new_key, index)
        if isinstance(v, dict):
            items.extend(flat_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    items.extend(flat_dict(item, new_key, sep=sep, index=i).items())
                else:
                    items.append((new_key, item))
        else:
            items.append((new_key, v))
    return dict(items)
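
A quick way to get a feel for the key naming is to run the function on a small record. The sketch below repeats flat_dict so it runs on its own; the station and tags records are made up.

```python
def flat_dict(d, parent_key="", sep="_", index=None):
    # Same function as above, repeated so the snippet is self-contained
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if index is not None:
            new_key = "{}_{}".format(new_key, index)
        if isinstance(v, dict):
            items.extend(flat_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    items.extend(flat_dict(item, new_key, sep=sep, index=i).items())
                else:
                    items.append((new_key, item))
        else:
            items.append((new_key, v))
    return dict(items)


# Made-up record with a nested dict and a list of dicts
station = {
    "id": "S1",
    "location": {"lat": 52.23, "lon": 21.01},
    "lines": [{"name": "A"}, {"name": "B"}],
}
flat = flat_dict(station)
print(flat)
# {'id': 'S1', 'location_lat': 52.23, 'location_lon': 21.01,
#  'lines_name_0': 'A', 'lines_name_1': 'B'}

# A list of plain values reuses one key, so only the last value is kept
tags = flat_dict({"tags": ["bus", "tram"]})
print(tags)  # {'tags': 'tram'}
```

Two things are worth noticing: the list index is appended after the nested key (lines_name_0, not lines_0_name), and lists of plain values collapse to their last element, so this helper is best suited to lists of dicts.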

Now let’s take a look at an example target object.

@define
class CycleStation(DTO):
    """The bicycle station.
    The following information is available:
        * racks: count of the racks
        * update_date: date of the data update
        * object_id: id of the object
        * location: location
        * bikes: count of the bikes
        * station_number: station number
    """

    racks: int = field(converter=int)
    update_date: datetime.datetime = field(converter=to_datetime_with_12)
    object_id: str
    location: str
    bikes: int = field(converter=int)
    station_number: str

As you can see, it has converters. Basic types like int or float can be passed as converters directly. More complicated ones can be written manually, for example:

def to_datetime_with_12(f: str) -> datetime.datetime:
    # %I (12-hour clock) is needed for %p (AM/PM) to affect the parsed hour
    return datetime.datetime.strptime(f, "%d-%b-%y %I.%M.%S.%f %p") if f else f

It casts a string with microseconds and AM/PM to a datetime object.
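
One detail of strptime is worth knowing when parsing AM/PM strings: the %p directive only changes the resulting hour when the hour itself is read with %I (12-hour clock). With %H the marker is matched but ignored. A small sketch on a made-up timestamp (it assumes an English or C locale for the month abbreviation and the AM/PM marker):

```python
import datetime

# Made-up timestamp in a day-month-year, 12-hour layout
raw = "15-Jan-23 02.30.00.000000 PM"

# Hour read with %I: the PM marker shifts it to the afternoon
with_12h = datetime.datetime.strptime(raw, "%d-%b-%y %I.%M.%S.%f %p")
# Hour read with %H: the PM marker is parsed but has no effect
with_24h = datetime.datetime.strptime(raw, "%d-%b-%y %H.%M.%S.%f %p")

print(with_12h.hour)  # 14
print(with_24h.hour)  # 2
```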

I don’t think there is a reason to show all of the DTOs. They are all really similar - they use the converters when there is a need, and some fields can have more than one type (mostly None).

Exceptions

The exceptions.py module contains all of the exceptions the package can raise. I have created just four; three of them correspond to errors reported by the API.

class WrongQueryParameters(Exception):
    """Raised when the query parameters are wrong."""

    def __str__(self):
        return "Wrong query parameters."


class UnauthorizedAccess(Exception):
    """Raised when API key is not provided."""

    def __str__(self):
        return "API key is not provided"


class WrongAPIKey(Exception):
    """Raised when provided API key is wrong."""

    def __str__(self):
        return "Wrong API key."


class WrongDirectory(Exception):
    """Raised when the directory for cache does not exist."""

    def __str__(self):
        return "The provided directory for a cache does not exist."

Client

The client is a class for making asynchronous HTTP requests with optional caching. The main class of pywarsaw, which has a method for every endpoint, will inherit from it. Its initialization attributes indicate whether caching is enabled, which backend is used for it, and which session is in use - the standard aiohttp one or the cached one.

class Client:
    """A client for making async HTTP requests with optional caching.
    Args:
        session (Union[ClientSession, CachedSession]): The session to use for making requests.
        backend (Union[None, SQLiteBackend]): The backend to use for caching.
        enabled (bool): Whether caching is enabled or not.
    """

    def __init__(
        self,
        session: Union[None, ClientSession, CachedSession] = None,
        backend: Union[None, SQLiteBackend] = None,
        enabled: bool = False,
    ):
        self._session = session
        self._backend = backend
        self._enabled = enabled

Next there are two dunder methods:

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

The __aenter__ and __aexit__ are special methods in Python’s asynchronous context manager protocol. They are used to define objects that can be used with the async with statement. The __aenter__ method is called when an object is entered in an asynchronous context. The __aexit__ method is called when an object is exited from an asynchronous context, either through normal execution or because of an exception. For example, the client can be used like this:

async with Client() as c:
    await c.cache_enable()
    result = await c._get('http://example.com/api')
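
To make the protocol concrete, here is a minimal sketch that uses only asyncio. DemoClient is a hypothetical stand-in that records which methods were called; it shows that __aexit__ also runs when the body of the async with block raises.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a client that owns a closable resource."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and when the body raises
        await self.close()
        return False  # do not swallow the exception

    async def close(self):
        self.events.append("close")


async def main():
    client = DemoClient()
    try:
        async with client:
            raise RuntimeError("request failed")
    except RuntimeError:
        pass
    return client.events


events = asyncio.run(main())
print(events)  # ['enter', 'close']
```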

There is also a static method for creating the cache path.

    @staticmethod
    def path_create(path: str) -> str:
        """Create a cache path.
        Args:
            path (str): Directory for a cache file.
        Returns:
            str: Concatenated path.
        Raises:
            WrongDirectory: The provided directory for a cache does not exist.
        """
        if not os.path.exists(path):
            raise WrongDirectory()
        return os.path.join(path, "pywarsaw_cache")

Two methods enable and disable caching. The path, the expiration time, and the clearing of the cache are controlled through the arguments of the cache_enable method.

    async def cache_enable(
        self,
        path: str = os.getcwd(),
        expire_after: Union[
            None, int, float, str, datetime.datetime, datetime.timedelta
        ] = 3600,
        force_clear: bool = False,
        clear_expired: bool = False,
    ) -> None:
        """Enable caching using the SQLite backend.
        Args:
            path (str): Directory for the cache file. Defaults to current working directory.
            expire_after (Union[None, int, float, str, datetime.datetime, datetime.timedelta]):
                Expiration time for the cache. Defaults to 3600.
            force_clear (bool): Clear the cache upon initialization. Defaults to False.
            clear_expired (bool): Clear expired cache upon initialization. Defaults to False.
        """

        self._path = path
        self._enabled = True

        self._backend = SQLiteBackend(
            cache_name=Client.path_create(path),
            expire_after=expire_after,
            allowed_methods=("GET",),
            include_headers=True,
        )

        self._session = CachedSession(cache=self._backend)

        if force_clear:
            await self._session.cache.clear()

        if clear_expired:
            await self._session.cache.delete_expired_responses()

    async def cache_disable(self) -> None:
        """Disable caching"""
        self._enabled = False
        self._session = ClientSession()
        self._backend = None
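
A side note on the path default in cache_enable: Python evaluates default argument values once, when the function is defined, so os.getcwd() there means the directory at import time, not at call time. The sketch below illustrates the difference with two hypothetical helpers (cache_dir_eager and cache_dir_lazy are not part of the package).

```python
import os
import tempfile
from typing import Optional


def cache_dir_eager(path: str = os.getcwd()) -> str:
    # The default was computed once, when this function was defined
    return path


def cache_dir_lazy(path: Optional[str] = None) -> str:
    # The default is resolved on every call
    return os.getcwd() if path is None else path


start = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    os.chdir(tmp)
    try:
        eager = cache_dir_eager()
        lazy = cache_dir_lazy()
    finally:
        os.chdir(start)  # go back before the temporary directory is removed

print(eager == start)  # True: still the directory from definition time
print(lazy == start)   # False: follows the current working directory
```

For most scripts the two behave the same; the difference only shows up if the working directory changes after the module is imported.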

The _get method returns the response body parsed from JSON.

    async def _get(self, url: str) -> dict:
        """Make an asynchronous HTTP GET request to the provided URL.
        Args:
            url (str): The URL to make the GET request to.
        Returns:
            dict: The JSON response of the GET request.
        Raises:
            HTTPError: If the response status is 400 or higher.
        """
        async with self._session.get(url) as response:
            return await response.json()

The close method just closes the session. It is useful if someone does not use the context manager.

    async def close(self) -> None:
        """Close the session and releases all resources held by the session."""
        await self._session.close()

Main

This is the part of the package that the user will work with. The class with methods for obtaining data from the API is named Mermaid, after the symbol of Warsaw. As you can see, it inherits from the Client class.

class Mermaid(Client):
    """A client for interacting with Warsaw Open Data API.
    Args:
        api_key (str): The API key to use for authentication.
            Defaults to None.
    """

    def __init__(
        self,
        api_key: str = None,
    ):
        super().__init__()
        # keep the key so the endpoint methods can pass it to _build_url
        self.api_key = api_key

There is a static helper method here for building the URLs from the provided arguments.

    @staticmethod
    def _build_url(endpoint, **kwargs) -> str:
        """Builds a url for a specified endpoint using provided parameters.
        Args:
            endpoint (str): The endpoint for the API call.
            **kwargs: Additional parameters to include in the url.
        Returns:
            str: The built url.
        """
        base_url = "https://api.um.warszawa.pl/api/action/"

        url = (
            base_url
            + endpoint
            + "?"
            + "&".join(
                f"{key}={item}" for key, item in kwargs.items() if item is not None
            )
        )
        return url
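
The helper above joins the values into the query string as they are. If a value can contain spaces or characters such as &, one option is to let the standard library do the encoding. This is only a sketch of an alternative, not the code used in the package; build_url and the name and limit parameters are made up.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.um.warszawa.pl/api/action/"


def build_url(endpoint: str, **params) -> str:
    """Hypothetical variant of _build_url that percent-encodes the values."""
    # Drop parameters that were not provided, as the original helper does
    query = urlencode({k: v for k, v in params.items() if v is not None})
    return f"{BASE_URL}{endpoint}?{query}"


url = build_url("endpoint_test", name="Plac Zbawiciela", line=None, limit=5)
print(url)
# https://api.um.warszawa.pl/api/action/endpoint_test?name=Plac+Zbawiciela&limit=5
```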

The _get_data method is just a _get method from the client, but with the ability to raise exceptions connected with the Warsaw Open Data API responses.

    async def _get_data(self, url: str) -> dict:
        """Retrieves data from a specified url using an asynchronous HTTP GET request.
        Args:
            url (str): The url to retrieve data from.
        Returns:
            dict: The data retrieved from the url.
        Raises:
            HTTPError: If the GET request returns an HTTP error status code.
            WrongQueryParameters: If the query parameters are wrong.
            WrongAPIKey: If the provided API key is wrong.
            UnauthorizedAccess: If API key is not provided.
        """

        res = await super()._get(url)

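        # handle exceptions
        # the API reports errors as Polish messages in the response body
        if res["result"] in (
            "Błędna metoda lub parametry wywołania",  # "Wrong method or call parameters"
            "Wfs error: IllegalArgumentException: FeatureMember list is empty",
        ):
            await super().close()
            raise WrongQueryParameters()

        # "Błędny apikey lub jego brak" means "Wrong or missing API key"
        if res["result"] == "false" and res["error"] == "Błędny apikey lub jego brak":
            await super().close()
            raise WrongAPIKey()
        # "Nieautoryzowany dostęp do danych" means "Unauthorized access to data"
        if (
            res["result"] == "false"
            and res["error"] == "Nieautoryzowany dostęp do danych"
        ):
            await super().close()
            raise UnauthorizedAccess()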

        return res
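
The same checks can also be written as a lookup table, which keeps the messages in one place and is easy to unit test without any HTTP session. This is a sketch of that idea, not the package code; check_response, RESULT_ERRORS and FIELD_ERRORS are hypothetical names, and the exception classes are redefined only so the snippet runs on its own.

```python
class WrongQueryParameters(Exception):
    pass


class WrongAPIKey(Exception):
    pass


class UnauthorizedAccess(Exception):
    pass


# Messages the API puts in the "result" field (Polish strings are kept verbatim)
RESULT_ERRORS = {
    "Błędna metoda lub parametry wywołania": WrongQueryParameters,
    "Wfs error: IllegalArgumentException: FeatureMember list is empty": WrongQueryParameters,
}
# Messages the API puts in the "error" field when "result" is "false"
FIELD_ERRORS = {
    "Błędny apikey lub jego brak": WrongAPIKey,
    "Nieautoryzowany dostęp do danych": UnauthorizedAccess,
}


def check_response(res: dict) -> dict:
    """Raise the matching exception or return the response unchanged."""
    result = res.get("result")
    # A successful "result" is usually a list, so only look up strings
    if isinstance(result, str) and result in RESULT_ERRORS:
        raise RESULT_ERRORS[result]()
    if result == "false" and res.get("error") in FIELD_ERRORS:
        raise FIELD_ERRORS[res["error"]]()
    return res


ok = check_response({"result": [{"values": []}]})
print(ok)  # {'result': [{'values': []}]}
```

Using res.get("error") instead of res["error"] also avoids a KeyError when the API returns "false" without an error field.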

I will show just one method for retrieving data from a certain endpoint. This API has a lot of different data, but every method is pretty similar. It requires more or less complicated response processing and matching the fields with the chosen data transfer object.


    async def get_line_timetable(
        self, busstop_id: Union[int, str], busstop_number: str, line: Union[int, str]
    ):
        """Retrieves a list of the line timetable for the provided public transport stop and line.
        Args:
            busstop_id (int, str): Public transport stop id
            busstop_number (str): Stop bar ID (eg. 01, 02)
            line: Public transport line number
        Returns:
            list[LineTimetable]: A list of line timetable objects.
        """
        url = Mermaid._build_url(
            endpoint="dbtimetable_get",
            id="e923fa0e-d96c-43f9-ae6e-60518c9f3238",
            busstopId=busstop_id,
            busstopNr=busstop_number,
            line=line,
            apikey=self.api_key,
        )

        response = await self._get_data(url)
        response = [x["values"] for x in response["result"]]
        response = [{y["key"]: y["value"] for y in x} for x in response]

        return [
            LineTimetable(
                brigade=x["brygada"],
                direction=x["kierunek"],
                route=x["trasa"],
                time=x["czas"],
                symbol_1=x["symbol_1"],
                symbol_2=x["symbol_2"],
            )
            for x in response
        ]
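
The two list comprehensions in the middle do the actual reshaping: every row arrives as a list of key/value pairs and is turned into a plain dictionary. Below is a small sketch of that step on a made-up payload; the values are invented, and plain dicts stand in for the LineTimetable DTO so the snippet has no dependencies.

```python
# Made-up payload in the shape handled above: each row is a list of key/value pairs
payload = {
    "result": [
        {
            "values": [
                {"key": "brygada", "value": "1"},
                {"key": "kierunek", "value": "Centrum"},
                {"key": "czas", "value": "05:12:00"},
            ]
        },
        {
            "values": [
                {"key": "brygada", "value": "2"},
                {"key": "kierunek", "value": "Centrum"},
                {"key": "czas", "value": "05:27:00"},
            ]
        },
    ]
}

# Polish field names mapped to the English names used by the DTO
FIELD_NAMES = {"brygada": "brigade", "kierunek": "direction", "czas": "time"}

# Step 1: collapse each list of pairs into one dict per row
rows = [{pair["key"]: pair["value"] for pair in row["values"]} for row in payload["result"]]
# Step 2: rename the keys to English
timetable = [{FIELD_NAMES[key]: value for key, value in row.items()} for row in rows]

print(timetable[0])
# {'brigade': '1', 'direction': 'Centrum', 'time': '05:12:00'}
```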

Tests

Having even basic tests is quite important for the reliability of the project. The pytest-asyncio plugin helps with testing asynchronous code, as it lets coroutines be used as test functions. The aioresponses package mocks web requests made with aiohttp, which removes the dependence on the third-party server. I hope I will create more tests for this package in the future.

Let’s start with testing utils. The first one is the flat_dict function. The pytest parametrize mark is useful here for saving some space. I just check here whether the function can flatten a dictionary with nested dicts and with nested lists of dicts.

flat_dict_data = [
    (
        [
            {
                "key_1": "value_1",
                "key_2": {
                    "nested_key_1": "nested_value_1",
                    "nested_key_2": "nested_value_2",
                },
                "key_3": {"nested_key_3": "nested_value_3"},
            },
        ],
        {
            "key_1": "value_1",
            "key_2_nested_key_1": "nested_value_1",
            "key_2_nested_key_2": "nested_value_2",
            "key_3_nested_key_3": "nested_value_3",
        },
    ),
    (
        [{"key_1": [{"nested_key": "nested_value"}, {"nested_key": "nested_value"}]}],
        {"key_1_nested_key_0": "nested_value", "key_1_nested_key_1": "nested_value"},
    ),
]


@pytest.mark.parametrize("args, result", flat_dict_data)
def test_flat_dict(args, result):
    assert flat_dict(*args) == result

The functions used as type converters need tests as well. I won’t show all of them, because the code is simple and repetitive.

def test_comma_number_to_float():
    number_str = "21,15"
    assert comma_number_to_float(number_str) == 21.15

The first thing of the client class to test is the path_create helper method. I think a good idea here is to check if it returns the correct path with and without the slash in the input and if it raises the exception when the wrong directory is provided.

def test_path_create():
    output = "tests/pywarsaw_cache"
    assert Client.path_create("tests/") == output
    assert Client.path_create("tests") == output


def test_path_create_wrong_dir():
    with pytest.raises(WrongDirectory):
        Client.path_create("tests/charizard")

The second thing to test is the correct change of the attributes while enabling or disabling the cache.

@pytest.mark.asyncio
async def test_cache_enable_creates_sqlite_backed():
    client = Client()
    await client.cache_enable(path="./tests/")
    assert isinstance(client.backend, SQLiteBackend)
    await client.close()


@pytest.mark.asyncio
async def test_cache_enable_creates_cached_session():
    client = Client()
    await client.cache_enable()
    assert isinstance(client.session, CachedSession)
    await client.close()


@pytest.mark.asyncio
async def test_cache_disable_creates_client_session():
    client = Client()
    await client.cache_disable()
    assert isinstance(client.session, ClientSession)
    await client.close()

Lastly, let’s check if the client returns the correct responses with caching enabled and disabled. The aioresponses package is used here for mocking requests.

@pytest.mark.asyncio
async def test_get_without_caching():
    with aioresponses.aioresponses() as m:
        client = Client()
        await client.cache_disable()
        m.get("http://example.com", payload=dict(name="squirtle"))
        response = await client._get("http://example.com")
        assert response == dict(name="squirtle")
        await client.close()


@pytest.mark.asyncio
async def test_get_with_caching():
    with aioresponses.aioresponses() as m:
        client = Client()
        await client.cache_enable(path="./tests/", force_clear=True)
        m.get("http://example.com", payload=dict(name="charmander"))
        response = await client._get("http://example.com")
        assert response == dict(name="charmander")
        await client.close()

I would also like to check if the _build_url method from the main part works correctly.

def test_build_url():
    url = "https://api.um.warszawa.pl/api/action/endpoint_test?test1=test1&test2=test2"
    # compare against the expected URL instead of only checking truthiness
    assert (
        Mermaid._build_url(endpoint="endpoint_test", test1="test1", test2="test2")
        == url
    )

It is also good to test whether _get_data raises the proper exception for each of the API server errors.

@pytest.mark.asyncio
async def test_get_data_with_wrong_query_parameters():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="Błędna metoda lub parametry wywołania"),
        )
        with pytest.raises(WrongQueryParameters):
            await client._get_data("http://example.com")
        await client.close()


@pytest.mark.asyncio
async def test_get_data_with_wrong_api_key():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="false", error="Błędny apikey lub jego brak"),
        )
        with pytest.raises(WrongAPIKey):
            await client._get_data("http://example.com")
        await client.close()


@pytest.mark.asyncio
async def test_get_data_with_unauthorized_access():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="false", error="Nieautoryzowany dostęp do danych"),
        )
        with pytest.raises(UnauthorizedAccess):
            await client._get_data("http://example.com")
        await client.close()

The pytest and its plugins are defined as development dependencies in poetry, so let’s run the following command to run tests:

poetry run pytest

[Screenshot: output of the pytest run]

As you can see every test passed.

Build

Let’s publish our package now. It requires a few small additional steps: docs, CI/CD, a README, and a license.

Docs

I generate the HTML documentation automatically from the Python docstrings. I won’t show the full process here; just check out Sphinx with the sphinx-apidoc tool. The documentation is hosted for free on ReadTheDocs. If you add Sphinx to the poetry dev dependencies and would like to check locally that everything is fine, the documentation can be built with:

poetry run make html

CI/CD

My CI/CD is really basic: it is just one GitHub Actions pipeline. It runs the linter and pytest on every push to the remote repository.

name: Github Actions

on: [push]

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.9", "3.10"]

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install flake8
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi          
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics          
      - name: Test with pytest
        run: |
          pytest          

Poetry

Firstly let’s export a requirements.txt file because the GitHub Actions pipeline needs it. I make it without any hashes. I export the development dependencies also because ReadTheDocs requires them to build HTML correctly.

poetry export -f requirements.txt --output requirements.txt --without-hashes --dev

The pyproject.toml file contains all the required information about our package. It looks like this:

[tool.poetry]
name = "pywarsaw"
version = "0.1.0"
description = "An unofficial asynchronous API wrapper for Warsaw Open Data - https://api.um.warszawa.pl/"
authors = ["brozen <szymon.mazurkievicz@gmail.com>"]
license = "MIT"
homepage = "https://github.com/BrozenSenpai/pywarsaw"
documentation = "https://pywarsaw.readthedocs.io/en/latest/"
keywords = ["Warsaw"]
readme =  "README.md"

[tool.poetry.dependencies]
python = "^3.9"
attrs = "^22.2.0"
aiohttp = "^3.8.3"
aiohttp-client-cache = "^0.8.1"
aiosqlite = "^0.18.0"

[tool.poetry.dev-dependencies]
pytest = "^7.2.1"
aioresponses = "^0.7.4"
pytest-asyncio = "^0.20.3"
Sphinx = "^4.0.0"
sphinx-rtd-theme = "^1.1.1"


[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

Now the package can be built and published to PyPI with the following command:

poetry publish --build

It is also good to deploy it to TestPyPI first and check if everything is fine.

Now pywarsaw can be installed with pip.

pip install pywarsaw

For example, you can obtain a list of objects with information about current air quality in Warsaw, with caching enabled like this:

import asyncio
import pywarsaw

async def main():
    client = pywarsaw.Mermaid(api_key="YOUR_API_KEY")
    await client.cache_enable()

    result = await client.get_air_quality()

    await client.close()

asyncio.run(main())

Or even better with the context manager:

async with pywarsaw.Mermaid(api_key="YOUR_API_KEY") as client:
    await client.cache_enable()

    result = await client.get_air_quality()

Conclusion

The post was quite short; I treat it more like a diary than a tutorial. The package is not finished yet - it still needs to cover some endpoints. Warsaw Open Data has a lot to offer, but the documentation is not the best (it is awful tbh). I will keep improving the package in future versions. Here is the repository, feel free to check it!