"""OCR-Verarbeitung einer einzelnen PDF mit ocrmypdf + optional veraPDF.""" from __future__ import annotations import logging import shutil import subprocess from dataclasses import dataclass from pathlib import Path from .config import OcrConfig, OutputConfig, VeraPdfConfig log = logging.getLogger(__name__) def build_output_name(src_name: str, mode: str, tag: str) -> str: """Erzeugt den Ziel-Dateinamen für ein OCR-PDF. Args: src_name: Original-Dateiname (z.B. "scan.pdf") mode: "prefix" | "suffix" | "none" tag: Einzufügender String (verbatim, leer = kein Tag) Beispiele: prefix "OCR_": "scan.pdf" -> "OCR_scan.pdf" suffix "_OCR": "scan.pdf" -> "scan_OCR.pdf" suffix "_OCR": "scan.tar.gz.pdf" -> "scan.tar.gz_OCR.pdf" none: "scan.pdf" -> "scan.pdf" """ if mode == "none" or not tag: return src_name if mode == "prefix": return f"{tag}{src_name}" if mode == "suffix": # Nur die letzte Extension abspalten, sonst "foo.bar.pdf" kaputt gemacht p = Path(src_name) stem, ext = p.stem, p.suffix return f"{stem}{tag}{ext}" raise ValueError(f"Unbekannter name_mode: {mode!r}") @dataclass class ProcessResult: source: Path output: Path success: bool error: str = "" verapdf_passed: bool | None = None def run_ocr(src: Path, dst: Path, cfg: OcrConfig) -> None: """Führt ocrmypdf als Library-Call aus (kein Subprozess-Overhead).""" import ocrmypdf # lazy, damit Tests ohne ocrmypdf laufen kwargs: dict = { "language": cfg.languages, "jobs": cfg.jobs, "deskew": cfg.deskew, "clean": cfg.clean, "oversample": cfg.oversample, "progress_bar": False, "skip_text": cfg.skip_text, } if cfg.pdfa_level: kwargs["output_type"] = f"pdfa-{cfg.pdfa_level}" else: kwargs["output_type"] = "pdf" log.info("OCR start: %s", src.name) ocrmypdf.ocr(str(src), str(dst), **kwargs) log.info("OCR done: %s", dst.name) def run_verapdf(pdf: Path, cfg: VeraPdfConfig) -> bool: """Validiert PDF/A mit veraPDF (CLI). Gibt True zurück, wenn konform.""" if not cfg.enabled: return True if not Path(cfg.binary).exists(): log.warning("veraPDF binary nicht gefunden: %s", cfg.binary) return False try: result = subprocess.run( [cfg.binary, "--flavour", cfg.flavour, "--format", "text", str(pdf)], capture_output=True, text=True, timeout=300, ) ok = result.returncode == 0 and "PASS" in result.stdout log.info("veraPDF %s: %s", "PASS" if ok else "FAIL", pdf.name) return ok except subprocess.TimeoutExpired: log.error("veraPDF Timeout: %s", pdf.name) return False def process_pdf( src: Path, working_dir: Path, outgoing_dir: Path, error_dir: Path, ocr_cfg: OcrConfig, vera_cfg: VeraPdfConfig, output_cfg: OutputConfig, ) -> ProcessResult: """Verarbeitet eine einzelne PDF: move→OCR→validate→outgoing/error.""" out_name = build_output_name(src.name, output_cfg.name_mode, output_cfg.name_tag) work_src = working_dir / src.name work_out = working_dir / f"__ocr_{out_name}" # Temp-Name, damit er != src.name ist final_out = outgoing_dir / out_name try: shutil.move(str(src), str(work_src)) except OSError as e: return ProcessResult(src, final_out, False, f"move to working failed: {e}") try: run_ocr(work_src, work_out, ocr_cfg) except Exception as e: # noqa: BLE001 - ocrmypdf wirft viele Typen log.exception("OCR fehlgeschlagen für %s", src.name) _move_to_error(work_src, error_dir) return ProcessResult(src, final_out, False, f"ocr failed: {e}") vera_ok: bool | None = None if vera_cfg.enabled: vera_ok = run_verapdf(work_out, vera_cfg) if not vera_ok: _move_to_error(work_out, error_dir) work_src.unlink(missing_ok=True) return ProcessResult(src, final_out, False, "verapdf validation failed", verapdf_passed=False) outgoing_dir.mkdir(parents=True, exist_ok=True) shutil.move(str(work_out), str(final_out)) _dispose_original(work_src, src.name, output_cfg) return ProcessResult(src, final_out, True, verapdf_passed=vera_ok) def _dispose_original(work_src: Path, original_name: str, cfg: OutputConfig) -> None: """Entsorgt das Original nach erfolgreichem OCR — löschen oder archivieren.""" if not work_src.exists(): return mode = cfg.original_on_success if mode == "delete": work_src.unlink(missing_ok=True) return if mode == "archive": if not cfg.archive_dir: log.error("original_on_success=archive aber archive_dir ist leer — lösche stattdessen") work_src.unlink(missing_ok=True) return archive = Path(cfg.archive_dir) archive.mkdir(parents=True, exist_ok=True) dest = archive / original_name # Bei Namens-Kollision mit Timestamp umbenennen if dest.exists(): from datetime import datetime ts = datetime.now().strftime("%Y%m%d-%H%M%S") dest = archive / f"{dest.stem}_{ts}{dest.suffix}" shutil.move(str(work_src), str(dest)) log.info("Original archiviert: %s", dest) return log.warning("Unbekannter original_on_success=%r — lösche stattdessen", mode) work_src.unlink(missing_ok=True) def _move_to_error(p: Path, error_dir: Path) -> None: error_dir.mkdir(parents=True, exist_ok=True) try: shutil.move(str(p), str(error_dir / p.name)) except OSError: log.exception("Konnte %s nicht in error-Verzeichnis verschieben", p)