a23a3968ef
Neue [output]-Section: - name_mode: prefix | suffix | none (suffix wird vor Extension eingefügt) - name_tag: verbatim einfügbarer String - original_on_success: delete | archive - archive_dir mit Kollisions-Schutz (Timestamp-Suffix) 20 neue Tests (50 insgesamt, alle grün). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
305 lines
9.9 KiB
Python
305 lines
9.9 KiB
Python
"""Hauptservice: Hotfolder via watchdog, ThreadPool für PDF-Verarbeitung."""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import re
|
||
import shutil
|
||
import signal
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from concurrent.futures import Future, ThreadPoolExecutor
|
||
from pathlib import Path
|
||
|
||
from watchdog.events import FileSystemEvent, FileSystemEventHandler
|
||
from watchdog.observers import Observer
|
||
|
||
from .config import Config
|
||
from .processor import ProcessResult, process_pdf
|
||
from .uploaders import notify_email, upload_folder, upload_nextcloud, upload_sftp
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
class PreflightError(RuntimeError):
    """Raised when a startup check fails.

    Covers missing external binaries, a Ghostscript version known to be
    incompatible with the configured PDF/A level, and an invalid
    ``[output]`` configuration section.
    """
|
||
|
||
|
||
# Pflicht-Binaries für ocrmypdf
|
||
_REQUIRED_BINARIES = ("tesseract", "gs")
|
||
|
||
# Ghostscript-Versionen mit bekanntem PDF/A+skip_text Bug (Issue #3):
|
||
# 10.0.0 .. 10.02.0 (inklusive). Ab 10.02.1 wieder nutzbar.
|
||
_GS_BROKEN_MIN = (10, 0, 0)
|
||
_GS_BROKEN_MAX = (10, 2, 0)
|
||
|
||
|
||
def _parse_version(text: str) -> tuple[int, ...] | None:
|
||
"""Extrahiert die erste X.Y[.Z] Version aus einem String."""
|
||
m = re.search(r"(\d+)\.(\d+)(?:\.(\d+))?", text)
|
||
if not m:
|
||
return None
|
||
return tuple(int(x) if x is not None else 0 for x in m.groups())
|
||
|
||
|
||
def is_ghostscript_broken(version: str | None) -> bool:
    """Tell whether a Ghostscript version is affected by the PDF/A + skip_text bug.

    The affected range is 10.0.0 through 10.02.0 inclusive; 10.02.1 and later
    are safe again. An empty or ``None`` version, or one without a parseable
    ``X.Y[.Z]`` number, is reported as not broken.
    """
    parsed = _parse_version(version) if version else None
    if parsed is None:
        return False
    # Pad to (major, minor, patch) and drop any extra components so the
    # comparison against the 3-tuple bounds is well defined.
    triple = (parsed + (0, 0, 0))[:3]
    return _GS_BROKEN_MIN <= triple <= _GS_BROKEN_MAX
|
||
|
||
|
||
def detect_ghostscript_version() -> str | None:
|
||
"""Ruft `gs --version` auf und gibt den Versionsstring zurück (oder None)."""
|
||
gs = shutil.which("gs")
|
||
if gs is None:
|
||
return None
|
||
try:
|
||
result = subprocess.run([gs, "--version"], capture_output=True,
|
||
text=True, timeout=5)
|
||
except (OSError, subprocess.TimeoutExpired):
|
||
return None
|
||
return result.stdout.strip() or None
|
||
|
||
|
||
def check_output_config(mode: str, archive_dir: str) -> None:
    """Validate the ``[output]`` config section.

    Args:
        mode: Value of ``[output].original_on_success``; must be ``"delete"``
            or ``"archive"``.
        archive_dir: Value of ``[output].archive_dir``; must be non-empty when
            *mode* is ``"archive"``.

    Raises:
        PreflightError: If *mode* is not an allowed value, or if archiving is
            requested without an archive directory.
    """
    allowed_modes = frozenset({"delete", "archive"})
    if mode in allowed_modes:
        archive_missing = mode == "archive" and not archive_dir
        if not archive_missing:
            return
        raise PreflightError(
            "[output].original_on_success='archive' erfordert [output].archive_dir"
        )
    raise PreflightError(
        f"[output].original_on_success={mode!r} ungültig. "
        f"Erlaubt: {sorted(allowed_modes)}"
    )
