Files
pdf-ocr-hotfolder/pdf_ocr_hotfolder/service.py
T
techadmin 76c3a991df Initial commit: PDF OCR Hotfolder v0.1.0
Komplettes Rewrite des alten Bash-Tools `pdf-tool` in Python.
- ocrmypdf als Library, watchdog für Hotfolder, ThreadPool für Parallelität
- Upload-Targets: folder, Nextcloud (WebDAV), SFTP
- E-Mail-Notify, optional veraPDF
- Interaktiver Installer mit Service-User-Support (lokal + AD via SSSD)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 00:22:31 +02:00

174 lines
5.6 KiB
Python

"""Hauptservice: Hotfolder via watchdog, ThreadPool für PDF-Verarbeitung."""
from __future__ import annotations
import logging
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from .config import Config
from .processor import ProcessResult, process_pdf
from .uploaders import notify_email, upload_folder, upload_nextcloud, upload_sftp
# Module-wide logger, named after this module.
log = logging.getLogger(__name__)
def _is_pdf(path: Path) -> bool:
    """Return True if *path* ends in ``.pdf`` (any letter case) and is a file.

    The suffix is tested first so that the file system is only queried for
    paths that look like PDFs.
    """
    if path.suffix.lower() != ".pdf":
        return False
    return path.is_file()
def _wait_until_stable(
    path: Path,
    checks: int = 3,
    interval: float = 1.0,
    *,
    max_polls: int = 60,
) -> bool:
    """Wait until *path* has stopped growing (scanners write in several bursts).

    The file size is polled repeatedly. The file counts as stable once the
    same non-zero size has been seen on ``checks`` consecutive comparisons.

    Args:
        path: File to watch.
        checks: Number of consecutive unchanged-size comparisons required.
        interval: Seconds to sleep between two polls.
        max_polls: Upper bound on the number of size polls. With the default
            ``interval`` of one second this is roughly a 60 second timeout.

    Returns:
        True once the size is stable; False if the file disappears or does
        not stabilise within ``max_polls`` polls.
    """
    last_size = -1
    stable_count = 0
    for attempt in range(max_polls):
        try:
            size = path.stat().st_size
        except FileNotFoundError:
            # The file was removed or renamed while we were waiting.
            return False
        if size == last_size and size > 0:
            stable_count += 1
            if stable_count >= checks:
                return True
        else:
            # Size changed (or the file is still empty): start counting again.
            stable_count = 0
        last_size = size
        # After the final poll the outcome is already decided, so do not
        # delay the caller with one more sleep.
        if attempt < max_polls - 1:
            time.sleep(interval)
    return False
class _Handler(FileSystemEventHandler):
    """Watchdog event handler that forwards file events to the service queue.

    Directory events are ignored. For every other event the affected path is
    passed to ``HotfolderService.enqueue``.
    """

    def __init__(self, service: "HotfolderService") -> None:
        """Remember the service that receives the forwarded paths."""
        self.service = service

    def on_created(self, event: FileSystemEvent) -> None:
        """Queue the path of a newly created file."""
        if event.is_directory:
            return
        self.service.enqueue(Path(event.src_path))

    def on_moved(self, event: FileSystemEvent) -> None:
        """Queue the destination path of a moved or renamed file."""
        if event.is_directory:
            return
        self.service.enqueue(Path(event.dest_path))

    def on_closed(self, event: FileSystemEvent) -> None:
        """Queue the path of a file that was reported as closed."""
        if event.is_directory:
            return
        self.service.enqueue(Path(event.src_path))
