Twitter API - random tweets generator

Introduction

I have been a Twitter user for almost a decade. Every day I spend some time reading tweets from people connected with my hobbies. In this entry, I would like to build a simple ETL pipeline using the Twitter API and set it to run once a week. However, a post with just a pipeline would be rather short. Luckily, I know some NLP and Flask basics. This is enough to make a random tweet generator web app based on tweets from the most popular Formula 1 journalists. The chosen ones are Andrew Benson, Chris Medland, Adam Cooper, and Edd Straw.

General architecture

This graph presents the general architecture of this mini-project. It might not be perfect, but I hope it is clear enough.

[Figure: architecture]

I do not like to over-complicate things, and the project is rather simple. It contains just an SQLite database and a few Python libraries. Tweepy is a library that makes accessing the Twitter API a lot easier. Markovify is an awesome, simple Markov chain generator. Of course, there are better methods for text generation, like LSTM, GAN, or GPT-2. However, my primary goal is to show the process of creating an ETL pipeline. Displaying a randomly generated tweet is just an addition. It does not need to be accurate, but it can be funny.

The ETL script will be simple, so Ofelia (a modern replacement for cron in Docker environments) is enough to schedule it instead of using a framework like Airflow. Someone could wonder why sentences are not generated instantly within the first transform. The answer is that the Twitter API has rate limits, so it is not possible to fetch a large number of tweets at once. I would like to build a large database over time, so the generated tweets should get better every week. The scheduled script and a simple Flask app will be containerized with Docker as separate services. The directory and file structure should look like this:

tweets_generator
|   .gitignore
|   .dockerignore
|   .env
|   docker-compose.yaml
|___src
|   |___etl
|   |   |   requirements.txt
|   |   |   Dockerfile
|   |   |   tweets_etl.py
|   |   |___utils
|   |       |   __init__.py
|   |       |   config.py
|   |       |   twitter_auth.py
|   |       |   sqlite_connector.py
|   |___app
|   |   |   requirements.txt
|   |   |   Dockerfile
|   |   |   app.py
|   |   |___static
|   |   |   |   style.css
|   |   |___templates
|   |       |   app.html

ETL

Utils

In the utils directory I am going to store config.py with environment variables, sqlite_connector.py with the database connection, and twitter_auth.py with the Twitter API configuration and authentication. In the config file, the standard os.environ takes care of reading the environment variables. It is important to configure them in docker-compose.yaml as well, but I am going to show this in the last part of the post. Tweepy makes authenticating with the Twitter API super simple. The connect_sqlite function is useful because I will set up a connection with the database multiple times.

# config
import os


consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']
access_token = os.environ['ACCESS_TOKEN']
# the name must match the variable passed in docker-compose.yaml
access_token_secret = os.environ['ACCESS_TOKEN_SECRET']

# twitter_auth
import tweepy

from utils.config import (consumer_key, consumer_secret,
                          access_token, access_token_secret)


def tweepy_connect():
    # twitter api config
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    return tweepy.API(auth)

# sqlite_connector
import sqlite3


def connect_sqlite():
    # the path lives on the volume shared with the app container
    return sqlite3.connect('/code/db/tweets_db.db')
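
Since os.environ[...] raises a KeyError on the first missing variable, it can help to report every missing credential at once before the ETL starts. The following is only a minimal sketch of that idea: the helper name missing_env_vars is hypothetical and not part of the project, and the variable names are the ones passed in by docker-compose.yaml.

```python
import os

# Variable names as passed in by docker-compose.yaml.
REQUIRED_VARS = ("CONSUMER_KEY", "CONSUMER_SECRET",
                 "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")


def missing_env_vars(names=REQUIRED_VARS, environ=None):
    """Return the names from `names` that are unset or empty in `environ`."""
    # Fall back to the real process environment when no mapping is given.
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All Twitter credentials are set.")
```

Passing a plain dictionary as environ makes the check easy to try out without touching the real environment.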

Get user’s tweets

This is the crucial part of the entire ETL process. Without original tweets, it is impossible to generate funny random ones. Tweepy's user_timeline method is the right tool to acquire a user's tweets. It returns a list of objects, from which I need just the username, the id of the tweet, and the text. It is convenient to save them as a pandas data frame. I would also like to have no duplicates in the database. My solution is to query the maximum tweet id stored for the user and pass that value as the since_id parameter of the user_timeline method. The since_id is None only when the process runs for the first time (the database is empty) or when no tweets from that user are stored yet.

# imports used throughout tweets_etl.py
import sqlite3

import markovify
import numpy as np
import pandas as pd

from utils.sqlite_connector import connect_sqlite
from utils.twitter_auth import tweepy_connect


def get_user_tweets(username: str, count: int) -> pd.DataFrame:
    try:
        # set connection with database
        conn = connect_sqlite()
        cur = conn.cursor()
        # get maximum id from tweets_table
        cur.execute(
            'SELECT MAX(id) FROM tweets_table WHERE name = ?', (username,))
        # save query result (None if the user has no stored tweets)
        since_id = cur.fetchone()[0]
        conn.close()
    except sqlite3.Error:
        # first run: the table does not exist yet
        since_id = None
    # set connection with twitter api via tweepy
    api = tweepy_connect()
    # get the last count of tweets from the user's timeline
    # with id higher than max id in database
    tweets = api.user_timeline(screen_name=username,
                               count=count, include_rts=False,
                               tweet_mode='extended', since_id=since_id)
    # save tweets to pandas dataframe
    df = pd.DataFrame(data=[(tweet.id, tweet.full_text, username)
                            for tweet in tweets], columns=['id', 'tweets', 'name'])
    return df
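
The since_id lookup can be tried in isolation with an in-memory SQLite database. This is a small sketch under my own assumptions: the function name latest_tweet_id is hypothetical, and the sample rows are made up purely for illustration.

```python
import sqlite3


def latest_tweet_id(conn, username):
    """Return the highest stored tweet id for `username`, or None."""
    try:
        row = conn.execute(
            "SELECT MAX(id) FROM tweets_table WHERE name = ?", (username,)
        ).fetchone()
    except sqlite3.OperationalError:
        # Raised when tweets_table does not exist yet (very first run).
        return None
    # MAX() over zero rows yields NULL, which sqlite3 maps to None.
    return row[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(latest_tweet_id(conn, "eddstrawf1"))  # table is missing
    conn.execute(
        "CREATE TABLE tweets_table (id INT PRIMARY KEY, tweets TEXT, name TEXT)")
    conn.executemany(
        "INSERT INTO tweets_table VALUES (?, ?, ?)",
        [(10, "a", "eddstrawf1"), (25, "b", "eddstrawf1"),
         (99, "c", "adamcooperf1")])
    print(latest_tweet_id(conn, "eddstrawf1"))  # highest id for this user
    conn.close()
```

Both the missing table and a user without stored tweets end up as None, which tells user_timeline to apply no lower bound.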

Preprocess tweets

Posts on social media are full of URLs, hashtags (#), mentions (@), and mistakes like double spaces. It is a good idea to get rid of them before modeling. This can be done with simple regular expressions and the pandas replace method. A tweet can end up empty after removing hashtags, mentions, and URLs, so such rows need to be dropped as well.

def preprocess_tweets(df: pd.DataFrame) -> pd.DataFrame:
    # remove hashtags and mentions
    df.tweets = df.tweets.replace(r"(@|#)[A-Za-z0-9_]+", "", regex=True)
    # remove urls
    df.tweets = df.tweets.replace(r"(http\S+|www\.\S+)", "", regex=True)
    # collapse runs of whitespace into one space and trim both ends,
    # so that whitespace-only tweets become empty strings
    df.tweets = df.tweets.replace(r"\s+", ' ', regex=True).str.strip()
    # remove empty rows
    df.tweets = df.tweets.replace('', np.nan)
    df = df.dropna(axis=0, subset=['tweets'])
    return df
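
To see what the substitutions do to a single tweet, here is a minimal sketch using only the standard re module instead of pandas. The function name clean_tweet and the sample tweets are made up for illustration; the patterns follow the three steps described above, with the dot in www escaped and a final strip() so that a tweet consisting only of mentions, hashtags, and links really ends up empty.

```python
import re


def clean_tweet(text):
    """Remove mentions, hashtags and urls, then tidy up the whitespace."""
    text = re.sub(r"(@|#)[A-Za-z0-9_]+", "", text)   # mentions and hashtags
    text = re.sub(r"(http\S+|www\.\S+)", "", text)   # urls
    text = re.sub(r"\s+", " ", text)                 # collapse whitespace
    return text.strip()


if __name__ == "__main__":
    print(clean_tweet("Great drive by @LandoNorris today!  #F1 https://t.co/abc123"))
    # → Great drive by today!
    print(repr(clean_tweet("@user1 #tag https://t.co/x")))
    # → ''
```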

Load tweets

This is a simple step: the data frame created before just needs to be appended to the table in the database using the pandas to_sql method. For the initial load, the table has to be created first, with the id as its primary key.

def load_tweets(df: pd.DataFrame) -> None:
    # set connection with database
    conn = connect_sqlite()
    cur = conn.cursor()
    # create table (if not exists) with tweets, name and id as primary key
    cur.execute(
        'CREATE TABLE IF NOT EXISTS tweets_table (id INT, tweets TEXT, name TEXT, CONSTRAINT primary_key_constraint PRIMARY KEY (id))')
    # append dataframe to the existing table
    df.to_sql('tweets_table', conn, if_exists='append', index=False)
    conn.commit()
    conn.close()
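
The primary key acts as a second safety net next to since_id: SQLite refuses to insert a tweet id that is already stored. The sketch below shows this behaviour on an in-memory database with the same table definition; the function name duplicate_is_rejected and the sample rows are hypothetical.

```python
import sqlite3


def duplicate_is_rejected():
    """Insert the same id twice and report (rejected, rows_in_table)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tweets_table "
        "(id INT, tweets TEXT, name TEXT, "
        "CONSTRAINT primary_key_constraint PRIMARY KEY (id))"
    )
    insert = "INSERT INTO tweets_table VALUES (?, ?, ?)"
    conn.execute(insert, (1, "first", "eddstrawf1"))
    try:
        conn.execute(insert, (1, "again", "eddstrawf1"))
        rejected = False
    except sqlite3.IntegrityError:
        # The primary key constraint failed for the second insert.
        rejected = True
    rows = conn.execute("SELECT COUNT(*) FROM tweets_table").fetchone()[0]
    conn.close()
    return rejected, rows


if __name__ == "__main__":
    print(duplicate_is_rejected())
```

In the pipeline this would surface as an exception raised from to_sql, so a duplicate would stop the load rather than silently slip in.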

Markovify tweets

This step combines extracting data and transforming it in one function. The pandas read_sql method queries the table and saves the results straight to a data frame. The markovify.NewlineText class is a good choice for building a model from a data frame column, because it treats every line (here, every tweet) as a separate sentence. Tweets have a maximum length of 280 characters, so restricting the length of generated sentences is a good idea as well.

def markovify_tweets() -> pd.DataFrame:
    # set connection with database
    conn = connect_sqlite()
    # load tweets to dataframe
    df = pd.read_sql('SELECT tweets FROM tweets_table', conn)
    conn.close()
    # make a markov model
    model = markovify.NewlineText(df.tweets)
    # generate up to 100 sentences; make_short_sentence returns None
    # when it cannot build a sentence, so those results are dropped
    sentences = (model.make_short_sentence(280) for _ in range(100))
    df = pd.DataFrame([s for s in sentences if s is not None],
                      columns=['sentence'])
    return df
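
For readers who have not met Markov chains before, the idea can be sketched in a few lines of plain Python. This is not how markovify is implemented: it is a deliberately simplified illustration with a one-word state, and the names build_chain and make_sentence are made up. Markovify itself is more careful, for example it can return None when it fails to build an acceptable sentence.

```python
import random
from collections import defaultdict


def build_chain(sentences):
    """Map each word to the list of words that follow it in the corpus."""
    chain = defaultdict(list)
    starts = []
    for sentence in sentences:
        words = sentence.split()
        if not words:
            continue
        starts.append(words[0])
        for current, following in zip(words, words[1:]):
            chain[current].append(following)
    return chain, starts


def make_sentence(chain, starts, max_chars=280, rng=random):
    """Walk the chain from a random start word until a dead end or the limit."""
    word = rng.choice(starts)
    out = word
    while chain.get(word):
        word = rng.choice(chain[word])
        # Stop before the sentence would exceed the character limit.
        if len(out) + 1 + len(word) > max_chars:
            break
        out += " " + word
    return out


if __name__ == "__main__":
    corpus = ["the car is fast", "the car is slow", "the race is on"]
    chain, starts = build_chain(corpus)
    print(make_sentence(chain, starts))
```

With this tiny corpus the generator can already produce a sentence that is not in the input, such as "the race is fast", which is exactly the effect that makes the generated tweets funny.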

Load markovified tweets

This step is pretty similar to the first load. The main difference here is replacing the existing table instead of appending.

def load_markovified_tweets(df: pd.DataFrame) -> None:
    # set connection with database
    conn = connect_sqlite()
    cur = conn.cursor()
    # create table if not exists
    cur.execute(
        'CREATE TABLE IF NOT EXISTS markovified_tweets_table(sentence TEXT)')
    # replace existing table
    df.to_sql('markovified_tweets_table', conn,
              if_exists='replace', index=False)
    conn.commit()
    conn.close()

Run

Every part of the ETL is now in place. The get_user_tweets function must be invoked for every journalist separately. Mapping it over the list of journalists and passing the results to pandas concat does the job here.

def run() -> None:
    # run whole etl process
    journalists = ['andrewbensonf1', 'ChrisMedlandF1',
                   'adamcooperf1', 'eddstrawf1']
    # get tweets from every listed user and concat them
    df = pd.concat(map(lambda x: get_user_tweets(
        x, 200), journalists), ignore_index=True)
    df = preprocess_tweets(df)
    load_tweets(df)
    df = markovify_tweets()
    load_markovified_tweets(df)


if __name__ == '__main__':
    run()

Dockerfile

In the Dockerfile for the ETL script, I just need to build a Python environment. The commands to be executed by the scheduler will be specified in docker-compose.yaml, so there is no CMD or ENTRYPOINT here.

FROM python:3.9-slim

WORKDIR /code

COPY . .

RUN pip install -r requirements.txt

App

Considering the simplicity of the app I won’t show the HTML or CSS files here.

Flask

The Python code is not complicated, because the app just shows a random tweet from the database. The random tweet is selected from the table using ORDER BY RANDOM(). I know it is not the most efficient way, but the table has at most 100 rows, so I suppose I can use it here.

import sqlite3

from flask import Flask, render_template


app = Flask(__name__)


@app.route('/')
def get_tweet():
    # connect database
    conn = sqlite3.connect('/code/db/tweets_db.db')
    cur = conn.cursor()
    # select random entry
    cur.execute(
        'SELECT sentence FROM markovified_tweets_table ORDER BY RANDOM() LIMIT 1')
    tweet = cur.fetchone()[0]
    conn.close()
    return render_template('app.html', tweet=tweet)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=5000)
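
One edge case worth keeping in mind is a request that arrives before the first ETL run has finished: the table may not exist yet, or it may be empty, and cur.fetchone()[0] would then fail. A possible way to handle it is sketched below. The helper name random_sentence and the fallback text are my own assumptions, not part of the app.

```python
import sqlite3


def random_sentence(conn, fallback="No tweets generated yet."):
    """Return one random sentence, or `fallback` if none is available."""
    try:
        row = conn.execute(
            "SELECT sentence FROM markovified_tweets_table "
            "WHERE sentence IS NOT NULL ORDER BY RANDOM() LIMIT 1"
        ).fetchone()
    except sqlite3.OperationalError:
        # The table does not exist yet, e.g. the first ETL run is not done.
        return fallback
    # fetchone() returns None when the query matched no rows.
    return row[0] if row else fallback


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(random_sentence(conn))  # prints the fallback text
    conn.close()
```

Inside the route, the result of such a helper could be passed to render_template in place of the direct cur.fetchone()[0] lookup.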

Dockerfile

The Dockerfile builds a Python environment and runs the app. The host and port are set in app.py itself, so the command needs no extra arguments.

FROM python:3.9-slim

WORKDIR /app

COPY . .

RUN pip install -r requirements.txt

CMD ["python", "app.py"]

Compose

Docker Compose is a tool for defining and running multi-container Docker applications. The first service defined is tweets-etl. Its image is built from the Dockerfile created previously. The environment variables are defined in the .env file. To let Compose substitute them automatically, the ${variable} or $variable syntax is required.

The first shell command, python tweets_etl.py, runs the Python script initially. This is important because the scheduler is set to run weekly, and I do not want to wait a week for the first run. The second one, tail -f /dev/null, keeps the container alive. The tweets-etl and app containers share the volume, so both can reach the same SQLite file. In the labels, the Ofelia scheduler is configured to run the specified command with the specified environment variables weekly.

The second service is Ofelia itself. It depends on the tweets-etl service, so it will wait until that service has been started. For the app service, built from the Dockerfile created previously, the ports must be specified.

version: '3'
services:
  tweets-etl:
    build:
      context: ./src/etl
      dockerfile: Dockerfile
    environment:
      CONSUMER_KEY: ${CONSUMER_KEY}
      CONSUMER_SECRET: ${CONSUMER_SECRET}
      ACCESS_TOKEN: ${ACCESS_TOKEN}
      ACCESS_TOKEN_SECRET: ${ACCESS_TOKEN_SECRET}
    command: >
            sh -c " python tweets_etl.py && tail -f /dev/null"
    volumes:
      - ${VOLUME}
    labels:
      ofelia.enabled: "true"
      ofelia.job-exec.tweets-etl.schedule: "@weekly"
      ofelia.job-exec.tweets-etl.command: "python tweets_etl.py"
      ofelia.job-exec.tweets-etl.environment: '["CONSUMER_KEY=${CONSUMER_KEY}",
      "CONSUMER_SECRET=${CONSUMER_SECRET}",
      "ACCESS_TOKEN=${ACCESS_TOKEN}", "ACCESS_TOKEN_SECRET=${ACCESS_TOKEN_SECRET}"]'
  ofelia:
    image: mcuadros/ofelia:latest
    restart: always
    depends_on:
      - tweets-etl
    command: daemon --docker
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
  app:
    build:
      context: ./src/app
      dockerfile: Dockerfile
    container_name: flask
    ports:
      - "5000:5000"
    volumes:
      - ${VOLUME}

Now the whole infrastructure can be built and started using docker-compose up -d from the directory containing docker-compose.yaml and .env. The site should be accessible at http://localhost:5000/.

[Figure: app]

The tweets mostly don’t make sense, but the code works fine. It is also probably not safe to run it in the production environment!

Conclusion

I hope the project is at least not bad, because I am just starting with more complicated uses of Docker. I also know that nowadays ETL tasks are usually scheduled with a framework like Airflow or Luigi, or on cloud platforms with a few clicks, but I just wanted to have it on my own infrastructure. Nevertheless, I will probably use more advanced schedulers in future posts. Wish me luck!