|
||
|
||
|
||
def check_preflight(pdfa_level: str = "") -> None:
    """Verify external dependencies before the service starts.

    Checks performed:

    * ``tesseract`` and ``gs`` (Ghostscript) must be found on PATH.
    * If *pdfa_level* is non-empty, the installed Ghostscript version is
      checked against the known-broken 10.0.0–10.02.0 range.

    Raises:
        PreflightError: If a required binary is missing, or if PDF/A output is
            requested with an affected Ghostscript version.
    """
    absent = [name for name in _REQUIRED_BINARIES if shutil.which(name) is None]
    if absent:
        raise PreflightError(
            f"Fehlende Abhängigkeiten: {', '.join(absent)}"
            ". Bitte installieren: sudo apt install tesseract-ocr ghostscript"
        )

    # The version check only matters when PDF/A output is requested.
    if not pdfa_level:
        return
    gs_version = detect_ghostscript_version()
    if not is_ghostscript_broken(gs_version):
        return
    raise PreflightError(
        f"Ghostscript {gs_version} ist mit pdfa_level='{pdfa_level}' nicht "
        "kompatibel (bekannter Bug in 10.0.0–10.02.0). "
        "Entweder ghostscript auf >=10.02.1 upgraden (z.B. via bookworm-backports) "
        "oder in der Config [ocr].pdfa_level = \"\" setzen."
    )
|
||
|
||
|
||
def _is_pdf(path: Path) -> bool:
|
||
return path.suffix.lower() == ".pdf" and path.is_file()
|
||
|
||
|
||
def _wait_until_stable(path: Path, checks: int = 3, interval: float = 1.0) -> bool:
|
||
"""Wartet bis Datei nicht mehr wächst (Scanner schreibt mehrmals)."""
|
||
last = -1
|
||
stable_count = 0
|
||
for _ in range(60): # max ~60s
|
||
try:
|
||
size = path.stat().st_size
|
||
except FileNotFoundError:
|
||
return False
|
||
if size == last and size > 0:
|
||
stable_count += 1
|
||
if stable_count >= checks:
|
||
return True
|
||
else:
|
||
stable_count = 0
|
||
last = size
|
||
time.sleep(interval)
|
||
return False
|
||
|
||
|
||
class _Handler(FileSystemEventHandler):
    """Forward watchdog file events to a HotfolderService.

    Directory events are ignored. For created and closed events the event's
    ``src_path`` is forwarded; for moved events the destination path
    (``dest_path``) is forwarded. Every path is handed to
    ``service.enqueue`` wrapped in a ``Path``.
    """

    def __init__(self, service: "HotfolderService") -> None:
        self.service = service

    def _forward(self, event: FileSystemEvent, raw_path: str) -> None:
        # Only file events are handed on; directory events are dropped.
        if event.is_directory:
            return
        self.service.enqueue(Path(raw_path))

    def on_created(self, event: FileSystemEvent) -> None:
        self._forward(event, event.src_path)

    def on_moved(self, event: FileSystemEvent) -> None:
        # After a move/rename the file lives under the destination path.
        self._forward(event, event.dest_path)

    def on_closed(self, event: FileSystemEvent) -> None:
        self._forward(event, event.src_path)
