"""Hauptservice: Hotfolder via watchdog, ThreadPool für PDF-Verarbeitung.""" from __future__ import annotations import logging import shutil import signal import threading import time from concurrent.futures import Future, ThreadPoolExecutor from pathlib import Path from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers import Observer from .config import Config from .processor import ProcessResult, process_pdf from .uploaders import notify_email, upload_folder, upload_nextcloud, upload_sftp log = logging.getLogger(__name__) class PreflightError(RuntimeError): """Erforderliche externe Binaries fehlen.""" # Pflicht-Binaries für ocrmypdf _REQUIRED_BINARIES = ("tesseract", "gs") def check_preflight() -> None: """Prüft, ob alle externen Abhängigkeiten (Tesseract, Ghostscript) installiert sind. Wirft PreflightError mit Liste der fehlenden Binaries. """ missing = [b for b in _REQUIRED_BINARIES if shutil.which(b) is None] if missing: raise PreflightError( "Fehlende Abhängigkeiten: " + ", ".join(missing) + ". Bitte installieren: sudo apt install tesseract-ocr ghostscript" ) def _is_pdf(path: Path) -> bool: return path.suffix.lower() == ".pdf" and path.is_file() def _wait_until_stable(path: Path, checks: int = 3, interval: float = 1.0) -> bool: """Wartet bis Datei nicht mehr wächst (Scanner schreibt mehrmals).""" last = -1 stable_count = 0 for _ in range(60): # max ~60s try: size = path.stat().st_size except FileNotFoundError: return False if size == last and size > 0: stable_count += 1 if stable_count >= checks: return True else: stable_count = 0 last = size time.sleep(interval) return False class _Handler(FileSystemEventHandler): def __init__(self, service: "HotfolderService") -> None: self.service = service def on_created(self, event: FileSystemEvent) -> None: if not event.is_directory: self.service.enqueue(Path(event.src_path)) def on_moved(self, event: FileSystemEvent) -> None: if not event.is_directory: self.service.enqueue(Path(event.dest_path)) def on_closed(self, event: FileSystemEvent) -> None: if not event.is_directory: self.service.enqueue(Path(event.src_path)) class HotfolderService: def __init__(self, cfg: Config) -> None: self.cfg = cfg self._executor = ThreadPoolExecutor( max_workers=cfg.ocr.max_workers, thread_name_prefix="ocr", ) self._observer: Observer | None = None self._stop = threading.Event() self._inflight: set[str] = set() self._lock = threading.Lock() self._success_count = 0 self._error_count = 0 @property def success_count(self) -> int: return self._success_count @property def error_count(self) -> int: return self._error_count # ---- Setup ---- def ensure_dirs(self) -> None: for p in (self.cfg.paths.incoming, self.cfg.paths.outgoing, self.cfg.paths.working, self.cfg.paths.error): p.mkdir(parents=True, exist_ok=True) # ---- Lifecycle ---- def run(self) -> None: check_preflight() self.ensure_dirs() self._scan_existing() self._observer = Observer() self._observer.schedule(_Handler(self), str(self.cfg.paths.incoming), recursive=False) self._observer.start() log.info("Hotfolder läuft. Watching: %s", self.cfg.paths.incoming) signal.signal(signal.SIGTERM, lambda *_: self._stop.set()) signal.signal(signal.SIGINT, lambda *_: self._stop.set()) try: while not self._stop.is_set(): self._stop.wait(1.0) finally: self.shutdown() def run_once(self) -> int: """Verarbeitet alle bereits im incoming-Ordner liegenden PDFs und beendet sich. Returns: Anzahl fehlgeschlagener PDFs (0 = alles ok). """ check_preflight() self.ensure_dirs() self._scan_existing() self._executor.shutdown(wait=True) log.info("One-shot fertig: %d ok, %d Fehler", self._success_count, self._error_count) return self._error_count def shutdown(self) -> None: log.info("Shutdown läuft...") if self._observer: self._observer.stop() self._observer.join(timeout=5) self._executor.shutdown(wait=True, cancel_futures=False) log.info("Shutdown ok.") # ---- Queue ---- def _scan_existing(self) -> None: """Beim Start: bereits liegende PDFs aufgreifen.""" for p in self.cfg.paths.incoming.iterdir(): if _is_pdf(p): self.enqueue(p) def enqueue(self, path: Path) -> None: if not _is_pdf(path): return key = str(path.resolve()) with self._lock: if key in self._inflight: return self._inflight.add(key) fut = self._executor.submit(self._process, path) fut.add_done_callback(lambda f, k=key: self._done(k, f)) def _done(self, key: str, fut: Future) -> None: with self._lock: self._inflight.discard(key) exc = fut.exception() if exc: log.exception("Worker-Exception", exc_info=exc) # ---- Processing ---- def _process(self, path: Path) -> None: if not _wait_until_stable(path): log.warning("Datei nicht stabilisiert, überspringe: %s", path) return if not path.exists(): return result: ProcessResult = process_pdf( src=path, working_dir=self.cfg.paths.working, outgoing_dir=self.cfg.paths.outgoing, error_dir=self.cfg.paths.error, ocr_cfg=self.cfg.ocr, vera_cfg=self.cfg.verapdf, ) with self._lock: if result.success: self._success_count += 1 else: self._error_count += 1 if result.success: self._dispatch_uploads(result.output) self._notify(result) def _dispatch_uploads(self, pdf: Path) -> None: upload_folder(pdf, self.cfg.folder, self.cfg.paths.outgoing) if self.cfg.nextcloud.enabled: upload_nextcloud(pdf, self.cfg.nextcloud) if self.cfg.sftp.enabled: upload_sftp(pdf, self.cfg.sftp) def _notify(self, result: ProcessResult) -> None: if result.success: subject = f"[pdf-ocr] OK: {result.source.name}" body = f"Datei verarbeitet: {result.output}\n" if result.verapdf_passed is not None: body += f"veraPDF: {'PASS' if result.verapdf_passed else 'FAIL'}\n" else: subject = f"[pdf-ocr] FEHLER: {result.source.name}" body = f"Fehler beim Verarbeiten von {result.source}\n\n{result.error}\n" notify_email(self.cfg.email, subject, body, result.success)