Detailed documentation for each component in the LineageGraph system.
The main agent orchestration system using LangGraph.
Key Functions:
- `create_agent_graph()`: Creates the LangGraph state machine
- `run_agent(query, verbose=False)`: Main entry point for agent execution
Graph Structure:
```
START → plan → investigate → tool → check_continue
                   ↑                      │
                   └────── loop back ─────┤
                                          ↓
                                    synthesize → END
```
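A minimal usage sketch of the entry point. The module path `agent_graph` and the returned dict shape (the final `AgentState`, defined below) are assumptions:

```python
# Minimal usage sketch; run_agent(query, verbose=False) is the entry point
# described above. The module path "agent_graph" is an assumption.
from agent_graph import run_agent

state = run_agent("What feeds into the revenue dashboard?", verbose=True)
print(state["final_answer"])
print(f"confidence: {state['confidence_score']:.2f}")
```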
Individual nodes in the agent graph (a sketch of the node signature follows this list):

- `plan_node(state)`
  - Analyzes the user query
  - Creates an execution plan
  - Determines the required information
- `investigate_node(state)`
  - Selects appropriate tools
  - Prepares tool inputs
  - Decides the investigation strategy
- `synthesize_node(state)`
  - Combines tool results
  - Generates the final answer using the LLM
  - Calculates a confidence score
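Each node takes the current `AgentState` (defined below) and returns a partial state update, as is conventional in LangGraph. A sketch of `plan_node`; the `llm` object and the prompt wording are assumptions:

```python
# Sketch of one LangGraph node: it receives the full state and returns only
# the keys it updates. The llm object and prompt wording are assumptions.
def plan_node(state: AgentState) -> dict:
    response = llm.invoke(
        "Break this lineage question into investigation steps:\n"
        + state["user_query"]
    )
    return {
        "plan": response.content,          # execution plan for later nodes
        "current_step": "plan",
        "step_count": state["step_count"] + 1,
    }
```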
Type definitions for agent state:

```python
from typing import Any, Dict, List, TypedDict

class AgentState(TypedDict):
    user_query: str                # Original natural-language query
    current_step: str              # Node currently executing
    plan: str                      # Execution plan from plan_node
    next_tool: str                 # Tool selected by investigate_node
    tool_results: Dict[str, Any]   # Accumulated tool outputs
    final_answer: str              # Answer produced by synthesize_node
    confidence_score: float        # Confidence in the final answer
    step_count: int                # Number of graph steps taken
    tool_calls_made: List[str]     # History of tool invocations
```

Tools available to the agent:
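Every tool below returns a dictionary with a `success` flag, so the agent can branch on failures uniformly. A sketch of that shared envelope; the wrapper name `run_tool` and its error handling are assumptions, not the real API:

```python
# Illustrative only: shows the shared {"success": ...} result envelope.
def run_tool(tool_fn, **kwargs) -> dict:
    try:
        payload = tool_fn(**kwargs)          # tool-specific fields
        return {"success": True, **payload}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
```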
Purpose: Semantic search over table descriptions

Input:
```json
{
  "query": "What feeds into revenue?",
  "limit": 3
}
```
Output:
```json
{
  "success": true,
  "count": 3,
  "items": [
    {
      "id": "table_revenue_daily",
      "table_name": "revenue_daily",
      "text": "...",
      "similarity": 0.85
    },
    ...
  ]
}
```

Purpose: Get upstream dependencies of a table

Input:
```json
{
  "table_id": "dashboard_revenue",
  "depth": 3
}
```
Output:
```json
{
  "success": true,
  "dependencies": [
    {
      "id": "table_revenue_daily",
      "name": "revenue_daily",
      "type": "Table",
      "depth": 0
    },
    ...
  ]
}
```

Purpose: Validate if a path exists between two nodes

Input:
```json
{
  "source_id": "table_orders",
  "target_id": "dashboard_revenue"
}
```
Output:
```json
{
  "success": true,
  "is_valid": true,
  "path_length": 3
}
```

Purpose: Get metadata for a specific node

Input:
```json
{
  "node_id": "table_users"
}
```
Output:
```json
{
  "success": true,
  "node": {
    "id": "table_users",
    "name": "users",
    "type": "Table",
    "description": "..."
  }
}
```

Purpose: Trace a complete data flow path

Input:
```json
{
  "start_node": "table_orders",
  "end_node": "dashboard_revenue"
}
```
Output:
```json
{
  "success": true,
  "path": [
    "table_orders",
    "table_order_clean",
    "table_revenue_daily",
    "dashboard_revenue"
  ]
}
```

Purpose: Check data freshness score

Input:
```json
{
  "table_id": "table_users"
}
```
Output:
```json
{
  "success": true,
  "freshness_score": 0.95,
  "last_updated": "2024-01-01T00:00:00Z"
}
```

DuckDB-based vector database for semantic search.
Key Methods:
- `add_embedding(id, text, embedding, table_name, source_type)`: Store an embedding
- `search(query_embedding, limit=3)`: Search for similar embeddings
Schema:
- `embeddings` table: Text and metadata
- `vectors` table: Embedding vectors
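A minimal usage sketch of the two methods above. The class name `VectorStore`, its constructor argument, and the `embedder` object (described further below) are assumptions:

```python
# Illustrative only: class name, constructor, and embedder are assumptions
# based on the method signatures documented above.
store = VectorStore("lineage.duckdb")

text = "Daily revenue aggregated from cleaned orders"
store.add_embedding(
    id="table_revenue_daily",
    text=text,
    embedding=embedder.embed_text(text),
    table_name="revenue_daily",
    source_type="table",
)

hits = store.search(embedder.embed_text("What feeds into revenue?"), limit=3)
```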
PostgreSQL-based graph database for lineage relationships.
Key Methods:
- `add_node(id, node_type, name, description)`: Add a node
- `add_edge(source_id, target_id, edge_type)`: Add a relationship
- `get_dependencies(node_id, depth)`: Get upstream dependencies
Schema:
- `nodes` table: Graph nodes
- `edges` table: Graph relationships
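A sketch of loading and querying the lineage graph. The class name `GraphStore`, the connection string, and the edge type `"feeds"` are assumptions inferred from the signatures above:

```python
# Illustrative only; class name, DSN, and edge type are assumptions.
graph = GraphStore("postgresql://localhost/lineage")

graph.add_node("table_orders", "Table", "orders", "Raw order events")
graph.add_node("table_order_clean", "Table", "order_clean", "Cleaned orders")
graph.add_edge("table_orders", "table_order_clean", "feeds")

# Walk upstream from a node, up to three hops away
deps = graph.get_dependencies("dashboard_revenue", depth=3)
```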
Sentence-transformers based embedder.
Model: all-MiniLM-L6-v2 (384 dimensions)
Key Methods:
- `embed_text(text)`: Generate an embedding for a text
- `embed_batch(texts)`: Generate embeddings for multiple texts
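These methods wrap sentence-transformers. Roughly equivalent direct usage; the wrapper's internals are an assumption, but the model name comes from the documentation above:

```python
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

# encode() accepts a single string or a list of strings
vec = model.encode("Daily revenue aggregated from cleaned orders")
vecs = model.encode(["users table", "orders table"])
assert vec.shape == (384,)  # 384-dimensional vectors
```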
Main FastAPI application.
Endpoints:
- `GET /health`: Health check
- `POST /api/query`: Execute a lineage query
Request/Response Models:
- `QueryRequest`: Input model
- `QueryResponse`: Output model
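A sketch of how the endpoint and models fit together. The model fields are assumptions, inferred from the frontend client method `queryLineage(query, depth)` documented below:

```python
# Illustrative sketch; field names beyond query/depth are assumptions.
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class QueryRequest(BaseModel):
    query: str
    depth: int = 3

class QueryResponse(BaseModel):
    answer: str
    confidence_score: float

@app.get("/health")
def health() -> dict:
    return {"status": "ok"}

@app.post("/api/query", response_model=QueryResponse)
def query_lineage(req: QueryRequest) -> QueryResponse:
    state = run_agent(req.query)  # agent entry point described above
    return QueryResponse(
        answer=state["final_answer"],
        confidence_score=state["confidence_score"],
    )
```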
Main query interface component.
Features:
- Natural language query input
- Results display
- Error handling
HTTP client for backend communication.
Methods:
- `queryLineage(query, depth)`: Send a query to the backend
Bash script for managing services.
Commands:
- `start`: Start all services
- `stop`: Stop all services
- `status`: Check service status
- `restart`: Restart all services
OpenTelemetry tracing support.
Features:
- Agent execution tracing
- Tool call tracing
- LLM inference tracing
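When tracing is enabled, spans can be created around each stage with the standard OpenTelemetry Python API. A minimal sketch; the span name and attribute are illustrative, not the system's actual instrumentation points:

```python
from opentelemetry import trace

tracer = trace.get_tracer("lineagegraph.agent")

# Wrap one agent stage in a span; names here are assumptions.
with tracer.start_as_current_span("agent.plan") as span:
    span.set_attribute("user_query", "What feeds into revenue?")
    # ... run plan_node here ...
```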
Usage:
```bash
export TRACING_ENABLED=true
# Traces are sent to Jaeger at http://localhost:16686
```

Loads sample lineage data into PostgreSQL.
Sample Data:
- 5 nodes (users, orders, order_clean, revenue_daily, revenue_dashboard)
- 4 edges (lineage relationships)
Loads sample embeddings into DuckDB.
Sample Data:
- 5 table descriptions
- Embeddings for each description
Comprehensive evaluation system.
Features:
- Golden dataset evaluation
- Node recall calculation
- Answer relevance scoring
- Pass rate metrics
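Node recall, for instance, measures how many of the expected lineage nodes the agent actually surfaced. A sketch of the metric; the function name and golden-set format are assumptions, the formula is standard recall over node ids:

```python
# Illustrative metric; golden dataset format is an assumption.
def node_recall(retrieved: list[str], expected: list[str]) -> float:
    """Fraction of expected lineage node ids the agent surfaced."""
    if not expected:
        return 1.0
    return len(set(retrieved) & set(expected)) / len(expected)

# e.g. node_recall(["table_orders", "table_revenue_daily"],
#                  ["table_orders", "table_order_clean"]) == 0.5
```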
Tests:
- `test_agent_tools.py`: Unit tests for agent tools
- `test_agent_graph.py`: Tests for the agent graph
- `test_week1_5_integration.py`: Integration tests
- `test_evaluation_pipeline.py`: Evaluation pipeline tests

Environment Variables:
- `DATABASE_URL`: PostgreSQL connection string
- `TRACING_ENABLED`: Enable/disable OpenTelemetry tracing

Configuration Files:
- `requirements.txt`: Python dependencies
- `frontend/package.json`: Frontend dependencies
- `.github/workflows/test.yml`: CI/CD configuration