Implementing Celery for Async eDiscovery Batching: Resolving OOM & Hash Verification Failures
In high-volume ESI Ingestion & Processing Workflows, Celery distributes native file ingestion, metadata extraction, and cryptographic verification across parallel worker processes. However, scaling to multi-gigabyte containers, deeply nested email archives, or forensic imaging formats introduces strict memory boundaries. When workers operate under constrained memory limits (typically 4 GB per process), I/O patterns that load whole files into memory trigger Out-Of-Memory (OOM) terminations that cascade into SHA-256 verification mismatches. This architecture requires strict adherence to Async Batch Processing Design principles to prevent partial state corruption and maintain defensible chain-of-custody records.
Diagnostic Signatures & Root-Cause Isolation
The failure manifests deterministically under `--concurrency=8 --pool=prefork` configurations processing batches exceeding 500 native files. Primary indicators appear in worker logs and audit trails:

```text
[2024-03-15 14:22:01,112: ERROR/MainProcess] Task ediscovery.tasks.process_native_file[uuid-1] raised unexpected: MemoryError
[2024-03-15 14:22:01,115: WARNING/MainProcess] Process 'Worker-1' pid:10482 exited with 'signal 9 (SIGKILL)'
[2024-03-15 14:22:01,120: INFO/Worker-2] Hash verification failed for /mnt/esi/case_004/batch_12/IMG_8842.HEIC: expected SHA-256 mismatch (computed vs. manifest)
```
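The worker-loss line carries enough to start triage on its own. A minimal sketch, assuming the log format shown above (the regex and the `parse_worker_loss` helper are hypothetical, not part of Celery), extracts the pid and signal and derives the 128 + N exit code a container runtime would report:

```python
import re
import signal
from typing import Optional

# Matches the worker-loss line format shown above; the wording may differ
# across Celery/billiard versions, so treat this pattern as an assumption.
WORKER_LOST = re.compile(
    r"Process '(?P<worker>[^']+)' pid:(?P<pid>\d+) "
    r"exited with 'signal (?P<signum>\d+) \(\w+\)'"
)


def parse_worker_loss(line: str) -> Optional[dict]:
    """Return worker name, pid, signal name, and container-style exit code."""
    match = WORKER_LOST.search(line)
    if match is None:
        return None
    signum = int(match.group("signum"))
    return {
        "worker": match.group("worker"),
        "pid": int(match.group("pid")),
        "signal": signal.Signals(signum).name,  # 9 -> "SIGKILL" on POSIX
        "exit_code": 128 + signum,  # shells/container runtimes report 128 + N
    }


line = ("[2024-03-15 14:22:01,115: WARNING/MainProcess] Process 'Worker-1' "
        "pid:10482 exited with 'signal 9 (SIGKILL)'")
print(parse_worker_loss(line))
```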
Signal 9 (SIGKILL), which container runtimes report as exit code 137 (128 + 9), is the signature of the Linux OOM killer terminating the worker; confirm it in the kernel log (e.g., `dmesg`), which records each OOM kill with the victim's pid. The root cause is twofold:
- Unbounded Memory Allocation: Loading entire files via `pathlib.Path.read_bytes()` before hashing exceeds per-worker RAM.
- Stream Interruption & FD Leaks: Extraction libraries (`textract`, `pdfplumber`, `libmagic`) retain open file descriptors and intermediate buffers. When the OS kills the process mid-task, the Celery result backend records a partial state, and `hashlib` never finalizes the digest. This violates deterministic hashing requirements and breaks downstream deduplication.
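To make the first failure mode concrete, the sketch below compares peak Python-heap allocation, as measured by `tracemalloc`, for a whole-file `read_bytes()` hash versus a chunked read on a 32 MiB random file. The helper names and the file size are illustrative assumptions; `tracemalloc` only sees allocations made through Python's allocators, so treat the numbers as indicative rather than a full RSS measurement.

```python
import hashlib
import os
import tempfile
import tracemalloc
from pathlib import Path
from typing import Callable, Tuple

CHUNK_SIZE = 1024 * 1024  # 1 MiB, same buffer size as the remediated worker


def sha256_whole_file(path: Path) -> str:
    # Anti-pattern: the whole file is materialized as one bytes object.
    return hashlib.sha256(path.read_bytes()).hexdigest()


def sha256_streamed(path: Path) -> str:
    # Bounded: only a chunk or two of CHUNK_SIZE bytes is alive at any time.
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def traced_peak(fn: Callable[[Path], str], path: Path) -> Tuple[str, int]:
    """Run fn(path); return its digest and the peak traced allocation in bytes."""
    tracemalloc.start()
    try:
        result = fn(path)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.bin"
    sample.write_bytes(os.urandom(32 * 1024 * 1024))  # stand-in for a large native file
    whole_hash, whole_peak = traced_peak(sha256_whole_file, sample)
    stream_hash, stream_peak = traced_peak(sha256_streamed, sample)

print(f"whole-file peak: {whole_peak / 2**20:.1f} MiB")
print(f"streamed peak:   {stream_peak / 2**20:.1f} MiB")
print("digests match:", whole_hash == stream_hash)
```

Whole-file peak grows with file size, while the streamed peak stays near the chunk size no matter how large the native file is.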
Production-Ready Remediation Architecture
Resolution requires streaming I/O, explicit resource cleanup, and Celery fault-tolerance settings. The following implementation bounds the memory used for hashing to a fixed-size read buffer and verifies each file's digest against its manifest value.
```python
import hashlib
import logging
from pathlib import Path
from typing import Optional

from celery import Celery

logger = logging.getLogger(__name__)

# Configure Celery with fault-tolerant acknowledgement and memory safeguards
app = Celery('ediscovery_worker')
app.conf.update(
    task_acks_late=True,              # Acknowledge only after the task finishes
    task_reject_on_worker_lost=True,  # Re-queue instead of acking if the child dies (watch for loops)
    worker_max_tasks_per_child=50,    # Recycle children to limit memory fragmentation over time
    # Value is in KiB (~3.5 GB). It is checked after each task completes, so it
    # recycles bloated children but cannot stop a single oversized task mid-run.
    worker_max_memory_per_child=3500000,
)

CHUNK_SIZE = 1024 * 1024  # 1 MiB streaming buffer


def compute_sha256_streamed(file_path: Path) -> Optional[str]:
    """Compute SHA-256 using bounded memory streaming."""
    sha256 = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                sha256.update(chunk)
        return sha256.hexdigest()
    except OSError as e:  # IOError is an alias of OSError in Python 3
        logger.error("Stream read failed for %s: %s", file_path, e)
        return None


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_native_file(self, file_path: str, expected_hash: Optional[str] = None) -> dict:
    path = Path(file_path)

    # Validation: file existence and type check (is_file() is False if missing)
    if not path.is_file():
        raise FileNotFoundError(f"Native file not found: {file_path}")
    size_bytes = path.stat().st_size  # stat once; reuse in the result
    if size_bytes == 0:
        raise ValueError(f"Zero-byte file rejected: {file_path}")

    computed_hash = compute_sha256_streamed(path)
    if computed_hash is None:
        # Transient I/O failure (e.g., a flaky network share). Retry after the
        # configured delay before surfacing a hard failure to the backend.
        raise self.retry(
            exc=RuntimeError("Hash computation aborted due to I/O failure")
        )

    # Validation against manifest (if provided)
    if expected_hash and computed_hash.lower() != expected_hash.lower():
        logger.warning(
            "Hash mismatch for %s: expected=%s, computed=%s",
            file_path, expected_hash, computed_hash,
        )
        return {
            "status": "hash_mismatch",
            "file": file_path,
            "computed_hash": computed_hash,
            "expected_hash": expected_hash,
            "requires_review": True,
        }

    return {
        "status": "success",
        "file": file_path,
        "computed_hash": computed_hash,
        "size_bytes": size_bytes,
    }
```
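Because `compute_sha256_streamed` only feeds `hashlib` successive slices of the file, its result should not depend on `CHUNK_SIZE`. A quick property check, using an in-memory buffer as a stand-in for a native file (the `sha256_in_chunks` helper is hypothetical), can pin that assumption down in a test suite:

```python
import hashlib
import io
import os


def sha256_in_chunks(data: bytes, chunk_size: int) -> str:
    """Hash data by feeding hashlib fixed-size slices, as a streaming reader does."""
    digest = hashlib.sha256()
    stream = io.BytesIO(data)
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# Deliberately not a multiple of any chunk size below.
payload = os.urandom(256 * 1024 + 17)
reference = hashlib.sha256(payload).hexdigest()

# Tiny, odd, typical, and larger-than-payload chunk sizes must all agree.
for size in (1, 4095, 64 * 1024, 1024 * 1024):
    assert sha256_in_chunks(payload, size) == reference, size
print("digest is independent of chunk size")
```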
Defensible Recovery & Audit Trail Preservation
When OOM events occur mid-batch, recovery must preserve evidentiary integrity without reprocessing verified assets. The flow below traces triage from an OOM or SIGKILL termination through to a reconciled audit trail.
```mermaid
flowchart LR
    A["Worker OOM or SIGKILL"] --> B["Isolate partial states"]
    B --> C["Re-queue with bounded concurrency"]
    C --> D["Audit reconciliation"]
```
Implement the following triage protocol:
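- Isolate Partial States: Query the Celery result backend for `FAILURE` or `REVOKED` tasks (and `STARTED` tasks, if `task_track_started` is enabled). Cross-reference against the ingestion manifest to identify files lacking finalized hashes.
- Re-queue with Bounded Concurrency: Restart the worker pool with `--pool=solo` or a reduced `--concurrency=2` to isolate memory-heavy files. Keep `task_acks_late=True` paired with `task_reject_on_worker_lost=True`: with late acknowledgement alone, Celery still acknowledges a task whose worker process is killed, so it is not redelivered. Because a file that reliably exhausts memory will then be redelivered repeatedly, divert repeat offenders to an isolated queue rather than letting them loop.
- Audit Trail Reconciliation: Log all hash computations, retries, and mismatches to an immutable audit store (e.g., append-only JSON or WORM storage). Maintain a strict mapping between `task_id`, `file_path`, `computed_hash`, and `processing_timestamp`.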
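The first triage step can be sketched as a pure function over the manifest and the recorded task results. This assumes results are stored in the shape `process_native_file` returns (`status`, `file`, `computed_hash`); the `triage` helper and the sample paths other than `IMG_8842.HEIC` are hypothetical. Files with no computed hash are re-queued, while `hash_mismatch` results are routed to review rather than reprocessed:

```python
from typing import Any, Dict, Iterable, List, Mapping, Tuple


def triage(
    manifest: Mapping[str, str],
    results: Iterable[Mapping[str, Any]],
) -> Tuple[List[str], List[str]]:
    """Split manifest paths into (re-queue, review) lists.

    manifest: file_path -> expected SHA-256 from the ingestion manifest.
    results:  task results shaped like process_native_file's return value.
    """
    finalized: Dict[str, str] = {}  # file_path -> status of the latest hashed result
    for result in results:
        path = result.get("file")
        if path in manifest and result.get("computed_hash"):
            finalized[path] = result.get("status", "")
    # No finalized hash (e.g., the worker was SIGKILLed): safe to re-queue.
    requeue = [p for p in manifest if p not in finalized]
    # A finalized but mismatched hash is evidence, not a transient failure.
    review = [p for p in manifest if finalized.get(p) == "hash_mismatch"]
    return requeue, review


# Hypothetical batch: hash values are placeholders, not real digests.
BATCH = "/mnt/esi/case_004/batch_12"
manifest = {
    f"{BATCH}/DOC_0001.pdf": "a" * 64,
    f"{BATCH}/IMG_8842.HEIC": "b" * 64,
    f"{BATCH}/MAIL_0002.msg": "c" * 64,
}
results = [
    {"status": "success", "file": f"{BATCH}/DOC_0001.pdf", "computed_hash": "a" * 64},
    {"status": "hash_mismatch", "file": f"{BATCH}/IMG_8842.HEIC",
     "computed_hash": "d" * 64, "expected_hash": "b" * 64, "requires_review": True},
    # MAIL_0002.msg has no result: its worker was killed before returning.
]
requeue, review = triage(manifest, results)
print("re-queue:", requeue)
print("review:  ", review)
```

Keeping verified files out of the re-queue list is what lets recovery avoid reprocessing assets whose hashes are already finalized.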
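Refer to the Celery workers guide when tuning `worker_max_tasks_per_child` and `worker_max_memory_per_child`. Both recycle a child process only after its current task completes, so they limit gradual memory growth but cannot preempt an OOM kill caused by a single oversized task; that protection comes from streaming I/O. Additionally, check streaming implementations against Python’s `hashlib` documentation: `hashlib.sha256()` implements SHA-256 as specified in FIPS 180-4, and repeated `update()` calls are equivalent to a single call on the concatenated data, which is what makes chunked digests deterministic.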
Compliance Alignment & Validation
Cryptographic verification in eDiscovery should align with FRCP 34(b)(2)(E) production standards and NIST SP 800-88 media sanitization guidelines. The remediated architecture is designed to ensure:
- Deterministic Hashing: Streaming `hashlib.sha256()` guarantees identical digests regardless of processing environment or chunk boundaries.
- Chain-of-Custody Continuity: Partial task states are explicitly rejected and re-queued, preventing orphaned or unverified files from entering the production set.
- Auditability: Every retry, mismatch, and memory-bound restart is logged with cryptographic hashes and timestamps, enabling defensible validation during privilege review or opposing counsel discovery disputes.
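For the audit trail, a minimal sketch of an append-only JSON Lines writer follows. It records the `task_id`, `file_path`, `computed_hash`, and `processing_timestamp` mapping from the triage protocol plus a hypothetical `event` field. Opening in append mode and calling `os.fsync` only gives application-level append semantics; true immutability still depends on WORM or equivalent storage controls.

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict


def append_audit_record(log_path: Path, task_id: str, file_path: str,
                        computed_hash: str, event: str) -> Dict[str, str]:
    """Append one JSON object per line and fsync before returning."""
    record = {
        "task_id": task_id,
        "file_path": file_path,
        "computed_hash": computed_hash,
        "event": event,  # hypothetical field: "success", "retry", "hash_mismatch", ...
        "processing_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Mode "a" opens with O_APPEND, so every write lands at the current end of file.
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, sort_keys=True) + "\n")
        f.flush()
        os.fsync(f.fileno())  # persist before the task reports completion
    return record


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "audit.jsonl"
    append_audit_record(log, "uuid-1", "/mnt/esi/case_004/batch_12/IMG_8842.HEIC",
                        "d" * 64, "hash_mismatch")
    append_audit_record(log, "uuid-2", "/mnt/esi/case_004/batch_12/IMG_8842.HEIC",
                        "d" * 64, "review_requested")
    entries = [json.loads(line) for line in log.read_text(encoding="utf-8").splitlines()]

for entry in entries:
    print(entry["task_id"], entry["event"], entry["processing_timestamp"])
```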
Deploy this configuration with strict schema validation for manifest ingestion, and route worker exceptions through a centralized, categorized error-logging pipeline to maintain operational visibility across distributed processing nodes.