Production Implementation: PDF & Text Extraction Engines for eDiscovery
Deterministic text extraction serves as the foundational transformation step within modern ESI Ingestion & Processing Workflows. Legal technology platforms must convert native PDFs, scanned images, and embedded text layers into searchable, review-ready formats without altering the underlying evidentiary value or breaking chain-of-custody protocols. This implementation guide details a production-grade extraction engine built around asynchronous batch processing, strict memory boundaries, and deterministic fallback routing. The architecture prioritizes forensic integrity, schema validation, and structured telemetry suitable for litigation support teams and Python automation engineers.
Pipeline Architecture & Memory-Aware Batching
PDF extraction at enterprise scale breaks down when synchronous I/O blocks the event loop or when unbounded object allocation triggers garbage-collection thrashing. The engine operates as a stateless worker that consumes file paths from upstream Native File Ingestion Pipelines and emits normalized JSONL payloads. Memory pressure is managed through chunked reads, context-manager scoping of file and PDF handles, and bounded micro-batches. Each worker process is budgeted at 2 GB RSS. Batch sizes should track document complexity and available memory; the reference worker below uses a fixed batch_size for simplicity.
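One way to approximate memory-aware batch sizing is to estimate each document's in-memory cost from its on-disk size, then pack files greedily until either a memory budget or a batch ceiling is reached. The sketch below is illustrative and not part of the worker. It assumes a hypothetical EXPANSION_FACTOR (parsed-PDF memory as a multiple of file size), which would have to be calibrated against real collections. The helper name plan_batches is also hypothetical.

```python
from typing import Iterable, Iterator, List, Tuple

# Hypothetical calibration constant: assume a parsed PDF occupies roughly
# EXPANSION_FACTOR times its on-disk size. Measure this on real data.
EXPANSION_FACTOR = 8


def plan_batches(
    files: Iterable[Tuple[str, int]],
    budget_bytes: int,
    max_batch: int = 25,
) -> Iterator[List[str]]:
    """Greedily pack (path, size_in_bytes) pairs into memory-bounded batches.

    A file whose estimated cost alone exceeds the budget still gets a batch of
    its own, so callers can route oversized documents to a dedicated worker.
    """
    batch: List[str] = []
    used = 0
    for path, size in files:
        cost = size * EXPANSION_FACTOR
        # Close the current batch if adding this file would break either limit.
        if batch and (used + cost > budget_bytes or len(batch) >= max_batch):
            yield batch
            batch, used = [], 0
        batch.append(path)
        used += cost
    if batch:
        yield batch
```

Consider three 100-byte files with a 1,600-byte budget. The first two files together use exactly the budget and share a batch. The third file starts a new batch.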
The extraction pipeline enforces a strict producer-consumer model using a bounded asyncio.Queue, as documented in the official Python asyncio queue guidelines. When the queue is full, put() suspends the producer, which applies backpressure to upstream ingestion. Documents are grouped into micro-batches (typically 10–25 files) to balance throughput against memory fragmentation. After each batch, file handles are closed by their context managers, and references to PDF objects and intermediate string buffers are dropped so they can be reclaimed. This discipline is intended to keep the resident set size of long-running workers stable across multi-day processing windows, even on datasets exceeding 10 TB.
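The bounded-queue pattern can be shown with the standard library alone. In this sketch:

- put() suspends the producer whenever the queue is full, which is what gives a bounded queue its backpressure.
- The consumer drains up to batch_size items per micro-batch.
- Shutdown waits on queue.join() before cancelling the consumer, so no queued item is stranded.

The names producer, consumer and run_pipeline are illustrative. Collecting batches into a list stands in for real extraction.

```python
import asyncio
from typing import List


async def producer(queue: asyncio.Queue, items: List[str]) -> None:
    for item in items:
        # put() suspends while the queue is at maxsize: this is the backpressure.
        await queue.put(item)


async def consumer(queue: asyncio.Queue, batch_size: int, out: List[List[str]]) -> None:
    while True:
        # Wait for the first item, then take whatever is already queued,
        # up to batch_size, without waiting any further.
        batch = [await queue.get()]
        while len(batch) < batch_size:
            try:
                batch.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        out.append(batch)  # stand-in for extraction and JSONL emission
        for _ in batch:
            queue.task_done()


async def run_pipeline(items: List[str], queue_size: int = 4, batch_size: int = 3) -> List[List[str]]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    batches: List[List[str]] = []
    worker = asyncio.create_task(consumer(queue, batch_size, batches))
    await producer(queue, items)
    await queue.join()  # returns only after every item has been task_done()
    worker.cancel()  # stop the consumer only once the queue is drained
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return batches
```

How items split into batches depends on scheduling. With a single consumer, the concatenated batches always preserve the enqueue order.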
Primary Extraction & Deterministic Fallback Routing
The primary extraction path uses pdfplumber for high-fidelity text-layer parsing. Beyond plain text, the library exposes character coordinates, font metadata, and page annotations when layout-aware output is needed. Detailed implementation strategies for scaling this library are covered in Extracting embedded text with pdfplumber at scale.

The workflow routes a document to a secondary fallback tier when the primary engine encounters structural corruption, missing cross-reference tables, non-standard compression filters, or an empty text layer. This tier first uses pikepdf to repair the cross-reference table and re-parse the recovered copy. If no text layer survives, it rasterizes the pages and applies OCR via pytesseract. If every tier fails, the file is quarantined with a structured error manifest rather than silently dropped.
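Stripped of the PDF libraries, the escalation logic is a loop over ordered tiers. It stops at the first tier that produces non-empty text and records every attempt along the way. In this sketch a tier is any callable that takes a path and returns text. The stub extractors in the test are hypothetical stand-ins for pdfplumber, the pikepdf repair pass, and pytesseract OCR. The status string EXTRACTED is likewise illustrative.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

# A tier is a (name, extractor) pair; the extractor takes a file path and
# returns the recovered text, raising an exception on failure.
Tier = Tuple[str, Callable[[str], str]]


def run_tiers(path: str, tiers: Sequence[Tier]) -> Dict[str, Any]:
    """Try each tier in order; quarantine only when every tier fails."""
    attempts: List[Dict[str, str]] = []
    for name, extractor in tiers:
        try:
            text = extractor(path)
        except Exception as exc:  # record the failure and escalate
            attempts.append({"tier": name, "outcome": "error",
                             "detail": f"{type(exc).__name__}: {exc}"})
            continue
        if text.strip():
            attempts.append({"tier": name, "outcome": "text_recovered"})
            return {"status": "EXTRACTED", "tier": name, "text": text, "attempts": attempts}
        # Whitespace-only output counts as a failed tier ("Text recovered? no").
        attempts.append({"tier": name, "outcome": "empty_text"})
    return {"status": "QUARANTINED", "tier": None, "text": "", "attempts": attempts}
```

The attempts list is the raw material for the error manifest. It shows which tier produced the text, and why every earlier tier was skipped.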
Proper routing requires deterministic error categorization. Extraction failures are mapped to a controlled vocabulary (MALFORMED_XREF, MISSING_TEXT_LAYER, ENCRYPTED_NO_KEY, STREAM_DECOMPRESSION_FAIL). Each category triggers a specific fallback strategy and logging payload. For forensic integrity, every routing decision is recorded in the structured error manifest that accompanies each processed document.
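Mapping exceptions onto the controlled vocabulary can be kept declarative: an ordered table of exception types per category, plus a routing table that names the next tier. In this sketch only zlib.error, which the standard library raises when a Flate stream cannot be decompressed, is registered by default. A real deployment would append library exceptions, such as pikepdf.PasswordError for ENCRYPTED_NO_KEY. The routing policy in NEXT_TIER and the helper names classify and routing_record are assumptions for illustration.

```python
import zlib
from enum import Enum
from typing import Dict, Sequence, Tuple, Type


class ErrorCategory(str, Enum):
    MALFORMED_XREF = "MALFORMED_XREF"
    MISSING_TEXT_LAYER = "MISSING_TEXT_LAYER"
    ENCRYPTED_NO_KEY = "ENCRYPTED_NO_KEY"
    STREAM_DECOMPRESSION_FAIL = "STREAM_DECOMPRESSION_FAIL"
    UNKNOWN = "UNKNOWN"


Rule = Tuple[Type[BaseException], ErrorCategory]

# Only a stdlib exception is registered here; library-specific exception types
# are appended by the caller. The first matching rule wins.
DEFAULT_RULES: Sequence[Rule] = ((zlib.error, ErrorCategory.STREAM_DECOMPRESSION_FAIL),)

# Assumed routing policy: which tier handles each category next.
NEXT_TIER: Dict[ErrorCategory, str] = {
    ErrorCategory.MALFORMED_XREF: "repair",
    ErrorCategory.STREAM_DECOMPRESSION_FAIL: "repair",
    ErrorCategory.MISSING_TEXT_LAYER: "ocr",
    ErrorCategory.ENCRYPTED_NO_KEY: "quarantine",
    ErrorCategory.UNKNOWN: "repair",
}


def classify(exc: BaseException, rules: Sequence[Rule] = DEFAULT_RULES) -> ErrorCategory:
    for exc_type, category in rules:
        if isinstance(exc, exc_type):
            return category
    return ErrorCategory.UNKNOWN


def routing_record(exc: BaseException, rules: Sequence[Rule] = DEFAULT_RULES) -> Dict[str, str]:
    """Build the manifest entry for one routing decision."""
    category = classify(exc, rules)
    return {
        "category": category.value,
        "next_tier": NEXT_TIER[category],
        "detail": f"{type(exc).__name__}: {exc}",
    }
```

Keeping the rules as data lets the vocabulary stay fixed while the set of recognized exception types grows with each library upgrade.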
The flowchart below depicts the tiered fallback, where each failed tier escalates to the next before quarantine.
```mermaid
flowchart TD
    N["Native text extraction"] --> N1{"Text recovered?"}
    N1 -->|"yes"| OK["Validate and emit payload"]
    N1 -->|"no"| R["Repair xref with pikepdf"]
    R --> R1{"Text recovered?"}
    R1 -->|"yes"| OK
    R1 -->|"no"| O["Rasterize and OCR"]
    O --> O1{"Text recovered?"}
    O1 -->|"yes"| OK
    O1 -->|"no"| Q["Quarantine with error manifest"]
```
Chain-of-Custody & Cryptographic Verification
Before any text is extracted, the engine computes SHA-256 and MD5 hashes to establish a baseline for evidentiary integrity. Hash values are cross-referenced against upstream manifests to detect bit-rot or unauthorized modification in transit. This verification step is critical for maintaining admissibility standards and is formally integrated into the Cryptographic Hash Generation protocol. Every extracted payload records the original hashes, processing timestamp, extraction status, error category, and engine version. Together these fields form an audit trail aligned with EDRM processing guidance. The trail is only immutable if these records are written to append-only or WORM storage.
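The worker in the next section hashes every file but does not itself compare the result against an upstream manifest. A minimal sketch of that comparison follows. It assumes each manifest entry is a dict of lowercase or uppercase hex digests keyed by algorithm name, and it treats a file as verified only when a SHA-256 value was supplied and nothing mismatched. Both the manifest shape and the helper names are hypothetical.

```python
import hashlib
from pathlib import Path
from typing import Any, Dict


def hash_file(path: Path, chunk_size: int = 1 << 20) -> Dict[str, str]:
    """Compute SHA-256 and MD5 in a single streaming pass."""
    sha256 = hashlib.sha256()
    # usedforsecurity=False (Python 3.9+) keeps MD5 available on
    # FIPS-restricted hosts, where plain hashlib.md5() may be refused.
    md5 = hashlib.md5(usedforsecurity=False)
    with path.open("rb") as fh:
        while chunk := fh.read(chunk_size):
            sha256.update(chunk)
            md5.update(chunk)
    return {"sha256": sha256.hexdigest(), "md5": md5.hexdigest()}


def verify_against_manifest(path: Path, expected: Dict[str, str]) -> Dict[str, Any]:
    """Compare fresh digests with the values recorded upstream."""
    actual = hash_file(path)
    mismatches = [
        alg for alg in ("sha256", "md5")
        if alg in expected and expected[alg].strip().lower() != actual[alg]
    ]
    return {
        "verified": "sha256" in expected and not mismatches,
        "mismatches": mismatches,
        "actual": actual,
    }
```

A mismatch here should quarantine the file before extraction starts, since any text extracted from a modified file would inherit the integrity problem.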
Production-Grade Implementation
The following implementation demonstrates a complete, auditable extraction worker. It features bounded async queues, explicit resource cleanup, Pydantic schema validation, structured JSON logging, and deterministic fallback routing.
```python
import asyncio
import gc
import hashlib
import logging
import tempfile
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pdfplumber
import pikepdf
import pytesseract
from pdf2image import convert_from_path
from pdfminer.pdfparser import PDFSyntaxError
from pydantic import BaseModel, ValidationError

# Structured logging configuration for SIEM ingestion
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger("pdf_extraction_engine")


class ExtractionStatus(str, Enum):
    SUCCESS = "SUCCESS"
    FALLBACK_REPAIR = "FALLBACK_REPAIR"
    FALLBACK_OCR = "FALLBACK_OCR"
    QUARANTINED = "QUARANTINED"


class ErrorCategory(str, Enum):
    MALFORMED_XREF = "MALFORMED_XREF"
    MISSING_TEXT_LAYER = "MISSING_TEXT_LAYER"
    ENCRYPTED_NO_KEY = "ENCRYPTED_NO_KEY"
    STREAM_DECOMPRESSION_FAIL = "STREAM_DECOMPRESSION_FAIL"
    UNKNOWN = "UNKNOWN"


class ExtractionPayload(BaseModel):
    file_path: str
    sha256: str
    md5: str
    status: ExtractionStatus
    extracted_text: str
    page_count: int
    error_category: Optional[ErrorCategory] = None
    processing_timestamp: str
    engine_version: str = "1.0.0"


# Blocking helpers. They are always called through asyncio.to_thread() so that
# file I/O, PDF parsing and OCR never block the event loop.

def _hash_file(path: Path) -> Dict[str, str]:
    """Stream the file once, feeding both digests."""
    sha256 = hashlib.sha256()
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
            md5.update(chunk)
    return {"sha256": sha256.hexdigest(), "md5": md5.hexdigest()}


def _read_text_layer(path: Path) -> Tuple[str, int]:
    """Join the text layer of every page; pages without text are skipped."""
    with pdfplumber.open(path) as pdf:
        pages = [page.extract_text() for page in pdf.pages]
    return "\n".join(filter(None, pages)), len(pages)


def _repair_and_read(path: Path) -> Tuple[str, int]:
    """Rewrite the PDF through pikepdf (qpdf), then re-read its text layer.

    The repaired copy lives in a temporary directory, so the original evidence
    file and its folder are never written to.
    """
    with tempfile.TemporaryDirectory() as tmp:
        repaired = Path(tmp) / f"{path.stem}.repaired.pdf"
        with pikepdf.open(path) as pdf:  # qpdf attempts xref recovery on open
            pdf.save(repaired)
        return _read_text_layer(repaired)


def _ocr_pages(path: Path) -> List[str]:
    """Rasterize every page (pdf2image/Poppler) and OCR it with Tesseract."""
    images = convert_from_path(str(path))
    return [pytesseract.image_to_string(image, lang="eng") for image in images]


class ExtractionWorker:
    def __init__(self, queue_size: int = 50, batch_size: int = 15):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.batch_size = batch_size
        self._shutdown = False

    async def compute_hashes(self, file_path: Path) -> Dict[str, str]:
        """Deterministic cryptographic verification before extraction."""
        return await asyncio.to_thread(_hash_file, file_path)

    async def extract_primary(self, file_path: Path) -> Dict[str, Any]:
        """High-fidelity text layer extraction via pdfplumber."""
        try:
            text, page_count = await asyncio.to_thread(_read_text_layer, file_path)
        except PDFSyntaxError as e:
            return {"error": str(e), "category": ErrorCategory.MALFORMED_XREF}
        except Exception as e:
            # Depending on the pdfplumber version, pdfminer errors may arrive
            # wrapped in another exception type and land here instead.
            return {"error": str(e), "category": ErrorCategory.UNKNOWN}
        if not text.strip():
            # Image-only (scanned) PDF: parsing succeeded but found no text.
            return {"error": "empty text layer", "category": ErrorCategory.MISSING_TEXT_LAYER}
        return {"text": text, "page_count": page_count, "status": ExtractionStatus.SUCCESS}

    async def extract_fallback(self, file_path: Path) -> Dict[str, Any]:
        """Secondary tier: repair-and-reparse, then rasterized OCR."""
        # Tier 2: recover a damaged cross-reference table with pikepdf, then
        # re-run the high-fidelity text-layer extractor on the repaired copy.
        try:
            text, page_count = await asyncio.to_thread(_repair_and_read, file_path)
            if text.strip():
                return {
                    "text": text,
                    "page_count": page_count,
                    "status": ExtractionStatus.FALLBACK_REPAIR,
                }
        except pikepdf.PasswordError as e:
            # Encrypted without a key: neither repair nor rasterization can succeed.
            return {"error": str(e), "category": ErrorCategory.ENCRYPTED_NO_KEY}
        except Exception as e:
            logger.warning("Repair tier failed for %s: %s", file_path.name, e)
        # Tier 3: rasterize each page and OCR it. pytesseract operates on
        # images, so the PDF must be converted to page images first.
        try:
            ocr_pages = await asyncio.to_thread(_ocr_pages, file_path)
        except Exception as e:
            return {"error": str(e), "category": ErrorCategory.UNKNOWN}
        text = "\n".join(ocr_pages)
        if not text.strip():
            return {"error": "OCR recovered no text", "category": ErrorCategory.MISSING_TEXT_LAYER}
        return {
            "text": text,
            "page_count": len(ocr_pages),
            "status": ExtractionStatus.FALLBACK_OCR,
        }

    def build_payload(self, file_path: Path, hashes: Dict[str, str], result: Dict[str, Any]) -> ExtractionPayload:
        """Schema validation and payload normalization."""
        return ExtractionPayload(
            file_path=str(file_path),
            sha256=hashes["sha256"],
            md5=hashes["md5"],
            status=result.get("status", ExtractionStatus.QUARANTINED),
            extracted_text=result.get("text", ""),
            page_count=result.get("page_count", 0),
            error_category=result.get("category"),
            processing_timestamp=datetime.now(timezone.utc).isoformat(),
        )

    async def process_file(self, file_path: Path) -> ExtractionPayload:
        """Orchestrates extraction with explicit memory boundaries."""
        hashes = await self.compute_hashes(file_path)
        primary = await self.extract_primary(file_path)
        if primary.get("status") == ExtractionStatus.SUCCESS:
            return self.build_payload(file_path, hashes, primary)
        logger.warning(
            "Primary extraction failed for %s (%s). Routing to fallback tier.",
            file_path.name, primary.get("category"),
        )
        fallback = await self.extract_fallback(file_path)
        if fallback.get("status") in (ExtractionStatus.FALLBACK_REPAIR, ExtractionStatus.FALLBACK_OCR):
            return self.build_payload(file_path, hashes, fallback)
        logger.error("Extraction exhausted all tiers for %s. Quarantining.", file_path.name)
        return self.build_payload(file_path, hashes, {
            "status": ExtractionStatus.QUARANTINED,
            "category": fallback.get("category", ErrorCategory.UNKNOWN),
        })

    async def worker_loop(self) -> None:
        """Stateless consumer loop with bounded concurrency."""
        while not self._shutdown:
            batch = []
            for _ in range(self.batch_size):
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=2.0)
                except asyncio.TimeoutError:
                    break
                batch.append(item)
            if not batch:
                continue
            for file_path in batch:
                try:
                    payload = await self.process_file(file_path)
                    # One JSON object per payload, ready for JSONL ingestion.
                    logger.info(payload.model_dump_json())
                except ValidationError as ve:
                    logger.error("Schema validation failed: %s", ve)
                except Exception:
                    logger.exception("Unhandled worker exception for %s", file_path)
                finally:
                    self.queue.task_done()
            # Explicit memory reclamation: drop the batch, then collect cycles.
            del batch
            gc.collect()

    async def enqueue(self, file_path: Path) -> None:
        await self.queue.put(file_path)  # suspends while the queue is full

    async def shutdown(self) -> None:
        # Drain first, then stop: setting the flag before join() would let the
        # loop exit with items still queued, and join() would never return.
        await self.queue.join()
        self._shutdown = True


async def main(paths: List[str]) -> None:
    """Example driver: one worker draining a list of file paths."""
    worker = ExtractionWorker()
    consumer = asyncio.create_task(worker.worker_loop())
    for path in paths:
        await worker.enqueue(Path(path))
    await worker.shutdown()  # returns once every queued file is processed
    await consumer  # the loop exits after its next idle timeout


if __name__ == "__main__":
    import sys

    asyncio.run(main(sys.argv[1:]))
```
Operational Readiness & Compliance Notes
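- Deterministic Output: The engine is designed to produce identical extracted text for identical input files across processing runs. This holds only while the engine version, the versions of the underlying tools (pdfplumber, pikepdf, Poppler, Tesseract), and the fallback configuration stay constant. Whole payloads still differ between runs, because each carries a fresh processing timestamp. This is critical for litigation hold reproducibility.
- Memory Boundaries: The explicit gc.collect() call after each micro-batch reclaims reference cycles that would otherwise accumulate in long-running async loops. Combined with context-managed PDF handles, this helps keep worker RSS within the 2 GB budget. CPython does not always return freed memory to the operating system, however, so the budget should also be enforced outside the process.
- Auditability: Every payload includes cryptographic hashes, a UTC processing timestamp, the extraction status and error category, and the engine version. Each payload is logged as a single-line JSON object after the timestamp and level prefix, so the log stream can be parsed and routed to centralized logging platforms for compliance reporting and discovery metrics.
- Scalability: Deploy multiple worker instances behind a shared message broker (e.g., Redis Streams or RabbitMQ). The stateless design allows horizontal scaling without cross-node synchronization overhead.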
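Every payload carries a fresh processing_timestamp, so reproducibility is easier to check on a digest of the remaining fields than on raw log lines. The sketch below assumes payloads are plain dicts, such as the output of Pydantic's model_dump(mode="json"). The helper name and the set of volatile fields are illustrative.

```python
import hashlib
import json
from typing import Any, Dict, FrozenSet

# Fields expected to differ between otherwise identical runs.
VOLATILE_FIELDS: FrozenSet[str] = frozenset({"processing_timestamp"})


def reproducibility_digest(payload: Dict[str, Any], volatile: FrozenSet[str] = VOLATILE_FIELDS) -> str:
    """SHA-256 over a canonical JSON rendering of the non-volatile fields."""
    stable = {k: v for k, v in payload.items() if k not in volatile}
    # sort_keys and fixed separators make the rendering independent of dict order.
    canonical = json.dumps(stable, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Storing this digest alongside each payload lets a re-run be checked document by document. Matching digests confirm the same text, status, and hashes were produced.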
By adhering to this architecture, legal technology teams can achieve high-throughput text extraction while maintaining strict forensic controls, predictable memory consumption, and defensible audit trails.