Implementing Celery for Async eDiscovery Batching: Resolving OOM & Hash Verification Failures

In high-volume ESI Ingestion & Processing Workflows, Celery parallelizes native file ingestion, metadata extraction, and cryptographic verification across worker processes. Scaling to multi-gigabyte containers, deeply nested email archives, or forensic imaging formats, however, runs into hard memory boundaries. When workers run under constrained container limits (typically 4 GB per process), patterns that buffer whole files in memory trigger Out-Of-Memory (OOM) terminations that cascade into SHA-256 verification mismatches. Preventing partial state corruption and maintaining defensible chain-of-custody records requires strict adherence to Async Batch Processing Design principles.

Diagnostic Signatures & Root-Cause Isolation

The failure reproduces consistently under --concurrency=8 --pool=prefork when batches exceed 500 native files. The primary indicators appear in worker logs and audit trails:

```text
[2024-03-15 14:22:01,112: ERROR/MainProcess] Task ediscovery.tasks.process_native_file[uuid-1] raised unexpected: MemoryError
[2024-03-15 14:22:01,115: WARNING/MainProcess] Process 'Worker-1' pid:10482 exited with 'signal 9 (SIGKILL)'
[2024-03-15 14:22:01,120: INFO/Worker-2] Hash verification failed for /mnt/esi/case_004/batch_12/IMG_8842.HEIC: expected SHA-256 mismatch (computed vs. manifest)
```
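A minimal sketch for triaging a full worker log, assuming the line format shown in this excerpt: it counts each of the three signatures and collects the paths that failed verification. The regular expressions and the name scan_worker_log are illustrative, and paths containing whitespace would need a stricter pattern.

```python
import re
from collections import Counter
from typing import Iterable, List, Tuple

# Patterns written against the three log lines above; they are assumptions
# about the log format and need adjusting for other Celery log configurations.
SIGNATURES = {
    "python_memory_error": re.compile(r"raised unexpected: MemoryError"),
    "worker_sigkill": re.compile(r"exited with 'signal 9 \(SIGKILL\)'"),
    "hash_mismatch": re.compile(r"Hash verification failed for (?P<path>\S+):"),
}


def scan_worker_log(lines: Iterable[str]) -> Tuple[Counter, List[str]]:
    """Count each failure signature and collect paths that failed verification."""
    counts: Counter = Counter()
    mismatched_paths: List[str] = []
    for line in lines:
        for name, pattern in SIGNATURES.items():
            match = pattern.search(line)
            if match is None:
                continue
            counts[name] += 1
            if name == "hash_mismatch":
                mismatched_paths.append(match.group("path"))
    return counts, mismatched_paths


SAMPLE_LOG = [
    "[2024-03-15 14:22:01,112: ERROR/MainProcess] Task ediscovery.tasks.process_native_file[uuid-1] raised unexpected: MemoryError",
    "[2024-03-15 14:22:01,115: WARNING/MainProcess] Process 'Worker-1' pid:10482 exited with 'signal 9 (SIGKILL)'",
    "[2024-03-15 14:22:01,120: INFO/Worker-2] Hash verification failed for /mnt/esi/case_004/batch_12/IMG_8842.HEIC: expected SHA-256 mismatch (computed vs. manifest)",
]

counts, paths = scan_worker_log(SAMPLE_LOG)
print(dict(counts), paths)
```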

Signal 9 (SIGKILL), reported as exit code 137 by container runtimes, is consistent with the Linux OOM killer terminating the worker; the kernel log (dmesg) confirms which process was killed. The root cause is twofold:

  1. Unbounded Memory Allocation: Loading entire files via pathlib.Path.read_bytes() before hashing requires RAM proportional to file size, which exceeds per-worker limits on multi-gigabyte natives.
  2. Stream Interruption & FD Leaks: Extraction libraries (textract, pdfplumber, libmagic) can retain open file descriptors and intermediate buffers. When the OS kills the process mid-task, hashlib never finalizes the digest and the Celery result backend records a failed or incomplete task (typically WorkerLostError) instead of a verified hash. This violates deterministic hashing requirements and breaks downstream deduplication.
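Root cause 1 can be reproduced at small scale with the standard-library tracemalloc module, which reports the peak memory allocated through Python's allocators. The sketch below hashes the same synthetic file with read_bytes() and with a 1 MB streaming loop; the helper names are hypothetical, and tracemalloc does not see memory held by native extraction libraries, so treat the numbers as a relative comparison rather than a container RSS measurement.

```python
import hashlib
import os
import tempfile
import tracemalloc
from pathlib import Path
from typing import Callable, Tuple

CHUNK_SIZE = 1024 * 1024  # 1 MB, the buffer size used in the remediation


def sha256_whole_file(path: Path) -> str:
    # Root cause 1: the entire file is materialized as one bytes object.
    return hashlib.sha256(path.read_bytes()).hexdigest()


def sha256_streamed(path: Path) -> str:
    # Bounded alternative: only about one chunk is alive at a time.
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def traced_peak(func: Callable[[Path], str], path: Path) -> Tuple[str, int]:
    """Return (digest, peak bytes allocated through Python's allocators)."""
    tracemalloc.start()
    try:
        digest = func(path)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return digest, peak


def compare_peaks(size_bytes: int) -> Tuple[int, int, bool]:
    """Hash a synthetic file both ways; return (whole_peak, streamed_peak, same_digest)."""
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "sample.bin"
        sample.write_bytes(os.urandom(size_bytes))  # written before tracing starts
        whole_digest, whole_peak = traced_peak(sha256_whole_file, sample)
        stream_digest, stream_peak = traced_peak(sha256_streamed, sample)
    return whole_peak, stream_peak, whole_digest == stream_digest


if __name__ == "__main__":
    whole, streamed, same = compare_peaks(32 * 1024 * 1024)  # 32 MB synthetic file
    print(f"whole-file peak: {whole:,} B  streamed peak: {streamed:,} B  same digest: {same}")
```

The whole-file peak grows with file size, while the streamed peak stays near the chunk size; both paths produce the same digest.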

Production-Ready Remediation Architecture

Resolution requires streaming I/O, explicit resource cleanup, and Celery fault-tolerance settings. The implementation below keeps per-task memory to roughly one read buffer, closes file handles deterministically, and reports a hash only after the digest has been finalized over the complete file.

```python
import hashlib
import logging
from pathlib import Path
from typing import Optional

from celery import Celery

logger = logging.getLogger(__name__)

# Configure Celery with fault-tolerant acknowledgement and memory safeguards
app = Celery('ediscovery_worker')
app.conf.update(
    task_acks_late=True,              # Acknowledge only after the task finishes
    task_reject_on_worker_lost=True,  # Re-queue (not fail) tasks whose child is killed
    worker_max_tasks_per_child=50,    # Recycle children to cap gradual memory growth
    # Resident-memory ceiling in KiB (~3.5 GB). Celery checks it after a task
    # completes, so it replaces bloated children but cannot stop one oversized task.
    worker_max_memory_per_child=3500000,
)

CHUNK_SIZE = 1024 * 1024  # 1 MB streaming buffer

def compute_sha256_streamed(file_path: Path) -> Optional[str]:
    """Compute SHA-256 while holding about one CHUNK_SIZE buffer at a time."""
    sha256 = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:  # handle is closed even on error
            while True:
                chunk = f.read(CHUNK_SIZE)
                if not chunk:
                    break
                sha256.update(chunk)
        return sha256.hexdigest()
    except OSError as e:  # IOError is an alias of OSError in Python 3
        logger.error("Stream read failed for %s: %s", file_path, e)
        return None

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_native_file(self, file_path: str, expected_hash: Optional[str] = None) -> dict:
    path = Path(file_path)

    # Validation: file existence, type, and non-zero size
    if not path.is_file():
        raise FileNotFoundError(f"Native file not found: {file_path}")
    size_bytes = path.stat().st_size
    if size_bytes == 0:
        raise ValueError(f"Zero-byte file rejected: {file_path}")

    computed_hash = compute_sha256_streamed(path)
    if computed_hash is None:
        # Transient I/O failure (e.g., a flaky network share). Retry after the
        # fixed 60 s delay, up to max_retries, before surfacing a hard failure.
        raise self.retry(
            exc=RuntimeError("Hash computation aborted due to I/O failure")
        )

    # Validation against manifest (if provided); hex digests compare case-insensitively
    if expected_hash and computed_hash.lower() != expected_hash.lower():
        logger.warning(
            "Hash mismatch for %s: expected=%s, computed=%s",
            file_path, expected_hash, computed_hash,
        )
        return {
            "status": "hash_mismatch",
            "file": file_path,
            "computed_hash": computed_hash,
            "expected_hash": expected_hash,
            "requires_review": True,
        }

    return {
        "status": "success",
        "file": file_path,
        "computed_hash": computed_hash,
        "size_bytes": size_bytes,
    }
```
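The task above covers hashing only. For the extraction side of root cause 2, one way to make cleanup explicit is to register every resource a task acquires on a contextlib.ExitStack, so file handles and scratch space are released even when a parser raises. The parser callable, extract_with_cleanup, and the scratch-directory convention are hypothetical stand-ins, not the textract or pdfplumber APIs. Note the scope: a SIGKILL releases descriptors at the OS level anyway, so this pattern targets leaks that accumulate across tasks inside a long-lived child process.

```python
import contextlib
import tempfile
from pathlib import Path
from typing import BinaryIO, Callable, Dict, Tuple

# Hypothetical parser signature: (open file handle, scratch directory) -> metadata.
Parser = Callable[[BinaryIO, Path], Dict[str, str]]


def extract_with_cleanup(path: Path, parser: Parser) -> Dict[str, str]:
    """Run one extraction step with every acquired resource registered for cleanup."""
    # ExitStack unwinds in reverse order (scratch dir, then handle) on success or error.
    with contextlib.ExitStack() as stack:
        handle = stack.enter_context(open(path, "rb"))
        # Intermediate buffers go to a per-task scratch dir that is always removed.
        scratch = Path(stack.enter_context(tempfile.TemporaryDirectory(prefix="extract-")))
        return parser(handle, scratch)


def demo_failed_extraction() -> Tuple[bool, bool]:
    """Simulate a parser crash; report (handle_closed, scratch_removed)."""
    seen = {}

    def crashing_parser(handle: BinaryIO, scratch: Path) -> Dict[str, str]:
        seen["handle"], seen["scratch"] = handle, scratch
        (scratch / "partial.txt").write_bytes(handle.read(32))  # intermediate output
        raise ValueError("simulated extraction failure")

    with tempfile.TemporaryDirectory() as tmp:
        native = Path(tmp) / "native.bin"
        native.write_bytes(b"x" * 128)
        try:
            extract_with_cleanup(native, crashing_parser)
        except ValueError:
            pass  # a real task would log and retry or fail here
    return seen["handle"].closed, not seen["scratch"].exists()


print(demo_failed_extraction())  # (True, True)
```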

Defensible Recovery & Audit Trail Preservation

When OOM events occur mid-batch, recovery must preserve evidentiary integrity without reprocessing verified assets. The flow below traces triage from an OOM or SIGKILL termination through to a reconciled audit trail.

```mermaid
flowchart LR
    A["Worker OOM or SIGKILL"] --> B["Isolate partial states"]
    B --> C["Re-queue with bounded concurrency"]
    C --> D["Audit reconciliation"]
```

Implement the following triage protocol:

  1. Isolate Partial States: Query the Celery result backend for FAILURE or REVOKED tasks. Cross-reference against the ingestion manifest to identify files lacking finalized hashes.
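  2. Re-queue with Bounded Concurrency: Restart the worker pool with --pool=solo or a reduced --concurrency=2 to isolate memory-heavy files. Set task_acks_late=True together with task_reject_on_worker_lost=True so that tasks interrupted by worker loss return to the queue; with acks_late alone, Celery still acknowledges a task whose child process is killed. Because a file that reliably exhausts memory will be redelivered on every attempt, move repeat offenders to a separate low-concurrency queue rather than letting them cycle indefinitely.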
  3. Audit Trail Reconciliation: Log all hash computations, retries, and mismatches to an immutable audit store (e.g., append-only JSON or WORM storage). Maintain a strict mapping between task_id, file_path, computed_hash, and processing_timestamp.
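Steps 1 and 3 can be sketched with the standard library, assuming successful task results have already been exported from the result backend into a file_path-to-hash mapping (the export itself is backend-specific and not shown). find_unfinalized and append_audit_record are hypothetical helpers, and the paths, task IDs, and hashes in the usage are placeholders. The JSON Lines file is append-only by convention; true immutability still has to come from the storage layer (e.g., WORM).

```python
import json
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional


def find_unfinalized(manifest: Dict[str, str], finalized: Dict[str, str]) -> List[str]:
    """Step 1: manifest paths that have no finalized hash yet.

    manifest: file_path -> expected SHA-256 from the ingestion manifest.
    finalized: file_path -> computed SHA-256 from successful task results
    (assumed to be exported from the result backend beforehand).
    """
    return sorted(path for path in manifest if path not in finalized)


def append_audit_record(log_path: Path, task_id: str, file_path: str,
                        computed_hash: Optional[str], event: str) -> None:
    """Step 3: append one JSON Lines record; earlier records are never rewritten."""
    record = {
        "task_id": task_id,
        "file_path": file_path,
        "computed_hash": computed_hash,
        "event": event,  # e.g. "hash_verified", "retry", "hash_mismatch"
        "processing_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    data = (json.dumps(record, sort_keys=True) + "\n").encode("utf-8")
    # O_APPEND positions every write at the current end of the file.
    fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o640)
    try:
        os.write(fd, data)
        os.fsync(fd)  # flush to stable storage before reporting success
    finally:
        os.close(fd)


# Usage with hypothetical paths, task IDs, and placeholder hashes:
manifest = {"/mnt/esi/case_004/a.msg": "a" * 64, "/mnt/esi/case_004/b.pdf": "b" * 64}
finalized = {"/mnt/esi/case_004/a.msg": "a" * 64}
pending = find_unfinalized(manifest, finalized)  # ["/mnt/esi/case_004/b.pdf"]

with tempfile.TemporaryDirectory() as tmp:
    audit_log = Path(tmp) / "audit.jsonl"
    append_audit_record(audit_log, "task-001", "/mnt/esi/case_004/a.msg", "a" * 64, "hash_verified")
    append_audit_record(audit_log, "task-002", pending[0], None, "requeued")
    audit_lines = audit_log.read_text(encoding="utf-8").splitlines()
```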

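Consult the Celery Workers Guide (its max tasks per child and max memory per child settings) and the configuration reference when tuning worker_max_tasks_per_child and worker_max_memory_per_child. Both settings replace a child process between tasks; neither interrupts a task that is already exceeding memory, so they complement streaming I/O rather than replace it. Additionally, validate streaming implementations against Python's hashlib documentation: hashlib.sha256() implements SHA-256 as defined in FIPS 180-4, and successive update() calls are equivalent to a single call over the concatenated data, which is what makes chunked digests deterministic.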
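To make the kilobyte unit concrete, here is a hypothetical sizing heuristic that divides a memory budget across prefork children after reserving a headroom share for the parent process and for growth within a single task (the 15% default is an assumption, not a Celery recommendation). With one child per 4 GiB limit it yields 3,565,158 KiB, slightly above the 3,500,000 used in the configuration; if a single 4 GiB container limit is shared by eight children, each child's share drops to roughly 435 MiB.

```python
def max_memory_per_child_kib(budget_bytes: int, concurrency: int, headroom_pct: int = 15) -> int:
    """Hypothetical sizing heuristic for worker_max_memory_per_child.

    budget_bytes: memory limit shared by `concurrency` prefork children.
    headroom_pct: share of the budget reserved for the parent process and for
    growth within a single task (the setting is only checked between tasks).
    Returns KiB; Celery documents this setting in kilobytes.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    if not 0 <= headroom_pct < 100:
        raise ValueError("headroom_pct must be in [0, 100)")
    # Integer arithmetic avoids floating-point rounding surprises.
    usable_bytes = budget_bytes * (100 - headroom_pct) // 100
    return usable_bytes // concurrency // 1024


GIB = 1024 ** 3
print(max_memory_per_child_kib(4 * GIB, concurrency=1))  # 3565158
print(max_memory_per_child_kib(4 * GIB, concurrency=8))  # 445644
```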

Compliance Alignment & Validation

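Cryptographic verification underpins defensible productions under FRCP 34(b)(2)(E), which governs the form in which ESI is produced; NIST SP 800-88, which covers media sanitization, applies to the disposition of source media rather than to hash verification itself. Within that framework, the remediated architecture provides: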

  • Deterministic Hashing: Streaming hashlib.sha256() yields the same digest for the same bytes regardless of processing environment or chunk boundaries.
  • Chain-of-Custody Continuity: Partial task states are explicitly rejected and re-queued, preventing orphaned or unverified files from entering the production set.
  • Auditability: Every retry, mismatch, and memory-bound restart is logged with cryptographic hashes and timestamps, enabling defensible validation during privilege review or opposing counsel discovery disputes.
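The chunk-boundary claim in the first bullet can be checked directly: feeding the same bytes to one hash object in slices of any size yields the one-shot digest. This sketch (the helper name sha256_in_chunks is illustrative) also checks the published SHA-256 test vector for "abc". It demonstrates determinism over identical bytes only; a truncated or modified copy will, correctly, hash differently.

```python
import hashlib
import os


def sha256_in_chunks(data: bytes, chunk_size: int) -> str:
    """Feed data to one SHA-256 object in fixed-size slices."""
    digest = hashlib.sha256()
    for offset in range(0, len(data), chunk_size):
        digest.update(data[offset:offset + chunk_size])
    return digest.hexdigest()


payload = os.urandom(100_003)  # odd length, so the last slice is partial
one_shot = hashlib.sha256(payload).hexdigest()
for size in (1, 7, 4096, 65_536, 1024 * 1024):
    assert sha256_in_chunks(payload, size) == one_shot

# Standard SHA-256 test vector for "abc", fed one byte at a time.
assert sha256_in_chunks(b"abc", 1) == (
    "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
)
```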

Deploy this configuration with strict schema validation for manifest ingestion, and route worker exceptions through a centralized, categorized error-logging pipeline to maintain operational visibility across distributed processing nodes.
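For the manifest side, a minimal sketch of strict row validation, assuming a hypothetical manifest schema with file_path and sha256 columns: rows missing a field, carrying a malformed digest, or repeating a path are rejected with a stable category that a centralized logging pipeline can group on via the standard logging extra mechanism. Field names, category strings, and paths are illustrative.

```python
import logging
import re
from typing import Dict, List, Tuple

# Hypothetical manifest schema: each row needs a file path and a SHA-256 hex digest.
REQUIRED_FIELDS = ("file_path", "sha256")
SHA256_HEX = re.compile(r"[0-9a-fA-F]{64}")

logger = logging.getLogger("ediscovery.manifest")


def validate_manifest_rows(
    rows: List[Dict[str, str]],
) -> Tuple[List[Dict[str, str]], List[Tuple[int, str]]]:
    """Split rows into accepted rows and (row_index, error_category) rejects."""
    accepted: List[Dict[str, str]] = []
    rejected: List[Tuple[int, str]] = []
    seen_paths = set()
    for index, row in enumerate(rows):
        if any(not row.get(field) for field in REQUIRED_FIELDS):
            category = "schema.missing_field"
        elif SHA256_HEX.fullmatch(row["sha256"]) is None:
            category = "schema.bad_hash_format"
        elif row["file_path"] in seen_paths:
            category = "schema.duplicate_path"
        else:
            seen_paths.add(row["file_path"])
            accepted.append(row)
            continue
        rejected.append((index, category))
        # A stable category attribute lets a central pipeline group and alert on errors.
        logger.error("Manifest row %d rejected", index, extra={"error_category": category})
    return accepted, rejected


rows = [
    {"file_path": "/mnt/esi/case_004/a.msg", "sha256": "a" * 64},
    {"file_path": "/mnt/esi/case_004/b.pdf", "sha256": "not-a-digest"},
    {"file_path": "/mnt/esi/case_004/c.docx"},
    {"file_path": "/mnt/esi/case_004/a.msg", "sha256": "b" * 64},
]
accepted, rejected = validate_manifest_rows(rows)
```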