Skip to content

流水线阶段

Cartex 管道按顺序运行两个阶段:提取和富化。main.py 提供两个入口:

  • run():处理单个文档请求
  • run_batch():以多进程并行方式处理多文档批任务

run() 先调用提取器,再将结果连同用户定义的模式一起传递给富化器。

def run(
    file_path: str,
    page_numbers: list[int],
    schema: UserTableSchema,
    use_table_bbox_crop: bool = False,
    force_monolithic: bool = False,
) -> list[EnrichedRow]:
    if len(page_numbers) == 1:
        extracted = extractor.extract(
            file_path,
            page_numbers[0],
            use_table_bbox_crop=use_table_bbox_crop,
        )
    else:
        extracted = extractor.extract_pages(
            file_path,
            page_numbers,
            use_table_bbox_crop=use_table_bbox_crop,
        )

    if force_monolithic:
        results = enricher._enrich_monolithic(extracted, schema)
    else:
        results = enricher.enrich(extracted, schema)
    return results

对于批量工作负载,run_batch() 接收 list[RunJob],通过 ProcessPoolExecutor 并行执行任务,并按输入顺序返回 list[RunJobResult](包含行结果与每任务错误状态)。

运行开关

run() 暴露两个执行开关:

  • use_table_bbox_crop(默认 False):启用高精度表格提取,在阶段一增加“表格检测 + 按 bbox 裁剪提取”路径。
  • force_monolithic(默认 False):在阶段二绕过路由与专业策略分阶段执行,直接走单体富化。

在 Gradio 界面中,对应关系为:

  • High Accuracy Tables (BBox Crop) -> use_table_bbox_crop=True
  • Single Specialist Mode (Monolithic) -> force_monolithic=True

运行技术规格

Gemini 依赖

Cartex 在核心决策路径上依赖 Gemini:

  • 提取(TABLE_EXTRACTIONCONTEXT_EXTRACTION,以及可选的裁剪提取提示)
  • 路由(策略选择、阶段规划、上下文分配)
  • 富化(专业策略或单体生成,以及需要时的备注裁决)

该设计让系统围绕单一的结构化响应栈运行(提示词契约 + Pydantic 模式),降低集成复杂度,并在不同模式下保持稳定的输出结构体验。

API 扩展模型

API 调用量会随页数、检测到的表格数量以及富化路径动态变化。

设处理页数为 N

  • 标准提取:约 2N 次调用(每页 TABLE_EXTRACTION + CONTEXT_EXTRACTION)。
  • 高精度提取:约 2N + sum(T_i) 次调用,其中 T_i 为第 i 页检测到的表格数(TABLE_DETECTION + CONTEXT_EXTRACTION + 每个检测表格一次裁剪提取调用)。

富化阶段额外调用:

  • 分阶段模式:+1 次路由调用。
  • +S 次专业策略调用(S 为被选中的策略数量)。
  • 或单体模式 +1 次通用富化调用。
  • 另有可选备注裁决批处理调用(对需要语义综合的行,约为 ceil(R / B))。

延迟与 High-Accuracy Tables

High Accuracy Tables 会增加延迟,因为其执行“检测 + 按表格裁剪 + 高 DPI 推理”,而不是单次整页表格提取。该选项默认关闭,是为了在交互式使用中保持更快且更可预测的响应时间。遇到文档质量较差或表格结构复杂时,可按需启用以换取更高提取保真度。

Guardrails:硬约束与软约束

Cartex 采用混合 guardrail 策略:

  • 软约束(面向开放文本):提示词注入字段契约、基于权威矩阵的合并决策、备注综合策略。
  • 硬约束(面向边界域):模式结构保证、行标识约束、枚举后校验/兜底。

字段契约采用 definition + constraints 结构,将字段语义与行为规则集中管理,避免在各专业提示词中重复定义。 枚举域通过统一机制解析(字段级枚举策略 + 契约 type_values),并通过独立的 enum-constraints 区块注入提示词,避免对单一字段进行硬编码特判。

