Advanced MCP Patterns

Master advanced MCP patterns including streaming, Streamable HTTP transport, authentication, and production server architecture

MCPgee Team

MCP Expert

Prerequisites

  • Completed the first MCP server tutorial
  • Familiarity with TypeScript or Python MCP development
  • Understanding of HTTP and authentication concepts
  • Basic knowledge of streaming protocols

Introduction

Once you have built basic MCP servers (see our first MCP server tutorial or Python MCP server guide), you are ready to explore advanced patterns that enable production-grade deployments. This tutorial covers streaming responses, Streamable HTTP transport, authentication, middleware, and architectural patterns for scalable MCP servers.

Streaming Responses

Streaming allows your MCP server to send data incrementally, which is essential for long-running operations, large datasets, or real-time updates.

Streaming in TypeScript

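With the high-level McpServer class, a tool handler receives a second argument whose sendNotification function can emit progress notifications while the call is still running: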

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

const server = new McpServer({
  name: 'streaming-server',
  version: '1.0.0',
});

server.tool(
  'process-large-dataset',
  'Process a large dataset with progress updates',
  {
    datasetUrl: z.string().url(),
    chunkSize: z.number().default(1000),
  },
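  async ({ datasetUrl, chunkSize }, { sendNotification, _meta }) => {
    const totalRecords = 10000;
    let processed = 0;
    // The client supplies the progress token in the request's _meta field.
    // If it is absent, the client has not asked for progress updates.
    // (Assumes an SDK version that exposes _meta on this argument.)
    const progressToken = _meta?.progressToken;

    while (processed < totalRecords) {
      // Process chunk (clamped so the counter never exceeds the total)
      processed = Math.min(processed + chunkSize, totalRecords);

      // Send progress notification, echoing the client's token
      if (progressToken !== undefined) {
        await sendNotification({
          method: 'notifications/progress',
          params: {
            progressToken,
            progress: processed,
            total: totalRecords,
          },
        });
      }
    }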

    return {
      content: [
        {
          type: 'text',
          text: `Processed ${totalRecords} records from ${datasetUrl}`,
        },
      ],
    };
  }
);

Streaming in Python

python
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("streaming-server")

@mcp.tool()
async def process_large_file(filepath: str, ctx: Context) -> str:
    """Process a large file with progress updates.

    Args:
        filepath: Path to the file to process
    """
    total_lines = 10000
    processed = 0

    for i in range(0, total_lines, 100):
        processed += 100
        await ctx.report_progress(processed, total_lines)
        # Context logging helpers are coroutines and must be awaited
        await ctx.info(f"Processed {processed}/{total_lines} lines")

    return f"Completed processing {total_lines} lines from {filepath}"
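
Progress notifications are tied to a request through a progress token that the client chooses. The sketch below uses plain dictionaries rather than an SDK to show the JSON-RPC shapes involved; the helper names, the token value, and the tool arguments are made up for illustration.

```python
import json
from typing import Optional


def build_tool_call(request_id: int, name: str, arguments: dict, progress_token: str) -> dict:
    """Build a tools/call request that opts in to progress updates."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": name,
            "arguments": arguments,
            # The client picks the token; the server echoes it back.
            "_meta": {"progressToken": progress_token},
        },
    }


def build_progress(progress_token: str, progress: float, total: Optional[float] = None) -> dict:
    """Build the notification a server sends while the call is running."""
    params = {"progressToken": progress_token, "progress": progress}
    if total is not None:
        params["total"] = total
    # Notifications carry no "id" because they expect no response.
    return {"jsonrpc": "2.0", "method": "notifications/progress", "params": params}


def matches(request: dict, notification: dict) -> bool:
    """Return True if the notification belongs to the given request."""
    sent = request["params"].get("_meta", {}).get("progressToken")
    return sent is not None and sent == notification["params"]["progressToken"]


request = build_tool_call(1, "process_large_file", {"filepath": "data.csv"}, "token-123")
update = build_progress("token-123", 100, 10000)
print(json.dumps(update))
print(matches(request, update))  # True
```

A server that invents its own token, or a client that never sends one, leaves the two sides with nothing to correlate. That is a common reason progress updates appear to go missing.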

Streamable HTTP Transport

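Streamable HTTP is the recommended transport for remote MCP server deployments. It replaces the deprecated HTTP+SSE transport: the server exposes a single endpoint that accepts POST, GET, and DELETE requests, and it can still use Server-Sent Events (SSE) to stream a response when one is needed.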

Why Streamable HTTP over SSE

  • Bidirectional communication through one endpoint, instead of the separate SSE and POST endpoints the old transport required
  • Better error handling with standard HTTP status codes
  • Simpler deployment behind load balancers and proxies
  • Built-in session management via the Mcp-Session-Id header
  • Resumability for interrupted connections
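
To make the single-endpoint model concrete, here is a minimal sketch of what a client puts on the wire. It only builds the headers and body and sends nothing; the function name, the client name, and the session ID value are assumptions for illustration.

```python
import json
from typing import Dict, Optional, Tuple


def build_mcp_post(payload: dict, session_id: Optional[str] = None) -> Tuple[Dict[str, str], bytes]:
    """Return the headers and body for one Streamable HTTP POST."""
    headers = {
        "Content-Type": "application/json",
        # The client must accept both a plain JSON reply and an SSE stream.
        "Accept": "application/json, text/event-stream",
    }
    if session_id is not None:
        # Sent on every request after the server has issued a session ID.
        headers["Mcp-Session-Id"] = session_id
    return headers, json.dumps(payload).encode("utf-8")


initialize = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2025-03-26",
        "capabilities": {},
        "clientInfo": {"name": "example-client", "version": "1.0.0"},
    },
}

# The first request has no session yet; the server may return one in a response header.
first_headers, first_body = build_mcp_post(initialize)

# Later requests echo the session ID back (the value here is a placeholder).
list_tools = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
next_headers, next_body = build_mcp_post(list_tools, session_id="example-session-id")

print(first_headers)
print(next_headers["Mcp-Session-Id"])
```

Because every message goes to the same URL with ordinary HTTP semantics, a proxy or load balancer needs no special handling beyond routing on the session header if the server keeps per-session state.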

TypeScript Streamable HTTP Server

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';
import { z } from 'zod';

const app = express();
app.use(express.json());

const server = new McpServer({
  name: 'remote-server',
  version: '1.0.0',
});

// Register tools
server.tool('echo', 'Echo a message', { message: z.string() }, async ({ message }) => ({
  content: [{ type: 'text', text: `Echo: ${message}` }],
}));

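// Create transport and connect.
// A single shared transport keeps this example short; it serves one session,
// so multi-client servers typically create one transport per session instead.
// crypto.randomUUID() relies on the global Web Crypto object in recent Node.js.
const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => crypto.randomUUID(),
});

app.post('/mcp', async (req, res) => {
  // express.json() has already consumed the request stream, so pass the parsed body
  await transport.handleRequest(req, res, req.body);
});

app.get('/mcp', async (req, res) => {
  await transport.handleRequest(req, res);
});

app.delete('/mcp', async (req, res) => {
  await transport.handleRequest(req, res);
});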

await server.connect(transport);

app.listen(3000, () => {
  console.log('MCP server running on http://localhost:3000/mcp');
});

Python Streamable HTTP Server

python
from mcp.server.fastmcp import FastMCP

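# In the official Python SDK, host and port are FastMCP settings passed to
# the constructor; run() only selects the transport.
mcp = FastMCP("remote-server", host="0.0.0.0", port=3000)

@mcp.tool()
def echo(message: str) -> str:
    """Echo a message back.

    Args:
        message: The message to echo
    """
    return f"Echo: {message}"

if __name__ == "__main__":
    mcp.run(transport="streamable-http")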

Authentication and Authorization

For production MCP servers exposed over the network, authentication is critical. See our dedicated MCP authentication tutorial for comprehensive coverage.

OAuth 2.0 Integration

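The MCP authorization specification builds on OAuth 2.1. At its simplest, the server checks the bearer token in the Authorization header before handing the request to the transport: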

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';

const server = new McpServer({
  name: 'authenticated-server',
  version: '1.0.0',
});

// Middleware for token validation
function validateToken(req: express.Request): boolean {
  const authHeader = req.headers.authorization;
  if (!authHeader || !authHeader.startsWith('Bearer ')) {
    return false;
  }
  const token = authHeader.slice(7);
  // Validate token against your auth provider.
  // verifyJWT is a placeholder for your own validation function.
  return verifyJWT(token);
}

// app and transport are the Express app and transport from the previous section
app.post('/mcp', async (req, res) => {
  if (!validateToken(req)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  // Pass the body already parsed by express.json()
  await transport.handleRequest(req, res, req.body);
});

API Key Authentication

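For simpler setups, API key authentication works well. The snippet below loads the accepted keys from the environment; the key check itself has to run in the HTTP layer in front of the server, before a request reaches any tool: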

python
from mcp.server.fastmcp import FastMCP
import os

mcp = FastMCP("secure-server")

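# Drop empty entries so an unset MCP_API_KEYS does not make "" a valid key
API_KEYS = {key.strip() for key in os.environ.get("MCP_API_KEYS", "").split(",") if key.strip()}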

@mcp.tool()
def secure_operation(data: str) -> str:
    """Perform a secure operation.

    Args:
        data: Input data to process
    """
    return f"Processed: {data}"
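
The comparison itself might look like the sketch below. It assumes the key arrives in a request header of your choosing and that the check is called from whatever HTTP middleware sits in front of the server; parse_api_keys and is_valid_api_key are hypothetical helper names, not part of the MCP SDK.

```python
import hmac
from typing import Optional, Set


def parse_api_keys(raw: str) -> Set[str]:
    """Split a comma-separated list of keys, ignoring blanks and whitespace."""
    return {key.strip() for key in raw.split(",") if key.strip()}


def is_valid_api_key(presented: Optional[str], valid_keys: Set[str]) -> bool:
    """Check a presented key against the configured keys."""
    if not presented:
        return False
    presented_bytes = presented.encode("utf-8")
    found = False
    # Check every key with a constant-time comparison so response timing
    # does not reveal how much of a key was correct.
    for key in valid_keys:
        if hmac.compare_digest(presented_bytes, key.encode("utf-8")):
            found = True
    return found


keys = parse_api_keys("alpha-key, beta-key,,")
print(sorted(keys))
print(is_valid_api_key("alpha-key", keys))
print(is_valid_api_key("wrong-key", keys))
print(is_valid_api_key(None, keys))
```

Rejecting the request before it reaches the MCP transport keeps unauthenticated clients from listing or calling tools at all.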

Middleware and Request Processing

Logging Middleware

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import express from 'express';

const app = express();

const server = new McpServer({
  name: 'logged-server',
  version: '1.0.0',
});

// Express middleware for request logging
app.use('/mcp', (req, res, next) => {
  const start = Date.now();
  // req.path is relative to the '/mcp' mount point, so log originalUrl instead
  console.log(`[${new Date().toISOString()}] ${req.method} ${req.originalUrl}`);

  res.on('finish', () => {
    const duration = Date.now() - start;
    console.log(`[${new Date().toISOString()}] Response: ${res.statusCode} (${duration}ms)`);
  });

  next();
});

Rate Limiting

typescript
const rateLimits = new Map<string, { count: number; resetAt: number }>();

function rateLimit(clientId: string, maxRequests = 100, windowMs = 60000): boolean {
  const now = Date.now();
  const limit = rateLimits.get(clientId);

  if (!limit || now > limit.resetAt) {
    rateLimits.set(clientId, { count: 1, resetAt: now + windowMs });
    return true;
  }

  if (limit.count >= maxRequests) {
    return false;
  }

  limit.count++;
  return true;
}

For more on rate limiting and caching, see our performance optimization tutorial.
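
For servers written in Python, the same fixed-window logic can be sketched as follows. The class name and the injectable clock are assumptions made for this example; the clock parameter exists only so the behavior can be demonstrated without waiting for a real window to expire.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowRateLimiter:
    """Allow at most max_requests per client in each fixed time window."""

    def __init__(
        self,
        max_requests: int = 100,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        # client_id -> (request count, time at which the window resets)
        self._windows: Dict[str, Tuple[int, float]] = {}

    def allow(self, client_id: str) -> bool:
        now = self._clock()
        entry = self._windows.get(client_id)
        # Start a new window on the first request or after the old one expires.
        if entry is None or now > entry[1]:
            self._windows[client_id] = (1, now + self.window_seconds)
            return True
        count, reset_at = entry
        if count >= self.max_requests:
            return False
        self._windows[client_id] = (count + 1, reset_at)
        return True


# Demonstration with a fake clock that can be advanced by hand.
fake_now = [0.0]
limiter = FixedWindowRateLimiter(max_requests=2, window_seconds=60, clock=lambda: fake_now[0])
print([limiter.allow("client-a") for _ in range(3)])  # [True, True, False]
fake_now[0] = 61.0
print(limiter.allow("client-a"))  # True, a new window has started
```

Like the TypeScript version, this keeps its counters in process memory, so each server instance enforces its own limit. Deployments with several instances would need a shared store to enforce one global limit.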

Multi-Tool Server Architecture

Tool Registry Pattern

For servers with many tools, use a registry pattern to keep code organized:

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

const server = new McpServer({
  name: 'multi-tool-server',
  version: '1.0.0',
});

// Database tools module
function registerDatabaseTools(server: McpServer) {
  server.tool(
    'query',
    'Execute a database query',
    { sql: z.string(), database: z.string().default('main') },
    async ({ sql, database }) => {
      // Execute query
      return { content: [{ type: 'text', text: 'Query results...' }] };
    }
  );

  server.tool(
    'list-tables',
    'List database tables',
    { database: z.string().default('main') },
    async ({ database }) => {
      return { content: [{ type: 'text', text: 'Tables: users, orders, products' }] };
    }
  );
}

// File tools module
function registerFileTools(server: McpServer) {
  server.tool(
    'read-file',
    'Read a file',
    { path: z.string() },
    async ({ path }) => {
      return { content: [{ type: 'text', text: 'File contents...' }] };
    }
  );
}

// Register all tool modules
registerDatabaseTools(server);
registerFileTools(server);

Error Handling Patterns

Structured Error Responses

typescript
server.tool(
  'risky-operation',
  'An operation that might fail',
  { input: z.string() },
  async ({ input }) => {
    try {
      const result = await performOperation(input);
      return {
        content: [{ type: 'text', text: JSON.stringify(result) }],
      };
    } catch (error) {
      return {
        content: [
          {
            type: 'text',
            text: JSON.stringify({
              error: error instanceof Error ? error.message : 'Unknown error',
              suggestion: 'Check input format and try again',
            }),
          },
        ],
        isError: true,
      };
    }
  }
);

Graceful Degradation

python
import json

import httpx
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("resilient-server")  # server name is illustrative

# Simple in-memory cache of the last good response per URL
cache: dict[str, str] = {}

@mcp.tool()
async def fetch_with_fallback(url: str, ctx: Context) -> str:
    """Fetch data with fallback to cache.

    Args:
        url: URL to fetch data from
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            # Cache the result
            cache[url] = response.text
            return response.text
    except httpx.TimeoutException:
        # Context logging helpers are coroutines and must be awaited
        await ctx.warning(f"Timeout fetching {url}, using cache")
        if url in cache:
            return cache[url]
        return json.dumps({"error": "Timeout and no cached data available"})
    except httpx.HTTPStatusError as e:
        await ctx.error(f"HTTP {e.response.status_code} for {url}")
        return json.dumps({"error": f"HTTP error: {e.response.status_code}"})

Testing Advanced Servers

Integration Testing

typescript
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';

async function testServer() {
  const transport = new StreamableHTTPClientTransport(
    new URL('http://localhost:3000/mcp')
  );
  const client = new Client({ name: 'test-client', version: '1.0.0' });
  await client.connect(transport);

  // List tools
  const tools = await client.listTools();
  console.log('Available tools:', tools);

  // Call a tool
  const result = await client.callTool({ name: 'echo', arguments: { message: 'test' } });
  console.log('Result:', result);

  await client.close();
}

Production Deployment Considerations

When deploying advanced MCP servers to production:

  1. Use Streamable HTTP for remote deployments, not SSE
  2. Implement authentication (see MCP authentication)
  3. Add rate limiting to prevent abuse
  4. Set up monitoring with structured logging
  5. Deploy with containers (see Docker deployment)
  6. Consider serverless for variable workloads (see Lambda deployment)
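
For item 4, structured logging usually means emitting one JSON object per log line so that a log aggregator can filter on fields. A minimal sketch using only the standard library follows; the logger name, the "fields" convention for extra data, and the field names in the example call are assumptions, not an MCP requirement.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields are passed as extra={"fields": {...}}.
        entry.update(getattr(record, "fields", {}))
        return json.dumps(entry)


def make_logger(name: str = "mcp-server", stream=sys.stderr) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


logger = make_logger()
logger.info(
    "tool call finished",
    extra={"fields": {"tool": "echo", "duration_ms": 12, "status": "ok"}},
)
```

The sketch writes to stderr by default. That matters for servers that also run over stdio, where stdout is reserved for protocol messages.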

For comprehensive security guidance, read our security fundamentals tutorial and our MCP server security guide.

Conclusion

Advanced MCP patterns enable you to build production-ready servers that handle streaming, authentication, and error recovery, and that scale with demand. The key is choosing the right transport for your deployment model, implementing proper security, and structuring your code for maintainability.

Code Examples

Streamable HTTP Server (TypeScript)

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';
import { z } from 'zod';

const app = express();
app.use(express.json());

const server = new McpServer({ name: 'remote-server', version: '1.0.0' });

server.tool('echo', 'Echo a message', { message: z.string() }, async ({ message }) => ({
  content: [{ type: 'text', text: `Echo: ${message}` }],
}));

const transport = new StreamableHTTPServerTransport({
  sessionIdGenerator: () => crypto.randomUUID(),
});

// express.json() has already consumed the request stream, so pass the parsed body
app.post('/mcp', async (req, res) => transport.handleRequest(req, res, req.body));
app.get('/mcp', async (req, res) => transport.handleRequest(req, res));
app.delete('/mcp', async (req, res) => transport.handleRequest(req, res));

await server.connect(transport);
app.listen(3000);

Streaming Progress (Python)

python
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("streaming-server")

@mcp.tool()
async def long_task(steps: int, ctx: Context) -> str:
    """Run a long task with progress updates.

    Args:
        steps: Number of steps to process
    """
    for i in range(steps):
        await ctx.report_progress(i + 1, steps)
        # Context logging helpers are coroutines and must be awaited
        await ctx.info(f"Step {i + 1}/{steps} complete")
    return f"Completed {steps} steps"

if __name__ == "__main__":
    mcp.run()
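
Authentication Middleware

typescript
import express from 'express';
import jwt from 'jsonwebtoken';

// Fail fast instead of falling back to a guessable default secret
const JWT_SECRET: string = process.env.JWT_SECRET ?? '';
if (!JWT_SECRET) {
  throw new Error('JWT_SECRET environment variable is required');
}

const app = express();

function authMiddleware(req: express.Request, res: express.Response, next: express.NextFunction) {
  const authHeader = req.headers.authorization;
  // Require the Bearer scheme rather than accepting any header value
  if (!authHeader?.startsWith('Bearer ')) {
    return res.status(401).json({ error: 'Missing or malformed authorization header' });
  }
  try {
    // Assumes HS256-signed tokens; pin the algorithm your issuer actually uses
    const decoded = jwt.verify(authHeader.slice(7), JWT_SECRET, { algorithms: ['HS256'] });
    (req as any).user = decoded;
    next();
  } catch {
    return res.status(401).json({ error: 'Invalid token' });
  }
}

// Register before the /mcp route handlers so every request is checked
app.use('/mcp', authMiddleware);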

Key Takeaways

  • Use Streamable HTTP transport for remote deployments instead of deprecated SSE
  • Streaming responses enable progress tracking for long-running operations
  • Authentication is essential for any network-exposed MCP server
  • The tool registry pattern keeps multi-tool servers organized and maintainable
  • Structured error handling with isError flag helps clients handle failures gracefully

Troubleshooting

Streamable HTTP connection drops behind a reverse proxy

Make sure your reverse proxy (nginx, Caddy) supports HTTP streaming and does not time out idle connections too early. In nginx, set proxy_read_timeout to at least 300s for long-running operations and disable response buffering with proxy_buffering off.

OAuth tokens expire during long-running tool calls

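Implement token refresh logic in whichever component holds the access token, typically the client: use the refresh token to obtain a new access token shortly before the current one expires. Longer-lived tokens for MCP server connections reduce the problem, but they also widen the window in which a leaked token remains usable.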
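
The "refresh before expiry" decision can be sketched as below, assuming the access token is a JWT with a standard exp claim. The function names and the 60-second safety margin are illustrative choices. The payload is read without verifying the signature, which is acceptable for scheduling a refresh but never for authorizing a request.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> Optional[float]:
    """Read the exp claim from a JWT payload WITHOUT verifying the signature."""
    try:
        payload_b64 = token.split(".")[1]
        # Restore the base64 padding that JWTs strip.
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded))
    except (IndexError, ValueError):
        return None
    if not isinstance(claims, dict):
        return None
    exp = claims.get("exp")
    return float(exp) if isinstance(exp, (int, float)) else None


def needs_refresh(token: str, now: Optional[float] = None, skew_seconds: float = 60.0) -> bool:
    """Return True if the token expires within skew_seconds or cannot be read."""
    exp = jwt_expiry(token)
    if exp is None:
        return True
    current = time.time() if now is None else now
    return current >= exp - skew_seconds


def _b64(data: dict) -> str:
    """Encode a dict the way JWT segments are encoded (demo helper only)."""
    return base64.urlsafe_b64encode(json.dumps(data).encode("utf-8")).rstrip(b"=").decode("ascii")


# An unsigned demo token that expires at timestamp 1000.
demo_token = ".".join([_b64({"alg": "none"}), _b64({"exp": 1000}), ""])
print(needs_refresh(demo_token, now=900))  # False: more than 60 s remain
print(needs_refresh(demo_token, now=950))  # True: inside the safety margin
```

A client would call a check like this before starting a tool call that is expected to run for a long time, and refresh first if it returns True.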

Progress notifications not reaching the client

Verify that your client supports the notifications/progress method. Check that the transport connection is still active. In Streamable HTTP, ensure the GET endpoint is properly configured for server-to-client messages.

Next Steps

  • Implement authentication with the MCP authentication tutorial
  • Deploy your server with Docker or Kubernetes
  • Add performance optimization with caching and rate limiting
  • Explore security hardening in the security fundamentals tutorial
