Extraction Pipelines#

ExtractionPipeline is a reusable collection of predefined aspects and concepts for consistent document analysis. A pipeline serves as a template that can be applied to multiple documents, so the same extraction criteria are used everywhere in your application.

📝 Overview#

Extraction pipelines package common extraction patterns into reusable units, allowing you to:

  • Standardize document processing: Define a consistent set of aspects and concepts once, then apply them to multiple documents

  • Create reusable templates: Build domain-specific pipelines (e.g., contract analysis, invoice processing, report analysis)

  • Ensure consistent analysis: Maintain uniform extraction criteria across document batches

  • Simplify workflow management: Organize complex extraction workflows into manageable, reusable components

Pipelines are particularly valuable when processing multiple documents of the same type, where you need to extract the same categories of information consistently.

⭐ Key Features#

Template-Based Extraction#

Pipelines act as extraction templates that define what information to extract from documents. Once created, a pipeline can be assigned to any number of documents, ensuring consistent analysis criteria.

Aspect and Concept Organization#

Pipelines can contain both:

  • Aspects: For extracting document sections and organizing content hierarchically

  • Concepts: For extracting specific data points with intelligent inference

This allows you to create comprehensive extraction workflows that combine broad content organization with detailed data extraction.

Reusability and Scalability#

A single pipeline can be applied to multiple documents, making it ideal for batch processing, automated workflows, and applications that need to process similar document types repeatedly.

💻 Basic Usage#

Simple Pipeline Creation#

Here's how to create and use a basic extraction pipeline:

from contextgem import (
    Aspect,
    BooleanConcept,
    DateConcept,
    Document,
    ExtractionPipeline,
    StringConcept,
)


# Create a pipeline for NDA (Non-Disclosure Agreement) review
nda_pipeline = ExtractionPipeline(
    aspects=[
        Aspect(
            name="Confidential information",
            description="Clauses defining the confidential information",
        ),
        Aspect(
            name="Exclusions",
            description="Clauses defining exclusions from confidential information",
        ),
        Aspect(
            name="Obligations",
            description="Clauses defining confidentiality obligations",
        ),
        Aspect(
            name="Liability",
            description="Clauses defining liability for breach of the agreement",
        ),
        # ... Add more aspects as needed
    ],
    concepts=[
        StringConcept(
            name="Anomaly",
            description="Anomaly in the contract, e.g. out-of-context or nonsensical clauses",
            llm_role="reasoner_text",
            add_references=True,  # Add references to the source text
            reference_depth="sentences",  # Reference to the sentence level
            add_justifications=True,  # Add justifications for the anomaly
            justification_depth="balanced",  # Balanced level of detail in the justification
            justification_max_sents=5,  # Maximum number of sentences in the justification
        ),
        BooleanConcept(
            name="Is mutual",
            description="Whether the NDA is mutual (bidirectional) or one-way",
            singular_occurrence=True,
            llm_role="reasoner_text",  # Use the reasoner role for this concept
        ),
        DateConcept(
            name="Effective date",
            description="The date when the NDA agreement becomes effective",
            singular_occurrence=True,
        ),
        StringConcept(
            name="Term",
            description="The term of the NDA",
        ),
        StringConcept(
            name="Governing law",
            description="The governing law of the agreement",
            singular_occurrence=True,
        ),
        # ... Add more concepts as needed
    ],
)

# Assign the pipeline to the NDA document
nda_document = Document(raw_text="[NDA text]")
nda_document.assign_pipeline(nda_pipeline)

# Now the document is ready for processing with the NDA review pipeline!
# The document can be processed to extract the defined aspects and concepts

# Extract all aspects and concepts from the NDA using an LLM group
# that contains LLMs with the roles "extractor_text" and "reasoner_text".
# llm_group.extract_all(nda_document)

Pipeline Assignment to Documents#

Once created, pipelines can be easily assigned to documents:

from contextgem import Document, ExtractionPipeline

# Create your pipeline
my_pipeline = ExtractionPipeline(aspects=[...], concepts=[...])

# Create documents
doc1 = Document(raw_text="First document content...")
doc2 = Document(raw_text="Second document content...")

# Assign the same pipeline to multiple documents
doc1.assign_pipeline(my_pipeline)
doc2.assign_pipeline(my_pipeline)

# Now both documents have the same extraction configuration
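
For larger batches, the repeated assign_pipeline() calls can be wrapped in a loop. The helper below is a minimal sketch: the name assign_pipeline_to_all is hypothetical (it is not part of ContextGem), and it relies only on the assign_pipeline() method shown above.

```python
from typing import Any, Iterable


def assign_pipeline_to_all(documents: Iterable[Any], pipeline: Any) -> int:
    """Assign one pipeline to every document and return how many were assigned.

    Hypothetical helper, not a ContextGem function. `documents` are expected
    to be Document instances and `pipeline` an ExtractionPipeline; only the
    documented `assign_pipeline()` method is called on each document.
    """
    count = 0
    for document in documents:
        document.assign_pipeline(pipeline)
        count += 1
    return count
```

With the documents above, this would be called as assign_pipeline_to_all([doc1, doc2], my_pipeline).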

⚙️ Parameters#

When creating an ExtractionPipeline, you can configure the following parameters:

| Parameter | Type | Default Value | Description |
| --- | --- | --- | --- |
| aspects | list[Aspect] | [] | Optional. List of Aspect instances to extract from documents. Aspects represent structural categories of information and can contain their own sub-aspects and concepts for detailed analysis. See Aspect Extraction for more information. |
| concepts | list[_Concept] | [] | Optional. List of _Concept instances to identify within or infer from documents. These are document-level concepts that apply to the entire document content. See supported concept types in Supported Concepts. |

📊 Pipeline Assignment#

The assign_pipeline() method is used to apply a pipeline to a document. This method:

  • Assigns aspects and concepts: Transfers the pipeline's aspects and concepts to the document

  • Validates compatibility: Raises an error if the document already has aspects or concepts assigned, unless overwrite_existing=True is passed

Assignment Options#

# Basic assignment (will raise error if document already has aspects/concepts)
document.assign_pipeline(my_pipeline)

# Overwrite existing configuration
document.assign_pipeline(my_pipeline, overwrite_existing=True)
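
If some documents in a batch may already be configured, it can help to decide explicitly whether to skip or replace them instead of relying on the error. The following is a sketch under stated assumptions: assign_unless_configured is a hypothetical helper, and it assumes the document exposes its current configuration through aspects and concepts attributes.

```python
from typing import Any


def assign_unless_configured(document: Any, pipeline: Any, replace: bool = False) -> bool:
    """Assign `pipeline`, leaving an already configured document alone by default.

    Hypothetical helper, not a ContextGem function.
    Returns True if the pipeline was assigned, False if the document was skipped.
    """
    # Assumption: the document exposes its current configuration as
    # `aspects` and `concepts` collections.
    already_configured = bool(document.aspects or document.concepts)
    if already_configured and not replace:
        return False
    # overwrite_existing is only requested when something is being replaced.
    document.assign_pipeline(pipeline, overwrite_existing=already_configured)
    return True
```

Passing replace=True makes the overwrite an explicit choice at the call site, which is easier to audit than a blanket overwrite_existing=True.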

🚀 Advanced Usage#

Multi-Document Processing#

Pipelines excel at processing multiple documents of the same type. Here's a comprehensive example:

# Advanced Usage Example - analyzing multiple documents with a single pipeline,
# with different LLMs, concurrency and cost tracking

import os

from contextgem import (
    Aspect,
    DateConcept,
    Document,
    DocumentLLM,
    DocumentLLMGroup,
    ExtractionPipeline,
    JsonObjectConcept,
    JsonObjectExample,
    LLMPricing,
    NumericalConcept,
    RatingConcept,
    StringConcept,
    StringExample,
)


# Construct documents

# Document 1 - Consultancy Agreement (shortened for brevity)
doc1 = Document(
    raw_text=(
        "Consultancy Agreement\n"
        "This agreement between Company A (Supplier) and Company B (Customer)...\n"
        "The term of the agreement is 1 year from the Effective Date...\n"
        "The Supplier shall provide consultancy services as described in Annex 2...\n"
        "The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
        "All intellectual property created during the provision of services shall belong to the Customer...\n"
        "This agreement is governed by the laws of Norway...\n"
        "Annex 1: Data processing agreement...\n"
        "Annex 2: Statement of Work...\n"
        "Annex 3: Service Level Agreement...\n"
    ),
)

# Document 2 - Service Level Agreement (shortened for brevity)
doc2 = Document(
    raw_text=(
        "Service Level Agreement\n"
        "This agreement between TechCorp (Provider) and GlobalInc (Client)...\n"
        "The agreement shall commence on January 1, 2023 and continue for 2 years...\n"
        "The Provider shall deliver IT support services as outlined in Schedule A...\n"
        "The Client shall make monthly payments of $5,000 within 15 days of invoice receipt...\n"
        "The Provider guarantees [99.9%] uptime for all critical systems...\n"
        "Either party may terminate with 60 days written notice...\n"
        "This agreement is governed by the laws of California...\n"
        "Schedule A: Service Descriptions...\n"
        "Schedule B: Response Time Requirements...\n"
    ),
)

# Create a reusable extraction pipeline
contract_pipeline = ExtractionPipeline()

# Define aspects and aspect-level concepts in the pipeline
# Concepts in the aspects will be extracted from the extracted aspect context
contract_pipeline.aspects = [  # or use .add_aspects([...])
    Aspect(
        name="Contract Parties",
        description="Clauses defining the parties to the agreement",
        concepts=[  # define aspect-level concepts, if any
            StringConcept(
                name="Party names and roles",
                description="Names of all parties entering into the agreement and their roles",
                examples=[  # optional
                    StringExample(
                        content="X (Client)",  # guidance regarding the expected output format
                    )
                ],
            )
        ],
    ),
    Aspect(
        name="Term",
        description="Clauses defining the term of the agreement",
        concepts=[
            NumericalConcept(
                name="Contract term",
                description="The term of the agreement in years",
                numeric_type="int",  # or "float", or "any" for auto-detection
                add_references=True,  # extract references to the source text
                reference_depth="paragraphs",
            )
        ],
    ),
]

# Define document-level concepts
# Concepts in the document will be extracted from the whole document content
contract_pipeline.concepts = [  # or use .add_concepts()
    DateConcept(
        name="Effective date",
        description="The effective date of the agreement",
    ),
    StringConcept(
        name="Contract type",
        description="The type of agreement",
        llm_role="reasoner_text",  # for this concept, we use a more advanced LLM for reasoning
    ),
    StringConcept(
        name="Governing law",
        description="The law that governs the agreement",
    ),
    JsonObjectConcept(
        name="Attachments",
        description="The titles and concise descriptions of the attachments to the agreement",
        structure={"title": str, "description": str | None},
        examples=[  # optional
            JsonObjectExample(  # guidance regarding the expected output format
                content={
                    "title": "Appendix A",
                    "description": "Code of conduct",
                }
            ),
        ],
    ),
    RatingConcept(
        name="Duration adequacy",
        description="Contract duration adequacy considering the subject matter and best practices.",
        llm_role="reasoner_text",  # for this concept, we use a more advanced LLM for reasoning
        rating_scale=(1, 10),
        add_justifications=True,  # add justifications for the rating
        justification_depth="balanced",  # provide a balanced justification
        justification_max_sents=3,
    ),
]

# Assign pipeline to the documents
# You can re-use the same pipeline for multiple documents
doc1.assign_pipeline(
    contract_pipeline
)  # assigns pipeline aspects and concepts to the document
doc2.assign_pipeline(
    contract_pipeline
)  # assigns pipeline aspects and concepts to the document

# Create an LLM group for data extraction and reasoning
llm_extractor = DocumentLLM(
    model="openai/gpt-4o-mini",  # or any other LLM from e.g. Anthropic, etc.
    api_key=os.environ["CONTEXTGEM_OPENAI_API_KEY"],  # your API key
    role="extractor_text",  # signifies the LLM is used for data extraction tasks
    pricing_details=LLMPricing(  # optional, for costs calculation
        input_per_1m_tokens=0.150,
        output_per_1m_tokens=0.600,
    ),
)
llm_reasoner = DocumentLLM(
    model="openai/o3-mini",  # or any other LLM from e.g. Anthropic, etc.
    api_key=os.environ["CONTEXTGEM_OPENAI_API_KEY"],  # your API key
    role="reasoner_text",  # signifies the LLM is used for reasoning tasks
    pricing_details=LLMPricing(  # optional, for costs calculation
        input_per_1m_tokens=1.10,
        output_per_1m_tokens=4.40,
    ),
)
# The LLM group is used for all extraction tasks within the pipeline
llm_group = DocumentLLMGroup(llms=[llm_extractor, llm_reasoner])

# Extract all information from the documents at once
doc1 = llm_group.extract_all(
    doc1, use_concurrency=True
)  # use concurrency to speed up extraction
doc2 = llm_group.extract_all(
    doc2, use_concurrency=True
)  # use concurrency to speed up extraction
# Or use async variants .extract_all_async(...)

# Get the extracted data
print("Some extracted data from doc 1:")
print("Contract Parties > Party names and roles:")
print(
    doc1.get_aspect_by_name("Contract Parties")
    .get_concept_by_name("Party names and roles")
    .extracted_items
)
print("Attachments:")
print(doc1.get_concept_by_name("Attachments").extracted_items)
# ...

print("\nSome extracted data from doc 2:")
print("Term > Contract term:")
print(
    doc2.get_aspect_by_name("Term")
    .get_concept_by_name("Contract term")
    .extracted_items[0]
    .value
)
print("Duration adequacy:")
print(doc2.get_concept_by_name("Duration adequacy").extracted_items[0].value)
print(doc2.get_concept_by_name("Duration adequacy").extracted_items[0].justification)
# ...

# Output processing costs (requires setting the pricing details for each LLM)
print("\nProcessing costs:")
print(llm_group.get_cost())
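
The lookups in the example follow one pattern: find the concept (optionally inside an aspect), then read the value of each extracted item. The sketch below wraps that pattern in a single function. The name extracted_values is hypothetical; it uses only get_aspect_by_name(), get_concept_by_name(), extracted_items, and value as they appear in the example.

```python
from typing import Any, List, Optional


def extracted_values(
    document: Any, concept_name: str, aspect_name: Optional[str] = None
) -> List[Any]:
    """Return the values of all items extracted for one concept.

    Hypothetical helper, not a ContextGem function. With `aspect_name` set,
    the concept is looked up inside that aspect; otherwise it is treated as
    a document-level concept.
    """
    owner = document if aspect_name is None else document.get_aspect_by_name(aspect_name)
    concept = owner.get_concept_by_name(concept_name)
    return [item.value for item in concept.extracted_items]
```

For instance, extracted_values(doc2, "Contract term", aspect_name="Term") would return the same values that the example reads one at a time.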

Pipeline Serialization#

Pipelines can be serialized for storage and later reuse:

# Serialize the pipeline
pipeline_json = pipeline.to_json()  # or to_dict() / to_disk()

# Deserialize the pipeline
pipeline_deserialized = ExtractionPipeline.from_json(
    pipeline_json
)  # or from_dict() / from_disk()
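
To keep a pipeline definition in a file of your own choosing, the JSON form can be written and read with the standard library. This is a minimal sketch: save_pipeline and load_pipeline are hypothetical helpers, and they assume that to_json() returns a string that from_json() accepts unchanged.

```python
from pathlib import Path
from typing import Any, Type


def save_pipeline(pipeline: Any, path: Path) -> None:
    """Write the pipeline's JSON representation to `path`.

    Hypothetical helper. Assumption: `pipeline.to_json()` returns a str.
    """
    path.write_text(pipeline.to_json(), encoding="utf-8")


def load_pipeline(pipeline_cls: Type[Any], path: Path) -> Any:
    """Rebuild a pipeline from the JSON stored at `path`.

    `pipeline_cls` is the class providing `from_json()`,
    e.g. ExtractionPipeline.
    """
    return pipeline_cls.from_json(path.read_text(encoding="utf-8"))
```

A call such as load_pipeline(ExtractionPipeline, Path("nda_pipeline.json")) would then restore the stored definition; the file name here is only an example.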

💡 Best Practices#

Pipeline Design#

  • Domain-specific organization: Create pipelines tailored to specific document types (contracts, invoices, reports, etc.)

  • Logical grouping: Group related aspects and concepts together for coherent analysis

  • Reusable templates: Design pipelines to be generic enough for reuse across similar documents
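
One way to organize domain-specific pipelines is a small registry that maps a document type to a function that builds the matching pipeline. This is an illustrative sketch only: register_pipeline and pipeline_for are hypothetical names, and nothing here is part of ContextGem.

```python
from typing import Any, Callable, Dict

# A factory builds a fresh pipeline each time it is called.
PipelineFactory = Callable[[], Any]

_PIPELINE_FACTORIES: Dict[str, PipelineFactory] = {}


def register_pipeline(doc_type: str, factory: PipelineFactory) -> None:
    """Register a pipeline factory under a case-insensitive document type."""
    _PIPELINE_FACTORIES[doc_type.lower()] = factory


def pipeline_for(doc_type: str) -> Any:
    """Build the pipeline registered for `doc_type`, or raise KeyError."""
    try:
        factory = _PIPELINE_FACTORIES[doc_type.lower()]
    except KeyError:
        raise KeyError(f"No pipeline registered for document type {doc_type!r}") from None
    return factory()
```

Registering factories rather than pipeline instances means each caller gets its own pipeline object, so a change made for one batch cannot leak into another.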

Concept Placement Strategy#

  • Document-level concepts: Place concepts that apply to the entire document in the pipeline's concepts list

  • Aspect-level concepts: Place concepts that are specific to particular document sections within the relevant aspects

  • Avoid duplication: Don't create similar concepts at both document and aspect levels
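
Exact-name duplicates between the two levels can be caught with a quick check before a pipeline is used. The sketch below assumes that a pipeline exposes aspects and concepts, and that each aspect and concept has a name (as in the constructor arguments used throughout this page). The function name is hypothetical. It inspects top-level aspects only and cannot detect concepts that are merely similar, which still need a manual review.

```python
from typing import Any, Dict, List


def find_duplicated_concept_names(pipeline: Any) -> Dict[str, List[str]]:
    """Map each concept name used at both levels to the aspects that repeat it.

    Hypothetical helper, not a ContextGem function. Only exact name matches
    between document-level concepts and top-level aspect concepts are reported.
    """
    document_level = {concept.name for concept in pipeline.concepts}
    duplicates: Dict[str, List[str]] = {}
    for aspect in pipeline.aspects:
        for concept in aspect.concepts:
            if concept.name in document_level:
                duplicates.setdefault(concept.name, []).append(aspect.name)
    return duplicates
```

An empty result means no concept name appears at both levels.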

🎯 Example Use Cases#

Invoice Processing Pipeline#

from contextgem import (
    DateConcept,
    ExtractionPipeline,
    NumericalConcept,
    StringConcept,
)

# Document-level concepts only: invoices are short, so no aspects are defined
invoice_pipeline = ExtractionPipeline(
    concepts=[
        StringConcept(name="Vendor Name", description="Name of the vendor/supplier"),
        StringConcept(name="Invoice Number", description="Unique invoice identifier"),
        DateConcept(name="Invoice Date", description="Date the invoice was issued"),
        DateConcept(name="Due Date", description="Payment due date"),
        NumericalConcept(name="Total Amount", description="Total invoice amount"),
        StringConcept(name="Currency", description="Currency of the invoice"),
    ]
)

Research Paper Analysis Pipeline#

from contextgem import (
    Aspect,
    DateConcept,
    ExtractionPipeline,
    RatingConcept,
    StringConcept,
)

# Aspects capture the paper's sections; concepts capture document-level facts
research_pipeline = ExtractionPipeline(
    aspects=[
        Aspect(name="Abstract", description="Paper abstract and summary"),
        Aspect(name="Methodology", description="Research methods and approach"),
        Aspect(name="Results", description="Findings and outcomes"),
        Aspect(name="Conclusions", description="Conclusions and implications"),
    ],
    concepts=[
        StringConcept(name="Research Field", description="Primary research domain"),
        StringConcept(name="Keywords", description="Paper keywords and topics"),
        DateConcept(name="Publication Date", description="When the paper was published"),
        RatingConcept(name="Novelty Score", description="Novelty of the research", rating_scale=(1, 10)),
    ]
)

⚑ Pipeline Reuse Benefits#

  • Consistency: Ensures all documents are processed with identical extraction criteria

  • Efficiency: Eliminates the need to recreate aspects and concepts for each document

  • Maintainability: Changes to extraction logic only need to be made in one place