Warsaw Open Data - building a simple asynchronous API wrapper
Introduction
Warsaw is the capital of Poland - the country I live in. The City Hall created an open data platform with a lot of valuable information. To be honest, it is an amazing job and a unique thing on a national scale. However, the website and documentation are not so good: their readability is low, and there is no English version. I would like to make some data projects using Warsaw Open Data, so I decided to build a simple wrapper to make it easier. It will be asynchronous, because some of the data is real-time and the objects are sometimes quite big. I will mostly use it with streaming data (Kafka etc.), so it makes sense to do it asynchronously. I also already have one blocking I/O API wrapper in my portfolio, so I would rather not repeat it. In this blog post, I won't show the whole code of the package, because there are a lot of endpoints to cover. The project is designed to be:
- Asynchronous: designed to work with asyncio
- Extensive: covers as many endpoints as possible
- Approachable: all Polish fields translated to English
- Responsible: minimizes the load on the API server
  - implemented caching
- Simple: easy to use and customize
  - object-oriented design
  - use of data transfer objects
  - type hinting
  - conversion of appropriate fields from strings to more suitable types
- Lightweight: minimal usage of third-party packages
The list is quite general but I guess it is fine for illustrative purposes. The post will be rather concise. I just want to quickly show you the whole work process.
Let’s get to work!
pywarsaw
Dependencies
Let’s start the project with poetry new pywarsaw
command. It will create the basic directory structure.
Now add the external packages that will be used in the library.
poetry add "attrs>=22.2.0" aiohttp aiohttp-client-cache aiosqlite
The development dependencies should also be added:
poetry add --dev pytest aioresponses pytest-asyncio Sphinx sphinx-rtd-theme
The purpose of every package will be described in further sections.
DTOs
A data transfer object is a structure mostly used for carrying data between processes. A DTO provides easy serialization, validation, readability, etc. Python has built-in DTOs - data classes. I use an external package, attrs, because I know it well and it provides the converters feature, which I really appreciate. It helps with converting types in a way that is compatible with static type checkers.
The first DTO I would like to create is a parent class. It has four methods for converting an object to other popular structures:
import datetime
import json

from attrs import asdict, astuple, define


@define
class DTO:
    """Parent class for all DTOs"""

    def to_dict(self) -> dict:
        """Convert object to a dictionary"""
        return asdict(self)

    def to_flat_dict(self) -> dict:
        """Convert object to a flat dictionary"""
        d = asdict(self)
        return flat_dict(d)

    def to_tuple(self) -> tuple:
        """Convert object to a tuple"""
        return astuple(self)

    def to_json(self) -> str:
        """Convert object to a json string"""
        return JSONEncoder().encode(self)
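To get a feel for what these conversion methods do without installing attrs, here is a minimal sketch using the standard library's dataclasses, which offer the same asdict/astuple helpers. The Station class and its fields are made up for illustration:

```python
import json
from dataclasses import asdict, astuple, dataclass


# A stdlib stand-in for an attrs-based DTO; dataclasses provide
# asdict/astuple just like attrs does.
@dataclass
class Station:
    object_id: str
    bikes: int


s = Station(object_id="1234", bikes=7)
print(asdict(s))             # {'object_id': '1234', 'bikes': 7}
print(astuple(s))            # ('1234', 7)
print(json.dumps(asdict(s))) # {"object_id": "1234", "bikes": 7}
```

The attrs version works the same way, with the bonus of converters and validators.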
The custom JSON encoder that can deal with DTOs and datetime objects is defined like this:
class JSONEncoder(json.JSONEncoder):
    """Custom json encoder to deal with dates, times and DTOs"""

    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, DTO):
            return obj.to_dict()
        return json.JSONEncoder.default(self, obj)
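A quick standalone check of the date handling (the DTO branch is omitted here so the snippet runs on its own; the encoder name and the sample dict are mine):

```python
import datetime
import json


class DateAwareEncoder(json.JSONEncoder):
    # Same idea as the encoder above, minus the DTO branch.
    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        return super().default(obj)


print(DateAwareEncoder().encode({"update_date": datetime.date(2023, 1, 20)}))
# {"update_date": "2023-01-20"}
```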
The function used to flatten the dictionary is created this way:
def flat_dict(d, parent_key="", sep="_", index=None):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if index is not None:
            new_key = "{}_{}".format(new_key, index)
        if isinstance(v, dict):
            items.extend(flat_dict(v, new_key, sep=sep).items())
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    items.extend(flat_dict(item, new_key, sep=sep, index=i).items())
                else:
                    items.append((new_key, item))
        else:
            items.append((new_key, v))
    return dict(items)
Now let’s take a look at an example target object.
@define
class CycleStation(DTO):
    """The bicycle station.

    The following information is available:
    * racks: count of the racks
    * update_date: date of the data update
    * object_id: id of the object
    * location: location
    * bikes: count of the bikes
    * station_number: station number
    """

    racks: int = field(converter=int)
    update_date: datetime.datetime = field(converter=to_datetime_with_12)
    object_id: str
    location: str
    bikes: int = field(converter=int)
    station_number: str
As you can see it has converters. Basic types like int or float have built-in converters. More complicated ones can be created manually, for example:
def to_datetime_with_12(f: str) -> datetime.datetime:
    return datetime.datetime.strptime(f, "%d-%b-%y %H.%M.%S.%f %p") if f else f
It casts a string with microseconds and AM/PM to a datetime object.
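Here is a quick check of the converter in action. The sample timestamp is my own guess at the Oracle-style format the strptime pattern implies; the converter itself is repeated so the snippet is standalone:

```python
import datetime


def to_datetime_with_12(f: str) -> datetime.datetime:
    # Same converter as above; falsy input (e.g. "") is passed through.
    return datetime.datetime.strptime(f, "%d-%b-%y %H.%M.%S.%f %p") if f else f


# A hypothetical timestamp matching the pattern
print(to_datetime_with_12("20-Jan-23 14.30.45.123456 PM"))
# 2023-01-20 14:30:45.123456
print(to_datetime_with_12(""))  # empty string comes back unchanged
```

Note that %H is the 24-hour field, so the trailing %p only matches the AM/PM marker without affecting the hour.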
I don’t think there is a reason to show all of the DTOs. They are all really similar - they use converters where needed, and some fields can have more than one type (mostly an optional None).
Exceptions
The exceptions.py module contains all of the exceptions the package can raise. I have created just four; three of them are for internal API errors.
class WrongQueryParameters(Exception):
    """Raised when the query parameters are wrong."""

    def __str__(self):
        return "Wrong query parameters."


class UnauthorizedAccess(Exception):
    """Raised when API key is not provided."""

    def __str__(self):
        return "API key is not provided"


class WrongAPIKey(Exception):
    """Raised when provided API key is wrong."""

    def __str__(self):
        return "Wrong API key."


class WrongDirectory(Exception):
    """Raised when the directory for cache does not exist."""

    def __str__(self):
        return "The provided directory for a cache does not exist."
Client
A client is a class for making asynchronous HTTP requests with optional caching. The main class with methods for all endpoints will inherit from it. The initialization attributes of the class indicate whether caching is enabled, the backend for it, and the session - a standard aiohttp one or the cached one.
class Client:
    """A client for making async HTTP requests with optional caching.

    Args:
        session (Union[None, ClientSession, CachedSession]): The session to use for making requests.
        backend (Union[None, SQLiteBackend]): The backend to use for caching.
        enabled (bool): Whether caching is enabled or not.
    """

    def __init__(
        self,
        session: Union[None, ClientSession, CachedSession] = None,
        backend: Union[None, SQLiteBackend] = None,
        enabled: bool = False,
    ):
        self._session = session
        self._backend = backend
        self._enabled = enabled
Next there are two dunder methods:
async def __aenter__(self):
    return self

async def __aexit__(self, *args):
    await self.close()
The __aenter__ and __aexit__ methods are part of Python's asynchronous context manager protocol. They are used to define objects that work with the async with statement. __aenter__ is called when an object is entered in an asynchronous context; __aexit__ is called when the object is exited, either through normal execution or because of an exception. For example, the client can be used like this:
async with Client() as c:
    await c.cache_enable()
    result = await c._get('http://example.com/api')
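To see the protocol in isolation, here is a minimal runnable sketch (the Resource class and the events list are made up for this demonstration):

```python
import asyncio

events = []


class Resource:
    # A minimal async context manager implementing the protocol.
    async def __aenter__(self):
        events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Called on normal exit and on exceptions alike.
        events.append("exit")


async def main():
    async with Resource():
        events.append("work")


asyncio.run(main())
print(events)  # ['enter', 'work', 'exit']
```

This is exactly the shape the client follows: __aexit__ is where close() gets awaited, so the session is released even if the body raises.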
There is also a static method for creating the cache path.
@staticmethod
def path_create(path: str) -> str:
    """Create a cache path.

    Args:
        path (str): Directory for a cache file.

    Returns:
        str: Concatenated path.

    Raises:
        WrongDirectory: The provided directory for a cache does not exist.
    """
    if not os.path.exists(path):
        raise WrongDirectory()
    return os.path.join(path, "pywarsaw_cache")
Two methods are responsible for enabling and disabling the caching. The path, expiration time, and manual cache clearing can be set through the arguments of the cache_enable method.
async def cache_enable(
    self,
    path: str = os.getcwd(),
    expire_after: Union[
        None, int, float, str, datetime.datetime, datetime.timedelta
    ] = 3600,
    force_clear: bool = False,
    clear_expired: bool = False,
) -> None:
    """Enable caching using the SQLite backend.

    Args:
        path (str): Directory for the cache file. Defaults to current working directory.
        expire_after (Union[None, int, float, str, datetime.datetime, datetime.timedelta]):
            Expiration time for the cache. Defaults to 3600.
        force_clear (bool): Clear the cache upon initialization. Defaults to False.
        clear_expired (bool): Clear expired cache upon initialization. Defaults to False.
    """
    self._path = path
    self._enabled = True
    self._backend = SQLiteBackend(
        cache_name=Client.path_create(path),
        expire_after=expire_after,
        allowed_methods=("GET",),  # trailing comma: a one-element tuple, not a string
        include_headers=True,
    )
    self._session = CachedSession(cache=self._backend)
    if force_clear:
        await self._session.cache.clear()
    if clear_expired:
        await self._session.cache.delete_expired_responses()
async def cache_disable(self) -> None:
    """Disable caching"""
    self._enabled = False
    self._session = ClientSession()
    self._backend = None
The _get method returns the JSON-decoded response.
async def _get(self, url: str) -> dict:
    """Make an asynchronous HTTP GET request to the provided URL.

    Args:
        url (str): The URL to make the GET request to.

    Returns:
        dict: The JSON response of the GET request.

    Raises:
        aiohttp.ClientResponseError: If the response status is 400 or higher.
    """
    async with self._session.get(url) as response:
        response.raise_for_status()
        return await response.json()
The close method just closes the session. It is useful if someone does not use the context manager.
async def close(self) -> None:
    """Close the session and release all resources held by the session."""
    await self._session.close()
Main
This is the part of the package that the user will work with. The class with methods for obtaining data from the API is named Mermaid - the mermaid is a symbol of Warsaw. As you can notice, it inherits from the Client class.
class Mermaid(Client):
    """A client for interacting with Warsaw Open Data API.

    Args:
        api_key (str): The API key to use for authentication.
            Defaults to None.
    """

    def __init__(
        self,
        api_key: str = None,
    ):
        super().__init__()
        self.api_key = api_key
There is a static helper method for easily building the URLs from the provided arguments.
@staticmethod
def _build_url(endpoint, **kwargs) -> str:
    """Builds a url for a specified endpoint using provided parameters.

    Args:
        endpoint (str): The endpoint for the API call.
        **kwargs: Additional parameters to include in the url.

    Returns:
        str: The built url.
    """
    base_url = "https://api.um.warszawa.pl/api/action/"
    url = (
        base_url
        + endpoint
        + "?"
        + "&".join(
            f"{key}={item}" for key, item in kwargs.items() if item is not None
        )
    )
    return url
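A standalone restatement of the joining logic shows the useful detail here: parameters set to None are simply dropped from the query string, so optional arguments can be passed straight through (the sample endpoint and values are illustrative):

```python
# Restated module-level version of the builder, for demonstration only.
def build_url(endpoint, **kwargs):
    base_url = "https://api.um.warszawa.pl/api/action/"
    return base_url + endpoint + "?" + "&".join(
        f"{key}={item}" for key, item in kwargs.items() if item is not None
    )


print(build_url("dbtimetable_get", line=190, apikey=None))
# https://api.um.warszawa.pl/api/action/dbtimetable_get?line=190
```

One design caveat: the values are not URL-encoded, so parameters containing characters like "&" or spaces would break the query string; urllib.parse.urlencode would handle that at the cost of one more step.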
The _get_data method is just the _get method from the client, but with the ability to raise exceptions connected with the Warsaw Open Data API responses.
async def _get_data(self, url: str) -> dict:
    """Retrieves data from a specified url using an asynchronous HTTP GET request.

    Args:
        url (str): The url to retrieve data from.

    Returns:
        dict: The data retrieved from the url.

    Raises:
        aiohttp.ClientResponseError: If the GET request returns an HTTP error status code.
        WrongQueryParameters: If the query parameters are wrong.
        WrongAPIKey: If the provided API key is wrong.
        UnauthorizedAccess: If API key is not provided.
    """
    res = await super()._get(url)
    # handle the API's Polish error messages
    if res["result"] in (
        "Błędna metoda lub parametry wywołania",  # "Wrong method or call parameters"
        "Wfs error: IllegalArgumentException: FeatureMember list is empty",
    ):
        await super().close()
        raise WrongQueryParameters()
    # "Wrong or missing API key"
    if res["result"] == "false" and res["error"] == "Błędny apikey lub jego brak":
        await super().close()
        raise WrongAPIKey()
    # "Unauthorized access to the data"
    if (
        res["result"] == "false"
        and res["error"] == "Nieautoryzowany dostęp do danych"
    ):
        await super().close()
        raise UnauthorizedAccess()
    return res
I will show just one method for retrieving data from a certain endpoint. The API exposes a lot of different data, but every method is pretty similar: some more or less complicated response processing, plus matching the fields with the chosen data transfer object.
async def get_line_timetable(
    self, busstop_id: Union[int, str], busstop_number: str, line: Union[int, str]
):
    """Retrieves a list of the line timetable for the provided public transport stop and line.

    Args:
        busstop_id (int, str): Public transport stop id
        busstop_number (str): Stop bar ID (eg. 01, 02)
        line: Public transport line number

    Returns:
        list[LineTimetable]: A list of line timetable objects.
    """
    url = Mermaid._build_url(
        endpoint="dbtimetable_get",
        id="e923fa0e-d96c-43f9-ae6e-60518c9f3238",
        busstopId=busstop_id,
        busstopNr=busstop_number,
        line=line,
        apikey=self.api_key,
    )
    response = await self._get_data(url)
    response = [x["values"] for x in response["result"]]
    response = [{y["key"]: y["value"] for y in x} for x in response]
    return [
        LineTimetable(
            brigade=x["brygada"],
            direction=x["kierunek"],
            route=x["trasa"],
            time=x["czas"],
            symbol_1=x["symbol_1"],
            symbol_2=x["symbol_2"],
        )
        for x in response
    ]
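The two list comprehensions are the interesting part: the API wraps every record in a list of key/value pairs, and they reshape it into plain dicts. A made-up response fragment (the field values are invented; the keys are the Polish names used above) shows the transformation:

```python
# A hypothetical response fragment in the key/value shape the endpoint returns.
raw = {
    "result": [
        {
            "values": [
                {"key": "brygada", "value": "1"},
                {"key": "czas", "value": "05:20:00"},
            ]
        }
    ]
}

rows = [x["values"] for x in raw["result"]]               # unwrap "values"
rows = [{y["key"]: y["value"] for y in x} for x in rows]  # pairs -> dict per record
print(rows)  # [{'brygada': '1', 'czas': '05:20:00'}]
```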
Tests
Having even basic tests is quite important for the reliability of the project. The pytest-asyncio plugin is helpful for testing asynchronous code - it provides support for coroutines as test functions. The aioresponses package is a helper for mocking web requests made with the Python aiohttp package. It is useful for removing the dependence on a third-party server. I hope to create more tests for this package in the future.
Let's start with testing the utils. The first one is the flat_dict function. The pytest parametrize mark is useful here for saving some space. I just check whether the function can flatten a dictionary with nested dicts and with nested lists of dicts.
flat_dict_data = [
    (
        [
            {
                "key_1": "value_1",
                "key_2": {
                    "nested_key_1": "nested_value_1",
                    "nested_key_2": "nested_value_2",
                },
                "key_3": {"nested_key_3": "nested_value_3"},
            },
        ],
        {
            "key_1": "value_1",
            "key_2_nested_key_1": "nested_value_1",
            "key_2_nested_key_2": "nested_value_2",
            "key_3_nested_key_3": "nested_value_3",
        },
    ),
    (
        [{"key_1": [{"nested_key": "nested_value"}, {"nested_key": "nested_value"}]}],
        {"key_1_nested_key_0": "nested_value", "key_1_nested_key_1": "nested_value"},
    ),
]


@pytest.mark.parametrize("args, result", flat_dict_data)
def test_flat_dict(args, result):
    assert flat_dict(*args) == result
The functions used as type converters also need testing. I won't show all of them - the code is easy and similar.
def test_comma_number_to_float():
    number_str = "21,15"
    assert comma_number_to_float(number_str) == 21.15
The first thing to test in the client class is the path_create helper method. A good idea here is to check whether it returns the correct path with and without the trailing slash in the input, and whether it raises the exception when a wrong directory is provided.
def test_path_create():
    output = "tests/pywarsaw_cache"
    assert Client.path_create("tests/") == output
    assert Client.path_create("tests") == output


def test_path_create_wrong_dir():
    with pytest.raises(WrongDirectory):
        Client.path_create("tests/charizard")
The second thing to test is the correct change of the attributes while enabling or disabling the cache.
@pytest.mark.asyncio
async def test_cache_enable_creates_sqlite_backend():
    client = Client()
    await client.cache_enable(path="./tests/")
    assert isinstance(client._backend, SQLiteBackend)
    await client.close()


@pytest.mark.asyncio
async def test_cache_enable_creates_cached_session():
    client = Client()
    await client.cache_enable()
    assert isinstance(client._session, CachedSession)
    await client.close()


@pytest.mark.asyncio
async def test_cache_disable_creates_client_session():
    client = Client()
    await client.cache_disable()
    assert isinstance(client._session, ClientSession)
    await client.close()
Lastly, let’s check if the client returns the correct responses with caching enabled and disabled. The aioresponses package is used here for mocking requests.
@pytest.mark.asyncio
async def test_get_without_caching():
    with aioresponses.aioresponses() as m:
        client = Client()
        await client.cache_disable()
        m.get("http://example.com", payload=dict(name="squirtle"))
        response = await client._get("http://example.com")
        assert response == dict(name="squirtle")
        await client.close()


@pytest.mark.asyncio
async def test_get_with_caching():
    with aioresponses.aioresponses() as m:
        client = Client()
        await client.cache_enable(path="./tests/", force_clear=True)
        m.get("http://example.com", payload=dict(name="charmander"))
        response = await client._get("http://example.com")
        assert response == dict(name="charmander")
        await client.close()
I would also like to check whether the _build_url method from the main part works correctly.
def test_build_url():
    url = "https://api.um.warszawa.pl/api/action/endpoint_test?test1=test1&test2=test2"
    assert Mermaid._build_url(endpoint="endpoint_test", test1="test1", test2="test2") == url
It is also good to test if it can raise the proper exception for several API server errors.
@pytest.mark.asyncio
async def test_get_data_with_wrong_query_parameters():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="Błędna metoda lub parametry wywołania"),
        )
        with pytest.raises(WrongQueryParameters):
            await client._get_data("http://example.com")
        await client.close()


@pytest.mark.asyncio
async def test_get_data_with_wrong_api_key():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="false", error="Błędny apikey lub jego brak"),
        )
        with pytest.raises(WrongAPIKey):
            await client._get_data("http://example.com")
        await client.close()


@pytest.mark.asyncio
async def test_get_data_with_unauthorized_access():
    with aioresponses.aioresponses() as m:
        client = Mermaid()
        await client.cache_disable()
        m.get(
            "http://example.com",
            payload=dict(result="false", error="Nieautoryzowany dostęp do danych"),
        )
        with pytest.raises(UnauthorizedAccess):
            await client._get_data("http://example.com")
        await client.close()
Pytest and its plugins are defined as development dependencies in poetry, so run the tests with the following command:
poetry run pytest
As you can see every test passed.
Build
Let's publish our package now. It requires a few small additional steps like docs, CI/CD, a readme, and a license.
Docs
I generate the HTML documentation automatically from the Python docstrings. I won't show the full process here - just check out Sphinx with the sphinx-apidoc tool. The documentation is hosted for free on ReadTheDocs. If you add Sphinx to the poetry dev dependencies and would like to check locally whether everything is fine, the documentation can be built with:
poetry run make html
CI/CD
My CI/CD is really basic - just one GitHub Actions pipeline. It runs a linter and the tests on every push to the remote repository.
name: Github Actions

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.9", "3.10"]

    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install flake8
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
      - name: Test with pytest
        run: |
          pytest
Poetry
Firstly, let's export a requirements.txt file, because the GitHub Actions pipeline needs it. I export it without any hashes and include the development dependencies, because ReadTheDocs requires them to build the HTML correctly.
poetry export -f requirements.txt --output requirements.txt --without-hashes --dev
The pyproject.toml file contains all the required information about our package. It looks like this:
[tool.poetry]
name = "pywarsaw"
version = "0.1.0"
description = "An unofficial asynchronous API wrapper for Warsaw Open Data - https://api.um.warszawa.pl/"
authors = ["brozen <szymon.mazurkievicz@gmail.com>"]
license = "MIT"
homepage = "https://github.com/BrozenSenpai/pywarsaw"
documentation = "https://pywarsaw.readthedocs.io/en/latest/"
keywords = ["Warsaw"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.9"
attrs = "^22.2.0"
aiohttp = "^3.8.3"
aiohttp-client-cache = "^0.8.1"
aiosqlite = "^0.18.0"
[tool.poetry.dev-dependencies]
pytest = "^7.2.1"
aioresponses = "^0.7.4"
pytest-asyncio = "^0.20.3"
Sphinx = "^4.0.0"
sphinx-rtd-theme = "^1.1.1"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
Now the package can be built and published to PyPI with the following command:
poetry publish --build
It is also good to deploy it to the test PyPI first and check if everything is fine.
Now pywarsaw can be installed with pip.
pip install pywarsaw
For example, you can obtain a list of objects with information about current air quality in Warsaw, with caching enabled like this:
import asyncio

import pywarsaw


async def main():
    client = pywarsaw.Mermaid(api_key="YOUR_API_KEY")
    await client.cache_enable()
    result = await client.get_air_quality()
    await client.close()


asyncio.run(main())
Or even better with the context manager:
async with pywarsaw.Mermaid(api_key="YOUR_API_KEY") as client:
    await client.cache_enable()
    result = await client.get_air_quality()
Conclusion
The post was quite short - I treat it more like a diary than a tutorial. The package is not finished yet; it still requires covering some endpoints. Warsaw Open Data has a lot to offer, but the documentation is not the best (it is awful, to be honest). I will upgrade the package in future versions. Here is the repository, feel free to check it out!