Implementation Guide: Attachment & Parent-Child Mapping Pipeline
The attachment-to-parent mapping stage serves as the structural backbone of any defensible eDiscovery processing workflow. When custodial data is ingested, establishing immutable parent-child relationships before downstream review or production prevents metadata fragmentation, preserves chain-of-custody integrity, and ensures that privilege logs and redaction layers propagate correctly across nested document families. This implementation assumes prior completion of Deduplication & Family Grouping at the ingestion boundary, guaranteeing that only unique parent containers and their associated attachments enter the mapping pipeline.
Memory-Aware Async Architecture & Batch Routing
Processing nested containers at scale requires strict memory isolation. A synchronous parser called from a coroutine blocks the event loop, and one that loads whole containers into memory can exhaust the heap when traversing deeply nested archives or malformed OLE streams. The pipeline implements a generator-driven batching strategy that yields fixed-size chunks of file paths, processes them through a worker pool bounded by an asyncio.Semaphore, and streams mapping results directly to a relational or document-store backend. Because blocking reads and hashing run off the event loop and files are read in small chunks, the memory footprint stays predictable even for multi-gigabyte PST exports or recursively compressed archives.
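As a minimal sketch of the batching and bounded-concurrency idea described above, the snippet below slices an iterable of paths into fixed-size batches and runs one worker per path behind a semaphore. The names iter_batches, map_batch, and demo_worker are hypothetical and exist only for illustration; the worker is a stand-in for real container parsing.

```python
import asyncio
from itertools import islice
from pathlib import Path
from typing import Awaitable, Callable, Iterable, Iterator, List


def iter_batches(paths: Iterable[Path], batch_size: int) -> Iterator[List[Path]]:
    """Lazily yield fixed-size chunks so the full path list is never materialized."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(paths)
    while batch := list(islice(iterator, batch_size)):
        yield batch


async def map_batch(
    batch: List[Path],
    worker: Callable[[Path], Awaitable[str]],
    semaphore: asyncio.Semaphore,
) -> List[str]:
    """Run one worker per path, with at most the semaphore's slot count active."""

    async def bounded(path: Path) -> str:
        async with semaphore:
            return await worker(path)

    # gather preserves input order, so results line up with `batch`.
    return list(await asyncio.gather(*(bounded(p) for p in batch)))


async def demo_worker(path: Path) -> str:
    """Hypothetical stand-in for real container parsing."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return path.name.upper()


async def main() -> List[str]:
    semaphore = asyncio.Semaphore(2)  # created inside the running loop
    names: List[str] = []
    paths = (Path(f"custodian/export_{i}.zip") for i in range(5))
    for batch in iter_batches(paths, batch_size=2):
        names.extend(await map_batch(batch, demo_worker, semaphore))
    return names


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Acquiring the semaphore per path rather than per batch is the design choice that matters here: it lets several containers in the same batch progress at once while still capping how many are open simultaneously.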
Hash verification occurs at the extraction boundary to prevent duplicate attachment inflation. When an extracted child's hash matches an existing document, the pipeline applies the policy described in Hash-Based Deduplication Strategies to decide whether to suppress the child, create a logical reference pointer, or flag it for cross-family review. This prevents review platform bloat while preserving audit trails for legal hold requirements.
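The three outcomes named above can be sketched as a small decision function. The rules used here (suppress within the same family, point to the original for the same custodian, flag when custodians differ) are assumptions chosen for illustration, not the policy of any particular platform, and DedupAction, SeenDocument, and decide are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Tuple


class DedupAction(Enum):
    KEEP = "keep"                       # first time this hash is seen
    SUPPRESS = "suppress"               # duplicate inside the same family
    REFERENCE = "reference_pointer"     # duplicate in another family, same custodian
    FLAG = "cross_family_review"        # duplicate held by a different custodian


@dataclass(frozen=True)
class SeenDocument:
    document_id: str
    family_id: str
    custodian: str


def decide(
    content_hash: str,
    document_id: str,
    family_id: str,
    custodian: str,
    seen: Dict[str, SeenDocument],
) -> Tuple[DedupAction, Optional[str]]:
    """Return the action plus the ID of the earlier document, if any."""
    prior = seen.get(content_hash)
    if prior is None:
        # Record the first occurrence so later duplicates can point back to it.
        seen[content_hash] = SeenDocument(document_id, family_id, custodian)
        return DedupAction.KEEP, None
    if prior.family_id == family_id:
        return DedupAction.SUPPRESS, prior.document_id
    if prior.custodian == custodian:
        return DedupAction.REFERENCE, prior.document_id
    return DedupAction.FLAG, prior.document_id
```

Returning the earlier document's ID alongside the action keeps the audit trail intact: a suppressed or referenced child can always be traced to the copy that was retained.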
Compliance Boundaries & Structural Sanitization
Defensible mapping requires strict enforcement of path traversal sanitization and maximum extraction depth. Zip bombs, symlink loops, and directory traversal payloads must be intercepted before extraction begins. The pipeline validates that all resolved paths remain strictly within the parent container’s boundary and enforces a configurable recursion limit.
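One way to intercept oversized or suspicious archives before extraction is to inspect the central directory first. The sketch below checks declared sizes, compression ratios, and symbolic-link entries using zipfile metadata only. The limits and the preflight_archive name are illustrative assumptions, and because declared sizes can be forged, a real pipeline would also need to count bytes while reading.

```python
import io
import stat
import zipfile
from typing import List

# Illustrative limits only; tune them to the matter's data profile.
MAX_MEMBER_BYTES = 512 * 1024 * 1024
MAX_TOTAL_BYTES = 4 * 1024 * 1024 * 1024
MAX_COMPRESSION_RATIO = 100


def preflight_archive(zf: zipfile.ZipFile) -> List[str]:
    """Return reasons to refuse extraction; an empty list means no limit was hit."""
    problems: List[str] = []
    declared_total = 0
    for info in zf.infolist():
        if info.is_dir():
            continue
        declared_total += info.file_size
        if info.file_size > MAX_MEMBER_BYTES:
            problems.append(f"{info.filename}: member exceeds size limit")
        # compress_size is 0 for empty members, so guard the division.
        if info.compress_size and info.file_size / info.compress_size > MAX_COMPRESSION_RATIO:
            problems.append(f"{info.filename}: suspicious compression ratio")
        # Archives written on Unix keep the file mode in the high 16 bits.
        if stat.S_ISLNK(info.external_attr >> 16):
            problems.append(f"{info.filename}: symbolic link entry")
    if declared_total > MAX_TOTAL_BYTES:
        problems.append("archive exceeds total uncompressed size limit")
    return problems


def build_demo_archive() -> zipfile.ZipFile:
    """Build an in-memory archive with one ordinary and one highly compressible member."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("notes.txt", "short memo")
        zf.writestr("padding.bin", b"\0" * 1_000_000)
    buffer.seek(0)
    return zipfile.ZipFile(buffer, "r")


if __name__ == "__main__":
    for problem in preflight_archive(build_demo_archive()):
        print(problem)
```

Running the check before any bytes are written means a refused archive leaves nothing on disk to clean up, which keeps the audit record simple.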
For compound documents, specialized extraction routines handle embedded streams without corrupting parent metadata. Mapping embedded OLE objects back to their parent containers ensures that legacy Office attachments retain their original creation timestamps and authorship metadata. Likewise, mapping ZIP archive contents into review platform hierarchies standardizes how flattened archive structures are represented in downstream review interfaces, preserving logical folder paths while stripping unsafe traversal sequences.
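To illustrate how a member name might be turned into a logical folder path with traversal sequences stripped, here is a small sketch. The function name logical_path is hypothetical, and treating backslashes as separators is an assumption that suits archives written by Windows tools but would alter a legitimate Unix filename containing a backslash.

```python
def logical_path(member_name: str) -> str:
    """Normalize a ZIP member name into a forward-slash logical path.

    Drops empty, '.', and '..' components, leading slashes, and a leading
    drive prefix such as 'C:'. Returns '' when nothing usable remains.
    """
    normalized = member_name.replace("\\", "/")
    parts = []
    for part in normalized.split("/"):
        if part in ("", ".", ".."):
            continue
        is_drive = len(part) == 2 and part[0].isalpha() and part[1] == ":"
        if not parts and is_drive:
            continue  # a drive letter is only meaningful as the first component
        parts.append(part)
    return "/".join(parts)


if __name__ == "__main__":
    for name in ("reports/2021/q1.xlsx", "../../etc/passwd", "C:\\Users\\a\\b.docx"):
        print(name, "->", logical_path(name))
```

An empty result signals that the name consisted only of unsafe components, and the caller should treat that member as blocked rather than display it.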
The diagram below shows a parent email decomposed into a tree, where a nested ZIP attachment expands into its own child documents while every node shares one family.
graph TD
P["Parent email MSG"] --> A1["Attachment spreadsheet"]
P --> A2["Attachment ZIP"]
A2 --> N1["Document one"]
A2 --> N2["Document two"]
A2 --> N3["Embedded OLE object"]
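The tree in the diagram can be held as a simple child-to-parent map, from which both the family root and the list of children per node are derivable. This is a minimal sketch with hypothetical node IDs and helper names (family_root, children_index), not the storage model of any specific review platform.

```python
from typing import Dict, List, Optional

# child_id -> parent_id; None marks the family root (the parent email).
PARENT_OF: Dict[str, Optional[str]] = {
    "email": None,
    "spreadsheet": "email",
    "zip": "email",
    "doc_one": "zip",
    "doc_two": "zip",
    "ole_object": "zip",
}


def family_root(node: str, parent_of: Dict[str, Optional[str]]) -> str:
    """Walk upward until a node with no parent is reached."""
    seen = {node}
    parent = parent_of[node]
    while parent is not None:
        if parent in seen:
            raise ValueError(f"cycle detected at {parent!r}")
        seen.add(parent)
        node = parent
        parent = parent_of[node]
    return node


def children_index(parent_of: Dict[str, Optional[str]]) -> Dict[str, List[str]]:
    """Invert the map so each parent lists its direct children."""
    index: Dict[str, List[str]] = {}
    for child, parent in parent_of.items():
        if parent is not None:
            index.setdefault(parent, []).append(child)
    return index


if __name__ == "__main__":
    print({node: family_root(node, PARENT_OF) for node in PARENT_OF})
    print(children_index(PARENT_OF))
```

Every node resolves to the same root, which is the property the diagram expresses: documents inside the nested ZIP belong to the email's family even though their direct parent is the archive.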
Production-Grade Implementation
The following implementation demonstrates a production-ready mapping engine. It enforces path traversal sanitization, applies memory-aware chunking, routes failures through a structured fallback mechanism, and maintains strict audit compliance via structured logging.
import asyncio
import hashlib
import logging
import os
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import AsyncIterator, List, Optional
from uuid import uuid4

import structlog

# The stdlib root logger defaults to WARNING, so without this call
# filter_by_level would silently drop the pipeline's INFO audit events.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Structured logging configuration for audit compliance
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True
)
logger = structlog.get_logger()


@dataclass
class MappingRecord:
    parent_id: str
    child_id: str
    relative_path: str
    extraction_depth: int
    content_hash: str
    status: str
    error_detail: Optional[str] = None


class AttachmentMappingPipeline:
    def __init__(self, max_depth: int = 5, batch_size: int = 50, concurrency: int = 8):
        self.max_depth = max_depth
        self.batch_size = batch_size
        # On Python < 3.10, construct the pipeline inside the running event
        # loop so the semaphore binds to the correct loop.
        self.semaphore = asyncio.Semaphore(concurrency)
        self.logger = logger.bind(pipeline="attachment_mapping")

    @staticmethod
    async def compute_sha256(file_path: Path) -> str:
        """Stream-based SHA-256 calculation to prevent memory exhaustion."""
        def _hash() -> str:
            sha256 = hashlib.sha256()
            with open(file_path, "rb") as f:
                while chunk := f.read(8192):
                    sha256.update(chunk)
            return sha256.hexdigest()
        # Offload the blocking read+hash loop to a worker thread so the event
        # loop stays responsive during large extractions.
        return await asyncio.to_thread(_hash)

    def sanitize_path(self, base: Path, target: Path) -> Optional[Path]:
        """Resolve and validate a path to prevent traversal attacks."""
        try:
            base_resolved = base.resolve()
            resolved = (base / target).resolve()
            if not resolved.is_relative_to(base_resolved):
                return None
            return resolved
        except (ValueError, RuntimeError, OSError):
            return None

    async def extract_and_map(
        self, parent_id: str, archive_path: Path, current_depth: int = 0
    ) -> AsyncIterator[MappingRecord]:
        """Recursively extract ZIP archives with depth limits and safe path validation."""
        if current_depth > self.max_depth:
            yield MappingRecord(
                parent_id=parent_id,
                child_id=str(uuid4()),
                relative_path=str(archive_path.name),
                extraction_depth=current_depth,
                content_hash="",
                status="DEPTH_LIMIT_EXCEEDED",
                error_detail="Extraction halted at recursion boundary"
            )
            return

        try:
            # Extract into a private scratch directory so members can never
            # overwrite, or be cleaned up on top of, files beside the source archive.
            with zipfile.ZipFile(archive_path, "r") as zf, tempfile.TemporaryDirectory() as scratch:
                scratch_dir = Path(scratch)
                for info in zf.infolist():
                    if info.is_dir():
                        continue

                    safe_path = self.sanitize_path(scratch_dir, Path(info.filename))
                    if not safe_path:
                        yield MappingRecord(
                            parent_id=parent_id, child_id=str(uuid4()),
                            relative_path=info.filename, extraction_depth=current_depth,
                            content_hash="", status="PATH_TRAVERSAL_BLOCKED")
                        continue

                    child_id = str(uuid4())
                    extracted_path: Optional[Path] = None
                    try:
                        # zf.extract returns the path it actually wrote, which can
                        # differ from safe_path because zipfile normalizes names.
                        extracted_path = Path(
                            await asyncio.to_thread(zf.extract, info, scratch_dir)
                        )
                        content_hash = await self.compute_sha256(extracted_path)
                        yield MappingRecord(
                            parent_id=parent_id, child_id=child_id,
                            relative_path=info.filename, extraction_depth=current_depth,
                            content_hash=content_hash, status="MAPPED")
                        # A nested archive becomes the parent of its own members.
                        if await asyncio.to_thread(zipfile.is_zipfile, extracted_path):
                            async for nested in self.extract_and_map(
                                child_id, extracted_path, current_depth + 1
                            ):
                                yield nested
                    except Exception as e:
                        yield MappingRecord(
                            parent_id=parent_id, child_id=child_id,
                            relative_path=info.filename, extraction_depth=current_depth,
                            content_hash="", status="EXTRACTION_FAILED",
                            error_detail=str(e))
                    finally:
                        # Remove each member as soon as it is mapped to bound disk usage.
                        if extracted_path is not None and extracted_path.exists():
                            await asyncio.to_thread(os.remove, extracted_path)
        except zipfile.BadZipFile:
            yield MappingRecord(
                parent_id=parent_id, child_id=str(uuid4()),
                relative_path=str(archive_path.name), extraction_depth=current_depth,
                content_hash="", status="CORRUPT_ARCHIVE")
        except OSError as e:
            # A missing or unreadable container is reported, not raised.
            yield MappingRecord(
                parent_id=parent_id, child_id=str(uuid4()),
                relative_path=str(archive_path.name), extraction_depth=current_depth,
                content_hash="", status="EXTRACTION_FAILED", error_detail=str(e))

    async def _map_container(self, path: Path) -> List[MappingRecord]:
        """Map one top-level container while holding a concurrency slot."""
        # Each container roots its own family. A random ID is minted here;
        # substitute the ingestion-stage document ID where one exists.
        parent_id = str(uuid4())
        async with self.semaphore:
            return [record async for record in self.extract_and_map(parent_id, path)]

    async def process_batch(self, batch_paths: List[Path]) -> List[MappingRecord]:
        """Process a fixed-size chunk of containers under a concurrency bound."""
        per_container = await asyncio.gather(
            *(self._map_container(path) for path in batch_paths)
        )
        # gather preserves input order, so records stay grouped by container.
        return [record for records in per_container for record in records]

    async def run(self, parent_containers: List[Path]) -> AsyncIterator[MappingRecord]:
        """Execute the full pipeline with deterministic batch routing."""
        for i in range(0, len(parent_containers), self.batch_size):
            batch = parent_containers[i:i + self.batch_size]
            self.logger.info("processing_batch", batch_start=i, batch_size=len(batch))
            for record in await self.process_batch(batch):
                yield record
Deterministic Fallback Routing & Downstream Integration
The pipeline routes extraction failures through a structured fallback mechanism rather than halting execution. Records with EXTRACTION_FAILED, CORRUPT_ARCHIVE, PATH_TRAVERSAL_BLOCKED, or DEPTH_LIMIT_EXCEEDED statuses are flagged for manual review while their parent-child relationship is retained in the audit log. Because each of these failures is emitted as a record rather than raised, one malformed or encrypted container does not stall the rest of the batch.
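The routing step can be sketched as a partition over record statuses. This example assumes the simplest deterministic rule, namely that anything other than MAPPED goes to manual review. The Record tuple and the queue names are hypothetical stand-ins for the pipeline's own record type and whatever review queue a deployment uses.

```python
from typing import Dict, Iterable, List, NamedTuple


class Record(NamedTuple):
    """Hypothetical, trimmed-down stand-in for a mapping record."""
    parent_id: str
    child_id: str
    status: str


def route(records: Iterable[Record]) -> Dict[str, List[Record]]:
    """Partition records into mapped and manual-review queues by status."""
    queues: Dict[str, List[Record]] = {"mapped": [], "manual_review": []}
    for record in records:
        # Any status other than MAPPED is treated as needing human attention.
        key = "mapped" if record.status == "MAPPED" else "manual_review"
        queues[key].append(record)
    return queues


if __name__ == "__main__":
    sample = [
        Record("p1", "c1", "MAPPED"),
        Record("p1", "c2", "EXTRACTION_FAILED"),
        Record("p2", "c3", "CORRUPT_ARCHIVE"),
    ]
    for name, items in route(sample).items():
        print(name, [r.child_id for r in items])
```

Failed records keep their parent_id, so a reviewer can see which family a problem member belongs to even though its content was never hashed.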
Once mapping completes, the structured family graph feeds directly into downstream analytical workflows. Conversation reconstruction engines use the established parent-child topology to execute Email Threading Algorithms, ensuring that attachments remain anchored to their originating messages during privilege review and production. The pipeline relies on standard-library concurrency primitives and streaming hash verification, following the guidance in the Python asyncio documentation and the security notes in the zipfile documentation, which gives enterprise-scale legal data processing a defensible, auditable foundation.