By Asmik Nalmpatian and Lisa Wimmer – for Intro to NLP
In this notebook we demonstrate scraping Twitter data in Python.
Potential reasons for preferring Python over R for general web scraping include the larger body of online support (Python enjoys a vibrant community in this field) and the easier handling of the web drivers required for actively navigating websites (this is based on personal experience; running drivers from R frequently ran into session crashes).
The content displayed here is to a very large extent work by colleagues of ours.
We thank them for sharing their code and refer any complaints to those two gentlemen :)
Goal: collect tweets by German MPs
Steps:
1. Scrape the list of all MPs (names and parties) from bundestag.de.
2. Visit each MP's landing page to collect state, electoral district and Twitter account.
3. Fill in missing Twitter accounts from the parties' own websites.
4. Download each MP's tweets via the Twitter API (tweepy).
If needed, first install the required libraries via !pip install <library>; an example command is given below.
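For reference, the third-party packages imported below can be installed in one go; the names are their PyPI package names (versions left unpinned here):
# Install third-party dependencies (standard-library modules such as os, re or pickle need no installation)
!pip install pandas numpy unidecode requests beautifulsoup4 tweepy selenium lxml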
# Load libraries
import pandas as pd # data wrangling
import numpy as np # math operations
import math # math operations
import os # directories
import time # system time
import random # random number generation
import pickle # object serialization (saving results)
import re # regular expressions
import unidecode # transliterate non-ASCII characters (e.g., umlauts) to ASCII
import urllib.request # scraping
import requests # scraping
from bs4 import BeautifulSoup # scraping
import ctypes # Windows API calls (used for progress pop-ups below)
import tweepy # twitter
import sys # system settings
sys.setrecursionlimit(100000) # raise the recursion limit (avoids RecursionError on deeply nested objects)
import selenium # chrome driver
from selenium import webdriver # chrome driver
import selenium.common.exceptions as selexcept # exception handling
For this you need to have downloaded chromedriver.exe from the official ChromeDriver download page and stored it in the file directory specified below.
Note that the driver version must match the Chrome version you use (a mismatch will throw an error in the code cell below).
# Specify location of chromedriver.exe
chdriver_path = r'C:\Users\wimme\Documents\1_uni\1_master\consulting\projects\consulting\1_scraping\input\chromedriver.exe'
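# Optional sanity check (assumes the path above is valid): print the ChromeDriver version
# and compare it with your browser's version shown under chrome://version
import subprocess
print(subprocess.run([chdriver_path, '--version'], capture_output = True, text = True).stdout)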
# Set up selenium driver
options = webdriver.ChromeOptions()
options.add_argument('--ignore-certificate-errors')
options.add_argument('--incognito')
# options.add_argument('--headless')  # uncomment to run Chrome without a visible browser window
# Start driver (a Chrome browser window should open by itself)
driver = webdriver.Chrome(chdriver_path, options = options)
This part requires some moving around the website, which is why we use a selenium driver in the first place.
# Specify url and navigate to website
website = "https://www.bundestag.de/abgeordnete"
driver.get(website)
# Switch to list view - first, find "List" button
element = driver.find_element_by_class_name('icon-list-bullet')
# Click button
webdriver.ActionChains(driver).move_to_element(element).click(element).perform()
# Wait for list view to load
time.sleep(random.randint(15, 20))
# Count how many MPs are listed on the website (includes dropouts and successors)
len(driver.find_element_by_class_name('bt-list-holder').find_elements_by_tag_name('li'))
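As a side note, instead of fixed sleeps one could also use selenium's explicit waits, which block only until a given element is present; a minimal sketch (not used in the rest of this notebook):
# Wait at most 20 seconds for the MP list container to appear
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
WebDriverWait(driver, 20).until(EC.presence_of_element_located((By.CLASS_NAME, 'bt-list-holder')))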
First, get name and party for each MP.
For much of the scraping we rely on the famous BeautifulSoup
package that makes parsing HTML texts easy.
Note that we still need to specify HTML nodes / CSS selectors found by, e.g., Chrome's developer tab or SelectorGadget.
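To illustrate the parsing pattern on a toy example (the HTML snippet below is made up and merely mimics the structure of the MP list):
# Parse a small HTML snippet and select an element via its class attribute
toy_html = '<div class="bt-teaser-person-text"><h3>Mustermann, Max</h3><p>Beispielpartei</p></div>'
toy_soup = BeautifulSoup(toy_html, 'html.parser')
print(toy_soup.find('div', attrs = {'class': 'bt-teaser-person-text'}).h3.text)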
# Set up empty list
abg_df = []
# Find names and party and append to list
for link in driver.find_element_by_class_name('bt-list-holder').find_elements_by_tag_name('li'):
    abg_df.append({
        'name': link.find_element_by_class_name('bt-teaser-person-text').\
            find_element_by_tag_name('h3').text,
        'party': link.find_element_by_class_name('bt-teaser-person-text').\
            find_element_by_tag_name('p').text,
    })
# Convert to pandas dataframe
abg_df = pd.DataFrame(abg_df)
# Separate names into first and last names
name_concat = abg_df['name'].str.split(", ", n = 1, expand = True)
abg_df['last_name'] = name_concat[0]
abg_df['first_name'] = name_concat[1]
abg_df.drop(columns = ['name'], inplace = True)
abg_df = abg_df.reindex(
columns = ['last_name', 'first_name'] + list(abg_df.columns[:-2]))
# Add columns for info that will be scraped shortly
abg_df = abg_df.reindex(columns = abg_df.columns.tolist() +
['bundesland', 'wahlkreis_name', 'wahlkreis_nr', 'wahlkreis', 'username'])
# Inspect
abg_df.head()
Looking good! Now for the somewhat more complicated part.
The driver will now visit each MP's landing page and scrape their personal info, remote-controlling the browser window. You can watch it click and jump to pages :)
# Create range to loop over
abg_range = abg_df.index[abg_df['bundesland'].isnull()]
abg_range
for abg in abg_range:
    try:
        # (Re-)load list view (for all iterations)
        driver.get(website)
        element = driver.find_element_by_class_name('icon-list-bullet')
        # Click to change to list view
        webdriver.ActionChains(driver).move_to_element(element).click(element).perform()
        # Wait for list view to load
        time.sleep(random.randint(3, 5))
        # Click to open individual page
        driver.find_element_by_class_name('bt-list-holder').find_elements_by_tag_name('li')\
            [abg].click()
        # Wait for page to load
        time.sleep(random.randint(3, 5))
        # Convert page to soup
        soup = BeautifulSoup(driver.page_source, 'lxml')
        # Extract state (bundesland) and electoral district (wahlkreis)
        bundesland = soup.find(
            'div', attrs = {'class': 'col-xs-12 col-sm-6 bt-standard-content'}).h5.text
        wahlkreis = soup.find(
            'div', attrs = {'class': 'col-xs-12 col-sm-6 bt-standard-content'}).a.text \
            if soup.find(
                'div', attrs = {'class': 'col-xs-12 col-sm-6 bt-standard-content'}).a is not None \
            else "n.a."
        # Split wahlkreis into name and ID
        wahlkreis_name = wahlkreis.split(':')[1].strip(' ') if wahlkreis \
            not in ["n.a.", None] else ""
        wahlkreis_nr = int(
            wahlkreis.split(':')[0].strip('Wahlkreis').strip()) if wahlkreis \
            not in ["n.a.", None] else ""
        # Extract social media accounts
        social_media = {}
        if len(soup.find_all('h5', string = 'Profile im Internet')) == 1:
            for link in soup.find_all(class_ = 'bt-linkliste')[0].find_all('a'):
                social_media[link['title']] = link.get('href')
        abg_df.loc[abg, 'bundesland'] = bundesland
        abg_df.loc[abg, 'wahlkreis'] = wahlkreis
        abg_df.loc[abg, 'wahlkreis_name'] = wahlkreis_name
        abg_df.loc[abg, 'wahlkreis_nr'] = wahlkreis_nr
        abg_df.loc[abg, 'username'] = social_media['Twitter'] if 'Twitter' in social_media else ""
        if abg % 20 == 0:
            print('Data for MP %s successfully retrieved' % abg)
    # In case of an IndexError or AttributeError (typically because the page failed to load),
    # update the index of still-missing rows so the affected MPs can be retried in another run
    except (IndexError, AttributeError, selexcept.NoSuchElementException):
        abg_range = abg_df.index[abg_df['bundesland'].isnull()]
ctypes.windll.user32.MessageBoxW(0, "MP data successfully scraped", "Progress Report")
abg_df.head(10)
For those MPs who have a Twitter account but do not list it on their bundestag.de landing page, we try to find this information on their respective party's website.
Luckily, these websites are static and do not require remote control via selenium.
Note: In case of the AfD party, Twitter accounts are not available on the official party website and must thus be gathered manually.
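In such cases the corresponding rows can simply be filled in by hand; a sketch with a made-up MP name and a made-up handle (both hypothetical):
# Manually enter a Twitter URL for an MP whose account is not listed on any of the scraped pages
abg_df.loc[abg_df['last_name'] == 'Mustermann', 'username'] = 'https://twitter.com/max_mustermann'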
# Access party websites and convert into soups
fdp = "https://www.fdpbt.de/fraktion/abgeordnete"
source_fdp = requests.get(fdp).text
soup_fdp = BeautifulSoup(source_fdp, 'html.parser')
cdu = "https://www.cducsu.de/hier-stellt-die-cducsu-bundestagsfraktion-ihre-abgeordneten-vor"
source_cdu = requests.get(cdu).text
soup_cdu = BeautifulSoup(source_cdu, 'html.parser')
spd = "https://www.spdfraktion.de/abgeordnete/alle?wp=19&view=list&old=19"
source_spd = requests.get(spd).text
soup_spd = BeautifulSoup(source_spd, 'html.parser')
gruene = "https://www.gruene-bundestag.de/abgeordnete"
source_gruene = requests.get(gruene).text
soup_gruene = BeautifulSoup(source_gruene, 'html.parser')
# For Die Linke, one needs to extract Twitter accounts from each individual MP's website
linke_base = "https://www.linksfraktion.de/fraktion/abgeordnete/"
# Website contains bins of MPs, according to last name
letters = [['a', 'e'], ['f', 'j'], ['k', 'o'], ['p', 't'], ['u', 'z']]
linke_name_bins = []
for letter in letters:
    extension = f'{letter[0]}-bis-{letter[1]}/'
    linke_name_bins.append(linke_base + extension)
# For each party, find appropriate parent node in soup
all_abg_cdu = soup_cdu.find_all(class_ = 'teaser delegates')
all_abg_spd = soup_spd.find_all(class_ = 'views-row')
extensions_gruene = soup_gruene.find_all('a', class_ = "abgeordneteTeaser__wrapper")
urlbase_gruene = 'https://www.gruene-bundestag.de'
all_abg_gruene = []
for a in extensions_gruene:
    extension = a['href']
    link = urlbase_gruene + str(extension)
    all_abg_gruene.append(link)
all_abg_linke = []
for name_bin in linke_name_bins:
    source = requests.get(name_bin).text
    soup = BeautifulSoup(source, 'html.parser')
    for abg in soup.find_all('div', attrs = {'class': 'col-xs-12 col-sm-12 col-md-6 col-lg-6'}):
        # Remove the leading path prefix; replace() is used because lstrip() strips a character set,
        # not a prefix, and would eat leading letters of some MPs' URL slugs
        extension = abg.find('h2').find('a')['href'].replace('/fraktion/abgeordnete/', '', 1)
        all_abg_linke.append(linke_base + extension)
# Scrape accounts from soups
twitter_list = []
# CDU/CSU
for abg in all_abg_cdu:
    twitter = abg.find(class_ = 'twitter')
    twitter_list.append(
        {'party': "cdu_csu",
         'name': abg.find('h2').find('span').text.strip(' '),
         'twitter_ext': twitter.find('a', href = True)['href'] if twitter is not None else ""
        }
    )
# Gruene
for abg in all_abg_gruene:
    abg_source = requests.get(abg).text
    abg_soup = BeautifulSoup(abg_source, 'html.parser')
    hrefs = []
    twitter = ""
    for x in abg_soup.find_all(class_ = "weitereInfoTeaser"):
        for y in x.find_all('a', href = True):
            hrefs.append(y['href'])
    # Keep the last link that points to Twitter (if any)
    for href in hrefs:
        if "twitter" in href:
            twitter = href
    twitter_list.append(
        {'party': "gruene",
         'name': abg_soup.find('h1').text,
         'twitter_ext': twitter
        }
    )
# Linke
for abg in all_abg_linke:
    abg_source = requests.get(abg).text
    abg_soup = BeautifulSoup(abg_source, 'html.parser')
    twitter = abg_soup.find('a', text = re.compile('Twitter-Profil'))
    twitter_list.append(
        {'party': "linke",
         'name': abg_soup.find('h1').text.strip(' '),
         'twitter_ext': twitter['href'] if twitter is not None else ""
        }
    )
# SPD
for abg in all_abg_spd:
    twitter = abg.find(class_ = 'ico_twitter')
    twitter_list.append(
        {'party': "spd",
         'name': abg.find('h3').find('a').get_text().strip(' '),
         'twitter_ext': twitter['href'] if twitter is not None else ""
        }
    )
# Convert to data frame
twitter_df = pd.DataFrame(twitter_list)
ctypes.windll.user32.MessageBoxW(0, "Twitter accounts successfully scraped", "Progress Report")
twitter_df.head()
First, we define a regex-based function that (repeatedly) strips unwanted sequences such as academic titles and reduces each name to first and last name.
def name_prep(name, twitter = True):
    # Remove bracketed additions, e.g. annotations in parentheses or square brackets
    interim = re.sub(r"[\(\[].*?[\)\]]", "", name).strip(' ')
    # Strip up to five leading (academic) titles such as 'Prof.' or 'Dr.'
    for _ in range(5):
        interim = re.sub(r'(^\w{1,6}\. ?)', r'', interim)
    # Transliterate special characters (umlauts etc.) and collapse multiple blanks
    interim = unidecode.unidecode(interim).strip(' ')
    interim = re.sub(' +', ' ', interim)
    if twitter:
        if len(interim.split()) > 2:
            if interim.split()[0].endswith(('.', 'med', 'forest')):
                first_name = interim.split()[1]
            else:
                first_name = interim.split()[0]
            last_name = interim.split()[-1]
            return (first_name + ' ' + last_name)
        if interim.split()[-1] == 'von':
            # Drop a trailing 'von' and return the remaining name
            return ' '.join(interim.split()[0:-1])
        else:
            return interim
    else:
        if len(interim.split()) > 1:
            return interim.split()[0]
        else:
            return interim
# Check whether it works
name_prep(name = 'Prof. Dr. Dr. rer. nat. Carl Friedrich Gauss', twitter = True)
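Applied to this example, the function should return 'Carl Gauss': academic titles and middle names are discarded, leaving only first and last name.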
We apply the function to the name variables in both data frames:
# Prepare MP names from Twitter df for name-based matching
twitter_df['name_matching'] = twitter_df['name'].apply(name_prep, twitter = True)
# Prepare MP names from MP df for name-based matching
abg_df['name_matching'] = abg_df['first_name'].apply(name_prep, twitter = False) + ' ' + \
abg_df['last_name'].apply(name_prep, twitter = False)
Ready for merging! For now we keep the user names we found from both sources and set them to NaN where they are empty.
# Merge Twitter df and MP df
abg_twitter_df = pd.merge(
abg_df,
twitter_df[['name_matching', 'twitter_ext']],
how = 'left',
left_on = 'name_matching',
right_on = 'name_matching'
)
abg_twitter_df['username'] = np.where(
abg_twitter_df['username'] != '',
abg_twitter_df['username'],
np.nan
)
abg_twitter_df['twitter_ext'] = np.where(
abg_twitter_df['twitter_ext'] != '',
abg_twitter_df['twitter_ext'],
np.nan
)
abg_twitter_df.head()
We then get rid of the redundancy, using the accounts found on the parties' websites where available, and the ones found on the Bundestag site otherwise.
# Impute account name from Bundestag website where necessary and available
abg_twitter_df['username'] = np.where(
abg_twitter_df['twitter_ext'].notnull(),
abg_twitter_df['twitter_ext'],
abg_twitter_df['username'])
abg_twitter_df = abg_twitter_df.drop('twitter_ext', axis = 1)
abg_twitter_df.head()
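As a quick plausibility check, we can count for how many MPs an account was found at all:
# Number of MPs with a Twitter account on record vs. total number of MPs
print(abg_twitter_df['username'].notnull().sum(), 'of', len(abg_twitter_df), 'MPs have a Twitter account')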
Lastly, we extract the usernames from the account URLs.
# Define function to extract usernames
def get_username(url):
    # For full URLs, take the path segment after the domain and drop any query string
    if url.startswith('http'):
        return url.split('/')[3].split('?')[0]
    else:
        return url.split('?')[0]
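# Quick check with a made-up (hypothetical) profile URL: should print 'max_mustermann',
# i.e. the path segment after the domain, without the query string
print(get_username('https://twitter.com/max_mustermann?lang=de'))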
# Apply to all observations with existing account URL
mask = abg_twitter_df['username'].notnull()
abg_twitter_df['username'] = abg_twitter_df['username'][mask].apply(get_username)
abg_twitter_df.head()
After quite a few lines of code, we are now ready to scrape the data we are actually after.
For this, we will use the tweepy
library and define a function that retrieves the data we are looking for.
# Function to download tweets for a specific user with Tweepy
def download_tweets_tweepy_mod(username):
    # Helper function to check whether tweet is a retweet
    def is_retweet(x):
        try:
            res = not(math.isnan(x))
        except:
            res = True
        return(res)
    # Helper function to retrieve hashtags
    def get_hashtags(x):
        hashtags_dict = x['hashtags']
        hashtags_text = [x['text'] for x in hashtags_dict]
        return(hashtags_text)
    # Helper function to retrieve user mentions
    def get_mentions(x):
        mentions_dict = x['user_mentions']
        mentions_text = [x['screen_name'] for x in mentions_dict]
        return(mentions_text)
    # Initialize a list to hold all the tweepy Tweets
    alltweets = []
    # Specify relevant columns
    colnames = [
        'created_at',
        'full_text',
        'retweet_count',
        'favorite_count',
        'followers_count',
        'location']
    try:
        # Make initial request for most recent tweets (200 is the maximum allowed count)
        new_tweets = api.user_timeline(screen_name = username,
                                       count = 200,
                                       tweet_mode = "extended")
        # Save most recent tweets
        alltweets.extend(new_tweets)
        # Save the id of the oldest tweet less one
        oldest = alltweets[-1].id - 1
        # Keep grabbing tweets until there are no tweets left to grab
        while len(new_tweets) > 0:
            # All subsequent requests use the max_id param to prevent duplicates
            new_tweets = api.user_timeline(screen_name = username,
                                           count = 200,
                                           max_id = oldest,
                                           tweet_mode = 'extended')
            # Save most recent tweets
            alltweets.extend(new_tweets)
            oldest = alltweets[-1].id - 1
        # Convert output to pandas DataFrame
        outtweets = pd.DataFrame([tweet.__dict__ for tweet in alltweets])
        # Check whether tweet is retweet
        outtweets['is_retweet'] = outtweets['retweeted_status'].apply(is_retweet)
        # Retrieve other metrics
        outtweets['followers_count'] = [x.followers_count for x in outtweets['author']]
        outtweets['location'] = [x.location for x in outtweets['author']]
        outtweets = outtweets[~ outtweets['is_retweet']]
        outtweets = outtweets[colnames]
        # Add boolean column for availability
        outtweets.insert(0, 'available', True)
    except:
        print('Data for user %s cannot be downloaded' % username)
        outtweets = pd.DataFrame(np.nan, index = [0], columns = colnames)
        outtweets.insert(0, 'available', False)
    # Add column with username
    outtweets.insert(0, 'username', username)
    return(outtweets)
Again, we need to specify our API credentials, and again, PLEASE do not misuse them :)
my_keys = {
'consumer_key': 'o0g3JVWSKzRYv9dQp2SEPdjXp',
'consumer_secret': 'AyvUIFzB82w3ZetyTXf1PbHiSxK7CgdcJo0D5jfKAoFlUuP0iH',
'access_token_key': '1302924762914660354-7ydX1jUVSnscL60hhl83biPGNVeQoH',
'access_token_secret': '9NqtnWj2q8uLuQkLMWdamJyIEb56hlGJOVgrydzoakorT'}
# Set up access to API
auth = tweepy.OAuthHandler(my_keys['consumer_key'], my_keys['consumer_secret'])
auth.set_access_token(my_keys['access_token_key'], my_keys['access_token_secret'])
api = tweepy.API(auth, wait_on_rate_limit = True)
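Before looping over hundreds of accounts, it can be worth checking that authentication actually works; tweepy's verify_credentials raises an error if the keys are rejected:
# Optional check: should print the screen name of the authenticated account
print(api.verify_credentials().screen_name)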
We create a list of all MPs with active accounts:
# Get names and usernames
names = abg_twitter_df['name_matching']
twitter_usernames = abg_twitter_df['username']
twitter_account = pd.concat([names, twitter_usernames], axis = 1)
mask = twitter_account.username.notnull()
twitter_account = twitter_account[mask]
twitter_account.reset_index(drop = True, inplace = True)
twitter_account.head()
Let the scraping begin:
# Download most recent tweets using tweepy (at most 3200 tweets per user)
tweepy_df = pd.DataFrame()
for username in twitter_account['username']:
    tweepy_df = pd.concat([tweepy_df, download_tweets_tweepy_mod(username)])
tweepy_df = twitter_account.merge(tweepy_df, on = 'username')
ctypes.windll.user32.MessageBoxW(0, "Twitter data successfully scraped", "Progress Report")
# Inspect
tweepy_df.sample(n = 20)
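Since the scraping takes a while, it is worth saving the results right away (for instance via pandas' pickle interface); the file names below are just placeholders:
# Persist meta data and tweets so the scraping does not have to be repeated
abg_twitter_df.to_pickle('abg_twitter_df.pkl')
tweepy_df.to_pickle('tweepy_df.pkl')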
And that's it! We can now use our meta information and Twitter data as we please - merge them, analyze them, print them ... :)