Source code for image_crawler_utils.stations.twitter.parser_assets.keyword_parser

import dataclasses
from typing import Optional, Union
import traceback

from urllib import parse
import nodriver

from image_crawler_utils import Cookies, KeywordParser, CrawlerSettings, ImageInfo, update_nodriver_browser_cookies
from image_crawler_utils.keyword import KeywordLogicTree
from image_crawler_utils.progress_bar import CustomProgress, ProgressGroup
from image_crawler_utils.utils import set_up_nodriver_browser

from .search_settings import TwitterSearchSettings
from .search_status_analyzer import scrolling_to_find_status
from .status_classes import TwitterStatus



##### Twitter Keyword Parser


class TwitterKeywordMediaParser(KeywordParser):
    """
    Keyword Parser for Twitter.

    Will fetch all media images from the searching result of certain keywords.

    Args:
        crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Parser.
        station_url (str): The URL of the main page of a website.

            + This parameter works when several websites use the same structure. For example, https://yande.re/ and https://konachan.com/ both use Moebooru to build their websites, and this parameter must be filled to deal with these sites respectively.
            + For websites like https://www.pixiv.net/, as no other website uses its structure, this parameter has already been initialized and does not need to be filled.
        standard_keyword_string (str): Query keyword string using standard syntax. Refer to the documentation for detailed instructions.
        keyword_string (str, None): If you want to directly specify the keywords used in searching, set ``keyword_string`` to a custom non-empty string. It will OVERWRITE ``standard_keyword_string``.

            + For example, set ``keyword_string`` to ``"kuon_(utawarerumono) rating:safe"`` in DanbooruKeywordParser means searching directly with this string in Danbooru, and its standard keyword string equivalent is ``"kuon_(utawarerumono) AND rating:safe"``.
        cookies (image_crawler_utils.Cookies, str, dict, list, None): Cookies containing logging information.
        twitter_search_settings (image_crawler_utils.stations.twitter.TwitterSearchSettings): A TwitterSearchSettings class that contains extra options when searching.
        reload_times (int): Reload the page for ``reload_times`` times. May be useful when there are status (tweets) not detected.
        error_retry_delay (float): When Twitter / X returns an error, the Parser will retry after ``error_retry_delay`` seconds.
        headless (bool): Do not display browser windows when a browser is started. Set to :py:data:`False` will pop up browser windows.
    """

    def __init__(
        self,
        station_url: str="https://x.com/",
        crawler_settings: CrawlerSettings=CrawlerSettings(),
        standard_keyword_string: Optional[str]=None,
        keyword_string: Optional[str]=None,
        cookies: Optional[Union[Cookies, list, dict, str]]=Cookies(),
        twitter_search_settings: TwitterSearchSettings=TwitterSearchSettings(),
        reload_times: int=1,
        error_retry_delay: float=200,
        headless: bool=True,
    ):
        # NOTE(review): the default CrawlerSettings / Cookies / TwitterSearchSettings
        # objects are created once, when the class is defined, and are therefore
        # shared by every parser that relies on the defaults. Kept as-is so the
        # public signature does not change.
        super().__init__(
            station_url=station_url,
            crawler_settings=crawler_settings,
            standard_keyword_string=standard_keyword_string,
            keyword_string=keyword_string,
            cookies=cookies,
            accept_empty=True,
        )
        self.twitter_search_settings = twitter_search_settings
        self.reload_times = reload_times
        self.error_retry_delay = error_retry_delay
        self.headless = headless

    def run(self) -> list[ImageInfo]:
        """
        The main function that runs the Parser and returns a list of :class:`image_crawler_utils.ImageInfo`.

        Raises:
            ValueError: If ``self.cookies.is_none()`` reports that no cookies were provided.
        """

        if self.cookies.is_none():
            raise ValueError('Cookies cannot be empty!')
        if self.keyword_string is None:
            self.generate_keyword_string()
        self.get_status()
        return self.parse_images_from_status()

    ##### Custom funcs

    # Generate keyword string from keyword tree
    def __build_keyword_str(self, tree: KeywordLogicTree) -> str:
        """
        Recursively convert a keyword tree into a search string.

        A child that is already a string is used verbatim; any other child is
        converted by recursion. Only the operators "AND", "OR", "NOT" and
        "SINGLE" are handled; for any other operator the function falls through
        and implicitly returns None.
        """

        # Generate standard keyword string
        if isinstance(tree.lchild, str):
            res1 = tree.lchild
        else:
            res1 = self.__build_keyword_str(tree.lchild)
        if isinstance(tree.rchild, str):
            res2 = tree.rchild
        else:
            res2 = self.__build_keyword_str(tree.rchild)

        if tree.logic_operator == "AND":
            return f'({res1} {res2})'
        elif tree.logic_operator == "OR":
            return f'({res1} OR {res2})'
        elif tree.logic_operator == "NOT":
            # Only the right child is used for unary operators.
            return f'(-{res2})'
        elif tree.logic_operator == "SINGLE":
            return f'{res2}'

    # Basic keyword string
    def generate_keyword_string(self) -> str:
        """
        Build the keyword string from ``self.keyword_tree``, store it in ``self.keyword_string`` and return it.
        """

        self.keyword_string = self.__build_keyword_str(self.keyword_tree)
        return self.keyword_string

    # Load browser and fetch images from status
    async def __get_status(self) -> list[TwitterStatus]:
        """
        Open the search result page in a nodriver browser and collect status by scrolling.

        The page is requested once without cookies, the cookies are then replaced
        and the page is requested again. The whole loading sequence is attempted
        ``download_config.retry_times`` times. The result is stored in
        ``self.status_list`` and returned.

        Raises:
            ConnectionError: If no loading attempt succeeded.
        """

        query_string = self.twitter_search_settings.build_search_appending_str(self.keyword_string)
        search_status_url = parse.quote(f'{self.station_url}search?q={query_string}&src=typed_query&f=live', safe='/:?=&')
        self.crawler_settings.log.info(f'Loading searching page using query string "{query_string}" and URL [repr.url]{search_status_url}[reset] ...', extra={"markup": True})

        browser = None
        tab = None
        last_error: Optional[Exception] = None
        last_traceback = ""
        flag_success = False
        for i in range(self.crawler_settings.download_config.retry_times):
            # Reset per attempt so the cleanup below never touches a browser
            # that belongs to an earlier (already stopped) attempt.
            browser = None
            with CustomProgress(has_spinner=True, transient=True) as progress:
                try:
                    task = progress.add_task(total=3, description='Loading browser components...')

                    # Connect once to get cookies
                    self.crawler_settings.log.debug(f"Connecting to twitter searching result: [repr.url]{search_status_url}[reset]", extra={"markup": True})
                    browser = await set_up_nodriver_browser(
                        proxies=self.crawler_settings.download_config.result_proxies,
                        headless=self.headless,
                        no_image_stylesheet=True,
                    )
                    progress.update(task, advance=1, description="Requesting searching result once...")
                    tab = await browser.get(search_status_url)
                    result = await tab.select('div[id="react-root"]')
                    if result is None:
                        raise ModuleNotFoundError('Element div[id="react-root"] not found')

                    # Replace cookies
                    await update_nodriver_browser_cookies(browser, self.cookies)

                    # Connect twice to get page
                    progress.update(task, advance=1, description="Requesting searching result again with cookies...")
                    await tab.get(search_status_url)  # Do not reload directly! It may be the login page.

                    progress.update(task, advance=1, description="[green]Requesting successfully finished!")
                    # Mark success only after every step of the attempt has completed.
                    flag_success = True
                    break
                except Exception as e:
                    # Stop the browser only if this attempt actually started one;
                    # otherwise the original error would be masked by a NameError.
                    if browser is not None:
                        browser.stop()
                        browser = None
                    self.crawler_settings.log.warning(f"Loading Twitter / X searching result page failed at attempt {i + 1} because {e}")
                    last_error = e
                    # The traceback is only available while the exception is being handled.
                    last_traceback = traceback.format_exc()

        if not flag_success:
            # last_error stays None when the loop body never ran (retry_times < 1).
            error_msg = last_error if last_error is not None else "no loading attempt was made"
            output_msg_base = f"Loading Twitter / X searching result page [repr.url]{search_status_url}[reset] failed"
            self.crawler_settings.log.critical(f"{output_msg_base}.\n{last_traceback}", output_msg=f"{output_msg_base} because {error_msg}", extra={"markup": True})
            raise ConnectionError(f"{error_msg}") from last_error

        try:
            self.crawler_settings.log.info("Scrolling to get status...")
            with ProgressGroup(panel_title="Scrolling to Find [yellow]Status[reset]") as progress_group:
                status_list, media_count = await scrolling_to_find_status(
                    tab=tab,
                    tab_url=search_status_url,
                    crawler_settings=self.crawler_settings,
                    reload_times=self.reload_times,
                    error_retry_delay=self.error_retry_delay,
                    image_num_restriction=self.crawler_settings.capacity_count_config.image_num,
                    progress_group=progress_group,
                    transient=False,
                )
            self.crawler_settings.log.info(f'Finished getting status. {len(status_list)} status & {media_count} {"images" if media_count > 1 else "image"} are collected.')
        finally:
            # Always release the browser, even when scrolling fails.
            browser.stop()

        self.status_list = status_list
        return self.status_list

    def get_status(self) -> list[TwitterStatus]:
        """
        Synchronous wrapper that runs the status collection on the nodriver event loop and returns its result.
        """

        return nodriver.loop().run_until_complete(
            self.__get_status()
        )

    # Parse images from status
    def parse_images_from_status(self) -> list[ImageInfo]:
        """
        Convert every media item in ``self.status_list`` into an :class:`image_crawler_utils.ImageInfo`.

        The list is truncated to ``capacity_count_config.image_num`` entries when
        that value is not None. The result is stored in ``self.image_info_list``
        and returned.
        """

        self.crawler_settings.log.info("Parsing image info from collected status...")
        # asdict() is evaluated per image so that every ImageInfo owns its own info dict.
        image_info_list = [
            ImageInfo(
                url=image.image_source,
                name=image.image_name,
                info=dataclasses.asdict(status),
            )
            for status in self.status_list
            for image in status.media_list
        ]

        image_num = self.crawler_settings.capacity_count_config.image_num
        if image_num is not None:
            # Get only image_num images
            image_info_list = image_info_list[:image_num]

        self.crawler_settings.log.info(f"Image info parsed. {len(image_info_list)} {'images' if len(image_info_list) > 1 else 'image'} collected.")
        self.image_info_list = image_info_list
        return self.image_info_list