GrandPrix Circus - Selenium bots and ZMQ
Introduction
In the last post, I prepared the Mongo database instance and an API for it. It is time to use them. In this post, I will create two scripts that sign in to Twitter and Instagram and obtain basic data about a few new posts from chosen profiles. The data from the two sources will be pushed to the message queue by ZMQ producers. The consumer will take care of loading the messages into the database through the API, one by one, at an established time interval. The Selenium bots won't be perfect. I will probably get banned quickly, but I will use some tricks to make them at least a bit human-like. The best way to avoid getting blocked would be a proxy pool, but I think that is overkill for a small blog project.
Let’s get to work!
Selenium bots
To make the scrapers less detectable, I will use undetected-chromedriver. This is a patched Selenium Chromedriver that does not trigger the popular anti-bot services. It does not hide the IP, so the chances of getting blocked are still quite high. Both bots keep a list of the last saved IDs to make sure the content is not duplicated. The scraper services will be scheduled inside their containers using cron.
Twitter bot
The whole script is just one function, process_tweets. Let's start with loading the JSON list of the last saved IDs and binding the ZMQ publisher on port 5555.
import os
import time
import json
import random

import zmq
import undetected_chromedriver as uc
from selenium import webdriver
from selenium.webdriver import ChromeOptions, Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


def process_tweets(username: str, password: str) -> None:
    try:
        with open('last_saved_ids.json', 'r') as file:
            last_saved_ids = json.load(file)
    except FileNotFoundError:
        last_saved_ids = []

    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5555")
Now I set the Chrome options. The browser runs in headless mode because it will run in a Docker container anyway. I also disable /dev/shm usage, because that partition is too small in certain VM environments, which causes Chrome to fail or crash. This happens frequently in Docker images.
    options = ChromeOptions()
    options.add_argument("-headless")
    options.add_argument('--disable-dev-shm-usage')
    driver = uc.Chrome(headless=True, use_subprocess=False, options=options,
                       executable_path="/usr/local/bin/chromedriver")
It is time to make the real moves on the website. Let's open the Twitter login page and sign in. I give it 20 seconds to find the correct fields using CSS selectors. At the end, I pause the script for a random number of seconds between 10 and 20 to make sure the login process completes. I will randomize all of the sleep times to make pattern recognition harder.
    url = "https://twitter.com/i/flow/login"
    driver.get(url)

    username_input = WebDriverWait(driver, 20).until(
        EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[autocomplete="username"]')))
    username_input.send_keys(username)
    username_input.send_keys(Keys.ENTER)

    password_input = WebDriverWait(driver, 20).until(
        EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[name="password"]')))
    password_input.send_keys(password)
    password_input.send_keys(Keys.ENTER)

    time.sleep(random.randint(10, 20))
The last thing to do is iterate over the dictionary with the profile names and real names. It could be a plain list of accounts, but I have an author field and would like to have the real name there. The bot will go to every profile and scrape data about all visible tweets. If the content has not been scraped already, the ZMQ publisher sends it to the queue in JSON format.
    profiles = {"ChrisMedlandF1": "Chris Medland", "andrewbensonf1": "Andrew Benson",
                "MBrundleF1": "Martin Brundle", "F1": "Formula 1"}
    shuffled_keys = list(profiles.keys())
    random.shuffle(shuffled_keys)
    shuffled_profiles = {key: profiles[key] for key in shuffled_keys}

    ids = []
    for k, v in shuffled_profiles.items():
        profile_url = f"https://twitter.com/{k}"
        driver.get(profile_url)
        time.sleep(random.randint(5, 10))
        try:
            tweets = WebDriverWait(driver, 20).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, '[data-testid="tweet"]')))
            for tweet in tweets:
                tweet_txt = tweet.text.lower()
                if "reposted" not in tweet_txt and "pinned" not in tweet_txt:
                    try:
                        # The leading dot keeps the XPath relative to the current tweet element
                        created_at = tweet.find_element(By.XPATH, './/time[@datetime]').get_attribute('datetime')
                        anchor = tweet.find_element(By.CSS_SELECTOR, "a[aria-label][dir]").get_attribute("href")
                        last_tweet_id = anchor.split("/status/")[1]
                        if last_tweet_id not in last_saved_ids:
                            message = {"contentId": last_tweet_id, "source": profile_url, "author": v,
                                       "tag": "tweet", "title": f"{v} posted a new tweet!",
                                       "creationTime": created_at}
                            publisher.send(json.dumps(message).encode("utf-8"))
                        # Remember every ID seen in this run, so it is not sent again next time
                        ids.append(last_tweet_id)
                    except Exception as e:
                        print(e)
        except Exception as e:
            print(e)
        time.sleep(random.randint(3, 30))

    with open('last_saved_ids.json', 'w') as file:
        json.dump(ids, file)
if __name__ == "__main__":
    your_username = os.environ["TWITTER_ACC"]
    your_password = os.environ["TWITTER_PASSWD"]
    process_tweets(your_username, your_password)
Let's also create a Dockerfile. It installs wget, unzip, cron, and Chromium; I need them to download and unzip the Chrome driver and to schedule the script.
FROM python:3.10-slim-bullseye
RUN apt update --fix-missing && \
    apt install -y wget unzip gnupg cron
RUN wget https://chromedriver.storage.googleapis.com/114.0.5735.90/chromedriver_linux64.zip
RUN unzip chromedriver_linux64.zip -d /usr/local/bin/
RUN rm chromedriver_linux64.zip
RUN apt install -y -qq chromium
WORKDIR /code
COPY ./requirements.txt ./requirements.txt
RUN pip install --no-cache-dir --upgrade -r ./requirements.txt
COPY ./get_tweets.py ./entrypoint.sh ./
RUN chmod +x ./entrypoint.sh
COPY cronjobs /etc/cron.d/cronjobs
RUN chmod 0644 /etc/cron.d/cronjobs
RUN crontab /etc/cron.d/cronjobs
ENTRYPOINT ["sh", "./entrypoint.sh"]
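The requirements.txt file is not shown in the post; for this script it would need at least something like the following (my guess at the minimal, unpinned package list):

selenium
undetected-chromedriver
pyzmq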
For example, the cronjobs file can look like this. It will run the script every 3 hours.
0 */3 * * * cd /code && /usr/local/bin/python3 get_tweets.py >> log.txt 2>&1
The entry point script is defined like this. It also redirects the standard output and error to the log file.
python3 get_tweets.py >> log.txt 2>&1 && cron -f
Having an entry point ensures that the script runs as soon as the container starts; otherwise, it would wait up to three hours for the first cron run. It is possible to use CMD instead of ENTRYPOINT if you want to be able to override it from docker commands. The twitter-bot service should also be included in the docker-compose file:
twitter-bot:
  build: ./data_acquisition_services/twitter_data_service
  container_name: twitter-bot
  depends_on:
    - "consumer-service"
  environment:
    - TWITTER_ACC=${TWITTER_ACC}
    - TWITTER_PASSWD=${TWITTER_PASSWD}
  expose:
    - 5555
For testing on the local host, map the port instead of just exposing it.
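A minimal sketch of that change (the host-side port choice is arbitrary) would replace the expose section of the service with a ports mapping:

  ports:
    - "5555:5555"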
Instagram bot
The scraper for Instagram follows the same schema. First, I bind the ZMQ publisher on a different port than the previous bot. Next, I set up the Chrome driver, sign in to the website, and then go to the desired profiles. Here I scroll the page a few times, because by default it does not show a lot of content. I do not scrape the post creation date, because it is not available on the profile page. If you want it, you need to open the post (for example, by going to the URL built from the already scraped post ID) and read the first datetime element (there are more of them, as comments have one too); a sketch of that is shown below. I will set the date in the consumer service instead, simply because I do not want to increase the running time.
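For completeness, fetching the timestamp at scrape time could look roughly like this. This is only a sketch, not part of my script: get_post_datetime is a hypothetical helper, and it assumes the first time element on a post page belongs to the post itself.

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


def get_post_datetime(driver, post_id: str) -> str:
    # Hypothetical helper: open a single post and read its creation time.
    # Assumes `driver` is an already signed-in Selenium/undetected-chromedriver instance.
    driver.get(f"https://www.instagram.com/p/{post_id}/")
    # The first <time> element should be the post's own timestamp; later ones belong to comments.
    time_element = WebDriverWait(driver, 20).until(
        EC.presence_of_element_located((By.TAG_NAME, "time")))
    return time_element.get_attribute("datetime")

Back to the actual bot: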
import os
import time
import json
import random

import undetected_chromedriver as uc
import zmq
from selenium import webdriver
from selenium.webdriver import ChromeOptions, Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


def process_posts(username, password) -> None:
    try:
        with open('last_saved_ids.json', 'r') as file:
            last_saved_ids = json.load(file)
    except FileNotFoundError:
        last_saved_ids = []

    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5556")

    options = ChromeOptions()
    options.add_argument("-headless")
    options.add_argument('--disable-dev-shm-usage')
    driver = uc.Chrome(headless=True, use_subprocess=False, options=options,
                       executable_path="/usr/local/bin/chromedriver")

    url = "https://www.instagram.com/"
    driver.get(url)
    username_input = WebDriverWait(driver, 40).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, 'input[name="username"]')))
    username_input.send_keys(username)
    password_input = WebDriverWait(driver, 40).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, 'input[name="password"]')))
    password_input.send_keys(password)
    password_input.send_keys(Keys.ENTER)
    time.sleep(random.randint(10, 20))

    profiles = {"f1troll": "F1 MEMES", "carlossainz55": "Carlos Sainz", "pierregasly": "Pierre Gasly",
                "lewishamilton": "Lewis Hamilton", "maxverstappen1": "Max Verstappen", "schecoperez": "Checo Perez",
                "charles_leclerc": "Charles Leclerc", "georgerussell63": "George Russell",
                "fernandoalo_oficial": "Fernando Alonso"}
    shuffled_keys = list(profiles.keys())
    random.shuffle(shuffled_keys)
    shuffled_profiles = {key: profiles[key] for key in shuffled_keys}

    ids = []
    for k, v in shuffled_profiles.items():
        profile_url = url + k
        driver.get(profile_url)
        for _ in range(random.randint(1, 3)):
            driver.find_element(By.TAG_NAME, 'body').send_keys(Keys.PAGE_DOWN)
            time.sleep(random.randint(1, 4))
        post_links = driver.find_elements(By.XPATH, "//a[contains(@href, '/p/')]")
        for link in post_links:
            post_id = link.get_attribute("href").split("/")[-2]
            if post_id not in last_saved_ids:
                message = {"contentId": post_id, "source": profile_url, "author": v, "tag": "insta",
                           "title": f"{v} posted a new insta photo!", "creationTime": None}
                publisher.send(json.dumps(message).encode("utf-8"))
            # Remember every ID seen in this run, so it is not sent again next time
            ids.append(post_id)
        time.sleep(random.randint(3, 30))

    with open('last_saved_ids.json', 'w') as file:
        json.dump(ids, file)


if __name__ == "__main__":
    your_username = os.environ["INSTA_ACC"]
    your_password = os.environ["INSTA_PASSWD"]
    process_posts(your_username, your_password)
The Docker part is identical to the one for the Twitter bot, so I won't waste space by pasting it here again.
Consumer service
The consumer service is straightforward. In ZeroMQ, a poller is a mechanism for efficiently handling multiple sockets or file descriptors in an event-driven way: it lets a program wait for events, such as incoming messages or connection status changes, on several sockets without blocking on any single one. Basically, it handles messages coming from more than one source. The service lives forever (if there are no errors, of course). It loads the received JSON, fills in the creation time field if it is None, and then sends the document to the database using the API created in the previous post. It waits 300 seconds before handling the next message in the queue; I would not like to have 1562412342385285 posts with the same timestamp in the application.
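To show what the poller buys you over a plain blocking recv, here is a minimal standalone sketch (the endpoints are placeholders, not the ones from this project) that watches two SUB sockets with a timeout:

import zmq

context = zmq.Context()

# Two independent SUB sockets (placeholder endpoints, just for illustration)
sub_a = context.socket(zmq.SUB)
sub_a.connect("tcp://localhost:5555")
sub_a.setsockopt_string(zmq.SUBSCRIBE, "")

sub_b = context.socket(zmq.SUB)
sub_b.connect("tcp://localhost:5556")
sub_b.setsockopt_string(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(sub_a, zmq.POLLIN)
poller.register(sub_b, zmq.POLLIN)

while True:
    # Wait up to 1000 ms for a message on either socket instead of blocking forever on one
    events = dict(poller.poll(timeout=1000))
    if sub_a in events:
        print("from A:", sub_a.recv())
    if sub_b in events:
        print("from B:", sub_b.recv())
    if not events:
        print("no messages in the last second")

The actual consumer service looks like this: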
import json
import os
import time
from datetime import datetime

import requests
import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://twitter-bot:5555")
subscriber.connect("tcp://instagram-bot:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)

API_KEY = os.environ["API_KEY"]
headers = {
    'Content-Type': 'application/json',
    'x-api-key': API_KEY
}

while True:
    socks = dict(poller.poll())
    if subscriber in socks and socks[subscriber] == zmq.POLLIN:
        message = subscriber.recv()
        received_data = json.loads(message.decode("utf-8"))
        if received_data["creationTime"] is None:
            received_data["creationTime"] = datetime.now().isoformat()
        res = requests.post("http://gpc-api:8000/content/", json=received_data, headers=headers)
        time.sleep(300)
The Dockerfile and the compose entry don't need much commentary, I think.
FROM python:3.10-slim-bullseye
WORKDIR /app
RUN pip install pyzmq requests
COPY consumer.py ./
ENTRYPOINT ["python", "consumer.py"]
consumer-service:
  build: ./consumer_service
  container_name: consumer-service
  environment:
    - API_KEY=${API_KEY}
  depends_on:
    - "gpc-api"
Now just run it with docker-compose up -d.
Conclusion
The data is ready to power the application now. ZMQ is a perfect tool for building a small learning system around queues (I know it is used for big ones too). I really enjoyed it; however, I would also like to try Kafka in some future project. Also, remember that scraping websites is not always considered legal. For a commercial project, I would rather pay Twitter for API access or spend more time figuring out how to use the Meta API for Instagram. If the Grand Prix Circus were a commercial app, I would not use automated data acquisition at all. The website would need an editorial office to provide quality content. I would also give the community the possibility to create posts. In the next post, I will start creating the front-end application. Cya!