MCP Server Performance and Scaling: A Production Guide
Guide to MCP server performance scaling with connection pooling, caching, horizontal scaling, load balancing, and async tool execution.
Optimizing MCP server performance for production means implementing connection pooling for external services, caching frequently accessed data, using async tool execution to handle concurrent requests, and scaling horizontally behind a load balancer when a single instance is not enough. This guide covers each strategy with practical implementation patterns.
A development MCP server that handles one user at a time has minimal performance requirements. A production server that supports a team of 50 developers, each running AI-assisted workflows that invoke tools dozens of times per session, needs careful optimization. The difference between a responsive server (sub-second tool responses) and a sluggish one (multi-second delays) determines whether your team actually uses the MCP integration or abandons it out of frustration.
This guide builds on our Deploying Remote MCP Servers pillar. Start there if you have not deployed a remote server yet. For the foundational server-building tutorial, see How to Build an MCP Server in Python.
Measuring Performance Before Optimizing
Before optimizing anything, establish baselines. You cannot improve what you do not measure.
Key Metrics to Track
| Metric | What It Measures | Target |
|---|---|---|
| Tool response time (p50) | Median time from tool call to response | Under 500ms |
| Tool response time (p99) | 99th percentile response time | Under 2 seconds |
| Concurrent connections | Number of simultaneous client connections | Depends on deployment |
| Error rate | Percentage of tool calls that fail | Under 1% |
| Memory usage | RSS of the server process | Stable (no leaks) |
| CPU utilization | Processor usage during tool execution | Under 70% average |
Adding Basic Timing to Tools
Add response time logging to every tool so you can identify bottlenecks:
```python
import time
import logging

from mcp.server.fastmcp import FastMCP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-server")

mcp = FastMCP("production-server")


@mcp.tool()
async def query_database(sql: str) -> str:
    """Execute a read-only SQL query.

    Args:
        sql: The SQL SELECT query to execute.
    """
    start = time.monotonic()
    try:
        result = await execute_query(sql)
        duration = time.monotonic() - start
        logger.info(
            "tool=query_database duration=%.3fs status=success rows=%d",
            duration,
            len(result),
        )
        return format_results(result)
    except Exception as e:
        duration = time.monotonic() - start
        logger.error(
            "tool=query_database duration=%.3fs status=error error=%s",
            duration,
            str(e),
        )
        return f"Query failed: {e}"
```
Use structured logging (key=value pairs) so logs are easy to parse with tools like Grafana Loki or CloudWatch Logs Insights.
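Repeating this timing boilerplate in every tool gets tedious. One option is a small reusable decorator; this is a minimal sketch (the `timed` name is my own, not part of the MCP SDK), applied beneath `@mcp.tool()` so each tool logs its own duration and status:

```python
import functools
import logging
import time

logger = logging.getLogger("mcp-server")


def timed(func):
    """Log duration and status for any async tool function."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            result = await func(*args, **kwargs)
            logger.info("tool=%s duration=%.3fs status=success",
                        func.__name__, time.monotonic() - start)
            return result
        except Exception:
            logger.error("tool=%s duration=%.3fs status=error",
                         func.__name__, time.monotonic() - start)
            raise
    return wrapper
```

Because `functools.wraps` preserves the function name and docstring, the decorator does not interfere with how the SDK reads tool metadata.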
Connection Pooling
The single biggest performance improvement for most MCP servers is connection pooling. Without it, every tool call opens a new connection to your database, API, or external service, then closes it when done. The overhead of establishing connections (TCP handshake, TLS negotiation, authentication) adds hundreds of milliseconds to every call.
Database Connection Pooling
Use a connection pool to maintain a set of reusable database connections:
```python
import asyncpg

# Global connection pool (initialized at startup)
db_pool = None


async def initialize_pool(database_url: str):
    """Create a connection pool at server startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        database_url,
        min_size=5,   # Minimum idle connections
        max_size=20,  # Maximum total connections
        max_inactive_connection_lifetime=300,  # Close idle connections after 5 min
        command_timeout=30,  # Timeout for individual queries
    )


async def execute_query(sql: str):
    """Execute a query using a pooled connection."""
    async with db_pool.acquire() as conn:
        return await conn.fetch(sql)


@mcp.tool()
async def run_query(sql: str) -> str:
    """Run a read-only SQL query against the database.

    Args:
        sql: The SQL SELECT query to execute.
    """
    if not sql.strip().upper().startswith("SELECT"):
        return "Error: Only SELECT queries are allowed."
    rows = await execute_query(sql)
    return format_as_table(rows)
```
HTTP Client Connection Pooling
For servers that call external APIs, reuse an HTTP client instead of creating one per request:
```python
import httpx

# Create a shared client with connection pooling
http_client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30,
    ),
)


@mcp.tool()
async def fetch_api_data(endpoint: str) -> str:
    """Fetch data from the internal API.

    Args:
        endpoint: The API endpoint path (e.g., /users/123).
    """
    response = await http_client.get(f"https://api.example.com{endpoint}")
    response.raise_for_status()
    return response.text
```
Connection Pool Sizing Guide
| Deployment Size | DB Pool Size | HTTP Pool Size |
|---|---|---|
| Single developer | 2-5 connections | 10 connections |
| Small team (5-10) | 5-15 connections | 20-50 connections |
| Medium team (10-50) | 15-30 connections | 50-100 connections |
| Large deployment (50+) | 30-50 per instance | 100+ per instance |
Set the minimum pool size to handle your steady-state load and the maximum to handle spikes. Monitor connection usage and adjust based on actual traffic patterns.
Caching Strategies
Caching eliminates redundant work by storing the results of expensive operations and returning them directly on subsequent requests.
In-Memory Caching with TTL
For data that changes infrequently, use a simple time-based cache:
```python
import time
from typing import Any


class TTLCache:
    """Simple in-memory cache with time-to-live expiration."""

    def __init__(self, default_ttl: int = 300):
        self._cache: dict = {}
        self._default_ttl = default_ttl

    def get(self, key: str) -> Any:
        """Get a value from cache. Returns None if expired or missing."""
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if time.monotonic() > expiry:
            del self._cache[key]
            return None
        return value

    def set(self, key: str, value: Any, ttl: int | None = None):
        """Store a value in cache with optional custom TTL."""
        expiry = time.monotonic() + (ttl or self._default_ttl)
        self._cache[key] = (value, expiry)

    def invalidate(self, key: str):
        """Remove a specific key from cache."""
        self._cache.pop(key, None)

    def clear(self):
        """Clear all cached entries."""
        self._cache.clear()


# Create caches for different data types
schema_cache = TTLCache(default_ttl=3600)  # Schema changes rarely: 1 hour
query_cache = TTLCache(default_ttl=60)     # Query results: 1 minute
config_cache = TTLCache(default_ttl=300)   # Config data: 5 minutes


@mcp.tool()
async def get_table_schema(table_name: str) -> str:
    """Get the schema of a database table.

    Args:
        table_name: The name of the table.
    """
    # Check cache first
    cached = schema_cache.get(f"schema:{table_name}")
    if cached is not None:
        return cached

    # Cache miss: query the database
    schema = await fetch_schema_from_db(table_name)
    schema_cache.set(f"schema:{table_name}", schema)
    return schema
```
When to Cache vs When Not to Cache
| Cache This | Do Not Cache This |
|---|---|
| Database schemas and metadata | User-specific sensitive data |
| API responses that change slowly | Write operation results |
| Configuration and reference data | Real-time metrics or live data |
| Static file contents | Unique per-request computations |
| Search results for common queries | Authentication tokens (use proper token stores) |
Redis for Shared Caching
When running multiple server instances, an in-memory cache per instance leads to inconsistencies. Use Redis as a shared cache:
```python
import redis.asyncio as redis
import json

redis_client = redis.from_url(
    "redis://localhost:6379",
    decode_responses=True,
)


async def cached_query(cache_key: str, ttl: int, query_fn):
    """Generic caching wrapper using Redis.

    Args:
        cache_key: The Redis key for this cached value.
        ttl: Time-to-live in seconds.
        query_fn: Async function to call on cache miss.
    """
    # Try cache first
    cached = await redis_client.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    # Cache miss: execute the query
    result = await query_fn()
    await redis_client.setex(cache_key, ttl, json.dumps(result))
    return result
```
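Redis keys should be deterministic and bounded in length, since raw SQL or long URLs make poor keys. A small sketch of a key builder that hashes arbitrary inputs into a fixed-length, namespaced key (the helper name is my own):

```python
import hashlib


def make_cache_key(namespace: str, *parts: str) -> str:
    """Build a deterministic, fixed-length Redis key from arbitrary inputs."""
    # Join parts with a separator unlikely to appear in queries, then hash
    digest = hashlib.sha256("\x1f".join(parts).encode()).hexdigest()[:16]
    return f"{namespace}:{digest}"
```

The same query text always maps to the same key, so repeated calls hit the cache, while the namespace prefix (`query:`, `schema:`, and so on) keeps unrelated data from colliding and makes bulk invalidation by prefix possible.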
Async Tool Execution
The MCP Python SDK supports async tools natively. Use async for any tool that performs I/O (network requests, database queries, file reads) to allow the server to handle other requests while waiting for I/O to complete.
Sync vs Async Performance Impact
| Scenario | Sync Behavior | Async Behavior |
|---|---|---|
| Tool A (200ms DB query) called while Tool B (500ms API call) is running | Tool A waits for Tool B to finish. Total: 700ms. | Both run concurrently. Total: 500ms. |
| 10 concurrent tool calls, each taking 100ms | Processed sequentially. Total: 1000ms. | Processed concurrently. Total: ~100ms. |
Writing Efficient Async Tools
```python
import asyncio


@mcp.tool()
async def analyze_repository(repo_url: str) -> str:
    """Analyze a GitHub repository's health metrics.

    Args:
        repo_url: The GitHub repository URL (e.g., owner/repo).
    """
    # Run multiple API calls concurrently instead of sequentially
    stars_task = fetch_star_count(repo_url)
    issues_task = fetch_open_issues(repo_url)
    commits_task = fetch_recent_commits(repo_url)
    license_task = fetch_license_info(repo_url)

    # Wait for all tasks to complete
    stars, issues, commits, license_info = await asyncio.gather(
        stars_task, issues_task, commits_task, license_task
    )
    return format_analysis(stars, issues, commits, license_info)
```
By using asyncio.gather, all four API calls run in parallel. If each takes 200ms, the total is still ~200ms instead of 800ms.
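You can verify that claim with a self-contained timing sketch, using `asyncio.sleep` as a stand-in for network latency:

```python
import asyncio
import time


async def fake_api_call(delay: float) -> float:
    await asyncio.sleep(delay)  # Stand-in for an HTTP request
    return delay


async def main() -> float:
    start = time.monotonic()
    # Four "200ms calls" issued concurrently via gather
    await asyncio.gather(*[fake_api_call(0.2) for _ in range(4)])
    return time.monotonic() - start


elapsed = asyncio.run(main())
# Completes in roughly 0.2s, not 0.8s, because the sleeps overlap
```

Swap `asyncio.sleep` for real `httpx` calls and the shape of the result is the same, as long as each call is genuinely async and not blocking the event loop.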
Horizontal Scaling
When a single server instance cannot handle the load, scale horizontally by running multiple instances behind a load balancer.
MCP Server Scaling Architecture
A horizontally scaled MCP deployment looks like this:
```
  Clients (Claude Desktop, Cursor, etc.)
                  |
   Load Balancer (nginx / ALB / Cloud LB)
                  |
         -------------------
         |        |        |
      Server   Server   Server
      Inst 1   Inst 2   Inst 3
         |        |        |
         -------------------
                  |
    Shared State (Redis / Database)
```
Load Balancer Configuration
For MCP servers using Streamable HTTP transport, standard HTTP load balancing works well:
```nginx
upstream mcp_servers {
    least_conn;
    server 10.0.1.10:8000;
    server 10.0.1.11:8000;
    server 10.0.1.12:8000;
}

server {
    listen 443 ssl;
    server_name mcp.example.com;

    ssl_certificate     /etc/ssl/certs/mcp.crt;
    ssl_certificate_key /etc/ssl/private/mcp.key;

    location / {
        proxy_pass http://mcp_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # SSE support (long-lived connections)
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400s;
    }

    # Health check endpoint
    location /health {
        proxy_pass http://mcp_servers;
    }
}
```
SSE Transport Considerations
If using SSE transport (which requires persistent connections), configure sticky sessions so that a client's SSE connection and its POST requests go to the same server instance:
```nginx
upstream mcp_servers_sse {
    ip_hash;  # Sticky sessions based on client IP
    server 10.0.1.10:8000;
    server 10.0.1.11:8000;
    server 10.0.1.12:8000;
}
```
Scaling Decision Guide
| Signal | Action |
|---|---|
| p99 response time consistently above 2 seconds | Profile and optimize the slowest tools first |
| CPU consistently above 70% | Add more server instances |
| Memory growing over time | Fix memory leaks (unclosed connections, growing caches) |
| Connection pool exhausted | Increase pool size or add instances |
| Single tool causing all latency | Optimize or cache that specific tool |
Resource Limits and Protection
Protect your server from runaway queries, oversized responses, and resource exhaustion:
```python
import asyncio

MAX_RESPONSE_SIZE = 50_000  # Characters
TOOL_TIMEOUT = 30  # Seconds


@mcp.tool()
async def safe_query(sql: str) -> str:
    """Execute a database query with safety limits.

    Args:
        sql: The SQL SELECT query to execute.
    """
    try:
        result = await asyncio.wait_for(
            execute_query(sql),
            timeout=TOOL_TIMEOUT,
        )
    except asyncio.TimeoutError:
        return (
            f"Error: Query timed out after {TOOL_TIMEOUT} seconds. "
            "Try a more specific query."
        )

    response = format_results(result)

    # Truncate oversized responses
    if len(response) > MAX_RESPONSE_SIZE:
        truncated = response[:MAX_RESPONSE_SIZE]
        return truncated + "\n\n(Response truncated. Narrow your query for complete results.)"
    return response
```
Resource Limit Recommendations
| Resource | Recommended Limit | Why |
|---|---|---|
| Tool timeout | 30 seconds | Prevents hung connections from consuming resources |
| Response size | 50,000 characters | Keeps responses within MCP client context windows |
| Query row limit | 1,000 rows | Prevents massive result sets from overwhelming memory |
| File read size | 1 MB | Avoids loading huge files into memory |
| Concurrent requests per client | 10 | Prevents a single client from monopolizing resources |
Monitoring Response Times in Production
Set up structured logging that feeds into your monitoring stack:
```python
import time
import logging
import json

logger = logging.getLogger("mcp-metrics")


class MetricsMiddleware:
    """Track tool execution metrics."""

    def __init__(self):
        self.call_count = 0
        self.error_count = 0
        self.total_duration = 0.0

    def record(self, tool_name: str, duration: float, success: bool):
        """Record a tool execution metric."""
        self.call_count += 1
        self.total_duration += duration
        if not success:
            self.error_count += 1
        logger.info(json.dumps({
            "event": "tool_execution",
            "tool": tool_name,
            "duration_ms": round(duration * 1000, 2),
            "success": success,
            "total_calls": self.call_count,
            "error_rate": round(
                self.error_count / self.call_count * 100, 2
            ),
        }))


metrics = MetricsMiddleware()
```
Connect these metrics to dashboards in Grafana, Datadog, or CloudWatch to get real-time visibility into your server's performance. Set alerts for p99 response times exceeding your targets and error rates above 1%.
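If your monitoring stack does not compute percentiles for you, you can derive p50 and p99 from recorded durations with the standard library. A minimal sketch (the function name is my own):

```python
import statistics


def latency_percentiles(durations: list[float]) -> dict[str, float]:
    """Return p50 and p99 latency from a list of durations in seconds."""
    if len(durations) < 2:
        # statistics.quantiles needs at least two samples
        value = durations[0] if durations else 0.0
        return {"p50": value, "p99": value}
    cuts = statistics.quantiles(durations, n=100)  # 99 cut points
    return {
        "p50": statistics.median(durations),
        "p99": cuts[98],  # 99th-percentile cut point
    }
```

Computing percentiles over a sliding window (say, the last five minutes of calls) rather than the whole process lifetime gives alerts that react to current conditions instead of being diluted by history.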
For comprehensive monitoring and observability guidance, see our MCP Monitoring and Observability pillar guide.
Performance Optimization Checklist
Work through this checklist in order -- each item builds on the previous:
| Priority | Optimization | Expected Impact |
|---|---|---|
| 1 | Add response time logging to all tools | Visibility (no speed improvement, but essential) |
| 2 | Implement connection pooling for databases and HTTP clients | 2-10x improvement for I/O-heavy tools |
| 3 | Convert sync tools to async | 2-5x improvement for concurrent usage |
| 4 | Add caching for frequently accessed, slowly changing data | 10-100x improvement for cached responses |
| 5 | Set resource limits (timeouts, response size caps) | Prevents cascading failures |
| 6 | Scale horizontally behind a load balancer | Linear capacity increase |
| 7 | Use Redis for shared caching across instances | Consistent cache hits across all instances |
What to Read Next
- Deploying Remote MCP Servers -- the complete production deployment guide
- MCP Monitoring and Observability -- deep dive into metrics, logging, and alerting
- MCP Server Python Quickstart -- build your first server before optimizing
- Adding Authentication to Python MCP Servers -- secure your production server
- Local vs Remote MCP Servers -- understand when remote deployment and scaling is needed