import os
from typing import Optional, Union, Callable
from collections.abc import Iterable
import dill
import traceback
from concurrent import futures
import requests
from rich import print, markup
from rich.progress import SpinnerColumn
from image_crawler_utils import Cookies, CrawlerSettings
from image_crawler_utils.image_downloader import download_image_from_url
from image_crawler_utils.progress_bar import ProgressGroup
from image_crawler_utils.utils import check_dir
from image_crawler_utils.log import Log
from .image_info import ImageInfo
[docs]
class Downloader:
"""
Downloading images using threading method.
Args:
crawler_settings (image_crawler_utils.CrawlerSettings): The CrawlerSettings used in this Downloader.
image_info_list (image_crawler_utils.ImageInfo): A list of ImageInfo.
store_path (str, Iterable[str]): Path to store images, or a list of storage paths respectively for every image.
+ Default is the current working directory.
+ If it is set to an iterable (e.g. a list), then its length should be the same as ``image_info_list``.
image_info_filter (callable, bool): A callable function used to filter the images in the list of ImageInfo.
+ The function of ``image_info_filter`` should only accept 1 argument of ImageInfo type and returns `True` (download this image) or `False` (do not download this image), like:
.. code-block:: python
def filter_func(image_info: ImageInfo) -> bool:
# Meet the conditions
return True
# Do not meet the conditions
return False
+ If the function has other parameters, use ``lambda`` to exclude other parameters:
.. code-block:: python
image_info_filter=lambda info: filter_func(info, param1, param2, ...)
+ If you want to download all images in the ImageInfo list, set ``image_info_filter`` to :py:data:`True`.
+ **TIPS**: If you want to search images with complex restrictions that the image station sites may not support (e.g. Images with many keywords and restrictions on the ratio between width and height), you can simplify the query with some keywords to get all images with Parsers, and filter them with your custom ``image_info_filter`` function.
cookies (image_crawler_utils.Cookies, str, dict, list, None): Cookies used to access images from a website.
+ :py:data:`None` means no cookies and works the same as ``Cookies()``.
+ Leaving this parameter blank works the same as :py:data:`None` / ``Cookies()``.
+ **TIPS**: You can add corresponding cookies to Downloader if there are URLs of images only accessible with an account. For example, if you have saved Pixiv and Twitter / X cookies respectively in ``Pixiv_cookies.json`` and ``Twitter_cookies.json``, then you can use ``cookies=Cookies.load_from_json("Pixiv_cookies.json") + Cookies.load_from_json("Twitter_cookies.json")`` to add both cookies to the Downloader.
"""
def __init__(
    self,
    image_info_list: Iterable[ImageInfo],
    crawler_settings: CrawlerSettings=CrawlerSettings(),
    store_path: Union[str, Iterable[str]]='./',
    image_info_filter: Union[Callable, bool]=True,
    cookies: Optional[Union[Cookies, list, dict, str]]=Cookies(),
):
    """
    Store the download settings and normalize the storage path(s) and cookies.

    Args:
        image_info_list: The ImageInfo items to download. Any iterable is accepted; non-list / non-tuple iterables are copied into a list.
        crawler_settings: The CrawlerSettings used in this Downloader.
        store_path: A single path (str or os.PathLike) or one path per image. Every path is stored as a str ending with ``/``; an empty path means the current directory.
        image_info_filter: A callable taking one ImageInfo, or a bool.
        cookies: A Cookies object, or a value passed to ``Cookies(...)`` to build one.

    Raises:
        ValueError: If ``store_path`` is an iterable whose length differs from the length of ``image_info_list``.
    """

    def with_trailing_slash(path) -> str:
        # Accept os.PathLike objects as well as plain strings.
        path = os.fspath(path)
        # An empty path must stay "current directory"; blindly appending '/'
        # would turn it into the filesystem root.
        if not path:
            return './'
        return path if path.endswith('/') else path + '/'

    self.crawler_settings = crawler_settings

    # The other methods use len() and integer indexing on this collection, so a
    # generic iterable (e.g. a generator) is materialized once here. Lists and
    # tuples are kept as-is so the caller's object is still the one referenced.
    if isinstance(image_info_list, (list, tuple)):
        self.image_info_list = image_info_list
    else:
        self.image_info_list = list(image_info_list)

    if isinstance(store_path, (str, os.PathLike)):
        self.store_path = with_trailing_slash(store_path)
    else:
        # Materialize first: len() is not available on arbitrary iterables.
        store_paths = list(store_path)
        if len(store_paths) != len(self.image_info_list):
            raise ValueError(f'The length of store_path ({len(store_paths)}) should be the same as the length of image_info_list ({len(self.image_info_list)}).')
        self.store_path = [with_trailing_slash(path) for path in store_paths]

    self.image_info_filter = image_info_filter

    if isinstance(cookies, Cookies):
        self.cookies = cookies
    else:
        self.cookies = Cookies(cookies)
