ContextGem and other frameworks#
Due to ContextGem’s powerful abstractions, it is the easiest and fastest way to build LLM extraction workflows for document analysis.
✏️ Basic Example#
Below is a basic example of an extraction workflow - extraction of anomalies from a document - implemented side-by-side in ContextGem and other frameworks. (All implementations are self-contained. Comparison as of 24 March 2025.)
Even implementing this basic extraction workflow requires significantly more effort in other frameworks:
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
📝 Prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔄 Output parsing logic: Setting up parsers to handle the LLM’s response
📄 Reference mapping: Writing custom logic for mapping references in the source document
In contrast, ContextGem handles all these complexities automatically. Users simply describe what to extract in natural language, provide basic configuration parameters, and the framework takes care of the rest.
⚡ Fastest way
ContextGem is the fastest and easiest way to implement an LLM extraction workflow. All the boilerplate code is handled behind the scenes.
Major time savers:
⌨️ Simple syntax: ContextGem uses a simple, intuitive API that requires minimal code
📝 Automatic prompt engineering: ContextGem automatically constructs a prompt tailored to the extraction task
🔄 Automatic model definition: ContextGem automatically defines the Pydantic model for structured output
🧩 Automatic output parsing: ContextGem automatically parses the LLM’s response
🔍 Automatic reference tracking: Precise references are automatically extracted and mapped to the original document
📏 Flexible reference granularity: References can be tracked at different levels (paragraphs, sentences)
# Quick Start Example - Extracting anomalies from a document, with source references and justifications
import os
from contextgem import Document, DocumentLLM, StringConcept
# Example document instance
# Document content is shortened for brevity
doc = Document(
raw_text=(
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"The purple elephant danced gracefully on the moon while eating ice cream.\n" # 💎 anomaly
"This agreement is governed by the laws of Norway...\n"
),
)
# Attach a document-level concept
doc.concepts = [
StringConcept(
name="Anomalies", # in longer contexts, this concept is hard to capture with RAG
description="Anomalies in the document",
add_references=True,
reference_depth="sentences",
add_justifications=True,
justification_depth="brief",
)
# add more concepts to the document, if needed
# see the docs for available concepts: StringConcept, JsonObjectConcept, etc.
]
# Or use doc.add_concepts([...])
# Create an LLM for extracting data and insights from the document
llm = DocumentLLM(
model="openai/gpt-4o-mini", # or any other LLM from e.g. Anthropic, etc.
api_key=os.environ.get(
"CONTEXTGEM_OPENAI_API_KEY"
), # your API key for the LLM provider, e.g. OpenAI, Anthropic, etc.
# see the docs for more configuration options
)
# Extract information from the document
doc = llm.extract_all(doc) # or use async version llm.extract_all_async(doc)
# Access extracted information in the document object
print(
doc.concepts[0].extracted_items
) # extracted items with references & justifications
# or doc.get_concept_by_name("Anomalies").extracted_items
LangChain is a popular and versatile framework for building LLM applications through composable components. It offers excellent flexibility and a rich ecosystem of integrations. While powerful, feature-rich, and widely adopted in the industry, it requires more manual configuration and setup work for structured data extraction tasks compared to ContextGem’s streamlined approach.
Development overhead:
📝 Manual prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
🧩 Manual output parsing: Setting up parsers to handle the LLM’s response
🔍 Manual reference mapping: Writing custom logic for mapping references
# LangChain implementation for extracting anomalies from a document, with source references and justifications
import os
from textwrap import dedent
from typing import Optional
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class Anomaly(BaseModel):
"""An anomaly found in the document."""
text: str = Field(description="The anomalous text found in the document")
justification: str = Field(
description="Brief justification for why this is an anomaly"
)
reference: str = Field(
description="The sentence containing the anomaly"
) # LLM reciting a reference is error-prone and unreliable
class AnomaliesList(BaseModel):
"""List of anomalies found in the document."""
anomalies: list[Anomaly] = Field(
description="List of anomalies found in the document"
)
def extract_anomalies_with_langchain(
document_text: str, api_key: Optional[str] = None
) -> list[Anomaly]:
"""
Extract anomalies from a document using LangChain.
Args:
document_text: The text content of the document
api_key: OpenAI API key (defaults to environment variable)
Returns:
List of extracted anomalies with justifications and references
"""
openai_api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY")
llm = ChatOpenAI(model="gpt-4o-mini", openai_api_key=openai_api_key, temperature=0)
# Create a parser for structured output
parser = PydanticOutputParser(pydantic_object=AnomaliesList)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
template = dedent(
"""
You are an expert document analyzer. Your task is to identify any anomalies in the document.
Anomalies are statements, phrases, or content that seem out of place, irrelevant, or inconsistent
with the rest of the document's context and purpose.
Document:
{document_text}
Identify all anomalies in the document. For each anomaly, provide:
1. The anomalous text
2. A brief justification explaining why it's an anomaly
3. The complete sentence containing the anomaly for reference
{format_instructions}
"""
)
prompt = PromptTemplate(
template=template,
input_variables=["document_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
# Create a runnable chain
chain = (
{"document_text": lambda x: x}
| RunnablePassthrough.assign()
| prompt
| llm
| RunnableLambda(lambda x: parser.parse(x.content))
)
# Run the chain and extract anomalies
parsed_output = chain.invoke(document_text)
return parsed_output.anomalies
# Example usage
# Sample document text (shortened for brevity)
document_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"The purple elephant danced gracefully on the moon while eating ice cream.\n" # out-of-context / anomaly
"This agreement is governed by the laws of Norway...\n"
)
# Extract anomalies
anomalies = extract_anomalies_with_langchain(document_text)
# Print results
for anomaly in anomalies:
print(f"Anomaly: {anomaly}")
LlamaIndex is a powerful and versatile framework for building LLM applications with data, particularly excelling at RAG workflows and document retrieval. It offers a comprehensive set of tools for data indexing and querying. While highly effective for its intended use cases, for structured data extraction tasks (non-RAG setup), it requires more manual configuration and setup work compared to ContextGem’s streamlined approach.
Development overhead:
📝 Manual prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
🧩 Manual output parsing: Setting up parsers to handle the LLM’s response
🔍 Manual reference mapping: Writing custom logic for mapping references
# LlamaIndex implementation for extracting anomalies from a document, with source references and justifications
import os
from textwrap import dedent
from typing import Optional
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.openai import OpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class Anomaly(BaseModel):
"""An anomaly found in the document."""
text: str = Field(description="The anomalous text found in the document")
justification: str = Field(
description="Brief justification for why this is an anomaly"
)
reference: str = Field(
description="The sentence containing the anomaly"
) # LLM reciting a reference is error-prone and unreliable
class AnomaliesList(BaseModel):
"""List of anomalies found in the document."""
anomalies: list[Anomaly] = Field(
description="List of anomalies found in the document"
)
def extract_anomalies_with_llama_index(
document_text: str, api_key: Optional[str] = None
) -> list[Anomaly]:
"""
Extract anomalies from a document using LlamaIndex.
Args:
document_text: The text content of the document
api_key: OpenAI API key (defaults to environment variable)
Returns:
List of extracted anomalies with justifications and references
"""
openai_api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY")
llm = OpenAI(model="gpt-4o-mini", api_key=openai_api_key, temperature=0)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert document analyzer. Your task is to identify any anomalies in the document.
Anomalies are statements, phrases, or content that seem out of place, irrelevant, or inconsistent
with the rest of the document's context and purpose.
Document:
{document_text}
Identify all anomalies in the document. For each anomaly, provide:
1. The anomalous text
2. A brief justification explaining why it's an anomaly
3. The complete sentence containing the anomaly for reference
"""
)
# Use PydanticOutputParser to directly parse the LLM output into our structured format
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=AnomaliesList),
prompt_template_str=prompt_template,
llm=llm,
verbose=True,
)
# Execute the program
try:
result = program(document_text=document_text)
return result.anomalies
except Exception as e:
print(f"Error parsing LLM response: {e}")
return []
# Example usage
# Sample document text (shortened for brevity)
document_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"The purple elephant danced gracefully on the moon while eating ice cream.\n" # out-of-context / anomaly
"This agreement is governed by the laws of Norway...\n"
)
# Extract anomalies
anomalies = extract_anomalies_with_llama_index(document_text)
# Print results
for anomaly in anomalies:
print(f"Anomaly: {anomaly}")
LlamaIndex with RAG setup is a powerful and sophisticated framework for document retrieval and analysis, offering exceptional capabilities for knowledge-intensive applications. Its comprehensive architecture excels at handling complex document interactions and information retrieval tasks across large document collections. While it provides robust and versatile capabilities for building advanced document-based applications, it does require more manual configuration and specialized setup for structured extraction tasks compared to ContextGem’s streamlined and intuitive approach.
Development overhead:
📝 Manual prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
🧩 Manual output parsing: Setting up parsers to handle the LLM’s response
🔍 Complex reference mapping: Getting precise references correctly requires additional config, such as setting up a sentence splitter, CitationQueryEngine, adjusting chunk sizes, etc.
# LlamaIndex (RAG) implementation for extracting anomalies from a document, with source references and justifications
import os
from textwrap import dedent
from typing import Any, Optional
from llama_index.core import Document, Settings, VectorStoreIndex
from llama_index.core.base.response.schema import RESPONSE_TYPE
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.query_engine import CitationQueryEngine
from llama_index.core.response_synthesizers.base import BaseSynthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.llms.openai import OpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class Anomaly(BaseModel):
text: str = Field(description="The anomalous text found in the document")
justification: str = Field(
description="Brief justification for why this is an anomaly"
)
# This field will hold the citation info (e.g., node references)
source_id: Optional[str] = Field(
description="Automatically added source reference", default=None
)
class AnomaliesList(BaseModel):
anomalies: list[Anomaly] = Field(
description="List of anomalies found in the document"
)
# Custom synthesizer that instructs the LLM to extract anomalies in JSON format.
class AnomalyExtractorSynthesizer(BaseSynthesizer):
def __init__(self, llm=None, nodes=None):
super().__init__()
self._llm = llm or Settings.llm
# Nodes are still provided in case additional context is needed.
self._nodes = nodes or []
def _get_prompts(self) -> dict[str, Any]:
return {}
def _update_prompts(self, prompts: dict[str, Any]):
return
async def aget_response(
self, query_str: str, text_chunks: list[str], **kwargs: Any
) -> RESPONSE_TYPE:
return self.get_response(query_str, text_chunks, **kwargs)
def get_response(
self, query_str: str, text_chunks: list[str], **kwargs: Any
) -> str:
all_text = "\n".join(text_chunks)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_str = dedent(
"""
You are an expert document analyzer. Your task is to identify anomalies in the document.
Anomalies are statements or phrases that seem out of place or inconsistent with the document's context.
Document:
{all_text}
For each anomaly, provide:
1. The anomalous text (only the specific phrase).
2. A brief justification for why it is an anomaly.
Format your answer as a JSON object:
{{
"anomalies": [
{{
"text": "anomalous text",
"justification": "reason for anomaly",
}}
]
}}
"""
)
print(prompt_str)
output_parser = PydanticOutputParser(output_cls=AnomaliesList)
response = self._llm.complete(prompt_str.format(all_text=all_text))
try:
parsed_response = output_parser.parse(response.text)
self._last_anomalies = parsed_response
return parsed_response.model_dump_json()
except Exception as e:
print(f"Error parsing LLM response: {e}")
print(f"Raw response: {response.text}")
return "{}"
def extract_anomalies_with_citations(
document_text: str, api_key: Optional[str] = None
) -> list[Anomaly]:
"""
Extract anomalies from a document using LlamaIndex with citation support.
Args:
document_text: The content of the document.
api_key: OpenAI API key (if not provided, read from environment variable).
Returns:
List of extracted anomalies with automatically added source references.
"""
openai_api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY")
llm = OpenAI(model="gpt-4o-mini", api_key=openai_api_key, temperature=0)
Settings.llm = llm
# Create a Document and split it into nodes
doc = Document(text=document_text)
splitter = SentenceSplitter(
paragraph_separator="\n",
chunk_size=100,
chunk_overlap=0,
)
nodes = splitter.get_nodes_from_documents([doc])
print(f"Document split into {len(nodes)} nodes")
# Build a vector index and retriever using all nodes.
index = VectorStoreIndex(nodes)
retriever = VectorIndexRetriever(index=index, similarity_top_k=len(nodes))
# Create a custom synthesizer.
synthesizer = AnomalyExtractorSynthesizer(llm=llm, nodes=nodes)
# Initialize CitationQueryEngine by passing the expected components.
citation_query_engine = CitationQueryEngine(
retriever=retriever,
llm=llm,
response_synthesizer=synthesizer,
citation_chunk_size=100, # Adjust as needed
citation_chunk_overlap=10, # Adjust as needed
)
try:
response = citation_query_engine.query(
"Extract all anomalies from this document"
)
# If the synthesizer stored the anomalies, attach the citation info
if hasattr(synthesizer, "_last_anomalies"):
anomalies = synthesizer._last_anomalies.anomalies
formatted_citations = (
response.get_formatted_sources()
if hasattr(response, "get_formatted_sources")
else None
)
for anomaly in anomalies:
anomaly.source_id = formatted_citations
return anomalies
return []
except Exception as e:
print(f"Error querying document: {e}")
return []
# Example usage
document_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"The purple elephant danced gracefully on the moon while eating ice cream.\n" # anomaly
"This agreement is governed by the laws of Norway...\n"
)
anomalies = extract_anomalies_with_citations(document_text)
for anomaly in anomalies:
print(f"Anomaly: {anomaly}")
Instructor is a popular framework that specializes in structured data extraction with LLMs using Pydantic. It offers excellent type safety and validation capabilities, making it a solid choice for many extraction tasks. While powerful for structured outputs, Instructor requires more manual setup for document analysis workflows.
Development overhead:
📝 Manual prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
🔍 Manual reference mapping: Writing custom logic for mapping references
# Instructor implementation for extracting anomalies from a document, with source references and justifications
import os
from textwrap import dedent
from typing import Optional
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class Anomaly(BaseModel):
"""An anomaly found in the document."""
text: str = Field(description="The anomalous text found in the document")
justification: str = Field(
description="Brief justification for why this is an anomaly"
)
source_text: str = Field(
description="The sentence containing the anomaly"
) # LLM reciting a reference is error-prone and unreliable
class AnomaliesList(BaseModel):
"""List of anomalies found in the document."""
anomalies: list[Anomaly] = Field(
description="List of anomalies found in the document"
)
def extract_anomalies_with_instructor(
document_text: str, api_key: Optional[str] = None
) -> list[Anomaly]:
"""
Extract anomalies from a document using Instructor.
Args:
document_text: The text content of the document
api_key: OpenAI API key (defaults to environment variable)
Returns:
List of extracted anomalies with justifications and references
"""
openai_api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY")
client = OpenAI(api_key=openai_api_key)
instructor_client = instructor.from_openai(client)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Your task is to identify any anomalies in the document.
Anomalies are statements, phrases, or content that seem out of place, irrelevant, or inconsistent
with the rest of the document's context and purpose.
Document:
{document_text}
Identify all anomalies in the document. For each anomaly, provide:
1. The anomalous text - just the specific anomalous phrase
2. A brief justification explaining why it's an anomaly
3. The exact complete sentence containing the anomaly for reference
Only identify real anomalies that truly don't belong in this type of document.
"""
)
# Extract structured data using Instructor
response = instructor_client.chat.completions.create(
model="gpt-4o-mini",
response_model=AnomaliesList,
messages=[
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
],
temperature=0,
)
return response.anomalies
# Example usage
# Sample document text (shortened for brevity)
document_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"The purple elephant danced gracefully on the moon while eating ice cream.\n" # out-of-context / anomaly
"This agreement is governed by the laws of Norway...\n"
)
# Extract anomalies
anomalies = extract_anomalies_with_instructor(document_text)
# Print results
for anomaly in anomalies:
print(f"Anomaly: {anomaly}")
🔬 Advanced Example#
As use cases grow more complex, the development overhead of alternative frameworks becomes increasingly evident, while ContextGem’s abstractions deliver substantial time savings. As extraction steps stack up, the implementation with other frameworks quickly becomes non-scalable:
📝 Manual prompt engineering: Crafting comprehensive prompts for each extraction step
🔧 Manual model definition: Defining Pydantic validation models for each element of extraction
🧩 Manual output parsing: Setting up parsers to handle the LLM’s response
🔍 Manual reference mapping: Writing custom logic for mapping references
📄 Complex pipeline configuration: Writing custom logic for pipeline configuration and extraction components
📊 Implementing usage and cost tracking callbacks, which quickly increases in complexity when multiple LLMs are used in the pipeline
🔄 Complex concurrency setup: Implementing complex concurrency logic with asyncio
📝 Embedding examples in prompts: Writing output examples directly in the custom prompts
📋 Manual result aggregation: Need to write code to collect and organize results
Below is a more advanced example of an extraction workflow - using an extraction pipeline for multiple documents, with concurrency and cost tracking - implemented side-by-side in ContextGem and other frameworks. (All implementations are self-contained. Comparison as of 24 March 2025.)
⚡ Fastest way
ContextGem is the fastest and easiest way to implement an LLM extraction workflow. All the boilerplate code is handled behind the scenes.
Major time savers:
⌨️ Simple syntax: ContextGem uses a simple, intuitive API that requires minimal code
🔄 Automatic model definition: ContextGem automatically defines the Pydantic model for structured output
📝 Automatic prompt engineering: ContextGem automatically constructs a prompt tailored to the extraction task
🧩 Automatic output parsing: ContextGem automatically parses the LLM’s response
🔍 Automatic reference tracking: Precise references are automatically extracted and mapped to the original document
📏 Flexible reference granularity: References can be tracked at different levels (paragraphs, sentences)
📄 Easy pipeline definition: Simple, declarative syntax for defining the extraction pipeline involving multiple LLMs, in a few lines of code
💰 Automated usage and cost tracking: Built-in token counting and cost calculation without additional setup
🔄 Built-in concurrency: Concurrent execution of extraction steps with a simple switch
use_concurrency=True
📊 Easy example definition: Output examples can be easily defined without modifying any prompts
📋 Built-in result aggregation: Results are automatically collected and organized in a unified storage model (document)
# Advanced Usage Example - analyzing multiple documents with a single pipeline,
# with different LLMs, concurrency and cost tracking
import os
from contextgem import (
Aspect,
DateConcept,
Document,
DocumentLLM,
DocumentLLMGroup,
DocumentPipeline,
JsonObjectConcept,
JsonObjectExample,
LLMPricing,
NumericalConcept,
RatingConcept,
RatingScale,
StringConcept,
StringExample,
)
# Construct documents
# Document 1 - Consultancy Agreement (shortened for brevity)
doc1 = Document(
raw_text=(
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"All intellectual property created during the provision of services shall belong to the Customer...\n"
"This agreement is governed by the laws of Norway...\n"
"Annex 1: Data processing agreement...\n"
"Annex 2: Statement of Work...\n"
"Annex 3: Service Level Agreement...\n"
),
)
# Document 2 - Service Level Agreement (shortened for brevity)
doc2 = Document(
raw_text=(
"Service Level Agreement\n"
"This agreement between TechCorp (Provider) and GlobalInc (Client)...\n"
"The agreement shall commence on January 1, 2023 and continue for 2 years...\n"
"The Provider shall deliver IT support services as outlined in Schedule A...\n"
"The Client shall make monthly payments of $5,000 within 15 days of invoice receipt...\n"
"The Provider guarantees [99.9%] uptime for all critical systems...\n"
"Either party may terminate with 60 days written notice...\n"
"This agreement is governed by the laws of California...\n"
"Schedule A: Service Descriptions...\n"
"Schedule B: Response Time Requirements...\n"
),
)
# Create a reusable document pipeline for extraction
contract_pipeline = DocumentPipeline()
# Define aspects and aspect-level concepts in the pipeline
# Concepts in the aspects will be extracted from the extracted aspect context
contract_pipeline.aspects = [ # or use .add_aspects([...])
Aspect(
name="Contract Parties",
description="Clauses defining the parties to the agreement",
concepts=[ # define aspect-level concepts, if any
StringConcept(
name="Party names and roles",
description="Names of all parties entering into the agreement and their roles",
examples=[ # optional
StringExample(
content="X (Client)", # guidance regarding the expected output format
)
],
)
],
),
Aspect(
name="Term",
description="Clauses defining the term of the agreement",
concepts=[
NumericalConcept(
name="Contract term",
description="The term of the agreement in years",
numeric_type="int", # or "float", or "any" for auto-detection
add_references=True, # extract references to the source text
reference_depth="paragraphs",
)
],
),
]
# Define document-level concepts
# Concepts in the document will be extracted from the whole document content
contract_pipeline.concepts = [ # or use .add_concepts()
DateConcept(
name="Effective date",
description="The effective date of the agreement",
),
StringConcept(
name="Contract type",
description="The type of agreement",
llm_role="reasoner_text", # for this concept, we use a more advanced LLM for reasoning
),
StringConcept(
name="Governing law",
description="The law that governs the agreement",
),
JsonObjectConcept(
name="Attachments",
description="The titles and concise descriptions of the attachments to the agreement",
structure={"title": str, "description": str | None},
examples=[ # optional
JsonObjectExample( # guidance regarding the expected output format
content={
"title": "Appendix A",
"description": "Code of conduct",
}
),
],
),
RatingConcept(
name="Duration adequacy",
description="Contract duration adequacy considering the subject matter and best practices.",
llm_role="reasoner_text", # for this concept, we use a more advanced LLM for reasoning
rating_scale=RatingScale(start=1, end=10),
add_justifications=True, # add justifications for the rating
justification_depth="balanced", # provide a balanced justification
justification_max_sents=3,
),
]
# Assign pipeline to the documents
# You can re-use the same pipeline for multiple documents
doc1.assign_pipeline(
contract_pipeline
) # assigns pipeline aspects and concepts to the document
doc2.assign_pipeline(
contract_pipeline
) # assigns pipeline aspects and concepts to the document
# Create an LLM group for data extraction and reasoning
llm_extractor = DocumentLLM(
model="openai/gpt-4o-mini", # or any other LLM from e.g. Anthropic, etc.
api_key=os.environ["CONTEXTGEM_OPENAI_API_KEY"], # your API key
role="extractor_text", # signifies the LLM is used for data extraction tasks
pricing_details=LLMPricing( # optional, for costs calculation
input_per_1m_tokens=0.150,
output_per_1m_tokens=0.600,
),
)
llm_reasoner = DocumentLLM(
model="openai/o3-mini", # or any other LLM from e.g. Anthropic, etc.
api_key=os.environ["CONTEXTGEM_OPENAI_API_KEY"], # your API key
role="reasoner_text", # signifies the LLM is used for reasoning tasks
pricing_details=LLMPricing( # optional, for costs calculation
input_per_1m_tokens=1.10,
output_per_1m_tokens=4.40,
),
)
# The LLM group is used for all extraction tasks within the pipeline
llm_group = DocumentLLMGroup(llms=[llm_extractor, llm_reasoner])
# Extract all information from the documents at once
doc1 = llm_group.extract_all(
doc1, use_concurrency=True
) # use concurrency to speed up extraction
doc2 = llm_group.extract_all(
doc2, use_concurrency=True
) # use concurrency to speed up extraction
# Or use async variants .extract_all_async(...)
# Get the extracted data
print("Some extracted data from doc 1:")
print("Contract Parties > Party names and roles:")
print(
doc1.get_aspect_by_name("Contract Parties")
.get_concept_by_name("Party names and roles")
.extracted_items
)
print("Attachments:")
print(doc1.get_concept_by_name("Attachments").extracted_items)
# ...
print("\nSome extracted data from doc 2:")
print("Term > Contract term:")
print(
doc2.get_aspect_by_name("Term")
.get_concept_by_name("Contract term")
.extracted_items[0]
.value
)
print("Duration adequacy:")
print(doc2.get_concept_by_name("Duration adequacy").extracted_items[0].value)
print(doc2.get_concept_by_name("Duration adequacy").extracted_items[0].justification)
# ...
# Output processing costs (requires setting the pricing details for each LLM)
print("\nProcessing costs:")
print(llm_group.get_cost())
LangChain provides a powerful and flexible framework for building LLM applications with excellent composability and a rich ecosystem of integrations. While it offers great versatility for many use cases, it does require additional manual setup and configuration for complex extraction workflows.
Development overhead:
📝 Manual prompt engineering: Must craft detailed prompts for each extraction step
🔧 Manual model definition: Need to define Pydantic models and output parsers for structured data
🧩 Complex chain configuration: Requires manual setup of chains and their connections involving multiple LLMs
🔍 Manual reference mapping: Must implement custom logic to track source references
🔄 Complex concurrency setup: Implementing concurrent processing requires additional setup with asyncio
💰 Cost tracking setup: Requires custom logic for cost tracking for each LLM
💾 No unified storage model: Need to write additional code to collect and organize results
# LangChain implementation of analyzing multiple documents with a single pipeline,
# with different LLMs, concurrency, and cost tracking
# Jupyter notebook compatible version
import asyncio
import os
import time
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Optional
import nest_asyncio
nest_asyncio.apply()
from langchain.callbacks import get_openai_callback
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class PartyInfo(BaseModel):
"""Information about contract parties"""
name: str = Field(description="Name of the party")
role: str = Field(description="Role of the party (e.g., Client, Provider)")
class Term(BaseModel):
"""Contract term information"""
duration_years: int = Field(description="Duration in years")
reference: str = Field(
description="Reference text from document"
) # LLM reciting a reference is error-prone and unreliable
class Attachment(BaseModel):
"""Contract attachment information"""
title: str = Field(description="Title of the attachment")
description: Optional[str] = Field(
description="Brief description of the attachment"
)
class ContractRating(BaseModel):
"""Rating with justification"""
score: int = Field(description="Rating score (1-10)")
justification: str = Field(description="Justification for the rating")
class ContractInfo(BaseModel):
"""Complete contract information"""
contract_type: str = Field(description="Type of contract")
effective_date: Optional[str] = Field(description="Effective date of the contract")
governing_law: Optional[str] = Field(description="Governing law of the contract")
class AspectExtraction(BaseModel):
"""Result of aspect extraction"""
aspect_text: str = Field(
description="Extracted text for this aspect"
) # this does not provide granular structured content, such as specific paragraphs and sentences
class PartyExtraction(BaseModel):
"""Party extraction results"""
parties: list[PartyInfo] = Field(description="List of parties in the contract")
class TermExtraction(BaseModel):
"""Term extraction results"""
terms: list[Term] = Field(description="Contract term details")
class AttachmentExtraction(BaseModel):
"""Attachment extraction results"""
attachments: list[Attachment] = Field(description="List of contract attachments")
class DurationRatingExtraction(BaseModel):
"""Duration adequacy rating"""
rating: ContractRating = Field(description="Rating of contract duration adequacy")
# Configuration models must be manually defined
@dataclass
class ExtractorConfig:
"""Configuration for a specific extractor"""
name: str
description: str
model_name: str = "gpt-4o-mini" # Default model
@dataclass
class PipelineConfig:
"""Complete pipeline configuration"""
# Aspect extractors
party_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Contract Parties",
description="Clauses defining the parties to the agreement",
)
)
term_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Term", description="Clauses defining the term of the agreement"
)
)
# Document-level extractors
contract_info_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Contract Information",
description="Basic contract information including type, date, and governing law",
)
)
attachment_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Attachments",
description="Contract attachments and their descriptions",
)
)
duration_rating_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Duration Rating",
description="Rating of contract duration adequacy",
model_name="o3-mini", # Using a more capable model for judgment
)
)
# LLM configuration
def get_llm(model_name="gpt-4o-mini", api_key=None):
"""Get a ChatOpenAI instance with the specified configuration"""
# Skipped temperature etc. for brevity, as e.g. temperature is not supported by o3-mini
api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY", "")
return ChatOpenAI(model=model_name, openai_api_key=api_key)
# Chain components must be manually defined
def create_aspect_extractor(aspect_name, aspect_description, model_name="gpt-4o-mini"):
"""Create a chain to extract text related to a specific aspect"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=AspectExtraction)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert document analyzer. Extract the text related to the following aspect from the document.
Document:
{document_text}
Aspect: {aspect_name}
Description: {aspect_description}
Extract all text related to this aspect.
{format_instructions}
"""
),
input_variables=["document_text", "aspect_name", "aspect_description"],
partial_variables={"format_instructions": parser.get_format_instructions()},
) # this does not provide granular structured content, such as specific paragraphs and sentences
chain = prompt | llm | parser
# Return a callable that works with both sync and async code
def extractor(doc):
return chain.invoke(
{
"document_text": doc,
"aspect_name": aspect_name,
"aspect_description": aspect_description,
}
)
# Add an async version that will be used when awaited
async def async_extractor(doc):
return await chain.ainvoke(
{
"document_text": doc,
"aspect_name": aspect_name,
"aspect_description": aspect_description,
}
)
extractor.ainvoke = async_extractor
return extractor
def create_party_extractor(model_name="gpt-4o-mini"):
"""Create a chain to extract party information"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=PartyExtraction)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert document analyzer. Extract all party information from the following contract text.
Contract text:
{aspect_text}
For each party, extract their name and role in the agreement.
{format_instructions}
"""
),
input_variables=["aspect_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | llm | parser
return chain
def create_term_extractor(model_name="gpt-4o-mini"):
"""Create a chain to extract term information"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=TermExtraction)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert document analyzer. Extract term information from the following contract text.
Contract text:
{aspect_text}
Extract the contract term duration in years. Include the relevant reference text.
{format_instructions}
"""
),
input_variables=["aspect_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | llm | parser
return chain
def create_contract_info_extractor(model_name="gpt-4o-mini"):
"""Create a chain to extract basic contract information"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=ContractInfo)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert document analyzer. Extract the following information from the contract document.
Contract document:
{document_text}
Extract the contract type, effective date if mentioned, and governing law if specified.
{format_instructions}
"""
),
input_variables=["document_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | llm | parser
return chain
def create_attachment_extractor(model_name="gpt-4o-mini"):
"""Create a chain to extract attachment information"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=AttachmentExtraction)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert document analyzer. Extract information about all attachments, annexes,
schedules, or appendices mentioned in the contract.
Contract document:
{document_text}
For each attachment, extract:
1. The title/name of the attachment (e.g., "Appendix A", "Schedule 1", "Annex 2")
2. A brief description of what the attachment contains (if mentioned in the document)
Example format:
{{"title": "Appendix A", "description": "Code of conduct"}}
{format_instructions}
"""
),
input_variables=["document_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | llm | parser
return chain
def create_duration_rating_extractor(model_name="o3-mini"):
"""Create a chain to rate contract duration adequacy"""
llm = get_llm(model_name=model_name)
parser = PydanticOutputParser(pydantic_object=DurationRatingExtraction)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = PromptTemplate(
template=dedent(
"""
You are an expert contract analyst. Evaluate the adequacy of the contract duration
considering the subject matter and best practices.
Contract document:
{document_text}
Rate the duration adequacy on a scale of 1-10, where:
1 = Extremely inadequate duration
10 = Perfectly adequate duration
Provide a brief justification for your rating (2-3 sentences).
{format_instructions}
"""
),
input_variables=["document_text"],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
chain = prompt | llm | parser
return chain
# Complete pipeline definition
def create_document_pipeline(config=PipelineConfig()):
"""Create a complete document analysis pipeline and return it along with its components"""
# Create aspect extractors
party_aspect_extractor = create_aspect_extractor(
config.party_extractor.name,
config.party_extractor.description,
config.party_extractor.model_name,
)
term_aspect_extractor = create_aspect_extractor(
config.term_extractor.name,
config.term_extractor.description,
config.term_extractor.model_name,
)
# Create concept extractors for aspects
party_extractor = create_party_extractor(config.party_extractor.model_name)
term_extractor = create_term_extractor(config.term_extractor.model_name)
# Create document-level extractors
contract_info_extractor = create_contract_info_extractor(
config.contract_info_extractor.model_name
)
attachment_extractor = create_attachment_extractor(
config.attachment_extractor.model_name
)
duration_rating_extractor = create_duration_rating_extractor(
config.duration_rating_extractor.model_name
)
# Create aspect extraction pipeline
party_pipeline = (
RunnablePassthrough()
| party_aspect_extractor
| RunnableLambda(lambda x: {"aspect_text": x.aspect_text})
| party_extractor
)
term_pipeline = (
RunnablePassthrough()
| term_aspect_extractor
| RunnableLambda(lambda x: {"aspect_text": x.aspect_text})
| term_extractor
)
# Create document-level extraction pipeline
document_extraction = RunnableParallel(
contract_info=contract_info_extractor,
attachments=attachment_extractor,
duration_rating=duration_rating_extractor,
)
# Combine into complete pipeline
complete_pipeline = RunnableParallel(
parties=party_pipeline, terms=term_pipeline, document_info=document_extraction
)
# Create a components dictionary for easy access
components = {
"party_pipeline": party_pipeline,
"term_pipeline": term_pipeline,
"contract_info_extractor": contract_info_extractor,
"attachment_extractor": attachment_extractor,
"duration_rating_extractor": duration_rating_extractor,
}
return complete_pipeline, components
# Cost tracking
class CostTracker:
"""Track LLM costs across multiple extractions"""
def __init__(self):
self.costs = {
"gpt-4o-mini": {
"input_per_1m": 0.15,
"output_per_1m": 0.60,
"input_tokens": 0,
"output_tokens": 0,
},
"o3-mini": {
"input_per_1m": 1.10,
"output_per_1m": 4.40,
"input_tokens": 0,
"output_tokens": 0,
},
}
self.total_cost = 0.0
def track_usage(self, model_name, input_tokens, output_tokens):
"""Track token usage for a model"""
# Extract base model name
base_model = model_name.split("/")[-1] if "/" in model_name else model_name
if base_model in self.costs:
self.costs[base_model]["input_tokens"] += input_tokens
self.costs[base_model]["output_tokens"] += output_tokens
# Calculate costs separately for input and output tokens
input_cost = input_tokens * (
self.costs[base_model]["input_per_1m"] / 1000000
)
output_cost = output_tokens * (
self.costs[base_model]["output_per_1m"] / 1000000
)
self.total_cost += input_cost + output_cost
def get_costs(self):
"""Get cost summary"""
model_costs = {}
for model, data in self.costs.items():
if data["input_tokens"] > 0 or data["output_tokens"] > 0:
input_cost = data["input_tokens"] * (data["input_per_1m"] / 1000000)
output_cost = data["output_tokens"] * (data["output_per_1m"] / 1000000)
model_costs[model] = {
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": input_cost + output_cost,
"input_tokens": data["input_tokens"],
"output_tokens": data["output_tokens"],
}
return {
"model_costs": model_costs,
"total_cost": self.total_cost,
}
# Document processing functions
async def process_document_async(
document_text, pipeline_and_components, cost_tracker=None, use_concurrency=True
):
"""Process a document asynchronously and track costs"""
pipeline, components = pipeline_and_components # Unpack the pipeline and components
results = {}
# Track tokens used across all calls
total_tokens = {
"gpt-4o-mini": {"input": 0, "output": 0},
"o3-mini": {"input": 0, "output": 0},
}
# Use the provided components
async def process_parties():
"""Process parties using the party pipeline"""
with get_openai_callback() as cb:
party_results = await components["party_pipeline"].ainvoke(document_text)
total_tokens["gpt-4o-mini"]["input"] += cb.prompt_tokens
total_tokens["gpt-4o-mini"]["output"] += cb.completion_tokens
return party_results
async def process_terms():
"""Process terms using the term pipeline"""
with get_openai_callback() as cb:
term_results = await components["term_pipeline"].ainvoke(document_text)
total_tokens["gpt-4o-mini"]["input"] += cb.prompt_tokens
total_tokens["gpt-4o-mini"]["output"] += cb.completion_tokens
return term_results
async def process_contract_info():
"""Process contract info"""
with get_openai_callback() as cb:
info_results = await components["contract_info_extractor"].ainvoke(
document_text
)
total_tokens["gpt-4o-mini"]["input"] += cb.prompt_tokens
total_tokens["gpt-4o-mini"]["output"] += cb.completion_tokens
return info_results
async def process_attachments():
"""Process attachments"""
with get_openai_callback() as cb:
attachment_results = await components["attachment_extractor"].ainvoke(
document_text
)
total_tokens["gpt-4o-mini"]["input"] += cb.prompt_tokens
total_tokens["gpt-4o-mini"]["output"] += cb.completion_tokens
return attachment_results
async def process_duration_rating():
"""Process duration rating"""
with get_openai_callback() as cb:
duration_results = await components["duration_rating_extractor"].ainvoke(
document_text
)
# Duration rating is done with o3-mini
total_tokens["o3-mini"]["input"] += cb.prompt_tokens
total_tokens["o3-mini"]["output"] += cb.completion_tokens
return duration_results
# Run extractions based on concurrency preference
if use_concurrency:
# Process all extractions concurrently for maximum speed
parties, terms, contract_info, attachments, duration_rating = (
await asyncio.gather(
process_parties(),
process_terms(),
process_contract_info(),
process_attachments(),
process_duration_rating(),
)
)
else:
# Process extractions sequentially
parties = await process_parties()
terms = await process_terms()
contract_info = await process_contract_info()
attachments = await process_attachments()
duration_rating = await process_duration_rating()
# Update cost tracker if provided
if cost_tracker:
for model, tokens in total_tokens.items():
cost_tracker.track_usage(model, tokens["input"], tokens["output"])
# Structure results in an easy-to-use format
results["contract_type"] = contract_info.contract_type
results["governing_law"] = contract_info.governing_law
results["effective_date"] = contract_info.effective_date
results["parties"] = parties.parties
results["term_years"] = terms.terms[0].duration_years if terms.terms else None
results["term_reference"] = terms.terms[0].reference if terms.terms else None
results["attachments"] = attachments.attachments
results["duration_rating"] = duration_rating.rating
return results
def process_document(
document_text, pipeline_and_components, cost_tracker=None, use_concurrency=True
):
"""
Process a document and track costs.
This is a Jupyter-compatible version that uses the existing event loop
instead of creating a new one with asyncio.run().
"""
# Get the current event loop
loop = asyncio.get_event_loop()
# Run the async function in the current event loop
return loop.run_until_complete(
process_document_async(
document_text, pipeline_and_components, cost_tracker, use_concurrency
)
)
# Example usage
# Sample contract texts (shortened for brevity)
doc1_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"All intellectual property created during the provision of services shall belong to the Customer...\n"
"This agreement is governed by the laws of Norway...\n"
"Annex 1: Data processing agreement...\n"
"Annex 2: Statement of Work...\n"
"Annex 3: Service Level Agreement...\n"
)
doc2_text = (
"Service Level Agreement\n"
"This agreement between TechCorp (Provider) and GlobalInc (Client)...\n"
"The agreement shall commence on January 1, 2023 and continue for 2 years...\n"
"The Provider shall deliver IT support services as outlined in Schedule A...\n"
"The Client shall make monthly payments of $5,000 within 15 days of invoice receipt...\n"
"The Provider guarantees [99.9%] uptime for all critical systems...\n"
"Either party may terminate with 60 days written notice...\n"
"This agreement is governed by the laws of California...\n"
"Schedule A: Service Descriptions...\n"
"Schedule B: Response Time Requirements...\n"
)
# Function to pretty-print document results
def print_document_results(doc_name, results):
print(f"\nResults from {doc_name}:")
print(f"Contract Type: {results['contract_type']}")
print(f"Parties: {[f'{p.name} ({p.role})' for p in results['parties']]}")
print(f"Term: {results['term_years']} years")
print(
f"Term Reference: {results['term_reference'] if results['term_reference'] else 'Not specified'}"
)
print(f"Governing Law: {results['governing_law']}")
print(f"Attachments: {[(a.title, a.description) for a in results['attachments']]}")
print(f"Duration Rating: {results['duration_rating'].score}/10")
print(f"Rating Justification: {results['duration_rating'].justification}")
# Create cost tracker
cost_tracker = CostTracker()
# Create pipeline with default configuration - returns both pipeline and components
pipeline, pipeline_components = create_document_pipeline()
# Process documents
print("Processing document 1 with concurrency...")
start_time = time.time()
doc1_results = process_document(
doc1_text, (pipeline, pipeline_components), cost_tracker, use_concurrency=True
)
print(f"Processing time: {time.time() - start_time:.2f} seconds")
print("Processing document 2 with concurrency...")
start_time = time.time()
doc2_results = process_document(
doc2_text, (pipeline, pipeline_components), cost_tracker, use_concurrency=True
)
print(f"Processing time: {time.time() - start_time:.2f} seconds")
# Print results
print_document_results("Document 1 (Consultancy Agreement)", doc1_results)
print_document_results("Document 2 (Service Level Agreement)", doc2_results)
# Print cost information
print("\nProcessing costs:")
costs = cost_tracker.get_costs()
for model, model_data in costs["model_costs"].items():
print(f"\n{model}:")
print(f" Input cost: ${model_data['input_cost']:.4f}")
print(f" Output cost: ${model_data['output_cost']:.4f}")
print(f" Total cost: ${model_data['total_cost']:.4f}")
print(f"\nTotal across all models: ${costs['total_cost']:.4f}")
LlamaIndex provides a robust data framework for LLM applications with excellent capabilities for knowledge retrieval and RAG. It offers powerful tools for working with documents and structured data, though implementing complex extraction workflows may require some additional configuration to fully leverage its capabilities.
Development overhead:
📝 Manual prompt engineering: Must craft detailed prompts for each extraction task
🔧 Manual model definition: Need to define Pydantic models and output parsers for structured data
🧩 Pipeline setup: Requires manual configuration of extraction pipeline components involving multiple LLMs
🔍 Limited reference tracking: Basic source tracking, but requires additional work for fine-grained references
📊 Embedding examples in prompts: Examples must be manually incorporated into prompts
🔄 Complex concurrency setup: Implementing concurrent processing requires additional setup with asyncio
💰 Cost tracking setup: Requires custom logic for cost tracking for each LLM
💾 No unified storage model: Need to write additional code to collect and organize results
# LlamaIndex implementation of analyzing multiple documents with a single pipeline,
# with different LLMs, concurrency, and cost tracking
# Jupyter notebook compatible version
import asyncio
import os
from textwrap import dedent
from typing import Optional
import nest_asyncio
nest_asyncio.apply()
from llama_index.core.callbacks import CallbackManager, TokenCountingHandler
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.openai import OpenAI
from pydantic import BaseModel, Field
# Pydantic models must be manually defined
class PartyInfo(BaseModel):
"""Information about contract parties"""
name: str = Field(description="Name of the party")
role: str = Field(description="Role of the party (e.g., Client, Provider)")
class Term(BaseModel):
"""Contract term information"""
duration_years: int = Field(description="Duration in years")
reference: str = Field(
description="Reference text from document"
) # LLM reciting a reference is error-prone and unreliable
class Attachment(BaseModel):
"""Contract attachment information"""
title: str = Field(description="Title of the attachment")
description: Optional[str] = Field(
description="Brief description of the attachment"
)
class ContractRating(BaseModel):
"""Rating with justification"""
score: int = Field(description="Rating score (1-10)")
justification: str = Field(description="Justification for the rating")
class ContractInfo(BaseModel):
"""Complete contract information"""
contract_type: str = Field(description="Type of contract")
effective_date: Optional[str] = Field(description="Effective date of the contract")
governing_law: Optional[str] = Field(description="Governing law of the contract")
class AspectExtraction(BaseModel):
"""Result of aspect extraction"""
aspect_text: str = Field(
description="Extracted text for this aspect"
) # this does not provide granular structured content, such as specific paragraphs and sentences
class PartyExtraction(BaseModel):
"""Party extraction results"""
parties: list[PartyInfo] = Field(description="List of parties in the contract")
class TermExtraction(BaseModel):
"""Term extraction results"""
terms: list[Term] = Field(description="Contract term details")
class AttachmentExtraction(BaseModel):
"""Attachment extraction results"""
attachments: list[Attachment] = Field(description="List of contract attachments")
class DurationRatingExtraction(BaseModel):
"""Duration adequacy rating"""
rating: ContractRating = Field(description="Rating of contract duration adequacy")
# Cost tracking class
class CostTracker:
"""Track LLM costs across multiple extractions"""
def __init__(self):
self.costs = {
"gpt-4o-mini": {
"input_per_1m": 0.15,
"output_per_1m": 0.60,
"input_tokens": 0,
"output_tokens": 0,
},
"o3-mini": {
"input_per_1m": 1.10,
"output_per_1m": 4.40,
"input_tokens": 0,
"output_tokens": 0,
},
}
self.total_cost = 0.0
def track_usage(self, model_name, input_tokens, output_tokens):
"""Track token usage for a model"""
# Extract base model name
base_model = model_name.split("/")[-1] if "/" in model_name else model_name
if base_model in self.costs:
self.costs[base_model]["input_tokens"] += input_tokens
self.costs[base_model]["output_tokens"] += output_tokens
# Calculate costs separately for input and output tokens
input_cost = input_tokens * (
self.costs[base_model]["input_per_1m"] / 1000000
)
output_cost = output_tokens * (
self.costs[base_model]["output_per_1m"] / 1000000
)
self.total_cost += input_cost + output_cost
def get_costs(self):
"""Get cost summary"""
model_costs = {}
for model, data in self.costs.items():
if data["input_tokens"] > 0 or data["output_tokens"] > 0:
input_cost = data["input_tokens"] * (data["input_per_1m"] / 1000000)
output_cost = data["output_tokens"] * (data["output_per_1m"] / 1000000)
model_costs[model] = {
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": input_cost + output_cost,
"input_tokens": data["input_tokens"],
"output_tokens": data["output_tokens"],
}
return {
"model_costs": model_costs,
"total_cost": self.total_cost,
}
# Helper functions for extractors
def get_llm(model_name="gpt-4o-mini", api_key=None, temperature=0, token_counter=None):
"""Get an OpenAI instance with the specified configuration"""
api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY", "")
# Create callback manager with token counter if provided
callback_manager = None
if token_counter is not None:
callback_manager = CallbackManager([token_counter])
return OpenAI(
model=model_name,
api_key=api_key,
temperature=temperature,
callback_manager=callback_manager,
)
def create_aspect_extractor(
aspect_name, aspect_description, model_name="gpt-4o-mini", token_counter=None
):
"""Create an extractor to extract text related to a specific aspect"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
f"""
You are an expert document analyzer. Extract the text related to the following aspect from the document.
Document:
{{document_text}}
Aspect: {aspect_name}
Description: {aspect_description}
Extract all text related to this aspect.
"""
) # this does not provide granular structured content, such as specific paragraphs and sentences
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=AspectExtraction),
prompt_template_str=prompt_template,
llm=llm,
)
return program
def create_party_extractor(model_name="gpt-4o-mini", token_counter=None):
"""Create an extractor for party information"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert document analyzer. Extract all party information from the following contract text.
Contract text:
{aspect_text}
For each party, extract their name and role in the agreement.
"""
)
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=PartyExtraction),
prompt_template_str=prompt_template,
llm=llm,
)
return program
def create_term_extractor(model_name="gpt-4o-mini", token_counter=None):
"""Create an extractor for term information"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert document analyzer. Extract term information from the following contract text.
Contract text:
{aspect_text}
Extract the contract term duration in years. Include the relevant reference text.
"""
)
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=TermExtraction),
prompt_template_str=prompt_template,
llm=llm,
)
return program
def create_contract_info_extractor(model_name="gpt-4o-mini", token_counter=None):
"""Create an extractor for basic contract information"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert document analyzer. Extract the following information from the contract document.
Contract document:
{document_text}
Extract the contract type, effective date if mentioned, and governing law if specified.
"""
)
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=ContractInfo),
prompt_template_str=prompt_template,
llm=llm,
)
return program
def create_attachment_extractor(model_name="gpt-4o-mini", token_counter=None):
"""Create an extractor for attachment information"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert document analyzer. Extract information about all attachments, annexes,
schedules, or appendices mentioned in the contract.
Contract document:
{document_text}
For each attachment, extract:
1. The title/name of the attachment (e.g., "Appendix A", "Schedule 1", "Annex 2")
2. A brief description of what the attachment contains (if mentioned in the document)
Example format:
{"title": "Appendix A", "description": "Code of conduct"}
"""
)
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=AttachmentExtraction),
prompt_template_str=prompt_template,
llm=llm,
)
return program
def create_duration_rating_extractor(model_name="o3-mini", token_counter=None):
"""Create an extractor to rate contract duration adequacy"""
llm = get_llm(model_name=model_name, token_counter=token_counter)
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt_template = dedent(
"""
You are an expert contract analyst. Evaluate the adequacy of the contract duration
considering the subject matter and best practices.
Contract document:
{document_text}
Rate the duration adequacy on a scale of 1-10, where:
1 = Extremely inadequate duration
10 = Perfectly adequate duration
Provide a brief justification for your rating (2-3 sentences).
"""
)
program = LLMTextCompletionProgram.from_defaults(
output_parser=PydanticOutputParser(output_cls=DurationRatingExtraction),
prompt_template_str=prompt_template,
llm=llm,
)
return program
# Main document processing functions
async def process_document_async(
document_text, cost_tracker=None, use_concurrency=True
):
"""Process a document asynchronously and track costs"""
results = {}
# Create separate token counting handlers for each model
gpt4o_token_counter = TokenCountingHandler()
o3_token_counter = TokenCountingHandler()
# Create extractors with appropriate token counters
party_aspect_extractor = create_aspect_extractor(
"Contract Parties",
"Clauses defining the parties to the agreement",
token_counter=gpt4o_token_counter,
)
term_aspect_extractor = create_aspect_extractor(
"Term",
"Clauses defining the term of the agreement",
token_counter=gpt4o_token_counter,
)
party_extractor = create_party_extractor(token_counter=gpt4o_token_counter)
term_extractor = create_term_extractor(token_counter=gpt4o_token_counter)
contract_info_extractor = create_contract_info_extractor(
token_counter=gpt4o_token_counter
)
attachment_extractor = create_attachment_extractor(
token_counter=gpt4o_token_counter
)
# Use separate token counter for o3-mini
duration_rating_extractor = create_duration_rating_extractor(
model_name="o3-mini", token_counter=o3_token_counter
)
# Define processing functions using native async methods
async def process_party_aspect():
response = await party_aspect_extractor.acall(document_text=document_text)
return response
async def process_term_aspect():
response = await term_aspect_extractor.acall(document_text=document_text)
return response
# Get aspect texts
if use_concurrency:
party_aspect, term_aspect = await asyncio.gather(
process_party_aspect(), process_term_aspect()
)
else:
party_aspect = await process_party_aspect()
term_aspect = await process_term_aspect()
async def process_parties():
party_results = await party_extractor.acall(
aspect_text=party_aspect.aspect_text
)
return party_results
async def process_terms():
term_results = await term_extractor.acall(aspect_text=term_aspect.aspect_text)
return term_results
async def process_contract_info():
contract_info = await contract_info_extractor.acall(document_text=document_text)
return contract_info
async def process_attachments():
attachments = await attachment_extractor.acall(document_text=document_text)
return attachments
async def process_duration_rating():
duration_rating = await duration_rating_extractor.acall(
document_text=document_text
)
return duration_rating
# Run extractions based on concurrency preference
if use_concurrency:
parties, terms, contract_info, attachments, duration_rating = (
await asyncio.gather(
process_parties(),
process_terms(),
process_contract_info(),
process_attachments(),
process_duration_rating(),
)
)
else:
parties = await process_parties()
terms = await process_terms()
contract_info = await process_contract_info()
attachments = await process_attachments()
duration_rating = await process_duration_rating()
# Get token usage from the token counter and update cost tracker
if cost_tracker:
cost_tracker.track_usage(
"gpt-4o-mini",
gpt4o_token_counter.prompt_llm_token_count,
gpt4o_token_counter.completion_llm_token_count,
)
cost_tracker.track_usage(
"o3-mini",
o3_token_counter.prompt_llm_token_count,
o3_token_counter.completion_llm_token_count,
)
# Structure results in an easy-to-use format
results["contract_type"] = contract_info.contract_type
results["governing_law"] = contract_info.governing_law
results["effective_date"] = contract_info.effective_date
results["parties"] = parties.parties
results["term_years"] = terms.terms[0].duration_years if terms.terms else None
results["term_reference"] = terms.terms[0].reference if terms.terms else None
results["attachments"] = attachments.attachments
results["duration_rating"] = duration_rating.rating
return results
def process_document(document_text, cost_tracker=None, use_concurrency=True):
"""
Process a document and track costs.
This is a Jupyter-compatible version that uses the existing event loop
instead of creating a new one with asyncio.run().
"""
loop = asyncio.get_event_loop()
return loop.run_until_complete(
process_document_async(document_text, cost_tracker, use_concurrency)
)
# Function to pretty-print document results
def print_document_results(doc_name, results):
print(f"\nResults from {doc_name}:")
print(f"Contract Type: {results['contract_type']}")
print(f"Parties: {[f'{p.name} ({p.role})' for p in results['parties']]}")
print(f"Term: {results['term_years']} years")
print(
f"Term Reference: {results['term_reference'] if results['term_reference'] else 'Not specified'}"
)
print(f"Governing Law: {results['governing_law']}")
print(f"Attachments: {[(a.title, a.description) for a in results['attachments']]}")
print(f"Duration Rating: {results['duration_rating'].score}/10")
print(f"Rating Justification: {results['duration_rating'].justification}")
# Example usage
# Sample contract texts (shortened for brevity)
doc1_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"All intellectual property created during the provision of services shall belong to the Customer...\n"
"This agreement is governed by the laws of Norway...\n"
"Annex 1: Data processing agreement...\n"
"Annex 2: Statement of Work...\n"
"Annex 3: Service Level Agreement...\n"
)
doc2_text = (
"Service Level Agreement\n"
"This agreement between TechCorp (Provider) and GlobalInc (Client)...\n"
"The agreement shall commence on January 1, 2023 and continue for 2 years...\n"
"The Provider shall deliver IT support services as outlined in Schedule A...\n"
"The Client shall make monthly payments of $5,000 within 15 days of invoice receipt...\n"
"The Provider guarantees [99.9%] uptime for all critical systems...\n"
"Either party may terminate with 60 days written notice...\n"
"This agreement is governed by the laws of California...\n"
"Schedule A: Service Descriptions...\n"
"Schedule B: Response Time Requirements...\n"
)
# Create cost tracker
cost_tracker = CostTracker()
# Process documents
print("Processing document 1 with concurrency...")
doc1_results = process_document(doc1_text, cost_tracker, use_concurrency=True)
print("Processing document 2 with concurrency...")
doc2_results = process_document(doc2_text, cost_tracker, use_concurrency=True)
# Print results
print_document_results("Document 1 (Consultancy Agreement)", doc1_results)
print_document_results("Document 2 (Service Level Agreement)", doc2_results)
# Print cost information
print("\nProcessing costs:")
costs = cost_tracker.get_costs()
for model, model_data in costs["model_costs"].items():
print(f"\n{model}:")
print(f" Input cost: ${model_data['input_cost']:.4f}")
print(f" Output cost: ${model_data['output_cost']:.4f}")
print(f" Total cost: ${model_data['total_cost']:.4f}")
print(f"\nTotal across all models: ${costs['total_cost']:.4f}")
Instructor is a powerful library focused on structured outputs from LLMs with strong typing support through Pydantic. It excels at extracting structured data with validation, but requires additional work to build complex extraction pipelines.
Development overhead:
📝 Manual prompt engineering: Crafting comprehensive prompts that guide the LLM effectively
🔧 Manual model definition: Developers must define Pydantic validation models for structured output
🧩 Manual pipeline assembly: Requires custom code to connect extraction components involving multiple LLMs
🔍 Manual reference mapping: Must implement custom logic to track source references
📊 Embedding examples in prompts: Examples must be manually incorporated into prompts
🔄 Complex concurrency setup: Implementing concurrent processing requires additional setup with asyncio
💰 Cost tracking setup: Requires custom logic for cost tracking for each LLM
# Instructor implementation of analyzing multiple documents with a single pipeline,
# with different LLMs, concurrency, and cost tracking
# Jupyter notebook compatible version
import asyncio
import os
from dataclasses import dataclass, field
from textwrap import dedent
from typing import Optional
import instructor
import nest_asyncio
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field
nest_asyncio.apply()
# Pydantic models must be manually defined
class PartyInfo(BaseModel):
"""Information about contract parties"""
name: str = Field(description="Name of the party")
role: str = Field(description="Role of the party (e.g., Client, Provider)")
class Term(BaseModel):
"""Contract term information"""
duration_years: int = Field(description="Duration in years")
reference: str = Field(
description="Reference text from document"
) # LLM reciting a reference is error-prone and unreliable
class Attachment(BaseModel):
"""Contract attachment information"""
title: str = Field(description="Title of the attachment")
description: Optional[str] = Field(
description="Brief description of the attachment"
)
class ContractRating(BaseModel):
"""Rating with justification"""
score: int = Field(description="Rating score (1-10)")
justification: str = Field(description="Justification for the rating")
class ContractInfo(BaseModel):
"""Complete contract information"""
contract_type: str = Field(description="Type of contract")
effective_date: Optional[str] = Field(description="Effective date of the contract")
governing_law: Optional[str] = Field(description="Governing law of the contract")
class AspectExtraction(BaseModel):
"""Result of aspect extraction"""
aspect_text: str = Field(
description="Extracted text for this aspect"
) # this does not provide granular structured content, such as specific paragraphs and sentences
class PartyExtraction(BaseModel):
"""Party extraction results"""
parties: list[PartyInfo] = Field(description="List of parties in the contract")
class TermExtraction(BaseModel):
"""Term extraction results"""
terms: list[Term] = Field(description="Contract term details")
class AttachmentExtraction(BaseModel):
"""Attachment extraction results"""
attachments: list[Attachment] = Field(description="List of contract attachments")
class DurationRatingExtraction(BaseModel):
"""Duration adequacy rating"""
rating: ContractRating = Field(description="Rating of contract duration adequacy")
# Configuration models must be manually defined
@dataclass
class ExtractorConfig:
"""Configuration for a specific extractor"""
name: str
description: str
model_name: str = "gpt-4o-mini" # Default model
@dataclass
class PipelineConfig:
"""Complete pipeline configuration"""
# Aspect extractors
party_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Contract Parties",
description="Clauses defining the parties to the agreement",
)
)
term_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Term", description="Clauses defining the term of the agreement"
)
)
# Document-level extractors
contract_info_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Contract Information",
description="Basic contract information including type, date, and governing law",
)
)
attachment_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Attachments",
description="Contract attachments and their descriptions",
)
)
duration_rating_extractor: ExtractorConfig = field(
default_factory=lambda: ExtractorConfig(
name="Duration Rating",
description="Rating of contract duration adequacy",
model_name="o3-mini", # Using a more capable model for judgment
)
)
# LLM client setup
def get_client(api_key=None):
"""Get an OpenAI client with instructor integrated"""
api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY", "")
client = OpenAI(api_key=api_key)
return instructor.from_openai(client)
async def get_async_client(api_key=None):
"""Get an AsyncOpenAI client with instructor integrated"""
api_key = api_key or os.environ.get("CONTEXTGEM_OPENAI_API_KEY", "")
client = AsyncOpenAI(api_key=api_key)
return instructor.from_openai(client)
# Helper function to execute completions with token tracking
async def execute_with_tracking(model, messages, response_model, cost_tracker=None):
"""
Execute a completion request with token tracking.
"""
# Create the Instructor client
client = await get_async_client()
# Make a single API call with Instructor
response = await client.chat.completions.create(
model=model, response_model=response_model, messages=messages
)
# Access the raw response to get token usage
if cost_tracker and hasattr(response, "_raw_response"):
raw_response = response._raw_response
if hasattr(raw_response, "usage"):
prompt_tokens = raw_response.usage.prompt_tokens
completion_tokens = raw_response.usage.completion_tokens
cost_tracker.track_usage(model, prompt_tokens, completion_tokens)
return response
def execute_sync(model, messages, response_model):
"""Execute a completion request synchronously"""
client = get_client()
return client.chat.completions.create(
model=model, response_model=response_model, messages=messages
)
# Unified extraction functions
def extract_aspect(
document_text,
aspect_name,
aspect_description,
model_name="gpt-4o-mini",
is_async=False,
cost_tracker=None,
):
"""Extract text related to a specific aspect"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Extract the text related to the following aspect from the document.
Document:
{document_text}
Aspect: {aspect_name}
Description: {aspect_description}
Extract all text related to this aspect.
"""
) # this does not provide granular structured content, such as specific paragraphs and sentences
messages = [
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(
model_name, messages, AspectExtraction, cost_tracker
)
else:
return execute_sync(model_name, messages, AspectExtraction)
def extract_parties(
aspect_text, model_name="gpt-4o-mini", is_async=False, cost_tracker=None
):
"""Extract party information"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Extract all party information from the following contract text.
Contract text:
{aspect_text}
For each party, extract their name and role in the agreement.
"""
)
messages = [
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(
model_name, messages, PartyExtraction, cost_tracker
)
else:
return execute_sync(model_name, messages, PartyExtraction)
def extract_terms(
aspect_text, model_name="gpt-4o-mini", is_async=False, cost_tracker=None
):
"""Extract term information"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Extract term information from the following contract text.
Contract text:
{aspect_text}
Extract the contract term duration in years. Include the relevant reference text.
"""
)
messages = [
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(model_name, messages, TermExtraction, cost_tracker)
else:
return execute_sync(model_name, messages, TermExtraction)
def extract_contract_info(
document_text, model_name="gpt-4o-mini", is_async=False, cost_tracker=None
):
"""Extract basic contract information"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Extract the following information from the contract document.
Contract document:
{document_text}
Extract the contract type, effective date if mentioned, and governing law if specified.
"""
)
messages = [
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(model_name, messages, ContractInfo, cost_tracker)
else:
return execute_sync(model_name, messages, ContractInfo)
def extract_attachments(
document_text, model_name="gpt-4o-mini", is_async=False, cost_tracker=None
):
"""Extract attachment information"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert document analyzer. Extract information about all attachments, annexes,
schedules, or appendices mentioned in the contract.
Contract document:
{document_text}
For each attachment, extract:
1. The title/name of the attachment (e.g., "Appendix A", "Schedule 1", "Annex 2")
2. A brief description of what the attachment contains (if mentioned in the document)
"""
)
messages = [
{"role": "system", "content": "You are an expert document analyzer."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(
model_name, messages, AttachmentExtraction, cost_tracker
)
else:
return execute_sync(model_name, messages, AttachmentExtraction)
def extract_duration_rating(
document_text, model_name="o3-mini", is_async=False, cost_tracker=None
):
"""Rate contract duration adequacy"""
# Prompt must be manually drafted
# This is a basic example, which is shortened for brevity. The prompt should be improved for better accuracy.
prompt = dedent(
f"""
You are an expert contract analyst. Evaluate the adequacy of the contract duration
considering the subject matter and best practices.
Contract document:
{document_text}
Rate the duration adequacy on a scale of 1-10, where:
1 = Extremely inadequate duration
10 = Perfectly adequate duration
Provide a brief justification for your rating (2-3 sentences).
"""
)
messages = [
{"role": "system", "content": "You are an expert contract analyst."},
{"role": "user", "content": prompt},
]
if is_async:
return execute_with_tracking(
model_name, messages, DurationRatingExtraction, cost_tracker
)
else:
return execute_sync(model_name, messages, DurationRatingExtraction)
# Cost tracking
class CostTracker:
"""Track LLM costs across multiple extractions"""
def __init__(self):
self.costs = {
"gpt-4o-mini": {
"input_per_1m": 0.15,
"output_per_1m": 0.60,
"input_tokens": 0,
"output_tokens": 0,
},
"o3-mini": {
"input_per_1m": 1.10,
"output_per_1m": 4.40,
"input_tokens": 0,
"output_tokens": 0,
},
}
self.total_cost = 0.0
def track_usage(self, model_name, input_tokens, output_tokens):
"""Track token usage for a model"""
# Extract base model name
base_model = model_name.split("/")[-1] if "/" in model_name else model_name
if base_model in self.costs:
self.costs[base_model]["input_tokens"] += input_tokens
self.costs[base_model]["output_tokens"] += output_tokens
# Calculate costs separately for input and output tokens
input_cost = input_tokens * (
self.costs[base_model]["input_per_1m"] / 1000000
)
output_cost = output_tokens * (
self.costs[base_model]["output_per_1m"] / 1000000
)
self.total_cost += input_cost + output_cost
def get_costs(self):
"""Get cost summary"""
model_costs = {}
for model, data in self.costs.items():
if data["input_tokens"] > 0 or data["output_tokens"] > 0:
input_cost = data["input_tokens"] * (data["input_per_1m"] / 1000000)
output_cost = data["output_tokens"] * (data["output_per_1m"] / 1000000)
model_costs[model] = {
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": input_cost + output_cost,
"input_tokens": data["input_tokens"],
"output_tokens": data["output_tokens"],
}
return {
"model_costs": model_costs,
"total_cost": self.total_cost,
}
# Document processing functions
async def process_document_async(
document_text, config=None, cost_tracker=None, use_concurrency=True
):
"""Process a document asynchronously and track costs"""
if config is None:
config = PipelineConfig()
results = {}
# Define processing functions
async def process_party_pipeline():
# Extract party aspect
party_aspect = await extract_aspect(
document_text,
config.party_extractor.name,
config.party_extractor.description,
model_name=config.party_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
# Extract parties from the aspect
parties = await extract_parties(
party_aspect.aspect_text,
model_name=config.party_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
return parties
async def process_term_pipeline():
# Extract term aspect
term_aspect = await extract_aspect(
document_text,
config.term_extractor.name,
config.term_extractor.description,
model_name=config.term_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
# Extract terms from the aspect
terms = await extract_terms(
term_aspect.aspect_text,
model_name=config.term_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
return terms
async def process_contract_info():
return await extract_contract_info(
document_text,
model_name=config.contract_info_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
async def process_attachments():
return await extract_attachments(
document_text,
model_name=config.attachment_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
async def process_duration_rating():
return await extract_duration_rating(
document_text,
model_name=config.duration_rating_extractor.model_name,
is_async=True,
cost_tracker=cost_tracker,
)
# Run extractions based on concurrency preference
if use_concurrency:
# Process all extractions concurrently for maximum speed
parties, terms, contract_info, attachments, duration_rating = (
await asyncio.gather(
process_party_pipeline(),
process_term_pipeline(),
process_contract_info(),
process_attachments(),
process_duration_rating(),
)
)
else:
# Process extractions sequentially
parties = await process_party_pipeline()
terms = await process_term_pipeline()
contract_info = await process_contract_info()
attachments = await process_attachments()
duration_rating = await process_duration_rating()
# Structure results in the same format as the LangChain implementation
results["contract_type"] = contract_info.contract_type
results["governing_law"] = contract_info.governing_law
results["effective_date"] = contract_info.effective_date
results["parties"] = parties.parties
results["term_years"] = terms.terms[0].duration_years if terms.terms else None
results["term_reference"] = terms.terms[0].reference if terms.terms else None
results["attachments"] = attachments.attachments
results["duration_rating"] = duration_rating.rating
return results
def process_document(
document_text, config=None, cost_tracker=None, use_concurrency=True
):
"""
Process a document and track costs.
"""
# Get the current event loop
loop = asyncio.get_event_loop()
# Run the async function in the current event loop
return loop.run_until_complete(
process_document_async(document_text, config, cost_tracker, use_concurrency)
)
# Example usage
# Sample contract texts (shortened for brevity)
doc1_text = (
"Consultancy Agreement\n"
"This agreement between Company A (Supplier) and Company B (Customer)...\n"
"The term of the agreement is 1 year from the Effective Date...\n"
"The Supplier shall provide consultancy services as described in Annex 2...\n"
"The Customer shall pay the Supplier within 30 calendar days of receiving an invoice...\n"
"All intellectual property created during the provision of services shall belong to the Customer...\n"
"This agreement is governed by the laws of Norway...\n"
"Annex 1: Data processing agreement...\n"
"Annex 2: Statement of Work...\n"
"Annex 3: Service Level Agreement...\n"
)
doc2_text = (
"Service Level Agreement\n"
"This agreement between TechCorp (Provider) and GlobalInc (Client)...\n"
"The agreement shall commence on January 1, 2023 and continue for 2 years...\n"
"The Provider shall deliver IT support services as outlined in Schedule A...\n"
"The Client shall make monthly payments of $5,000 within 15 days of invoice receipt...\n"
"The Provider guarantees [99.9%] uptime for all critical systems...\n"
"Either party may terminate with 60 days written notice...\n"
"This agreement is governed by the laws of California...\n"
"Schedule A: Service Descriptions...\n"
"Schedule B: Response Time Requirements...\n"
)
# Function to pretty-print document results
def print_document_results(doc_name, results):
print(f"\nResults from {doc_name}:")
print(f"Contract Type: {results['contract_type']}")
print(f"Parties: {[f'{p.name} ({p.role})' for p in results['parties']]}")
print(f"Term: {results['term_years']} years")
print(
f"Term Reference: {results['term_reference'] if results['term_reference'] else 'Not specified'}"
)
print(f"Governing Law: {results['governing_law']}")
print(f"Attachments: {[(a.title, a.description) for a in results['attachments']]}")
print(f"Duration Rating: {results['duration_rating'].score}/10")
print(f"Rating Justification: {results['duration_rating'].justification}")
# Create cost tracker
cost_tracker = CostTracker()
# Create pipeline with default configuration
config = PipelineConfig()
# Process documents
print("Processing document 1 with concurrency...")
doc1_results = process_document(doc1_text, config, cost_tracker, use_concurrency=True)
print("Processing document 2 with concurrency...")
doc2_results = process_document(doc2_text, config, cost_tracker, use_concurrency=True)
# Print results
print_document_results("Document 1 (Consultancy Agreement)", doc1_results)
print_document_results("Document 2 (Service Level Agreement)", doc2_results)
# Print cost information
print("\nProcessing costs:")
costs = cost_tracker.get_costs()
for model, model_data in costs["model_costs"].items():
print(f"\n{model}:")
print(f" Input cost: ${model_data['input_cost']:.4f}")
print(f" Output cost: ${model_data['output_cost']:.4f}")
print(f" Total cost: ${model_data['total_cost']:.4f}")
print(f"\nTotal across all models: ${costs['total_cost']:.4f}")