OpenAI with Danube MCP Server
This guide shows how to connect OpenAI to the Danube MCP Server using the Model Context Protocol (MCP). This approach allows OpenAI models to access Danube tools through a standardized protocol.
Overview
The Danube MCP Server exposes tools via MCP, which can be accessed over HTTP/SSE. You can:
- Connect to the Danube MCP Server
- List available tools
- Execute tools and return results to OpenAI
Prerequisites
Copy
pip install openai httpx httpx-sse
MCP Client for Danube
First, let’s create a simple MCP client that connects to the Danube MCP Server:
import json
import httpx
from typing import Any, Dict, List, Optional
class DanubeMCPClient:
    """Minimal JSON-RPC 2.0 client for the Danube MCP Server.

    Every call is sent as an HTTP POST to ``mcp_url`` with the API key in the
    ``danube-api-key`` header. Request ids come from a per-instance counter
    that starts at 1.
    """

    def __init__(
        self,
        api_key: str,
        mcp_url: str = "https://mcp.danubeai.com/mcp",
    ):
        self.api_key = api_key
        self.mcp_url = mcp_url
        self.headers = {
            "danube-api-key": api_key,
            "Content-Type": "application/json",
        }
        self._request_id = 0

    def _next_id(self) -> int:
        """Return the next JSON-RPC request id (1, 2, 3, ...)."""
        self._request_id += 1
        return self._request_id

    def _build_payload(
        self, method: str, params: Optional[Dict] = None
    ) -> Dict[str, Any]:
        """Build the JSON-RPC request body; ``params`` is omitted when empty."""
        payload: Dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
        }
        if params:
            payload["params"] = params
        return payload

    @staticmethod
    def _unwrap(body: Any) -> Any:
        """Extract ``result`` from a decoded JSON-RPC response body.

        Raises:
            RuntimeError: if the body is not a JSON object or carries an
                ``error`` member.
        """
        if not isinstance(body, dict):
            raise RuntimeError(f"MCP Error: unexpected response body: {body!r}")
        if "error" in body:
            raise RuntimeError(f"MCP Error: {body['error']}")
        result = body.get("result")
        # A missing or null result is normalized to {} so callers can always
        # use .get() on the return value.
        return {} if result is None else result

    def _make_request(self, method: str, params: Optional[Dict] = None) -> Dict:
        """Make a JSON-RPC request to the MCP server.

        Args:
            method: JSON-RPC method name, e.g. ``"tools/list"``.
            params: Optional parameters object for the method.

        Returns:
            The ``result`` member of the response ({} when absent or null).

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP response.
            RuntimeError: when the server returns a JSON-RPC error.
        """
        payload = self._build_payload(method, params)
        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                self.mcp_url,
                headers=self.headers,
                json=payload,
            )
            response.raise_for_status()
            body = response.json()
        return self._unwrap(body)

    def list_tools(self) -> List[Dict]:
        """List all available tools from the MCP server."""
        return self._make_request("tools/list").get("tools", [])

    def call_tool(self, name: str, arguments: Optional[Dict] = None) -> Any:
        """Call a tool on the MCP server and return the raw result."""
        return self._make_request(
            "tools/call",
            {
                "name": name,
                "arguments": arguments or {},
            },
        )

    def list_resources(self) -> List[Dict]:
        """List available resources."""
        return self._make_request("resources/list").get("resources", [])

    def read_resource(self, uri: str) -> Any:
        """Read a resource by URI and return the raw result."""
        return self._make_request("resources/read", {"uri": uri})
Converting MCP Tools to OpenAI Format
Copy
def mcp_tool_to_openai_function(mcp_tool: Dict) -> Dict:
    """Convert an MCP tool to OpenAI function calling format.

    Args:
        mcp_tool: MCP tool descriptor with ``name`` and optionally
            ``description`` and ``inputSchema``.

    Returns:
        A ``{"type": "function", "function": {...}}`` tool definition. A
        missing/empty schema becomes an empty object schema; an object schema
        without ``properties`` gets an empty ``properties`` member.

    Raises:
        KeyError: if ``mcp_tool`` has no ``name``.
    """
    raw_schema = mcp_tool.get("inputSchema")
    if isinstance(raw_schema, dict) and raw_schema:
        # Shallow copy so normalization never mutates the caller's tool dict.
        parameters = dict(raw_schema)
        parameters.setdefault("type", "object")
        if parameters["type"] == "object":
            parameters.setdefault("properties", {})
    else:
        parameters = {
            "type": "object",
            "properties": {},
            "required": [],
        }

    return {
        "type": "function",
        "function": {
            "name": mcp_tool["name"],
            # `or ""` also covers an explicit null description.
            "description": mcp_tool.get("description") or "",
            "parameters": parameters,
        },
    }
