""" mangabaka_works_resolver.py =========================== Fetches volume-level (work) data and volume cover images from the MangaBaka API. Each "work" is a physical tankobon volume and may carry: - volume number - ISBN / GTIN - page count (used for chapter-to-volume estimation) - release date - cover image (raw / default / small variants) Cover resolution order (per volume) ------------------------------------ 1. GET /v1/series/{id}/images — covers that exist independently of a work (some series have covers but no works). English edition preferred; original language used when no English cover is available. 2. GET /v1/series/{id}/works — physical tankobon data including covers. Fallback when /images returns nothing for the requested volume. If no volume cover is found at all, callers fall back to the series-level default cover from the series object itself. Dependencies ------------ requests -> pip install requests """ from __future__ import annotations import requests # -------------------------------------------------------------------------- # Generic image-block URL picker (shared by /images and /works responses) # -------------------------------------------------------------------------- def _pick_image_url(image) -> "str | None": """ Returns the best URL from a MangaBaka image block. Handles the common ``{raw, x150, x250, x350}`` structure used by both the ``cover`` field on series/work objects and the ``image`` field on ``/images`` endpoint items:: { "raw": {"url": "...", "size": ..., "height": ..., "width": ...}, "x150": {"x1": "...", "x2": "...", "x3": "..."}, "x250": {...}, "x350": {...} } Preference: raw original > x350@x3 > x250@x3 > x150@x3 > … (falling through to lower densities and sizes as needed). """ if not image: return None if isinstance(image, str): return image if not isinstance(image, dict): return None # 1) Raw / unscaled image raw = image.get("raw") if isinstance(raw, dict): url = raw.get("url") if isinstance(url, str) and url: return url elif isinstance(raw, str) and raw: return raw # 2) Size-keyed CDN variants, largest first, highest density first for size_key in ("x350", "x250", "x150"): variant = image.get(size_key) if isinstance(variant, dict): for density in ("x3", "x2", "x1"): url = variant.get(density) if isinstance(url, str) and url: return url elif isinstance(variant, str) and variant: return variant # 3) Last-ditch: any HTTP URL anywhere in the structure for val in image.values(): if isinstance(val, str) and val.startswith("http"): return val if isinstance(val, dict): for sub_val in val.values(): if isinstance(sub_val, str) and sub_val.startswith("http"): return sub_val return None class MangaBakaWorksResolver: """ Fetches and caches MangaBaka volume (work) data and cover images. Cover lookup order per volume ------------------------------ 1. ``/v1/series/{id}/images`` — edition covers (English > original). 2. ``/v1/series/{id}/works`` — physical tankobon covers. Only works that carry a cover image are retained in the works cache. """ def __init__(self, api_base_url: str = "https://api.mangabaka.dev/v1", request_timeout: int = 30, session: "requests.Session | None" = None): self.api_base_url = api_base_url.rstrip("/") self.request_timeout = request_timeout self._session = session or requests.Session() self._session.headers.setdefault("User-Agent", "MangaBakaWorksResolver/1.0") # Cache: series_id (str) -> list of work dicts (only those with covers) self._cache: dict[str, list[dict]] = {} # Cache: series_id (str) -> {norm_vol (str): url (str)} self._images_cache: dict[str, dict[str, str]] = {} # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def get_works(self, series_id: str) -> list[dict]: """ Returns volume-level works for a series, filtered to those that have a usable cover image. Results are cached per series. Pages through the API (limit=50) until the response returns an empty page, collecting all works before applying the cover filter. """ if not series_id: return [] if series_id in self._cache: return self._cache[series_id] all_works: list[dict] = [] page = 1 try: while True: resp = self._session.get( f"{self.api_base_url}/series/{series_id}/works", params={"limit": 50, "page": page}, timeout=self.request_timeout, ) resp.raise_for_status() page_data = resp.json().get("data") or [] if not page_data: break all_works.extend(page_data) if len(page_data) < 50: break page += 1 except requests.RequestException: if not all_works: return [] # Discard works that carry no usable cover works_with_cover = [w for w in all_works if w.get("images")] self._cache[series_id] = works_with_cover return works_with_cover def get_work_for_volume(self, series_id: str, volume) -> "dict | None": """ Returns the work dict for a specific volume number, or None. Volume comparison normalises trailing ".0" (e.g. "1.0" == "1"). """ works = self.get_works(series_id) if not works: return None target = _norm_vol(volume) for work in works: if _norm_vol(work.get("sequence_string")) == target: return work return None def get_volume_covers(self, series_id: str) -> "dict[str, str]": """ Fetches all volume-type cover images for a series from ``/v1/series/{id}/images`` and returns a ``{normalised_volume_str: url}`` mapping. English-edition covers are preferred; the first available language is used as fallback when no English cover exists for a volume. Results are cached per series. """ if not series_id: return {} if series_id in self._images_cache: return self._images_cache[series_id] raw_items: list[dict] = [] page = 1 try: while True: resp = self._session.get( f"{self.api_base_url}/series/{series_id}/images", params={"limit": 50, "page": page}, timeout=self.request_timeout, ) resp.raise_for_status() page_data = resp.json().get("data") or [] if not page_data: break raw_items.extend(page_data) if len(page_data) < 50: break page += 1 except requests.RequestException: pass # Group by normalised volume index; collect all languages per volume. by_volume: dict[str, dict[str, str]] = {} # norm_vol -> {lang: url} for item in raw_items: if item.get("type") != "volume": continue idx = item.get("index_numeric") if idx is None: continue norm = _norm_vol(idx) lang = (item.get("language") or "").lower() or "unknown" url = _pick_image_url(item.get("image")) if not url: continue if norm not in by_volume: by_volume[norm] = {} # First entry per language wins (API order reflects quality/rank). if lang not in by_volume[norm]: by_volume[norm][lang] = url # Pick best language per volume: English first, then first available. result: dict[str, str] = {} for norm, lang_map in by_volume.items(): url = lang_map.get("en") or next(iter(lang_map.values()), None) if url: result[norm] = url self._images_cache[series_id] = result return result def get_cover_for_volume_from_images(self, series_id: str, volume) -> "str | None": """ Returns the cover URL for a specific volume from the /images endpoint, or None if not available. """ covers = self.get_volume_covers(series_id) if not covers: return None return covers.get(_norm_vol(volume)) def get_cover_for_volume(self, series_id: str, volume) -> "str | None": """ Returns the best cover URL for a specific volume. Tries the ``/images`` endpoint first (covers that exist even when no physical work has been catalogued), then falls back to the ``/works`` endpoint. Returns None if neither source has a cover for the volume. """ # 1. /images endpoint (covers without works) url = self.get_cover_for_volume_from_images(series_id, volume) if url: return url # 2. /works endpoint fallback work = self.get_work_for_volume(series_id, volume) if not work or not work.get("images"): return None return _pick_image_url(work["images"][0].get("image")) def get_page_counts(self, series_id: str) -> "dict[str, int]": """ Returns {volume_str: page_count} for all cached works. Used by MangaDexVolumeResolver for chapter-to-volume estimation. """ result: dict[str, int] = {} for work in self.get_works(series_id): vol = _norm_vol(work.get("volume")) pages = work.get("pages") if vol and pages is not None: try: result[vol] = int(pages) except (TypeError, ValueError): pass return result def clear_cache(self) -> None: """Clears both the works cache and the images cover cache.""" self._cache.clear() self._images_cache.clear() # -------------------------------------------------------------------------- # Module helper # -------------------------------------------------------------------------- def _norm_vol(value) -> str: """Normalises a volume identifier: strips whitespace, removes trailing .0.""" text = str(value or "").strip() try: f = float(text) if f.is_integer(): return str(int(f)) except ValueError: pass return text