Files
manga-mover-and-metadata-co…/src/MatchesCache.py
T
johannesbot 2f30ac4e05
Build and Deploy / build (push) Successful in 26s
Build and Deploy / deploy (push) Successful in 41s
matches double key fix
2026-06-06 20:18:11 +02:00

219 lines
8.0 KiB
Python

"""
MatchesCache.py
===============
Persistent JSON cache that maps a normalised (lowercase) search title to the
MangaBaka series it was matched against.
Structure on disk::

    {
      "matches": {
        "<normalised lowercase key>": {
          "folderTitle": "Original Folder Name",
          "mangabakaId": "12345",
          "mangabakaName": "One-Punch Man",
          "imageUrl": "https://.../cover.jpg",
          "firstMatchTime": 1700000000
        },
        ...
      }
    }

Keys are always stored lowercase so that folder names differing only in
capitalisation (e.g. "[Oshi No Ko]" vs "[oshi no ko]") are treated as
identical entries. The original casing is preserved in the ``folderTitle``
field and is used for display purposes (e.g. the web UI title link).
The cache is consulted by ComicInfoBuilder before issuing a MangaBaka
search request, and is written back to disk on every mutation so a crash
does not lose matches that were resolved in the current run.
"""
from __future__ import annotations
import json
import threading
import time
from pathlib import Path
def _norm_key(title: str) -> str:
    """Return the cache key for *title*: the title converted to lowercase.

    Lowercasing makes lookups case-insensitive, so titles that differ only
    in capitalisation resolve to the same key.
    """
    lowered = title.lower()
    return lowered