def convert_mcp_tools(mcp_tools: List[Dict]) -> List[Dict]:
    """Return the OpenAI tool definition for each MCP tool, in input order."""
    converted: List[Dict] = []
    for tool in mcp_tools:
        converted.append(mcp_tool_to_openai_function(tool))
    return converted
Complete Agent Example
Here’s a full implementation of an OpenAI agent using the Danube MCP Server:
import json
from typing import List, Dict, Any
from openai import OpenAI
class OpenAIMCPAgent:
    """OpenAI agent that uses Danube tools via MCP.

    The agent keeps the running conversation in ``self.messages`` and the
    OpenAI-format tool definitions in ``self.tools``. Call ``initialize()``
    before ``chat()`` so the system prompt and tools are loaded.
    """

    def __init__(
        self,
        openai_api_key: str,
        danube_api_key: str,
        model: str = "gpt-4-turbo-preview",
        mcp_url: str = "https://mcp.danubeai.com/mcp",
    ):
        self.openai = OpenAI(api_key=openai_api_key)
        self.mcp = DanubeMCPClient(api_key=danube_api_key, mcp_url=mcp_url)
        self.model = model
        # Holds plain dict messages plus the assistant message objects
        # returned by the OpenAI client for tool-call turns.
        self.messages: List[Any] = []
        self.tools: List[Dict] = []

    def initialize(self, system_prompt: Optional[str] = None):
        """Reset the conversation and load available tools.

        Args:
            system_prompt: System message to start with; a generic Danube
                assistant prompt is used when omitted or empty.

        Returns:
            ``self``, so the call can be chained.
        """
        if not system_prompt:
            system_prompt = (
                "You are a helpful assistant with access to various tools through the Danube platform."
            )
        self.messages = [{"role": "system", "content": system_prompt}]

        # Load tools from MCP server
        mcp_tools = self.mcp.list_tools()
        self.tools = convert_mcp_tools(mcp_tools)
        print(f"Loaded {len(self.tools)} tools from Danube MCP Server")
        return self

    @staticmethod
    def _parse_tool_arguments(raw: Optional[str]) -> Dict:
        """Decode the JSON argument string of a tool call.

        An absent or blank string is treated as "no arguments".

        Raises:
            ValueError: if the string is not valid JSON or does not decode
                to a JSON object (``json.JSONDecodeError`` is a ValueError).
        """
        if raw is None or not raw.strip():
            return {}
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            raise ValueError("tool arguments must be a JSON object")
        return parsed

    def _execute_tool(self, name: str, arguments: Dict) -> str:
        """Execute a tool via MCP and return the result as text.

        Text content items are joined with newlines; when there are none the
        whole result is returned as JSON. Any failure is returned as an error
        string rather than raised, so it can be fed back to the model.
        """
        try:
            result = self.mcp.call_tool(name, arguments)
            # Extract content from MCP result format
            content = result.get("content", []) if isinstance(result, dict) else []
            texts = [c.get("text", "") for c in content or [] if c.get("type") == "text"]
            return "\n".join(texts) if texts else json.dumps(result)
        except Exception as e:
            # Deliberately broad: this is the boundary where tool failures
            # become tool output for the model.
            return f"Error executing tool: {str(e)}"

    def chat(self, user_message: str, max_iterations: int = 5) -> str:
        """Chat with the agent, executing tools as needed.

        Args:
            user_message: The user's message for this turn.
            max_iterations: Maximum number of model calls for this turn.

        Returns:
            The assistant's final text, or a fixed notice when the iteration
            limit is reached while the model is still requesting tools.
        """
        self.messages.append({"role": "user", "content": user_message})

        for iteration in range(max_iterations):
            # Only send tool fields when tools exist, instead of explicit nulls.
            request: Dict[str, Any] = {
                "model": self.model,
                "messages": self.messages,
            }
            if self.tools:
                request["tools"] = self.tools
                request["tool_choice"] = "auto"

            # Call OpenAI
            response = self.openai.chat.completions.create(**request)
            assistant_message = response.choices[0].message

            # If no tool calls, return the response
            if not assistant_message.tool_calls:
                self.messages.append({
                    "role": "assistant",
                    "content": assistant_message.content,
                })
                return assistant_message.content

            # Process tool calls
            self.messages.append(assistant_message)
            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                print(f"[Iteration {iteration + 1}] Calling: {function_name}")
                try:
                    arguments = self._parse_tool_arguments(tool_call.function.arguments)
                except ValueError as exc:
                    # Report bad arguments back to the model instead of
                    # aborting the whole conversation.
                    result = f"Error: invalid tool arguments: {exc}"
                else:
                    print(f" Arguments: {json.dumps(arguments, indent=2)}")
                    result = self._execute_tool(function_name, arguments)
                print(f" Result preview: {result[:200]}...")

                # Every tool call gets a matching tool message, errors included.
                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result,
                })

        return "Reached maximum iterations without completing"

    def get_identity(self) -> Dict:
        """Get user identity from MCP resource."""
        return self.mcp.read_resource("identity://user")
