import os
import re
import requests
from rich import markup
import nodriver
from typing import Optional
import traceback
from image_crawler_utils import Cookies, update_nodriver_browser_cookies
from image_crawler_utils.configs import DownloadConfig
from image_crawler_utils.log import Log
from image_crawler_utils.progress_bar import CustomProgress, ProgressGroup
from image_crawler_utils.utils import check_dir, set_up_nodriver_browser
from .core_downloader import download_image
# Parsing images
async def __get_image_from_status(
    url: str,
    download_config: DownloadConfig=DownloadConfig(),
    log: Log=Log(),
    session: Optional[requests.Session]=requests.Session(),
    progress_group: Optional[ProgressGroup]=None,
) -> list[str]:
    """Load a Twitter / X status page in a nodriver browser and collect its image URLs.

    The page is requested twice: once as-is, and once more after the cookies
    held by ``session`` have been copied into the browser.

    Parameters:
        url: URL of the Twitter / X status page.
        download_config: Supplies ``result_proxies`` for the browser.
        log: Logger used for the debug message.
        session: Session whose cookies are copied into the browser.
            NOTE: ``session.cookies`` is read unconditionally, so ``None`` is
            not actually supported despite the ``Optional`` annotation.
        progress_group: If given, its ``sub_count_bar`` shows the progress;
            otherwise a standalone ``CustomProgress`` is created and started.

    Returns:
        The ``src`` values of all ``<img>`` elements inside the tweet article
        whose ``src`` contains ``pbs.twimg.com/media``.

    Raises:
        ConnectionError: The browser could not be set up or the page could
            not be loaded.
        FileExistsError: The status page reports that it does not exist, is
            not accessible, or was removed for violating the rules.
        FileNotFoundError: The tweet could not be parsed or has no images.
        Exception: Failures while replacing cookies or looking up the page
            structure propagate unwrapped.
    """
    if progress_group is None:
        # No parent task was provided, so create a separate progress display.
        progress = CustomProgress(has_spinner=True, transient=True)
        progress.start()
    else:
        progress = progress_group.sub_count_bar
    task = progress.add_task(description='Loading browser components...', total=3)

    # Stays None until the browser really exists, so the cleanup below never
    # touches an unbound name when set_up_nodriver_browser() itself fails.
    browser = None
    try:
        # Connect once to get cookies
        try:
            log.debug(f"Parsing Twitter / X status page: [repr.url]{markup.escape(url)}[reset]", extra={"markup": True})
            browser = await set_up_nodriver_browser(
                proxies=download_config.result_proxies,
            )
            progress.update(task, advance=1, description="Requesting Twitter / X status once...")
            tab = await browser.get(url)
            result = await tab.select('div[id="react-root"]')
            if result is None:
                raise ModuleNotFoundError('Element div[id="react-root"] not found')
        except Exception as e:
            raise ConnectionError(f"{e}") from e

        # Replace cookies
        cookies = Cookies(session.cookies.get_dict())
        await update_nodriver_browser_cookies(browser, cookies)

        # Connect twice to get images
        try:
            progress.update(task, advance=1, description="Requesting Twitter / X status again with cookies...")
            await tab.get(url)
            # Sometimes it does not load from the first tweet. Scroll to top in case of this!
            await tab.scroll_up(1000)
            await tab  # Wait until the whole page is fully loaded!
        except Exception as e:
            raise ConnectionError(f"{e}") from e

        # Check if it is empty
        try:
            result = await tab.select('article[data-testid="tweet"]', timeout=30)  # Try to get a tweet first
            if result is None:
                raise ModuleNotFoundError('Element article[data-testid="tweet"] not found')
        except Exception:
            # `except Exception` (not a bare except) so that task cancellation
            # and KeyboardInterrupt are not swallowed here.
            try:
                main_structure = await tab.select('div[data-testid="primaryColumn"]')
            except Exception as e:
                raise ConnectionError(f"{e}") from e
            # An error-detail element means the Twitter / X page does not exist.
            empty_element = await main_structure.query_selector('div[data-testid="error-detail"]')
            if empty_element is not None:
                raise FileExistsError("This Twitter / X page does not exist, or not accessible without an account.")

        # Check if the tweet itself is banned (comment tweets may exist)
        main_structure = await tab.select('div[data-testid="primaryColumn"]')
        banned_element = await main_structure.query_selector('a[href="https:\\/\\/help.twitter.com\\/rules-and-policies\\/notices-on-twitter"]')
        if banned_element is not None:
            raise FileExistsError("This Twitter / X page does not exist because the status or user violated Twitter / X rules and policies.")

        # Try parsing image elements
        try:
            tweet_element = await main_structure.query_selector('article[data-testid="tweet"]')
            img_elements = await tweet_element.query_selector_all('img')
            available_src = [
                element.attrs['src'] for element in img_elements
                if 'src' in element.attrs.keys()
                and "pbs.twimg.com/media" in element.attrs['src']
            ]
            if len(available_src) == 0:
                raise FileNotFoundError("Images not found on Twitter / X status.")
        except Exception as e:
            raise FileNotFoundError(f"{e}") from e
        return available_src
    finally:
        # Single cleanup point: runs on success and on every failure path, so
        # the progress task is always finished and the browser never leaks.
        progress.finish_task(task)
        if browser is not None:
            browser.stop()
