Ergast Developer API - building a simple API wrapper package
Introduction
In one of the first posts on this blog, I created a simple cloud function to manage the ETL process between Ergast and a Google BigQuery warehouse. In that post, I extracted the CSV files available on the Ergast website, because I didn’t want to overuse the server. However, if I want to analyze Formula 1 data more frequently, connecting to my BigQuery warehouse via the client library is not so convenient, especially in IPython notebooks. There are plenty of awesome packages that help with interacting with the Ergast API, for example pyErgast - a pandas Python wrapper, or Fast-F1 - a Swiss Army knife for Formula 1 analytics. However, the mentioned tools do not meet all of my expectations, so I would like to develop my own tool that will fill the gaps. Also, I would like to learn the Poetry packaging tool. The name of my Ergast API wrapper is Yukinator, in honor of the Japanese Formula 1 driver - Yuki Tsunoda.
Assumptions
Listing all of the requirements at the start of a project is always a good idea. My Ergast API wrapper should meet the following needs:
- Lightweight: minimal package size
- Extensive: covers all Ergast API endpoints
- Responsible: minimize the load on the API server
  - implemented caching
- Simple: easy to use and customize
  - object-oriented design
  - use of Data Transfer Objects
  - converts fields from string to the expected types
- Tested
- Documented
The list is quite general but I guess it is fine for illustrative purposes.
The directory and file structure at the end should look like this:
yukinator
|   .gitignore
|   README.md
|   poetry.lock
|   pyproject.toml
|___docs
|   |   Makefile
|   |   conf.py
|   |   index.rst
|   |   other.rst
|___yukinator
|   |   __init__.py
|   |   exceptions.py
|   |   objects.py
|   |   utils.py
|   |   yukinator.py
|___tests
|   |   __init__.py
|   |   test_objects.py
|   |   test_utils.py
|   |   test_yukinator.py
|   |___resources
|   |   |   example_response.json
|   |   |   examples.py
|___.github
|   |___workflows
|   |   |   python-package.yml
It does not look too complicated. The files ignored by git are not shown in the tree above.
Yukinator
Data Transfer Objects
A data transfer object is a data structure that is mostly used for passing data between the services or application layers. Using them can give us a lot of benefits, for example:
- easy serialization
- easy validation,
- reliability: always the same keys, you know what to expect from the object,
- a lot of dunder methods are covered by default,
- readability: well-defined fields and type hints, they also require less code.
There are probably a lot more pros of using DTOs, but for me, those are the most valuable (especially the readability and ease of use).
Since Python 3.7 there is a built-in @dataclass decorator. It is a good feature and I really like to create basic DTOs with it. However, for this project, I decided to use the attrs package. Attrs is the library that served as the basis for designing the data classes, and it is trusted by NASA. The reason to use it in my project is the converters feature. It makes type conversion really convenient and friendly to static type checkers like mypy. Type conversion is also possible with @dataclass, for example like this:
from dataclasses import dataclass, fields

@dataclass
class Example:
    def __post_init__(self):
        # Coerce each field to its annotated type once the object is built
        for field in fields(self):
            value = getattr(self, field.name)
            if not isinstance(value, field.type):
                setattr(self, field.name, field.type(value))

@dataclass
class Child(Example):
    a: int
    b: str
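To see what this approach does in practice, here is a small runnable sketch of the same pattern. It assumes that every annotation is a plain callable class such as int or str; it would break with annotations like Optional[int] or with `from __future__ import annotations`, where the field type is stored as a string.

```python
from dataclasses import dataclass, fields


@dataclass
class Example:
    def __post_init__(self):
        # Runs after the generated __init__, so conversion is invisible
        # to static type checkers.
        for f in fields(self):
            value = getattr(self, f.name)
            if not isinstance(value, f.type):
                setattr(self, f.name, f.type(value))


@dataclass
class Child(Example):
    a: int
    b: str


# The API returns everything as strings; here "7" becomes an int
# and the integer 22 becomes a str.
child = Child(a="7", b=22)
print(child)  # Child(a=7, b='22')
```

Note that mypy would still flag `Child(a="7", b=22)`, because the signature says `a: int`, which is exactly the problem described in this section.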
It doesn’t work with static type checkers because the conversion happens after the object is initialized. I also think that the code for this is quite ugly. I don’t think that I should care that much about static type checkers in a dynamically typed language, but why not, if it is really easy with a package that has almost 100 million downloads a month? With attrs it can be done like this:
@define
class RaceResultDTO(DTO):
    Driver: DriverDTO = field(converter=to_driver_dto)
    number: int = field(converter=int)
    Constructor: ConstructorDTO = field(converter=to_constructor_dto)
    position: int = field(converter=int)
    positionText: str
    points: float = field(converter=float)
    grid: int = field(converter=int)
    laps: int = field(converter=int)
    status: str
    Time: Optional[TimeDTO] = field(
        default=None, converter=converters.optional(to_time_dto)
    )
    FastestLap: Optional[FastestLapDTO] = field(
        default=None, converter=converters.optional(to_fastestlap_dto)
    )
As you can notice, the class inherits from the DTO class, which is the parent class of every DTO. It defines the methods to_dict, to_flat_dict, to_tuple, and to_json, which are useful for converting objects to simpler data structures.
@define
class DTO:
    """Parent class for all DTOs"""

    def to_dict(self) -> dict:
        """Convert object to a dictionary"""
        return asdict(self)

    def to_flat_dict(self) -> dict:
        """Convert object to a flat dictionary"""
        d = asdict(self)
        return flat_dict(d)

    def to_tuple(self) -> tuple:
        """Convert object to a tuple"""
        return astuple(self)

    def to_json(self) -> str:
        """Convert object to a json"""
        return JSONEncoder().encode(self)
The to_json method needs a custom encoder to deal with DTOs and dates:
class JSONEncoder(json.JSONEncoder):
    """Custom json encoder to deal with dates, times and DTOs"""

    def default(self, obj):
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, DTO):
            return obj.to_dict()
        return json.JSONEncoder.default(self, obj)
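The date and time handling of such an encoder can be tried out with the standard library alone. The sketch below leaves out the DTO branch so that it runs without attrs; the class name DateAwareEncoder and the sample values are made up for illustration.

```python
import datetime
import json


class DateAwareEncoder(json.JSONEncoder):
    """Hypothetical, stripped-down version of the encoder: dates and times only."""

    def default(self, obj):
        # default() is called only for objects json cannot serialize itself.
        if isinstance(obj, (datetime.date, datetime.time)):
            return obj.isoformat()
        # Anything else falls through to the base class, which raises TypeError.
        return super().default(obj)


payload = {"date": datetime.date(2022, 4, 24), "time": datetime.time(13, 0)}
encoded = json.dumps(payload, cls=DateAwareEncoder)
print(encoded)  # {"date": "2022-04-24", "time": "13:00:00"}
```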
The to_flat_dict method uses the utility function flat_dict.
def flat_dict(d: dict, keys: list = None) -> dict:
    """Flat nested dict"""

    def _flatten_dict_gen(d, parent_key):
        for k, v in d.items():
            new_key = parent_key.lower() + k.capitalize() if parent_key else k
            if keys:
                if isinstance(v, MutableMapping) and k in keys:
                    yield from flatten_dict(v, new_key).items()
                else:
                    yield new_key, v
            else:
                if isinstance(v, MutableMapping):
                    yield from flatten_dict(v, new_key).items()
                else:
                    yield new_key, v

    def flatten_dict(d: MutableMapping, parent_key: str = ""):
        return dict(_flatten_dict_gen(d, parent_key))

    return flatten_dict(d)
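The way the new keys are built is easiest to see on a small input. The following is a compact restatement of the same logic (not the package code itself), run on a made-up dictionary shaped roughly like a race entry. Note that str.capitalize() lowercases everything after the first letter, so "circuitId" becomes "Circuitid".

```python
from collections.abc import MutableMapping
from typing import Optional


def flat_dict(d: dict, keys: Optional[list] = None) -> dict:
    """Compact restatement of the utility, written for this demo only."""

    def _flatten(mapping, parent_key=""):
        out = {}
        for k, v in mapping.items():
            new_key = parent_key.lower() + k.capitalize() if parent_key else k
            # With `keys`, only the listed nested dicts are flattened;
            # without it, every nested dict is.
            if isinstance(v, MutableMapping) and (not keys or k in keys):
                out.update(_flatten(v, new_key))
            else:
                out[new_key] = v
        return out

    return _flatten(d)


# Made-up sample values, for illustration only.
race = {
    "round": "4",
    "Circuit": {"circuitId": "imola"},
    "FirstPractice": {"date": "2022-04-22", "time": "11:30:00Z"},
}

selective = flat_dict(race, ["FirstPractice"])
everything = flat_dict(race)
print(selective)
print(everything)
```

With the keys argument, "Circuit" stays a nested dictionary while "FirstPractice" is expanded into "firstpracticeDate" and "firstpracticeTime"; without it, "Circuit" is expanded as well, into "circuitCircuitid".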
For nested DTOs or non-standard types like dates, custom converters have to be created, for example:
def to_driver_dto(d: dict):
    return DriverDTO(**d)

def to_date(f: str) -> datetime.date:
    return datetime.datetime.strptime(f, "%Y-%m-%d").date()
I don’t think there is a reason to show all of the DTOs. They are all really similar - they use the converters when there is a need, and some fields are optional because they don’t exist for every F1 season.
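Time fields can be handled in the same spirit as to_date. The package's own to_time converter is not shown in this post, so the version below is only a guess at one possible implementation, assuming the times arrive as strings like "21:42:15Z".

```python
import datetime


def to_time(f: str) -> datetime.time:
    # Hypothetical implementation. On Python 3.7+ the %z directive of
    # strptime accepts a literal "Z" and treats it as UTC.
    return datetime.datetime.strptime(f, "%H:%M:%S%z").timetz()


parsed = to_time("21:42:15Z")
print(parsed)  # 21:42:15+00:00
```

Using timetz() instead of time() keeps the UTC timezone information on the resulting object.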
Cache
It would not be hard to write my own simple caching module, but I decided to use the third-party package requests-cache, which works perfectly. The first thing to do is to create a static method path_create, which builds the path for the cache file. It is purely a utility function of the caching class, so I think it is fine to keep it as a static method.
class Cache:
    _session = None
    _path = ""

    @staticmethod
    def path_create(path: str) -> str:
        """Create a cache path.
        Args:
            path (str): Directory for a cache file.
        Returns:
            str: Concatenated path.
        Raises:
            WrongDirectory: The provided directory for a cache does not exist.
        """
        if not os.path.exists(path):
            raise WrongDirectory()
        return os.path.join(path, "yuki_cache")
The next thing is to create a class method that sets up a requests session with caching. The backend for caching is the simplest one possible - SQLite. The API wrapper will use only GET methods. The session will return stale cached data if a new request raises an exception. Headers can be passed just like in the standard requests library. Clearing the cache can be forced by passing the clear argument, and expiration can be timed via the expires_after argument.
    @classmethod
    def cache_enable(
        cls,
        path: str,
        headers: dict,
        expires_after: Union[None, int, float, str],
        clear: bool,
    ):
        """Enable caching
        Args:
            path (str): Directory for a cache file.
            headers (dict): Dictionary of headers to be sent on each request.
            expires_after (None, int, float, str): Time after cached items will expire.
            clear (bool): If True clear cache. Defaults to False.
        Raises:
            WrongDirectory: The provided directory for a cache does not exist.
        """
        cls._path = path
        cls._session = requests_cache.CachedSession(
            cache_name=Cache.path_create(path),
            backend="sqlite",
            # trailing comma makes this a one-element tuple, not a plain string
            allowable_methods=("GET",),
            expire_after=expires_after,
            stale_if_error=True,
            headers=headers,
        )
        if clear:
            cls._session.cache.clear()
The second class method is just a simple cached get request.
    @classmethod
    def cache_get(cls, url):
        """Make a get request with caching
        Args:
            url (str): API endpoint URL.
        Returns:
            requests.models.Response: Response for the request.
            HTTPError: If the response's status code is not less than 400.
        """
        return cls._session.get(url)
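To make the two cache settings used here more tangible, below is a toy, in-memory sketch of the ideas behind expiration and "stale if error". It is not how requests-cache is implemented, and every name in it (StaleIfErrorCache, fake_fetch) is hypothetical; it only illustrates the behavior I expect from the session.

```python
import time
from typing import Any, Callable, Dict, Tuple


class StaleIfErrorCache:
    """Toy cache: fresh hits skip the fetch, failures fall back to stale data."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        expire_after: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._expire_after = expire_after
        self._clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, url: str) -> Any:
        now = self._clock()
        entry = self._store.get(url)
        if entry is not None and now - entry[0] < self._expire_after:
            return entry[1]  # fresh hit: the server is not contacted
        try:
            value = self._fetch(url)
        except Exception:
            if entry is not None:
                return entry[1]  # expired, but better than failing
            raise
        self._store[url] = (now, value)
        return value


calls = []


def fake_fetch(url: str) -> str:
    # Stand-in for a real HTTP call; it just counts invocations.
    calls.append(url)
    return f"payload #{len(calls)}"


cache = StaleIfErrorCache(fake_fetch, expire_after=3600)
first = cache.get("drivers")
second = cache.get("drivers")
print(first == second, len(calls))  # True 1
```

The second call is served from the cache, so the fake server is contacted only once.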
Exceptions
My code raises just two custom exceptions: WrongQueryParameters and WrongDirectory. They are defined in the exceptions.py file. The WrongDirectory error was already raised in the static method path_create.
class WrongQueryParameters(Exception):
    """Raised when the expected output for given parameters is empty."""

    def __str__(self):
        return "The output for provided query parameters is empty."

class WrongDirectory(Exception):
    """Raised when the directory for cache does not exist."""

    def __str__(self):
        return "The provided directory for a cache does not exist."
Main
This is the main part of the project. After importing the previously created utilities and data transfer objects, let’s create the initializer of the Yuki class with the following arguments:
- headers - dictionary of headers to be sent on each request,
- cache_enabled - enable caching,
- cache_dir - directory for the cache SQLite file,
- expires_after - time after cached items will expire,
- force_clear - clear the whole cache before the first request.
If caching is disabled, all the cache-related attributes are set to None.
class Yuki:
    """Api wrapper for the Ergast F1 API.
    The data for every endpoint is provided in the handy data transfer object.
    DTOs have methods for converting them into simpler structures:
        to_dict: convert an object to a dictionary,
        to_flat_dict: convert object to a flat dictionary,
        to_tuple: convert an object to a tuple,
        to_json: convert an object to a JSON string.
    Disabling the caching is strongly not recommended.
    Ergast API has a limit of four calls per second and 200 per hour.
    Please take care while calling the methods within a loop.
    """

    _base_url = "https://ergast.com/api/f1/"

    def __init__(
        self,
        headers: dict = {"User-Agent": "Yukinator"},
        cache_enabled: bool = True,
        cache_dir: str = os.getcwd(),
        expires_after: Union[None, int, float, str] = 3600,
        force_clear: bool = False,
    ):
        """Initialize the Yuki class object.
        If caching is disabled, all of the cache-related attributes are None.
        Args:
            headers (dict): Dictionary of headers to be sent on each request.
                Defaults to {'User-Agent': 'Yukinator'}
            cache_enabled (bool): Enable caching. Defaults to True.
            cache_dir (str): Directory for the cache sqlite file.
                Defaults to the current working directory.
            expires_after (None, int, float, str): Time after cached items will expire.
                Defaults to 3600 - one hour.
            force_clear (bool): Clear the whole cache before the first request.
                Defaults to False.
        """
        self.headers = headers
        self.cache_enabled = cache_enabled
        self.cache_dir = cache_dir if cache_enabled else None
        self.expires_after = expires_after if cache_enabled else None
        self.force_clear = force_clear if cache_enabled else None
        if self.cache_enabled:
            Cache.cache_enable(
                path=self.cache_dir,
                headers=self.headers,
                expires_after=self.expires_after,
                clear=self.force_clear,
            )
As you can see, users can choose not to use caching, but caching is strongly recommended.
The next thing is the class method _build_url. It builds a URL for every endpoint with the result limit set to the maximum possible value - 1000. I think it is a helpful method for a wrapper like this because there are several endpoints, so repeatedly copy-pasting the URL would be annoying.
    @classmethod
    def _build_url(cls, *endpoints) -> str:
        """Build a full URL for provided API endpoint with a limit set to 1000.
        Args:
            *endpoints: API endpoint suffixes.
        Returns:
            str: URL ready for an API call.
        """
        return (
            cls._base_url
            + "/".join(str(suffix) for suffix in endpoints if suffix)
            + ".json?limit=1000"
        )
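The URL building is easy to try out in isolation. Here is a standalone sketch of the same logic as a plain function (the names build_url and BASE_URL are mine, not the package's), showing how None values are simply skipped.

```python
BASE_URL = "https://ergast.com/api/f1/"


def build_url(*endpoints) -> str:
    # Falsy parts (None, empty string) are dropped before joining,
    # so optional year/race arguments need no special handling.
    path = "/".join(str(part) for part in endpoints if part)
    return f"{BASE_URL}{path}.json?limit=1000"


full = build_url(2022, 4, "qualifying")
all_time = build_url(None, None, "drivers")
print(full)      # https://ergast.com/api/f1/2022/4/qualifying.json?limit=1000
print(all_time)  # https://ergast.com/api/f1/drivers.json?limit=1000
```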
The next protected method is responsible for making cached requests if caching is enabled, or standard requests if it is not. It also checks whether the response for the provided parameters is empty and, if so, raises the WrongQueryParameters exception. For example, the response might be empty if the user provides a year in the future.
    def _make_request(self, url: str) -> dict:
        """Make a successful GET request for a specified URL.
        If a cache is enabled, get the result from the cache if possible,
        else call the API and cache it.
        If a cache is disabled, call the API.
        Args:
            url (str): API endpoint URL.
        Returns:
            dict: Content of response.
        Raises:
            HTTPError: If the response's status code is not less than 400.
            WrongQueryParameters: The output for provided query parameters is empty.
        """
        if self.cache_enabled:
            response = Cache.cache_get(url)
            if not response.ok:
                response.raise_for_status()
            result = response.json()
        else:
            response = requests.get(url, headers=self.headers)
            if not response.ok:
                response.raise_for_status()
            result = response.json()
        if int(result["MRData"]["total"]) <= 0:
            raise WrongQueryParameters()
        return result
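The empty-response check can be illustrated without any network call. In the sketch below the payloads are made up and only mimic the part of the response the check relies on (a "total" value stored as a string under "MRData"); the exception class is a local stand-in so the snippet runs on its own.

```python
class WrongQueryParameters(Exception):
    """Local stand-in for the package's exception."""

    def __str__(self):
        return "The output for provided query parameters is empty."


def ensure_not_empty(result: dict) -> dict:
    # The total is converted with int() before comparing, as in _make_request.
    if int(result["MRData"]["total"]) <= 0:
        raise WrongQueryParameters()
    return result


# Made-up payloads: one with a match, one with none.
filled = {"MRData": {"total": "1", "DriverTable": {"Drivers": [{"driverId": "tsunoda"}]}}}
empty = {"MRData": {"total": "0", "DriverTable": {"Drivers": []}}}

assert ensure_not_empty(filled) is filled

try:
    ensure_not_empty(empty)
except WrongQueryParameters as exc:
    message = str(exc)
    print(message)  # The output for provided query parameters is empty.
```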
The other methods are strictly connected to the API endpoint. They have very similar logic and every one of them returns the list of data transfer objects. For example, the method for drivers endpoint looks like this:
    def get_drivers(
        self, year: Union[int, str, None] = None, race: Union[int, str, None] = None
    ) -> list[DriverDTO]:
        """Obtain a list of data transfer objects with information about the drivers.
        By default, return every driver in the F1 history.
        If only the year parameter is provided, return a list for that season.
        If year and race parameters are provided, return a list for that race and year.
        If only the race parameter is provided, raise the WrongQueryParameters error.
        If provided parameters are not real (e.g. year from the future),
        raise the WrongQueryParameters error.
        Args:
            year (int, str, None): Optional parameter with a year to be queried.
                Defaults to None.
            race (int, str, None): Optional parameter with a number of a race
                to be queried. Defaults to None.
        Returns:
            list[DriverDTO]: List of data transfer objects.
        Raises:
            HTTPError: The response's status code is not less than 400.
            WrongQueryParameters: The output for provided query parameters is empty.
        """
        url = Yuki._build_url(year, race, "drivers")
        response = self._make_request(url)
        response = response["MRData"]["DriverTable"]["Drivers"]
        return [DriverDTO(**x) for x in response]
I am not able to show all of the methods in the post (it is around 500 lines of code). I think the get_races method is the one worth mentioning. It uses the flat_dict utility function, but with specified keys, which explains the conditional block in that function. These fields contain just the dates and times of qualifying, practice sessions, and sprints, so I think there is no reason to create additional objects for them.
    def get_races(
        self, year: Union[int, str] = "current", race: Union[int, str, None] = None
    ) -> list[RaceDTO]:
        """Obtain a list of data transfer objects with information about the races.
        By default, return the races for a current season.
        If only the year parameter is provided, return a list for that season.
        If year and race parameters are provided, return a list for that race and year.
        If only the race parameter is provided, return a race from a current season.
        If provided parameters are not real (e.g. year from the future),
        raise the WrongQueryParameters error.
        Args:
            year (int, str): Optional parameter with a year to be queried.
                Defaults to 'current'
            race (int, str, None): Optional parameter with a number of a race
                to be queried. Defaults to None.
        Returns:
            list[RaceDTO]: List of data transfer objects.
        Raises:
            HTTPError: The response's status code is not less than 400.
            WrongQueryParameters: The output for provided query parameters is empty.
        """
        url = Yuki._build_url(year, race)
        response = self._make_request(url)
        response = response["MRData"]["RaceTable"]["Races"]
        return [
            RaceDTO(
                **flat_dict(
                    x,
                    [
                        "FirstPractice",
                        "SecondPractice",
                        "ThirdPractice",
                        "Qualifying",
                        "Sprint",
                    ],
                )
            )
            for x in response
        ]
Tests
To be honest, with the Test-Driven Development methodology the tests should be written before the functionality. However, I think that would make a mess of the structure of the blog post. I am also not fully committed to TDD yet, so putting the tests section after the functionality sections reflects how I actually worked. For testing, I usually use the pytest framework.
Test utilities
Let’s start with testing the utils. The path_create static method is the first one to test. All there is to do is check that the method returns the correct path for input with and without a trailing slash, and that it raises the exception when the provided directory does not exist.
def test_path_create():
    expected_output = "tests/resources/yuki_cache"
    assert Cache.path_create("tests/resources") == expected_output
    assert Cache.path_create("tests/resources/") == expected_output

def test_path_create_wrong_dir():
    with pytest.raises(WrongDirectory):
        Cache.path_create("tests/wrongdir")
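Both assertions in test_path_create can expect the same string because os.path.join does not add a second separator when the first component already ends with one. A tiny standalone check of that behavior, independent of the package:

```python
import os

base = os.path.join("tests", "resources")

without_slash = os.path.join(base, "yuki_cache")
with_slash = os.path.join(base + os.sep, "yuki_cache")

# The trailing separator on the directory makes no difference.
print(without_slash == with_slash)  # True
```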
The next thing to test is the cache_get method. This is quite a tricky part, and I am not sure my idea for testing it is correct. I know that tests should avoid calling external APIs and that such calls should be mocked. However, I do not know how to mock only the first call made by the requests-cache get method and let the second one get the response from the cache in the normal way. The test I implemented makes a real call to the API, checks that the response is a standard response type, then makes a second call and checks that the response is a cached response. The test force-clears the cache during initialization, so it also checks that this feature works correctly. The mocking approach I could imagine would be mocking the request and then checking whether it exists in the SQLite database, but I think that is over-complicated, and I have no idea if the idea is good enough. I hope the Ergast server will survive one unnecessary API call made for testing.
def test_cache_get():
    Cache.cache_enable(
        "tests/resources", {"User-Agent": "Yukinator testing"}, 3600, True
    )
    assert (
        type(
            Cache.cache_get("https://ergast.com/api/f1/1995/4/drivers.json?limit=1000")
        )
        == requests.models.Response
    )
    assert (
        type(
            Cache.cache_get("https://ergast.com/api/f1/1995/4/drivers.json?limit=1000")
        )
        == requests_cache.models.response.CachedResponse
    )
The last utility to test is the flat_dict function. I need to check that the function works both with and without the keys argument. The pytest parametrize mark is useful here; it lets me save some space.
flat_dict_data = [
    (
        [
            {
                "key_1": "value_1",
                "key_2": {
                    "nested_key_1": "nested_value_1",
                    "nested_key_2": "nested_value_2",
                },
                "key_3": {"nested_key_3": "nested_value_3"},
            },
            ["key_2"],
        ],
        {
            "key_1": "value_1",
            "key_2Nested_key_1": "nested_value_1",
            "key_2Nested_key_2": "nested_value_2",
            "key_3": {"nested_key_3": "nested_value_3"},
        },
    ),
    (
        [
            {
                "key_1": "value_1",
                "key_2": {
                    "nested_key_1": "nested_value_1",
                    "nested_key_2": "nested_value_2",
                },
                "key_3": {"nested_key_3": "nested_value_3"},
            },
        ],
        {
            "key_1": "value_1",
            "key_2Nested_key_1": "nested_value_1",
            "key_2Nested_key_2": "nested_value_2",
            "key_3Nested_key_3": "nested_value_3",
        },
    ),
]

@pytest.mark.parametrize("args, result", flat_dict_data)
def test_flat_dict(args, result):
    assert flat_dict(*args) == result
Test objects
My objects file is pretty simple and does not require a lot of testing. The only things to test are the to_date and to_time functions and the methods of the DTO parent class.
def test_to_date():
    date_string = "2022-04-02"
    date_obj = datetime.date(2022, 4, 2)
    assert to_date(date_string) == date_obj

def test_to_time():
    time_string = "21:42:15Z"
    time_obj = datetime.time(
        hour=21, minute=42, second=15, tzinfo=datetime.timezone.utc
    )
    assert to_time(time_string) == time_obj

@pytest.fixture
def race_dto():
    return RaceDTO(**example_race_1)

def test_to_dict(race_dto):
    assert race_dto.to_dict() == example_racedto_dict

def test_to_tuple(race_dto):
    assert race_dto.to_tuple() == example_racedto_tuple

def test_to_flatdict(race_dto):
    assert race_dto.to_flat_dict() == example_racedto_flatdict

def test_to_json(race_dto):
    assert race_dto.to_json() == example_racedto_json
The pytest fixture is worth noticing here. A fixture is a function that runs before each test function that requests it. Fixtures are really useful, for example, for preparing input data, setting up database connections, and generally for not repeating the same code in every test.
Test yukinator
The main part of the wrapper is the last thing to test. Let’s start by checking whether _build_url works with all of the arguments and with None values. The parametrize mark is useful here too. Many people say that protected or private methods shouldn’t be unit tested. I think that my methods _build_url and _make_request have a crucial impact on the whole wrapper, so I guess it is better to test them.
build_urls_data = [
    (
        [2022, 4, "qualifying"],
        "https://ergast.com/api/f1/2022/4/qualifying.json?limit=1000",
    ),
    ([2001, None, "drivers"], "https://ergast.com/api/f1/2001/drivers.json?limit=1000"),
]

@pytest.mark.parametrize("args, result", build_urls_data)
def test_build_url(args, result):
    assert Yuki._build_url(*args) == result
The next thing is testing the _make_request method. Mocks are really useful here. A mock is an object that simulates the behavior of a real object. If the request is mocked, the received data won’t change over time. There is no connection to the external service, so the test won’t fail if the third-party server has a problem. The tests also won’t make unnecessary API calls, and they run faster. I need to test GET requests with a correct response and with wrong query parameters.
@pytest.fixture
def fake_response():
    with open("tests/resources/drivers.json") as f:
        return json.load(f)

def test_make_request_without_caching(mocker, fake_response):
    fake_resp = mocker.Mock()
    fake_resp.json = mocker.Mock(return_value=fake_response)
    fake_resp.status_code = HTTPStatus.OK
    mocker.patch("yukinator.yukinator.requests.get", return_value=fake_resp)
    resp = Yuki(cache_enabled=False)._make_request(
        "https://ergast.com/api/f1/2022/4/drivers.json?limit=1000"
    )
    assert resp == fake_response

@pytest.fixture
def fake_wrong_response():
    with open("tests/resources/wrong_request.json") as f:
        return json.load(f)

def test_make_request_with_wrong_query(mocker, fake_wrong_response):
    fake_resp = mocker.Mock()
    fake_resp.json = mocker.Mock(return_value=fake_wrong_response)
    fake_resp.status_code = HTTPStatus.OK
    mocker.patch("yukinator.yukinator.requests.get", return_value=fake_resp)
    with pytest.raises(WrongQueryParameters):
        Yuki(cache_enabled=False)._make_request(
            "https://ergast.com/api/f1/1949/drivers.json?limit=1000"
        )
As you can see, I use fixtures again. The pytest.raises context manager is really useful for testing whether exceptions are raised properly.
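The same mocking idea can be tried with the standard library alone, since the mocker fixture from pytest-mock is a thin wrapper around unittest.mock. The sketch below uses a hypothetical helper, fetch_json, that mirrors the non-cached branch of _make_request but takes the getter as an argument, so nothing needs to be patched. One detail worth knowing: an attribute that was never set on a Mock (such as ok) is itself a truthy Mock, so setting it explicitly makes the intent of the test clearer.

```python
from unittest.mock import Mock


def fetch_json(get, url: str) -> dict:
    """Hypothetical stand-in for the request logic, with an injectable getter."""
    response = get(url)
    if not response.ok:
        response.raise_for_status()
    return response.json()


url = "https://ergast.com/api/f1/drivers.json?limit=1000"

fake_resp = Mock()
fake_resp.ok = True  # explicit, instead of relying on the auto-created attribute
fake_resp.json.return_value = {"MRData": {"total": "1"}}
fake_get = Mock(return_value=fake_resp)

result = fetch_json(fake_get, url)

# The fake getter was called exactly once, and no real request was made.
fake_get.assert_called_once_with(url)
print(result)  # {'MRData': {'total': '1'}}
```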
I think the last thing that is reasonable to test is one of the methods directly connected with the endpoints. I chose the get_races method because its DTO is the most complicated: it uses datetime converters and nested object converters, and part of the response is also flattened. For this test, I only check that the type of the response is a list. I don’t think I should test the rest of the methods, because they share the same logic.
@pytest.fixture
def races_response():
    with open("tests/resources/races.json") as f:
        return json.load(f)

def test_get_races(mocker, races_response):
    fake_resp = mocker.Mock()
    fake_resp.json = mocker.Mock(return_value=races_response)
    fake_resp.status_code = HTTPStatus.OK
    mocker.patch("yukinator.yukinator.requests.get", return_value=fake_resp)
    resp = Yuki(cache_enabled=False).get_races(2022, 4)
    assert type(resp) == list
All of the tests pass.
Tiny important things
Docs
For documentation, I usually use the classic tool, Sphinx, and host the result on Read the Docs. Most Python docs are written with the help of Sphinx. I think the only thing worth mentioning here is that sphinx-apidoc is a useful tool for automatically generating documentation from docstrings.
Github Actions
GitHub Actions is a CI/CD tool. It makes automating software workflows really easy. Here is my basic .yml file, which sets up environments with Python 3.9 and 3.10, installs the dependencies, lints the code with flake8, and tests it with pytest.
name: Github Actions
on: [push]
jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.9", "3.10"]
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install flake8 pytest pytest-mock
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
      - name: Test with pytest
        run: |
          pytest
Poetry
Poetry is a Python packaging and dependency management tool. To be honest, it makes that process a lot easier compared to the classic one. Let’s start by creating pyproject.toml - the file which contains the build system requirements of the project. External libraries are added with the poetry add command, and the dependencies are automatically recorded in pyproject.toml.
[tool.poetry]
name = "yukinator"
version = "0.1.0"
description = "Unofficial Ergast API wrapper"
authors = ["brozen <szymon.mazurkievicz@gmail.com>"]
license = "MIT"
homepage = "https://github.com/BrozenSenpai/yukinator"
keywords = ["Ergast API"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.9"
attrs = "^21.4.0"
requests-cache = "^0.9.4"
[tool.poetry.dev-dependencies]
pytest = "^7.1"
pytest-mock = "^3.6"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
The requirements.txt file can be exported from the poetry.lock file with the poetry export -f requirements.txt --output requirements.txt command.
Now the package can be built with poetry build and published to the previously configured PyPI project with the poetry publish command.
Conclusion
I am really satisfied with this project. I think it is a small but end-to-end project, and I could finish it really quickly. I enjoyed it, tbh. I hope it is not buggy, especially on the third-party side. I mean, it is impossible to check all of the possible responses, so they may hold some surprises, but I hope they don’t. Here is a repo. Cya in the next post!
References
- https://dev.to/izabelakowal/some-ideas-on-how-to-implement-dtos-in-python-be3
- https://changhsinlee.com/pytest-mock/
- https://opensource.com/article/21/9/unit-test-python
- https://towardsdatascience.com/testing-mock-requests-for-api-wrapper-752359807ad7
- https://towardsdatascience.com/packages-part-2-how-to-publish-test-your-package-on-pypi-with-poetry-9fc7295df1a5