from abc import ABC, abstractmethod
import requests
import traceback
import random
import time, datetime
from typing import Optional, Union
from collections.abc import Iterable, Callable
import os, dill
from rich import print, markup
import json
from bs4 import BeautifulSoup
from urllib import parse
from concurrent import futures
import nodriver, asyncio
from image_crawler_utils import Cookies, update_nodriver_browser_cookies
from image_crawler_utils.keyword import KeywordLogicTree, construct_keyword_tree
from image_crawler_utils.log import Log
from image_crawler_utils.progress_bar import CustomProgress, ProgressGroup
from image_crawler_utils.utils import check_dir, Empty, set_up_nodriver_browser, silent_deconstruct_browser
from .crawler_settings import CrawlerSettings
from .image_info import ImageInfo
[docs]
class Parser(ABC):
"""
A Parser include several basic functions.
Args:
station_url (str): The URL of the main page of a website.
+ This parameter works when several websites use the same structure. For example, https://yande.re/ and https://konachan.com/ both use Moebooru to build their websites, and this parameter must be filled to deal with these sites respectively.
+ For websites like https://www.pixiv.net/, as no other website uses its structure, this parameter has already been initialized and do not need to be filled.
crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Parser.
cookies (image_crawler_utils.Cookies, list, dict, str, None): Cookies used in loading websites.
+ Can be one of :class:`image_crawler_utils.Cookies`, :py:class:`list`, :py:class:`dict`, :py:class:`str` or :py:data:`None`.
+ :py:data:`None` means no cookies and works the same as ``Cookies()``.
+ Leave this parameter blank works the same as :py:data:`None` / ``Cookies()``.
"""
def __init__(
self,
station_url: str,
crawler_settings: CrawlerSettings=CrawlerSettings(),
cookies: Optional[Union[Cookies, list, dict, str]]=Cookies(),
):
super().__init__()
self.crawler_settings = crawler_settings
self.station_url = parse.quote(station_url + ('/' if not station_url.endswith('/') else ''), safe='/:?=&')
if isinstance(cookies, Cookies):
self.cookies = cookies
else:
self.cookies = Cookies(cookies)
##### Funtion requires rewriting
[docs]
@abstractmethod
def run(self) -> list[ImageInfo]:
"""
MUST BE OVERRIDEN.
Generate a list of ImageInfo, containing image urls, names and infos.
"""
raise NotImplemented
##### General Function
# Display all config
[docs]
def display_all_configs(self):
"""
Display all config info.
Dataclasses will be displayed in a neater way.
"""
print("========== Current Parser Config ==========")
# Basic info
try:
print('\nBasic Info:')
print(f" + Station URL: [repr.url]{markup.escape(self.station_url)}[reset]")
if self.cookies.is_none():
print(f" + Cookies: None")
else:
print(f" + Cookies:")
print(self.cookies.cookies_selenium)
except Exception as e:
print(f"Basic Info missing because {e}!\n{traceback.format_exc()}", "error")
# Other info
if set(self.__init__.__code__.co_varnames) != set(KeywordParser.__init__.__code__.co_varnames):
print('\nOther Info:')
for varname in self.__init__.__code__.co_varnames:
if varname not in KeywordParser.__init__.__code__.co_varnames:
if getattr(self, varname, None) is not None:
print(f" + {varname}: {getattr(self, varname)}")
print('')
print("CrawlerSettings used:")
self.crawler_settings.display_all_configs()
print('')
print("========== Parser Config Ending ==========")
[docs]
def save_to_pkl(
self,
pkl_file: str,
) -> Optional[tuple[str, str]]:
"""
Save the parser in a .pkl file.
Args:
path (str): Path to save the pkl file. Default is saving to the current path.
pkl_file (str, None): Name of the pkl file. (Suffix is optional.)
Returns:
(Saved file name, Absolute path of the saved file), or None if failed.
"""
path, filename = os.path.split(pkl_file)
check_dir(path, self.crawler_settings.log)
f_name = os.path.join(path, f"{filename}.pkl")
f_name = f_name.replace(".pkl.pkl", ".pkl") # If .pkl is already contained in pkl_file, skip it
try:
with open(f_name, "wb") as f:
dill.dump(self, f)
self.crawler_settings.log.info(f'{type(self).__name__} has been dumped into [repr.filename]{markup.escape(os.path.abspath(f_name))}[reset]', extra={"markup": True})
return f_name, os.path.abspath(f_name)
except Exception as e:
self.crawler_settings.log.error(f'Failed to dump {type(self).__name__} into [repr.filename]{markup.escape(os.path.abspath(f_name))}[reset] because {e}\n{traceback.format_exc()}', extra={"markup": True})
return None
[docs]
@classmethod
def load_from_pkl(
cls,
pkl_file: str,
log: Log=Log(),
) -> CrawlerSettings:
"""
Load the parser from .pkl file.
ATTENTION: You should use the correspondent Parser class when loading. For example, loading DanbooruKeywordParser should use ``DanbooruKeywordParser.load_from_pkl()``.
Args:
pkl_file (str, None): Name of the pkl file.
log (image_crawler_utils.log.Log, None): Logging config.
Returns:
A CrawlerSettings class loaded from pkl file, or None if failed.
"""
try:
with open(pkl_file, "rb") as f:
cls = dill.load(f)
log.info(f'{type(cls).__name__} has been successfully loaded from [repr.filename]{markup.escape(os.path.abspath(pkl_file))}[reset]', extra={"markup": True})
return cls
except Exception as e:
log.error(f'Failed to load {type(cls).__name__} from [repr.filename]{markup.escape(os.path.abspath(pkl_file))}[reset] because {e}\n{traceback.format_exc()}', extra={"markup": True})
return None
# --------------------------------------------------------- #
# BASIC REQUEST METHOD: Using requests to get contents #
# --------------------------------------------------------- #
# Get webpage content
[docs]
def request_page_content(
self,
url: str,
session=requests.Session(),
headers: Optional[Union[dict, Callable]]=Empty(),
thread_delay: Union[None, float, Callable]=None,
) -> str:
"""
Download webpage content.
Args:
url (str): The URL of the page to download.
session (requests from import requests, or requests.Session): Can be requests or requests.Session()
headers (dict, Callable, None): If you need to specify headers for current request, use this argument. Set to None (default) meaning use the headers from self.crawler_settings.download_config.result_headers
thread_delay: Delay before thread running. Default set to None. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
Returns:
The HTML content of the webpage.
"""
self.crawler_settings.log.debug(f'Try connecting to [repr.url]{markup.escape(url)}[reset]', extra={"markup": True})
if thread_delay is None:
real_thread_delay = self.crawler_settings.download_config.result_thread_delay
else:
real_thread_delay = thread_delay() if callable(thread_delay) else thread_delay
time.sleep(real_thread_delay)
for i in range(self.crawler_settings.download_config.retry_times):
try:
download_time = self.crawler_settings.download_config.max_download_time
if isinstance(headers, Empty):
request_headers = self.crawler_settings.download_config.result_headers
else:
request_headers = headers() if callable(headers) else headers
response = session.get(
url,
headers=request_headers,
proxies=self.crawler_settings.download_config.result_proxies,
timeout=(self.crawler_settings.download_config.timeout, download_time),
)
if response.status_code == requests.status_codes.codes.ok:
self.crawler_settings.log.debug(f'Successfully connected to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1}.', extra={"markup": True})
return response.text
elif response.status_code == 429:
self.crawler_settings.log.warning(f'Connecting to [repr.url]{markup.escape(url)}[reset] FAILED at attempt {i + 1} because TOO many requests at the same time (response status code {response.status_code}). Retrying to connect in 1 to 2 minutes, but it is suggested to lower the number of threads or increase thread delay time and try again.', extra={"markup": True})
time.sleep(60 + random.random() * 60)
elif 400 <= response.status_code < 500:
self.crawler_settings.log.error(f'Connecting to [repr.url]{markup.escape(url)}[reset] FAILED because response status code is {response.status_code}.', extra={"markup": True})
return None
else:
self.crawler_settings.log.warning(f'Failed to connect to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1}. Response status code is {response.status_code}.', extra={"markup": True})
except Exception as e:
self.crawler_settings.log.warning(f"Connecting to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1} FAILED because {e} Retry connecting.\n{traceback.format_exc()}",
output_msg=f"Connecting to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1} FAILED.", extra={"markup": True})
time.sleep(self.crawler_settings.download_config.result_fail_delay)
self.crawler_settings.log.error(f'FAILED to connect to [repr.url]{markup.escape(url)}[reset]', extra={"markup": True})
return None
# Download in threads
def __request_page_content_thread(
self,
url: str,
thread_id: int,
session=requests.Session(),
headers: Optional[Union[dict, Callable]]=Empty(),
thread_delay: Union[None, float, Callable]=None,
):
"""
Works the same as self.request_page_content, except for an thread id appended to its result.
"""
result = self.request_page_content(
url=url,
session=session,
headers=headers,
thread_delay=thread_delay,
)
return result, thread_id
[docs]
def threading_request_page_content(
self,
url_list: Iterable[str],
restriction_num: Optional[int]=None,
session=requests.Session(),
headers: Optional[Union[dict, Callable, Iterable]]=Empty(),
thread_delay: Union[None, float, Callable]=None,
batch_num: Optional[int]=None,
batch_delay: Union[float, Callable]=0.0,
) -> list[str]:
"""
Download multiple webpage content using threading.
Args:
url_list (list[str]): The list of URLs of the page to download.
restriction_num (int, None): Only download the first restriction_num number of pages. Set to None (default) meaning no restrictions.
session (requests from import requests, or requests.Session): Can be requests or requests.Session()
headers (dict, list, Callable, None): If you need to specify headers for current threading requests, use this argument. Set to None (default) meaning use the headers from self.crawler_settings.download_config.result_headers
+ If it is a list, it should be of the same length as url_list, and for url_list[i] it will use the headers in headers[i]. The element in this list can be a dict of a function.
thread_delay (float, Callable, None): Delay before thread running. Default set to None. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
batch_num: Number of pages for each batch; using it with batch_delay to wait a certain period of time after downloading each batch. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
batch_delay: Delaying time (seconds) after each batch is downloaded. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
Returns:
A list of the HTML contents of the webpages. Its order is the same as the one of url_list.
"""
page_num = len(url_list)
if restriction_num is not None:
page_num = min(page_num, restriction_num)
l_url_list = list(url_list)
if isinstance(headers, Empty):
headers = self.crawler_settings.download_config.result_headers
elif isinstance(headers, Iterable) and not isinstance(headers, dict):
if len(headers) != len(url_list):
self.crawler_settings.log.critical(f"The number of headers ({len(url_list)}) should be of the same length as the number of URLs ({len(headers)})")
raise ValueError(f"The number of headers ({len(headers)}) should be of the same length as the number of URLs ({len(url_list)})")
l_headers = list(headers)
page_content_dict_with_thread_id = {}
self.crawler_settings.log.info(f"Total webpage num: {page_num}")
if page_num > 0:
if batch_num is None:
batch_num = page_num
batched_url_list = [l_url_list[k * batch_num:min((k + 1) * batch_num, page_num)]
for k in range((page_num - 1) // batch_num + 1)]
if isinstance(headers, Iterable) and not isinstance(headers, dict):
batched_headers = [l_headers[k * batch_num:min((k + 1) * batch_num, page_num)]
for k in range((page_num - 1) // batch_num + 1)]
with ProgressGroup(panel_title="Downloading [yellow]Webpages[reset]") as progress_group:
task = progress_group.main_count_bar.add_task("Downloading webpages:", total=page_num)
for j in range(len(batched_url_list)):
with futures.ThreadPoolExecutor(self.crawler_settings.download_config.thread_num) as executor:
# Start downloading
if isinstance(headers, Iterable) and not isinstance(headers, dict):
thread_pool = [executor.submit(
self.__request_page_content_thread,
batched_url_list[j][i],
j * batch_num + i,
session,
batched_headers[j][i],
thread_delay,
) for i in range(len(batched_url_list[j]))]
else:
thread_pool = [executor.submit(
self.__request_page_content_thread,
batched_url_list[j][i],
j * batch_num + i,
session,
headers,
thread_delay,
) for i in range(len(batched_url_list[j]))]
for thread in futures.as_completed(thread_pool):
page_content_dict_with_thread_id[thread.result()[1]] = thread.result()[0] # Successful -> content, Failed -> None
progress_group.main_count_bar.update(task, advance=1)
if (j + 1) * batch_num < page_num:
current_batch_delay = batch_delay() if callable(batch_delay) else batch_delay
restart_time = datetime.datetime.strftime(datetime.datetime.now() + datetime.timedelta(seconds=current_batch_delay), '%H:%M:%S')
self.crawler_settings.log.info(f"A batch of {len(batched_url_list[j])} {'page' if len(batched_url_list) <= 1 else 'pages'} has been downloaded. Waiting {current_batch_delay} {'second' if current_batch_delay <= 1 else 'seconds'} before resuming at {restart_time}.")
# Update progress bar to pausing
progress_group.main_count_bar.update(task, description=f"[yellow bold](Pausing)[reset] Downloading webpages:")
time.sleep(current_batch_delay)
# Reset progress bar from pausing
progress_group.main_count_bar.update(task, description=f"Downloading webpages:")
# Finished normally, set progress bar to finished state
progress_group.main_count_bar.update(task, description=f"[green]Downloading webpages finished!")
else:
self.crawler_settings.log.warning(f"No webpages are to be downloaded.")
# Return corresponding page result according to their order in URLs
page_content_list = [page_content_dict_with_thread_id[i]
for i in range(len(page_content_dict_with_thread_id))]
return page_content_list
# --------------------------------------------------------- #
# ADVANCED REQUEST METHOD: Using nodriver to get contents #
# --------------------------------------------------------- #
# Get webpage content
async def __nodriver_request_page_content(
self,
url: str,
browser: Optional[nodriver.Browser]=None,
headless: bool=True,
is_json: bool=False,
thread_delay: Union[None, float, Callable]=None,
page_stay_time: Optional[float]=None,
) -> str:
if thread_delay is None:
real_thread_delay = self.crawler_settings.download_config.result_thread_delay
else:
real_thread_delay = thread_delay() if callable(thread_delay) else thread_delay
await asyncio.sleep(real_thread_delay)
# If no browser exists, set up the browser
if browser is None:
# Display a progress bar if and only if browser is None
progress = CustomProgress(has_spinner=True, transient=True)
progress.start()
task = progress.add_task(description=f'Loading browser components...', total=2)
use_browser = await set_up_nodriver_browser(
proxies=self.crawler_settings.download_config.result_proxies,
window_width=800,
window_height=600,
headless=headless,
)
# Replace cookies, pay attention that domain should be set from station_url if not included
adapted_cookies_selenium = self.cookies.cookies_selenium
for cookie in adapted_cookies_selenium:
if ('domain' not in cookie.keys()) or cookie['domain'] == '':
cookie['domain'] = parse.urlparse(self.station_url).hostname
await update_nodriver_browser_cookies(use_browser, Cookies(adapted_cookies_selenium))
else:
use_browser = browser
if browser is None: # Display a progress bar if and only if browser is None
progress.update(task, advance=1, description=f"Loading page...")
for i in range(self.crawler_settings.download_config.retry_times):
try:
status_code = []
# Timeout func
async def tab_get_await():
if browser is None: # Use the main tab
tab = use_browser.main_tab
else: # Open a new tab
tab = await use_browser.get(new_tab=True)
def get_response_status(event): # Get response status code
if event.response.url == url:
status_code.append(event.response.status)
tab.add_handler(nodriver.cdp.network.ResponseReceived, get_response_status) # Add a handler to control this
await tab.get(url)
if page_stay_time is not None:
await asyncio.sleep(page_stay_time) # Stay for a while so that the page can be fully loaded
else:
await tab
return tab
# Check timeout
if self.crawler_settings.download_config.timeout is None:
tab = await tab_get_await()
else:
timeout_sec = self.crawler_settings.download_config.timeout
try:
tab = await asyncio.wait_for(tab_get_await(), timeout=timeout_sec + (page_stay_time if page_stay_time is not None else 0)) # Add page_stay_time to timeout
except:
raise TimeoutError(f"Cannot connect to {url} in {timeout_sec} {'second' if timeout_sec <= 1 else 'seconds'} with nodriver.")
status_code = status_code[0] if len(status_code) > 0 else 200 # If cannot get the status code, set it to 200
if status_code == requests.status_codes.codes.ok:
self.crawler_settings.log.debug(f'Successfully connected to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1}.', extra={"markup": True})
if is_json:
result = await tab.get_content() # tab.select cannot deal with TOO long text!
soup = BeautifulSoup(result, 'lxml')
text = soup.find('pre').text
content = json.dumps(json.loads(text), ensure_ascii=False)
else:
content = await tab.get_content()
if browser is None: # Display a progress bar if and only if browser is None
progress.update(task, advance=1)
progress.finish_task(task)
use_browser.stop()
else:
await tab.close()
return content
elif status_code == 429:
self.crawler_settings.log.warning(f'Connecting to [repr.url]{markup.escape(url)}[reset] FAILED at attempt {i + 1} because TOO many requests at the same time (response status code {status_code}). Retrying to connect in 1 to 2 minutes, but it is suggested to lower the number of threads or increase thread delay time and try again.', extra={"markup": True})
await asyncio.sleep(60 + random.random() * 60)
elif 400 <= status_code < 500:
self.crawler_settings.log.error(f'Connecting to [repr.url]{markup.escape(url)}[reset] FAILED because response status code is {status_code}.', extra={"markup": True})
return None
else:
self.crawler_settings.log.warning(f'Failed to connect to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1}. Response status code is {status_code}.', extra={"markup": True})
except Exception as e:
self.crawler_settings.log.warning(f"Connecting to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1} FAILED because {e} Retry connecting.\n{traceback.format_exc()}",
output_msg=f"Connecting to [repr.url]{markup.escape(url)}[reset] at attempt {i + 1} FAILED.", extra={"markup": True})
await asyncio.sleep(self.crawler_settings.download_config.result_fail_delay)
if browser is None: # Only stop the browser when it is independently set up
use_browser.stop()
self.crawler_settings.log.error(f'FAILED to connect to [repr.url]{markup.escape(url)}[reset]', extra={"markup": True})
return None
[docs]
def nodriver_request_page_content(
self,
url: str,
browser: Optional[nodriver.Browser]=None,
headless: bool=True,
is_json: bool=False,
thread_delay: Union[None, float, Callable]=None,
page_stay_time: Optional[float]=None,
):
"""
Download webpage content with nodriver.
For those sites having strong anti-crawling measures, try using this function to bypass them.
Args:
url (str): The URL of the page to download.
browser (nodriver.Browser, None): Whether to use an existing browser instance.
headless (bool): Whether to set the browser in headless mode. Default set to :py:data:`True`. Only works when browser is None.
is_json (bool): Whether the result is a JSON text. Default set to False.
thread_delay (float, Callable, None): Delay before thread running. Default set to None. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
page_stay_time (float, None): Force the page to stay for page_stay_time seconds so that it can be fully loaded. Default set to None meaning no restrictions in time.
Returns:
The HTML content of the webpage.
"""
return nodriver.loop().run_until_complete(
self.__nodriver_request_page_content(
url=url,
browser=browser,
headless=headless,
is_json=is_json,
thread_delay=thread_delay,
page_stay_time=page_stay_time,
)
)
async def __nodriver_threading_request_page_content(
self,
url_list: Iterable[str],
restriction_num: Optional[int]=None,
is_json: Union[bool, Iterable[bool]]=False,
thread_delay: Union[None, float, Callable]=None,
batch_num: Optional[int]=None,
batch_delay: Union[float, Callable]=0.0,
headless: bool=True,
deconstruct_browser: bool=False,
page_stay_time: Optional[float]=None,
) -> list[str]:
page_num = len(url_list)
if restriction_num is not None:
page_num = min(page_num, restriction_num)
l_url_list = list(url_list)
if isinstance(is_json, Iterable):
if len(is_json) != len(url_list):
self.crawler_settings.log.critical(f"The number of is_json ({len(is_json)}) should be of the same length as the number of URLs ({len(url_list)})")
raise ValueError(f"The number of is_json ({len(is_json)}) should be of the same length as the number of URLs ({len(url_list)})")
l_is_json = list(l_is_json)
self.crawler_settings.log.info(f"Total webpage num: {page_num}")
page_content_list = []
if page_num > 0:
if batch_num is None:
batch_num = min(page_num, 500)
batch_delay = 0.0
silent_batch = True # Only reload browsers, no delaying.
else:
silent_batch = False
batched_url_list = [l_url_list[k * batch_num:min((k + 1) * batch_num, page_num)]
for k in range((page_num - 1) // batch_num + 1)]
if isinstance(is_json, Iterable):
batched_is_json = [l_is_json[k * batch_num:min((k + 1) * batch_num, page_num)]
for k in range((page_num - 1) // batch_num + 1)]
with ProgressGroup(panel_title="Downloading [yellow]Webpages[reset]") as progress_group:
task = progress_group.main_count_bar.add_task("Downloading webpages:", total=page_num)
# Define an async task function
async def page_task(
bar: CustomProgress,
task,
url: str,
browser: nodriver.Browser,
is_json: bool,
thread_delay: Union[float, Callable],
sem: asyncio.Semaphore, # Control max corountine number
page_stay_time: Optional[float],
):
async with sem:
result = await self.__nodriver_request_page_content(
url=url,
browser=browser,
is_json=is_json,
thread_delay=thread_delay,
page_stay_time=page_stay_time,
)
bar.update(task, advance=1)
return result
sem = asyncio.Semaphore(self.crawler_settings.download_config.thread_num) # Max coroutine number
for j in range(len(batched_url_list)):
# Set up browser instance for every batch
browser = await set_up_nodriver_browser(
proxies=self.crawler_settings.download_config.result_proxies,
window_width=800,
window_height=600,
headless=headless,
)
# Replace cookies, pay attention that domain should be set from station_url if not included
adapted_cookies_selenium = self.cookies.cookies_selenium
for cookie in adapted_cookies_selenium:
if ('domain' not in cookie.keys()) or cookie['domain'] == '':
cookie['domain'] = parse.urlparse(self.station_url).hostname
await update_nodriver_browser_cookies(browser=browser, cookies=Cookies(adapted_cookies_selenium))
self.crawler_settings.log.debug("Browser components loaded.")
results = await asyncio.gather(*[
asyncio.create_task(
page_task(
bar=progress_group.main_count_bar,
task=task,
url=batched_url_list[j][i],
browser=browser,
is_json=is_json if not isinstance(is_json, Iterable) else batched_is_json[j][i],
thread_delay=thread_delay,
sem=sem,
page_stay_time=page_stay_time,
)
)
for i in range(len(batched_url_list[j]))])
for result in results:
page_content_list.append(result)
if (j + 1) * batch_num < page_num:
current_batch_delay = batch_delay() if callable(batch_delay) else batch_delay
restart_time = datetime.datetime.strftime(datetime.datetime.now() + datetime.timedelta(seconds=current_batch_delay), '%H:%M:%S')
if not silent_batch:
self.crawler_settings.log.info(f"A batch of {len(batched_url_list[j])} {'page' if len(batched_url_list) <= 1 else 'pages'} has been downloaded. Waiting {current_batch_delay} {'second' if current_batch_delay <= 1 else 'seconds'} before resuming at {restart_time}.")
# Update progress bar to pausing
progress_group.main_count_bar.update(task, description=f"[yellow bold](Pausing)[reset] Downloading webpages:")
await asyncio.sleep(current_batch_delay)
# Reset progress bar from pausing
progress_group.main_count_bar.update(task, description=f"Downloading webpages:")
# Stop the browser
browser.stop()
self.crawler_settings.log.debug("Browser components stopped.")
# If deonstruct_browser=True, clear caches
if deconstruct_browser:
silent_deconstruct_browser(log=self.crawler_settings.log)
# Finished normally, set progress bar to finished state
progress_group.main_count_bar.update(task, description=f"[green]Downloading webpages finished!")
else:
self.crawler_settings.log.warning(f"No webpages are to be downloaded.")
return page_content_list
[docs]
def nodriver_threading_request_page_content(
self,
url_list: Iterable[str],
restriction_num: Optional[int]=None,
is_json: Union[bool, Iterable[bool]]=False,
thread_delay: Union[None, float, Callable]=None,
batch_num: Optional[int]=None,
batch_delay: Union[float, Callable]=0.0,
headless: bool=True,
deconstruct_browser: bool=False,
page_stay_time: Optional[float]=None,
) -> list[str]:
"""
Download multiple webpage content using asynchronous coroutines (similar to threads) with nodriver.
For those sites having strong anti-crawling measures, try using this function to bypass them.
Args:
url_list (list[str]): The list of URLs of the page to download.
restriction_num (int, None): Only download the first restriction_num number of pages. Set to None (default) meaning no restrictions.
is_json (bool or Iterable instance): Whether the result is a JSON text. Can be a bool or a iterable object with the same length as url_list. Default set to False.
thread_delay (float, Callable, None): Delay before thread running. Default set to None. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
batch_num (int): Number of pages for each batch; using it with batch_delay to wait a certain period of time after downloading each batch. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
batch_delay (float, Callable): Delaying time (seconds) after each batch is downloaded. Used to deal with websites like Pixiv which has a restriction on requests in a certain period of time.
headless (bool): Display a browser window or not. Default set to :py:data:`True`, and setting it to :py:data:`False` is helpful for debugging and bypassing some anti-crawling measures.
deconstruct_browser (int): Whether to deconstruct all instances and clear caches upon finishing. Can improve performances in restricted environments.
page_stay_time (float, None): Force the page to stay for page_stay_time seconds so that it can be fully loaded. Default set to None meaning no restrictions in time.
Returns:
A list of the HTML contents of the webpages. Its order is the same as the one of url_list.
"""
return nodriver.loop().run_until_complete(
self.__nodriver_threading_request_page_content(
url_list=url_list,
restriction_num=restriction_num,
is_json=is_json,
thread_delay=thread_delay,
batch_num=batch_num,
batch_delay=batch_delay,
headless=headless,
deconstruct_browser=deconstruct_browser,
page_stay_time=page_stay_time,
)
)
# --------------------------------------------------------- #
# Cloudflare related functions #
# --------------------------------------------------------- #
# Get Cloudflare cf_clearance cookies
async def __get_cloudflare_cookies(
self,
url: Optional[str]=None,
headless: bool=False,
timeout: float=60,
save_cookies_file: Optional[str]=None,
try_clicking: bool=False,
):
test_url = url if url is not None else self.station_url
self.crawler_settings.log.info(f"Loading browser to get Cloudflare cookies from [repr.url]{markup.escape(test_url)}[reset].", extra={"markup": True})
# Pass Cloudflare verification
with CustomProgress(has_spinner=True, transient=True) as progress:
task = progress.add_task(description='Loading browser components...', total=2)
try:
browser = await set_up_nodriver_browser(
proxies=self.crawler_settings.download_config.result_proxies,
headless=headless,
window_width=800,
window_height=600,
)
progress.update(task, advance=1, description="Loading Cloudflare page and try passing it...")
tab = await browser.get(test_url)
await tab
start_timestamp = datetime.datetime.now()
while (datetime.datetime.now() - start_timestamp).seconds < timeout:
try:
result = await tab.select('input[name="cf-turnstile-response"]', timeout=3)
if result is None:
break
if try_clicking:
await tab.verify_cf(flash=True)
except:
break
try:
result = await tab.select('input[name="cf-turnstile-response"]', timeout=1)
if result is not None:
self.crawler_settings.log.error("Failed to pass the Cloudflare verification.")
return
except:
pass
progress.update(task, advance=1, description="[green]Cloudflare page successfully passed!")
progress.finish_task(task)
except Exception as e:
output_msg_base = f"Failed to get the Cloudflare cookies"
self.crawler_settings.log.error(f"{output_msg_base}.\n{traceback.format_exc()}", output_msg=f"{output_msg_base} because {e}")
progress.finish_task(task)
return
# Get user agent and cookies
try:
user_agent = browser.info.get("User-Agent")
if self.crawler_settings.download_config.result_headers is None:
self.crawler_settings.download_config.headers = {'User-Agent': user_agent}
self.crawler_settings.log.info(f"User agent is replaced by: {user_agent}")
elif isinstance(self.crawler_settings.download_config.headers, dict):
self.crawler_settings.download_config.headers['User-Agent'] = user_agent
self.crawler_settings.log.info(f"User agent is replaced by: {user_agent}")
else:
self.crawler_settings.log.warning(f"User agent is unchanged! It might be because download_config.headers is a function. Your cookies may not work.")
cookies_nodriver = await browser.cookies.get_all()
self.cookies = Cookies(cookies_nodriver)
self.crawler_settings.log.info("Cookies have been replaced. You can use Parser.cookies to extract it. ATTENTION: The cookies only work with certain user agent and IP address in a certain time.")
if save_cookies_file is not None:
self.cookies.save_to_json(save_cookies_file)
browser.stop()
except Exception as e:
output_msg_base = f"Failed to parse user agent or Cookies"
self.crawler_settings.log.error(f"{output_msg_base}.\n{traceback.format_exc()}", output_msg=f"{output_msg_base} because {e}")
browser.stop()
[docs]
def get_cloudflare_cookies(
self,
url: Optional[str]=None,
headless: bool=False,
timeout: float=60,
save_cookies_file: Optional[str]=None,
try_clicking: bool=False,
):
"""
Bypass Cloudflare check and get its cookies.
Args:
url (str): Get Cloudflare cookies using this URL. Set to None (default) will use the station_url in this class.
headless (bool): Whether to display a browser window. Recommend setting to True in case you need to manually bypass Cloudflare.
save_cookies_file (str, None): Path to save the new cookies. Default set to :py:data:`None`, meaning not saving cookies.
timeout (float): Try to finish Cloudflare test in timeout seconds.
try_clicking (bool): Try to repeatedly click the verification box. MAY CAUSE THE WEBSITE TO GET STUCK IN THE VERIFICATION PAGE.
"""
nodriver.loop().run_until_complete(
self.__get_cloudflare_cookies(
url=url,
headless=headless,
timeout=timeout,
save_cookies_file=save_cookies_file,
try_clicking=try_clicking,
)
)
[docs]
class KeywordParser(Parser):
"""
A Parser for fetching result from keyword searching.
Args:
station_url (str): The URL of the main page of a website.
+ This parameter works when several websites use the same structure. For example, https://yande.re/ and https://konachan.com/ both use Moebooru to build their websites, and this parameter must be filled to deal with these sites respectively.
+ For websites like https://www.pixiv.net/, as no other website uses its structure, this parameter has already been initialized and do not need to be filled.
crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Parser.
standard_keyword_string (str): Query keyword string using standard syntax. Refer to the documentation for detailed instructions.
keyword_string (str, None): If you want to directly specify the keywords used in searching, set ``keyword_string`` to a custom non-empty string. It will OVERWRITE ``standard_keyword_string``.
+ For example, set ``keyword_string`` to ``"kuon_(utawarerumono) rating:safe"`` in DanbooruKeywordParser means searching directly with this string in Danbooru, and its standard keyword string equivalent is ``"kuon_(utawarerumono) AND rating:safe"``.
cookies (image_crawler_utils.Cookies, list, dict, str, None): Cookies used in loading websites.
+ Can be one of :class:`image_crawler_utils.Cookies`, :py:class:`list`, :py:class:`dict`, :py:class:`str` or :py:data:`None`.
+ :py:data:`None` means no cookies and works the same as ``Cookies()``.
+ Leave this parameter blank works the same as :py:data:`None` / ``Cookies()``.
accept_empty (bool): If set to :py:data:`False` (default), when both ``standard_keyword_string`` and ``keyword_string`` is an empty string (like '' or ' '), a critical error will be thrown. If set to :py:data:`True`, no error will be thrown and the parameters are accepted.
"""
def __init__(
self,
station_url: str,
crawler_settings: CrawlerSettings=CrawlerSettings(),
standard_keyword_string: Optional[str]=None,
keyword_string: Optional[str]=None,
cookies: Optional[Union[Cookies, list, dict, str]]=Cookies(),
accept_empty: bool=False,
):
super().__init__(
station_url=station_url,
crawler_settings=crawler_settings,
cookies=cookies,
)
self.standard_keyword_string = standard_keyword_string
if standard_keyword_string is None or len(standard_keyword_string.strip()) == 0:
if keyword_string is None or len(keyword_string.strip()) == 0:
if not accept_empty:
self.crawler_settings.log.critical("standard_keyword_string and keyword_string cannot be empty / None at the same time!")
raise KeyError("standard_keyword_string and keyword_string cannot be empty / None at the same time!")
else:
self.crawler_settings.log.debug("standard_keyword_string is empty. Use keyword_string instead.")
self.keyword_tree = KeywordLogicTree() # An empty tree. Should not be used.
else:
self.keyword_tree = construct_keyword_tree(standard_keyword_string)
self.keyword_string = keyword_string
##### Funtion requires rewriting
[docs]
@abstractmethod
def run(self) -> list[ImageInfo]:
"""
Generate a list of ImageInfo, containing image urls, names and infos by crawling the website.
MUST BE OVERRIDDEN if inherited from Parser or KeywordParser class.
"""
raise NotImplemented
##### General Function
# Display all config
[docs]
def display_all_configs(self):
"""
Display all config info.
Dataclasses will be displayed in a neater way.
"""
print("========== Current KeywordParser Config ==========")
# Basic info
print('\nBasic Info:')
try:
print(f" + Station URL: [repr.url]{markup.escape(self.station_url)}[reset]")
print(f" + Standard keyword string: {self.standard_keyword_string}")
print(f" + Keyword tree: {self.keyword_tree.list_struct()}")
print(f" + Keyword string: {self.keyword_string}")
if self.cookies.is_none():
print(f" + Cookies: None")
else:
print(f" + Cookies:")
print(self.cookies.cookies_selenium)
except Exception as e:
print(f"Basic Info missing because {e}!\n{traceback.format_exc()}", "error")
# Other info
if set(self.__init__.__code__.co_varnames) != set(KeywordParser.__init__.__code__.co_varnames):
print('\nOther Info:')
for varname in self.__init__.__code__.co_varnames:
if varname not in KeywordParser.__init__.__code__.co_varnames:
if getattr(self, varname, None) is not None:
print(f" + {varname}: {getattr(self, varname)}")
print('')
print("CrawlerSettings used:")
self.crawler_settings.display_all_configs()
print('')
print("========== Keyword Parser Config Ending ==========")
# Generate standard keyword string
[docs]
def generate_standard_keyword_string(
self,
keyword_tree: Optional[KeywordLogicTree]=None
):
"""
Generate a standard keyword string.
Generated result may not be the same from the standard_keyword_string input.
Args:
keyword_tree: The KeywordLogicTree that a standard keyword string will be built from. Set to :py:data:`None` (default) will use the KeywordLogicTree generated from the ``standard_keyword_string`` parameter.
+ **ATTENTION:** When set to :py:data:`None`, the standard keyword string may not be absolutely same as ``standard_keyword_string``.
Returns:
A standard keyword string.
"""
# Standard keyword string
kw_tree = self.keyword_tree if keyword_tree is None else keyword_tree
self.standard_keyword_string = kw_tree.standard_keyword_string()