MCP Authentication Implementation
Introduction
Authentication is essential for any MCP server exposed over the network. Without it, anyone who discovers your server endpoint can invoke tools, access resources, and potentially compromise your systems. This tutorial covers implementing authentication for MCP servers using multiple strategies, from simple API keys to full OAuth 2.0 flows.
For foundational security practices, read our security fundamentals tutorial first.
Authentication Strategies Overview
| Strategy | Complexity | Best For |
|---|
| API Keys | Low | Internal tools, development |
| JWT Tokens | Medium | Service-to-service, custom auth |
| OAuth 2.0 | High | Third-party access, user delegation |
| mTLS | High | High-security environments |
API Key Authentication
The simplest approach for internal or development MCP servers.
TypeScript Implementation
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import express from 'express';
import crypto from 'crypto';
import { z } from 'zod';
const app = express();
app.use(express.json());
// Store API keys securely (in production, use a database or secrets manager)
const API_KEYS = new Set(process.env.MCP_API_KEYS?.split(',') || []);
function apiKeyAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
const apiKey = req.headers['x-api-key'] as string;
if (!apiKey) {
return res.status(401).json({ error: 'Missing X-API-Key header' });
}
// Use timing-safe comparison to prevent timing attacks
const isValid = Array.from(API_KEYS).some(
(key) => crypto.timingSafeEqual(Buffer.from(apiKey), Buffer.from(key))
);
if (!isValid) {
return res.status(403).json({ error: 'Invalid API key' });
}
next();
}
const server = new McpServer({ name: 'authenticated-server', version: '1.0.0' });
server.tool('hello', 'Greet someone', { name: z.string() }, async ({ name }) => ({
content: [{ type: 'text', text: `Hello, ${name}!` }],
}));
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => crypto.randomUUID(),
});
app.use('/mcp', apiKeyAuth);
app.post('/mcp', (req, res) => transport.handleRequest(req, res));
app.get('/mcp', (req, res) => transport.handleRequest(req, res));
app.delete('/mcp', (req, res) => transport.handleRequest(req, res));
await server.connect(transport);
app.listen(3000);
Python Implementation
import os
import hmac
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
API_KEYS = set(os.environ.get("MCP_API_KEYS", "").split(","))
class APIKeyMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
api_key = request.headers.get("x-api-key", "")
if not any(hmac.compare_digest(api_key, key) for key in API_KEYS):
return JSONResponse({"error": "Invalid API key"}, status_code=403)
return await call_next(request)
mcp = FastMCP("secure-server")
@mcp.tool()
def hello(name: str) -> str:
"""Greet someone.
Args:
name: Name to greet
"""
return f"Hello, {name}!"
JWT Token Authentication
JWT (JSON Web Tokens) provide stateless authentication with embedded claims.
Generating and Verifying Tokens
import jwt from 'jsonwebtoken';
import express from 'express';
const JWT_SECRET = process.env.JWT_SECRET!;
const JWT_ISSUER = 'mcp-auth-server';
// Token generation endpoint
app.post('/auth/token', (req, res) => {
const { clientId, clientSecret } = req.body;
// Validate client credentials against your database
const client = validateClient(clientId, clientSecret);
if (!client) {
return res.status(401).json({ error: 'Invalid credentials' });
}
const token = jwt.sign(
{
sub: clientId,
role: client.role,
permissions: client.permissions,
},
JWT_SECRET,
{
issuer: JWT_ISSUER,
expiresIn: '1h',
algorithm: 'HS256',
}
);
res.json({
access_token: token,
token_type: 'Bearer',
expires_in: 3600,
});
});
// JWT verification middleware
function jwtAuth(req: express.Request, res: express.Response, next: express.NextFunction) {
const authHeader = req.headers.authorization;
if (!authHeader || !authHeader.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Missing Bearer token' });
}
const token = authHeader.slice(7);
try {
const decoded = jwt.verify(token, JWT_SECRET, {
issuer: JWT_ISSUER,
algorithms: ['HS256'],
});
(req as any).auth = decoded;
next();
} catch (error) {
if (error instanceof jwt.TokenExpiredError) {
return res.status(401).json({ error: 'Token expired' });
}
return res.status(401).json({ error: 'Invalid token' });
}
}
app.use('/mcp', jwtAuth);
Python JWT Verification
import jwt
from datetime import datetime, timedelta
JWT_SECRET = os.environ["JWT_SECRET"]
def create_token(client_id: str, role: str, permissions: list[str]) -> str:
payload = {
"sub": client_id,
"role": role,
"permissions": permissions,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=1),
"iss": "mcp-auth-server",
}
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def verify_token(token: str) -> dict:
try:
return jwt.decode(
token,
JWT_SECRET,
algorithms=["HS256"],
issuer="mcp-auth-server",
)
except jwt.ExpiredSignatureError:
raise ValueError("Token expired")
except jwt.InvalidTokenError:
raise ValueError("Invalid token")
OAuth 2.0 Authentication
OAuth 2.0 is the recommended approach for production MCP servers that support third-party access.
Client Credentials Flow
For service-to-service authentication:
import express from 'express';
import crypto from 'crypto';
const app = express();
app.use(express.json());
// OAuth client registry (use a database in production)
const clients = new Map([
['client-123', {
secret: 'hashed-secret-here',
scopes: ['tools:read', 'tools:execute'],
name: 'AI Assistant Client',
}],
]);
// Token endpoint
app.post('/oauth/token', async (req, res) => {
const { grant_type, client_id, client_secret, scope } = req.body;
if (grant_type !== 'client_credentials') {
return res.status(400).json({ error: 'unsupported_grant_type' });
}
const client = clients.get(client_id);
if (!client || !verifySecret(client_secret, client.secret)) {
return res.status(401).json({ error: 'invalid_client' });
}
// Validate requested scopes
const requestedScopes = (scope || '').split(' ');
const grantedScopes = requestedScopes.filter((s: string) => client.scopes.includes(s));
const accessToken = generateAccessToken({
sub: client_id,
scopes: grantedScopes,
});
res.json({
access_token: accessToken,
token_type: 'Bearer',
expires_in: 3600,
scope: grantedScopes.join(' '),
});
});
// Protected MCP endpoint
app.use('/mcp', (req, res, next) => {
const token = req.headers.authorization?.replace('Bearer ', '');
if (!token) {
return res.status(401).json({ error: 'Missing token' });
}
const payload = verifyAccessToken(token);
if (!payload) {
return res.status(401).json({ error: 'Invalid token' });
}
(req as any).auth = payload;
next();
});
Authorization Code Flow
For user-delegated access (browser-based):
// Authorization endpoint - renders consent screen
app.get('/oauth/authorize', (req, res) => {
const { client_id, redirect_uri, response_type, scope, state } = req.query;
if (response_type !== 'code') {
return res.status(400).json({ error: 'unsupported_response_type' });
}
// Render authorization consent page
res.render('authorize', { client_id, scope, state, redirect_uri });
});
// Handle user consent
app.post('/oauth/authorize', (req, res) => {
const { client_id, redirect_uri, scope, state, approved } = req.body;
if (!approved) {
return res.redirect(`${redirect_uri}?error=access_denied&state=${state}`);
}
const code = generateAuthorizationCode(client_id, scope);
res.redirect(`${redirect_uri}?code=${code}&state=${state}`);
});
// Token exchange endpoint
app.post('/oauth/token', (req, res) => {
const { grant_type, code, redirect_uri, client_id, client_secret } = req.body;
if (grant_type === 'authorization_code') {
const authData = validateAuthorizationCode(code, client_id, redirect_uri);
if (!authData) {
return res.status(400).json({ error: 'invalid_grant' });
}
const accessToken = generateAccessToken({ sub: authData.userId, scopes: authData.scopes });
const refreshToken = generateRefreshToken(authData.userId);
res.json({
access_token: accessToken,
refresh_token: refreshToken,
token_type: 'Bearer',
expires_in: 3600,
});
}
});
Role-Based Access Control (RBAC)
Restrict tool access based on user roles:
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';
type Role = 'viewer' | 'editor' | 'admin';
interface ToolPermission {
roles: Role[];
rateLimit: number;
}
const toolPermissions: Record<string, ToolPermission> = {
'read-data': { roles: ['viewer', 'editor', 'admin'], rateLimit: 100 },
'write-data': { roles: ['editor', 'admin'], rateLimit: 50 },
'delete-data': { roles: ['admin'], rateLimit: 10 },
'manage-users': { roles: ['admin'], rateLimit: 20 },
};
function checkPermission(toolName: string, userRole: Role): boolean {
const permission = toolPermissions[toolName];
if (!permission) return false;
return permission.roles.includes(userRole);
}
// Middleware that checks permissions before tool execution
app.use('/mcp', (req, res, next) => {
const auth = (req as any).auth;
if (!auth) {
return res.status(401).json({ error: 'Not authenticated' });
}
// Store auth context for tool handlers
(req as any).userRole = auth.role;
next();
});
Python RBAC
from enum import Enum
from functools import wraps
class Role(Enum):
VIEWER = "viewer"
EDITOR = "editor"
ADMIN = "admin"
ROLE_HIERARCHY = {
Role.ADMIN: {Role.VIEWER, Role.EDITOR, Role.ADMIN},
Role.EDITOR: {Role.VIEWER, Role.EDITOR},
Role.VIEWER: {Role.VIEWER},
}
def requires_role(minimum_role: Role):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_role = get_current_user_role()
if minimum_role not in ROLE_HIERARCHY.get(current_role, set()):
return json.dumps({"error": "Insufficient permissions"})
return func(*args, **kwargs)
return wrapper
return decorator
@mcp.tool()
@requires_role(Role.VIEWER)
def read_data(key: str) -> str:
"""Read data (requires viewer role).
Args:
key: Data key to read
"""
return json.dumps({"key": key, "value": data_store.get(key)})
@mcp.tool()
@requires_role(Role.ADMIN)
def delete_data(key: str) -> str:
"""Delete data (requires admin role).
Args:
key: Data key to delete
"""
data_store.pop(key, None)
return json.dumps({"status": "deleted"})
Token Refresh and Session Management
Refresh Token Flow
app.post('/oauth/token', async (req, res) => {
if (req.body.grant_type === 'refresh_token') {
const { refresh_token } = req.body;
const tokenData = validateRefreshToken(refresh_token);
if (!tokenData) {
return res.status(400).json({ error: 'invalid_grant' });
}
// Revoke old refresh token (rotation)
revokeRefreshToken(refresh_token);
// Issue new tokens
const newAccessToken = generateAccessToken({ sub: tokenData.userId });
const newRefreshToken = generateRefreshToken(tokenData.userId);
res.json({
access_token: newAccessToken,
refresh_token: newRefreshToken,
token_type: 'Bearer',
expires_in: 3600,
});
}
});
MCP Client Authentication Configuration
Claude Desktop with Authenticated Server
{
"mcpServers": {
"authenticated-server": {
"url": "https://mcp.example.com/mcp",
"headers": {
"Authorization": "Bearer your-token-here"
}
}
}
}
VS Code Configuration
{
"servers": {
"secure-server": {
"url": "https://mcp.example.com/mcp",
"headers": {
"X-API-Key": "your-api-key"
}
}
}
}
For more on client configuration, see our Claude integration and VS Code tutorials.
Security Best Practices
- Never hardcode secrets in source code. Use environment variables or secrets managers.
- Use HTTPS for all authenticated MCP endpoints.
- Rotate secrets regularly and implement token expiration.
- Log authentication events for monitoring and incident response.
- Implement account lockout after repeated failed attempts.
- Use timing-safe comparisons for secret validation.
For comprehensive security guidance, review our MCP server security guide.
Conclusion
Authentication protects your MCP servers from unauthorized access. Start with API keys for development and internal tools, then upgrade to JWT or OAuth 2.0 for production deployments. Combine authentication with RBAC and audit logging for defense in depth. Remember that authentication is just one layer of security, and should always be paired with the practices covered in our security fundamentals tutorial.