[docs]
def run(self) -> tuple[int, list[ImageInfo], list[ImageInfo], list[ImageInfo]]:
    """
    Filter the ImageInfo list, download what remains and report the outcome.

    Returns:
        (Total size of image downloaded, Succeeded ImageInfo list, Failed ImageInfo list, Skipped ImageInfo list)

        + **Total size of image downloaded**: The total size (in bytes) of images downloaded.
        + **Succeeded ImageInfo list**: ImageInfo of the images that were downloaded successfully.
        + **Failed ImageInfo list**: ImageInfo of the images selected for downloading that did not succeed.

            + Images left undone because ``capacity`` defined in :class:`image_crawler_utils.CrawlerSettings` was reached end up in this list.
        + **Skipped ImageInfo list**: ImageInfo of the images that were never attempted.

            + This covers images rejected by ``image_info_filter``, images beyond the ``image_num`` limit in :class:`image_crawler_utils.CrawlerSettings`, and images that already exist while ``overwrite_images`` in DownloadConfig is :py:data:`False`.
    """

    # Step 1: decide which positions of image_info_list are downloaded / skipped
    planned_num, wanted, skipped = self.__filter_ordinals_list()

    # Step 2: perform the downloads
    traffic, succeeded, failed = self.__download_images(planned_num, wanted)

    # Step 3: one-line summary in the log
    summary = f"{len(succeeded)} succeeded ({traffic / 2**20:.2f} MB in total), {len(failed)} failed, {len(skipped)} skipped."
    self.crawler_settings.log.info(summary)

    # Step 4: translate positions back into the ImageInfo objects themselves
    def pick(ordinals):
        return [self.image_info_list[n] for n in ordinals]

    succeeded_infos = pick(succeeded)
    failed_infos = pick(failed)
    skipped_infos = pick(skipped)
    return traffic, succeeded_infos, failed_infos, skipped_infos
[docs]
def save_to_pkl(
    self,
    pkl_file: str,
) -> Optional[tuple[str, str]]:
    """
    Save the Downloader with settings in a pkl file (serialized with dill).

    Args:
        pkl_file (str): Path of the pkl file; it may contain a directory part. The ``.pkl`` suffix is optional and is appended when missing.

    Returns:
        (Saved file name, Absolute path of the saved file), or None if failed.
    """

    path, filename = os.path.split(pkl_file)
    check_dir(path, self.crawler_settings.log)
    # Only touch the end of the file name. A substring replace over the whole
    # path would also rewrite directory names or ".pkl.pkl" in the middle of a name.
    if not filename.endswith(".pkl"):
        filename = f"{filename}.pkl"
    f_name = os.path.join(path, filename)
    try:
        with open(f_name, "wb") as f:
            dill.dump(self, f)
        self.crawler_settings.log.info(f'{type(self).__name__} has been dumped into [repr.filename]{markup.escape(os.path.abspath(f_name))}[reset]', extra={"markup": True})
        return f_name, os.path.abspath(f_name)
    except Exception as e:
        # Best effort by design: the failure is logged and reported as None.
        self.crawler_settings.log.error(f'Failed to dump {type(self).__name__} into [repr.filename]{markup.escape(os.path.abspath(f_name))}[reset] because {e}\n{traceback.format_exc()}', extra={"markup": True})
        return None
