GrandPrix Circus - Selenium bots and ZMQ


Introduction

In the last post, I prepared the Mongo database instance and an API for it. It is time to use them. In this post, I will create two scripts that sign in to Twitter and Instagram and collect basic data about a few new posts from the chosen profiles. ZMQ producers will push the data from both sources to a message queue. The consumer will load the messages into the database through the API, one by one, at a fixed time interval. The Selenium bots won’t be perfect. I will probably get banned quickly, but I will use some tricks to make them at least a bit human-like. The best way to avoid being blocked would be to use a proxy pool, but I think that is overkill for a small blog project.

Let’s get to work!

Selenium bots

To make the scrapers less detectable, I will use undetected-chromedriver. This is an optimized Selenium Chromedriver patch that does not trigger the popular anti-bot services. It does not hide the IP address, so the chances of getting blocked are still quite high. Both bots will keep a list of the last saved IDs to make sure the content is not duplicated. The scraper services will be scheduled inside their containers using cron.
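
The deduplication idea can be sketched on its own. This is only a minimal illustration, not the code the bots use: the helper names `load_seen_ids`, `save_seen_ids`, and `filter_new` are hypothetical, and only the JSON-file approach is taken from the scripts in this post.

```python
import json
from pathlib import Path
from typing import List


def load_seen_ids(path: Path) -> List[str]:
    """Return the IDs saved by the previous run, or an empty list on the first run."""
    try:
        return json.loads(path.read_text())
    except FileNotFoundError:
        return []


def save_seen_ids(path: Path, ids: List[str]) -> None:
    """Overwrite the file with the IDs seen during the current run."""
    path.write_text(json.dumps(ids))


def filter_new(candidates: List[str], seen: List[str]) -> List[str]:
    """Keep only the IDs that were not saved before, preserving their order."""
    seen_set = set(seen)
    return [item for item in candidates if item not in seen_set]
```

Only the IDs returned by `filter_new` would be published, while every visible ID would be written back for the next run.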

Twitter bot

The whole script is just one function - process_tweets. Let’s start with loading the JSON list of the last saved IDs and binding the ZMQ publisher on port 5555.

import os
import time
import json
import random

import zmq
import undetected_chromedriver as uc
from selenium import webdriver
from selenium.webdriver import ChromeOptions, Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

def process_tweets(username: str, password: str) -> None:
    try:
        with open('last_saved_ids.json', 'r') as file:
            last_saved_ids = json.load(file)
    except FileNotFoundError:
        last_saved_ids = []


    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5555")

Now I set the Chrome options. The browser runs in headless mode because it will run in a Docker container anyway. I also disable /dev/shm usage, because that partition is too small in certain VM environments, which causes Chrome to fail or crash. This happens frequently in Docker images.

    options = ChromeOptions()
    options.add_argument("--headless")
    options.add_argument('--disable-dev-shm-usage')
    
    driver = uc.Chrome(headless=True, use_subprocess=False, options=options, executable_path="/usr/local/bin/chromedriver")

It is time to make real moves on the website. Let’s open the Twitter login page and sign in. I give the driver up to 20 seconds to find the correct fields using CSS selectors. At the end, I pause the script for a random number of seconds between 10 and 20 to make sure the login process is completed. I will randomize all of the sleep times to make pattern recognition harder.

    url = "https://twitter.com/i/flow/login"
    driver.get(url)

    username_input = WebDriverWait(driver, 20).until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
    username_input.send_keys(username)
    username_input.send_keys(Keys.ENTER)

    password_input = WebDriverWait(driver, 20).until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[name="password"]')))
    password_input.send_keys(password)
    password_input.send_keys(Keys.ENTER)

    time.sleep(random.randint(10, 20))
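
The randomized pauses could be wrapped in a small helper. This is a sketch under my own assumptions: the name `human_pause` is hypothetical, it uses `random.uniform` for fractional delays instead of the whole seconds that `random.randint` gives in the script, and the `sleep` parameter exists only so the helper can be tried without actually waiting.

```python
import random
import time
from typing import Callable


def human_pause(min_s: float, max_s: float,
                sleep: Callable[[float], None] = time.sleep) -> float:
    """Sleep for a random duration between min_s and max_s seconds and return it."""
    if min_s > max_s:
        raise ValueError("min_s must not exceed max_s")
    delay = random.uniform(min_s, max_s)
    sleep(delay)
    return delay
```

With such a helper, the pause after login would read `human_pause(10, 20)`.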

The last thing to do is to iterate over the dictionary with the profile names and real names. It could be a plain list of accounts, but I have an author field, and I would like to have the real name there. The bot will go to every profile and scrape data about all visible tweets. If the content has not been scraped already, the ZMQ publisher will send it to the queue in JSON format.

    profiles = {"ChrisMedlandF1":"Chris Medland", "andrewbensonf1": "Andrew Benson", "MBrundleF1": "Martin Brundle", "F1": "Formula 1"}

    shuffled_keys = list(profiles.keys())
    random.shuffle(shuffled_keys)
    shuffled_profiles = {key: profiles[key] for key in shuffled_keys}

    ids = []

    for k, v in shuffled_profiles.items():
        profile_url = f"https://twitter.com/{k}"
        driver.get(profile_url)
        time.sleep(random.randint(5, 10))
        try:
            tweets = WebDriverWait(driver, 20).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '[data-testid="tweet"]')))
            for tweet in tweets:
                tweet_txt = tweet.text.lower()
                if "reposted" not in tweet_txt and "pinned" not in tweet_txt:
                    try:
                        # The leading dot keeps the XPath relative to this tweet;
                        # a bare "//" would always match the first <time> on the page.
                        created_at = tweet.find_element(By.XPATH, './/time[@datetime]').get_attribute('datetime')
                        anchor = tweet.find_element(By.CSS_SELECTOR, "a[aria-label][dir]").get_attribute("href")
                        last_tweet_id = anchor.split("/status/")[1]
                        if last_tweet_id not in last_saved_ids:
                            message = {"contentId": last_tweet_id, "source": profile_url, "author": v, "tag": "tweet", "title": f"{v} posted a new tweet!", 'creationTime': created_at}
                            publisher.send(json.dumps(message).encode("utf-8"))
                        # Remember every visible ID, new or not, for the next run.
                        ids.append(last_tweet_id)
                    except Exception as e:
                        print(e)
        except Exception as e:
            print(e)
        # Pause inside the loop so the delay applies between profiles.
        time.sleep(random.randint(3, 30))


    with open('last_saved_ids.json', 'w') as file:
        json.dump(ids, file)

if __name__ == "__main__":
    your_username = os.environ["TWITTER_ACC"]
    your_password = os.environ["TWITTER_PASSWD"]

    process_tweets(your_username, your_password)
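
The ID extraction and message building from the loop above can also be pulled out into pure functions, which makes them easy to check without a browser. This is a sketch, not part of the original script: `extract_tweet_id` and `build_tweet_message` are hypothetical names, and the URL in the usage note is made up for illustration.

```python
import json
from typing import Optional


def extract_tweet_id(href: str) -> Optional[str]:
    """Return the numeric ID from a tweet permalink, or None if there is none."""
    _, sep, tail = href.partition("/status/")
    if not sep:
        return None
    # Drop anything after the ID, such as "/photo/1" or a query string.
    tweet_id = tail.split("/")[0].split("?")[0]
    return tweet_id if tweet_id.isdigit() else None


def build_tweet_message(tweet_id: str, profile_url: str,
                        author: str, created_at: str) -> bytes:
    """Build the same JSON payload the bot publishes, encoded as UTF-8 bytes."""
    message = {
        "contentId": tweet_id,
        "source": profile_url,
        "author": author,
        "tag": "tweet",
        "title": f"{author} posted a new tweet!",
        "creationTime": created_at,
    }
    return json.dumps(message).encode("utf-8")
```

For example, `extract_tweet_id("https://twitter.com/F1/status/123/photo/1")` gives `"123"`, while a profile URL without `/status/` gives `None` instead of raising an `IndexError`.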

Let’s also create a Dockerfile. It installs wget, unzip, cron, and Chromium. I need them to download and unzip the Chrome driver, run the browser, and schedule the script.

FROM python:3.10-slim-bullseye

RUN apt update --fix-missing && \
    apt install -y wget unzip gnupg cron

RUN wget https://chromedriver.storage.googleapis.com/114.0.5735.90/chromedriver_linux64.zip

RUN unzip chromedriver_linux64.zip -d /usr/local/bin/

RUN rm chromedriver_linux64.zip

RUN apt install -y -qq chromium

WORKDIR /code

COPY ./requirements.txt ./requirements.txt

RUN pip install --no-cache-dir --upgrade -r ./requirements.txt

COPY ./get_tweets.py ./entrypoint.sh ./

RUN chmod +x ./entrypoint.sh

COPY cronjobs /etc/cron.d/cronjobs

RUN chmod 0644 /etc/cron.d/cronjobs

RUN crontab /etc/cron.d/cronjobs

ENTRYPOINT ["sh", "./entrypoint.sh"]

For example, the cronjobs file can look like this. It will run the script every 3 hours.

0 */3 * * * cd /code && /usr/local/bin/python3 get_tweets.py >> log.txt 2>&1

The entry point script is defined like this. It also redirects the standard output and error to the log file.

python3 get_tweets.py >> log.txt 2>&1 && cron -f

Having an entry point ensures that the script runs as soon as the container starts. Otherwise, the first run would wait for the cron schedule, up to three hours. You can use CMD instead of ENTRYPOINT if you would like to be able to override it from the docker command line. The twitter-bot service should also be included in the docker-compose file:

 twitter-bot:
   build: ./data_acquisition_services/twitter_data_service
   container_name: twitter-bot
   depends_on:
     - "consumer-service"
   environment:
     - TWITTER_ACC=${TWITTER_ACC}
     - TWITTER_PASSWD=${TWITTER_PASSWD}
   expose:
     - 5555

For testing on the local host, map the port instead of just exposing it.

Instagram bot

The scraper for Instagram follows the same schema. First, I bind the ZMQ publisher on a different port than the previous bot. Next, I set up the Chrome driver, sign in to the website, and go to the desired profiles. Here I scroll the page up to 3 times because the profile page usually does not show a lot of content by default. I do not scrape the post creation date, because it is not available on the profile page. If you would like to have it, you need to open the post (for example, by going to the URL built from the already scraped post ID) and find the first datetime element (there are more of them, because comments have one too). I will set the creation time in the consumer service instead, just because I do not want to increase the running time.

import os
import time
import json
import random

import undetected_chromedriver as uc
import zmq
from selenium import webdriver
from selenium.webdriver import ChromeOptions, Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


def process_posts(username, password) -> None:
    try:
        with open('last_saved_ids.json', 'r') as file:
            last_saved_ids = json.load(file)
    except FileNotFoundError:
        last_saved_ids = []

    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5556")

    options = ChromeOptions()
    options.add_argument("--headless")
    options.add_argument('--disable-dev-shm-usage')

    driver = uc.Chrome(headless=True, use_subprocess=False, options=options, executable_path="/usr/local/bin/chromedriver")

    url = "https://www.instagram.com/"
    driver.get(url)

    username_input = WebDriverWait(driver, 40).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[name="username"]')))
    username_input.send_keys(username)
    
    password_input = WebDriverWait(driver, 40).until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[name="password"]')))
    password_input.send_keys(password)
    password_input.send_keys(Keys.ENTER)

    time.sleep(random.randint(10, 20))

    profiles = {"f1troll": "F1 MEMES", "carlossainz55": "Carlos Sainz", "pierregasly": "Pierre Gasly", 
    "lewishamilton": "Lewis Hamilton", "maxverstappen1": "Max Verstappen", "schecoperez": "Checo Perez",
    "charles_leclerc": "Charles Leclerc", "georgerussell63": "George Russell", "fernandoalo_oficial": "Fernando Alonso"}

    shuffled_keys = list(profiles.keys())
    random.shuffle(shuffled_keys)
    shuffled_profiles = {key: profiles[key] for key in shuffled_keys}

    ids = []

    for k, v in shuffled_profiles.items():
        profile_url = url + k
        driver.get(profile_url)

        for _ in range(random.randint(1, 3)):
            driver.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
            time.sleep(random.randint(1, 4)) 

        post_links = driver.find_elements(By.XPATH, "//a[contains(@href, '/p/')]")

        for link in post_links:
            post_id = link.get_attribute("href").split("/")[-2]
            if post_id not in last_saved_ids:
                message = {"contentId": post_id, "source": profile_url, "author": v, "tag": "insta", "title": f"{v} posted a new insta photo!", "creationTime": None}
                publisher.send(json.dumps(message).encode("utf-8"))
            ids.append(post_id)

        time.sleep(random.randint(3, 30))

    with open('last_saved_ids.json', 'w') as file:
        json.dump(ids, file)

if __name__ == "__main__":
    your_username = os.environ["INSTA_ACC"]
    your_password = os.environ["INSTA_PASSWD"]

    process_posts(your_username, your_password)
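
The script takes the post ID with `split("/")[-2]`, which relies on the link ending with a slash. A slightly more defensive variant could look for the path segment that follows `p`. This is a sketch with a hypothetical helper name, `extract_post_id`, and made-up example URLs; it is not what the bot above runs.

```python
from typing import Optional
from urllib.parse import urlparse


def extract_post_id(href: str) -> Optional[str]:
    """Return the path segment that follows "p" in the URL, or None if absent."""
    # Split the path only, so query strings do not get in the way.
    segments = [s for s in urlparse(href).path.split("/") if s]
    if "p" in segments:
        index = segments.index("p")
        if index + 1 < len(segments):
            return segments[index + 1]
    return None
```

With this helper, `https://www.instagram.com/p/AbC123xyz` (no trailing slash) still yields `AbC123xyz`, whereas `split("/")[-2]` would return `p` for that link.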

The Docker part is identical to the one for the Twitter bot, so I won’t waste space by pasting it here again.

Consumer service

The consumer service is straightforward. In ZeroMQ, a poller is a mechanism for efficiently handling multiple sockets or file descriptors in an event-driven architecture. It lets a program wait for events, such as incoming messages, on several sockets without blocking on any single one. Basically, it handles messages coming from more than one source. Here a single SUB socket is connected to both publishers, and the poller waits until a message is ready on it. The service lives forever (as long as there are no errors, of course). It loads the received JSON, fills in the creation time if it is None, and then sends it to the database using the API created in the previous post. It waits 300 seconds before handling the next message in the queue. I would not like to have 1562412342385285 posts with the same timestamp in the application.

import json
import os
import time
from datetime import datetime

import requests
import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://twitter-bot:5555")
subscriber.connect("tcp://instagram-bot:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)

API_KEY = os.environ["API_KEY"]

headers = {
    'Content-Type': 'application/json',
    'x-api-key': API_KEY
}

while True:
    socks = dict(poller.poll())
    if subscriber in socks and socks[subscriber] == zmq.POLLIN:
        message = subscriber.recv()
        # Everything below needs a received message, so it stays inside the if.
        received_data = json.loads(message.decode("utf-8"))
        if received_data["creationTime"] is None:
            received_data["creationTime"] = datetime.now().isoformat()
        res = requests.post("http://gpc-api:8000/content/", json=received_data, headers=headers)
        time.sleep(300)
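
The message handling inside the loop can be isolated into a function that needs neither ZMQ nor the API, which makes the creation-time fallback easy to verify. This is a sketch under my own assumptions: `prepare_payload` is a hypothetical name, and the optional `now` argument is there only to make the result predictable when trying it out.

```python
import json
from datetime import datetime
from typing import Optional


def prepare_payload(raw: bytes, now: Optional[datetime] = None) -> dict:
    """Decode a queued message and fill in a missing creation time."""
    data = json.loads(raw.decode("utf-8"))
    # Instagram messages arrive with creationTime set to None,
    # so the consumer stamps them with the current time.
    if data.get("creationTime") is None:
        data["creationTime"] = (now or datetime.now()).isoformat()
    return data
```

The returned dictionary is what would be passed as `json=` to `requests.post`. Messages that already carry a creation time, like the tweets, pass through unchanged.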

The Dockerfile and the compose entry do not need much comment, I think.

FROM python:3.10-slim-bullseye

WORKDIR /app

RUN pip install pyzmq requests

COPY consumer.py ./

ENTRYPOINT ["python", "consumer.py"]

 consumer-service:
   build: ./consumer_service
   container_name: consumer-service
   environment:
     - API_KEY=${API_KEY}
   depends_on: 
     - "gpc-api"

Now just run it with docker-compose up -d.

Conclusion

The data is now ready to power the application. ZMQ is a perfect tool for building a small learning system that uses queues (I know it is used for big ones too). I really enjoy it; however, I would also like to try Kafka in some future projects. Also, remember that scraping websites is not always considered legal. For a commercial project, I would rather pay Twitter for API access or spend more time figuring out how to use the Meta API for Instagram. If Grand Prix Circus were a commercial app, I would not use automated data acquisition at all. The website would need an editorial office to provide quality content. I would also give the community the possibility to create posts. In the next post, I will start creating the front-end application. Cya!