Async Batch Processing Design for eDiscovery ESI Workflows
Deterministic throughput and strict chain-of-custody preservation are foundational to modern ESI Ingestion & Processing Workflows. Async batch processing bridges the gap between raw custodial data delivery and production-ready review databases by decoupling I/O-bound extraction, cryptographic hashing, and metadata normalization from synchronous request cycles. This guide details the architecture of a memory-aware, async-driven batch processor that enforces compliance boundaries, routes failures deterministically, and scales horizontally without compromising auditability.
Memory-Aware Batch Orchestration
ESI volumes routinely exceed terabyte scale, so approaches that materialize an entire listing up front, such as glob.glob() (which returns a full list) or collecting all os.walk() output before processing begins, are untenable in production. A compliant pipeline must bound memory consumption through lazy generators and fixed-capacity queues that apply backpressure. Instead of holding entire directory trees in RAM, the orchestrator yields file paths or byte streams in controlled chunks and respects worker concurrency limits via asyncio.Semaphore. This prevents out-of-memory conditions during peak ingestion windows and aligns with the deterministic chunking strategies required by Native File Ingestion Pipelines.
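A minimal sketch of the backpressure pattern described above, using only the standard library. `walk_lazily`, `bounded_ingest`, and the `max_in_flight` limit are illustrative names, not part of any library, and the `asyncio.sleep(0.001)` call stands in for real per-file work.

```python
import asyncio
import os
import tempfile
from pathlib import Path
from typing import AsyncIterator, Dict


async def walk_lazily(root: Path) -> AsyncIterator[Path]:
    """Yield one path at a time; os.walk lists a single directory per step."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            yield Path(dirpath) / name
            await asyncio.sleep(0)  # let consumers run between items


async def bounded_ingest(
    root: Path, capacity: int, workers: int, max_in_flight: int = 2
) -> Dict[str, int]:
    """Push paths through a bounded queue and report the peak backlog observed."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=capacity)
    in_flight = asyncio.Semaphore(max_in_flight)
    stats = {"processed": 0, "peak_backlog": 0}

    async def producer() -> None:
        async for path in walk_lazily(root):
            # put() suspends while the queue is full: this is the backpressure.
            await queue.put(path)
            stats["peak_backlog"] = max(stats["peak_backlog"], queue.qsize())

    async def consumer() -> None:
        while True:
            path = await queue.get()
            try:
                # Caps concurrent heavy work independently of the worker count.
                async with in_flight:
                    await asyncio.sleep(0.001)  # stand-in for hashing/extracting `path`
                    stats["processed"] += 1
            finally:
                queue.task_done()

    consumers = [asyncio.create_task(consumer()) for _ in range(workers)]
    await producer()
    await queue.join()
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return stats


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(50):
            (Path(tmp) / f"doc_{i}.txt").write_text("x")
        print(asyncio.run(bounded_ingest(Path(tmp), capacity=4, workers=2)))
```

Because `queue.put()` suspends whenever the queue is full, the number of pending paths is bounded by `capacity` plus the items workers are currently holding, regardless of how many files the tree contains.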
Batch sizing should be dynamic, calculated per worker from available memory, file-type distribution, and downstream service latency. Large container formats (PST, ZIP, EML bundles) require stream-based parsing rather than full in-memory extraction. Coupling async generators with bounded asyncio.Queue instances maintains a steady processing rhythm while confining memory spikes to individual worker contexts.
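For container formats, stream-based parsing can be as simple as hashing each member through the decompressing stream that Python's `zipfile` module returns. The sketch below assumes a ZIP container; PST and MBOX parsing need dedicated libraries and are not shown. `hash_zip_members` and `CHUNK_SIZE` are hypothetical names.

```python
import hashlib
import tempfile
import zipfile
from pathlib import Path
from typing import Dict

CHUNK_SIZE = 64 * 1024  # assumed read size; tune per deployment


def hash_zip_members(archive: Path) -> Dict[str, str]:
    """SHA-256 of each member, decompressed in chunks rather than loaded whole."""
    digests: Dict[str, str] = {}
    with zipfile.ZipFile(archive) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            h = hashlib.sha256()
            # ZipFile.open() returns a stream that decompresses as it is read.
            with zf.open(info) as member:
                while chunk := member.read(CHUNK_SIZE):
                    h.update(chunk)
            digests[info.filename] = h.hexdigest()
    return digests


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        bundle = Path(tmp) / "custodian_bundle.zip"
        with zipfile.ZipFile(bundle, "w") as zf:
            zf.writestr("inbox/msg_001.eml", b"Subject: hello\r\n\r\nbody")
        print(hash_zip_members(bundle))
```

A corrupted archive raises `zipfile.BadZipFile`, and an encrypted member raises an error unless a password is supplied; both are candidates for the retry and DLQ routing described later.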
Async Execution & Extraction Integration
Once batches are materialized, task dispatch must remain non-blocking. Each file enters an async execution graph in which SHA-256 hash generation runs first and must complete before I/O-heavy operations such as text extraction are awaited as coroutines; schema validation of the extracted metadata follows extraction. Because hashing is CPU- and disk-bound, production deployments should offload compute_sha256 to a thread executor (for example, asyncio.to_thread) so it does not stall the event loop. This hash-first model guarantees that integrity verification, a foundational requirement for an FRCP-defensible chain of custody, completes before any transformation can alter the original binary.
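The hash-first ordering with thread offloading might look like the following sketch. `sha256_file`, `hash_then_extract`, and `fake_extract` are hypothetical names; `fake_extract` stands in for a real engine call. `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import hashlib
import tempfile
from pathlib import Path
from typing import Awaitable, Callable, Dict


def sha256_file(path: Path, chunk_size: int = 8192) -> str:
    """Blocking streaming SHA-256; meant to run in a worker thread."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()


async def hash_then_extract(
    path: Path, extract: Callable[[Path], Awaitable[str]]
) -> Dict[str, str]:
    """Hash-first ordering: the digest is fixed before extraction starts."""
    # asyncio.to_thread runs the blocking read in the default executor,
    # so other coroutines keep running while this file is hashed.
    digest = await asyncio.to_thread(sha256_file, path)
    text = await extract(path)
    return {"sha256": digest, "text": text}


async def fake_extract(path: Path) -> str:
    """Hypothetical stand-in for an extraction-engine call."""
    await asyncio.sleep(0)
    return f"text of {path.name}"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        memo = Path(tmp) / "memo.txt"
        memo.write_bytes(b"hello")
        print(asyncio.run(hash_then_extract(memo, fake_extract)))
```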
Text extraction introduces significant latency variance. Integrating PDF & Text Extraction Engines requires connection pooling, timeout enforcement, and graceful degradation when encountering corrupted or password-protected documents. Async patterns let the pipeline yield control while awaiting extraction calls (engines with blocking APIs must be offloaded to an executor to get the same effect), enabling concurrent processing of unrelated files while maintaining strict per-batch isolation.
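Timeout enforcement and graceful degradation can be layered around any engine call with `asyncio.wait_for`. This is a sketch under assumptions: `UnextractableError` is a hypothetical exception that an engine adapter would raise for corrupt or password-protected files, `locked_pdf` is a hypothetical engine call, and the status strings are illustrative reason codes rather than a platform schema.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


class UnextractableError(Exception):
    """Hypothetical error an engine adapter raises for corrupt or encrypted files."""


@dataclass
class ExtractionResult:
    text: Optional[str]
    status: str  # hypothetical reason codes: "OK", "TIMEOUT", "UNEXTRACTABLE"
    detail: str = ""


async def extract_with_timeout(
    call: Callable[[], Awaitable[str]], timeout_s: float
) -> ExtractionResult:
    """Run one extraction call under a hard deadline; degrade rather than crash."""
    try:
        text = await asyncio.wait_for(call(), timeout=timeout_s)
        return ExtractionResult(text=text, status="OK")
    except asyncio.TimeoutError:
        # wait_for() cancels the pending call once the deadline passes.
        return ExtractionResult(text=None, status="TIMEOUT", detail=f"exceeded {timeout_s}s")
    except UnextractableError as exc:
        # The file stays in the workflow with a reason code instead of failing the batch.
        return ExtractionResult(text=None, status="UNEXTRACTABLE", detail=str(exc))


async def locked_pdf() -> str:
    """Hypothetical engine call that rejects a password-protected document."""
    raise UnextractableError("password-protected PDF")


if __name__ == "__main__":
    print(asyncio.run(extract_with_timeout(locked_pdf, timeout_s=5.0)))
```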
Resilience, Fallback Routing & Compliance Boundaries
Async systems inevitably encounter transient failures, corrupted archives, or downstream service degradation. Production-grade eDiscovery pipelines must implement deterministic retry policies, exponential backoff, and dead-letter queue (DLQ) routing. When a worker encounters repeated failures, it must trigger isolation protocols to prevent cascading resource exhaustion. Implementing circuit breakers for async processing failures provides the necessary guardrails to halt processing on specific endpoints while allowing unaffected batches to proceed.
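A per-endpoint circuit breaker can be sketched in a few dozen lines. This is an illustration under stated assumptions, not a specific library: the threshold, the cooldown, and the injectable `clock` (used here to make the behavior testable) are hypothetical parameters, and the half-open state admits any caller after the cooldown rather than exactly one probe.

```python
import time
from typing import Callable, Dict


class CircuitBreaker:
    """Minimal per-endpoint breaker: closed -> open -> half-open -> closed."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_after_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after_s = reset_after_s
        self.clock = clock
        self.failures: Dict[str, int] = {}
        self.opened_at: Dict[str, float] = {}

    def allow(self, endpoint: str) -> bool:
        """Return False while the endpoint's circuit is open."""
        opened = self.opened_at.get(endpoint)
        if opened is None:
            return True  # closed: traffic flows normally
        # Half-open once the cooldown elapses; a further failure re-opens it.
        return self.clock() - opened >= self.reset_after_s

    def record_success(self, endpoint: str) -> None:
        """A successful call closes the circuit and clears the failure count."""
        self.failures.pop(endpoint, None)
        self.opened_at.pop(endpoint, None)

    def record_failure(self, endpoint: str) -> None:
        """Count a failure; open (or re-open) the circuit at the threshold."""
        count = self.failures.get(endpoint, 0) + 1
        self.failures[endpoint] = count
        if count >= self.failure_threshold:
            self.opened_at[endpoint] = self.clock()
```

A worker would call `allow(endpoint)` before dispatching to an extraction service and hold or requeue the batch when it returns False, so one failing endpoint stops consuming retries while other batches proceed.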
Every state transition, hash computation, and routing decision must be logged to an immutable audit trail. Compliance boundaries require that no file is marked PROCESSED until its cryptographic signature matches the original ingest manifest, and all extracted metadata conforms to the target review platform schema.
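One way to make the audit trail tamper-evident and to enforce the manifest gate is sketched below. Assumptions: the ingest manifest is a mapping from path to SHA-256, `HASH_MISMATCH` is a hypothetical status, and the in-memory list stands in for persistent write-once storage. Hash chaining makes edits detectable; it does not by itself make the log immutable.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry


class AuditTrail:
    """Append-only, hash-chained log: each entry commits to the one before it."""

    def __init__(self) -> None:
        self.entries: List[Dict[str, Any]] = []

    @staticmethod
    def _digest(prev: str, event: Dict[str, Any]) -> str:
        body = json.dumps({"prev": prev, "event": event}, sort_keys=True)
        return hashlib.sha256(body.encode("utf-8")).hexdigest()

    def append(self, event: Dict[str, Any]) -> str:
        prev = self.entries[-1]["entry_hash"] if self.entries else GENESIS
        entry_hash = self._digest(prev, event)
        self.entries.append({"prev": prev, "event": event, "entry_hash": entry_hash})
        return entry_hash

    def verify(self) -> bool:
        """Recompute the chain; any edited or reordered entry breaks it."""
        prev = GENESIS
        for entry in self.entries:
            if entry["prev"] != prev or entry["entry_hash"] != self._digest(prev, entry["event"]):
                return False
            prev = entry["entry_hash"]
        return True


def gate_processed(
    path: str, sha256: str, manifest: Dict[str, str], trail: AuditTrail
) -> str:
    """Return "PROCESSED" only when the digest matches the ingest manifest."""
    expected = manifest.get(path)
    status = "PROCESSED" if expected == sha256 else "HASH_MISMATCH"
    trail.append({"path": path, "sha256": sha256, "expected": expected, "status": status})
    return status
```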
Production Implementation
The following module is a reference implementation of an async batch processor. It enforces hash-first ordering, backpressure via a bounded queue, and deterministic error routing to a DLQ, and it logs state transitions for auditability. Verification against the ingest manifest and schema normalization are left to the integration layer. Its stages map onto the Processing phase of the EDRM.
The diagram below traces a single item through the six pipeline stages, including the dead-letter branch taken on repeated failure.
```mermaid
flowchart LR
    A["Discovery"] --> B["Hash-first validation"]
    B --> C["Async extraction"]
    C --> D["Schema normalization"]
    D --> E{"Repeated failure?"}
    E -->|"no"| F["Audit manifest"]
    E -->|"yes"| G["DLQ routing"]
    G --> F
```
```python
# Requires Python 3.10+ (asyncio.to_thread; loop-agnostic Queue/Semaphore construction).
import asyncio
import hashlib
import json
import logging
import os
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, Optional

# Configure structured audit logging.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
logger = logging.getLogger("ediscovery.async_batch")


class ProcessingStatus(str, Enum):
    PENDING = "PENDING"
    HASHED = "HASHED"
    EXTRACTED = "EXTRACTED"
    FAILED = "FAILED"
    DLQ = "DLQ"


@dataclass
class ESIRecord:
    file_path: Path
    sha256: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    status: ProcessingStatus = ProcessingStatus.PENDING
    error: Optional[str] = None


class AsyncBatchProcessor:
    def __init__(
        self,
        max_concurrency: int = 8,
        queue_capacity: int = 128,
        max_retries: int = 3,
        dlq_path: Path = Path("./dlq"),
    ):
        self.max_concurrency = max_concurrency
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_capacity)
        self.max_retries = max_retries
        self.dlq_path = dlq_path
        self.dlq_path.mkdir(parents=True, exist_ok=True)
        self.retry_counts: Dict[str, int] = {}

    async def file_generator(self, root_dir: Path) -> AsyncGenerator[Path, None]:
        """Yields file paths lazily; backpressure comes from the bounded queue."""
        for dirpath, _, filenames in os.walk(root_dir):
            for fname in filenames:
                yield Path(dirpath) / fname
                # Yield control to the event loop so consumers are not starved.
                await asyncio.sleep(0)

    def compute_sha256(self, file_path: Path) -> str:
        """Blocking, streaming SHA-256 computation for chain-of-custody."""
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            while chunk := f.read(8192):
                sha256.update(chunk)
        return sha256.hexdigest()

    async def extract_text_async(self, record: ESIRecord) -> str:
        """Simulates async I/O-bound extraction. Wrap the real engine call in
        asyncio.wait_for() to enforce a per-file timeout in production."""
        await asyncio.sleep(0.1)  # Placeholder for actual engine call
        # In production: integrate with an async HTTP client or subprocess pool
        return f"extracted_text_{record.file_path.stem}"

    async def route_to_dlq(self, record: ESIRecord) -> None:
        """Deterministic failure routing with audit trail preservation."""
        record.status = ProcessingStatus.DLQ
        # Prefix the manifest name with a digest of the full path: bare file names
        # collide across custodian folders and would overwrite earlier manifests.
        path_key = hashlib.sha256(str(record.file_path).encode("utf-8")).hexdigest()[:16]
        dlq_file = self.dlq_path / f"{path_key}_{record.file_path.name}.json"
        logger.warning(f"Routing {record.file_path} to DLQ: {record.error}")
        # Persist the failure manifest for legal hold/compliance review. json.dump
        # escapes quotes and newlines in error strings, so the manifest stays valid.
        with open(dlq_file, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "path": str(record.file_path),
                    "sha256": record.sha256,
                    "attempts": self.retry_counts.get(str(record.file_path), 0),
                    "error": record.error,
                    "status": ProcessingStatus.DLQ.value,
                },
                f,
            )

    async def process_record(self, record: ESIRecord) -> ESIRecord:
        """Core async execution graph with hash-first enforcement.
        Retries are driven by an iterative loop rather than recursion so the
        concurrency semaphore is released between attempts; recursing while
        holding the semaphore would deadlock the worker pool under contention.
        """
        retry_key = str(record.file_path)
        for attempt in range(1, self.max_retries + 1):
            async with self.semaphore:
                try:
                    # 1. Cryptographic hash, offloaded to a worker thread so the
                    #    blocking read does not stall the event loop.
                    record.sha256 = await asyncio.to_thread(
                        self.compute_sha256, record.file_path
                    )
                    record.status = ProcessingStatus.HASHED
                    logger.info(f"Hashed: {record.file_path.name} | SHA256: {record.sha256[:12]}...")
                    # 2. Async text/metadata extraction.
                    record.metadata["extracted_text"] = await self.extract_text_async(record)
                    record.status = ProcessingStatus.EXTRACTED
                    logger.info(f"Extracted: {record.file_path.name}")
                    return record
                except Exception as e:
                    record.error = str(e)
                    record.status = ProcessingStatus.FAILED
                    self.retry_counts[retry_key] = attempt
                    if attempt < self.max_retries:
                        logger.warning(
                            f"Retry {attempt}/{self.max_retries} for {record.file_path.name}: {e}"
                        )
            # Exponential backoff (0.5s, 1s, 2s, ...) happens outside the semaphore
            # so a sleeping retry does not hold a concurrency slot hostage.
            if attempt < self.max_retries:
                await asyncio.sleep(0.5 * 2 ** (attempt - 1))
        await self.route_to_dlq(record)
        return record

    async def run(self, ingest_root: Path) -> None:
        """Pipeline orchestrator: feeds the queue, dispatches workers, drains the queue."""
        consumer_tasks = [
            asyncio.create_task(self._consumer()) for _ in range(self.max_concurrency)
        ]
        try:
            await self._producer(ingest_root)
            await self.queue.join()
        finally:
            # Cancel idle workers even if traversal raised, so no task is leaked.
            for task in consumer_tasks:
                task.cancel()
            await asyncio.gather(*consumer_tasks, return_exceptions=True)
        logger.info("Batch processing complete. All workers drained.")

    async def _producer(self, root: Path) -> None:
        async for path in self.file_generator(root):
            # put() suspends while the queue is full, applying backpressure.
            await self.queue.put(ESIRecord(file_path=path))

    async def _consumer(self) -> None:
        while True:
            try:
                record = await self.queue.get()
            except asyncio.CancelledError:
                break
            try:
                await self.process_record(record)
            except Exception:
                # An error in DLQ routing itself must not kill the worker; otherwise
                # remaining items would never be consumed and join() would hang.
                logger.exception(f"Unhandled error while processing {record.file_path}")
            finally:
                # Always acknowledge the item so queue.join() can complete.
                self.queue.task_done()


async def main() -> None:
    # "./ingest" is a placeholder for the custodial delivery volume.
    processor = AsyncBatchProcessor()
    await processor.run(Path("./ingest"))


if __name__ == "__main__":
    asyncio.run(main())
```
Horizontal Scaling & Observability
The in-process async model above is ideal for single-node ingestion servers. For distributed litigation support environments, workload partitioning should shift to a message broker architecture. Implementing Celery for async eDiscovery batching outlines how to decouple the producer/consumer pattern across Kubernetes pods or VM fleets while preserving task idempotency and result backend consistency.
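Independent of the broker, two properties carry over from the single-node design: deterministic partitioning and idempotent task execution. The sketch below shows both in plain Python; it does not use Celery's API, and `partition_for`, `idempotency_key`, and `IdempotentRunner` are hypothetical names.

```python
import hashlib
from typing import Callable, Dict


def partition_for(custodian_path: str, num_partitions: int) -> int:
    """Stable shard index: the same path always maps to the same partition."""
    # Built-in hash() of str is salted per process, so derive the index from SHA-256.
    digest = hashlib.sha256(custodian_path.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def idempotency_key(path: str, sha256: str) -> str:
    """Path plus content digest, so identical files from different custodians stay distinct."""
    return f"{path}|{sha256}"


class IdempotentRunner:
    """Run a task at most once per key; redelivered messages get the stored result."""

    def __init__(self) -> None:
        # In a multi-node deployment this mapping would live in a shared store
        # (assumption), with an atomic check-and-set instead of this dict.
        self.results: Dict[str, str] = {}

    def run(self, key: str, task: Callable[[], str]) -> str:
        if key in self.results:
            return self.results[key]  # redelivery: skip the work
        result = task()
        self.results[key] = result
        return result
```

Keying on path plus digest avoids treating duplicate documents from different custodians as a single task, which would collapse custodial attribution.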
Observability must track three core metrics:
- Throughput (files/sec & GB/hr): Validates SLA adherence during tight discovery deadlines.
- Hash Mismatch & Integrity Rate: Monitors how often computed SHA-256 digests match the ingest manifest; a mismatch signals altered or incompletely copied media, not a cryptographic collision.
- DLQ Accumulation Velocity: Early indicator of systemic extraction failures or malformed custodial media.
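The three metrics reduce to simple counter arithmetic, sketched here with a hypothetical `BatchMetrics` dataclass. Exporting them through OpenTelemetry or Prometheus is not shown; the field names, the decimal-GB convention, and the per-minute DLQ unit are assumptions.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class BatchMetrics:
    """Raw counters behind the three core metrics (illustrative field names)."""
    files_done: int = 0
    bytes_done: int = 0
    hash_checked: int = 0
    hash_matched: int = 0
    dlq_count: int = 0

    def throughput(self, elapsed_s: float) -> Dict[str, float]:
        """Files/sec and GB/hr over an observation window (1 GB = 1e9 bytes)."""
        return {
            "files_per_sec": self.files_done / elapsed_s,
            "gb_per_hr": (self.bytes_done / 1e9) / (elapsed_s / 3600),
        }

    def integrity_rate(self) -> float:
        """Share of computed digests matching the manifest (1.0 when nothing checked yet)."""
        return self.hash_matched / self.hash_checked if self.hash_checked else 1.0

    def dlq_velocity(self, previous_dlq_count: int, window_s: float) -> float:
        """DLQ items added per minute since the previous sample."""
        return (self.dlq_count - previous_dlq_count) / (window_s / 60)
```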
Instrumentation should export metrics via OpenTelemetry or Prometheus, with structured logs forwarded to immutable storage for audit compliance. The pipeline flow remains explicit: Discovery → Hash-First Validation → Async Extraction → Schema Normalization → DLQ Routing → Audit Manifest Generation.
Conclusion
Async batch processing for eDiscovery ESI workflows eliminates synchronous bottlenecks while enforcing strict chain-of-custody boundaries. By combining memory-aware generators, hash-first execution graphs, and deterministic failure routing, legal automation teams can scale ingestion throughput without sacrificing compliance or auditability. The architecture presented here serves as a foundational blueprint for modern, production-grade discovery platforms.