Source code for image_crawler_utils.stations.twitter.parser_assets.user_parser
import dataclasses
import datetime
from typing import Optional, Union
from urllib import parse
import nodriver
from concurrent import futures
from image_crawler_utils import Cookies, Parser, ImageInfo, CrawlerSettings, update_nodriver_browser_cookies
from image_crawler_utils.progress_bar import ProgressGroup
from image_crawler_utils.utils import set_up_nodriver_browser
from .search_settings import TwitterSearchSettings
from .search_status_analyzer import scrolling_to_find_status
from .status_classes import TwitterStatus
##### Twitter Media Parser
[docs]
class TwitterUserMediaParser(Parser):
    """Parser that collects media images posted by a single Twitter / X user.

    The requested date range is cut into windows of ``interval_days`` days.
    Every window becomes one search page that is opened in a browser and
    scrolled to detect status; the media of the detected status are then
    converted into :class:`image_crawler_utils.ImageInfo`.
    """

    # NOTE: the CrawlerSettings() / Cookies() defaults below are evaluated once,
    # when the class body is executed, so every call that relies on them
    # receives the same default objects.
    def __init__(
        self,
        user_id: str,
        station_url: str="https://x.com/",
        crawler_settings: CrawlerSettings=CrawlerSettings(),
        cookies: Optional[Union[Cookies, list, dict, str]]=Cookies(),
        reload_times: int=1,
        error_retry_delay: float=200,
        interval_days: int=180,
        starting_date: Optional[str]=None,
        ending_date: Optional[str]=None,
        exit_when_empty: bool=False,
        headless: bool=True,
    ):
        """
        Args:
            user_id (str): Twitter / X ID of a user.
            station_url (str): The URL of the main page of a website.

                + This parameter works when several websites use the same structure. For example, https://yande.re/ and https://konachan.com/ both use Moebooru to build their websites, and this parameter must be filled to deal with these sites respectively.
                + For websites like https://www.pixiv.net/, as no other website uses its structure, this parameter has already been initialized and does not need to be filled.
            crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Parser.
            cookies (image_crawler_utils.Cookies, str, dict, list, None): Cookies containing login information.
            reload_times (int): Number of times a page is reloaded in case some status are omitted.
            error_retry_delay (float): Pause error_retry_delay seconds if an error happened.
            interval_days (int): Interval of days covered by each search result page.
            starting_date ("YYYY-MM-DD" format str): Get images posted only after this date.
            ending_date ("YYYY-MM-DD" format str): Get images posted only before this date.
            exit_when_empty (bool): Stop loading new batches when no result is found in one of the pages. Use only when you set a large interval_days and the user always tweets at a high frequency.
            headless (bool): Hide browser window when browser is loaded.
        """

        super().__init__(
            station_url=station_url,
            crawler_settings=crawler_settings,
            cookies=cookies,
        )
        self.user_id = user_id
        self.reload_times = reload_times
        self.error_retry_delay = error_retry_delay
        self.interval_days = interval_days
        self.starting_date = starting_date
        self.ending_date = ending_date
        self.exit_when_empty = exit_when_empty
        self.headless = headless

    def run(self) -> list[ImageInfo]:
        """
        The main function that runs the Parser and returns a list of :class:`image_crawler_utils.ImageInfo`.

        Raises:
            ValueError: If the cookies are empty.
        """

        if self.cookies.is_none():
            raise ValueError('Cookies cannot be empty!')

        self.generate_search_settings()
        self.get_status_from_urls()
        return self.parse_images_from_status()

    ##### Custom funcs

    # Generate search settings
    def generate_search_settings(self) -> list[TwitterSearchSettings]:
        """
        Split the date range into windows and build one TwitterSearchSettings per window.

        Missing dates are filled in (and written back to ``self``): ``ending_date``
        becomes today and ``starting_date`` becomes "2006-01-01". Windows are
        generated from the ending date backwards, ``interval_days`` days each.
        The result is also stored in ``self.search_settings_list``.

        Returns:
            The list of search settings, newest window first.

        Raises:
            ValueError: If ``interval_days`` is not positive.
        """

        # A non-positive step would never move the window towards the starting date.
        if self.interval_days <= 0:
            raise ValueError(f'interval_days must be a positive number of days, got {self.interval_days!r}.')

        if self.ending_date is None:
            self.ending_date = datetime.datetime.now().strftime("%Y-%m-%d")
        if self.starting_date is None:
            self.starting_date = "2006-01-01"
        starting_datetime = datetime.datetime.strptime(self.starting_date, "%Y-%m-%d")
        ending_datetime = datetime.datetime.strptime(self.ending_date, "%Y-%m-%d")

        # When the starting date equals the 2006-01-01 default (beginning of
        # Twitter / X date), the oldest window is left without a lower bound.
        unbounded_start = self.starting_date == "2006-01-01"
        step = datetime.timedelta(days=self.interval_days)

        # Generate search time intervals
        interval_list = []
        interval_end = ending_datetime
        while True:
            interval_begin = interval_end - step
            if interval_begin < starting_datetime:
                interval_begin = None if unbounded_start else starting_datetime
            interval_list.append((interval_begin, interval_end))
            interval_end -= step
            # "<=" (not "<"): once the window end reaches the starting date the
            # next window would be the zero-length [start, start] and only
            # cost a useless page load.
            if interval_end <= starting_datetime:
                break

        # Generate TwitterSearchSettings list
        search_settings_list: list[TwitterSearchSettings] = [
            TwitterSearchSettings(
                from_users=self.user_id,
                only_media=True,
                starting_date=begin.strftime("%Y-%m-%d") if begin is not None else '',
                ending_date=end.strftime("%Y-%m-%d"),
            )
            for begin, end in interval_list
        ]

        self.crawler_settings.log.info(f"{len(search_settings_list)} {'pages' if len(search_settings_list) > 1 else 'page'} will be loaded to detect status.")
        self.search_settings_list = search_settings_list
        return self.search_settings_list

    # Get all status from urls
    async def __get_status_from_urls_thread(
        self,
        search_setting: TwitterSearchSettings,
        progress_group: ProgressGroup,
        thread_id: int,
    ) -> list[TwitterStatus]:
        """
        Open one search page in its own browser and scroll it to detect status.

        Args:
            search_setting: Search settings of the page to load.
            progress_group: Progress group handed to the scrolling routine.
            thread_id: Zero-based index of this page, used in log messages.

        Returns:
            The status detected on the page.
        """

        browser = await set_up_nodriver_browser(
            proxies=self.crawler_settings.download_config.result_proxies,
            headless=self.headless,
            no_image_stylesheet=True,
        )
        # The browser must be stopped even when loading or scrolling fails,
        # otherwise it stays alive after the error.
        try:
            await update_nodriver_browser_cookies(browser, self.cookies)

            search_str = search_setting.build_search_appending_str('')
            url = parse.quote(f'{self.station_url}search?q={search_str}&src=typed_query&f=live', safe='/:?=&')
            tab = await browser.get(url)
            self.crawler_settings.log.debug(f'Starting thread {thread_id + 1}/{len(self.search_settings_list)} to detect Twitter / X status from [repr.url]{url}[reset].', extra={"markup": True})
            result_status_list, media_count = await scrolling_to_find_status(
                tab=tab,
                tab_url=url,
                crawler_settings=self.crawler_settings,
                reload_times=self.reload_times,
                error_retry_delay=self.error_retry_delay,
                progress_group=progress_group,
                transient=True,
            )
        finally:
            browser.stop()

        self.crawler_settings.log.info(f'Finished thread {thread_id + 1}/{len(self.search_settings_list)} that detected from {search_setting.starting_date} to {search_setting.ending_date}. {len(result_status_list)} status & {media_count} {"images" if media_count > 1 else "image"} are detected.')
        return result_status_list

    def get_status_from_urls(self) -> list[TwitterStatus]:
        """
        Load every search page in batches of threads and collect the unique status.

        Reads ``self.search_settings_list``, so :meth:`generate_search_settings`
        has to be called first. Loading stops early when the collected media
        reach ``capacity_count_config.image_num`` or, with ``exit_when_empty``,
        when a page returned no status. The result is also stored in
        ``self.status_list``.

        Returns:
            The collected status, sorted in descending order.
        """

        total_status_list: list[TwitterStatus] = []
        seen_status_urls = set()  # URLs of every status already collected
        total_media_num = 0
        finished_num = 0
        self.crawler_settings.log.info("Loading searching pages to get media from status...")

        # Segment search_settings_list to prepare it for threading
        thread_num = self.crawler_settings.download_config.thread_num
        image_num = self.crawler_settings.capacity_count_config.image_num
        batched_search_settings_list = [
            self.search_settings_list[begin:begin + thread_num]
            for begin in range(0, len(self.search_settings_list), thread_num)
        ]

        exit_flag = False
        with ProgressGroup(panel_title="Scrolling to Find [yellow]Status[reset]") as progress_group:
            task = progress_group.main_no_total_count_bar.add_task("Media image number:")
            for batch_index, batch in enumerate(batched_search_settings_list):
                # Get status using threading method
                with futures.ThreadPoolExecutor(thread_num) as executor:
                    # nodriver.loop() is evaluated here, once per page, and its
                    # run_until_complete is what gets executed by the worker.
                    thread_pool = [executor.submit(
                        nodriver.loop().run_until_complete,
                        self.__get_status_from_urls_thread(
                            search_setting,
                            progress_group,
                            batch_index * thread_num + page_index,
                        ),
                    ) for page_index, search_setting in enumerate(batch)]

                    # Get result
                    for thread in futures.as_completed(thread_pool):
                        finished_num += 1
                        page_status_list = thread.result()
                        for status in page_status_list:
                            if status.status_url in seen_status_urls:
                                continue
                            seen_status_urls.add(status.status_url)
                            total_status_list.append(status)
                            total_media_num += len(status.media_list)
                            progress_group.main_no_total_count_bar.update(task, advance=len(status.media_list))
                        # Update search result
                        progress_group.main_no_total_count_bar.update(task, description=f'Got [repr.number]{finished_num}[reset] {"pages" if finished_num > 1 else "page"} & [repr.number]{len(total_status_list)}[reset] status with media image number:')

                        # Exit when empty (only flagged here: the rest of the
                        # running batch is still collected)
                        if not page_status_list and self.exit_when_empty:
                            self.crawler_settings.log.debug("An empty page is detected. No new batches of page threads will be loaded.")
                            exit_flag = True

                # Check if media image num has exceeded capacity_count_config.image_num
                if image_num is not None and total_media_num >= image_num:
                    self.crawler_settings.log.info(f"Collected {total_media_num} media {'images have' if total_media_num > 1 else 'image has'} exceeded the restrictions on image num ({image_num} {'images' if image_num > 1 else 'image'}).")
                    break
                if exit_flag:
                    self.crawler_settings.log.info("As empty pages are detected, no new pages will be loaded to detect status.")
                    break

            progress_group.main_no_total_count_bar.update(task, description=f'[green]Finished finding status![reset] Got [repr.number]{finished_num}[reset] {"pages" if finished_num > 1 else "page"} & [repr.number]{len(total_status_list)}[reset] status with media image number:')

        self.crawler_settings.log.info(f"Finished getting status. {len(total_status_list)} status {'are' if len(total_status_list) > 1 else 'is'} fetched.")
        total_status_list.sort(reverse=True)  # Sort by status_id from large to small
        self.status_list = total_status_list
        return self.status_list

    # Parse images from status
    def parse_images_from_status(self) -> list[ImageInfo]:
        """
        Convert every media image of ``self.status_list`` into an ImageInfo.

        The list is cut to ``capacity_count_config.image_num`` entries when that
        limit is set, and is also stored in ``self.image_info_list``.

        Returns:
            The ImageInfo list, in the order of ``self.status_list``.
        """

        self.crawler_settings.log.info("Parsing image info from collected status...")
        # asdict() is called once per image so that no two ImageInfo share the
        # same info dictionary.
        image_info_list = [
            ImageInfo(
                url=image.image_source,
                name=image.image_name,
                info=dataclasses.asdict(status),
            )
            for status in self.status_list
            for image in status.media_list
        ]

        image_num = self.crawler_settings.capacity_count_config.image_num
        if image_num is not None:  # Get only image_num images
            image_info_list = image_info_list[:image_num]

        self.crawler_settings.log.info(f"Image info parsed. {len(image_info_list)} {'images' if len(image_info_list) > 1 else 'image'} collected.")
        self.image_info_list = image_info_list
        return self.image_info_list