class HotfolderService:
    """Watch the incoming folder and run each PDF through the OCR pipeline.

    Files are reported by a watchdog observer (and by a one-off scan at
    start-up), de-duplicated by resolved path and handed to a thread pool.
    Each worker waits for the file to stop growing, calls ``process_pdf``,
    dispatches the uploads on success and finally sends a notification.
    """

    def __init__(self, cfg: Config) -> None:
        """Create the worker pool and the bookkeeping for queued files.

        Args:
            cfg: Service configuration; ``cfg.ocr.max_workers`` sets the size
                of the thread pool.
        """
        self.cfg = cfg
        self._executor = ThreadPoolExecutor(
            max_workers=cfg.ocr.max_workers,
            thread_name_prefix="ocr",
        )
        self._observer: Observer | None = None
        self._stop = threading.Event()
        # Resolved paths of files that are queued or currently processed.
        self._inflight: set[str] = set()
        # Guards _inflight, which is touched by the watchdog thread, the
        # worker threads (via done-callbacks) and the main thread.
        self._lock = threading.Lock()

    # ---- Setup ----
    def _ensure_dirs(self) -> None:
        """Create the incoming, outgoing, working and error directories."""
        for p in (self.cfg.paths.incoming, self.cfg.paths.outgoing,
                  self.cfg.paths.working, self.cfg.paths.error):
            p.mkdir(parents=True, exist_ok=True)

    # ---- Lifecycle ----
    def run(self) -> None:
        """Start watching and block until SIGTERM or SIGINT is received.

        The observer is started before the initial directory scan, so a file
        that arrives during start-up is seen by at least one of the two;
        ``enqueue`` drops the duplicate if both report it. Everything after
        the observer start runs under ``try/finally`` so that a failure during
        start-up still stops the observer and the worker pool.
        """
        self._ensure_dirs()
        self._observer = Observer()
        self._observer.schedule(_Handler(self), str(self.cfg.paths.incoming), recursive=False)
        self._observer.start()
        try:
            self._scan_existing()
            log.info("Hotfolder läuft. Watching: %s", self.cfg.paths.incoming)
            signal.signal(signal.SIGTERM, lambda *_: self._stop.set())
            signal.signal(signal.SIGINT, lambda *_: self._stop.set())
            # Wake up once per second instead of blocking indefinitely.
            while not self._stop.is_set():
                self._stop.wait(1.0)
        finally:
            self.shutdown()

    def shutdown(self) -> None:
        """Stop the observer and wait for all queued jobs to finish."""
        log.info("Shutdown läuft...")
        if self._observer:
            self._observer.stop()
            self._observer.join(timeout=5)
        # Already submitted jobs are completed, not cancelled.
        self._executor.shutdown(wait=True, cancel_futures=False)
        log.info("Shutdown ok.")

    # ---- Queue ----
    def _scan_existing(self) -> None:
        """Pick up PDFs that are already present in the incoming folder."""
        for p in self.cfg.paths.incoming.iterdir():
            if _is_pdf(p):
                self.enqueue(p)

    def enqueue(self, path: Path) -> None:
        """Submit *path* for processing unless it is already queued.

        Paths that are not existing PDF files are ignored. If the worker pool
        has already been shut down, the file is dropped with a warning
        instead of raising into the calling (watchdog) thread.

        Args:
            path: Candidate file reported by the observer or the start-up scan.
        """
        if not _is_pdf(path):
            return
        key = str(path.resolve())
        with self._lock:
            if key in self._inflight:
                return
            self._inflight.add(key)
        # Submit outside the lock: a job that has already finished runs its
        # done-callback in this thread, and _done() takes the same
        # non-reentrant lock.
        try:
            fut = self._executor.submit(self._process, path)
        except RuntimeError:
            # submit() refuses new work once the executor has been shut down.
            # Release the key so it does not stay marked as in flight.
            with self._lock:
                self._inflight.discard(key)
            log.warning("Executor bereits beendet, verwerfe: %s", path)
            return
        fut.add_done_callback(lambda f, k=key: self._done(k, f))

    def _done(self, key: str, fut: Future) -> None:
        """Release *key* and log any exception raised by the worker.

        Args:
            key: Resolved path string that was added to ``_inflight``.
            fut: The finished future of the corresponding job.
        """
        with self._lock:
            self._inflight.discard(key)
        if fut.cancelled():
            # A cancelled job never ran; asking it for its exception would
            # raise CancelledError.
            return
        exc = fut.exception()
        if exc:
            log.error("Worker-Exception", exc_info=exc)

    # ---- Processing ----
    def _process(self, path: Path) -> None:
        """Run one file through OCR, uploads and notification.

        Runs in a worker thread. Files that do not stabilise, or that have
        vanished in the meantime, are skipped.

        Args:
            path: PDF in the incoming folder.
        """
        if not _wait_until_stable(path):
            log.warning("Datei nicht stabilisiert, überspringe: %s", path)
            return
        if not path.exists():
            return
        result: ProcessResult = process_pdf(
            src=path,
            working_dir=self.cfg.paths.working,
            outgoing_dir=self.cfg.paths.outgoing,
            error_dir=self.cfg.paths.error,
            ocr_cfg=self.cfg.ocr,
            vera_cfg=self.cfg.verapdf,
        )
        if result.success:
            # NOTE(review): if an uploader raises, the notification below is
            # skipped and the error is only logged by _done() - confirm that
            # this is intended.
            self._dispatch_uploads(result.output)
        self._notify(result)

    def _dispatch_uploads(self, pdf: Path) -> None:
        """Send the processed PDF to the configured upload targets.

        Args:
            pdf: Path of the processed output file.
        """
        # NOTE(review): unlike Nextcloud and SFTP, the folder target is called
        # without an ``enabled`` check here - presumably upload_folder checks
        # its own configuration; verify.
        upload_folder(pdf, self.cfg.folder, self.cfg.paths.outgoing)
        if self.cfg.nextcloud.enabled:
            upload_nextcloud(pdf, self.cfg.nextcloud)
        if self.cfg.sftp.enabled:
            upload_sftp(pdf, self.cfg.sftp)

    def _notify(self, result: ProcessResult) -> None:
        """Build the success or failure mail and hand it to ``notify_email``.

        Args:
            result: Outcome of ``process_pdf`` for one file.
        """
        if result.success:
            subject = f"[pdf-ocr] OK: {result.source.name}"
            body = f"Datei verarbeitet: {result.output}\n"
            # The veraPDF line is only added when a validation result exists.
            if result.verapdf_passed is not None:
                body += f"veraPDF: {'PASS' if result.verapdf_passed else 'FAIL'}\n"
        else:
            subject = f"[pdf-ocr] FEHLER: {result.source.name}"
            body = f"Fehler beim Verarbeiten von {result.source}\n\n{result.error}\n"
        notify_email(self.cfg.email, subject, body, result.success)