系统尽量避免在开放式字段上使用硬编码启发式约束。LLM 在自由文本场景天然存在非确定性,刚性下游启发式通常较脆弱:容易过拟合某一种表达,误伤合法变体,并在不同文档家族间引入回归。硬性约束仅用于可客观验证的有限域(例如枚举值集合)。

阶段一:提取

src/pipeline/extractor.py 中的 Extractor 类将 PDF 页面转换为包含表格和上下文信息的 ExtractionResult

页面渲染

提取器使用 PyMuPDF 以 src/config.py 中配置的 DPI(默认 210)将 PDF 页面渲染为 PNG 图像,产生 Gemini 处理所需的图像字节。

def _pdf_to_image(self, file_path: str, page_number: int) -> bytes:
    doc = pymupdf.open(file_path)
    page = doc.load_page(page_number)
    pix = page.get_pixmap(dpi=config.dpi)
    return pix.tobytes()

Gemini 视觉调用

表格提取有两种运行模式:

  1. 标准模式use_table_bbox_crop=False
    执行一次整页 TABLE_EXTRACTION(高级模型),并转换为 TableModel
  2. 高精度模式use_table_bbox_crop=True
    先执行 TABLE_DETECTION(高级模型),再对每个检测到的表格 bbox 进行高 DPI 裁剪,并对裁剪图执行 TABLE_EXTRACTION_FROM_CROP(快速模型)。

若高精度检测/裁剪提取失败或未产出可用表格,会回退到整页 TABLE_EXTRACTION

上下文提取始终在整页渲染图上执行 CONTEXT_EXTRACTION(高级模型)。

多页提取

提供多个页码时,extract_pages() 通过 asyncio.gather() 并发处理所有页面。每页并行运行表格提取和上下文提取两次调用。结果合并时,对上下文条目进行基于内容的去重,以避免跨页重复。

异步页面管道

对于单页,异步路径(extract_async)并发运行表格提取和上下文提取:

tables_result, contexts_result = await asyncio.gather(
    self._extract_tables_async(image_bytes),
    self._extract_context_async(image_bytes),
)

use_table_bbox_crop=True 时,表格任务会从 _extract_tables_async(...) 切换为 _extract_tables_with_bbox_detection_async(...)

阶段二:富化

src/pipeline/enricher.py 中的 Enricher 类接收 ExtractionResultUserTableSchema,为每个主清单行填充所有目标列。

路由

富化器首先调用 Router 规划富化执行。路由器将提取数据的紧凑摘要(表格元数据、含 context_id 值的上下文片段、模式列)发送至高级 Gemini 模型,接收包含以下内容的 GeminiRoutingResult

  • strategies — 应运行哪些专业策略
  • execution_order — 具有依赖感知的分阶段执行计划
  • context_assignments — 每个专业策略接收哪些上下文条目

单体 ENRICHMENT 路径在两种情况下启用:

  1. force_monolithic=True(显式选择)
  2. 路由器返回空 strategies(自动回退)

两种情况下都采用单次 Gemini 调用,不执行分阶段专业策略流程。

分阶段执行

选定策略后,富化器按路由器规划的阶段执行。每个阶段通过 asyncio.gather() 并发运行其专业策略,但阶段之间按顺序执行,以便后续阶段可以访问前面阶段的输出。

for stage_idx, stage_strategies in enumerate(execution_order):
    # 为每个专业策略构建过滤后的内容
    # 并发运行本阶段所有专业策略
    stage_results = await asyncio.gather(*tasks)
    # 累积所有已完成阶段输出
    all_strategy_outputs.update(...)
    # 基于字段级权威矩阵对“截至当前阶段的全部输出”统一解析
    prior_enrichment, note_observations = self.merge_resolver.resolve(...)