# Downloading images
async def __twitter_download_image_from_status(
    url: str,
    image_name: str,
    download_config: DownloadConfig=DownloadConfig(),
    log: Log=Log(),
    store_path: str="./",
    session: Optional[requests.Session]=requests.Session(),
    progress_group: Optional[ProgressGroup]=None,
    thread_id: int=0,
) -> tuple[float, int]:
    """Download every image found on a Twitter / X status page.

    The status page is parsed with up to ``download_config.retry_times``
    attempts. Each image URL found is rewritten to request the ``orig`` size,
    and the file name gets the extension announced by the URL. When the status
    holds more than one image, ``_1``, ``_2``, ... is appended to the name.

    Parameters:
        url: URL of the Twitter / X status page.
        image_name: Base file name for the downloaded image(s).
        download_config: Supplies ``retry_times``; also passed on to the
            parser and to ``download_image``.
        log: Logger for warnings and errors.
        store_path: Directory the images are stored in.
        session: Session passed on to the parser and to ``download_image``.
        progress_group: Optional progress group passed on to both.
        thread_id: Identifier returned unchanged as the second tuple item.

    Returns:
        ``(total_downloaded_size, thread_id)``. The size is ``0`` when the
        status page could not be parsed.
    """
    # Get image from status webpage
    available_src = None
    # Pre-set so the error log below still works when retry_times is 0.
    error_msg = "no parsing attempt was made"
    error_trace = ""
    for i in range(download_config.retry_times):
        try:
            available_src = await __get_image_from_status(
                url=url,
                download_config=download_config,
                log=log,
                session=session,
                progress_group=progress_group,
            )
            break
        except FileExistsError as e:  # Status itself has error (not found, banned, etc.)
            error_msg = e
            # format_exc() only works inside the handler, so capture it here.
            error_trace = traceback.format_exc()
            break
        except Exception as e:
            log.warning(f"Parsing Twitter / X status page failed at attempt {i + 1} because {e}")
            error_msg = e
            error_trace = traceback.format_exc()

    if available_src is None:
        output_msg_base = f"Parsing Twitter / X status page [repr.url]{markup.escape(url)}[reset] failed"
        log.error(f"{output_msg_base}.\n{error_trace}", output_msg=f"{output_msg_base} because {error_msg}", extra={"markup": True})
        return (0, thread_id)

    # Edit URLs: replace the last query parameter with name=orig
    url_list = []
    for src in available_src:
        head, separator, _ = src.rpartition('&')
        if separator:
            url_list.append(head + '&name=orig')
        else:
            # No '&' in the URL: append the parameter instead of crashing.
            url_list.append(src + ('&' if '?' in src else '?') + 'name=orig')

    # Edit names: give every image the extension announced by its URL
    image_name_list = []
    for index, image_url in enumerate(url_list):
        format_match = re.search(r"format=([^&]*)", image_url)
        if format_match is not None and format_match.group(1):
            ext = format_match.group(1)
        else:
            # No usable format parameter: fall back to the suffix of the URL path.
            ext = os.path.splitext(image_url.split('?', 1)[0])[1].lstrip('.')
        stem, old_suffix = os.path.splitext(image_name)
        # Keep the caller's suffix when no extension can be determined at all.
        new_name = stem + (f'.{ext}' if ext else old_suffix)
        if len(url_list) > 1:
            # More than 1 image in the status page
            base, suffix = os.path.splitext(new_name)
            new_name = f'{base}_{index + 1}{suffix}'
        image_name_list.append(new_name)

    # Start downloading
    check_dir(store_path, log)
    total_downloaded_size = 0
    for image_url, current_name in zip(url_list, image_name_list):
        is_success, image_size = download_image(
            url=image_url,
            image_name=current_name,
            download_config=download_config,
            log=log,
            store_path=store_path,
            session=session,
            progress_group=progress_group,
            thread_id=thread_id,
        )
        total_downloaded_size += image_size
        if not is_success:
            log.error(f"FAILED to download [repr.filename]{markup.escape(current_name)}[reset] from [repr.url]{markup.escape(image_url)}[reset]", extra={"markup": True})
    return total_downloaded_size, thread_id
# Actually used