Pipeline stages

The Cartex pipeline runs in two sequential stages: extraction and enrichment. main.py exposes two entry points:

  • run() for one document request
  • run_batch() for process-parallel multi-document execution

run() calls the extractor, then passes the result to the enricher along with a user-defined schema.

def run(
    file_path: str,
    page_numbers: list[int],
    schema: UserTableSchema,
    use_table_bbox_crop: bool = False,
    force_monolithic: bool = False,
) -> list[EnrichedRow]:
    if len(page_numbers) == 1:
        extracted = extractor.extract(
            file_path,
            page_numbers[0],
            use_table_bbox_crop=use_table_bbox_crop,
        )
    else:
        extracted = extractor.extract_pages(
            file_path,
            page_numbers,
            use_table_bbox_crop=use_table_bbox_crop,
        )

    if force_monolithic:
        results = enricher._enrich_monolithic(extracted, schema)
    else:
        results = enricher.enrich(extracted, schema)
    return results

For batched workloads, run_batch() accepts a list[RunJob] and executes the jobs with ProcessPoolExecutor. It returns an ordered list[RunJobResult], one record per job, holding that job's rows and its error state.
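The batch contract above (ordered results, with errors captured per job rather than aborting the batch) can be sketched as follows. This is a minimal illustration, not the Cartex implementation: the RunJob and RunJobResult fields, fake_run(), and run_one() are hypothetical stand-ins, and "ordered" is assumed to mean input order.

```python
from concurrent.futures import Executor
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RunJob:  # hypothetical shape, for illustration only
    file_path: str
    page_numbers: list[int]


@dataclass
class RunJobResult:  # hypothetical shape: rows plus per-job error state
    rows: list[dict] = field(default_factory=list)
    error: Optional[str] = None


def fake_run(job: RunJob) -> list[dict]:
    """Stand-in for run(); fails when no pages are requested."""
    if not job.page_numbers:
        raise ValueError("no pages requested")
    return [{"file": job.file_path, "page": p} for p in job.page_numbers]


def run_one(job: RunJob) -> RunJobResult:
    # Catch the failure inside the worker so one bad job cannot abort the batch.
    try:
        return RunJobResult(rows=fake_run(job))
    except Exception as exc:
        return RunJobResult(error=f"{type(exc).__name__}: {exc}")


def run_batch(jobs: list[RunJob], executor: Executor) -> list[RunJobResult]:
    # Executor.map yields results in the order of its inputs,
    # regardless of which worker finishes first.
    return list(executor.map(run_one, jobs))
```

Passing a ProcessPoolExecutor as the executor gives process-parallel execution. In that case the worker function and job classes need to live in an importable module so they can be pickled, and the call should sit under an `if __name__ == "__main__":` guard.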

Runtime switches

run() exposes two execution switches:

  • use_table_bbox_crop (default False): enables high-accuracy table extraction. This adds a table detection + per-bbox crop extraction path in Stage 1.
  • force_monolithic (default False): bypasses router/specialist staging and runs monolithic enrichment directly in Stage 2.

In the Gradio UI, these map to:

  • High Accuracy Tables (BBox Crop) -> use_table_bbox_crop=True
  • Single Specialist Mode (Monolithic) -> force_monolithic=True

Operational specifications

Gemini dependency

Cartex relies on Gemini for the system's core decisioning path:

  • extraction (TABLE_EXTRACTION, CONTEXT_EXTRACTION, and optional crop extraction prompts)
  • routing (strategy selection, stage planning, context assignment)
  • enrichment (specialist or monolithic generation, plus notes adjudication when needed)

Routing every stage through one structured-response stack (prompt contracts + Pydantic schemas) keeps pipeline behavior consistent. In practice, this reduces integration complexity and gives users a predictable output shape across modes.

API scaling model

API call volume scales with page count, detected table count, and selected enrichment path.

For N pages:

  • Standard extraction: about 2N calls (TABLE_EXTRACTION + CONTEXT_EXTRACTION per page).
  • High-accuracy extraction: about 2N + sum(T_i) calls, where T_i is detected tables on page i (TABLE_DETECTION + CONTEXT_EXTRACTION + one crop extraction call per detected table).

Enrichment adds:

  • +1 routing call in staged mode.
  • +S specialist calls, where S is the number of selected strategies.
  • or +1 monolithic enrichment call when monolithic mode is selected.
  • plus optional notes adjudication calls in batches: ceil(R / B) calls, where R is the number of rows that require semantic synthesis and B is the batch size.
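The counting rules above can be turned into a rough estimator. The sketch below is an approximation under stated assumptions: the function and parameter names are hypothetical, the default notes batch size of 10 is a placeholder rather than a Cartex setting, and the automatic monolithic fallback (router returns no strategies) is counted as one routing call plus one monolithic call.

```python
import math


def estimate_calls(
    tables_per_page: list[int],
    high_accuracy: bool = False,
    monolithic: bool = False,
    num_strategies: int = 0,
    rows_needing_notes: int = 0,
    notes_batch_size: int = 10,  # placeholder value, not a Cartex default
) -> int:
    """Approximate Gemini call count for one run."""
    n_pages = len(tables_per_page)

    # Two calls per page in both modes (table or detection call + context call).
    extraction = 2 * n_pages
    if high_accuracy:
        # One extra crop extraction call per detected table.
        extraction += sum(tables_per_page)

    if monolithic:
        enrichment = 1  # forced monolithic: no routing call
    elif num_strategies == 0:
        enrichment = 2  # routing call + monolithic fallback
    else:
        enrichment = 1 + num_strategies  # routing call + specialists

    notes = math.ceil(rows_needing_notes / notes_batch_size)
    return extraction + enrichment + notes
```

For a three-page document with 2, 1, and 0 detected tables and three selected strategies, the estimate is 10 calls in standard mode and 13 in high-accuracy mode, before any notes adjudication.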

Latency and High-Accuracy Tables

High Accuracy Tables increases latency because it adds detection and per-table crop inference at high DPI, rather than a single full-page table extraction call. It is disabled by default to keep baseline runs fast and predictable for interactive use. Users can enable it when document quality or table layout complexity requires better extraction fidelity.

Guardrails: hard vs soft

Cartex uses a hybrid guardrail strategy:

  • Soft guardrails for open-text generation: prompt-injected field contracts, authority-ranked merge resolution, and notes synthesis policies.
  • Hard guardrails for bounded domains: schema shape guarantees, row identity requirements, and enum post-validation/coercion.

Field contracts are structured as definition + constraints so that semantic meaning and behavioral rules stay centralized and are not duplicated across specialist prompts. Enum domains are resolved centrally (field enum policies + contract type_values) and injected through a dedicated enum-constraints block, so enum behavior is not hardcoded field by field.

Hard heuristic guardrails are intentionally avoided for most open-text fields. LLM outputs are non-deterministic in free-form text, and rigid downstream heuristics are brittle: they often overfit one phrasing pattern, reject valid variants, and regress across document families. Hard enforcement is reserved for cases where correctness is objectively checkable (for example, enums with a finite set of allowed values).

Stage 1: Extraction

The Extractor class in src/pipeline/extractor.py converts a PDF page into an ExtractionResult containing tables and contextual information.

Page rendering

The extractor renders a PDF page to a PNG image using PyMuPDF at the DPI configured in src/config.py (default 210). This produces the image bytes that Gemini processes.

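def _pdf_to_image(self, file_path: str, page_number: int) -> bytes:
    # Open via a context manager so the document handle is closed after rendering.
    with pymupdf.open(file_path) as doc:
        page = doc.load_page(page_number)  # PyMuPDF page indices are 0-based
        pix = page.get_pixmap(dpi=config.dpi)
        # Request PNG explicitly (it is also the tobytes() default).
        return pix.tobytes("png")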

Gemini vision calls

Table extraction has two operating modes:

  1. Standard mode (use_table_bbox_crop=False)
    Runs one full-page TABLE_EXTRACTION call (advanced model) and converts the result to TableModel objects.
  2. High-accuracy mode (use_table_bbox_crop=True)
    Runs TABLE_DETECTION (advanced model), crops each detected table bbox at high DPI, then runs TABLE_EXTRACTION_FROM_CROP (fast model) per crop.

If high-accuracy detection/crop extraction fails or returns no usable tables, extraction falls back to full-page TABLE_EXTRACTION.

Context extraction always runs CONTEXT_EXTRACTION (advanced model) on the full rendered page image.

Multi-page extraction

When multiple page numbers are provided, extract_pages() runs all pages concurrently via asyncio.gather(). Within each page, the table task and the context task also run concurrently. When the per-page results are merged, context items are deduplicated by content so that an item picked up on more than one page appears only once.
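Content-based deduplication of context items can be sketched as below. The normalization (collapsing whitespace and ignoring case) and the dict shape with a "text" key are assumptions made for illustration. The actual comparison used by extract_pages() is not described here.

```python
import hashlib


def content_key(text: str) -> str:
    """Hash of the normalized text; normalization rules are an assumption."""
    normalized = " ".join(text.split()).casefold()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def merge_contexts(pages: list[list[dict]]) -> list[dict]:
    """Merge per-page context items, keeping the first copy of each."""
    seen: set[str] = set()
    merged: list[dict] = []
    for page_items in pages:
        for item in page_items:
            key = content_key(item["text"])
            if key not in seen:
                seen.add(key)
                merged.append(item)
    return merged
```

Keeping the first occurrence preserves page order in the merged list, which keeps the output stable across repeated runs on the same pages.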

Async page pipeline

For a single page, the async path (extract_async) runs both the table extraction and context extraction concurrently:

tables_result, contexts_result = await asyncio.gather(
    self._extract_tables_async(image_bytes),
    self._extract_context_async(image_bytes),
)

When use_table_bbox_crop=True, the table task switches from _extract_tables_async(...) to _extract_tables_with_bbox_detection_async(...).
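The switch between the two table tasks, together with the fallback to full-page extraction described earlier, can be illustrated with stub coroutines. Everything below is a sketch: the function names are hypothetical, the stubs stand in for Gemini calls, and the bbox stub is hard-wired to fail so that the fallback path is exercised.

```python
import asyncio


async def extract_tables_full_page(image: bytes) -> list[str]:
    await asyncio.sleep(0)  # stands in for the full-page TABLE_EXTRACTION call
    return ["full-page table"]


async def extract_tables_bbox(image: bytes) -> list[str]:
    await asyncio.sleep(0)  # stands in for detection + per-crop extraction
    raise RuntimeError("detection failed")  # simulated failure


async def extract_context(image: bytes) -> list[str]:
    await asyncio.sleep(0)  # stands in for the CONTEXT_EXTRACTION call
    return ["general note"]


async def tables_with_fallback(image: bytes) -> list[str]:
    # Treat an exception and an empty result the same way: fall back.
    try:
        tables = await extract_tables_bbox(image)
    except Exception:
        tables = []
    if not tables:
        tables = await extract_tables_full_page(image)
    return tables


async def extract_page(image: bytes, use_table_bbox_crop: bool = False):
    # Pick the table task, then run it concurrently with context extraction.
    if use_table_bbox_crop:
        table_task = tables_with_fallback(image)
    else:
        table_task = extract_tables_full_page(image)
    tables, contexts = await asyncio.gather(table_task, extract_context(image))
    return tables, contexts
```

Because the fallback lives inside the table task, context extraction is unaffected when high-accuracy detection fails.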

Stage 2: Enrichment

The Enricher class in src/pipeline/enricher.py takes an ExtractionResult and a UserTableSchema, then fills every target column for every main schedule row.

Routing

The enricher first calls the Router to plan enrichment execution. The router sends a compact summary (table metadata, context snippets with context_id values, schema columns) to the advanced Gemini model and receives a GeminiRoutingResult containing:

  • strategies — which specialist strategies to run
  • execution_order — staged execution plan with dependency awareness
  • context_assignments — which context items each specialist receives

The monolithic ENRICHMENT path is used in two cases:

  1. force_monolithic=True (explicit selection)
  2. router returns an empty strategies list (automatic fallback)

In both cases, enrichment runs as a single Gemini call without staged specialist execution.

Staged execution

When strategies are selected, the enricher executes them in stages as planned by the router. Each stage runs its specialists concurrently via asyncio.gather(), but stages execute sequentially so that later stages can access earlier stage output.

for stage_idx, stage_strategies in enumerate(execution_order):
    # Build filtered contents per specialist
    # Run all specialists in this stage concurrently
    stage_results = await asyncio.gather(*tasks)
    # Accumulate outputs from all completed stages
    all_strategy_outputs.update(...)
    # Resolve rows using field-level authority across all outputs so far
    prior_enrichment, note_observations = self.merge_resolver.resolve(...)

Each specialist receives filtered input:

  • Tables: main tables always included; auxiliary tables included only for auxiliary_table and text_rule strategies
  • Context: only the context items whose context_id appears in context_assignments[strategy]
  • Prior enrichment: if not the first stage, a <prior_enrichment> JSON block is injected containing the merged output from all previous stages
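The three filtering rules above might be applied along these lines. The dict shapes, the is_main flag, the function name, and the closing </prior_enrichment> tag are assumptions for illustration. Only the strategy names auxiliary_table and text_rule and the context_id matching come from the description above.

```python
import json
from typing import Optional

# Strategies that also receive auxiliary tables, per the rule above.
AUX_TABLE_STRATEGIES = {"auxiliary_table", "text_rule"}


def build_specialist_input(
    strategy: str,
    tables: list[dict],
    contexts: list[dict],
    context_assignments: dict[str, list[str]],
    prior_enrichment: Optional[dict] = None,
) -> dict:
    # Main tables always pass; auxiliary tables only for selected strategies.
    include_aux = strategy in AUX_TABLE_STRATEGIES
    selected_tables = [t for t in tables if t["is_main"] or include_aux]

    # Keep only the context items assigned to this strategy.
    wanted = set(context_assignments.get(strategy, []))
    selected_contexts = [c for c in contexts if c["context_id"] in wanted]

    payload = {"tables": selected_tables, "contexts": selected_contexts}
    if prior_enrichment is not None:  # None models the first stage
        payload["prior_enrichment"] = (
            "<prior_enrichment>"
            + json.dumps(prior_enrichment)
            + "</prior_enrichment>"
        )
    return payload
```

Filtering before the call keeps each specialist prompt small and scoped to the evidence the router assigned to it.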

Model tiers

The router uses ModelType.ADVANCED (Gemini Pro) for its complex planning task. All specialists use ModelType.FAST (Gemini Flash) since they receive pre-filtered, pre-assigned payloads with well-scoped tasks.

Merge

After each stage completes, the enricher re-runs global field-level resolution using all completed stage outputs:

  1. Structured fields are resolved by MergeResolver using FIELD_AUTHORITY_MATRIX rankings (global + template overrides).
  2. Special Notes observations are collected but synthesized only after the final stage.
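Field-level authority resolution can be pictured with a small example. The matrix layout used here (field name mapped to a per-strategy rank, higher wins) is an assumption and does not reproduce FIELD_AUTHORITY_MATRIX or its template overrides. It only illustrates the idea that each field is decided independently.

```python
def resolve(
    outputs: dict[str, dict[str, dict[str, object]]],
    authority: dict[str, dict[str, int]],
    default_rank: int = 0,
) -> dict[str, dict[str, object]]:
    """Pick, per row and field, the value from the highest-ranked strategy.

    outputs:   strategy -> row_id -> field -> value
    authority: field -> strategy -> rank (hypothetical layout)
    """
    best: dict[tuple[str, str], tuple[int, object]] = {}
    for strategy, rows in outputs.items():
        for row_id, fields in rows.items():
            for name, value in fields.items():
                if value is None or value == "":
                    continue  # empty values never win
                rank = authority.get(name, {}).get(strategy, default_rank)
                key = (row_id, name)
                # Strictly greater: on a tie, the first strategy seen is kept.
                if key not in best or rank > best[key][0]:
                    best[key] = (rank, value)

    resolved: dict[str, dict[str, object]] = {}
    for (row_id, name), (_, value) in best.items():
        resolved.setdefault(row_id, {})[name] = value
    return resolved
```

Because resolution is recomputed from all completed outputs, a higher-authority value produced in a later stage can replace a lower-authority value chosen earlier.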

After the final stage:

  1. SpecialNotesAdjudicator.apply() generates semantic bullet notes per row.
  2. Enum post-validation (_apply_enum_validations) coerces invalid enum values to Other: ... and appends validation_flags.
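Enum post-validation is the one place where a hard guardrail is applied, and its shape can be sketched briefly. The function below is hypothetical: the case-insensitive matching, the flag text, and the choice to keep the original value after "Other: " are all assumptions, since the exact behavior of _apply_enum_validations is not shown here.

```python
def coerce_enum(value: str, allowed: list[str]) -> tuple[str, list[str]]:
    """Return (validated value, validation flags) for one enum field."""
    cleaned = value.strip()
    # Match case-insensitively and return the canonical spelling (assumption).
    lookup = {option.casefold(): option for option in allowed}
    hit = lookup.get(cleaned.casefold())
    if hit is not None:
        return hit, []
    # Out-of-domain value: coerce to the "Other: ..." form and record a flag.
    flag = f"enum_coerced: {cleaned!r} not in allowed values"
    return f"Other: {cleaned}", [flag]
```

Coercing instead of rejecting keeps the row in the output while making the deviation visible to downstream consumers through the flags.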

See Merge algorithm for scoring and adjudication details.

End-to-end flow

The following diagram shows both stages and their internal concurrency.

flowchart TB
    subgraph stage1["Stage 1: Extraction"]
        PDF["PDF page"] --> RENDER["PyMuPDF render<br/>(210 DPI)"]
        RENDER --> IMG["Image bytes"]
        IMG --> TABLE_MODE["Table extraction mode"]
        TABLE_MODE --> TABLE_CALL["Standard: TABLE_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_MODE --> DETECT["High-accuracy: TABLE_DETECTION<br/>(Gemini Pro)"]
        DETECT --> CROP["TABLE_EXTRACTION_FROM_CROP<br/>(Gemini Flash, per bbox)"]
        CROP --> TABLES["list[TableModel]"]
        IMG --> CTX_CALL["CONTEXT_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_CALL --> TABLES
        CTX_CALL --> CONTEXTS["list[ContextModel]"]
        TABLES --> ER["ExtractionResult"]
        CONTEXTS --> ER
    end

    subgraph stage2["Stage 2: Enrichment"]
        ER --> ROUTER["Router<br/>(Gemini Pro)"]
        ROUTER --> PLAN["execution_order +<br/>context_assignments"]
        PLAN --> S1["Stage 1 specialists<br/>(Gemini Flash, concurrent)"]
        S1 --> M1["MergeResolver<br/>(field-level authority)"]
        M1 --> S2["Stage 2 specialists<br/>(Gemini Flash, concurrent)"]
        S2 --> M2["MergeResolver<br/>(re-resolve all completed outputs)"]
        M2 --> SN["Stage N specialists<br/>(Gemini Flash, concurrent)"]
        SN --> MN["MergeResolver"]
        MN --> ADJ["SpecialNotesAdjudicator<br/>(semantic bullets)"]
        ADJ --> VALID["Enum validation<br/>(Other: ... fallback)"]
        ER --> MONO["Monolithic ENRICHMENT prompt<br/>(optional path)"]
        MONO --> VALID
        VALID --> OUT["list[EnrichedRow]"]
    end

    stage1 --> stage2

Row ID assignment

Before passing tables to specialists, the enricher injects a __row_id__ field into every main schedule row via _assign_row_ids(). The ID is derived from the primary_key_column detected during extraction (e.g., Type Mark). Duplicate primary key values receive a numeric suffix (W1_2, W1_3). When no primary key column is detected, rows fall back to index-based IDs (table_0_0_row_0).

Specialists must copy the __row_id__ value verbatim into their output row_id field. The merge step uses this value to group outputs across specialists for the same row.
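The ID rules can be sketched as follows. This is an illustration rather than the body of _assign_row_ids(): the function signature and the table_id parameter are hypothetical, and falling back to an index-based ID for a row with an empty primary key value is an assumption.

```python
from typing import Optional


def assign_row_ids(
    rows: list[dict],
    primary_key_column: Optional[str],
    table_id: str = "table_0_0",  # hypothetical prefix for index-based IDs
) -> list[dict]:
    """Return copies of rows with a __row_id__ field added."""
    seen: dict[str, int] = {}
    result: list[dict] = []
    for index, row in enumerate(rows):
        key = ""
        if primary_key_column is not None:
            key = str(row.get(primary_key_column, "")).strip()
        if key:
            # First occurrence keeps the bare key; repeats get _2, _3, ...
            seen[key] = seen.get(key, 0) + 1
            row_id = key if seen[key] == 1 else f"{key}_{seen[key]}"
        else:
            row_id = f"{table_id}_row_{index}"
        result.append({**row, "__row_id__": row_id})
    return result
```

Three rows sharing the Type Mark W1 would receive W1, W1_2, and W1_3. The sketch does not guard against a literal key that already looks like a suffixed ID (for example a row whose Type Mark is W1_2).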