class MatchesCache:
    """JSON-backed store mapping a normalised title to its match record.

    Every public method acquires a re-entrant lock around its access to the
    in-memory data, and every mutation is written straight back to disk.
    Records handed out to callers are shallow copies, so modifying a returned
    dict does not alter the cache.
    """

    def __init__(self, path):
        """Bind the cache to *path* and load whatever is stored there.

        Args:
            path: Location of the JSON cache file (anything ``Path`` accepts).
                A missing or unreadable file results in an empty cache.
        """
        self._path = Path(path)
        self._lock = threading.RLock()
        self._data: dict = {"matches": {}}
        self._load()

    # ------------------------------------------------------------------
    # Public lookup / mutation API
    # ------------------------------------------------------------------
    def get(self, title: str) -> "dict | None":
        """Return a copy of the record stored for *title*, or ``None``.

        The lookup is case-insensitive. An entry that is an empty dict is
        reported as ``None`` as well.
        """
        with self._lock:
            entry = self._data["matches"].get(_norm_key(title))
            return dict(entry) if entry else None

    def add(self, title: str, *,
            mangabaka_id,
            mangabaka_name: str,
            image_url: "str | None") -> dict:
        """Store a fresh record for *title*, replacing any existing one.

        ``firstMatchTime`` is set to the current time and ``folderTitle`` to
        *title* as given. ``None``/empty values are stored as ``""``.

        Returns:
            A copy of the stored record.
        """
        entry = {
            "folderTitle": title,
            "mangabakaId": str(mangabaka_id) if mangabaka_id is not None else "",
            "mangabakaName": mangabaka_name or "",
            "imageUrl": image_url or "",
            "firstMatchTime": int(time.time()),
        }
        with self._lock:
            self._data["matches"][_norm_key(title)] = entry
            self._save_unlocked()
            return dict(entry)

    def upsert(self, title: str, *,
               mangabaka_id=None,
               mangabaka_name=None,
               image_url=None,
               first_match_time=None) -> dict:
        """Create the record for *title* if needed, then update given fields.

        Only arguments that are not ``None`` overwrite the stored value.
        ``folderTitle`` is written on creation only, so the casing of the
        first-seen title is preserved on later updates.

        Returns:
            A copy of the record after the update.
        """
        norm = _norm_key(title)
        with self._lock:
            entry = self._data["matches"].get(norm)
            if entry is None:
                entry = {
                    "folderTitle": title,
                    "mangabakaId": "",
                    "mangabakaName": "",
                    "imageUrl": "",
                    "firstMatchTime": int(time.time()),
                }
                self._data["matches"][norm] = entry
            if mangabaka_id is not None:
                entry["mangabakaId"] = str(mangabaka_id)
            if mangabaka_name is not None:
                entry["mangabakaName"] = mangabaka_name
            if image_url is not None:
                entry["imageUrl"] = image_url
            if first_match_time is not None:
                try:
                    entry["firstMatchTime"] = int(first_match_time)
                except (TypeError, ValueError):
                    # Best effort: an unparsable timestamp keeps the old value.
                    pass
            self._save_unlocked()
            return dict(entry)

    def rename(self, old_title: str, new_title: str) -> bool:
        """Move the record stored under *old_title* to *new_title*.

        ``folderTitle`` becomes *new_title*. A record already stored under
        the new key is overwritten.

        Returns:
            ``True`` when a record was moved; ``False`` when *new_title* is
            empty, both titles normalise to the same key, or no record
            exists for *old_title*.
        """
        # Check emptiness before normalising so a None/empty new title is
        # rejected instead of failing inside _norm_key.
        if not new_title:
            return False
        old_norm = _norm_key(old_title)
        new_norm = _norm_key(new_title)
        if old_norm == new_norm:
            return False
        with self._lock:
            entry = self._data["matches"].pop(old_norm, None)
            if entry is None:
                return False
            entry["folderTitle"] = new_title
            self._data["matches"][new_norm] = entry
            self._save_unlocked()
            return True

    def remove(self, title: str) -> bool:
        """Delete the record for *title*; return whether one existed."""
        norm = _norm_key(title)
        with self._lock:
            existed = norm in self._data["matches"]
            if existed:
                del self._data["matches"][norm]
                self._save_unlocked()
            return existed

    def all(self) -> dict:
        """Return ``{"matches": {...}}`` with a copy of every record."""
        with self._lock:
            return {"matches": {k: dict(v)
                                for k, v in self._data["matches"].items()}}

    # ------------------------------------------------------------------
    # Internal IO
    # ------------------------------------------------------------------
    def _load(self) -> None:
        """Read the cache file into memory, migrating old layouts if needed.

        A missing file, an unreadable or undecodable file, or a file without
        a top-level ``"matches"`` object leaves the in-memory cache as it
        was. When normalisation changed any entry, the migrated data is
        saved back immediately.
        """
        if not self._path.is_file():
            return
        try:
            with self._path.open("r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError (malformed JSON)
            # and UnicodeDecodeError (file is not valid UTF-8).
            print(f"[MatchesCache] failed to load {self._path}: {exc}",
                  flush=True)
            return
        if not isinstance(loaded, dict) or not isinstance(loaded.get("matches"), dict):
            return
        normalized, changed = self._normalize_on_load(loaded["matches"])
        loaded["matches"] = normalized
        self._data = loaded
        if changed:
            print(f"[MatchesCache] migrated {changed} entr{'y' if changed == 1 else 'ies'} "
                  f"(lowercase keys / folderTitle), saving", flush=True)
            self._save_unlocked()

    @staticmethod
    def _match_time(entry: dict):
        """Return the entry's ``firstMatchTime`` if numeric, otherwise ``0``.

        Guards the duplicate-merge comparison against hand-edited or corrupt
        values (``null``, strings, ...) that cannot be ordered against numbers.
        """
        value = entry.get("firstMatchTime", 0)
        return value if isinstance(value, (int, float)) else 0

    @staticmethod
    def _normalize_on_load(raw: dict) -> "tuple[dict, int]":
        """
        Normalises the raw matches dict loaded from disk.

        - Entries that are not dicts are dropped.
        - Keys are lowercased.
        - ``folderTitle`` is set from the original key when it is missing or
          not a string.
        - Duplicate keys (same normalised form) are merged: the entry with
          the higher ``firstMatchTime`` supplies the data, but a
          ``folderTitle`` containing uppercase letters is preferred over an
          all-lowercase one regardless of which entry is newer.

        Returns (normalised_dict, number_of_changed_entries); each entry is
        counted at most once.
        """
        result: dict = {}
        changed = 0
        for orig_key, entry in raw.items():
            if not isinstance(entry, dict):
                continue
            norm = _norm_key(orig_key)
            entry = dict(entry)

            touched = norm != orig_key
            if not isinstance(entry.get("folderTitle"), str):
                entry["folderTitle"] = orig_key
                touched = True
            if touched:
                changed += 1

            existing = result.get(norm)
            if existing is None:
                result[norm] = entry
                continue

            existing_ft = existing["folderTitle"]
            new_ft = entry["folderTitle"]
            if MatchesCache._match_time(entry) > MatchesCache._match_time(existing):
                # Newer entry wins for data; keep the better-cased title.
                if existing_ft != existing_ft.lower() and new_ft == new_ft.lower():
                    entry["folderTitle"] = existing_ft
                result[norm] = entry
            elif new_ft != new_ft.lower() and existing_ft == existing_ft.lower():
                # Existing entry stays, but adopt the title that has casing.
                existing["folderTitle"] = new_ft
        return result, changed

    def _save_unlocked(self) -> None:
        """Write the in-memory data to disk; the caller must hold the lock.

        The JSON is written to a sibling ``.tmp`` file which then replaces
        the real file, so a failed write never truncates the existing cache.
        If writing or replacing fails, the temporary file is removed and the
        original exception is re-raised.
        """
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._path.with_suffix(self._path.suffix + ".tmp")
        try:
            with tmp.open("w", encoding="utf-8") as f:
                json.dump(self._data, f, ensure_ascii=False, indent=2)
            tmp.replace(self._path)
        except BaseException:
            # Do not leave a half-written temp file next to the cache.
            try:
                tmp.unlink()
            except OSError:
                pass
            raise