[docs]
@classmethod
def load_from_pkl(
    cls,
    pkl_file: str,
    log: Log=Log(),
) -> Optional["Downloader"]:
    """
    Load a Downloader from a .pkl file.

    Args:
        pkl_file (str): Name of the pkl file.
        log (image_crawler_utils.log.Log): Logging config used to report the result.

    Returns:
        The object loaded from the pkl file (expected to be a Downloader), or None if failed.
    """

    try:
        # SECURITY NOTE: dill.load (like pickle) can execute arbitrary code while
        # deserializing. Only load pkl files that come from a trusted source.
        with open(pkl_file, "rb") as f:
            loaded = dill.load(f)
        log.info(f'{type(loaded).__name__} has been successfully loaded from [repr.filename]{markup.escape(os.path.abspath(pkl_file))}[reset]', extra={"markup": True})
        return loaded
    except Exception as e:
        # cls is the class itself here, so its own name is what should be reported
        # (type(cls) would be the metaclass, i.e. "type").
        log.error(f'Failed to load {cls.__name__} from [repr.filename]{markup.escape(os.path.abspath(pkl_file))}[reset] because {e}\n{traceback.format_exc()}', extra={"markup": True})
        return None
# Filter image info list
def __filter_ordinals_list(self) -> tuple[int, list[int], list[int]]:
    """
    Split the positions (ordinals) of ``self.image_info_list`` into "to download" and "skipped".

    An ordinal is skipped when ``image_info_filter`` rejects the image, when the
    target file already exists and ``overwrite_images`` is False, or when it lies
    beyond the ``image_num`` limit.

    Returns:
        (Number of images to download, ordinals to download, skipped ordinals).
        Both ordinal lists are sorted in ascending order.
    """

    log = self.crawler_settings.log
    image_filter = self.image_info_filter

    # Filter images
    filtered_ordinals_list: list[int] = []
    skipped_ordinals_list: list[int] = []
    for i, item in enumerate(self.image_info_list):
        # Only the bool True (or a callable returning a truthy value) keeps the image
        if image_filter is True or (callable(image_filter) and image_filter(item)):
            filtered_ordinals_list.append(i)
        else:
            skipped_ordinals_list.append(i)
    if len(skipped_ordinals_list) > 0:
        log.info(f"{len(skipped_ordinals_list)} {'images' if len(skipped_ordinals_list) > 1 else 'image'} will be skipped because {'these images are' if len(skipped_ordinals_list) > 1 else 'this image is'} filtered out by image_info_filter.")

    # Skip downloaded images if set in download_config
    existed_ordinals: set[int] = set()
    if self.crawler_settings.download_config.overwrite_images is False:
        for ordinal in filtered_ordinals_list:
            # Single store path, or one store path per image
            store_dir = self.store_path if isinstance(self.store_path, str) else self.store_path[ordinal]
            image_path = os.path.join(store_dir, self.image_info_list[ordinal].name)
            if os.path.exists(image_path):
                existed_ordinals.add(ordinal)
                log.debug(f"{image_path} exists and will be skipped.")
    if existed_ordinals:
        # One pass with set membership instead of list.remove() per ordinal
        filtered_ordinals_list = [ordinal for ordinal in filtered_ordinals_list if ordinal not in existed_ordinals]
        skipped_ordinals_list.extend(existed_ordinals)
        skipped_ordinals_list.sort()  # Sort ordinals from small to large
        log.info(f"{len(existed_ordinals)} {'images' if len(existed_ordinals) > 1 else 'image'} will be skipped because {'these images have' if len(existed_ordinals) > 1 else 'this image has'} existed.")

    # Calc download image num
    total_num = len(filtered_ordinals_list)
    image_num = self.crawler_settings.capacity_count_config.image_num
    if image_num is None:
        download_num = total_num
    else:
        # Clamp at 0: a negative limit would otherwise produce a negative count
        # and slices that no longer match it.
        download_num = max(0, min(total_num, image_num))

    # Move image num over download_num into skipped_ordinals_list
    skipped_ordinals_list.extend(filtered_ordinals_list[download_num:])
    skipped_ordinals_list.sort()  # Sort ordinals from small to large
    filtered_ordinals_list = filtered_ordinals_list[:download_num]

    return download_num, filtered_ordinals_list, skipped_ordinals_list
