Skip to main content

OpenAI with Danube MCP Server

This guide shows how to connect OpenAI to the Danube MCP Server using the Model Context Protocol (MCP). This approach allows OpenAI models to access Danube tools through a standardized protocol.

Overview

The Danube MCP Server exposes tools via the MCP protocol, which can be accessed over HTTP/SSE. You can:
  1. Connect to the Danube MCP Server
  2. List available tools
  3. Execute tools and return results to OpenAI

Prerequisites

pip install openai httpx httpx-sse

MCP Client for Danube

First, let’s create a simple MCP client that connects to the Danube MCP Server:
import json
import httpx
from typing import Any, Dict, List, Optional

class DanubeMCPClient:
    """Client for connecting to Danube MCP Server."""

    def __init__(
        self,
        api_key: str,
        mcp_url: str = "https://mcp.danubeai.com/mcp",
    ):
        self.api_key = api_key
        self.mcp_url = mcp_url
        self.headers = {
            "danube-api-key": api_key,
            "Content-Type": "application/json",
        }
        self._request_id = 0

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    def _make_request(self, method: str, params: Dict = None) -> Dict:
        """Make a JSON-RPC request to the MCP server."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": method,
        }
        if params:
            payload["params"] = params

        with httpx.Client(timeout=60.0) as client:
            response = client.post(
                self.mcp_url,
                headers=self.headers,
                json=payload,
            )
            response.raise_for_status()
            result = response.json()

            if "error" in result:
                raise Exception(f"MCP Error: {result['error']}")

            return result.get("result", {})

    def list_tools(self) -> List[Dict]:
        """List all available tools from the MCP server."""
        result = self._make_request("tools/list")
        return result.get("tools", [])

    def call_tool(self, name: str, arguments: Dict = None) -> Any:
        """Call a tool on the MCP server."""
        result = self._make_request("tools/call", {
            "name": name,
            "arguments": arguments or {},
        })
        return result

    def list_resources(self) -> List[Dict]:
        """List available resources."""
        result = self._make_request("resources/list")
        return result.get("resources", [])

    def read_resource(self, uri: str) -> Any:
        """Read a resource by URI."""
        result = self._make_request("resources/read", {"uri": uri})
        return result

Converting MCP Tools to OpenAI Format

def mcp_tool_to_openai_function(mcp_tool: Dict) -> Dict:
    """Convert an MCP tool to OpenAI function calling format."""
    input_schema = mcp_tool.get("inputSchema", {})

    return {
        "type": "function",
        "function": {
            "name": mcp_tool["name"],
            "description": mcp_tool.get("description", ""),
            "parameters": input_schema if input_schema else {
                "type": "object",
                "properties": {},
                "required": [],
            },
        },
    }

def convert_mcp_tools(mcp_tools: List[Dict]) -> List[Dict]:
    """Convert a list of MCP tools to OpenAI format."""
    return [mcp_tool_to_openai_function(t) for t in mcp_tools]

Complete Agent Example

Here’s a full implementation of an OpenAI agent using the Danube MCP Server:
import json
from typing import List, Dict, Any
from openai import OpenAI

class OpenAIMCPAgent:
    """OpenAI agent that uses Danube tools via MCP."""

    def __init__(
        self,
        openai_api_key: str,
        danube_api_key: str,
        model: str = "gpt-4-turbo-preview",
        mcp_url: str = "https://mcp.danubeai.com/mcp",
    ):
        self.openai = OpenAI(api_key=openai_api_key)
        self.mcp = DanubeMCPClient(api_key=danube_api_key, mcp_url=mcp_url)
        self.model = model
        self.messages: List[Dict[str, Any]] = []
        self.tools: List[Dict] = []

    def initialize(self, system_prompt: str = None):
        """Initialize the agent and load available tools."""
        if system_prompt:
            self.messages = [{"role": "system", "content": system_prompt}]
        else:
            self.messages = [{
                "role": "system",
                "content": "You are a helpful assistant with access to various tools through the Danube platform."
            }]

        # Load tools from MCP server
        mcp_tools = self.mcp.list_tools()
        self.tools = convert_mcp_tools(mcp_tools)
        print(f"Loaded {len(self.tools)} tools from Danube MCP Server")

        return self

    def _execute_tool(self, name: str, arguments: Dict) -> str:
        """Execute a tool via MCP and return the result."""
        try:
            result = self.mcp.call_tool(name, arguments)

            # Extract content from MCP result format
            content = result.get("content", [])
            if content:
                texts = [c.get("text", "") for c in content if c.get("type") == "text"]
                return "\n".join(texts) if texts else json.dumps(result)

            return json.dumps(result)
        except Exception as e:
            return f"Error executing tool: {str(e)}"

    def chat(self, user_message: str, max_iterations: int = 5) -> str:
        """Chat with the agent, executing tools as needed."""
        self.messages.append({"role": "user", "content": user_message})

        for iteration in range(max_iterations):
            # Call OpenAI
            response = self.openai.chat.completions.create(
                model=self.model,
                messages=self.messages,
                tools=self.tools if self.tools else None,
                tool_choice="auto" if self.tools else None,
            )

            assistant_message = response.choices[0].message

            # If no tool calls, return the response
            if not assistant_message.tool_calls:
                self.messages.append({
                    "role": "assistant",
                    "content": assistant_message.content
                })
                return assistant_message.content

            # Process tool calls
            self.messages.append(assistant_message)

            for tool_call in assistant_message.tool_calls:
                function_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments)

                print(f"[Iteration {iteration + 1}] Calling: {function_name}")
                print(f"  Arguments: {json.dumps(arguments, indent=2)}")

                result = self._execute_tool(function_name, arguments)

                print(f"  Result preview: {result[:200]}...")

                self.messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": result,
                })

        return "Reached maximum iterations without completing"

    def get_identity(self) -> Dict:
        """Get user identity from MCP resource."""
        return self.mcp.read_resource("identity://user")


# Example usage
if __name__ == "__main__":
    agent = OpenAIMCPAgent(
        openai_api_key="sk-your-openai-key",
        danube_api_key="dk_your-danube-key",
    )

    agent.initialize(
        system_prompt="""You are a helpful assistant with access to Danube tools.
        You can search for services, tools, and execute them to help users.
        Always explain what tools you're using and why."""
    )

    # Example conversation
    print("\n" + "="*60)
    print("User: What services are available for news?")
    print("="*60)

    response = agent.chat("What services are available for news?")
    print(f"\nAssistant: {response}")

    print("\n" + "="*60)
    print("User: Get me the top stories from Hacker News")
    print("="*60)

    response = agent.chat("Get me the top stories from Hacker News")
    print(f"\nAssistant: {response}")

