Docker stuff and folder watcher
This commit is contained in:
@@ -0,0 +1,211 @@
|
||||
"""
|
||||
suwayomi_folder_watcher.py
|
||||
==========================
|
||||
|
||||
Watches a Suwayomi download directory and triggers SuwayomiMover.process_series
|
||||
for a series once it has been quiet for a configurable settle period
|
||||
(default 10 minutes). This prevents the mover from running while Suwayomi
|
||||
is still downloading additional chapters of the same series.
|
||||
|
||||
Behaviour
|
||||
---------
|
||||
* Recursively watches `<suwayomi_path>/<source>/<title>/<chapter>/<file>`.
|
||||
* Every filesystem event maps back to its series (the 2nd-level directory)
|
||||
and (re)starts that series' 10-minute timer.
|
||||
* When the timer fires, the series title is queued for processing.
|
||||
* A single worker thread drains the queue, so series are processed
|
||||
sequentially — this avoids hammering rate-limited tracker APIs and
|
||||
sidesteps thread-safety issues in the resolver singletons.
|
||||
* Filesystem events generated while a series is already being processed
|
||||
are ignored (otherwise the mover's own rmtree of source chapter folders
|
||||
would re-schedule the series immediately).
|
||||
|
||||
Dependencies
|
||||
------------
|
||||
watchdog -> pip install watchdog
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
|
||||
from SuwayomiMover import SuwayomiMover
|
||||
|
||||
|
||||
def _now() -> str:
|
||||
return datetime.now().isoformat(timespec="seconds")
|
||||
|
||||
|
||||
class SuwayomiFolderWatcher:
|
||||
"""
|
||||
Watches `suwayomi_path` recursively and triggers `mover.process_series`
|
||||
on the corresponding series whenever the series folder has been quiet
|
||||
for `settle_seconds`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
suwayomi_path : Root of the Suwayomi download tree (same value passed
|
||||
to SuwayomiMover).
|
||||
mover : The SuwayomiMover instance to use for processing.
|
||||
settle_seconds : Quiet period before a series is processed. Each new
|
||||
filesystem event resets the per-series timer.
|
||||
Default: 600 (10 minutes).
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
suwayomi_path,
|
||||
mover: SuwayomiMover,
|
||||
*,
|
||||
settle_seconds: int = 600):
|
||||
self._suwayomi_path = Path(suwayomi_path).resolve()
|
||||
self._mover = mover
|
||||
self._settle = settle_seconds
|
||||
|
||||
self._lock = threading.Lock()
|
||||
self._timers: dict[str, threading.Timer] = {}
|
||||
self._queued: set[str] = set()
|
||||
self._processing: set[str] = set()
|
||||
|
||||
self._queue: "queue.Queue[str]" = queue.Queue()
|
||||
self._stop = threading.Event()
|
||||
|
||||
self._observer = Observer()
|
||||
self._worker = threading.Thread(
|
||||
target=self._worker_loop, name="SuwayomiWorker", daemon=True)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
def start(self) -> None:
|
||||
"""Starts the observer and worker thread. Non-blocking."""
|
||||
if not self._suwayomi_path.is_dir():
|
||||
raise FileNotFoundError(
|
||||
f"Suwayomi path does not exist: {self._suwayomi_path}")
|
||||
|
||||
handler = _Handler(self._on_event)
|
||||
self._observer.schedule(handler, str(self._suwayomi_path), recursive=True)
|
||||
self._observer.start()
|
||||
self._worker.start()
|
||||
print(f"[{_now()}] [watcher] watching {self._suwayomi_path} "
|
||||
f"(settle={self._settle}s)", flush=True)
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stops the observer, cancels all pending timers and joins the worker."""
|
||||
print(f"[{_now()}] [watcher] stopping…", flush=True)
|
||||
self._stop.set()
|
||||
self._observer.stop()
|
||||
self._observer.join()
|
||||
with self._lock:
|
||||
for t in self._timers.values():
|
||||
t.cancel()
|
||||
self._timers.clear()
|
||||
# Worker loop checks `_stop` every second between queue polls.
|
||||
self._worker.join(timeout=5)
|
||||
print(f"[{_now()}] [watcher] stopped", flush=True)
|
||||
|
||||
def wait(self) -> None:
|
||||
"""Blocks the calling thread until stop() is invoked."""
|
||||
self._stop.wait()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Event handling
|
||||
# ------------------------------------------------------------------
|
||||
def _on_event(self, src_path: str) -> None:
|
||||
series_title = self._series_from_path(Path(src_path))
|
||||
if not series_title:
|
||||
return
|
||||
|
||||
with self._lock:
|
||||
# Ignore events generated by the mover's own rmtree during
|
||||
# processing — otherwise every delete restarts the timer.
|
||||
if series_title in self._processing:
|
||||
return
|
||||
|
||||
self._schedule(series_title)
|
||||
|
||||
def _series_from_path(self, path: Path) -> "str | None":
|
||||
"""Maps an event path to its series title (2nd-level directory)."""
|
||||
try:
|
||||
rel = path.resolve().relative_to(self._suwayomi_path)
|
||||
except ValueError:
|
||||
return None
|
||||
parts = rel.parts
|
||||
# Expected layout: <source>/<title>/<chapter>/<file>
|
||||
# Anything shallower than <source>/<title> can't be mapped to a series.
|
||||
if len(parts) < 2:
|
||||
return None
|
||||
return parts[1]
|
||||
|
||||
def _schedule(self, series_title: str) -> None:
|
||||
with self._lock:
|
||||
old = self._timers.get(series_title)
|
||||
if old is not None:
|
||||
old.cancel()
|
||||
timer = threading.Timer(
|
||||
self._settle, self._fire, args=(series_title,))
|
||||
timer.daemon = True
|
||||
self._timers[series_title] = timer
|
||||
timer.start()
|
||||
print(f"[{_now()}] [watcher] {series_title}: "
|
||||
f"timer (re)started ({self._settle}s)", flush=True)
|
||||
|
||||
def _fire(self, series_title: str) -> None:
|
||||
with self._lock:
|
||||
self._timers.pop(series_title, None)
|
||||
if series_title in self._processing or series_title in self._queued:
|
||||
return
|
||||
self._queued.add(series_title)
|
||||
self._queue.put(series_title)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Worker
|
||||
# ------------------------------------------------------------------
|
||||
def _worker_loop(self) -> None:
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
series_title = self._queue.get(timeout=1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
with self._lock:
|
||||
self._queued.discard(series_title)
|
||||
self._processing.add(series_title)
|
||||
|
||||
print(f"[{_now()}] [watcher] {series_title}: processing", flush=True)
|
||||
try:
|
||||
result = self._mover.process_series(series_title)
|
||||
ok = sum(1 for c in result.get("chapters", []) if c.get("ok"))
|
||||
failed = sum(1 for c in result.get("chapters", []) if not c.get("ok"))
|
||||
print(f"[{_now()}] [watcher] {series_title}: "
|
||||
f"done ({ok} ok, {failed} failed)", flush=True)
|
||||
except FileNotFoundError:
|
||||
# Series folder no longer exists — happens if the user / Suwayomi
|
||||
# cleaned it up between event and timer expiry. Nothing to do.
|
||||
print(f"[{_now()}] [watcher] {series_title}: "
|
||||
f"folder gone, skipping", flush=True)
|
||||
except Exception as exc:
|
||||
print(f"[{_now()}] [watcher] {series_title}: "
|
||||
f"ERROR {exc}", flush=True)
|
||||
finally:
|
||||
with self._lock:
|
||||
self._processing.discard(series_title)
|
||||
|
||||
|
||||
class _Handler(FileSystemEventHandler):
|
||||
"""Forwards every event src_path to a single callback."""
|
||||
|
||||
def __init__(self, callback):
|
||||
super().__init__()
|
||||
self._callback = callback
|
||||
|
||||
def on_any_event(self, event):
|
||||
# Both file and directory events count — series-level activity
|
||||
# of any kind should reset the timer.
|
||||
self._callback(event.src_path)
|
||||
+60
-1
@@ -161,6 +161,60 @@ def _al_id_from_metadata(md: dict) -> "int | None":
|
||||
return None
|
||||
|
||||
|
||||
def _chapter_image_size(chapter_dir: Path) -> int:
|
||||
"""Returns the total file size of all images in a chapter folder."""
|
||||
return sum(
|
||||
f.stat().st_size
|
||||
for f in chapter_dir.iterdir()
|
||||
if f.is_file() and f.suffix.lower() in _IMAGE_EXTS
|
||||
)
|
||||
|
||||
|
||||
def _deduplicate_chapters(
|
||||
chapter_items: list[tuple[Path, dict, str]],
|
||||
) -> tuple[list[tuple[Path, dict, str]], list[Path]]:
|
||||
"""
|
||||
When multiple chapter folders share the exact same chapter number
|
||||
(e.g. two folders for chapter "2" — not "2" vs "2.2"), keeps only the
|
||||
one with the highest total image file size, which is a reliable proxy
|
||||
for image quality.
|
||||
|
||||
Chapter number comes from ComicInfo.xml <Number>; comparison is an exact
|
||||
string match so "2" and "2.2" are never considered duplicates.
|
||||
|
||||
Returns
|
||||
-------
|
||||
kept : deduplicated chapter_items list (original sort order preserved)
|
||||
rejected : Path list of lower-quality duplicate folders to be removed
|
||||
"""
|
||||
best: dict[str, tuple[Path, dict, str]] = {}
|
||||
best_size: dict[str, int] = {}
|
||||
rejected: list[Path] = []
|
||||
|
||||
for item in chapter_items:
|
||||
chapter_dir, fields, chapter_num = item
|
||||
size = _chapter_image_size(chapter_dir)
|
||||
|
||||
if chapter_num not in best:
|
||||
best[chapter_num] = item
|
||||
best_size[chapter_num] = size
|
||||
elif size > best_size[chapter_num]:
|
||||
prev_dir = best[chapter_num][0]
|
||||
print(f" [dup] ch.{chapter_num}: replacing {prev_dir.name!r} "
|
||||
f"({best_size[chapter_num]:,}B) with {chapter_dir.name!r} "
|
||||
f"({size:,}B) — higher quality")
|
||||
rejected.append(prev_dir)
|
||||
best[chapter_num] = item
|
||||
best_size[chapter_num] = size
|
||||
else:
|
||||
print(f" [dup] ch.{chapter_num}: skipping {chapter_dir.name!r} "
|
||||
f"({size:,}B), keeping {best[chapter_num][0].name!r} "
|
||||
f"({best_size[chapter_num]:,}B)")
|
||||
rejected.append(chapter_dir)
|
||||
|
||||
return list(best.values()), rejected
|
||||
|
||||
|
||||
def _extract_chapter_num(folder_name: str) -> "str | None":
|
||||
"""
|
||||
Fallback: extracts chapter number from the folder name.
|
||||
@@ -325,6 +379,11 @@ class SuwayomiMover:
|
||||
continue
|
||||
chapter_items.append((chapter_dir, fields, chapter_num))
|
||||
|
||||
chapter_items, rejected_dirs = _deduplicate_chapters(chapter_items)
|
||||
if self._delete_source:
|
||||
for d in rejected_dirs:
|
||||
shutil.rmtree(d, ignore_errors=True)
|
||||
|
||||
# <Series> from the first chapter's XML → strip source labels → clean title
|
||||
# for the MangaBaka search. Folder name is the last resort.
|
||||
raw_series = manga_title
|
||||
@@ -436,7 +495,7 @@ if __name__ == "__main__":
|
||||
)
|
||||
|
||||
# Process a single series
|
||||
result = mover.process_series("Yofukashi no Uta_ Rakuen-hen")
|
||||
result = mover.process_series("Yofukashi no Uta")
|
||||
ok = sum(1 for c in result["chapters"] if c["ok"])
|
||||
failed = sum(1 for c in result["chapters"] if not c["ok"])
|
||||
print(f"\nDone: {ok} ok, {failed} failed")
|
||||
|
||||
Reference in New Issue
Block a user