Pipeline stages¶
The Cartex pipeline runs in two sequential stages: extraction and enrichment. The entry point is main.py::run(), which calls the extractor, then passes the result to the enricher along with a user-defined schema.
```python
def run(file_path: str, page_numbers: list[int], schema: UserTableSchema) -> list[EnrichedRow]:
    extracted = extractor.extract(file_path, page_numbers[0])
    results = enricher.enrich(extracted, schema)
    return results
```
Stage 1: Extraction¶
The Extractor class in src/pipeline/extractor.py converts a PDF page into an ExtractionResult containing tables and contextual information.
Page rendering¶
The extractor renders a PDF page to a PNG image using PyMuPDF at the DPI configured in src/config.py (default 210). This produces the image bytes that Gemini processes.
```python
def _pdf_to_image(self, file_path: str, page_number: int) -> bytes:
    doc = pymupdf.open(file_path)
    page = doc.load_page(page_number)
    pix = page.get_pixmap(dpi=config.dpi)
    return pix.tobytes()
```
Gemini vision calls¶
Two independent Gemini calls run against the rendered image:
- `TABLE_EXTRACTION`: Detects all tables on the page (main schedules and auxiliary reference tables), extracts headers and row data, assigns roles (`MAIN`, `AUXILIARY`, `OTHER`), and identifies the primary key column.
- `CONTEXT_EXTRACTION`: Detects all non-table contextual information: general notes, performance specs, code requirements, legend diagrams, item cards, and dimension drawings.
Both calls use the advanced Gemini model (gemini-3.1-pro-preview by default) and return structured JSON validated against Pydantic schemas (GeminiTableResult and GeminiContextResult).
Multi-page extraction¶
When multiple page numbers are provided, extract_pages() runs all pages concurrently via asyncio.gather(). Each page runs both Gemini calls in parallel (table + context). Results are then merged, with content-based deduplication removing context items repeated across overlapping pages.
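Content-based deduplication can be as simple as keying each context item on its normalized text and keeping the first occurrence. A minimal sketch (the helper name and the `"text"` field are assumptions about the actual implementation):

```python
def dedupe_contexts(contexts: list[dict]) -> list[dict]:
    """Keep only the first occurrence of each context item, compared by normalized text."""
    seen: set[str] = set()
    unique: list[dict] = []
    for item in contexts:
        # Collapse whitespace and case so near-identical items from overlapping pages match.
        key = " ".join(item["text"].split()).lower()
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique
```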
Async page pipeline¶
For a single page, the async path (extract_async) runs both the table extraction and context extraction concurrently:
```python
tables_result, contexts_result = await asyncio.gather(
    self._extract_tables_async(image_bytes),
    self._extract_context_async(image_bytes),
)
```
Stage 2: Enrichment¶
The Enricher class in src/pipeline/enricher.py takes an ExtractionResult and a UserTableSchema, then fills every target column for every main schedule row.
Routing¶
The enricher first calls the Router to determine which specialist strategies apply to the extraction data. The router sends a compact summary (table metadata, context snippets, schema columns) to the fast Gemini model and receives a GeminiRoutingResult listing the applicable StrategyType values.
If the router returns an empty list, the enricher falls back to the monolithic ENRICHMENT prompt — a single Gemini call that handles all enrichment in one pass.
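The route-then-fan-out control flow can be sketched as a toy simulation (the function bodies here are stand-ins, not the real Router or specialist code):

```python
import asyncio

async def route(summary: dict) -> list[str]:
    # Stand-in for the Router's fast-model call returning applicable strategies.
    return summary.get("strategies", [])

async def run_specialist(strategy: str) -> str:
    # Stand-in for one strategy-specific Gemini call.
    return f"specialist:{strategy}"

async def enrich(summary: dict) -> str:
    strategies = await route(summary)
    if not strategies:
        # Empty routing result: fall back to the monolithic ENRICHMENT prompt.
        return "monolithic"
    # Otherwise fan out, one concurrent specialist per selected strategy.
    results = await asyncio.gather(*(run_specialist(s) for s in strategies))
    return "+".join(results)
```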
Specialist execution¶
When strategies are selected, the enricher runs each specialist concurrently via asyncio.gather():
```python
tasks = [
    self._run_specialist_async(extraction_result, schema, strategy)
    for strategy in strategies
]
specialist_results = await asyncio.gather(*tasks)
```
Each specialist receives the same input (tables with injected __row_id__ fields, context items, and the target schema) but uses a strategy-specific prompt that focuses its attention on one enrichment approach.
Merge¶
After all specialists complete, _merge_specialist_results() combines their outputs into a single list[EnrichedRow]. See Merge algorithm for details.
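The first step of any such merge is grouping the specialist outputs by shared row ID. A minimal sketch of that grouping step (the helper name is illustrative; see the Merge algorithm page for what the real merge does with each group):

```python
from collections import defaultdict

def group_by_row_id(specialist_results: list[list[dict]]) -> dict[str, list[dict]]:
    """Collect every specialist's output rows under their shared row_id."""
    grouped: dict[str, list[dict]] = defaultdict(list)
    for result in specialist_results:
        for row in result:
            grouped[row["row_id"]].append(row)
    return dict(grouped)
```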
End-to-end flow¶
The following diagram shows both stages and their internal concurrency.
```mermaid
flowchart TB
    subgraph stage1["Stage 1: Extraction"]
        PDF["PDF page"] --> RENDER["PyMuPDF render<br/>(210 DPI)"]
        RENDER --> IMG["Image bytes"]
        IMG --> TABLE_CALL["TABLE_EXTRACTION<br/>(Gemini Pro)"]
        IMG --> CTX_CALL["CONTEXT_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_CALL --> TABLES["list[TableModel]"]
        CTX_CALL --> CONTEXTS["list[ContextModel]"]
        TABLES --> ER["ExtractionResult"]
        CONTEXTS --> ER
    end
    subgraph stage2["Stage 2: Enrichment"]
        ER --> ROUTER["Router<br/>(Gemini Flash)"]
        ROUTER --> STRATS["list[StrategyType]"]
        STRATS --> S1["Specialist 1"]
        STRATS --> S2["Specialist 2"]
        STRATS --> SN["Specialist N"]
        S1 --> MERGE["_merge_specialist_results()"]
        S2 --> MERGE
        SN --> MERGE
        MERGE --> OUT["list[EnrichedRow]"]
    end
    stage1 --> stage2
```
Row ID assignment¶
Before passing tables to specialists, the enricher injects a __row_id__ field into every main schedule row via _assign_row_ids(). The ID is derived from the primary_key_column detected during extraction (e.g., Type Mark). Duplicate primary key values receive a numeric suffix (W1_2, W1_3). When no primary key column is detected, rows fall back to index-based IDs (table_0_0_row_0).
Specialists must copy the __row_id__ value verbatim into their output row_id field. The merge step uses this value to group outputs across specialists for the same row.