Using SSE Transport (Streaming)

For better performance with long-running operations, use Server-Sent Events:
import httpx
from httpx_sse import connect_sse

class DanubeMCPSSEClient:
    """MCP client using SSE transport for streaming."""

    def __init__(self, api_key: str, mcp_url: str = "https://mcp.danubeai.com/mcp"):
        self.api_key = api_key
        self.mcp_url = mcp_url
        self._request_id = 0

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    def call_tool_streaming(self, name: str, arguments: Dict = None):
        """Call a tool with SSE streaming."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments or {},
            },
        }

        with httpx.Client() as client:
            with connect_sse(
                client,
                "POST",
                self.mcp_url,
                headers={
                    "danube-api-key": self.api_key,
                    "Content-Type": "application/json",
                    "Accept": "text/event-stream",
                },
                json=payload,
            ) as event_source:
                for event in event_source.iter_sse():
                    if event.data:
                        data = json.loads(event.data)
                        yield data

Advanced: Multi-Service Agent

Create an agent that can work with multiple Danube services:
class MultiServiceAgent:
    """Agent that intelligently routes to different services."""

    def __init__(self, openai_api_key: str, danube_api_key: str):
        self.openai = OpenAI(api_key=openai_api_key)
        self.mcp = DanubeMCPClient(api_key=danube_api_key)
        self.tools: List[Dict] = []
        self.tool_metadata: Dict[str, Dict] = {}

    def load_service_tools(self, service_id: str):
        """Load tools for a specific service."""
        # Use the get_service_tools MCP tool
        result = self.mcp.call_tool("get_service_tools", {
            "service_id": service_id,
        })

        # Parse and add tools
        if "tools" in result:
            for tool in result["tools"]:
                openai_tool = mcp_tool_to_openai_function(tool)
                self.tools.append(openai_tool)
                self.tool_metadata[tool["name"]] = {
                    "service_id": service_id,
                    "tool_id": tool.get("id"),
                }

    def search_and_load_tools(self, query: str, limit: int = 10):
        """Search for tools and load them."""
        # Use the search_tools MCP tool
        result = self.mcp.call_tool("search_tools", {
            "query": query,
            "limit": limit,
        })

        # The result contains tools that can be executed
        # via the execute_tool MCP tool

    def chat(self, message: str) -> str:
        """Chat with automatic tool routing."""
        # Implementation similar to OpenAIMCPAgent
        pass

MCP Tool Reference

The Danube MCP Server exposes these tools:
ToolDescription
list_servicesList/search available services
search_toolsSearch for tools by query
get_service_toolsGet all tools for a service
execute_toolExecute a tool by ID or name
search_skillsSearch for skills
get_skillGet skill with full content

Example MCP Calls

# List services
services = mcp.call_tool("list_services", {"query": "email", "limit": 5})

# Search tools
tools = mcp.call_tool("search_tools", {"query": "send email", "limit": 10})

# Execute a tool
result = mcp.call_tool("execute_tool", {
    "tool_name": "Gmail - Send Email",
    "parameters": {
        "to": "user@example.com",
        "subject": "Hello",
        "body": "Message content"
    }
})

# Get user identity (via resource)
identity = mcp.read_resource("identity://user")

Connection Details

SettingValue
MCP Server URLhttps://mcp.danubeai.com/mcp
Authenticationdanube-api-key header
TransportHTTP + SSE
ProtocolJSON-RPC 2.0

Error Handling

def safe_mcp_call(mcp: DanubeMCPClient, tool_name: str, args: Dict) -> str:
    """Safely call an MCP tool with error handling."""
    try:
        result = mcp.call_tool(tool_name, args)

        # Check for MCP-level errors
        if isinstance(result, dict) and result.get("isError"):
            content = result.get("content", [])
            error_text = next(
                (c["text"] for c in content if c.get("type") == "text"),
                "Unknown error"
            )
            return f"Tool error: {error_text}"

        # Extract successful result
        if isinstance(result, dict) and "content" in result:
            texts = [c["text"] for c in result["content"] if c.get("type") == "text"]
            return "\n".join(texts)

        return json.dumps(result)

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 401:
            return "Authentication failed - check your API key"
        elif e.response.status_code == 404:
            return f"Tool '{tool_name}' not found"
        else:
            return f"HTTP error: {e.response.status_code}"

    except Exception as e:
        return f"Error: {str(e)}"

See Also