每个专业策略接收过滤后的输入:

  • 表格:始终包含主表;仅 auxiliary_tabletext_rule 策略包含辅助表
  • 上下文:仅包含 context_id 出现在 context_assignments[strategy] 中的上下文条目
  • 前序富化:如果不是第一个阶段,注入包含所有前面阶段合并输出的 <prior_enrichment> JSON 块

模型层级

路由器使用 ModelType.ADVANCED(Gemini Pro)执行复杂的规划任务。所有专业策略使用 ModelType.FAST(Gemini Flash),因为它们接收预过滤、预分配的有效载荷和明确定义的任务。

合并

每个阶段完成后,富化器会对“所有已完成阶段输出”执行一次全局字段级解析:

  1. 结构化字段MergeResolverFIELD_AUTHORITY_MATRIX(全局 + 模板覆盖)决策。
  2. Special Notes 观察项 先收集,最终阶段后再统一语义综合。

最终阶段后:

  1. 运行 SpecialNotesAdjudicator.apply() 生成按行的语义 bullet 备注。
  2. 运行枚举后校验(_apply_enum_validations),将非法枚举值兜底为 Other: ... 并记录 validation_flags

详见合并算法中的评分与备注裁决规则。

端到端流程

下图展示了两个阶段及其内部并发结构。

flowchart TB
    subgraph stage1["阶段一:提取"]
        PDF["PDF 页面"] --> RENDER["PyMuPDF 渲染<br/>(210 DPI)"]
        RENDER --> IMG["图像字节"]
        IMG --> TABLE_MODE["表格提取模式"]
        TABLE_MODE --> TABLE_CALL["标准:TABLE_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_MODE --> DETECT["高精度:TABLE_DETECTION<br/>(Gemini Pro)"]
        DETECT --> CROP["TABLE_EXTRACTION_FROM_CROP<br/>(Gemini Flash,按 bbox)"]
        CROP --> TABLES["list[TableModel]"]
        IMG --> CTX_CALL["CONTEXT_EXTRACTION<br/>(Gemini Pro)"]
        TABLE_CALL --> TABLES
        CTX_CALL --> CONTEXTS["list[ContextModel]"]
        TABLES --> ER["ExtractionResult"]
        CONTEXTS --> ER
    end

    subgraph stage2["阶段二:富化"]
        ER --> ROUTER["路由器<br/>(Gemini Pro)"]
        ROUTER --> PLAN["execution_order +<br/>context_assignments"]
        PLAN --> S1["阶段 1 专业策略<br/>(Gemini Flash, 并发)"]
        S1 --> M1["MergeResolver<br/>(字段级权威解析)"]
        M1 --> S2["阶段 2 专业策略<br/>(Gemini Flash, 并发)"]
        S2 --> M2["MergeResolver<br/>(对全部已完成输出重解析)"]
        M2 --> SN["阶段 N 专业策略<br/>(Gemini Flash, 并发)"]
        SN --> MN["MergeResolver"]
        MN --> ADJ["SpecialNotesAdjudicator<br/>(语义 bullet)"]
        ADJ --> VALID["枚举校验<br/>(Other: ... 兜底)"]
        ER --> MONO["单体 ENRICHMENT 提示词<br/>(可选路径)"]
        MONO --> VALID
        VALID --> OUT["list[EnrichedRow]"]
    end

    stage1 --> stage2

行 ID 分配

在将表格传递给专业策略之前,富化器通过 _assign_row_ids() 向每条主清单行注入 __row_id__ 字段。该 ID 来源于提取阶段检测到的 primary_key_column(如 Type Mark)。主键值重复时附加数字后缀(W1_2W1_3)。若未检测到主键列,行将回退至基于索引的 ID(table_0_0_row_0)。

专业策略必须将 __row_id__ 值原样复制至输出的 row_id 字段。合并步骤使用该值跨策略聚合同一行的输出。