# Download images
def __download_images(self, download_num: int, filtered_ordinals_list: list[int]) -> tuple[float, list[int], list[int]]:
    """
    Download the selected images with a thread pool, retrying failures through backup URLs.

    Images are addressed by their index ``i`` inside ``filtered_ordinals_list``;
    ``filtered_ordinals_list[i]`` is the position of the image in ``self.image_info_list``.
    Every round submits all undone indices to a fresh thread pool. A failed image is
    retried in the next round with its next backup URL until none is left.

    Args:
        download_num (int): Number of images to download (``run`` passes the count returned together with ``filtered_ordinals_list``).
        filtered_ordinals_list (list[int]): Positions in ``self.image_info_list`` of the images to download.

    Returns:
        (Accumulated download traffic, succeeded ordinals, failed ordinals).
        Every ordinal of ``filtered_ordinals_list`` that did not succeed is reported
        as failed, including those left undone after the capacity limit was reached.
    """

    log = self.crawler_settings.log
    if download_num <= 0:
        log.warning("No images are to be downloaded.")
        return 0, [], []

    single_store_path = isinstance(self.store_path, str)
    if single_store_path:  # Single store path
        check_dir(self.store_path, log)
        log.info(f'Images will be saved at [repr.filename]{markup.escape(os.path.abspath(self.store_path))}[reset]', extra={"markup": True})
    else:  # List of store paths
        for ordinal in filtered_ordinals_list:
            check_dir(self.store_path[ordinal], log)
        log.info('Images will be saved at paths specified in the iterable store_path.')
    log.info("Starting image downloading.", output_msg="========== Start Image Downloading ==========")
    log.info(f"Total downloading num: {download_num}")

    download_config = self.crawler_settings.download_config
    capacity = self.crawler_settings.capacity_count_config.capacity
    download_traffic = 0
    succeeded_id = []

    # Start downloading
    with ProgressGroup(panel_title="Downloading [cyan]Images[reset]") as progress_group:
        progress_group.sub_count_bar.columns = (SpinnerColumn(), *progress_group.sub_count_bar.columns)  # Add a spinner to its left
        task = progress_group.main_count_bar.add_task("Downloading:", total=download_num)

        undone_ids = list(range(download_num))
        fail_count = [0] * download_num  # Number of URLs already tried without success, per index
        shutdown_flag = False

        with requests.Session() as session:
            if not self.cookies.is_none():
                session.cookies.update(self.cookies.cookies_dict)

            while len(undone_ids) > 0:
                # Threading current undone ids
                with futures.ThreadPoolExecutor(download_config.thread_num) as executor:
                    download_thread_pool = []
                    for i in undone_ids:
                        ordinal = filtered_ordinals_list[i]
                        image_info = self.image_info_list[ordinal]
                        # First attempt uses the main URL, the k-th retry uses backup_urls[k - 1]
                        url = image_info.url if fail_count[i] == 0 else image_info.backup_urls[fail_count[i] - 1]
                        download_thread_pool.append(executor.submit(
                            download_image_from_url,
                            url,
                            image_info.name,
                            download_config,
                            log,
                            self.store_path if single_store_path else self.store_path[ordinal],
                            session,
                            progress_group,
                            i,
                            None,
                        ))

                    for thread in futures.as_completed(download_thread_pool):
                        # The result is used as (size, index): a size > 0 is treated as a
                        # successful download, and index is the ``i`` passed in above.
                        result = thread.result()
                        size, finished_n = result[0], result[1]
                        download_traffic += size

                        if size > 0:
                            # Successful download
                            succeeded_id.append(finished_n)
                            undone_ids.remove(finished_n)
                            progress_group.main_count_bar.update(task, advance=1, description=f"Downloading [repr.number]{download_traffic / 2**20:.2f}[reset] MB:")
                        else:
                            # Failed download
                            fail_count[finished_n] += 1
                            failed_info = self.image_info_list[filtered_ordinals_list[finished_n]]
                            if len(failed_info.backup_urls) >= fail_count[finished_n]:
                                # A backup URL is still available: keep the index in
                                # undone_ids so the next round retries it.
                                log.info(f"Found other URLs, putting [repr.filename]{markup.escape(failed_info.name)}[reset] into downloading queue again.", extra={"markup": True})
                            else:
                                # No URL left: count it as processed and give up on it
                                progress_group.main_count_bar.update(task, advance=1, description=f"Downloading [repr.number]{download_traffic / 2**20:.2f}[reset] MB:")
                                undone_ids.remove(finished_n)

                        if capacity is not None and download_traffic > capacity:
                            log.warning("Downloading capacity reached!")
                            executor.shutdown(wait=False, cancel_futures=True)
                            undone_ids = []  # Ends the outer while loop
                            shutdown_flag = True
                            break

        if shutdown_flag:  # Interrupted!
            progress_group.main_count_bar.update(task, description=f"[red]Downloading interrupted! [repr.number]{download_traffic / 2**20:.2f}[reset] MB:")
        else:  # Finished normally, set progress bar to finished state
            progress_group.main_count_bar.update(task, description=f"[green]Downloading finished! [repr.number]{download_traffic / 2**20:.2f}[reset] MB:")

    succeeded_lookup = set(succeeded_id)
    succeeded_ordinals_list = [filtered_ordinals_list[i] for i in succeeded_id]
    failed_ordinals_list = [ordinal for i, ordinal in enumerate(filtered_ordinals_list) if i not in succeeded_lookup]

    # Remove .tmp files
    # Only the top level of each store directory is cleaned, and the file path is
    # built from that directory itself. Joining names found in sub-directories (or a
    # list of store paths) onto self.store_path points at the wrong file or raises.
    store_dirs = [self.store_path] if single_store_path else list(dict.fromkeys(self.store_path))
    for store_dir in store_dirs:
        # next(os.walk(...)) yields only the top-level listing; a missing directory yields nothing
        root, _dirs, files = next(os.walk(store_dir), (store_dir, [], []))
        for name in files:
            if os.path.splitext(name)[1] == '.tmp':
                tmp_file = os.path.join(root, name)
                try:
                    os.remove(tmp_file)
                except OSError as e:
                    # Cleanup is best effort; the downloads themselves are already done
                    log.warning(f"Failed to remove temporary file {tmp_file} because {e}")

    log.info("Image downloading completed.", output_msg="========== Image Downloading Complete ==========")
    return download_traffic, succeeded_ordinals_list, failed_ordinals_list