# Example usage: runs two sample questions through the agent.
if __name__ == "__main__":
    demo_system_prompt = """You are a helpful assistant with access to Danube tools.
You can search for services, tools, and execute them to help users.
Always explain what tools you're using and why."""

    agent = OpenAIMCPAgent(
        openai_api_key="sk-your-openai-key",
        danube_api_key="dk_your-danube-key",
    )
    agent.initialize(system_prompt=demo_system_prompt)

    # Example conversation: each question is printed between two rules,
    # followed by the agent's answer.
    banner = "=" * 60
    for question in (
        "What services are available for news?",
        "Get me the top stories from Hacker News",
    ):
        print("\n" + banner)
        print(f"User: {question}")
        print(banner)
        response = agent.chat(question)
        print(f"\nAssistant: {response}")
Using SSE Transport (Streaming)
For better performance with long-running operations, use Server-Sent Events:
import httpx
from httpx_sse import connect_sse
class DanubeMCPSSEClient:
    """MCP client using SSE transport for streaming.

    Args:
        api_key: Danube API key, sent in the ``danube-api-key`` header.
        mcp_url: MCP endpoint URL.
        timeout: Timeout in seconds passed to ``httpx.Client``; ``None``
            disables timeouts. Defaults to 60 s, matching the non-streaming
            client, because httpx's own 5 s default is too short for
            long-running tool calls.
    """

    def __init__(
        self,
        api_key: str,
        mcp_url: str = "https://mcp.danubeai.com/mcp",
        timeout: Optional[float] = 60.0,
    ):
        self.api_key = api_key
        self.mcp_url = mcp_url
        self.timeout = timeout
        self._request_id = 0

    def _next_id(self) -> int:
        """Return the next JSON-RPC request id (1, 2, 3, ...)."""
        self._request_id += 1
        return self._request_id

    def call_tool_streaming(self, name: str, arguments: Optional[Dict] = None):
        """Call a tool with SSE streaming.

        This is a generator: the request is only sent once iteration starts.

        Args:
            name: Name of the MCP tool to call.
            arguments: Tool arguments; an empty object is sent when omitted.

        Yields:
            The JSON-decoded ``data`` of each server-sent event that carries
            data, in arrival order.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments or {},
            },
        }
        headers = {
            "danube-api-key": self.api_key,
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }

        with httpx.Client(timeout=self.timeout) as client:
            with connect_sse(
                client,
                "POST",
                self.mcp_url,
                headers=headers,
                json=payload,
            ) as event_source:
                for event in event_source.iter_sse():
                    # Events without data are skipped.
                    if event.data:
                        yield json.loads(event.data)
Advanced: Multi-Service Agent
Create an agent that can work with multiple Danube services:
class MultiServiceAgent:
    """Agent skeleton that collects tools from several Danube services.

    ``self.tools`` holds OpenAI-format tool definitions and
    ``self.tool_metadata`` maps each tool name to the service and tool id it
    was loaded from. ``chat`` is intentionally left unimplemented.
    """

    def __init__(self, openai_api_key: str, danube_api_key: str):
        self.openai = OpenAI(api_key=openai_api_key)
        self.mcp = DanubeMCPClient(api_key=danube_api_key)
        self.tools: List[Dict] = []
        self.tool_metadata: Dict[str, Dict] = {}

    def load_service_tools(self, service_id: str):
        """Load tools for a specific service.

        Calls the ``get_service_tools`` MCP tool and registers every entry
        found under the result's ``tools`` key.
        """
        # Use the get_service_tools MCP tool
        result = self.mcp.call_tool("get_service_tools", {
            "service_id": service_id,
        })
        # NOTE(review): assumes the result exposes a top-level "tools" list;
        # confirm against the server's actual response shape.
        service_tools = result.get("tools") if isinstance(result, dict) else None

        # Parse and add tools
        for tool in service_tools or []:
            self.tools.append(mcp_tool_to_openai_function(tool))
            self.tool_metadata[tool["name"]] = {
                "service_id": service_id,
                "tool_id": tool.get("id"),
            }

    def search_and_load_tools(self, query: str, limit: int = 10):
        """Search for tools via the ``search_tools`` MCP tool.

        Returns:
            The raw search result, so the caller can inspect the matches.
            Per the guide, these tools are run through the ``execute_tool``
            MCP tool. Nothing is added to ``self.tools`` here.
        """
        # Use the search_tools MCP tool
        return self.mcp.call_tool("search_tools", {
            "query": query,
            "limit": limit,
        })

    def chat(self, message: str) -> str:
        """Chat with automatic tool routing (not implemented in this guide).

        Raises:
            NotImplementedError: always; implement along the lines of
                ``OpenAIMCPAgent.chat``.
        """
        # Fail loudly instead of returning None from a method typed -> str.
        raise NotImplementedError(
            "MultiServiceAgent.chat is a placeholder; implement it similarly "
            "to OpenAIMCPAgent.chat"
        )
MCP Tool Reference
The Danube MCP Server exposes these tools:
| Tool | Description |
|---|---|
| list_services | List/search available services |
| search_tools | Search for tools by query |
| get_service_tools | Get all tools for a service |
| execute_tool | Execute a tool by ID or name |
| search_skills | Search for skills |
| get_skill | Get skill with full content |
Example MCP Calls
Copy
# NOTE(review): `mcp` is not defined in this snippet; presumably a
# DanubeMCPClient instance created beforehand — confirm.

# List services
service_query = {"query": "email", "limit": 5}
services = mcp.call_tool("list_services", service_query)

# Search tools
tool_query = {"query": "send email", "limit": 10}
tools = mcp.call_tool("search_tools", tool_query)

# Execute a tool
email_parameters = {
    "to": "user@example.com",
    "subject": "Hello",
    "body": "Message content",
}
execute_request = {
    "tool_name": "Gmail - Send Email",
    "parameters": email_parameters,
}
result = mcp.call_tool("execute_tool", execute_request)

# Get user identity (via resource)
identity = mcp.read_resource("identity://user")
Connection Details
| Setting | Value |
|---|---|
| MCP Server URL | https://mcp.danubeai.com/mcp |
| Authentication | danube-api-key header |
| Transport | HTTP + SSE |
| Protocol | JSON-RPC 2.0 |
Error Handling
Copy
def safe_mcp_call(mcp: "DanubeMCPClient", tool_name: str, args: Dict) -> str:
    """Safely call an MCP tool, always returning a string.

    Args:
        mcp: Client whose ``call_tool(name, arguments)`` performs the call.
        tool_name: Name of the MCP tool.
        args: Arguments for the tool.

    Returns:
        - ``"Tool error: ..."`` when the result is flagged ``isError``,
        - the newline-joined text content when the result has any,
        - otherwise the result serialized as JSON,
        - or a short error description when the call itself fails.
    """
    try:
        result = mcp.call_tool(tool_name, args)
        if not isinstance(result, dict):
            return json.dumps(result)

        # .get("text", "") tolerates text items without a "text" member.
        texts = [
            item.get("text", "")
            for item in result.get("content") or []
            if isinstance(item, dict) and item.get("type") == "text"
        ]

        # Check for MCP-level errors
        if result.get("isError"):
            error_text = texts[0] if texts else "Unknown error"
            return f"Tool error: {error_text}"

        # Extract successful result; fall back to JSON when there is no text
        # content, instead of returning an empty string.
        if texts:
            return "\n".join(texts)
        return json.dumps(result)
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        if status == 401:
            return "Authentication failed - check your API key"
        if status == 404:
            return f"Tool '{tool_name}' not found"
        return f"HTTP error: {status}"
    except Exception as e:
        # Deliberately broad: this helper's contract is to never raise.
        return f"Error: {str(e)}"
See Also
- Python SDK - Direct API access without MCP
- OpenAI Function Calling - Using the Python SDK with OpenAI
- MCP Client Setup - Setting up MCP clients
