Pipeline stages

The Cartex pipeline runs in two sequential stages: extraction and enrichment. The entry point is main.py::run(), which calls the extractor, then passes the result to the enricher along with a user-defined schema. Note that run() as shown passes only the first entry of page_numbers to extract().

def run(file_path: str, page_numbers: list[int], schema: UserTableSchema) -> list[EnrichedRow]:
    extracted = extractor.extract(file_path, page_numbers[0])
    results = enricher.enrich(extracted, schema)
    return results

Stage 1: Extraction

The Extractor class in src/pipeline/extractor.py converts a PDF page into an ExtractionResult containing tables and contextual information.

Page rendering

The extractor renders a PDF page to a PNG image using PyMuPDF at the DPI configured in src/config.py (default 210). This produces the image bytes that Gemini processes.

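def _pdf_to_image(self, file_path: str, page_number: int) -> bytes:
    # page_number is 0-based, as load_page() expects.
    with pymupdf.open(file_path) as doc:  # close the document when done
        page = doc.load_page(page_number)
        pix = page.get_pixmap(dpi=config.dpi)
        # Encode explicitly as PNG (this is also the tobytes() default).
        return pix.tobytes("png")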
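
As a rough check on output size: PDF page dimensions are expressed in points (72 per inch), so rendering at a given DPI scales each dimension by dpi / 72. The helper below is purely illustrative and not part of the codebase; PyMuPDF's own rounding may differ by a pixel.

```python
POINTS_PER_INCH = 72  # PDF user-space units per inch


def rendered_size(width_pt: float, height_pt: float, dpi: int = 210) -> tuple[int, int]:
    """Approximate pixel size of a page rendered at the given DPI."""
    width_px = round(width_pt * dpi / POINTS_PER_INCH)
    height_px = round(height_pt * dpi / POINTS_PER_INCH)
    return width_px, height_px


# US Letter is 612 x 792 points.
letter_px = rendered_size(612, 792)
```

At the default 210 DPI, a US Letter page comes out at roughly 1785 x 2310 pixels.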

Gemini vision calls

Two independent Gemini calls run against the rendered image:

  1. TABLE_EXTRACTION — Detects all tables on the page (main schedules and auxiliary reference tables), extracts headers and row data, assigns roles (MAIN, AUXILIARY, OTHER), and identifies the primary key column.
  2. CONTEXT_EXTRACTION — Detects all non-table contextual information: general notes, performance specs, code requirements, legend diagrams, item cards, and dimension drawings.

Both calls use the advanced Gemini model (gemini-3.1-pro-preview by default) and return structured JSON validated against Pydantic schemas (GeminiTableResult and GeminiContextResult).

Multi-page extraction

When multiple page numbers are provided, extract_pages() runs all pages concurrently via asyncio.gather(). Each page runs its two Gemini calls (table and context) in parallel. When the per-page results are merged, context items are deduplicated by content, so an item that appears on more than one page is kept only once.
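
The fan-out and merge described above can be sketched as follows. This is a minimal illustration, not the actual extract_pages() implementation: PageResult, extract_page(), and content_key() are hypothetical stand-ins, and the dedup key (a hash of whitespace-normalized, lowercased text) is an assumption about what "content-based" means.

```python
import asyncio
import hashlib
from dataclasses import dataclass, field


@dataclass
class PageResult:
    # Hypothetical stand-in for ExtractionResult: tables plus context strings.
    tables: list[str] = field(default_factory=list)
    contexts: list[str] = field(default_factory=list)


async def extract_page(page_number: int) -> PageResult:
    # Placeholder for the real per-page work (render + two Gemini calls).
    await asyncio.sleep(0)
    return PageResult(
        tables=[f"table from page {page_number}"],
        contexts=["General note A", f"Note specific to page {page_number}"],
    )


def content_key(text: str) -> str:
    # Assumed dedup key: hash of whitespace-normalized, lowercased text.
    normalized = " ".join(text.split()).lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


async def extract_pages(page_numbers: list[int]) -> PageResult:
    # gather() returns results in input order, so the merge follows page order.
    per_page = await asyncio.gather(*(extract_page(n) for n in page_numbers))
    merged = PageResult()
    seen: set[str] = set()
    for result in per_page:
        merged.tables.extend(result.tables)
        for ctx in result.contexts:
            key = content_key(ctx)
            if key not in seen:  # keep only the first copy of each item
                seen.add(key)
                merged.contexts.append(ctx)
    return merged


merged = asyncio.run(extract_pages([0, 1]))
```

Here "General note A" is returned by both pages but appears once in merged.contexts, while tables from every page are kept.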

Async page pipeline

For a single page, the async path (extract_async) runs both the table extraction and context extraction concurrently:

tables_result, contexts_result = await asyncio.gather(
    self._extract_tables_async(image_bytes),
    self._extract_context_async(image_bytes),
)

Stage 2: Enrichment

The Enricher class in src/pipeline/enricher.py takes an ExtractionResult and a UserTableSchema, then fills every target column for every main schedule row.

Routing

The enricher first calls the Router to determine which specialist strategies apply to the extraction data. The router sends a compact summary (table metadata, context snippets, schema columns) to the fast Gemini model and receives a GeminiRoutingResult listing the applicable StrategyType values.

If the router returns an empty list, the enricher falls back to the monolithic ENRICHMENT prompt — a single Gemini call that handles all enrichment in one pass.
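
The routing decision can be sketched like this. It is a simplified illustration under stated assumptions: the StrategyType members, route(), run_specialist(), and run_monolithic() are hypothetical placeholders for the real router and Gemini calls, and the keyword matching merely simulates a model response.

```python
import asyncio
from enum import Enum


class StrategyType(Enum):
    # Hypothetical members; the real enum values are not listed on this page.
    LOOKUP = "lookup"
    NOTES = "notes"


async def route(summary: str) -> list[StrategyType]:
    # Placeholder for the fast-model router call.
    await asyncio.sleep(0)
    return [s for s in StrategyType if s.value in summary]


async def run_specialist(strategy: StrategyType) -> str:
    # Placeholder for one strategy-specific Gemini call.
    await asyncio.sleep(0)
    return f"specialist:{strategy.value}"


async def run_monolithic() -> str:
    # Placeholder for the single-pass ENRICHMENT call.
    await asyncio.sleep(0)
    return "monolithic"


async def enrich(summary: str) -> list[str]:
    strategies = await route(summary)
    if not strategies:
        # Empty routing result: one monolithic pass handles everything.
        return [await run_monolithic()]
    # Otherwise fan out one task per selected strategy.
    return list(await asyncio.gather(*(run_specialist(s) for s in strategies)))


routed = asyncio.run(enrich("lookup and notes"))
fallback = asyncio.run(enrich("plain schedule"))
```

The fallback keeps the pipeline usable when no specialist applies, at the cost of a single prompt having to cover every enrichment approach.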

Specialist execution

When the router selects one or more strategies, the enricher runs all selected specialists concurrently via asyncio.gather(), one task per strategy:

tasks = [
    self._run_specialist_async(extraction_result, schema, strategy)
    for strategy in strategies
]
specialist_results = await asyncio.gather(*tasks)

Each specialist receives the same input (tables with injected __row_id__ fields, context items, and the target schema) but uses a strategy-specific prompt that focuses its attention on one enrichment approach.

Merge

After all specialists complete, _merge_specialist_results() combines their outputs into a single list[EnrichedRow]. See Merge algorithm for details.

End-to-end flow

The following diagram shows both stages and their internal concurrency.

flowchart TB
    subgraph stage1["Stage 1: Extraction"]
        PDF["PDF page"] --> RENDER["PyMuPDF render<br/>(210 DPI)"]
        RENDER --> IMG["Image bytes"]
        IMG --> TABLE_CALL["TABLE_EXTRACTION<br/>(Gemini Pro)"]
        IMG --> CTX_CALL["CONTEXT_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_CALL --> TABLES["list[TableModel]"]
        CTX_CALL --> CONTEXTS["list[ContextModel]"]
        TABLES --> ER["ExtractionResult"]
        CONTEXTS --> ER
    end

    subgraph stage2["Stage 2: Enrichment"]
        ER --> ROUTER["Router<br/>(Gemini Flash)"]
        ROUTER --> STRATS["list[StrategyType]"]
        STRATS --> S1["Specialist 1"]
        STRATS --> S2["Specialist 2"]
        STRATS --> SN["Specialist N"]
        S1 --> MERGE["_merge_specialist_results()"]
        S2 --> MERGE
        SN --> MERGE
        MERGE --> OUT["list[EnrichedRow]"]
    end

    stage1 --> stage2

Row ID assignment

Before passing tables to specialists, the enricher injects a __row_id__ field into every main schedule row via _assign_row_ids(). The ID is derived from the primary_key_column detected during extraction (e.g., Type Mark). Duplicate primary key values receive a numeric suffix (W1_2, W1_3). When no primary key column is detected, rows fall back to index-based IDs (table_0_0_row_0).
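
A minimal sketch of this ID scheme, assuming rows are plain dicts. The function below is not the real _assign_row_ids(): the table_id parameter, the rule that the first occurrence keeps the bare key, and the index-based fallback for rows with an empty key are all assumptions made for illustration.

```python
from collections import Counter
from typing import Optional


def assign_row_ids(
    rows: list[dict],
    primary_key_column: Optional[str],
    table_id: str = "table_0_0",  # hypothetical prefix for fallback IDs
) -> list[dict]:
    seen: Counter = Counter()
    out = []
    for index, row in enumerate(rows):
        key = row.get(primary_key_column) if primary_key_column else None
        if key is None or str(key).strip() == "":
            # No usable primary key: fall back to an index-based ID.
            row_id = f"{table_id}_row_{index}"
        else:
            base = str(key).strip()
            seen[base] += 1
            # First occurrence keeps the bare key; repeats get _2, _3, ...
            row_id = base if seen[base] == 1 else f"{base}_{seen[base]}"
        out.append({**row, "__row_id__": row_id})
    return out


rows = [{"Type Mark": "W1"}, {"Type Mark": "W1"}, {"Type Mark": "W2"}, {"Type Mark": "W1"}]
keyed_ids = [r["__row_id__"] for r in assign_row_ids(rows, "Type Mark")]
indexed_ids = [r["__row_id__"] for r in assign_row_ids(rows, None)]
```

This sketch does not guard against a genuine key such as W1_2 colliding with a generated suffix; whether the real implementation does is not stated here.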

Specialists must copy the __row_id__ value verbatim into their output row_id field. The merge step uses this value to group outputs across specialists for the same row.
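
The grouping step alone can be sketched as follows. How conflicting values are resolved belongs to the merge algorithm and is not modeled here; group_by_row_id() and the dict-shaped output rows are assumptions for illustration only.

```python
from collections import defaultdict


def group_by_row_id(specialist_outputs: list[list[dict]]) -> dict[str, list[dict]]:
    """Collect every specialist's output for the same row under one key."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for output in specialist_outputs:
        for row in output:
            # row_id must match the injected __row_id__ exactly to group correctly.
            grouped[row["row_id"]].append(row)
    return dict(grouped)


outputs = [
    [{"row_id": "W1", "u_value": "0.30"}, {"row_id": "W2", "u_value": "0.28"}],
    [{"row_id": "W1", "glazing": "double"}],
]
grouped = group_by_row_id(outputs)
```

Because grouping is an exact string match, a specialist that reformats the ID (for example by changing case or trimming a suffix) would cause its output to land in a separate group.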