##### Not directly related to downloading
# Display all config
[docs]
def display_all_configs(self):
    """
    Display all config info.

    Prints the image info filter, the store path(s), the number of images that
    would be downloaded, and then the configs of the CrawlerSettings in use.
    Computing the number of images runs the same filtering as ``run``, so the
    filtering log messages are emitted as well.
    """

    print("========== Current Downloader Config ==========")

    print('\nBasic Info:')
    try:
        print(f" + Image info filter: {self.image_info_filter}")
        if isinstance(self.store_path, str):
            print(f" + Store path: [repr.filename]{markup.escape(self.store_path)}[reset]")
            print(f" + Absolute store path: [repr.filename]{markup.escape(os.path.abspath(self.store_path))}[reset]")
        else:
            # One store path per image: markup.escape / abspath only accept a single string
            print(f" + Store path: {len(self.store_path)} paths, one for every image respectively")
    except Exception as e:
        # Escape the text so brackets in the traceback are not parsed as rich markup
        print(markup.escape(f"Basic Info missing because {e}!\n{traceback.format_exc()}"))

    print('\nImage downloading info:')
    try:
        download_num, filtered_ordinals_list, skipped_ordinals_list = self.__filter_ordinals_list()
        print(f" + Number of images to be downloaded: {len(filtered_ordinals_list)}")
    except Exception as e:
        print(markup.escape(f"Image downloading info has an error because {e}!\n{traceback.format_exc()}"))

    print('')
    print("CrawlerSettings used:")
    self.crawler_settings.display_all_configs()

    print('')
    print("========== Config Display Ending ==========")