76c3a991df
Komplettes Rewrite des alten Bash-Tools `pdf-tool` in Python. - ocrmypdf als Library, watchdog für Hotfolder, ThreadPool für Parallelität - Upload-Targets: folder, Nextcloud (WebDAV), SFTP - E-Mail-Notify, optional veraPDF - Interaktiver Installer mit Service-User-Support (lokal + AD via SSSD) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
174 lines
5.6 KiB
Python
174 lines
5.6 KiB
Python
"""Hauptservice: Hotfolder via watchdog, ThreadPool für PDF-Verarbeitung."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import signal
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from pathlib import Path
|
|
|
|
from watchdog.events import FileSystemEvent, FileSystemEventHandler
|
|
from watchdog.observers import Observer
|
|
|
|
from .config import Config
|
|
from .processor import ProcessResult, process_pdf
|
|
from .uploaders import notify_email, upload_folder, upload_nextcloud, upload_sftp
|
|
|
|
# Module-level logger, named after this module (``__name__``).
log: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
def _is_pdf(path: Path) -> bool:
|
|
return path.suffix.lower() == ".pdf" and path.is_file()
|
|
|
|
|
|
def _wait_until_stable(path: Path, checks: int = 3, interval: float = 1.0) -> bool:
|
|
"""Wartet bis Datei nicht mehr wächst (Scanner schreibt mehrmals)."""
|
|
last = -1
|
|
stable_count = 0
|
|
for _ in range(60): # max ~60s
|
|
try:
|
|
size = path.stat().st_size
|
|
except FileNotFoundError:
|
|
return False
|
|
if size == last and size > 0:
|
|
stable_count += 1
|
|
if stable_count >= checks:
|
|
return True
|
|
else:
|
|
stable_count = 0
|
|
last = size
|
|
time.sleep(interval)
|
|
return False
|
|
|
|
|
|
class _Handler(FileSystemEventHandler):
    """Forward watchdog file events to a ``HotfolderService``.

    Directory events are ignored. For file events the relevant path is passed
    to ``service.enqueue``, which applies its own PDF filter and duplicate
    suppression.
    """

    def __init__(self, service: "HotfolderService") -> None:
        self.service = service

    def _enqueue_file(self, event: FileSystemEvent, raw_path) -> None:
        # Shared body of all callbacks: skip directories, enqueue file paths.
        if event.is_directory:
            return
        self.service.enqueue(Path(raw_path))

    def on_created(self, event: FileSystemEvent) -> None:
        """A new file appeared in the watched folder."""
        self._enqueue_file(event, event.src_path)

    def on_moved(self, event: FileSystemEvent) -> None:
        """A file was moved/renamed; its new location is what gets enqueued."""
        self._enqueue_file(event, event.dest_path)

    def on_closed(self, event: FileSystemEvent) -> None:
        """A writer closed the file."""
        self._enqueue_file(event, event.src_path)
|
|
|
|
|
|
class HotfolderService:
    """Watch the incoming folder and process each new PDF on a thread pool.

    :meth:`run` creates the configured directories, starts a watchdog observer
    on ``cfg.paths.incoming``, picks up PDFs that are already there and then
    blocks until SIGTERM/SIGINT. After that :meth:`shutdown` stops the observer
    and waits for all submitted jobs.
    """

    def __init__(self, cfg: Config) -> None:
        self.cfg = cfg
        # Worker pool for processing jobs; size comes from cfg.ocr.max_workers.
        self._executor = ThreadPoolExecutor(
            max_workers=cfg.ocr.max_workers,
            thread_name_prefix="ocr",
        )
        self._observer: Observer | None = None
        # Set by the signal handlers to leave the wait loop in run().
        self._stop = threading.Event()
        # Resolved paths that are queued or being processed. Used to drop
        # duplicate triggers (created/closed/moved events, startup scan).
        self._inflight: set[str] = set()
        # Guards _inflight: enqueue() and the _done() callbacks run on
        # different threads (watchdog observer, main thread, pool workers).
        self._lock = threading.Lock()

    # ---- Setup ----

    def _ensure_dirs(self) -> None:
        """Create the incoming, outgoing, working and error directories if missing."""
        for p in (self.cfg.paths.incoming, self.cfg.paths.outgoing,
                  self.cfg.paths.working, self.cfg.paths.error):
            p.mkdir(parents=True, exist_ok=True)

    # ---- Lifecycle ----

    def run(self) -> None:
        """Start watching the hotfolder and block until SIGTERM/SIGINT.

        Must be called from the main thread (``signal.signal`` requirement).
        Always ends with :meth:`shutdown`, even if startup fails.
        """
        self._ensure_dirs()

        # Install the handlers before any startup work, so a signal arriving
        # during startup still leads to a graceful shutdown.
        signal.signal(signal.SIGTERM, lambda *_: self._stop.set())
        signal.signal(signal.SIGINT, lambda *_: self._stop.set())

        try:
            observer = Observer()
            observer.schedule(_Handler(self), str(self.cfg.paths.incoming), recursive=False)
            observer.start()
            # Publish only after start() so shutdown() never joins a thread
            # that was never started.
            self._observer = observer
            log.info("Hotfolder läuft. Watching: %s", self.cfg.paths.incoming)

            # Scan *after* the observer runs. Otherwise a file dropped between
            # the scan and observer start would be missed until the next
            # restart. Double triggers are dropped by enqueue()'s in-flight set.
            self._scan_existing()

            # Wait in short slices so the stop flag is re-checked regularly.
            while not self._stop.is_set():
                self._stop.wait(1.0)
        finally:
            self.shutdown()

    def shutdown(self) -> None:
        """Stop the observer, then wait for every submitted job to finish.

        Pending jobs are not cancelled (``cancel_futures=False``), so this
        blocks until the remaining queue has been worked off.
        """
        log.info("Shutdown läuft...")
        if self._observer is not None:
            self._observer.stop()
            self._observer.join(timeout=5)
        self._executor.shutdown(wait=True, cancel_futures=False)
        log.info("Shutdown ok.")

    # ---- Queue ----

    def _scan_existing(self) -> None:
        """Enqueue PDFs that were already in the incoming folder at startup."""
        for p in self.cfg.paths.incoming.iterdir():
            if _is_pdf(p):
                self.enqueue(p)

    def enqueue(self, path: Path) -> None:
        """Submit *path* for processing unless it is not a PDF or already in flight."""
        if not _is_pdf(path):
            return
        key = str(path.resolve())
        with self._lock:
            if key in self._inflight:
                return
            self._inflight.add(key)
        # Submit and register the callback *outside* the lock. If the job has
        # already finished, add_done_callback() runs _done() immediately in
        # this thread, and _done() acquires the non-reentrant lock.
        try:
            fut = self._executor.submit(self._process, path)
        except RuntimeError:
            # The pool rejects work after shutdown(). A late event (e.g. the
            # observer join timed out) must neither crash the watchdog thread
            # nor leave a stale in-flight entry.
            with self._lock:
                self._inflight.discard(key)
            log.warning("Executor bereits beendet, überspringe: %s", path)
            return
        fut.add_done_callback(lambda f, k=key: self._done(k, f))

    def _done(self, key: str, fut: Future) -> None:
        """Done-callback: release *key* from the in-flight set and log worker errors."""
        with self._lock:
            self._inflight.discard(key)
        # fut.exception() raises CancelledError on a cancelled future.
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            log.error("Worker-Exception: %s", key, exc_info=exc)

    # ---- Processing ----

    def _process(self, path: Path) -> None:
        """Worker job: wait until *path* is fully written, process it, upload, notify.

        Uploads only run on success. A notification is sent for both outcomes.
        """
        if not _wait_until_stable(path):
            log.warning("Datei nicht stabilisiert, überspringe: %s", path)
            return
        # The file may have vanished since the last size check.
        if not path.exists():
            return

        result: ProcessResult = process_pdf(
            src=path,
            working_dir=self.cfg.paths.working,
            outgoing_dir=self.cfg.paths.outgoing,
            error_dir=self.cfg.paths.error,
            ocr_cfg=self.cfg.ocr,
            vera_cfg=self.cfg.verapdf,
        )

        if result.success:
            # NOTE(review): an exception from any uploader aborts the remaining
            # uploads and skips the notification below. Whether the uploaders
            # raise is not visible here; confirm.
            self._dispatch_uploads(result.output)
        self._notify(result)

    def _dispatch_uploads(self, pdf: Path) -> None:
        """Hand *pdf* to every upload target.

        The folder target is always used; Nextcloud and SFTP only when enabled.
        """
        upload_folder(pdf, self.cfg.folder, self.cfg.paths.outgoing)
        if self.cfg.nextcloud.enabled:
            upload_nextcloud(pdf, self.cfg.nextcloud)
        if self.cfg.sftp.enabled:
            upload_sftp(pdf, self.cfg.sftp)

    def _notify(self, result: ProcessResult) -> None:
        """Send an e-mail describing *result*.

        On success the mail includes the veraPDF verdict if one exists;
        on failure it includes the error.
        """
        if result.success:
            subject = f"[pdf-ocr] OK: {result.source.name}"
            body = f"Datei verarbeitet: {result.output}\n"
            if result.verapdf_passed is not None:
                body += f"veraPDF: {'PASS' if result.verapdf_passed else 'FAIL'}\n"
        else:
            subject = f"[pdf-ocr] FEHLER: {result.source.name}"
            body = f"Fehler beim Verarbeiten von {result.source}\n\n{result.error}\n"
        notify_email(self.cfg.email, subject, body, result.success)
|