This commit is contained in:
2026-05-27 18:58:42 +02:00
parent cd0757ab6d
commit 01c562dc46
20 changed files with 4579 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
KAVITA_URL=http://192.168.1.100:5000
KAVITA_API_KEY=your-api-key-here
LIBRARY_IDS=3,5
LANGUAGE=en
MATCH_PATH=matches.json
WEB_PORT=8080
+7
View File
@@ -267,3 +267,10 @@ pyvenv.cfg
.venv
pip-selfcheck.json
manga-mover-and-metadata-collector/
# Project-local state
matches.json
config/
output/
+18
View File
@@ -0,0 +1,18 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ /app/src/
COPY main.py /app/main.py
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1
VOLUME ["/config"]
EXPOSE 8080
CMD ["python", "/app/main.py"]
+54
View File
@@ -1,2 +1,56 @@
# kavita-lightnovel-metadata-fetcher
Pulls metadata (summary, tags, genres, characters, staff, score,
cover, links, related series) for light novels from **MangaBaka**,
enriched with **MyAnimeList** and **AniList** data, and writes it
back to a **Kavita** server through its REST API.
No file mover, no ComicInfo.xml — the source of truth is Kavita
itself. Series are discovered via the Kavita library API.
## Features
- Match every series in one or more Kavita libraries against
MangaBaka and persist the match in `matches.json` (editable via
the web UI).
- Update metadata for a single series or all matched series at
once. Updates are diff-based:
- Locked fields in Kavita are never overwritten.
- List fields (tags, genres, characters, writers, …) are merged:
new items are added, removed items are dropped.
- Cover images are only re-uploaded when MangaBaka's cover URL
actually changed.
- Characters and authors are synced to Kavita Person records
(image, description, MAL/AniList id) via Kavita's `/api/Person`
endpoints.
- MangaBaka relationships (sequel / prequel / spin-off / …) are
mirrored as Kavita series relationships, and every related
series that exists in Kavita is added to a shared collection.
## Environment
| Variable | Default | Description |
| ------------------ | ------------------------- | -------------------------------------------------------- |
| `KAVITA_URL` | — | Base URL of the Kavita server, e.g. `http://kavita:5000` |
| `KAVITA_API_KEY` | — | API key from Kavita user settings |
| `LIBRARY_IDS` | _(empty)_ | Default libraries (CSV of ids). Empty = pick in WebUI. |
| `LANGUAGE` | `en` | Series language ISO code (used for `language` field) |
| `REQUEST_TIMEOUT` | `30` | HTTP timeout in seconds |
| `MATCH_PATH` | `/config/matches.json` | Where to persist the match cache |
| `WEB_HOST` | `0.0.0.0` | Bind host for the Flask UI |
| `WEB_PORT` | `8080` | Bind port for the Flask UI |
## Running locally
```bash
pip install -r requirements.txt
KAVITA_URL=http://localhost:5000 KAVITA_API_KEY=... python main.py
```
Then open <http://localhost:8080/>.
## Docker
```bash
docker compose -f docker-compose.prod.yml up -d
```
+16
View File
@@ -0,0 +1,16 @@
services:
kavita-lightnovel-metadata-fetcher:
image: gitea.johannesbot.de/johannesbot/kavita-lightnovel-metadata-fetcher:latest
container_name: kavita-lightnovel-metadata-fetcher
restart: unless-stopped
environment:
KAVITA_URL: "${KAVITA_URL}"
KAVITA_API_KEY: "${KAVITA_API_KEY}"
LIBRARY_IDS: "${LIBRARY_IDS}"
LANGUAGE: "${LANGUAGE:-en}"
MATCH_PATH: "${MATCH_PATH:-/config/matches.json}"
WEB_PORT: "${WEB_PORT:-8080}"
ports:
- "${WEB_PORT:-8080}:${WEB_PORT:-8080}"
volumes:
- "${HOST_CONFIG_PATH}:/config"
+122
View File
@@ -0,0 +1,122 @@
"""
main.py
=======
Container entry point for the Kavita light-novel metadata fetcher.
Reads configuration from environment variables, starts the orchestrator
and exposes the Flask WebApp on WEB_HOST:WEB_PORT. Everything happens
through HTTP — there is no folder watcher and no file mover (Kavita is
the source of truth for the library content; this service only writes
metadata back to it).
Environment variables
---------------------
Required:
KAVITA_URL base URL of the Kavita server, e.g. http://kavita:5000
KAVITA_API_KEY Kavita API key (Settings -> User -> API key)
Optional:
LIBRARY_IDS comma-separated default library ids (e.g. "3,5").
Empty = user picks in the WebUI each time.
LANGUAGE default "en"
REQUEST_TIMEOUT default 30
MATCH_PATH default /config/matches.json
WEB_PORT default 8080
WEB_HOST default 0.0.0.0
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# Make src/ importable when running as `python main.py`.
sys.path.insert(0, str(Path(__file__).resolve().parent / "src"))
from src.MatchesCache import MatchesCache # noqa: E402
from src.LightNovelOrchestrator import LightNovelOrchestrator # noqa: E402
from src.MatchesWebApp import MatchesWebApp # noqa: E402
def _env_str(name: str, default: "str | None" = None,
required: bool = False) -> "str | None":
value = os.environ.get(name, default)
if required and not value:
print(f"[main] missing required env var: {name}", flush=True)
sys.exit(2)
return value
def _env_int(name: str, default: int) -> int:
raw = os.environ.get(name)
if raw is None or raw == "":
return default
try:
return int(raw)
except ValueError:
print(f"[main] {name}={raw!r} is not a valid integer; "
f"falling back to {default}", flush=True)
return default
def _env_int_list(name: str) -> list[int]:
raw = os.environ.get(name) or ""
out: list[int] = []
for part in raw.split(","):
part = part.strip()
if not part:
continue
try:
out.append(int(part))
except ValueError:
print(f"[main] {name}: ignoring non-integer value {part!r}",
flush=True)
return out
def main() -> int:
kavita_url = _env_str("KAVITA_URL", required=True)
kavita_api_key = _env_str("KAVITA_API_KEY", required=True)
language = _env_str("LANGUAGE", "en") or "en"
request_timeout = _env_int("REQUEST_TIMEOUT", 30)
match_path = _env_str("MATCH_PATH", "/config/matches.json")
web_host = _env_str("WEB_HOST", "0.0.0.0") or "0.0.0.0"
web_port = _env_int("WEB_PORT", 8080)
library_ids = _env_int_list("LIBRARY_IDS")
print(f"[main] kavita url = {kavita_url}", flush=True)
print(f"[main] language = {language}", flush=True)
print(f"[main] match path = {match_path}", flush=True)
print(f"[main] libraries = {library_ids or '(picked in WebUI)'}",
flush=True)
print(f"[main] web = {web_host}:{web_port}", flush=True)
cache = MatchesCache(match_path)
orchestrator = LightNovelOrchestrator(
kavita_url=kavita_url,
kavita_api_key=kavita_api_key,
matches_cache=cache,
language=language,
request_timeout=request_timeout,
)
app = MatchesWebApp(
cache, orchestrator=orchestrator,
default_library_ids=library_ids,
host=web_host, port=web_port,
)
app.start()
app.wait()
return 0
if __name__ == "__main__":
sys.exit(main())
+3
View File
@@ -0,0 +1,3 @@
requests>=2.31
Flask>=3.0
python-dotenv>=1.0
+507
View File
@@ -0,0 +1,507 @@
"""
anilist_resolver.py
===================
Fetches and caches AniList manga metadata (statistics, characters, staff)
using the public AniList GraphQL API.
AniList API: https://graphql.anilist.co (no authentication required)
Rate limit: 90 req/min -> a 700 ms guard between calls is applied.
On HTTP 429 (rate-limit exceeded) the response Retry-After header is
honoured; the request is retried once automatically.
Singleton
---------
Only one instance of this class exists per process. Subsequent calls to
AniListResolver() return the same object with its warm caches intact.
Provided features
-----------------
- Title-based AniList ID lookup with best-match scoring
- Manga statistics: score (010), rank, popularity, members, favorites
- Character list for a manga (names only — for <Characters> XML tag)
- Detailed character list: name, AniList character ID, image URL, role
- Detailed staff list: name, AniList person ID, image URL, positions
- Lazy full-detail fetches per character / person (for descriptions)
Dependencies
------------
requests -> pip install requests
"""
from __future__ import annotations
import datetime
import difflib
import time
import requests
from MediaResolver import MediaResolver
# --------------------------------------------------------------------------
# GraphQL query strings
# --------------------------------------------------------------------------
_SEARCH_MANGA = """
query ($search: String) {
Page(page: 1, perPage: 5) {
media(search: $search, type: MANGA, format_in: [NOVEL]) {
id title { romaji english native } siteUrl
}
}
}
"""
_MANGA_STATS = """
query ($id: Int) {
Media(id: $id, type: MANGA) {
id title { romaji english native }
meanScore popularity favourites
rankings { rank type allTime }
siteUrl
}
}
"""
_MANGA_CHARACTERS = """
query ($id: Int) {
Media(id: $id, type: MANGA) {
characters(sort: [ROLE, RELEVANCE], perPage: 25) {
nodes { id name { full } image { large } siteUrl }
edges { role }
}
}
}
"""
_MANGA_STAFF = """
query ($id: Int) {
Media(id: $id, type: MANGA) {
staff(perPage: 25) {
nodes { id name { full } image { large } siteUrl }
edges { role }
}
}
}
"""
_CHARACTER_DETAILS = """
query ($id: Int) {
Character(id: $id) {
id name { full } image { large }
description(asHtml: false)
favourites siteUrl
}
}
"""
_PERSON_DETAILS = """
query ($id: Int) {
Staff(id: $id) {
id name { full native } image { large }
description(asHtml: false)
favourites siteUrl
dateOfBirth { year month day }
primaryOccupations
homeTown
}
}
"""
_ANILIST_GQL = "https://graphql.anilist.co"
class AniListResolver(MediaResolver):
"""
Singleton: fetches and caches AniList manga data via GraphQL API.
The first call to AniListResolver() creates and initialises the instance;
all subsequent calls return the same object.
"""
_instance: "AniListResolver | None" = None
# ------------------------------------------------------------------
# Singleton machinery
# ------------------------------------------------------------------
def __new__(cls, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, *, request_timeout: int = 30):
if self._initialized:
return
self.request_timeout = request_timeout
self._session = requests.Session()
self._session.headers.update({
"User-Agent": "AniListResolver/1.0",
"Content-Type": "application/json",
"Accept": "application/json",
})
# title_lower -> al_id
self._id_cache: dict[str, "int | None"] = {}
# al_id -> stats dict
self._stats_cache: dict[int, dict] = {}
# manga_al_id -> [name_str, ...]
self._char_names_cache: dict[int, list[str]] = {}
# manga_al_id -> [{al_id, name, image_url, role}]
self._char_detailed_cache: dict[int, list[dict]] = {}
# manga_al_id -> [{al_id, name, image_url, positions}]
self._staff_detailed_cache: dict[int, list[dict]] = {}
# char_al_id -> {al_id, name, image_url, about, favorites, url}
self._char_info_cache: dict[int, dict] = {}
# person_al_id -> {al_id, name, image_url, about, favorites, url, ...}
self._person_info_cache: dict[int, dict] = {}
self._last_request_at: float = 0.0
self._initialized = True
# ------------------------------------------------------------------
# Public: ID lookup
# ------------------------------------------------------------------
def find_id(self, title: str) -> "int | None":
"""
Searches AniList for a manga by title and returns the best-matching
AniList ID. Returns None on failure or when no result is found.
"""
if not title or not title.strip():
return None
key = title.strip().lower()
if key in self._id_cache:
return self._id_cache[key]
try:
data = self._gql(_SEARCH_MANGA, {"search": title})
results = ((data.get("data") or {})
.get("Page", {})
.get("media") or [])
except requests.RequestException:
return None
if not results:
self._id_cache[key] = None
return None
results.sort(key=lambda e: _score_title(title, e), reverse=True)
al_id = results[0].get("id")
self._id_cache[key] = al_id
return al_id
# ------------------------------------------------------------------
# Public: statistics
# ------------------------------------------------------------------
def get_stats(self, tracker_id: "int | None") -> "dict | None":
"""
Returns a statistics dict for the given AniList manga ID:
{score, rank, scored_by, popularity, members, favorites,
url, title, as_of (DD-MM-YYYY)}
Returns None if tracker_id is None or on network failure.
"""
if tracker_id is None:
return None
if tracker_id in self._stats_cache:
return self._stats_cache[tracker_id]
try:
data = self._gql(_MANGA_STATS, {"id": tracker_id})
entry = (data.get("data") or {}).get("Media") or {}
except requests.RequestException:
return None
title_obj = entry.get("title") or {}
title = (title_obj.get("romaji")
or title_obj.get("english")
or title_obj.get("native") or "")
# AniList meanScore is 0100; normalise to 0.010.0 for consistency
# with the MALResolver stats dict shape.
raw_score = entry.get("meanScore")
score = round(raw_score / 10, 1) if raw_score is not None else None
# Ranked and popularity ranks are in the rankings array.
rated_rank = None
popular_rank = None
for r in (entry.get("rankings") or []):
if r.get("allTime"):
if r.get("type") == "RATED" and rated_rank is None:
rated_rank = r.get("rank")
if r.get("type") == "POPULAR" and popular_rank is None:
popular_rank = r.get("rank")
stats: dict = {
"score": score,
"rank": rated_rank,
"scored_by": None, # not exposed by AniList API
"popularity": popular_rank,
"members": entry.get("popularity"), # AniList's popularity = member count
"favorites": entry.get("favourites"),
"url": entry.get("siteUrl") or f"https://anilist.co/manga/{tracker_id}",
"title": title,
"as_of": datetime.date.today().strftime("%d-%m-%Y"),
}
self._stats_cache[tracker_id] = stats
return stats
# ------------------------------------------------------------------
# Public: character names (for ComicInfo <Characters> tag)
# ------------------------------------------------------------------
def get_characters(self, tracker_id: "int | None") -> list[str]:
"""Returns a flat list of character names for the manga."""
if tracker_id is None:
return []
if tracker_id in self._char_names_cache:
return self._char_names_cache[tracker_id]
detailed = self.get_characters_detailed(tracker_id)
names = [e["name"] for e in detailed if e.get("name")]
if names:
self._char_names_cache[tracker_id] = names
return names
# ------------------------------------------------------------------
# Public: detailed character data
# ------------------------------------------------------------------
def get_characters_detailed(self, tracker_id: "int | None") -> list[dict]:
"""
Returns detailed character entries for a manga:
[{al_id, mal_id, name, image_url, role, about=None}, ...]
"""
if tracker_id is None:
return []
if tracker_id in self._char_detailed_cache:
return self._char_detailed_cache[tracker_id]
try:
data = self._gql(_MANGA_CHARACTERS, {"id": tracker_id})
chars = ((data.get("data") or {})
.get("Media", {})
.get("characters") or {})
nodes = chars.get("nodes") or []
edges = chars.get("edges") or []
except requests.RequestException:
return []
results = []
for node, edge in zip(nodes, edges):
name = (node.get("name") or {}).get("full") or ""
if not name:
continue
results.append({
"al_id": node.get("id"),
"mal_id": None,
"name": name,
"raw_name": name,
"image_url": (node.get("image") or {}).get("large"),
"role": edge.get("role") or "SUPPORTING",
"about": None,
})
if results:
self._char_detailed_cache[tracker_id] = results
return results
# ------------------------------------------------------------------
# Public: detailed staff data
# ------------------------------------------------------------------
def get_staff_detailed(self, tracker_id: "int | None") -> list[dict]:
"""
Returns detailed staff entries for a manga:
[{al_id, mal_id, name, image_url, positions, about=None}, ...]
"""
if tracker_id is None:
return []
if tracker_id in self._staff_detailed_cache:
return self._staff_detailed_cache[tracker_id]
try:
data = self._gql(_MANGA_STAFF, {"id": tracker_id})
staff = ((data.get("data") or {})
.get("Media", {})
.get("staff") or {})
nodes = staff.get("nodes") or []
edges = staff.get("edges") or []
except requests.RequestException:
return []
results = []
for node, edge in zip(nodes, edges):
name = (node.get("name") or {}).get("full") or ""
if not name:
continue
results.append({
"al_id": node.get("id"),
"mal_id": None,
"name": name,
"raw_name": name,
"image_url": (node.get("image") or {}).get("large"),
"positions": [edge.get("role")] if edge.get("role") else [],
"about": None,
})
if results:
self._staff_detailed_cache[tracker_id] = results
return results
# ------------------------------------------------------------------
# Public: individual character / person details
# ------------------------------------------------------------------
def get_character_details(self, char_id: "int | None") -> "dict | None":
"""Returns full details for a single AniList character."""
if char_id is None:
return None
if char_id in self._char_info_cache:
return self._char_info_cache[char_id]
try:
data = self._gql(_CHARACTER_DETAILS, {"id": char_id})
entry = (data.get("data") or {}).get("Character") or {}
except requests.RequestException:
return None
result = {
"al_id": entry.get("id"),
"mal_id": None,
"name": (entry.get("name") or {}).get("full") or "",
"image_url": (entry.get("image") or {}).get("large"),
"about": entry.get("description"),
"favorites": entry.get("favourites"),
"url": entry.get("siteUrl") or f"https://anilist.co/character/{char_id}",
}
self._char_info_cache[char_id] = result
return result
def get_person_details(self, person_id: "int | None") -> "dict | None":
"""Returns full details for a single AniList staff person."""
if person_id is None:
return None
if person_id in self._person_info_cache:
return self._person_info_cache[person_id]
try:
data = self._gql(_PERSON_DETAILS, {"id": person_id})
entry = (data.get("data") or {}).get("Staff") or {}
except requests.RequestException:
return None
# dateOfBirth: {year, month, day} → ISO string for _format_birthday
dob = entry.get("dateOfBirth") or {}
birthday: "str | None" = None
if dob.get("year"):
m = dob.get("month") or 1
d = dob.get("day") or 1
birthday = f"{dob['year']}-{m:02d}-{d:02d}"
name_obj = entry.get("name") or {}
result = {
"al_id": entry.get("id"),
"mal_id": None,
"name": name_obj.get("full") or "",
"given_name": None, # AniList does not break names into given/family
"family_name": None,
"birthday": birthday,
"image_url": (entry.get("image") or {}).get("large"),
"about": entry.get("description"),
"favorites": entry.get("favourites"),
"website_url": None, # not exposed by AniList public API
"url": entry.get("siteUrl") or f"https://anilist.co/staff/{person_id}",
}
self._person_info_cache[person_id] = result
return result
# ------------------------------------------------------------------
# Public: cache management
# ------------------------------------------------------------------
def clear_cache(self) -> None:
"""Clears all internal caches (the Singleton instance is retained)."""
self._id_cache.clear()
self._stats_cache.clear()
self._char_names_cache.clear()
self._char_detailed_cache.clear()
self._staff_detailed_cache.clear()
self._char_info_cache.clear()
self._person_info_cache.clear()
# ------------------------------------------------------------------
# Internal: rate-limited GraphQL POST
# ------------------------------------------------------------------
def _gql(self, query: str, variables: "dict | None" = None) -> dict:
"""
Rate-limited GraphQL POST request (respects AniList's 90 req/min limit).
On HTTP 429 the Retry-After header is honoured and the request is
retried once.
"""
elapsed = time.monotonic() - self._last_request_at
if elapsed < 0.7:
time.sleep(0.7 - elapsed)
payload: dict = {"query": query}
if variables:
payload["variables"] = variables
resp = self._session.post(
_ANILIST_GQL, json=payload, timeout=self.request_timeout)
self._last_request_at = time.monotonic()
if resp.status_code == 429:
retry_after = int(resp.headers.get("Retry-After", 60))
time.sleep(retry_after)
resp = self._session.post(
_ANILIST_GQL, json=payload, timeout=self.request_timeout)
self._last_request_at = time.monotonic()
resp.raise_for_status()
return resp.json()
# --------------------------------------------------------------------------
# Module helpers
# --------------------------------------------------------------------------
def _score_title(query: str, entry: dict) -> float:
"""Returns the best title-similarity score for an AniList media entry."""
title_obj = entry.get("title") or {}
candidates = [
title_obj.get("romaji") or "",
title_obj.get("english") or "",
title_obj.get("native") or "",
]
best = 0.0
q = query.lower()
for t in candidates:
if t:
ratio = difflib.SequenceMatcher(None, q, t.lower()).ratio()
best = max(best, ratio)
return best
# --------------------------------------------------------------------------
# Usage example
# --------------------------------------------------------------------------
if __name__ == "__main__":
r1 = AniListResolver()
r2 = AniListResolver()
assert r1 is r2, "AniListResolver must be a Singleton"
al_id = r1.find_id("Yofukashi no Uta")
print("AniList ID :", al_id)
stats = r1.get_stats(al_id)
if stats:
print("Score :", stats["score"])
print("Rank :", stats["rank"])
print("Members :", stats["members"])
chars = r1.get_characters_detailed(al_id)
print("Characters (first 3):", [c["name"] for c in chars[:3]])
staff = r1.get_staff_detailed(al_id)
print("Staff :", [s["name"] for s in staff])
+229
View File
@@ -0,0 +1,229 @@
"""
kavita_client.py
================
Thin HTTP client for the Kavita server REST API (v0.9.x).
Authenticates via the ``x-api-key`` header. All series / library /
collection / metadata reads and writes used by the light-novel updater
go through this single client so request shaping (paging, content types,
timeouts, retries) is consistent.
The class is intentionally state-light: no caching layer, just one
``requests.Session``. Higher-level diff / update logic lives in
KavitaSeriesUpdater, KavitaPersonUpdater and RelationshipSync.
"""
from __future__ import annotations
import base64
from typing import Iterable
import requests
class KavitaClient:
def __init__(self, base_url: str, api_key: str, *,
request_timeout: int = 30):
self._base = base_url.rstrip("/")
self._timeout = request_timeout
# API session: sends + receives JSON.
self._session = requests.Session()
self._session.headers.update({
"x-api-key": api_key,
"Accept": "application/json",
"Content-Type": "application/json",
})
# Plain session for downloading external images (covers). Must NOT
# carry the API headers — some CDNs refuse to return image bytes
# when the client sends Accept: application/json.
self._image_session = requests.Session()
self._image_session.headers.update({
"User-Agent": "KavitaLightNovelUpdater/1.0",
})
# ------------------------------------------------------------------
# Libraries
# ------------------------------------------------------------------
def list_libraries(self) -> list[dict]:
"""Returns all libraries the authenticated user can access."""
r = self._session.get(f"{self._base}/api/Library/libraries",
timeout=self._timeout)
r.raise_for_status()
return r.json() or []
# ------------------------------------------------------------------
# Series
# ------------------------------------------------------------------
def list_series_in_library(self, library_id: int, *,
page_size: int = 200) -> list[dict]:
"""
Returns all SeriesDto entries in the given library.
Uses POST /api/Series/all-v2 with a FilterV2 that scopes by
library id. Pages through until an empty page is returned.
"""
results: list[dict] = []
page = 1
while True:
body = {
"statements": [
{
"comparison": 0, # Equal
"field": 19, # Libraries field id (Kavita v0.9.x)
"value": str(library_id),
}
],
"combination": 1, # And
"sortOptions": {"isAscending": True, "sortField": 1},
"limitTo": 0,
}
r = self._session.post(
f"{self._base}/api/Series/all-v2",
params={"PageNumber": page, "PageSize": page_size},
json=body, timeout=self._timeout)
r.raise_for_status()
chunk = r.json() or []
if not chunk:
break
results.extend(chunk)
if len(chunk) < page_size:
break
page += 1
return results
def get_series(self, series_id: int) -> dict:
"""Returns the SeriesDto for the given series id."""
r = self._session.get(f"{self._base}/api/Series/{series_id}",
timeout=self._timeout)
r.raise_for_status()
return r.json() or {}
def update_series(self, series: dict) -> None:
"""Updates the Series-level data (name, sortName, malId, …)."""
r = self._session.post(f"{self._base}/api/Series/update",
json=series, timeout=self._timeout)
r.raise_for_status()
# ------------------------------------------------------------------
# Series metadata
# ------------------------------------------------------------------
def get_series_metadata(self, series_id: int) -> dict:
"""Returns the SeriesMetadataDto for a series."""
r = self._session.get(
f"{self._base}/api/Series/metadata",
params={"seriesId": series_id}, timeout=self._timeout)
r.raise_for_status()
return r.json() or {}
def update_series_metadata(self, metadata: dict) -> None:
"""
Writes a SeriesMetadataDto back to Kavita.
Kavita expects the payload wrapped: {seriesMetadata: {...}}.
"""
r = self._session.post(
f"{self._base}/api/Series/metadata",
json={"seriesMetadata": metadata},
timeout=self._timeout)
r.raise_for_status()
# ------------------------------------------------------------------
# Related series
# ------------------------------------------------------------------
def get_related(self, series_id: int) -> dict:
"""Returns all related series grouped by relation type."""
r = self._session.get(
f"{self._base}/api/Series/all-related",
params={"seriesId": series_id}, timeout=self._timeout)
r.raise_for_status()
return r.json() or {}
def update_related(self, payload: dict) -> None:
"""
Sets the related-series relationships for a series.
Payload shape (UpdateRelatedSeriesDto):
{seriesId, prequels, sequels, sideStories, spinOffs,
adaptations, characters, contains, others,
alternativeSettings, alternativeVersions, doujinshis,
editions, annuals}
Each *_ids list contains target series ids (ints).
"""
r = self._session.post(
f"{self._base}/api/Series/update-related",
json=payload, timeout=self._timeout)
r.raise_for_status()
# ------------------------------------------------------------------
# Collections
# ------------------------------------------------------------------
def list_collections(self) -> list[dict]:
"""Returns all collection tags visible to the authenticated user."""
r = self._session.get(
f"{self._base}/api/Collection",
params={"ownedOnly": "false", "sortByLastModified": "false"},
timeout=self._timeout)
r.raise_for_status()
return r.json() or []
def add_series_to_collection(self, *, collection_id: int,
title: str,
series_ids: Iterable[int]) -> dict:
"""
Adds (or creates) a collection and attaches series to it.
Pass collection_id=0 to create a new collection named `title`.
For an existing collection set collection_id to its id (title is
still required by the API but acts as no-op when the id matches).
"""
body = {
"collectionTagId": int(collection_id),
"collectionTagTitle": title,
"seriesIds": [int(s) for s in series_ids],
}
r = self._session.post(
f"{self._base}/api/Collection/update-for-series",
json=body, timeout=self._timeout)
r.raise_for_status()
try:
return r.json() or {}
except ValueError:
return {}
# ------------------------------------------------------------------
# Series cover upload
# ------------------------------------------------------------------
def upload_series_cover(self, series_id: int, image_url: str, *,
lock: bool = False) -> None:
"""
Downloads an external image and uploads it as the series cover.
Mirrors the cover-upload trick used in KavitaPersonUpdater:
Kavita's `/api/Upload/series` accepts a raw base64 blob (no
``data:`` prefix) in the ``url`` field.
"""
img = self._image_session.get(image_url, timeout=self._timeout)
img.raise_for_status()
b64 = base64.b64encode(img.content).decode()
r = self._session.post(
f"{self._base}/api/Upload/series",
json={"id": series_id, "url": b64, "lockCover": lock},
timeout=self._timeout)
r.raise_for_status()
# ------------------------------------------------------------------
# Generic GET helper (used by callers that need a response object)
# ------------------------------------------------------------------
def get(self, path: str, params: "dict | None" = None) -> requests.Response:
return self._session.get(f"{self._base}{path}",
params=params, timeout=self._timeout)
def post(self, path: str, *,
json: "dict | list | None" = None,
params: "dict | None" = None) -> requests.Response:
return self._session.post(f"{self._base}{path}",
json=json, params=params,
timeout=self._timeout)
+545
View File
@@ -0,0 +1,545 @@
"""
kavita_person_updater.py
========================
Synchronises Kavita person / character records with MyAnimeList data.
For every character and staff member that MAL knows about for a given manga
the updater:
1. Searches Kavita for a matching Person record (by name similarity /
alias match, configurable threshold).
2. Sets the MAL ID on the Kavita person if it is not yet linked.
3. Uploads the MAL profile image when the cover is not locked and has
not been set in a previous sync run.
4. Populates the description field when Kavita has none and MAL provides
an 'about' text (requires an extra Jikan request per character; only
performed when update_descriptions=True).
Kavita API version
------------------
Tested against Kavita 0.9.0.2.
Authentication
--------------
Uses the `x-api-key` header (API key from Kavita user settings).
No JWT login is required.
Relevant endpoints (Kavita 0.9.0.2)
-------------------------------------
GET /api/Person/search find persons by name / alias
POST /api/Person/update write metadata (malId, description, …)
POST /api/Upload/person set cover image (base64 data URI)
POST /api/Upload/upload-by-url download an external URL to temp storage
(used as an alternative upload path)
Cover upload flow
-----------------
The image is downloaded locally, base64-encoded, and sent as a data URI
to POST /api/Upload/person. This is more reliable than the
upload-by-url → upload/person two-step because it avoids Kavita's temp
file handling (which had known issues in 0.8.x 0.9.x, GitHub #3900).
Dependencies
------------
requests -> pip install requests
"""
from __future__ import annotations
import base64
import datetime
import difflib
import re
import requests
from MALResolver import MALResolver
from AniListResolver import AniListResolver
class KavitaPersonUpdater:
"""
Syncs Kavita Person records with MyAnimeList data.
Parameters
----------
kavita_base_url : Base URL of the Kavita server, e.g. "http://192.168.2.2:5000"
api_key : Kavita API key (Settings → User → API key)
mal_resolver : Shared MALResolver singleton (created automatically if omitted)
request_timeout : HTTP timeout in seconds for both Kavita and image requests
min_name_score : Minimum difflib similarity ratio (01) required to accept a
Kavita person as a match for a MAL name. Default 0.80.
"""
def __init__(self, kavita_base_url: str, api_key: str, *,
mal_resolver: "MALResolver | None" = None,
al_resolver: "AniListResolver | None" = None,
request_timeout: int = 30,
min_name_score: float = 0.80):
self._base = kavita_base_url.rstrip("/")
self._timeout = request_timeout
self._min_score = min_name_score
self._mal = mal_resolver or MALResolver()
self._al = al_resolver or AniListResolver()
# Session used for Kavita API calls.
self._session = requests.Session()
self._session.headers.update({
"x-api-key": api_key,
"Content-Type": "application/json",
"Accept": "application/json",
})
# Plain session used to download external images (MAL CDN etc.).
# Must NOT carry the Kavita API headers — Accept: application/json
# would prevent MAL CDN from returning the image bytes.
self._image_session = requests.Session()
self._image_session.headers.update({
"User-Agent": "KavitaPersonUpdater/1.0",
})
# Cache: normalised name -> list of PersonDto dicts (best matches first)
self._person_search_cache: dict[str, list[dict]] = {}
# ------------------------------------------------------------------
# Public: combined update
# ------------------------------------------------------------------
def update_for_manga(self, mal_manga_id: "int | None", *,
al_manga_id: "int | None" = None,
update_covers: bool = True,
update_descriptions: bool = True) -> dict:
"""
Runs a full update pass for both characters and staff of the manga.
MAL is tried first; AniList is used as fallback when MAL returns nothing.
Returns
-------
{
"characters": {"updated": n, "skipped": n, "not_found": n},
"staff": {"updated": n, "skipped": n, "not_found": n},
}
"""
return {
"characters": self.update_characters(
mal_manga_id, al_manga_id=al_manga_id,
update_covers=update_covers,
update_descriptions=update_descriptions),
"staff": self.update_staff(
mal_manga_id, al_manga_id=al_manga_id,
update_covers=update_covers,
update_descriptions=update_descriptions),
}
# ------------------------------------------------------------------
# Public: character update
# ------------------------------------------------------------------
def update_characters(self, mal_manga_id: "int | None", *,
al_manga_id: "int | None" = None,
update_covers: bool = True,
update_descriptions: bool = True) -> dict:
"""
Updates Kavita persons that match MAL/AniList characters for the manga.
MAL is tried first; AniList is the fallback when MAL returns nothing.
Returns {"updated": n, "skipped": n, "not_found": n}.
"""
entries = self._mal.get_characters_detailed(mal_manga_id) if mal_manga_id else []
resolver = self._mal
if not entries and al_manga_id:
entries = self._al.get_characters_detailed(al_manga_id)
resolver = self._al
return self._sync_entries(entries, "character", resolver,
update_covers=update_covers,
update_descriptions=update_descriptions)
# ------------------------------------------------------------------
# Public: staff update
# ------------------------------------------------------------------
def update_staff(self, mal_manga_id: "int | None", *,
al_manga_id: "int | None" = None,
update_covers: bool = True,
update_descriptions: bool = True) -> dict:
"""
Updates Kavita persons that match MAL/AniList staff for the manga.
MAL is tried first; AniList is the fallback when MAL returns nothing.
Returns {"updated": n, "skipped": n, "not_found": n}.
"""
entries = self._mal.get_staff_detailed(mal_manga_id) if mal_manga_id else []
resolver = self._mal
if not entries and al_manga_id:
entries = self._al.get_staff_detailed(al_manga_id)
resolver = self._al
return self._sync_entries(entries, "staff", resolver,
update_covers=update_covers,
update_descriptions=update_descriptions)
# ------------------------------------------------------------------
# Public: cache management
# ------------------------------------------------------------------
def clear_cache(self) -> None:
"""Clears the Kavita person search cache."""
self._person_search_cache.clear()
# ------------------------------------------------------------------
# Internal: main sync loop
# ------------------------------------------------------------------
def _sync_entries(self, entries: list[dict], kind: str, resolver, *,
update_covers: bool,
update_descriptions: bool) -> dict:
result: dict = {"updated": 0, "skipped": 0, "not_found": 0,
"errors": []}
for entry in entries:
name = (entry.get("name") or "").strip()
raw_name = (entry.get("raw_name") or "").strip()
if not name and not raw_name:
continue
# Search by the cleaned (XML-safe) name first; if Kavita stores
# the legacy comma form, retry with the raw MAL name.
matches = self._find_kavita_person(name) if name else []
if not matches and raw_name and raw_name != name:
matches = self._find_kavita_person(raw_name)
if not matches:
result["not_found"] += 1
continue
changed = self._apply_mal_data(
matches[0], entry, kind, resolver,
update_cover=update_covers,
update_desc=update_descriptions,
errors=result["errors"])
result["updated" if changed else "skipped"] += 1
return result
# ------------------------------------------------------------------
# Internal: Kavita person search
# ------------------------------------------------------------------
def _find_kavita_person(self, name: str) -> list[dict]:
"""
Searches Kavita for persons matching `name`.
Checks both the main name and any stored aliases.
Returns persons sorted by similarity, filtered by min_name_score.
Results are cached per (normalised) query name.
"""
key = name.lower().strip()
if key in self._person_search_cache:
return self._person_search_cache[key]
try:
resp = self._session.get(
f"{self._base}/api/Person/search",
params={"queryString": name},
timeout=self._timeout,
)
resp.raise_for_status()
persons: list[dict] = resp.json() or []
except requests.RequestException:
self._person_search_cache[key] = []
return []
def score(p: dict) -> float:
candidates = [p.get("name") or ""]
candidates += [a for a in (p.get("aliases") or []) if a]
best = 0.0
q = key
for c in candidates:
r = difflib.SequenceMatcher(None, q, c.lower()).ratio()
best = max(best, r)
return best
ranked = sorted(persons, key=score, reverse=True)
filtered = [p for p in ranked if score(p) >= self._min_score]
self._person_search_cache[key] = filtered
return filtered
# ------------------------------------------------------------------
# Internal: apply MAL data to a single Kavita person
# ------------------------------------------------------------------
def _apply_mal_data(self, person: dict, mal_entry: dict, kind: str,
resolver, *,
update_cover: bool, update_desc: bool,
errors: "list | None" = None) -> bool:
"""
Applies tracker data (MAL or AniList) to one Kavita person record.
Fields updated
--------------
- malId : set when the entry carries a MAL ID and it differs
- aniListId : set when the entry carries an AniList ID and it differs
- description: set when empty and the tracker provides a description
- cover image: uploaded when not locked and no prior sync cover exists
Returns True if any change was made. Failures are appended to the
`errors` list (if provided) instead of being silently swallowed.
"""
person_id: "int | None" = person.get("id")
if not person_id:
return False
person_name = person.get("name") or ""
# Tracker IDs — a MAL entry has mal_id set; an AniList entry has al_id.
mal_id: "int | None" = mal_entry.get("mal_id")
al_id: "int | None" = mal_entry.get("al_id")
entity_id = mal_id or al_id # used for resolver detail calls
current_mal_id: int = person.get("malId") or 0
current_al_id: int = person.get("aniListId") or 0
needs_mal_id = bool(mal_id and current_mal_id != mal_id)
needs_al_id = bool(al_id and current_al_id != al_id)
# ------ Lazy description fetch -----------------------------------
description: "str | None" = None
if update_desc and not (person.get("description") or "").strip():
if entity_id:
if kind == "character":
details = resolver.get_character_details(entity_id)
if details:
description = _build_character_description(details) or None
else:
details = resolver.get_person_details(entity_id)
if details:
description = _build_person_description(details) or None
needs_desc = bool(description)
# ------ Metadata update ------------------------------------------
changed = False
if needs_mal_id or needs_al_id or needs_desc:
payload: dict = {
"id": person_id,
"name": person_name,
# MUST stay a boolean — the cover image itself is uploaded
# separately via POST /api/Upload/person (below). Putting a
# URL here makes Kavita reject the whole payload with HTTP 400.
"coverImageLocked": bool(person.get("coverImageLocked", False)),
"aliases": person.get("aliases") or [],
"description": description or person.get("description"),
"malId": mal_id if needs_mal_id else (current_mal_id or None),
"aniListId": al_id if needs_al_id else (current_al_id or None),
}
try:
resp = self._session.post(
f"{self._base}/api/Person/update",
json=payload,
timeout=self._timeout,
)
resp.raise_for_status()
changed = True
except requests.RequestException as e:
if errors is not None:
errors.append(
f"Person/update failed for #{person_id} "
f"'{person_name}': {e}")
# ------ Cover image upload ----------------------------------------
# Upload whenever:
# - caller requested cover updates
# - cover is NOT locked (user did not manually pin it)
# - we have not already uploaded this exact tracker entity's image
# (i.e. the tracked ID differs OR there is no cover yet).
if update_cover and not person.get("coverImageLocked"):
image_url = mal_entry.get("image_url")
already_uploaded = (
entity_id is not None
and (current_mal_id == mal_id or current_al_id == al_id)
and bool(person.get("coverImage"))
)
if image_url and not already_uploaded:
if self._upload_cover(person_id, image_url,
person_name=person_name,
errors=errors):
changed = True
return changed
# ------------------------------------------------------------------
# Internal: cover upload
# ------------------------------------------------------------------
def _upload_cover(self, person_id: int, image_url: str,
lock: bool = False, *,
person_name: str = "",
errors: "list | None" = None) -> bool:
"""
Uploads a cover image to a Kavita person.
The image is downloaded with the plain (header-less) image session
and posted to `POST /api/Upload/person` as a raw base64 string in
the `url` field.
Notes on protocol quirks discovered against Kavita 0.9.0.2:
- The two-step `upload-by-url` -> `Upload/person` flow returns
"Unable to save cover image to Person" (HTTP 400).
- A `data:image/jpeg;base64,...` data URI is rejected with the
same error.
- Only the raw base64 blob (no prefix) is accepted.
"""
label = (f"#{person_id} '{person_name}'"
if person_name else f"#{person_id}")
# 1) Download the image with a clean session — the Kavita session's
# `Accept: application/json` header makes some CDNs refuse to
# return image bytes.
try:
img_resp = self._image_session.get(image_url,
timeout=self._timeout)
img_resp.raise_for_status()
except requests.RequestException as e:
if errors is not None:
errors.append(
f"image download failed for {label} ({image_url}): {e}")
return False
b64 = base64.b64encode(img_resp.content).decode()
# 2) POST the raw base64 blob.
try:
resp = self._session.post(
f"{self._base}/api/Upload/person",
json={"id": person_id, "url": b64, "lockCover": lock},
timeout=self._timeout,
)
if resp.status_code >= 400:
if errors is not None:
errors.append(
f"Upload/person HTTP {resp.status_code} for {label}: "
f"{_short_body(resp)}")
return False
return True
except requests.RequestException as e:
if errors is not None:
errors.append(
f"Upload/person failed for {label}: {e}")
return False
# --------------------------------------------------------------------------
# Module helpers: description builders
# --------------------------------------------------------------------------
def _plain_to_html(text: str) -> str:
"""Converts plain text with paragraph breaks to compact HTML (no raw \\n)."""
if not text:
return ""
parts: list[str] = []
for para in re.split(r"\n{2,}", text.strip()):
para = para.strip()
if para:
parts.append(f"<p>{para.replace(chr(10), '<br>')}</p>")
return "".join(parts)
def _format_birthday(birthday: str) -> str:
"""Converts an ISO 8601 birthday string to "D Month YYYY"."""
if not birthday:
return ""
try:
dt = datetime.date.fromisoformat(birthday.split("T")[0])
return f"{dt.day} {dt.strftime('%B %Y')}"
except (ValueError, AttributeError):
return ""
def _build_character_description(details: dict) -> str:
"""
Builds a Kavita-safe HTML description for a MAL character.
Top line: "Favorites: N" as a link to the character's MAL page.
Remainder: the character's `about` text converted to HTML paragraphs.
"""
parts: list[str] = []
url = details.get("url") or ""
favorites = details.get("favorites")
if url and favorites is not None:
parts.append(f'<p><a href="{url}" target="_blank">Favorites: {favorites:,}</a></p>')
about = (details.get("about") or "").strip()
if about:
parts.append(_plain_to_html(about))
return "<br>".join(parts)
def _build_person_description(details: dict) -> str:
"""
Builds a Kavita-safe HTML description for a MAL person (mangaka / staff).
Renders a summary table (given name, family name, birthday, website,
member favorites) followed by the `about` biography as HTML paragraphs.
"""
_TD = 'style="padding-right:1.5em"'
rows: list[str] = []
given = (details.get("given_name") or "").strip()
family = (details.get("family_name") or "").strip()
birthday = details.get("birthday") or ""
favorites = details.get("favorites")
website = (details.get("website_url") or "").strip()
url = (details.get("url") or "").strip()
if given:
rows.append(f"<tr><td {_TD}>Given name</td><td>{given}</td></tr>")
if family:
rows.append(f"<tr><td {_TD}>Family name</td><td>{family}</td></tr>")
bday_str = _format_birthday(birthday)
if bday_str:
rows.append(f"<tr><td {_TD}>Birthday</td><td>{bday_str}</td></tr>")
if website:
rows.append(
f'<tr><td {_TD}>Website</td>'
f'<td><a href="{website}">{website}</a></td></tr>'
)
if favorites is not None:
fav_cell = (f'<a href="{url}" target="_blank">{favorites:,}</a>' if url
else f"{favorites:,}")
rows.append(
f"<tr><td {_TD}>Member Favorites</td><td>{fav_cell}</td></tr>")
parts: list[str] = []
if rows:
parts.append(f'<table>{"".join(rows)}</table>')
about = (details.get("about") or "").strip()
if about:
parts.append(_plain_to_html(about))
return "<br>".join(parts)
# --------------------------------------------------------------------------
# Module helper
# --------------------------------------------------------------------------
def _short_body(resp: requests.Response, limit: int = 400) -> str:
"""Returns the response body trimmed to `limit` chars for error logging."""
try:
text = resp.text or ""
except Exception:
return "<unreadable response body>"
text = text.strip().replace("\n", " ").replace("\r", " ")
if len(text) > limit:
text = text[:limit] + ""
return text or "<empty body>"
# --------------------------------------------------------------------------
# Usage example
# --------------------------------------------------------------------------
if __name__ == "__main__":
KAVITA_URL = "http://192.168.2.2:5000"
KAVITA_KEY = "Sq4a3hcV171dn3gzCl0K4eN7hZNk4sOA"
updater = KavitaPersonUpdater(KAVITA_URL, KAVITA_KEY)
mal = MALResolver()
mal_id = mal.find_mal_id("よふかしのうた")
print("MAL ID:", mal_id)
if mal_id:
result = updater.update_for_manga(mal_id)
print("Characters:", {k: v for k, v in result["characters"].items()
if k != "errors"})
print("Staff :", {k: v for k, v in result["staff"].items()
if k != "errors"})
# Surface any non-fatal upload / API errors for debugging
for section in ("characters", "staff"):
for err in result[section].get("errors", []):
print(f"[{section}] {err}")
+313
View File
@@ -0,0 +1,313 @@
"""
kavita_series_updater.py
========================
Diff-based update of a single Kavita series record from a
LightNovelMetadataBuilder output dict.
Behaviour
---------
* Locked fields in Kavita (``*Locked`` flags) are never touched, no matter
what MangaBaka returns.
* Scalar fields (summary, releaseYear, ageRating, publicationStatus,
language, score, sortName, localizedName) are overwritten when the
newly-built value differs from the value currently stored in Kavita.
* List fields (genres, tags, characters, writers, coverArtists,
publishers, imprints) are diff-merged: a name appearing in the new
set but not in the current one is added (id=0 so Kavita creates the
record); a name that is in Kavita but no longer in the new set is
dropped. Comparison is case-insensitive on the ``name`` field.
* Web links are stored as a comma-separated string in Kavita; this
updater treats them as a set and re-joins on write.
* Series-level cover image (URL different from last time) is re-uploaded
whenever ``coverImageLocked`` is False. The MangaBaka cover URL is
stamped onto matches.json as ``imageUrl`` so a subsequent run can skip
the upload when nothing changed.
Returns a small diff report ({field: 'changed'/'skipped'/'locked'}) per
series so the WebApp can surface what happened.
"""
from __future__ import annotations
from typing import Iterable
from KavitaClient import KavitaClient
# Maps Kavita "list" fields on SeriesMetadataDto to (lock_flag, item_key).
# `item_key` is the dict key Kavita uses for the display name on each item:
# GenreTagDto / TagDto use "title", PersonDto uses "name".
_LIST_FIELDS: list[tuple[str, str, str]] = [
("genres", "genresLocked", "title"),
("tags", "tagsLocked", "title"),
("characters", "characterLocked", "name"),
("writers", "writerLocked", "name"),
("coverArtists", "coverArtistLocked", "name"),
("publishers", "publisherLocked", "name"),
("imprints", "imprintLocked", "name"),
]
def _norm(name: str) -> str:
return (name or "").strip().lower()
def _merge_list(
current: list[dict],
new_names: Iterable[str],
item_key: str,
) -> "tuple[list[dict], bool]":
"""
Diff-merges a Kavita list field with the canonical name list from
MangaBaka. Returns (merged_list, changed_flag).
`item_key` is the dict key Kavita uses for the display name on each
item ("title" for GenreTagDto/TagDto, "name" for PersonDto).
* Items in `current` whose display value appears in `new_names` are
kept verbatim so existing ids and ancillary fields survive.
* New names (no matching entry in `current`) are appended with
``{"id": 0, <item_key>: <name>}`` — Kavita creates the record on save.
* Items in `current` whose display value is *not* in `new_names` are
dropped.
"""
new_set = [n for n in new_names if n and n.strip()]
new_index = {_norm(n): n.strip() for n in new_set}
merged: list[dict] = []
kept_keys: set[str] = set()
for item in (current or []):
key = _norm(item.get(item_key))
if key in new_index:
merged.append(item)
kept_keys.add(key)
added = False
for key, display in new_index.items():
if key not in kept_keys:
merged.append({"id": 0, item_key: display})
added = True
removed = len(current or []) != len(kept_keys)
return merged, added or removed
def _parse_web_links(value) -> list[str]:
if not value:
return []
if isinstance(value, list):
return [str(v).strip() for v in value if v]
return [p.strip() for p in str(value).split(",") if p.strip()]
def _merge_web_links(current_str, new_links: list[str]) -> "tuple[str, bool]":
current = _parse_web_links(current_str)
new_norm = [l for l in new_links if l]
if not new_norm:
return ",".join(current), False
# Mirror MangaBaka's set: keep order from new_norm, then anything from
# current that's still in new_norm (already covered above). Anything
# in current that's not in new_norm is dropped.
new_set = set(new_norm)
merged = list(new_norm)
changed = sorted(new_set) != sorted(set(current))
return ",".join(merged), changed
class KavitaSeriesUpdater:
def __init__(self, client: KavitaClient):
self._client = client
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def update_series(self, series_id: int, built: dict, *,
previous_cover_url: "str | None" = None) -> dict:
"""
Applies the diff between Kavita's current state for `series_id`
and the freshly-built MangaBaka dict. Returns a per-field diff
report.
"""
series = self._client.get_series(series_id)
metadata = self._client.get_series_metadata(series_id)
report: dict = {}
meta_changed = self._diff_metadata(metadata, built, report)
if meta_changed:
self._client.update_series_metadata(metadata)
series_changed = self._diff_series(series, built, report)
if series_changed:
self._client.update_series(series)
# Cover: only re-upload when not locked AND URL actually changed.
new_cover = built.get("coverUrl")
if (new_cover
and not series.get("coverImageLocked")
and new_cover != previous_cover_url):
try:
self._client.upload_series_cover(series_id, new_cover)
report["coverImage"] = "changed"
except Exception as exc:
report["coverImage"] = f"error: {exc}"
elif series.get("coverImageLocked"):
report["coverImage"] = "locked"
else:
report["coverImage"] = "skipped"
return report
# ------------------------------------------------------------------
# Internal: SeriesMetadataDto
# ------------------------------------------------------------------
def _diff_metadata(self, metadata: dict, built: dict,
report: dict) -> bool:
changed = False
# ----- Scalars ------------------------------------------------
# (built_key, metadata_key, locked_key, transform, skip_when_zero)
# `skip_when_zero` covers fields where 0 means "no data" rather
# than a real value (releaseYear, ageRating). publicationStatus 0
# is a valid "Ongoing" status — never skip it.
scalar_map = [
("summary", "summary", "summaryLocked", None, False),
("releaseYear", "releaseYear", "releaseYearLocked", int, True),
("ageRating", "ageRating", "ageRatingLocked", int, True),
("publicationStatus", "publicationStatus", "publicationStatusLocked", int, False),
("language", "language", "languageLocked", None, False),
]
for built_key, meta_key, locked_key, transform, skip_zero in scalar_map:
new_val = built.get(built_key)
if new_val is None or new_val == "":
report[meta_key] = "skipped"
continue
if transform is not None:
try:
new_val = transform(new_val)
except (TypeError, ValueError):
report[meta_key] = "skipped"
continue
if skip_zero and new_val == 0:
report[meta_key] = "skipped"
continue
if metadata.get(locked_key):
report[meta_key] = "locked"
continue
if metadata.get(meta_key) != new_val:
metadata[meta_key] = new_val
changed = True
report[meta_key] = "changed"
else:
report[meta_key] = "unchanged"
# ----- Web links (single comma-separated string) ---------------
# SeriesMetadataDto has no dedicated lock for webLinks — always update.
web_str, web_changed = _merge_web_links(
metadata.get("webLinks"), built.get("webLinks") or [])
if web_changed:
metadata["webLinks"] = web_str
changed = True
report["webLinks"] = "changed"
else:
report["webLinks"] = "unchanged"
# ----- List fields --------------------------------------------
list_map = {
"genres": built.get("genres"),
"tags": built.get("tags"),
"characters": built.get("characters"),
"writers": built.get("writers"),
"coverArtists": built.get("coverArtists"),
"publishers": built.get("publishers"),
"imprints": [built["imprint"]] if built.get("imprint") else [],
}
for meta_key, locked_key, item_key in _LIST_FIELDS:
new_names = list_map.get(meta_key) or []
if metadata.get(locked_key):
report[meta_key] = "locked"
continue
if not new_names and not (metadata.get(meta_key) or []):
report[meta_key] = "unchanged"
continue
merged, list_changed = _merge_list(
metadata.get(meta_key) or [], new_names, item_key)
if list_changed:
metadata[meta_key] = merged
changed = True
report[meta_key] = "changed"
else:
report[meta_key] = "unchanged"
return changed
# ------------------------------------------------------------------
# Internal: SeriesDto (sortName, userRating, tracker ids)
# ------------------------------------------------------------------
def _diff_series(self, series: dict, built: dict, report: dict) -> bool:
changed = False
# sortName / localizedName
if not series.get("sortNameLocked"):
new_sort = built.get("sortName") or ""
if new_sort and series.get("sortName") != new_sort:
series["sortName"] = new_sort
changed = True
report["sortName"] = "changed"
else:
report["sortName"] = "unchanged"
else:
report["sortName"] = "locked"
if not series.get("localizedNameLocked"):
new_loc = built.get("localizedName") or ""
if new_loc and series.get("localizedName") != new_loc:
series["localizedName"] = new_loc
changed = True
report["localizedName"] = "changed"
else:
report["localizedName"] = "unchanged"
else:
report["localizedName"] = "locked"
# Tracker ids — Kavita exposes malId, aniListId, mangaBakaId
for built_key, series_key in (
("malId", "malId"),
("anilistId", "aniListId"),
("mangabakaId", "mangaBakaId"),
):
new_val = built.get(built_key)
if new_val in (None, "", 0):
continue
try:
new_int = int(new_val)
except (TypeError, ValueError):
continue
if int(series.get(series_key) or 0) != new_int:
series[series_key] = new_int
changed = True
report[series_key] = "changed"
# userRating from MangaBaka (0..5)
new_score = built.get("score")
if new_score is not None:
try:
new_score = float(new_score)
except (TypeError, ValueError):
new_score = None
if new_score is not None:
current_score = series.get("userRating")
try:
current_score = float(current_score) if current_score is not None else None
except (TypeError, ValueError):
current_score = None
if current_score != new_score:
series["userRating"] = new_score
series["hasUserRated"] = True
changed = True
report["userRating"] = "changed"
else:
report["userRating"] = "unchanged"
return changed
+560
View File
@@ -0,0 +1,560 @@
"""
light_novel_metadata_builder.py
===============================
Fetches series-level metadata for a light novel from MangaBaka, enriches
it with MyAnimeList / AniList tracker statistics and character data, and
returns a structured dict ready to be diffed against Kavita's
SeriesMetadataDto.
Differences vs. the manga project's ComicInfoBuilder:
- No chapter / page handling — Kavita reads volumes from the files.
- No XML output — produces a plain dict.
- No MangaDex resolver — light novels don't have a chapter→volume
mapping problem.
- MangaBaka search type is fixed to ``novel`` so only light/web novels
are returned.
"""
from __future__ import annotations
import re
import requests
from MangaBakaRateLimit import apply_to_session as _apply_mangabaka_rate_limit
from MALResolver import MALResolver
from AniListResolver import AniListResolver
from MatchesCache import MatchesCache
# MangaBaka series type for the search endpoint.
_SEARCH_TYPES = ["novel"]
# MangaBaka content_rating -> Kavita AgeRating enum
# Kavita AgeRating values (from openapi.json):
# 0=Unknown, 3=Everyone, 8=Teen, 10=Mature17Plus, 13=AdultsOnly
_AGE_RATING_MAP = {
"safe": 3, # Everyone
"suggestive": 8, # Teen
"erotica": 10, # Mature17Plus
"pornographic": 13, # AdultsOnly
}
# MangaBaka status -> Kavita PublicationStatus enum
# Kavita PublicationStatus (from openapi.json):
# 0=OnGoing, 1=Hiatus, 2=Completed, 3=Cancelled, 4=Ended
_PUB_STATUS_MAP = {
"ongoing": 0,
"hiatus": 1,
"completed": 2,
"cancelled": 3,
"ended": 4,
}
# External-tracker URL templates used to enrich the web-links list.
_TRACKER_URL_TEMPLATES = {
"anilist": "https://anilist.co/manga/{id}",
"myanimelist": "https://myanimelist.net/manga/{id}",
"mal": "https://myanimelist.net/manga/{id}",
"mangaupdates": "https://www.mangaupdates.com/series.html?id={id}",
"kitsu": "https://kitsu.app/manga/{id}",
"animenewsnetwork": "https://www.animenewsnetwork.com/encyclopedia/manga.php?id={id}",
"ann": "https://www.animenewsnetwork.com/encyclopedia/manga.php?id={id}",
"animeplanet": "https://www.anime-planet.com/manga/{id}",
"shikimori": "https://shikimori.one/mangas/{id}",
"bookwalker": "https://bookwalker.jp/{id}",
}
_MD_ESCAPE_RE = re.compile(r'\\([\\`*_{}\[\]()\#+\-.!|~])')
# --------------------------------------------------------------------------
# Helpers
# --------------------------------------------------------------------------
def _normalise_key(key) -> str:
return re.sub(r"[^a-z0-9]", "", str(key).lower())
def _format_term(value: str) -> str:
return str(value).replace("_", " ").strip().title() if value else ""
def _md_to_html(text: str) -> str:
"""Converts the subset of Markdown produced by MangaBaka to compact HTML."""
if not text:
return ""
text = _MD_ESCAPE_RE.sub(r'\1', text)
text = re.sub(
r'\[([^\]]+)\]\(([^)]+)\)',
lambda m: f'<a href="{m.group(2)}">{m.group(1)}</a>',
text,
)
text = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', text, flags=re.DOTALL)
text = re.sub(r'\*(.+?)\*', r'<em>\1</em>', text, flags=re.DOTALL)
parts: list[str] = []
for para in re.split(r'\n{2,}', text.strip()):
para = para.strip()
if para:
parts.append(f"<p>{para.replace(chr(10), '<br>')}</p>")
return "".join(parts)
def pick_cover_url(cover) -> "str | None":
"""Selects the best cover URL from a MangaBaka cover object."""
if not cover:
return None
if isinstance(cover, str):
return cover
if not isinstance(cover, dict):
return None
raw = cover.get("raw")
if isinstance(raw, dict):
url = raw.get("url")
if isinstance(url, str) and url:
return url
elif isinstance(raw, str) and raw:
return raw
for size_key in ("x350", "x250", "x150"):
variant = cover.get(size_key)
if isinstance(variant, dict):
for density in ("x3", "x2", "x1"):
url = variant.get(density)
if isinstance(url, str) and url:
return url
elif isinstance(variant, str) and variant:
return variant
for val in cover.values():
if isinstance(val, str) and val.startswith("http"):
return val
if isinstance(val, dict):
for sub in val.values():
if isinstance(sub, str) and sub.startswith("http"):
return sub
return None
def pick_thumbnail_url(cover) -> "str | None":
"""Picks a small cover variant suitable for a UI thumbnail."""
if not cover:
return None
if isinstance(cover, str):
return cover
if not isinstance(cover, dict):
return None
for size_key in ("x150", "x250", "x350"):
variant = cover.get(size_key)
if isinstance(variant, dict):
for density in ("x2", "x1", "x3"):
url = variant.get(density)
if isinstance(url, str) and url:
return url
elif isinstance(variant, str) and variant:
return variant
return pick_cover_url(cover)
def _id_from_source(md: dict, *names: str) -> "int | None":
target = {_normalise_key(n) for n in names}
for raw_key, info in (md.get("source") or {}).items():
if _normalise_key(raw_key) in target and isinstance(info, dict):
mid = info.get("id")
if mid is not None:
try:
return int(mid)
except (TypeError, ValueError):
pass
return None
# --------------------------------------------------------------------------
# Builder
# --------------------------------------------------------------------------
class LightNovelMetadataBuilder:
"""
Resolves a light-novel series on MangaBaka and produces a structured
metadata dict ready to be merged into Kavita.
"""
def __init__(self, *,
api_base_url: str = "https://api.mangabaka.dev/v1",
language: str = "en",
request_timeout: int = 30,
session: "requests.Session | None" = None,
mal_resolver: "MALResolver | None" = None,
al_resolver: "AniListResolver | None" = None,
matches_cache: "MatchesCache | None" = None):
self.api_base_url = api_base_url.rstrip("/")
self.language = language
self.request_timeout = request_timeout
self._session = session or requests.Session()
self._session.headers.setdefault("User-Agent",
"LightNovelMetadataBuilder/1.0")
_apply_mangabaka_rate_limit(self._session)
self._mal = mal_resolver or MALResolver(request_timeout=request_timeout)
self._al = al_resolver or AniListResolver(request_timeout=request_timeout)
self._matches_cache = matches_cache
# ------------------------------------------------------------------
# MangaBaka search / fetch
# ------------------------------------------------------------------
def search_series(self, title: str) -> "dict | None":
"""Returns the top MangaBaka novel hit for `title`, or None."""
if not title or not title.strip():
return None
url = f"{self.api_base_url}/series/search"
try:
resp = self._session.get(
url, params={"q": title, "type": _SEARCH_TYPES,
"page": 1, "limit": 1},
timeout=self.request_timeout)
resp.raise_for_status()
except requests.RequestException:
return None
data = resp.json().get("data") or []
return data[0] if data else None
def fetch_series(self, series_id) -> "dict | None":
"""Returns the full MangaBaka series dict for the given id."""
if series_id is None or str(series_id).strip() == "":
return None
url = f"{self.api_base_url}/series/{series_id}"
resp = self._session.get(url, timeout=self.request_timeout)
resp.raise_for_status()
data = resp.json().get("data")
if data and data.get("state") == "merged" and data.get("merged_with"):
return self.fetch_series(data["merged_with"])
return data
# ------------------------------------------------------------------
# Resolve title -> MangaBaka series (caches the match)
# ------------------------------------------------------------------
def resolve(self, title: str) -> "dict | None":
"""
Returns the MangaBaka series for `title`.
Lookup order:
1. MatchesCache (uses stored mangabakaId, skips the search).
2. Fresh MangaBaka search — top hit. Result is persisted to the
cache so it survives a crash.
"""
if self._matches_cache is not None:
cached = self._matches_cache.get(title)
if cached and cached.get("mangabakaId"):
try:
series = self.fetch_series(cached["mangabakaId"])
if series:
return series
except Exception:
pass
series = self.search_series(title)
if series and self._matches_cache is not None:
self._matches_cache.upsert(
title,
mangabaka_id=series.get("id"),
mangabaka_name=series.get("title") or "",
image_url=pick_thumbnail_url(series.get("cover")),
)
return series
# ------------------------------------------------------------------
# Main entry point
# ------------------------------------------------------------------
def build(self, *, title: str = "",
mangabaka_id=None) -> "dict | None":
"""
Fetches and enriches metadata for one series, returning the
normalised dict described in the module docstring.
Pass either `title` (will resolve via cache/search) or
`mangabaka_id` (direct fetch).
"""
if mangabaka_id is not None and str(mangabaka_id).strip():
md = self.fetch_series(mangabaka_id)
else:
md = self.resolve(title)
if not md:
return None
return self._assemble(md)
# ------------------------------------------------------------------
# Internal: assemble the result dict
# ------------------------------------------------------------------
def _assemble(self, md: dict) -> dict:
mal_id = _id_from_source(md, "myanimelist", "mal")
al_id = _id_from_source(md, "anilist")
# Fall back to a title-based MAL lookup when the source map does
# not carry an id — Jikan is the only tracker that ships staff
# data we can use to enrich author / artist person records.
if mal_id is None:
mal_id = self._mal.find_mal_id(md.get("title") or "")
mal_stats = self._mal.get_stats(mal_id) if mal_id else None
characters_detailed = self._mal.get_characters_detailed(mal_id) if mal_id else []
if not characters_detailed and al_id:
characters_detailed = self._al.get_characters_detailed(al_id)
staff_detailed = self._mal.get_staff_detailed(mal_id) if mal_id else []
if not staff_detailed and al_id:
staff_detailed = self._al.get_staff_detailed(al_id)
# Character / writer name lists for SeriesMetadata
character_names = [c["name"] for c in characters_detailed
if c.get("name")]
# Writers come from MangaBaka first (authoritative for novels)
writers = list(md.get("authors") or [])
# Illustrators / artists -> CoverArtists (Kavita has no dedicated
# illustrator field, and Pencillers is the wrong semantic for
# text-only novels).
cover_artists = list(md.get("artists") or [])
# Publisher: prefer English licence, else original
publishers = self._publishers_by_type(md, "English") \
or self._publishers_by_type(md, "Original")
imprint = None
if self._publishers_by_type(md, "English") and \
self._publishers_by_type(md, "Original"):
imprint = self._publishers_by_type(md, "Original")[0] if \
self._publishers_by_type(md, "Original") else None
# Release year
release_year = None
try:
if md.get("year") is not None:
release_year = int(md["year"])
except (TypeError, ValueError):
pass
# Score: MangaBaka rating is 0..100 -> Kavita userRating is 0..5
score = None
if md.get("rating") is not None:
try:
score = round(float(md["rating"]) / 20.0, 1)
except (TypeError, ValueError):
pass
# Tags / genres come back as snake_case slugs.
genres = [_format_term(g) for g in (md.get("genres") or []) if g]
tags = [_format_term(t) for t in (md.get("tags") or []) if t]
# Web links
web_links = self._collect_web_links(md)
# Summary HTML
summary = self._build_summary(md, mal_stats)
# Cover URL
cover_url = pick_cover_url(md.get("cover"))
# Title variants
all_alt = self._collect_all_alt_titles(md)
return {
"mangabakaId": str(md.get("id") or ""),
"mangabakaTitle": md.get("title") or "",
"originalName": md.get("native_title") or "",
"localizedName": md.get("romanized_title") or "",
"sortName": self._sort_title(md),
"altTitles": all_alt,
"summary": summary,
"genres": genres,
"tags": tags,
"characters": character_names,
"writers": writers,
"coverArtists": cover_artists,
"publishers": publishers,
"imprint": imprint,
"releaseYear": release_year,
"ageRating": _AGE_RATING_MAP.get(md.get("content_rating"), 0),
"publicationStatus": _PUB_STATUS_MAP.get(
(md.get("status") or "").lower(), 0),
"language": self.language,
"webLinks": web_links,
"score": score,
"coverUrl": cover_url,
"malId": mal_id,
"anilistId": al_id,
"relationships": list(md.get("relationships_v2") or []),
"charactersDetailed": characters_detailed,
"staffDetailed": staff_detailed,
"raw": md,
}
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _publishers_by_type(md: dict, ptype: str) -> list[str]:
return [p.get("name") for p in (md.get("publishers") or [])
if p.get("type") == ptype and p.get("name")]
def _sort_title(self, md: dict) -> str:
lang = self.language.lower()
alts = self._collect_alt_titles(md)
return alts.get(lang) or md.get("title") or ""
def _collect_alt_titles(self, md: dict) -> "dict[str, str]":
"""Returns one best title per language code (en/de/jp/romaji)."""
titles = md.get("titles") or md.get("alt_titles") or []
def pick(language_codes: tuple, prefer_trait: "str | None" = None
) -> "str | None":
best_score = -1
best_title: "str | None" = None
for entry in titles:
if not isinstance(entry, dict):
continue
lang = (entry.get("language") or entry.get("lang") or "").lower()
if lang not in language_codes:
continue
title = entry.get("title")
if not title:
continue
traits = entry.get("traits") or []
score = 0
if prefer_trait and prefer_trait in traits:
score += 4
if "official" in traits:
score += 2
if entry.get("is_primary"):
score += 1
if score > best_score:
best_score, best_title = score, title
return best_title
result: dict[str, str] = {}
kanji = pick(("ja",), prefer_trait="native") or md.get("native_title")
if kanji:
result["jp"] = kanji
romaji = pick(("ja-latn", "ja-romaji"))
if not romaji:
rt = md.get("romanized_title") or ""
if rt and all(ord(c) < 128 for c in rt):
romaji = rt
if romaji:
result["romaji"] = romaji
en = pick(("en",)) or md.get("title")
if en:
result["en"] = en
de = pick(("de",))
if de:
result["de"] = de
return result
@staticmethod
def _collect_all_alt_titles(md: dict) -> "dict[str, list[str]]":
_GROUPS = {
"en": ("en",),
"de": ("de",),
"ja": ("ja",),
"ja-romaji": ("ja-latn", "ja-romaji"),
"ko": ("ko",),
"ko-romaji": ("ko-latn", "ko-romaji"),
"zh": ("zh", "zh-hk", "zh-tw", "zh-hans", "zh-hant"),
"zh-romaji": ("zh-latn",),
}
lang_to_group = {l: g for g, ls in _GROUPS.items() for l in ls}
result: dict[str, list[str]] = {}
seen: dict[str, set] = {}
for entry in (md.get("titles") or md.get("alt_titles") or []):
if not isinstance(entry, dict):
continue
lang = (entry.get("language") or entry.get("lang") or "").lower()
group = lang_to_group.get(lang)
if not group:
continue
title = (entry.get("title") or "").strip()
if not title:
continue
result.setdefault(group, [])
seen.setdefault(group, set())
if title not in seen[group]:
result[group].append(title)
seen[group].add(title)
return result
def _collect_web_links(self, md: dict) -> list[str]:
links: list[str] = [l for l in (md.get("links") or []) if l]
for raw_key, info in (md.get("source") or {}).items():
template = _TRACKER_URL_TEMPLATES.get(_normalise_key(raw_key))
if not template or not isinstance(info, dict):
continue
source_id = info.get("id")
if source_id is not None:
links.append(template.format(id=source_id))
seen: set[str] = set()
unique: list[str] = []
for link in links:
if link not in seen:
seen.add(link)
unique.append(link)
return unique
def _build_summary(self, md: dict,
mal_stats: "dict | None") -> str:
"""Builds the HTML summary with stats table + description + alt titles."""
_TD = 'style="padding-right:1.5em"'
parts: list[str] = []
if mal_stats:
url = mal_stats.get("url", "")
as_of = mal_stats.get("as_of", "")
rows: list[str] = []
for label, key, fmt in (
("Score", "score", "{}"),
("Ranked", "rank", "#{}"),
("Scored by", "scored_by", "{:,} users"),
("Popularity","popularity", "#{}"),
("Members", "members", "{:,}"),
("Favorites", "favorites", "{:,}"),
):
v = mal_stats.get(key)
if v is None:
continue
try:
formatted = fmt.format(v)
except (TypeError, ValueError):
formatted = str(v)
rows.append(f"<tr><td {_TD}>{label}</td><td>{formatted}</td></tr>")
if rows:
link = f'<a href="{url}" target="_blank">MyAnimeList</a>' if url else "MyAnimeList"
parts.append(f"<p>{link} stats as of {as_of}:</p>"
f"<table>{''.join(rows)}</table>")
desc_raw = (md.get("description") or "").strip()
if desc_raw:
parts.append(_md_to_html(desc_raw))
all_alt = self._collect_all_alt_titles(md)
if all_alt:
label_map = {
"en": "EN",
"de": "DE",
"ja": "JA",
"ja-romaji": "JA Romaji",
"ko": "KO",
"ko-romaji": "KO Romaji",
"zh": "ZH",
"zh-romaji": "ZH Romaji",
}
alt_rows: list[str] = []
for group in ("en", "de", "ja", "ja-romaji",
"ko", "ko-romaji", "zh", "zh-romaji"):
titles = all_alt.get(group)
if not titles:
continue
cell = "<br>".join(titles)
alt_rows.append(
f"<tr><td {_TD}>{label_map[group]}</td><td>{cell}</td></tr>")
if alt_rows:
parts.append(f"<table>{''.join(alt_rows)}</table>")
return "<br>".join(parts)
+257
View File
@@ -0,0 +1,257 @@
"""
light_novel_orchestrator.py
===========================
High-level workflow on top of the resolvers, the Kavita client and the
diff-based updaters. Exposes three operations to the WebApp:
- build_matches(library_ids):
Scan one or more Kavita libraries, resolve every series against
MangaBaka and persist the match in matches.json.
- update_series(kavita_series_id):
Re-fetch MangaBaka, MAL and AniList data for a single Kavita
series and apply the diff (metadata + persons + relationships).
- update_all(library_ids):
Run update_series for every series that has a match in the
cache and lives in the given libraries.
A single shared HTTP session (rate-limited for MangaBaka) and shared
resolver singletons are used across the whole run to maximise cache
hits.
"""
from __future__ import annotations
import requests
from MangaBakaRateLimit import apply_to_session as _apply_mangabaka_rate_limit
from MALResolver import MALResolver
from AniListResolver import AniListResolver
from MatchesCache import MatchesCache
from KavitaClient import KavitaClient
from KavitaPersonUpdater import KavitaPersonUpdater
from KavitaSeriesUpdater import KavitaSeriesUpdater
from LightNovelMetadataBuilder import (
LightNovelMetadataBuilder,
pick_thumbnail_url,
)
from RelationshipSync import RelationshipSync
class LightNovelOrchestrator:
def __init__(self, *,
kavita_url: str,
kavita_api_key: str,
matches_cache: MatchesCache,
language: str = "en",
request_timeout: int = 30,
api_base_url: str = "https://api.mangabaka.dev/v1"):
self._cache = matches_cache
self._timeout = request_timeout
session = requests.Session()
session.headers.setdefault("User-Agent",
"KavitaLightNovelOrchestrator/1.0")
_apply_mangabaka_rate_limit(session)
self._session = session
self._mal = MALResolver(request_timeout=request_timeout)
self._al = AniListResolver(request_timeout=request_timeout)
self._client = KavitaClient(kavita_url, kavita_api_key,
request_timeout=request_timeout)
self._builder = LightNovelMetadataBuilder(
api_base_url=api_base_url,
language=language,
request_timeout=request_timeout,
session=session,
mal_resolver=self._mal,
al_resolver=self._al,
matches_cache=matches_cache,
)
self._series_updater = KavitaSeriesUpdater(self._client)
self._person_updater = KavitaPersonUpdater(
kavita_url, kavita_api_key,
mal_resolver=self._mal,
al_resolver=self._al,
request_timeout=request_timeout,
)
self._relation_sync = RelationshipSync(
self._client, matches_cache, builder=self._builder)
# ------------------------------------------------------------------
# Library listings
# ------------------------------------------------------------------
def list_libraries(self) -> list[dict]:
return self._client.list_libraries()
def list_series_in_libraries(self, library_ids: list[int]) -> list[dict]:
result: list[dict] = []
for lib_id in library_ids:
try:
result.extend(self._client.list_series_in_library(int(lib_id)))
except Exception as exc:
print(f"[orchestrator] library {lib_id} list failed: {exc}",
flush=True)
return result
# ------------------------------------------------------------------
# Matching
# ------------------------------------------------------------------
def build_matches(self, library_ids: list[int]) -> dict:
"""
Resolves every series in the given libraries against MangaBaka.
Series already present in matches.json keep their stored
mangabakaId; the kavitaSeriesId + libraryId fields are refreshed
in case the user moved a series between libraries.
"""
stats = {"checked": 0, "matched": 0, "skipped": 0, "missing": 0}
for series in self.list_series_in_libraries(library_ids):
title = (series.get("name") or "").strip()
if not title:
continue
stats["checked"] += 1
kavita_id = int(series.get("id") or 0)
library_id = int(series.get("libraryId") or 0)
cached = self._cache.get(title)
if cached and cached.get("mangabakaId"):
self._cache.upsert(
title,
kavita_series_id=kavita_id,
library_id=library_id,
)
stats["skipped"] += 1
continue
mb_series = self._builder.search_series(title)
if not mb_series:
self._cache.upsert(
title,
kavita_series_id=kavita_id,
library_id=library_id,
)
stats["missing"] += 1
print(f"[match] {title!r}: no MangaBaka hit", flush=True)
continue
self._cache.upsert(
title,
mangabaka_id=mb_series.get("id"),
mangabaka_name=mb_series.get("title") or "",
image_url=pick_thumbnail_url(mb_series.get("cover")),
kavita_series_id=kavita_id,
library_id=library_id,
)
stats["matched"] += 1
print(f"[match] {title!r} -> {mb_series.get('title')!r} "
f"(id={mb_series.get('id')})", flush=True)
return stats
# ------------------------------------------------------------------
# Updating
# ------------------------------------------------------------------
def update_series(self, kavita_series_id: int) -> dict:
"""Runs a full metadata update for a single Kavita series."""
hit = self._cache.get_by_kavita_id(int(kavita_series_id))
if not hit:
# Try to resolve via the Kavita series name on the fly.
series = self._client.get_series(int(kavita_series_id))
title = (series.get("name") or "").strip()
if not title:
return {"ok": False, "error": "series not in matches.json"}
built = self._builder.build(title=title)
if not built:
return {"ok": False, "error": "no MangaBaka match"}
self._cache.upsert(
title,
mangabaka_id=built.get("mangabakaId"),
mangabaka_name=built.get("mangabakaTitle"),
image_url=built.get("coverUrl"),
kavita_series_id=int(kavita_series_id),
library_id=int(series.get("libraryId") or 0),
)
cached_title = title
cached_entry = self._cache.get(title) or {}
else:
cached_title, cached_entry = hit
built = self._builder.build(mangabaka_id=cached_entry.get("mangabakaId"))
if not built:
return {"ok": False, "error": "mangabaka id no longer resolvable"}
prev_cover = cached_entry.get("imageUrl") or ""
try:
series_report = self._series_updater.update_series(
int(kavita_series_id), built,
previous_cover_url=prev_cover,
)
except Exception as exc:
return {"ok": False, "error": f"series update failed: {exc}"}
# Persons
try:
person_report = self._person_updater.update_for_manga(
built.get("malId"),
al_manga_id=built.get("anilistId"),
)
except Exception as exc:
person_report = {"error": str(exc)}
# Relationships + collection
try:
relation_report = self._relation_sync.sync(
int(kavita_series_id), built)
except Exception as exc:
relation_report = {"error": str(exc)}
# Stamp the new cover URL on the cache so the next run knows when
# to re-upload.
self._cache.upsert(
cached_title,
image_url=built.get("coverUrl") or prev_cover,
)
self._cache.mark_updated(cached_title)
return {
"ok": True,
"title": cached_title,
"mangabakaId": built.get("mangabakaId"),
"series": series_report,
"persons": person_report,
"relationships": relation_report,
}
def update_all(self, library_ids: "list[int] | None") -> dict:
"""Updates every cached series in the given libraries."""
if library_ids is None:
entries = self._cache.all()["matches"]
else:
entries = self._cache.all_in_libraries(library_ids)["matches"]
results: list[dict] = []
ok = fail = 0
for title, entry in entries.items():
ksid = int(entry.get("kavitaSeriesId") or 0)
if not ksid or not entry.get("mangabakaId"):
continue
try:
res = self.update_series(ksid)
except Exception as exc:
res = {"ok": False, "error": str(exc)}
res["title"] = title
results.append(res)
if res.get("ok"):
ok += 1
else:
fail += 1
print(f"[update] {title!r}: "
f"{'ok' if res.get('ok') else 'FAIL ' + str(res.get('error'))}",
flush=True)
return {"ok": ok, "failed": fail, "results": results}
# ------------------------------------------------------------------
# Direct helpers exposed to the WebApp
# ------------------------------------------------------------------
def fetch_series(self, mangabaka_id) -> "dict | None":
return self._builder.fetch_series(mangabaka_id)
+442
View File
@@ -0,0 +1,442 @@
"""
mal_resolver.py
===============
Fetches and caches MyAnimeList manga metadata (statistics, characters, staff)
using the public Jikan REST API v4.
Jikan API: https://api.jikan.moe/v4 (no authentication required)
Rate limit: 3 req/s, 60 req/min -> a 400 ms guard between calls is applied.
Singleton
---------
Only one instance of this class exists per process. Subsequent calls to
MALResolver() return the same object with its warm caches intact.
Provided features
-----------------
- Title-based MAL ID lookup with best-match scoring
- MAL statistics: score, rank, scored_by, popularity, members, favorites
- Character list for a manga (names only — for <Characters> XML tag)
- Detailed character list: name, MAL character ID, image URL, role
- Detailed staff list: name, MAL person ID, image URL, positions
- Lazy full-detail fetches per character / person (for descriptions)
Dependencies
------------
requests -> pip install requests
"""
from __future__ import annotations
import datetime
import difflib
import time
import requests
from MediaResolver import MediaResolver
class MALResolver(MediaResolver):
"""
Singleton: fetches and caches MAL manga data via Jikan API v4.
The first call to MALResolver() creates and initialises the instance;
all subsequent calls return the same object.
"""
_instance: "MALResolver | None" = None
# ------------------------------------------------------------------
# Singleton machinery
# ------------------------------------------------------------------
def __new__(cls, **kwargs):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, *, request_timeout: int = 30):
if self._initialized:
return
self.JIKAN_BASE = "https://api.jikan.moe/v4"
self.request_timeout = request_timeout
self._session = requests.Session()
self._session.headers.setdefault("User-Agent", "MALResolver/1.0")
# title_lower -> mal_id
self._id_cache: dict[str, "int | None"] = {}
# mal_id -> stats dict
self._stats_cache: dict[int, dict] = {}
# manga_mal_id -> [name_str, ...] (for ComicInfo <Characters>)
self._char_names_cache: dict[int, list[str]] = {}
# manga_mal_id -> [{mal_id, name, image_url, role}]
self._char_detailed_cache: dict[int, list[dict]] = {}
# manga_mal_id -> [{mal_id, name, image_url, positions}]
self._staff_detailed_cache: dict[int, list[dict]] = {}
# char_mal_id -> {mal_id, name, image_url, about}
self._char_info_cache: dict[int, dict] = {}
# person_mal_id -> {mal_id, name, image_url, about, website_url}
self._person_info_cache: dict[int, dict] = {}
self._last_request_at: float = 0.0
self._initialized = True
# ------------------------------------------------------------------
# Public: ID lookup
# ------------------------------------------------------------------
def find_id(self, title: str) -> "int | None":
"""MediaResolver interface — delegates to find_mal_id."""
return self.find_mal_id(title)
def find_mal_id(self, title: str) -> "int | None":
"""
Searches MAL for a manga by title and returns the best-matching MAL ID.
Returns None on failure or when no result is found.
"""
if not title or not title.strip():
return None
key = title.strip().lower()
if key in self._id_cache:
return self._id_cache[key]
try:
data = self._get(f"{self.JIKAN_BASE}/manga",
{"q": title, "limit": 5, "type": "lightnovel"})
results = data.get("data") or []
except requests.RequestException:
return None
if not results:
self._id_cache[key] = None
return None
results.sort(key=lambda e: _score_title(title, e), reverse=True)
mal_id = results[0].get("mal_id")
self._id_cache[key] = mal_id
return mal_id
# ------------------------------------------------------------------
# Public: statistics
# ------------------------------------------------------------------
def get_stats(self, mal_id: "int | None") -> "dict | None":
"""
Returns a statistics dict for the given MAL manga ID:
{score, rank, scored_by, popularity, members, favorites,
url, title, as_of (DD-MM-YYYY)}
Returns None if mal_id is None or on network failure.
"""
if mal_id is None:
return None
if mal_id in self._stats_cache:
return self._stats_cache[mal_id]
try:
data = self._get(f"{self.JIKAN_BASE}/manga/{mal_id}")
entry = data.get("data") or {}
except requests.RequestException:
return None
stats: dict = {
"score": entry.get("score"),
"rank": entry.get("rank"),
"scored_by": entry.get("scored_by"),
"popularity": entry.get("popularity"),
"members": entry.get("members"),
"favorites": entry.get("favorites"),
"url": (entry.get("url")
or f"https://myanimelist.net/manga/{mal_id}"),
"title": entry.get("title") or "",
"as_of": datetime.date.today().strftime("%d-%m-%Y"),
}
self._stats_cache[mal_id] = stats
return stats
def get_stats_for_manga(self, title: str) -> "dict | None":
"""Convenience: find MAL ID by title, then return stats."""
return self.get_stats(self.find_mal_id(title))
# ------------------------------------------------------------------
# Public: character names (for ComicInfo <Characters> tag)
# ------------------------------------------------------------------
def get_characters(self, mal_id: "int | None") -> list[str]:
"""
Returns a flat list of character names for the manga.
Used by ComicInfoBuilder to populate the <Characters> XML element.
"""
if mal_id is None:
return []
if mal_id in self._char_names_cache:
return self._char_names_cache[mal_id]
detailed = self.get_characters_detailed(mal_id)
names = [e["name"] for e in detailed if e.get("name")]
if names:
# Only cache a successful result — empty could be a transient
# API failure and we want the next call to retry.
self._char_names_cache[mal_id] = names
return names
def get_characters_for_manga(self, title: str) -> list[str]:
"""Convenience: search by title, then return character names."""
return self.get_characters(self.find_mal_id(title))
# ------------------------------------------------------------------
# Public: detailed character data (for KavitaPersonUpdater)
# ------------------------------------------------------------------
def get_characters_detailed(self, mal_id: "int | None") -> list[dict]:
"""
Returns detailed character entries for a manga:
[{mal_id, name, image_url, role, about=None}, ...]
`about` is not populated here; call get_character_details(char_mal_id)
to fetch it lazily when needed.
"""
if mal_id is None:
return []
if mal_id in self._char_detailed_cache:
return self._char_detailed_cache[mal_id]
try:
data = self._get(f"{self.JIKAN_BASE}/manga/{mal_id}/characters")
entries = data.get("data") or []
except requests.RequestException:
return []
results = []
for entry in entries:
char = entry.get("character") or {}
raw_name = char.get("name") or ""
if not raw_name:
continue
jpg = (char.get("images") or {}).get("jpg") or {}
results.append({
"mal_id": char.get("mal_id"),
# Cleaned name: "Hibino, Susuki" -> "Susuki Hibino". ComicInfo
# <Characters> is comma-separated, so commas in names would
# cause Kavita to split a single character into two persons.
"name": _clean_mal_name(raw_name),
"raw_name": raw_name,
"image_url": jpg.get("image_url") or jpg.get("small_image_url"),
"role": entry.get("role") or "Supporting",
"about": None,
})
if results:
self._char_detailed_cache[mal_id] = results
return results
# ------------------------------------------------------------------
# Public: detailed staff data (for KavitaPersonUpdater)
# ------------------------------------------------------------------
def get_staff_detailed(self, mal_id: "int | None") -> list[dict]:
"""
Returns detailed staff (author) entries for a manga:
[{mal_id, name, image_url, positions, about=None}, ...]
Jikan has no `/manga/{id}/staff` endpoint — that route only exists for
anime. For manga the authors are listed on `/manga/{id}` under
`data.authors`, but each entry only has {mal_id, name, url}; the image
URL is fetched lazily via get_person_details (cached, so the later
description fetch is free).
"""
if mal_id is None:
return []
if mal_id in self._staff_detailed_cache:
return self._staff_detailed_cache[mal_id]
try:
data = self._get(f"{self.JIKAN_BASE}/manga/{mal_id}")
entry = data.get("data") or {}
except requests.RequestException:
return []
results = []
for author in (entry.get("authors") or []):
raw_name = author.get("name") or ""
person_mal_id = author.get("mal_id")
if not raw_name or person_mal_id is None:
continue
details = self.get_person_details(person_mal_id) or {}
results.append({
"mal_id": person_mal_id,
"name": _clean_mal_name(raw_name),
"raw_name": raw_name,
"image_url": details.get("image_url"),
"positions": [],
"about": None,
})
if results:
self._staff_detailed_cache[mal_id] = results
return results
# ------------------------------------------------------------------
# Public: individual character / person details (lazy, with description)
# ------------------------------------------------------------------
def get_character_details(self, char_mal_id: "int | None") -> "dict | None":
"""
Returns full details for a single MAL character, including `about`.
Result is cached.
"""
if char_mal_id is None:
return None
if char_mal_id in self._char_info_cache:
return self._char_info_cache[char_mal_id]
try:
data = self._get(f"{self.JIKAN_BASE}/characters/{char_mal_id}")
entry = data.get("data") or {}
except requests.RequestException:
return None
jpg = (entry.get("images") or {}).get("jpg") or {}
result = {
"mal_id": entry.get("mal_id"),
"name": entry.get("name") or "",
"image_url": jpg.get("image_url") or jpg.get("small_image_url"),
"about": entry.get("about"),
"favorites": entry.get("favorites"),
"url": (entry.get("url")
or f"https://myanimelist.net/character/{char_mal_id}"),
}
self._char_info_cache[char_mal_id] = result
return result
def get_person_details(self, person_mal_id: "int | None") -> "dict | None":
"""
Returns full details for a single MAL person (staff), including `about`.
Result is cached.
"""
if person_mal_id is None:
return None
if person_mal_id in self._person_info_cache:
return self._person_info_cache[person_mal_id]
try:
data = self._get(f"{self.JIKAN_BASE}/people/{person_mal_id}")
entry = data.get("data") or {}
except requests.RequestException:
return None
jpg = (entry.get("images") or {}).get("jpg") or {}
result = {
"mal_id": entry.get("mal_id"),
"name": entry.get("name") or "",
"given_name": entry.get("given_name"),
"family_name": entry.get("family_name"),
"birthday": entry.get("birthday"),
"image_url": jpg.get("image_url") or jpg.get("small_image_url"),
"about": entry.get("about"),
"favorites": entry.get("favorites"),
"website_url": entry.get("website_url"),
"url": (entry.get("url")
or f"https://myanimelist.net/people/{person_mal_id}"),
}
self._person_info_cache[person_mal_id] = result
return result
# ------------------------------------------------------------------
# Public: cache management
# ------------------------------------------------------------------
def clear_cache(self) -> None:
"""Clears all internal caches (the Singleton instance is retained)."""
self._id_cache.clear()
self._stats_cache.clear()
self._char_names_cache.clear()
self._char_detailed_cache.clear()
self._staff_detailed_cache.clear()
self._char_info_cache.clear()
self._person_info_cache.clear()
# ------------------------------------------------------------------
# Internal: rate-limited HTTP
# ------------------------------------------------------------------
def _get(self, url: str, params: "dict | None" = None) -> dict:
"""Rate-limited GET request (respects Jikan's ~3 req/s limit)."""
elapsed = time.monotonic() - self._last_request_at
if elapsed < 0.4:
time.sleep(0.4 - elapsed)
resp = self._session.get(url, params=params, timeout=self.request_timeout)
self._last_request_at = time.monotonic()
resp.raise_for_status()
return resp.json()
# --------------------------------------------------------------------------
# Module helper
# --------------------------------------------------------------------------
def _clean_mal_name(name: str) -> str:
"""
Converts an MAL name into a comma-free, ComicInfo-safe form.
The ComicInfo <Characters> tag is comma-separated, so a single MAL
character "Hibino, Susuki" written into the XML would be parsed by
Kavita as two persons ("Hibino" and "Susuki").
Conversion:
"Hibino, Susuki" -> "Susuki Hibino" (Western: First Last)
"Yamori, Kou" -> "Kou Yamori"
"Kotoyama" -> "Kotoyama" (unchanged)
Trailing/leading commas and stray whitespace are stripped defensively.
"""
if not name:
return ""
name = name.strip()
if "," in name:
last, _, first = name.partition(",")
first = first.strip()
last = last.strip()
if first and last:
return f"{first} {last}"
# Fallback: strip any remaining commas
return name.replace(",", " ").strip()
return name
def _score_title(query: str, entry: dict) -> float:
"""Returns the best title-similarity score for a Jikan manga entry."""
candidates = [
entry.get("title") or "",
entry.get("title_english") or "",
entry.get("title_japanese") or "",
]
for alt in (entry.get("titles") or []):
candidates.append(alt.get("title") or "")
best = 0.0
q = query.lower()
for t in candidates:
if t:
ratio = difflib.SequenceMatcher(None, q, t.lower()).ratio()
best = max(best, ratio)
return best
# --------------------------------------------------------------------------
# Usage example
# --------------------------------------------------------------------------
if __name__ == "__main__":
r1 = MALResolver()
r2 = MALResolver()
assert r1 is r2, "MALResolver must be a Singleton"
mal_id = r1.find_mal_id("Yofukashi no Uta")
print("MAL ID :", mal_id)
stats = r1.get_stats(mal_id)
if stats:
print("Score :", stats["score"])
print("Rank :", stats["rank"])
chars = r1.get_characters_detailed(mal_id)
print("Characters (first 3):", [c["name"] for c in chars[:3]])
staff = r1.get_staff_detailed(mal_id)
print("Staff :", [s["name"] for s in staff])
+92
View File
@@ -0,0 +1,92 @@
"""
mangabaka_rate_limit.py
=======================
Process-wide rate limiter for the MangaBaka API.
Apply via:
from MangaBakaRateLimit import apply_to_session
apply_to_session(session)
This mounts a custom ``requests.adapters.HTTPAdapter`` on the given
``requests.Session`` for the ``api.mangabaka.dev`` host. Every request
going through that adapter is:
* throttled so that no two requests are dispatched within
``_MIN_INTERVAL`` seconds of one another, and
* retried on HTTP 429, honouring the ``Retry-After`` header when
present, otherwise exponential backoff capped at ``_MAX_BACKOFF``.
Throttle state is module-global, so even if several sessions exist in
the same process they share one budget — important because they all hit
the same upstream IP-based limit.
"""
from __future__ import annotations
import threading
import time
from requests.adapters import HTTPAdapter
# Tune these if MangaBaka tightens or loosens limits.
_MIN_INTERVAL = 1.1 # seconds between consecutive requests
_MAX_RETRIES = 6 # retries on 429 before giving up
_MAX_BACKOFF = 60.0 # cap on per-attempt backoff sleep
# --- shared throttle state --------------------------------------------------
_state_lock = threading.Lock()
_last_request_time = 0.0
def _wait_for_slot() -> None:
"""Block until the next request slot is available, then reserve it."""
global _last_request_time
while True:
with _state_lock:
now = time.monotonic()
wait = _MIN_INTERVAL - (now - _last_request_time)
if wait <= 0:
_last_request_time = now
return
time.sleep(wait)
class _MangaBakaRateLimitAdapter(HTTPAdapter):
def send(self, request, **kwargs):
response = None
for attempt in range(_MAX_RETRIES + 1):
_wait_for_slot()
response = super().send(request, **kwargs)
if response.status_code != 429:
return response
retry_after = response.headers.get("Retry-After")
try:
wait = (float(retry_after) if retry_after
else min(_MAX_BACKOFF, 2.0 * (2 ** attempt)))
except ValueError:
wait = min(_MAX_BACKOFF, 2.0 * (2 ** attempt))
print(f"[MangaBaka] 429 — backing off {wait:.1f}s "
f"(attempt {attempt + 1}/{_MAX_RETRIES})",
flush=True)
response.close()
time.sleep(wait)
# Retries exhausted — let the caller deal with the last 429.
return response
def apply_to_session(session) -> None:
"""
Mount the rate-limit adapter on ``session`` so every MangaBaka call
is automatically throttled. Safe to call multiple times (later mounts
just replace the earlier adapter for the same prefix).
"""
adapter = _MangaBakaRateLimitAdapter()
session.mount("https://api.mangabaka.dev/", adapter)
session.mount("http://api.mangabaka.dev/", adapter)
+195
View File
@@ -0,0 +1,195 @@
"""
mangabaka_works_resolver.py
===========================
Fetches volume-level (work) data from the MangaBaka API.
Each "work" is a physical tankobon volume and may carry:
- volume number
- ISBN / GTIN
- page count (used for chapter-to-volume estimation)
- release date
- cover image (raw / default / small variants)
Only works that have a usable cover are kept in the cache.
Works without a cover are discarded at fetch time.
If no volume is assigned for a chapter, callers fall back to the
default series cover from the series object itself.
Dependencies
------------
requests -> pip install requests
"""
from __future__ import annotations
import requests
class MangaBakaWorksResolver:
"""
Fetches and caches MangaBaka volume (work) data for a series.
Only works that have a cover image are retained in the cache.
"""
def __init__(self, api_base_url: str = "https://api.mangabaka.dev/v1",
request_timeout: int = 30,
session: "requests.Session | None" = None):
self.api_base_url = api_base_url.rstrip("/")
self.request_timeout = request_timeout
self._session = session or requests.Session()
self._session.headers.setdefault("User-Agent", "MangaBakaWorksResolver/1.0")
# Cache: series_id (str) -> list of work dicts (only those with covers)
self._cache: dict[str, list[dict]] = {}
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def get_works(self, series_id: str) -> list[dict]:
"""
Returns volume-level works for a series, filtered to those that have
a usable cover image. Results are cached per series.
Pages through the API (limit=50) until the response returns an empty
page, collecting all works before applying the cover filter.
"""
if not series_id:
return []
if series_id in self._cache:
return self._cache[series_id]
all_works: list[dict] = []
page = 1
try:
while True:
resp = self._session.get(
f"{self.api_base_url}/series/{series_id}/works",
params={"limit": 50, "page": page},
timeout=self.request_timeout,
)
resp.raise_for_status()
page_data = resp.json().get("data") or []
if not page_data:
break
all_works.extend(page_data)
if len(page_data) < 50:
break
page += 1
except requests.RequestException:
if not all_works:
return []
# Discard works that carry no usable cover
works_with_cover = [w for w in all_works if w.get("images")]
self._cache[series_id] = works_with_cover
return works_with_cover
def get_work_for_volume(self, series_id: str, volume) -> "dict | None":
"""
Returns the work dict for a specific volume number, or None.
Volume comparison normalises trailing ".0" (e.g. "1.0" == "1").
"""
works = self.get_works(series_id)
if not works:
return None
target = _norm_vol(volume)
for work in works:
if _norm_vol(work.get("sequence_string")) == target:
return work
return None
def get_cover_for_volume(self, series_id: str, volume) -> "str | None":
"""Returns the cover URL for a specific volume, or None if not found."""
work = self.get_work_for_volume(series_id, volume)
if not work:
return None
return self._pick_cover_url(work.get("images")[0].get("image"))
def get_page_counts(self, series_id: str) -> "dict[str, int]":
"""
Returns {volume_str: page_count} for all cached works.
Used by MangaDexVolumeResolver for chapter-to-volume estimation.
"""
result: dict[str, int] = {}
for work in self.get_works(series_id):
vol = _norm_vol(work.get("volume"))
pages = work.get("pages")
if vol and pages is not None:
try:
result[vol] = int(pages)
except (TypeError, ValueError):
pass
return result
def clear_cache(self) -> None:
"""Clears the internal works cache."""
self._cache.clear()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _pick_cover_url(cover) -> "str | None":
"""
Selects the best cover URL from a MangaBaka cover object.
Real API shape:
"raw": {"url": "...", "size": ..., "height": ..., "width": ...}
"x150": {"x1": "...", "x2": "...", "x3": "..."}
"x250": {...}
"x350": {...}
Order: raw original > x350@x3 > x250@x3 > x150@x3 ...
"""
if not cover:
return None
if isinstance(cover, str):
return cover
if not isinstance(cover, dict):
return None
raw = cover.get("raw")
if isinstance(raw, dict):
url = raw.get("url")
if isinstance(url, str) and url:
return url
elif isinstance(raw, str) and raw:
return raw
for size_key in ("x350", "x250", "x150"):
variant = cover.get(size_key)
if isinstance(variant, dict):
for density in ("x3", "x2", "x1"):
url = variant.get(density)
if isinstance(url, str) and url:
return url
elif isinstance(variant, str) and variant:
return variant
# Last-ditch: any HTTP URL anywhere in the structure
for val in cover.values():
if isinstance(val, str) and val.startswith("http"):
return val
if isinstance(val, dict):
for sub_val in val.values():
if isinstance(sub_val, str) and sub_val.startswith("http"):
return sub_val
return None
# --------------------------------------------------------------------------
# Module helper
# --------------------------------------------------------------------------
def _norm_vol(value) -> str:
"""Normalises a volume identifier: strips whitespace, removes trailing .0."""
text = str(value or "").strip()
try:
f = float(text)
if f.is_integer():
return str(int(f))
except ValueError:
pass
return text
+191
View File
@@ -0,0 +1,191 @@
"""
matches_cache.py
================
Persistent JSON cache that maps a Kavita series title to the MangaBaka
series it was matched against, plus enough context to update the right
Kavita record later.
Structure on disk::
{
"matches": {
"<kavita series name>": {
"mangabakaId": "12345",
"mangabakaName": "Re:Zero",
"imageUrl": "https://.../cover.jpg",
"kavitaSeriesId": 42,
"libraryId": 3,
"firstMatchTime": 1700000000,
"lastUpdateTime": 1700100000
},
...
}
}
The cache is the source of truth for the WebUI's matches table and is
written back on every mutation so a crash mid-batch does not lose
matches that were resolved in the current run.
"""
from __future__ import annotations
import json
import threading
import time
from pathlib import Path
class MatchesCache:
def __init__(self, path):
self._path = Path(path)
self._lock = threading.RLock()
self._data: dict = {"matches": {}}
self._load()
# ------------------------------------------------------------------
# Public lookup / mutation API
# ------------------------------------------------------------------
def get(self, title: str) -> "dict | None":
with self._lock:
entry = self._data["matches"].get(title)
return dict(entry) if entry else None
def get_by_kavita_id(self, kavita_series_id: int) -> "tuple[str, dict] | None":
with self._lock:
for title, entry in self._data["matches"].items():
if entry.get("kavitaSeriesId") == kavita_series_id:
return title, dict(entry)
return None
def get_by_mangabaka_id(self, mangabaka_id) -> "tuple[str, dict] | None":
target = str(mangabaka_id) if mangabaka_id is not None else ""
if not target:
return None
with self._lock:
for title, entry in self._data["matches"].items():
if str(entry.get("mangabakaId") or "") == target:
return title, dict(entry)
return None
def upsert(self, title: str, *,
mangabaka_id=None,
mangabaka_name=None,
image_url=None,
kavita_series_id=None,
library_id=None,
first_match_time=None,
last_update_time=None) -> dict:
"""
Inserts or updates an entry. Only fields passed explicitly are
modified; the rest are preserved.
"""
with self._lock:
entry = self._data["matches"].get(title)
if entry is None:
entry = {
"mangabakaId": "",
"mangabakaName": "",
"imageUrl": "",
"kavitaSeriesId": 0,
"libraryId": 0,
"firstMatchTime": int(time.time()),
"lastUpdateTime": 0,
}
self._data["matches"][title] = entry
if mangabaka_id is not None:
entry["mangabakaId"] = str(mangabaka_id)
if mangabaka_name is not None:
entry["mangabakaName"] = mangabaka_name
if image_url is not None:
entry["imageUrl"] = image_url
if kavita_series_id is not None:
try:
entry["kavitaSeriesId"] = int(kavita_series_id)
except (TypeError, ValueError):
pass
if library_id is not None:
try:
entry["libraryId"] = int(library_id)
except (TypeError, ValueError):
pass
if first_match_time is not None:
try:
entry["firstMatchTime"] = int(first_match_time)
except (TypeError, ValueError):
pass
if last_update_time is not None:
try:
entry["lastUpdateTime"] = int(last_update_time)
except (TypeError, ValueError):
pass
self._save_unlocked()
return dict(entry)
def mark_updated(self, title: str) -> None:
with self._lock:
entry = self._data["matches"].get(title)
if entry is not None:
entry["lastUpdateTime"] = int(time.time())
self._save_unlocked()
def rename(self, old_title: str, new_title: str) -> bool:
if not new_title or old_title == new_title:
return False
with self._lock:
entry = self._data["matches"].pop(old_title, None)
if entry is None:
return False
self._data["matches"][new_title] = entry
self._save_unlocked()
return True
def remove(self, title: str) -> bool:
with self._lock:
existed = title in self._data["matches"]
if existed:
del self._data["matches"][title]
self._save_unlocked()
return existed
def all(self) -> dict:
with self._lock:
return {"matches": {k: dict(v)
for k, v in self._data["matches"].items()}}
def all_in_libraries(self, library_ids: "list[int] | None") -> dict:
"""
Returns the cache filtered to entries whose libraryId is in
`library_ids`. Pass None to return everything.
"""
if library_ids is None:
return self.all()
ids = {int(i) for i in library_ids}
with self._lock:
return {"matches": {
k: dict(v) for k, v in self._data["matches"].items()
if int(v.get("libraryId") or 0) in ids
}}
# ------------------------------------------------------------------
# Internal IO
# ------------------------------------------------------------------
def _load(self) -> None:
if not self._path.is_file():
return
try:
with self._path.open("r", encoding="utf-8") as f:
loaded = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
print(f"[MatchesCache] failed to load {self._path}: {exc}",
flush=True)
return
if isinstance(loaded, dict) and isinstance(loaded.get("matches"), dict):
self._data = loaded
def _save_unlocked(self) -> None:
self._path.parent.mkdir(parents=True, exist_ok=True)
tmp = self._path.with_suffix(self._path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(self._data, f, ensure_ascii=False, indent=2)
tmp.replace(self._path)
+757
View File
@@ -0,0 +1,757 @@
"""
matches_web_app.py
==================
Flask web UI for the Kavita light-novel metadata fetcher.
Pages
-----
GET / HTML UI (matches table + actions)
Match cache (JSON)
------------------
GET /api/libraries Lists Kavita libraries
GET /api/matches Full cache, optionally filtered by libraryIds=
POST /api/matches Upsert a single match
body: {title, mangabakaId}
POST /api/matches/delete Remove a match
body: {title}
Background jobs
---------------
POST /api/build Build matches for libraries
body: {libraryIds: [int, ...]}
POST /api/update Update a single series
body: {kavitaSeriesId}
POST /api/update-all Update every cached series in libraries
body: {libraryIds: [int, ...] | null}
GET /api/status Current background job status (status, log)
"""
from __future__ import annotations
import threading
import time
from flask import Flask, jsonify, request, Response
from MatchesCache import MatchesCache
from LightNovelMetadataBuilder import pick_thumbnail_url
_INDEX_HTML = r"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Kavita light-novel metadata fetcher</title>
<style>
body { font-family: system-ui, sans-serif; margin: 1.5rem; background: #111; color: #eee; }
h1 { margin: 0 0 1rem; font-size: 1.4rem; }
.bar { display: flex; gap: .5rem; align-items: center; margin-bottom: 1rem; flex-wrap: wrap; }
.bar input[type=search] { padding: .3rem .5rem; min-width: 18rem; background:#222; color:#eee; border:1px solid #444; }
.bar select[multiple] { background:#222; color:#eee; border:1px solid #444; min-width: 14rem; min-height: 4.2rem; }
button { padding: .35rem .7rem; cursor: pointer; background:#2a2a2a; color:#eee; border:1px solid #555; }
button.primary { background:#2563eb; border-color:#2563eb; color:white; }
button.danger { background:#7f1d1d; border-color:#7f1d1d; color:white; }
button.success { background:#15803d; border-color:#15803d; color:white; }
button:disabled { opacity:.5; cursor:default; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #333; padding: .4rem .6rem; vertical-align: top; }
th { background: #1d1d1d; text-align: left; position: sticky; top: 0; }
th.sortable { cursor: pointer; user-select: none; }
th.sortable:hover { background:#252525; }
th .arrow { display:inline-block; width:.8em; color:#9ca3af; }
tr:nth-child(even) td { background: #161616; }
td.image img { max-width: 90px; max-height: 130px; display:block; }
td.id input { width: 12rem; padding: .25rem; background:#222; color:#eee; border:1px solid #444; font-family: monospace; }
td.title a { color: #60a5fa; text-decoration: none; }
td.title a:hover { text-decoration: underline; }
td.actions { white-space: nowrap; }
.status { margin-left: .5rem; color:#9ca3af; font-size: .9rem; }
.dirty td { background: #1f2937 !important; }
.count { color:#9ca3af; font-size:.9rem; margin-left:.5rem; }
pre.log { background:#0a0a0a; color:#9ca3af; padding:.5rem .75rem; max-height:18rem; overflow:auto; border:1px solid #333; font-size:.8rem; white-space:pre-wrap; }
label { font-size:.9rem; color:#9ca3af; }
</style>
</head>
<body>
<h1>Kavita light-novel metadata fetcher <span id="count" class="count"></span></h1>
<div class="bar">
<label>Libraries
<select id="libraries" multiple size="3"></select>
</label>
<button id="reload">Reload</button>
<button id="build">Match all in libraries</button>
<button id="updateAll" class="success">Update all in libraries</button>
<button id="batchSave" class="primary">Save dirty (0)</button>
<span class="status" id="status"></span>
</div>
<div class="bar">
<input id="filter" type="search" placeholder="Filter by title…">
<span class="count" id="jobStatus"></span>
</div>
<pre id="jobLog" class="log" hidden></pre>
<table>
<thead>
<tr>
<th class="sortable" data-col="title">Title <span class="arrow" id="arrow-title"></span></th>
<th>mangabakaId</th>
<th>mangabakaName</th>
<th>library</th>
<th class="sortable" data-col="lastUpdateTime">Last update <span class="arrow" id="arrow-lastUpdateTime"></span></th>
<th>Image</th>
<th></th>
</tr>
</thead>
<tbody id="rows"></tbody>
</table>
<script>
const MB_SEARCH = "https://mangabaka.org/search?q=";
let matchesData = {};
let librariesById = {};
let currentSort = { col: "title", asc: true };
let jobPollHandle = null;
function fmtTime(unix) {
if (!unix) return "";
const d = new Date(unix * 1000);
return d.toLocaleString();
}
function setStatus(msg) { document.getElementById("status").textContent = msg; }
function selectedLibraryIds() {
const sel = document.getElementById("libraries");
return Array.from(sel.selectedOptions).map(o => parseInt(o.value, 10));
}
function updateDirtyCount() {
const n = document.querySelectorAll("#rows tr.dirty").length;
const btn = document.getElementById("batchSave");
btn.textContent = "Save dirty (" + n + ")";
btn.disabled = n === 0;
}
function makeRow(title, e) {
const tr = document.createElement("tr");
tr.dataset.title = title;
// Title — links to MangaBaka search
const titleTd = document.createElement("td");
titleTd.className = "title";
const a = document.createElement("a");
a.href = MB_SEARCH + encodeURIComponent(title) + "&type=novel";
a.target = "_blank";
a.rel = "noopener";
a.textContent = title;
titleTd.appendChild(a);
tr.appendChild(titleTd);
// mangabakaId (editable)
const idTd = document.createElement("td");
idTd.className = "id";
const idInp = document.createElement("input");
idInp.value = e.mangabakaId || "";
idInp.dataset.original = e.mangabakaId || "";
idInp.addEventListener("input", () => {
if (idInp.value !== idInp.dataset.original) tr.classList.add("dirty");
else tr.classList.remove("dirty");
updateDirtyCount();
});
idTd.appendChild(idInp);
tr.appendChild(idTd);
// mangabakaName
const nameTd = document.createElement("td");
nameTd.textContent = e.mangabakaName || "";
tr.appendChild(nameTd);
// library
const libTd = document.createElement("td");
const libId = e.libraryId || 0;
libTd.textContent = librariesById[libId] || (libId ? "#" + libId : "");
tr.appendChild(libTd);
// lastUpdateTime
const timeTd = document.createElement("td");
timeTd.textContent = e.lastUpdateTime ? fmtTime(e.lastUpdateTime) : "";
tr.appendChild(timeTd);
// Image
const imgTd = document.createElement("td");
imgTd.className = "image";
const img = document.createElement("img");
img.src = e.imageUrl || "";
img.alt = "";
img.loading = "lazy";
imgTd.appendChild(img);
tr.appendChild(imgTd);
// Actions
const actTd = document.createElement("td");
actTd.className = "actions";
const save = document.createElement("button");
save.textContent = "Save";
save.className = "primary";
save.addEventListener("click", () => saveRow(tr));
actTd.appendChild(save);
const update = document.createElement("button");
update.textContent = "Update";
update.className = "success";
update.style.marginLeft = ".25rem";
update.disabled = !e.kavitaSeriesId;
update.title = e.kavitaSeriesId
? "Push metadata to Kavita series #" + e.kavitaSeriesId
: "Run a Match cycle first so we know the Kavita series id";
update.addEventListener("click", () => updateRow(tr));
actTd.appendChild(update);
const del = document.createElement("button");
del.textContent = "Delete";
del.className = "danger";
del.style.marginLeft = ".25rem";
del.addEventListener("click", () => deleteRow(tr));
actTd.appendChild(del);
tr.appendChild(actTd);
tr._idInp = idInp;
tr._nameTd = nameTd;
tr._img = img;
tr._timeTd = timeTd;
tr._update = update;
return tr;
}
async function saveRow(tr) {
const title = tr.dataset.title;
const newId = tr._idInp.value.trim();
setStatus("Saving " + title + "");
try {
const r = await fetch("/api/matches", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ title: title, mangabakaId: newId }),
});
if (!r.ok) throw new Error(await r.text());
const data = await r.json();
const entry = data.entry || {};
matchesData[title] = entry;
tr._idInp.value = entry.mangabakaId || "";
tr._idInp.dataset.original = entry.mangabakaId || "";
tr._nameTd.textContent = entry.mangabakaName || "";
tr._img.src = entry.imageUrl || "";
tr.classList.remove("dirty");
updateDirtyCount();
setStatus("Saved " + title);
return true;
} catch (err) {
setStatus("Save failed (" + title + "): " + err.message);
return false;
}
}
async function deleteRow(tr) {
const title = tr.dataset.title;
if (!confirm("Delete " + title + "?")) return;
setStatus("Deleting " + title + "");
try {
const r = await fetch("/api/matches/delete", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ title: title }),
});
if (!r.ok) throw new Error(await r.text());
delete matchesData[title];
tr.remove();
document.getElementById("count").textContent =
"(" + Object.keys(matchesData).length + " entries)";
setStatus("Deleted");
} catch (err) {
setStatus("Delete failed: " + err.message);
}
}
async function updateRow(tr) {
const title = tr.dataset.title;
const entry = matchesData[title] || {};
if (!entry.kavitaSeriesId) {
setStatus("No kavitaSeriesId for " + title + " — run match first");
return;
}
setStatus("Updating " + title + "");
tr._update.disabled = true;
try {
const r = await fetch("/api/update", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ kavitaSeriesId: entry.kavitaSeriesId }),
});
if (!r.ok) throw new Error(await r.text());
const res = await r.json();
setStatus(res.ok ? "Updated " + title : "Update failed: " + res.error);
if (res.ok) {
entry.lastUpdateTime = Math.floor(Date.now() / 1000);
tr._timeTd.textContent = fmtTime(entry.lastUpdateTime);
}
} catch (err) {
setStatus("Update failed: " + err.message);
} finally {
tr._update.disabled = false;
}
}
async function batchSave() {
const dirty = Array.from(document.querySelectorAll("#rows tr.dirty"));
if (dirty.length === 0) return;
if (!confirm("Save " + dirty.length + " changed row(s)?")) return;
setStatus("Batch saving " + dirty.length + " rows…");
let ok = 0, fail = 0;
for (const tr of dirty) {
const success = await saveRow(tr);
if (success) ok++; else fail++;
}
setStatus("Batch: " + ok + " ok, " + fail + " failed");
}
function sortedTitles() {
const titles = Object.keys(matchesData);
const dir = currentSort.asc ? 1 : -1;
if (currentSort.col === "title") {
return titles.sort((a, b) => a.localeCompare(b) * dir);
}
if (currentSort.col === "lastUpdateTime") {
return titles.sort((a, b) => {
const av = matchesData[a].lastUpdateTime || 0;
const bv = matchesData[b].lastUpdateTime || 0;
return (av - bv) * dir;
});
}
return titles;
}
function updateSortArrows() {
for (const a of document.querySelectorAll("th .arrow")) a.textContent = "";
const id = "arrow-" + currentSort.col;
const el = document.getElementById(id);
if (el) el.textContent = currentSort.asc ? "" : "";
}
function applyFilter() {
const q = document.getElementById("filter").value.toLowerCase();
const libs = new Set(selectedLibraryIds());
for (const tr of document.querySelectorAll("#rows tr")) {
const title = tr.dataset.title;
const entry = matchesData[title] || {};
const titleMatch = title.toLowerCase().includes(q);
const libMatch = libs.size === 0 || libs.has(entry.libraryId || 0);
tr.style.display = (titleMatch && libMatch) ? "" : "none";
}
}
function render() {
const tbody = document.getElementById("rows");
tbody.innerHTML = "";
for (const t of sortedTitles()) {
tbody.appendChild(makeRow(t, matchesData[t]));
}
updateSortArrows();
applyFilter();
updateDirtyCount();
document.getElementById("count").textContent =
"(" + Object.keys(matchesData).length + " entries)";
}
async function loadLibraries() {
try {
const r = await fetch("/api/libraries");
const data = await r.json();
const libs = data.libraries || [];
const defaults = new Set(data.defaults || []);
librariesById = {};
const sel = document.getElementById("libraries");
sel.innerHTML = "";
for (const lib of libs) {
librariesById[lib.id] = lib.name;
const opt = document.createElement("option");
opt.value = lib.id;
opt.textContent = lib.name + " (#" + lib.id + ")";
if (defaults.has(lib.id)) opt.selected = true;
sel.appendChild(opt);
}
} catch (err) {
setStatus("Failed to load libraries: " + err.message);
}
}
async function load() {
setStatus("Loading…");
try {
const r = await fetch("/api/matches");
const data = await r.json();
matchesData = data.matches || {};
render();
setStatus(Object.keys(matchesData).length + " entries");
} catch (err) {
setStatus("Load failed: " + err.message);
}
}
async function pollJob() {
try {
const r = await fetch("/api/status");
const s = await r.json();
const jobStatus = document.getElementById("jobStatus");
const jobLog = document.getElementById("jobLog");
if (!s.running && !s.lastFinished) {
jobStatus.textContent = "";
jobLog.hidden = true;
stopPolling();
return;
}
jobLog.hidden = false;
jobLog.textContent = (s.log || []).join("\n");
jobLog.scrollTop = jobLog.scrollHeight;
if (s.running) {
jobStatus.textContent = "Running: " + (s.label || "");
} else {
jobStatus.textContent = "Done: " + (s.label || "");
stopPolling();
load();
}
} catch (err) {
/* keep polling silently */
}
}
function startPolling() {
if (jobPollHandle) return;
jobPollHandle = setInterval(pollJob, 1000);
pollJob();
}
function stopPolling() {
if (jobPollHandle) clearInterval(jobPollHandle);
jobPollHandle = null;
}
async function startBuild() {
const libs = selectedLibraryIds();
if (libs.length === 0) {
setStatus("Pick at least one library");
return;
}
if (!confirm("Match every series in " + libs.length + " library(ies)?")) return;
setStatus("Build started");
try {
const r = await fetch("/api/build", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ libraryIds: libs }),
});
if (!r.ok) throw new Error(await r.text());
startPolling();
} catch (err) {
setStatus("Build failed: " + err.message);
}
}
async function startUpdateAll() {
const libs = selectedLibraryIds();
if (libs.length === 0) {
if (!confirm("No libraries selected — update every cached series?")) return;
} else if (!confirm("Update every cached series in " + libs.length + " library(ies)?")) {
return;
}
setStatus("Update-all started");
try {
const r = await fetch("/api/update-all", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ libraryIds: libs.length ? libs : null }),
});
if (!r.ok) throw new Error(await r.text());
startPolling();
} catch (err) {
setStatus("Update-all failed: " + err.message);
}
}
document.getElementById("filter").addEventListener("input", applyFilter);
document.getElementById("libraries").addEventListener("change", applyFilter);
document.getElementById("reload").addEventListener("click", load);
document.getElementById("batchSave").addEventListener("click", batchSave);
document.getElementById("build").addEventListener("click", startBuild);
document.getElementById("updateAll").addEventListener("click", startUpdateAll);
for (const th of document.querySelectorAll("th.sortable")) {
th.addEventListener("click", () => {
const col = th.dataset.col;
if (currentSort.col === col) currentSort.asc = !currentSort.asc;
else { currentSort.col = col; currentSort.asc = true; }
render();
});
}
(async () => {
await loadLibraries();
await load();
// Resume polling if there's a job running from a previous session
pollJob();
})();
</script>
</body>
</html>
"""
class _JobState:
"""Thread-safe container for the current background job's progress."""
def __init__(self):
self._lock = threading.Lock()
self._running = False
self._label = ""
self._log: list[str] = []
self._last_finished_at = 0
self._thread: "threading.Thread | None" = None
def start(self, label: str, target, *args, **kwargs) -> bool:
with self._lock:
if self._running:
return False
self._running = True
self._label = label
self._log = [f"[{time.strftime('%H:%M:%S')}] {label} started"]
def runner():
try:
target(self, *args, **kwargs)
except Exception as exc:
self.append(f"FATAL: {exc}")
finally:
with self._lock:
self._running = False
self._last_finished_at = int(time.time())
self.append(f"[{time.strftime('%H:%M:%S')}] finished")
self._thread = threading.Thread(target=runner,
name=f"job:{label}",
daemon=True)
self._thread.start()
return True
def append(self, line: str) -> None:
with self._lock:
self._log.append(line)
# Cap log length so the response stays bounded.
if len(self._log) > 1000:
self._log = self._log[-800:]
def snapshot(self) -> dict:
with self._lock:
return {
"running": self._running,
"label": self._label,
"log": list(self._log),
"lastFinished": self._last_finished_at,
}
class MatchesWebApp:
def __init__(self, cache: MatchesCache, *,
orchestrator=None,
default_library_ids: "list[int] | None" = None,
host: str = "0.0.0.0",
port: int = 8080):
self._cache = cache
self._orchestrator = orchestrator
self._defaults = list(default_library_ids or [])
self._host = host
self._port = port
self._job = _JobState()
self._app = Flask(__name__)
self._thread: "threading.Thread | None" = None
self._register_routes()
@property
def app(self) -> Flask:
return self._app
def start(self) -> threading.Thread:
if self._thread is not None and self._thread.is_alive():
return self._thread
self._thread = threading.Thread(
target=self._app.run,
kwargs={"host": self._host, "port": self._port,
"debug": False, "use_reloader": False,
"threaded": True},
name="MatchesWebApp",
daemon=False,
)
self._thread.start()
print(f"[MatchesWebApp] listening on {self._host}:{self._port}",
flush=True)
return self._thread
def wait(self) -> None:
if self._thread is not None:
self._thread.join()
# ------------------------------------------------------------------
# Routes
# ------------------------------------------------------------------
def _register_routes(self) -> None:
app = self._app
cache = self._cache
@app.get("/")
def index() -> Response:
return Response(_INDEX_HTML, mimetype="text/html; charset=utf-8")
@app.get("/api/libraries")
def api_libraries():
if self._orchestrator is None:
return jsonify([])
try:
libs = self._orchestrator.list_libraries()
except Exception as exc:
return Response(f"libraries failed: {exc}", status=502)
return jsonify({"libraries": libs, "defaults": self._defaults})
@app.get("/api/matches")
def api_list():
raw = request.args.get("libraryIds") or ""
lib_ids = [int(p) for p in raw.split(",") if p.strip().isdigit()]
if lib_ids:
return jsonify(cache.all_in_libraries(lib_ids))
return jsonify(cache.all())
@app.post("/api/matches")
def api_upsert():
body = request.get_json(silent=True) or {}
title = (body.get("title") or "").strip()
if not title:
return Response("title is required", status=400)
new_id_raw = body.get("mangabakaId")
new_id = str(new_id_raw).strip() if new_id_raw is not None else ""
if not new_id:
return Response("mangabakaId is required", status=400)
new_name: "str | None" = None
new_image: "str | None" = None
if self._orchestrator is not None:
try:
series = self._orchestrator.fetch_series(new_id)
except Exception as exc:
return Response(f"resolve failed: {exc}", status=502)
if not series:
return Response(
f"MangaBaka has no series with id {new_id}",
status=404)
new_name = series.get("title") or ""
new_image = pick_thumbnail_url(series.get("cover")) or ""
entry = cache.upsert(
title,
mangabaka_id=new_id,
mangabaka_name=new_name,
image_url=new_image,
)
return jsonify({"title": title, "entry": entry})
@app.post("/api/matches/delete")
def api_delete():
body = request.get_json(silent=True) or {}
title = (body.get("title") or "").strip()
if not title:
return Response("title is required", status=400)
removed = cache.remove(title)
return jsonify({"removed": removed, "title": title})
@app.post("/api/build")
def api_build():
if self._orchestrator is None:
return Response("no orchestrator configured", status=503)
body = request.get_json(silent=True) or {}
library_ids = [int(i) for i in (body.get("libraryIds") or [])
if str(i).strip().lstrip("-").isdigit()]
if not library_ids:
return Response("libraryIds required", status=400)
label = f"match libraries {library_ids}"
def task(job: _JobState, lib_ids):
stats = self._orchestrator.build_matches(lib_ids)
job.append(f"matched={stats.get('matched')} "
f"skipped={stats.get('skipped')} "
f"missing={stats.get('missing')} "
f"checked={stats.get('checked')}")
if not self._job.start(label, task, library_ids):
return Response("a job is already running", status=409)
return jsonify({"started": label})
@app.post("/api/update")
def api_update():
if self._orchestrator is None:
return Response("no orchestrator configured", status=503)
body = request.get_json(silent=True) or {}
ksid = body.get("kavitaSeriesId")
try:
ksid_int = int(ksid)
except (TypeError, ValueError):
return Response("kavitaSeriesId required", status=400)
try:
res = self._orchestrator.update_series(ksid_int)
except Exception as exc:
return Response(f"update failed: {exc}", status=500)
return jsonify(res)
@app.post("/api/update-all")
def api_update_all():
if self._orchestrator is None:
return Response("no orchestrator configured", status=503)
body = request.get_json(silent=True) or {}
raw = body.get("libraryIds")
library_ids: "list[int] | None"
if raw is None:
library_ids = None
else:
library_ids = [int(i) for i in raw
if str(i).strip().lstrip("-").isdigit()]
label = ("update all (every library)" if library_ids is None
else f"update all in libraries {library_ids}")
def task(job: _JobState, lib_ids):
summary = self._orchestrator.update_all(lib_ids)
job.append(f"ok={summary.get('ok')} failed={summary.get('failed')}")
for res in summary.get("results", []):
title = res.get("title", "?")
if res.get("ok"):
flags = []
sr = res.get("series") or {}
for k, v in sr.items():
if v == "changed":
flags.append(k)
job.append(
f" {title}: changed=[{', '.join(flags) or '-'}]")
else:
job.append(f" {title}: FAIL {res.get('error')}")
if not self._job.start(label, task, library_ids):
return Response("a job is already running", status=409)
return jsonify({"started": label})
@app.get("/api/status")
def api_status():
snap = self._job.snapshot()
snap["defaults"] = self._defaults
return jsonify(snap)
+91
View File
@@ -0,0 +1,91 @@
"""
media_resolver.py
=================
Abstract base class for tracker-specific manga metadata resolvers.
Concrete implementations (MALResolver, AniListResolver) must implement
every abstract method, ensuring a uniform interface regardless of the
underlying data source (Jikan/MAL, AniList GraphQL, …).
"""
from __future__ import annotations
from abc import ABC, abstractmethod
class MediaResolver(ABC):
"""
Abstract base for tracker-specific manga metadata resolvers.
Subclasses connect to a specific tracker API and expose a common
interface for:
- Searching a manga by title → tracker-specific numeric ID
- Fetching summary statistics (score, rank, popularity, …)
- Listing characters and staff (name-only and detailed forms)
- Fetching full details for a single character or person
Methods that accept a tracker ID treat None as "unknown" and return
a safe empty value rather than raising.
"""
@abstractmethod
def find_id(self, title: str) -> "int | None":
"""
Searches the tracker for a manga by title.
Returns the best-matching tracker ID, or None on failure.
"""
@abstractmethod
def get_stats(self, tracker_id: "int | None") -> "dict | None":
"""
Returns a statistics dict for the given tracker ID:
{score, rank, scored_by, popularity, members, favorites,
url, title, as_of (DD-MM-YYYY)}
Returns None if tracker_id is None or on network failure.
"""
@abstractmethod
def get_characters(self, tracker_id: "int | None") -> "list[str]":
"""
Returns a flat list of character name strings for the manga.
Used to populate the ComicInfo <Characters> XML element.
"""
@abstractmethod
def get_characters_detailed(self, tracker_id: "int | None") -> "list[dict]":
"""
Returns detailed character entries for a manga:
[{id, name, image_url, role, about=None, ...}, ...]
'about' is not populated here; call get_character_details() lazily.
"""
@abstractmethod
def get_staff_detailed(self, tracker_id: "int | None") -> "list[dict]":
"""
Returns detailed staff/author entries for a manga:
[{id, name, image_url, positions, about=None, ...}, ...]
'about' is not populated here; call get_person_details() lazily.
"""
@abstractmethod
def get_character_details(self, char_id: "int | None") -> "dict | None":
"""
Returns full details for a single character, including description.
Implementations should cache the result.
"""
@abstractmethod
def get_person_details(self, person_id: "int | None") -> "dict | None":
"""
Returns full details for a single person (staff), including description.
Implementations should cache the result.
"""
@abstractmethod
def clear_cache(self) -> None:
"""Clears all internal caches."""
+174
View File
@@ -0,0 +1,174 @@
"""
relationship_sync.py
====================
Mirrors MangaBaka's ``relationships_v2`` graph into Kavita:
1. Every related MangaBaka series that is *also* present in Kavita
(resolved via MatchesCache) is added to a shared Kavita collection
so the whole franchise can be browsed in one place.
2. Series-level relationships (prequel / sequel / spin-off / …) are
written via ``POST /api/Series/update-related`` so navigating
between entries surfaces the right neighbours.
Only relationships where both endpoints exist in Kavita are written.
Relationships pointing to series that have not been imported yet are
silently skipped (the next match run picks them up).
"""
from __future__ import annotations
from KavitaClient import KavitaClient
from MatchesCache import MatchesCache
# MangaBaka relation_type -> Kavita UpdateRelatedSeriesDto bucket
_RELATION_MAP = {
"prequel": "prequels",
"sequel": "sequels",
"side_story": "sideStories",
"spin_off": "spinOffs",
"spinoff": "spinOffs",
"alternative_version": "alternativeVersions",
"alternative_story": "alternativeVersions",
"alternative_setting": "alternativeSettings",
"adapted_from": "adaptations",
"adaptation": "adaptations",
"doujinshi": "doujinshis",
"parent": "contains", # the parent "contains" the child
}
_ALL_BUCKETS = (
"adaptations", "characters", "contains", "others",
"prequels", "sequels", "sideStories", "spinOffs",
"alternativeSettings", "alternativeVersions", "doujinshis",
"editions", "annuals",
)
class RelationshipSync:
def __init__(self, client: KavitaClient, cache: MatchesCache, *,
builder=None):
"""
Parameters
----------
client : KavitaClient for collection / relation writes.
cache : MatchesCache to resolve mangabakaId -> kavitaSeriesId.
builder : optional LightNovelMetadataBuilder used to fetch parent
series titles when picking the collection name.
"""
self._client = client
self._cache = cache
self._builder = builder
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def sync(self, kavita_series_id: int, built: dict) -> dict:
"""
Applies the relationship and collection links described by
`built["relationships"]` (raw MangaBaka relationships_v2 list)
for the given Kavita series. Returns a small status dict.
"""
report: dict = {"relations": {}, "collection": None,
"missing_series": []}
relationships = built.get("relationships") or []
if not relationships:
return report
# Resolve mangabakaId -> kavitaSeriesId for every related entry.
related: dict[str, list[int]] = {b: [] for b in _ALL_BUCKETS}
all_kavita_ids: set[int] = set()
for rel in relationships:
mb_id = rel.get("to_series_id")
if mb_id is None:
continue
hit = self._cache.get_by_mangabaka_id(mb_id)
if not hit:
report["missing_series"].append(int(mb_id))
continue
_title, entry = hit
ksid = int(entry.get("kavitaSeriesId") or 0)
if not ksid:
report["missing_series"].append(int(mb_id))
continue
bucket = _RELATION_MAP.get((rel.get("relation_type") or "").lower(),
"others")
if ksid not in related[bucket]:
related[bucket].append(ksid)
all_kavita_ids.add(ksid)
# ----- Relationships ------------------------------------------
if any(related.values()):
payload = {"seriesId": int(kavita_series_id)}
for bucket in _ALL_BUCKETS:
payload[bucket] = related[bucket]
try:
self._client.update_related(payload)
report["relations"] = {k: v for k, v in related.items() if v}
except Exception as exc:
report["relations"] = {"error": str(exc)}
# ----- Collection ---------------------------------------------
# Include the current series in the collection so it shows up too.
all_kavita_ids.add(int(kavita_series_id))
if len(all_kavita_ids) >= 2:
collection_name = self._collection_name(built, relationships)
collection_id = self._find_collection_id(collection_name)
try:
self._client.add_series_to_collection(
collection_id=collection_id,
title=collection_name,
series_ids=sorted(all_kavita_ids),
)
report["collection"] = collection_name
except Exception as exc:
report["collection"] = f"error: {exc}"
return report
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _find_collection_id(self, name: str) -> int:
"""Returns the id of an existing collection by title, or 0 to create."""
if not name:
return 0
target = name.strip().lower()
try:
for col in self._client.list_collections():
if (col.get("title") or "").strip().lower() == target:
try:
return int(col.get("id") or 0)
except (TypeError, ValueError):
return 0
except Exception:
pass
return 0
def _collection_name(self, built: dict,
relationships: list[dict]) -> str:
"""
Picks the collection name. Uses the parent series title from
MangaBaka if the current series has one; otherwise falls back to
the current series' own title.
"""
for rel in relationships:
if (rel.get("relation_type") or "").lower() == "parent":
parent_id = rel.get("to_series_id")
if parent_id is not None and self._builder is not None:
try:
parent_md = self._builder.fetch_series(parent_id)
if parent_md and parent_md.get("title"):
return parent_md["title"]
except Exception:
pass
# Even without a builder, the cache may know the parent.
hit = self._cache.get_by_mangabaka_id(parent_id)
if hit:
_title, entry = hit
name = entry.get("mangabakaName")
if name:
return name
return built.get("mangabakaTitle") or ""