|
||
|
||
|
||
class HotfolderService:
    """Watch the incoming folder and process PDFs on a thread pool.

    Each PDF found in ``cfg.paths.incoming`` (at startup or via filesystem
    events) is processed with ``process_pdf``. Successful outputs are then
    handed to the configured uploaders, and ``notify_email`` is called for
    both successful and failed results.
    """

    def __init__(self, cfg: Config) -> None:
        self.cfg = cfg
        self._executor = ThreadPoolExecutor(
            max_workers=cfg.ocr.max_workers,
            thread_name_prefix="ocr",
        )
        self._observer: Observer | None = None
        # Set by the SIGTERM/SIGINT handlers to end the run() loop.
        self._stop = threading.Event()
        # Resolved paths currently queued or being processed; prevents the same
        # file from being submitted twice when several events fire for it.
        self._inflight: set[str] = set()
        # Guards _inflight and the two counters.
        self._lock = threading.Lock()
        self._success_count = 0
        self._error_count = 0

    @property
    def success_count(self) -> int:
        """Number of PDFs processed successfully so far."""
        return self._success_count

    @property
    def error_count(self) -> int:
        """Number of PDFs whose processing failed so far."""
        return self._error_count

    # ---- Setup ----

    def ensure_dirs(self) -> None:
        """Create the incoming, outgoing, working and error directories if missing."""
        for p in (self.cfg.paths.incoming, self.cfg.paths.outgoing,
                  self.cfg.paths.working, self.cfg.paths.error):
            p.mkdir(parents=True, exist_ok=True)

    # ---- Lifecycle ----

    def _preflight(self) -> None:
        """Run all startup checks, then create the working directories.

        Raises:
            PreflightError: If an external dependency or the [output] config
                is invalid.
        """
        check_preflight(self.cfg.ocr.pdfa_level)
        check_output_config(self.cfg.output.original_on_success,
                            self.cfg.output.archive_dir)
        self.ensure_dirs()

    def run(self) -> None:
        """Run the hotfolder service until SIGTERM or SIGINT is received.

        Raises:
            PreflightError: If a startup check fails; nothing is started then.
        """
        self._preflight()

        # Install the handlers before starting anything, so a signal that
        # arrives during startup still ends in an orderly shutdown().
        signal.signal(signal.SIGTERM, lambda *_: self._stop.set())
        signal.signal(signal.SIGINT, lambda *_: self._stop.set())

        try:
            self._observer = Observer()
            self._observer.schedule(_Handler(self), str(self.cfg.paths.incoming),
                                    recursive=False)
            self._observer.start()
            log.info("Hotfolder läuft. Watching: %s", self.cfg.paths.incoming)
            # Scan only after the observer is running. Scanning first would
            # silently miss any file landing between the scan and the observer
            # start. enqueue() skips paths that are already in flight, so a
            # file seen by both the scan and an event is submitted once.
            self._scan_existing()
            while not self._stop.is_set():
                self._stop.wait(1.0)
        finally:
            self.shutdown()

    def run_once(self) -> int:
        """Process every PDF already in the incoming folder, then return.

        Returns:
            Number of failed PDFs (0 = everything OK).
        """
        self._preflight()
        try:
            self._scan_existing()
        finally:
            # Waits for all submitted jobs; the executor cannot be reused after this.
            self._executor.shutdown(wait=True)
        log.info("One-shot fertig: %d ok, %d Fehler",
                 self._success_count, self._error_count)
        return self._error_count

    def shutdown(self) -> None:
        """Stop the observer (if started) and wait for running jobs to finish."""
        log.info("Shutdown läuft...")
        if self._observer:
            self._observer.stop()
            self._observer.join(timeout=5)
        self._executor.shutdown(wait=True, cancel_futures=False)
        log.info("Shutdown ok.")

    # ---- Queue ----

    def _scan_existing(self) -> None:
        """Enqueue PDFs already present in the incoming folder, in sorted path order."""
        for p in sorted(self.cfg.paths.incoming.iterdir()):
            if _is_pdf(p):
                self.enqueue(p)

    def enqueue(self, path: Path) -> None:
        """Submit *path* for processing unless it is not a PDF or already in flight."""
        if not _is_pdf(path):
            return
        key = str(path.resolve())
        with self._lock:
            if key in self._inflight:
                return
            self._inflight.add(key)
        try:
            fut = self._executor.submit(self._process, path)
        except RuntimeError:
            # The executor refuses new work once shutdown has begun (e.g. a late
            # watchdog event). Release the key so the path is not stuck "in
            # flight", and keep the error out of the observer thread.
            with self._lock:
                self._inflight.discard(key)
            log.warning("Executor beendet, Datei wird nicht verarbeitet: %s", path)
            return
        fut.add_done_callback(lambda f, k=key: self._done(k, f))

    def _done(self, key: str, fut: Future) -> None:
        """Future callback: release *key* and log any exception the job raised."""
        with self._lock:
            self._inflight.discard(key)
        # fut.exception() would raise CancelledError on a cancelled future.
        if fut.cancelled():
            return
        exc = fut.exception()
        if exc is not None:
            log.error("Worker-Exception: %s", key, exc_info=exc)

    # ---- Processing ----

    def _process(self, path: Path) -> None:
        """Worker job: wait for *path* to settle, process it, then upload and notify."""
        if not _wait_until_stable(path):
            log.warning("Datei nicht stabilisiert, überspringe: %s", path)
            return
        # The file may have been removed after the stability check.
        if not path.exists():
            return

        try:
            result: ProcessResult = process_pdf(
                src=path,
                working_dir=self.cfg.paths.working,
                outgoing_dir=self.cfg.paths.outgoing,
                error_dir=self.cfg.paths.error,
                ocr_cfg=self.cfg.ocr,
                vera_cfg=self.cfg.verapdf,
                output_cfg=self.cfg.output,
            )
        except Exception:
            # Count a crash as a failure so run_once() does not report success;
            # the exception itself is logged by _done() via the future.
            with self._lock:
                self._error_count += 1
            raise

        with self._lock:
            if result.success:
                self._success_count += 1
            else:
                self._error_count += 1

        if result.success:
            self._dispatch_uploads(result.output)
        self._notify(result)

    def _dispatch_uploads(self, pdf: Path) -> None:
        """Hand *pdf* to upload_folder, plus the Nextcloud/SFTP uploaders when enabled."""
        upload_folder(pdf, self.cfg.folder, self.cfg.paths.outgoing)
        if self.cfg.nextcloud.enabled:
            upload_nextcloud(pdf, self.cfg.nextcloud)
        if self.cfg.sftp.enabled:
            upload_sftp(pdf, self.cfg.sftp)

    def _notify(self, result: ProcessResult) -> None:
        """Build an OK/error subject and body for *result* and pass them to notify_email."""
        if result.success:
            subject = f"[pdf-ocr] OK: {result.source.name}"
            body = f"Datei verarbeitet: {result.output}\n"
            if result.verapdf_passed is not None:
                body += f"veraPDF: {'PASS' if result.verapdf_passed else 'FAIL'}\n"
        else:
            subject = f"[pdf-ocr] FEHLER: {result.source.name}"
            body = f"Fehler beim Verarbeiten von {result.source}\n\n{result.error}\n"
        notify_email(self.cfg.email, subject, body, result.success)
|