Build an AI-Powered MCP pipeline with Mage Pro — Part II: Building the intelligence layer, connect your documents to Claude via MCP
Link to original article written by Cole Freeman : https://www.mage.ai/blog/build-an-ai-powered-mcp-pipeline-with-mage-pro-part-ii-building-the-intelligence-layer-connect-your-documents-to-claude-via-mcp
TL;DR
We’re continuing to build “Attractor,” an AI system that answers questions about space and physics using a research paper by Christopher Cillian O’neill, “Evidence of the Great Attractor and Great Repeller from Artificial Neural Network Imputation of Sloan Digital Sky Survey.” In Part I, we built the data foundation — extracting, cleaning, and chunking PDF content to prepare it for AI integration.
In Part II we’ll focus on implementing an intelligence layer that connects the processed documents to Anthropic’s API through a Model Context Protocol (MCP) server. We’ll implement the core functionality, where an MCP server manages document search and retrieval, sends relevant content to Claude for analysis, and then returns an intelligent answer.
Introduction
In Part I, we built the data processing foundation of our Model Context Protocol (MCP) pipeline — extracting, cleaning, and chunking PDF content about the Great Attractor. Now we’ll bring our “Attractor” AI to life by integrating Anthropic’s Claude API. Part II focuses on the intelligence layer where we’ll connect our processed document chunks to Claude through an MCP server. Instead of relying on Claude’s general training data, our AI will answer questions based strictly on the research paper we processed. This will create more reliable and traceable responses which is perfect for internal or external business applications that need to answer questions from company knowledge assets.
Understanding MCP in practice
Unlike traditional RAG, which embeds and queries document chunks directly, MCP creates a structured client-server architecture for document interaction. Attractor, our AI tool, acts as a dedicated document service that organizes PDF chunks into searchable content and provides tools that can understand questions and find relevant answers in the document.
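As a concrete illustration of what such a search tool could do, here is a keyword-overlap ranking over chunks. This is only a sketch — the server we build below keeps things even simpler and returns the first few chunks — and `score_chunk`/`search_chunks` are hypothetical helpers, not part of the MCP SDK:

```python
def score_chunk(query: str, text: str) -> int:
    # Count how many distinct query words appear in the chunk text
    words = {w.lower() for w in query.split()}
    lower = text.lower()
    return sum(1 for w in words if w in lower)

def search_chunks(query: str, chunks: list, max_results: int = 3) -> list:
    # Rank chunks by keyword overlap and return the best-matching chunk_ids
    scored = [(score_chunk(query, c.get("text", "")), c["chunk_id"]) for c in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [cid for score, cid in scored[:max_results] if score > 0]
```

Dropping zero-score chunks means an off-topic question returns nothing, which lets the client report "not found" instead of sending irrelevant text to Claude.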
When a question is asked, the process follows the MCP protocol in several steps:
1. The client starts the document server and initializes an MCP session with it.
2. The client calls the server’s search_document tool to find relevant chunks.
3. The client reads each matching chunk as an MCP resource.
4. The retrieved content is sent to Claude, which generates an answer grounded in the document.
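Concretely, these steps translate into JSON-RPC 2.0 messages written to the server’s stdin, one per line. The payloads below mirror the ones used by the transformer block in this article; the `make_*` helper names are just for illustration:

```python
import json

def make_initialize(msg_id=1):
    # Step 1: the client opens the session and identifies itself
    return {
        "jsonrpc": "2.0",
        "id": msg_id,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "mage-client", "version": "1.0.0"},
        },
    }

def make_search(query, msg_id=2, max_results=3):
    # Step 2: the client asks the search_document tool for relevant chunks
    return {
        "jsonrpc": "2.0",
        "id": msg_id,
        "method": "tools/call",
        "params": {
            "name": "search_document",
            "arguments": {"query": query, "max_results": max_results},
        },
    }

def make_read(chunk_id, msg_id):
    # Step 3: the client reads each matching chunk as an MCP resource
    return {
        "jsonrpc": "2.0",
        "id": msg_id,
        "method": "resources/read",
        "params": {"uri": f"document://great_attractor/chunk_{chunk_id}"},
    }

# Each message travels as a single newline-terminated line over stdio
wire_line = json.dumps(make_search("Great Attractor research")) + "\n"
```

Step 4 happens outside the protocol: the client assembles the chunk texts and sends them to Claude in an ordinary API call.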
This approach separates document handling from AI processing and follows the new MCP standard. It interoperates well with other tools and sends only the most relevant document sections to Claude, which reduces costs and yields better answers. If you’re following along with this project, use the code below to build your MCP server:
```python
#!/usr/bin/env python3
import asyncio
import json
import os
import sys

try:
    from mcp.server import Server, NotificationOptions
    from mcp.server.models import InitializationOptions
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        Resource,
        Tool,
        TextContent,
    )
    print("All imports successful", file=sys.stderr)
except ImportError as e:
    print(f"Import error: {e}", file=sys.stderr)
    sys.exit(1)

# Initialize the MCP server
server = Server("simple-document-server")
DOCUMENT_CHUNKS = []


@server.list_resources()
async def handle_list_resources():
    """List available document resources"""
    print(f"Listing {len(DOCUMENT_CHUNKS)} resources", file=sys.stderr)
    resources = []
    for chunk in DOCUMENT_CHUNKS:
        resources.append(
            Resource(
                uri=f"document://great_attractor/chunk_{chunk['chunk_id']}",
                name=f"Document Chunk {chunk['chunk_id']}",
                description=f"Section {chunk['chunk_id']} of the Great Attractor research paper",
                mimeType="text/plain"
            )
        )
    return resources


@server.read_resource()
async def handle_read_resource(uri: str):
    """Read content from a specific document resource"""
    # Convert URI to string if it's a Pydantic URL object
    uri_str = str(uri)
    print(f"Reading resource: {uri_str}", file=sys.stderr)
    if uri_str.startswith("document://great_attractor/chunk_"):
        # Only the chunk-id parse can fail here; a missing chunk is reported below
        try:
            chunk_id = int(uri_str.split("_")[-1])
        except (ValueError, IndexError) as e:
            print(f"Error parsing chunk_id from {uri_str}: {e}", file=sys.stderr)
            raise ValueError(f"Invalid chunk URI: {uri_str}")
        print(f"Looking for chunk_id: {chunk_id}", file=sys.stderr)
        # Ensure we have the right data structure
        chunks_to_search = DOCUMENT_CHUNKS
        if isinstance(DOCUMENT_CHUNKS, dict) and 'chunks' in DOCUMENT_CHUNKS:
            chunks_to_search = DOCUMENT_CHUNKS['chunks']
        for chunk in chunks_to_search:
            if chunk.get("chunk_id") == chunk_id:
                content = chunk.get("text", "")
                print(f"Found chunk {chunk_id}, returning {len(content)} characters", file=sys.stderr)
                return content
        print(f"Chunk {chunk_id} not found in {len(chunks_to_search)} available chunks", file=sys.stderr)
        raise ValueError(f"Chunk {chunk_id} not found")
    raise ValueError(f"Resource not found: {uri_str}")


@server.list_tools()
async def handle_list_tools():
    """List available tools"""
    return [
        Tool(
            name="search_document",
            description="Search for relevant document chunks",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "max_results": {"type": "integer", "default": 3}
                },
                "required": ["query"]
            }
        )
    ]


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict):
    """Handle tool calls"""
    print(f"Tool called: {name} with args: {arguments}", file=sys.stderr)
    print(f"Available chunks: {len(DOCUMENT_CHUNKS)}", file=sys.stderr)
    if name == "search_document":
        query = arguments.get("query", "")
        max_results = arguments.get("max_results", 3)
        print(f"Searching for: {query} with max_results: {max_results}", file=sys.stderr)
        try:
            # Ensure DOCUMENT_CHUNKS is a list
            chunks_to_search = DOCUMENT_CHUNKS
            if isinstance(DOCUMENT_CHUNKS, dict):
                if 'chunks' in DOCUMENT_CHUNKS:
                    chunks_to_search = DOCUMENT_CHUNKS['chunks']
                else:
                    # Convert dict values to a list if it's a dict of chunks
                    chunks_to_search = list(DOCUMENT_CHUNKS.values())
            if not chunks_to_search:
                print("WARNING: No document chunks available for search!", file=sys.stderr)
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "chunk_ids": [],
                        "total_matches": 0,
                        "error": "No document chunks loaded"
                    })
                )]
            print(f"Searching through {len(chunks_to_search)} chunks", file=sys.stderr)
            # Simple search - return the first few chunks that exist
            chunk_ids = []
            for i, chunk in enumerate(chunks_to_search):
                if i >= max_results:
                    break
                chunk_id = chunk.get("chunk_id", i) if isinstance(chunk, dict) else i
                chunk_ids.append(chunk_id)
            result = {
                "chunk_ids": chunk_ids,
                "total_matches": len(chunks_to_search),
                "query": query,
                "debug_chunks_type": str(type(chunks_to_search))
            }
            print(f"Search result: {result}", file=sys.stderr)
            return [TextContent(
                type="text",
                text=json.dumps(result)
            )]
        except Exception as e:
            print(f"Search error: {e}", file=sys.stderr)
            import traceback
            print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
            return [TextContent(
                type="text",
                text=json.dumps({"error": str(e), "query": query})
            )]
    raise ValueError(f"Unknown tool: {name}")


async def main():
    print("Starting MCP server", file=sys.stderr)
    # Load document chunks from the environment variable set by the pipeline
    global DOCUMENT_CHUNKS
    chunks_env = os.getenv("DOCUMENT_CHUNKS")
    if chunks_env:
        try:
            DOCUMENT_CHUNKS = json.loads(chunks_env)
            print(f"Loaded {len(DOCUMENT_CHUNKS)} chunks from environment", file=sys.stderr)
            # Debug: print info about the first chunk
            if DOCUMENT_CHUNKS:
                print(f"DOCUMENT_CHUNKS type: {type(DOCUMENT_CHUNKS)}", file=sys.stderr)
                if isinstance(DOCUMENT_CHUNKS, list):
                    first_chunk = DOCUMENT_CHUNKS[0]
                    print(f"First chunk keys: {list(first_chunk.keys())}", file=sys.stderr)
                    print(f"First chunk ID: {first_chunk.get('chunk_id', 'NO_ID')}", file=sys.stderr)
                    print(f"First chunk text preview: {first_chunk.get('text', 'NO_TEXT')[:100]}...", file=sys.stderr)
                elif isinstance(DOCUMENT_CHUNKS, dict):
                    print(f"DOCUMENT_CHUNKS is a dict with keys: {list(DOCUMENT_CHUNKS.keys())}", file=sys.stderr)
                    # Unwrap a {'chunks': [...]} payload if present
                    if 'chunks' in DOCUMENT_CHUNKS:
                        DOCUMENT_CHUNKS = DOCUMENT_CHUNKS['chunks']
                        print(f"Extracted chunks list with {len(DOCUMENT_CHUNKS)} items", file=sys.stderr)
                    else:
                        print("DOCUMENT_CHUNKS is a dict but no 'chunks' key found", file=sys.stderr)
                else:
                    print(f"DOCUMENT_CHUNKS is unexpected type: {type(DOCUMENT_CHUNKS)}", file=sys.stderr)
            else:
                print("DOCUMENT_CHUNKS is empty", file=sys.stderr)
        except json.JSONDecodeError as e:
            print(f"Failed to parse DOCUMENT_CHUNKS: {e}", file=sys.stderr)
            print(f"Raw DOCUMENT_CHUNKS: {chunks_env[:200]}...", file=sys.stderr)
            sys.exit(1)
    else:
        print("No DOCUMENT_CHUNKS environment variable found", file=sys.stderr)
        print("Available env vars:", [k for k in os.environ.keys() if 'CHUNK' in k.upper()], file=sys.stderr)

    # Run the server over stdio
    try:
        print("Creating stdio server streams", file=sys.stderr)
        async with stdio_server() as (read_stream, write_stream):
            print("Server streams created, running server", file=sys.stderr)
            # Advertise the server's capabilities
            capabilities = server.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            )
            print(f"Server capabilities created: {type(capabilities)}", file=sys.stderr)
            # Initialize and run the server
            init_options = InitializationOptions(
                server_name="simple-document-server",
                server_version="1.0.0",
                capabilities=capabilities,
            )
            print("Starting server.run()", file=sys.stderr)
            await server.run(read_stream, write_stream, init_options)
            print("Server.run() completed", file=sys.stderr)
    except EOFError:
        print("Server received EOF (stdin closed) - this is normal for testing", file=sys.stderr)
    except KeyboardInterrupt:
        print("Server interrupted by user", file=sys.stderr)
    except Exception as e:
        print(f"Server error: {e}", file=sys.stderr)
        print(f"Error type: {type(e)}", file=sys.stderr)
        import traceback
        print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
        raise


if __name__ == "__main__":
    try:
        print("Starting asyncio main loop", file=sys.stderr)
        asyncio.run(main())
        print("Main loop completed normally", file=sys.stderr)
    except KeyboardInterrupt:
        print("Server interrupted by user", file=sys.stderr)
        sys.exit(0)
    except Exception as e:
        print(f"Fatal error: {e}", file=sys.stderr)
        print(f"Error type: {type(e)}", file=sys.stderr)
        import traceback
        print(f"Full traceback: {traceback.format_exc()}", file=sys.stderr)
        sys.exit(1)
```
Implement an MCP server in Mage Pro
Once you’ve written or copied the code above, create and store the file in Mage Pro so your pipeline can reference it (the transformer block in this article expects it at /home/src/demo/mcp_document_server.py).
Once the file is in place, you can add a new block of code that calls Anthropic to return an analytical answer based on the document provided.
Call Anthropic’s Claude
The final transformation block serves as the intelligence layer of our MCP system. It orchestrates communication between the processed document chunks and the Anthropic API. This block:
Starts the MCP server as a subprocess and performs the protocol handshake.
Calls the search_document tool to find the most relevant chunks.
Reads each matching chunk as an MCP resource.
Sends the retrieved content to Claude with instructions to answer only from the document.
This creates a complete audit trail from source document to AI-generated insights that exemplifies the core principles of MCP.
```python
from mage_ai.data_preparation.shared.secrets import get_secret_value
import subprocess
import json
import os
import time
import anthropic


@transformer
def mcp_retrieval_and_claude_analysis(data, **kwargs):
    """Get MCP content and analyze with Claude API in one step"""
    question = "What is the Great Attractor and what did the research discover about it?"

    # Set up the environment for the server subprocess
    env = os.environ.copy()
    env["DOCUMENT_CHUNKS"] = json.dumps(data.to_dict() if hasattr(data, 'to_dict') else data)
    env["PATH"] = "/usr/local/bin:/usr/bin:/bin"

    try:
        print(f"Asking MCP server: {question}")
        # Start MCP server
        server_process = subprocess.Popen(
            ["python", "/home/src/demo/mcp_document_server.py"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            text=True
        )
        time.sleep(2)

        # Initialize MCP session
        init_msg = {"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}, "clientInfo": {"name": "mage-client", "version": "1.0.0"}}}
        server_process.stdin.write(json.dumps(init_msg) + "\n")
        server_process.stdin.flush()
        server_process.stdout.readline()
        server_process.stdin.write(json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"}) + "\n")
        server_process.stdin.flush()

        # Ask the search tool for relevant chunks
        search_msg = {"jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": {"name": "search_document", "arguments": {"query": "Great Attractor research", "max_results": 3}}}
        server_process.stdin.write(json.dumps(search_msg) + "\n")
        server_process.stdin.flush()
        search_response = server_process.stdout.readline()

        # Extract document content
        document_content = ""
        if search_response.strip():
            search_data = json.loads(search_response.strip())
            if "result" in search_data:
                search_result = json.loads(search_data["result"]["content"][0]["text"])
                chunk_ids = search_result.get("chunk_ids", [])
                for chunk_id in chunk_ids[:2]:
                    read_msg = {"jsonrpc": "2.0", "id": 10 + chunk_id, "method": "resources/read", "params": {"uri": f"document://great_attractor/chunk_{chunk_id}"}}
                    server_process.stdin.write(json.dumps(read_msg) + "\n")
                    server_process.stdin.flush()
                    chunk_response = server_process.stdout.readline()
                    if chunk_response.strip():
                        try:
                            chunk_data = json.loads(chunk_response.strip())
                            if "result" in chunk_data and "contents" in chunk_data["result"]:
                                content = chunk_data["result"]["contents"][0]["text"]
                                document_content += content + "\n\n"
                        except json.JSONDecodeError:
                            continue

        server_process.terminate()
        server_process.wait(timeout=3)

        # Now call the Claude API with the retrieved content
        if document_content.strip():
            client = anthropic.Anthropic(
                api_key=get_secret_value('CLAUDE_API_KEY')
            )
            message = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1000,
                messages=[
                    {
                        "role": "user",
                        "content": f"""You must answer this question ONLY using information from the provided document. Do not use any external knowledge or training data.

Question: {question}

Document content:
{document_content}

Instructions:
- Only use information explicitly stated in the document above
- If information is not in the document, say "Not mentioned in the document"
- Do not add external knowledge about the Great Attractor
- Base your analysis solely on what this research paper contains

Please analyze what this specific research discovered."""
                    }
                ]
            )
            claude_analysis = message.content[0].text
        else:
            claude_analysis = "No document content retrieved from MCP server"

        return {
            "question": question,
            "claude_analysis": claude_analysis,
            "method": "MCP + Claude API (PDF-only)"
        }
    except Exception as e:
        return {
            "question": question,
            "claude_analysis": f"Error: {str(e)}",
            "method": "MCP + Claude API (PDF-only)"
        }
```
Export the answer for external systems
For production pipelines, the data exporter block can serve as a gateway to write data to multiple locations if needed. Data can be written to:
Vector database storage: store responses in a vector database to build a searchable knowledge base of previously answered questions.
Caching layer: cache responses in Redis, keyed by question hashes, to improve response times for repeat questions.
Analytics pipeline: export responses to data warehouses like Snowflake or BigQuery to track performance metrics over time.
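The caching idea can be sketched as follows — assuming a redis-py style client with `get`/`set(ex=...)`; the key prefix, helper names, and TTL are illustrative, not part of this pipeline:

```python
import hashlib
import json

def question_key(question: str) -> str:
    # Hash the normalized question so equivalent phrasings share one key
    normalized = " ".join(question.lower().split())
    return "answer:" + hashlib.sha256(normalized.encode("utf-8")).hexdigest()

def cached_answer(redis_client, question, compute, ttl_seconds=86400):
    # Return a cached analysis if present; otherwise compute and store it
    key = question_key(question)
    hit = redis_client.get(key)
    if hit is not None:
        return json.loads(hit)
    result = compute(question)
    redis_client.set(key, json.dumps(result), ex=ttl_seconds)
    return result
```

Here `compute` would wrap the MCP-plus-Claude transformer from above, so repeat questions skip both the server round-trip and the API call.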
This clean separation between processing and output ensures that our AI system can integrate seamlessly with various downstream applications, from notebooks to web interfaces, making the knowledge contained in our PDF document accessible through a simple, consistent API response format.
```python
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_claude_analysis(data, **kwargs):
    """Export only Claude's analysis"""
    print(data['claude_analysis'])
```
Conclusion
MCP-powered document analysis is a significant improvement over traditional RAG approaches because it implements structured protocols for document interaction. Separating document handling from AI processing produces more reliable responses and maintains clear traceability from source material through the final analysis.
Some key advantages include:
Separation of concerns: document handling lives in the MCP server, while reasoning is delegated to Claude.
Cost control: only the most relevant document sections are sent to the model.
Traceability: every answer can be traced back to specific chunks of the source document.
Interoperability: because the server follows the MCP standard, it works with other MCP-aware tools.
Whether you are processing research papers, building a chatbot over technical documentation, or creating internal knowledge bases from proprietary business documents, this MCP approach provides the reliability and transparency that production AI applications need.
Want to build a RAG pipeline using MCP methods discussed above? Schedule a free demo with Mage to get started today.