Source code for image_crawler_utils.stations.twitter.parser_assets.search_status_analyzer
from typing import Optional
import re
import datetime
import os
import traceback
from bs4 import BeautifulSoup
import nodriver
import asyncio
from image_crawler_utils import CrawlerSettings
from image_crawler_utils.log import Log
from image_crawler_utils.progress_bar import CustomProgress, ProgressGroup
from .constants import SCROLL_DELAY, SCROLL_NUM, DOWN_SCROLL_LENGTH, LOAD_SCROLL_LENGTH
from .status_classes import TwitterStatus, TwitterStatusMedia
from .utils import twitter_progress_bar_loading, twitter_empty_check, twitter_error_check
[docs]
def parse_twitter_status_element(
status_html: str,
log: Log=Log()
) -> Optional[TwitterStatus]:
"""
Parse Twitter / X status element from search result page: "<article ...></article>".
Args:
status_html (str): HTML string of status element "<article ...></article>".
log (image_crawler_utils.log.Log, None): Logging config.
Returns:
A image_crawler_utils.stations.twitter.TwitterStatus class.
"""
soup = BeautifulSoup(status_html, "lxml")
result = TwitterStatus()
# Basic elements
try:
result.status_url = f'https://x.com{soup.find("a", class_="css-146c3p1 r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-xoduu5 r-1q142lx r-1w6e6rj r-9aw3ui r-3s2u2q r-1loqt21").get("href")}'
result.status_id = result.status_url.split('/')[-1]
result.user_id = result.status_url.split('/')[3]
except Exception as e:
return None # If a status does not contain these elements, then it is likely an advertisement!
# User name
try:
result.user_name = soup.find('span', class_='css-1jxf684 r-bcqeeo r-1ttztb7 r-qvutc0 r-poiln3').text
except Exception as e:
log.warning(f"Cannot get user name from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
# Posting time
try:
result.time = soup.find('time').get("datetime")
except Exception as e:
log.warning(f"Cannot get time from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
# Replies, retweets, and likes; string found is like '123 replies'
try:
stats_box = soup.find('div', attrs={'role': "group"})
buttons = stats_box.find_all('button')
try:
reply_num_str = buttons[0].get('aria-label')
result.reply_num = int(reply_num_str.split(' ')[0])
except Exception as e:
log.warning(f"Cannot get reply num from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
try:
retweet_num_str = buttons[1].get('aria-label')
result.retweet_num = int(retweet_num_str.split(' ')[0])
except Exception as e:
log.warning(f"Cannot get retweet num from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
try:
like_num_str = buttons[2].get('aria-label')
result.like_num = int(like_num_str.split(' ')[0])
except Exception as e:
log.warning(f"Cannot get like num from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
except Exception as e:
log.warning(f"Cannot get replies / retweets / likes information from [repr.url]{result.status_url}[reset] because {e}", extra={"markup": True})
# Optional elements: view num (Some old tweets do not have this)
try:
view_num_str = soup.find('a', class_="css-175oi2r r-1777fci r-bt1l66 r-bztko3 r-lrvibr r-1ny4l3l r-1loqt21").get('aria-label')
result.view_num = int(view_num_str.split(' ')[0])
except:
result.view_num = None
# Optional elements: Text
text_box = soup.find('div', attrs={'data-testid': "tweetText"})
if text_box is not None:
try:
result.text = ''.join([(content.get("alt") if "alt" in content.attrs else content.text) for content in text_box.contents])
result.hashtags = [content.text for content in text_box.find_all('a') if content.get("href").startswith('/hashtag')]
result.links = [content.get("href") for content in text_box.find_all('a') if not content.get("href").startswith('/hashtag')]
except:
pass
# Optional elements: Media
media_box = [div for div in soup.find_all('div') if "aria-labelledby" in div.attrs]
if len(media_box) > 0:
media_box = media_box[0]
try:
for link in media_box.find_all('a'):
if link.find('img') is not None:
media_info = TwitterStatusMedia()
media_info.link = f'https://x.com{link.get("href")}' if 'https' not in link.get("href") else link.get("href")
image_source = link.find("img").get("src")
if 'abs-0.twimg.com/emoji' in image_source: # .svg is emoji images
continue
media_info.image_source = re.search(r".*&", image_source).group()[:-1] + '&name=orig'
media_info.image_id = image_source.split('/')[-1].split('?')[0]
try:
ext = re.search(r"format=.*?&", image_source).group()[len('format='):-1]
except:
ext = re.search(r"format=.*?", image_source).group()[len('format='):]
media_info.image_name = result.status_id + f'.{ext}'
result.media_list.append(media_info)
if len(result.media_list) > 1:
# Multiple media in a status
for i in range(len(result.media_list)):
name, ext = os.path.splitext(result.media_list[i].image_name)
result.media_list[i].image_name = name + f'_{i + 1}' + ext
except Exception as e:
output_msg_base = f"There should be at least 1 media in [repr.url]{result.status_url}[reset], but none is detected"
log.warning(f"{output_msg_base}.\n{traceback.format_exc()}", output_msg=f"{output_msg_base} because {e}")
return result
[docs]
async def find_twitter_status(
tab: nodriver.Tab,
log: Log=Log(),
) -> list[TwitterStatus]:
"""
Finding all Twitter / X status on current searching result page.
Args:
tab (unodriver.Tab): Nodriver tab with loaded searching result page.
log (image_crawler_utils.log.Log, None): Logging config.
Returns:
A list of image_crawler_utils.stations.twitter.TwitterStatus class.
"""
status_list: list[TwitterStatus] = []
# Find status
await tab # Let the page be loaded
main_structure = await tab.select('div[data-testid="primaryColumn"]')
status_elements = await main_structure.query_selector_all('article[data-testid="tweet"]')
for element in status_elements:
element_html = await element.get_html()
parsed_twitter_status = parse_twitter_status_element(element_html, log=log)
if parsed_twitter_status is not None:
status_list.append(parsed_twitter_status)
return status_list
[docs]
async def scrolling_to_find_status(
tab: nodriver.Tab,
tab_url: str,
crawler_settings: CrawlerSettings=CrawlerSettings(),
reload_times: int=1,
error_retry_delay: float=200,
image_num_restriction: Optional[int]=None,
progress_group: Optional[ProgressGroup]=None,
transient: bool=False,
) -> list[TwitterStatus]:
"""
Scrolling to finding all Twitter / X status on current searching result page.
Args:
crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Parser.
tab (nodriver.Tab): nodriver.Tab with loaded searching result page.
reload_times (int): To deal with (possible) missing status, reload pages for reload_times to get status results.
error_retry_delay (float): When an error happens (especially Twitter / X returns an error), sleep error_retry_delay before reloading again.
progress_group (image_crawler_utils.progress_bar.ProgressGroup): The Group of Progress bars to be displayed in.
transient (bool): Hide Progress bars after finishing.
Returns:
A list of image_crawler_utils.stations.twitter.TwitterStatus class, sort by status from large to small.
"""
# Fetching status with retrying; every attempt may lead to different results
final_status_list: list[TwitterStatus] = [] # All status
# Load the page for reload_count times
for reload_count in range(reload_times):
not_from_retry_button = True
if progress_group is None: # No father tasks are provided, create an separate progress
progress = CustomProgress(has_total=False, transient=transient)
progress.start()
else:
if transient:
progress = progress_group.sub_no_total_count_bar
else:
progress = progress_group.main_no_total_count_bar
task = progress.add_task(description=f'Loading [repr.number]{reload_count + 1}[reset]/[repr.number]{reload_times}[reset], scrolling times:')
# Different from reload_count, retry_count only works when an error happens
retry_count = 0
while retry_count < crawler_settings.download_config.retry_times:
try:
# Loading until progress bar (rotating circle) disappears
crawler_settings.log.debug(f'Awaiting loading icons to disappear in [repr.url]{tab_url}[reset] ...', extra={"markup": True})
await twitter_progress_bar_loading(tab)
crawler_settings.log.debug(f'Loading icons disappeared in [repr.url]{tab_url}[reset].', extra={"markup": True})
if not_from_retry_button: # If the page is new, do some initialization
retry_count += 1
attempt_status_list: list[TwitterStatus] = [] # Status retrieved in every retry
len_attempt_status = -1
scroll_count = 0
media_count = 0
await tab.scroll_up(1000) # Sometimes it does not load from the first tweet. Scroll to top in case of this!
# Check if it is empty
crawler_settings.log.debug(f'Checking "empty" elements in [repr.url]{tab_url}[reset].', extra={"markup": True})
check = await twitter_empty_check(tab)
if check:
crawler_settings.log.warning(f'Page [repr.url]{tab_url}[reset] contains no result.', extra={"markup": True})
return [], 0 # Exit directly
# Check if there is an error
crawler_settings.log.debug(f'Checking error elements in [repr.url]{tab_url}[reset].', extra={"markup": True})
check = await twitter_error_check(tab)
if check:
raise ConnectionRefusedError
# Start scrolling down batch
while len(attempt_status_list) != len_attempt_status or not not_from_retry_button: # When it is loaded from retry button, force the loop to run once
len_attempt_status = len(attempt_status_list)
if not_from_retry_button: # When retry button is detected, the page had already scrolled down
# Scroll down LOAD_SCROLL_LENGTH
progress.update(task, advance=1)
await tab.scroll_down(LOAD_SCROLL_LENGTH)
crawler_settings.log.debug(f'Scrolled down {LOAD_SCROLL_LENGTH} at [repr.url]{tab_url}[reset]', extra={"markup": True})
# Loading until progress bar (rotating circle) disappears
crawler_settings.log.debug(f'Awaiting loading icons to disappear in [repr.url]{tab_url}[reset] ...', extra={"markup": True})
await twitter_progress_bar_loading(tab)
crawler_settings.log.debug(f'Loading icons disappeared in [repr.url]{tab_url}[reset].', extra={"markup": True})
# Check if there is an error
crawler_settings.log.debug(f'Checking error elements in [repr.url]{tab_url}[reset].', extra={"markup": True})
check = await twitter_error_check(tab)
if check:
raise ConnectionRefusedError
# Scroll up LOAD_SCROLL_LENGTH
progress.update(task, advance=1)
await tab.scroll_up(LOAD_SCROLL_LENGTH)
crawler_settings.log.debug(f'Scrolled up {LOAD_SCROLL_LENGTH} at [repr.url]{tab_url}[reset]', extra={"markup": True})
# Only compare the results after SCROLL_NUM scrollings
for i in range(SCROLL_NUM):
await asyncio.sleep(SCROLL_DELAY)
progress.update(task, advance=1)
await tab.scroll_down(DOWN_SCROLL_LENGTH)
crawler_settings.log.debug(f'Scrolled down {DOWN_SCROLL_LENGTH} at [repr.url]{tab_url}[reset]', extra={"markup": True})
scroll_count += 1
# Twitter has f**king StaleElementReferenceException, which means you may retry several times to retrieve the element
for j in range(crawler_settings.download_config.retry_times):
try:
current_status_list = await find_twitter_status(
tab=tab,
log=crawler_settings.log,
)
break # Successful, stop retrying
except ConnectionRefusedError as e: # An Twitter / X error happens!
raise ConnectionRefusedError(e)
except Exception as e:
current_status_list = None
error_msg = e
if current_status_list is None: # An error happened
raise ConnectionError(error_msg)
else: # No error, status successfully got
attempt_status_url_list = [status.status_url for status in attempt_status_list]
for status in current_status_list:
if status.status_url not in attempt_status_url_list:
attempt_status_list.append(status)
media_count += len(status.media_list)
progress.update(task, description=f'Loading [repr.number]{reload_count + 1}[reset]/[repr.number]{reload_times}[reset], [repr.number]{len(attempt_status_list)}[reset] status & [repr.number]{media_count}[reset] {"images" if media_count > 1 else "image"} detected after scrolling times:')
# Reached restrictions on media num
if image_num_restriction is not None and media_count >= image_num_restriction:
crawler_settings.log.info(f'Collected {media_count} media {"images have" if media_count > 1 else "image has"} exceeded the restrictions on image num ({image_num_restriction} {"images" if image_num_restriction > 1 else "image"}).')
len_attempt_status = len(attempt_status_list) # Set this to break the outer loop
break
not_from_retry_button = True # Current scrolling down finished
break # Succeeded, no retrying
except ConnectionRefusedError:
restart_time = datetime.datetime.strftime(datetime.datetime.now() + datetime.timedelta(seconds=error_retry_delay), '%H:%M:%S')
crawler_settings.log.warning(f'Twitter / X returns an error when loading [repr.url]{tab_url}[reset], next reloading will start {error_retry_delay} {"seconds" if error_retry_delay > 1 else "second"} later at {restart_time}.', extra={"markup": True})
# Update progress bar to pausing
progress.update(task, description=f'[yellow bold](Pausing)[reset] Loading [repr.number]{reload_count + 1}[reset]/[repr.number]{reload_times}[reset], [repr.number]{len(attempt_status_list)}[reset] status & [repr.number]{media_count}[reset] {"images" if media_count > 1 else "image"} detected after scrolling times:')
await asyncio.sleep(error_retry_delay)
# Reset progress bar from pausing
progress.update(task, description=f'Loading [repr.number]{reload_count + 1}[reset]/[repr.number]{reload_times}[reset], [repr.number]{len(attempt_status_list)}[reset] status & [repr.number]{media_count}[reset] {"images" if media_count > 1 else "image"} detected after scrolling times:')
try: # Try clicking the retry button
main_structure = await tab.select('div[data-testid="primaryColumn"]')
error_element = await main_structure.query_selector('button[class="css-175oi2r r-sdzlij r-1phboty r-rs99b7 r-lrvibr r-2yi16 r-1qi8awa r-3pj75a r-1loqt21 r-o7ynqc r-6416eg r-1ny4l3l"]')
await error_element.click()
await tab
not_from_retry_button = False # Keep collected status and do not scroll up & update retry count
except: # Failed to find the button, then reload the page
crawler_settings.log.warning(f'Retry button is missing in [repr.url]{tab_url}[reset]! Refreshing this page.', extra={"markup": True})
await tab.get(tab_url) # Refresh
retry_count -= 1 # Do not update retry count
not_from_retry_button = True
except Exception as e:
output_msg_base = f'Failed to load page [repr.url]{tab_url}[reset] at attempt {retry_count}'
crawler_settings.log.warning(f"{output_msg_base}.\n{traceback.format_exc()}", output_msg=f"{output_msg_base} because {e}", extra={"markup": True})
if retry_count < crawler_settings.download_config.retry_times - 1: # Not the last reloading
await asyncio.sleep(crawler_settings.download_config.result_thread_delay)
await tab.get(tab_url) # Refresh
not_from_retry_button = True
progress.finish_task(task, hide=transient) # No matter success of failure, finish the task in this reload_count
# Add status in this loading to the final_status_list
final_status_url_list = [status.status_url for status in final_status_list]
for status in attempt_status_list:
if status.status_url not in final_status_url_list:
final_status_list.append(status)
# Reload page again
if reload_count < reload_times - 1:
await tab.get(tab_url) # Refresh
await tab
await tab.scroll_up(1000)
final_status_list.sort(reverse=True) # Sort by status_id from large to small
return final_status_list, media_count