MCP Server Performance and Scaling: A Production Guide

A guide to scaling MCP server performance with connection pooling, caching, async tool execution, and horizontal scaling behind a load balancer.

Updated February 26, 2026
By MCPServerSpot Team

Optimizing MCP server performance for production means implementing connection pooling for external services, caching frequently accessed data, using async tool execution to handle concurrent requests, and scaling horizontally behind a load balancer when a single instance is not enough. This guide covers each strategy with practical implementation patterns.

A development MCP server that handles one user at a time has minimal performance requirements. A production server that supports a team of 50 developers, each running AI-assisted workflows that invoke tools dozens of times per session, needs careful optimization. The difference between a responsive server (sub-second tool responses) and a sluggish one (multi-second delays) determines whether your team actually uses the MCP integration or abandons it out of frustration.

This guide builds on our Deploying Remote MCP Servers pillar. Start there if you have not deployed a remote server yet. For the foundational server-building tutorial, see How to Build an MCP Server in Python.

Measuring Performance Before Optimizing

Before optimizing anything, establish baselines. You cannot improve what you do not measure.

Key Metrics to Track

| Metric | What It Measures | Target |
|---|---|---|
| Tool response time (p50) | Median time from tool call to response | Under 500ms |
| Tool response time (p99) | 99th percentile response time | Under 2 seconds |
| Concurrent connections | Number of simultaneous client connections | Depends on deployment |
| Error rate | Percentage of tool calls that fail | Under 1% |
| Memory usage | RSS of the server process | Stable (no leaks) |
| CPU utilization | Processor usage during tool execution | Under 70% average |

Adding Basic Timing to Tools

Add response time logging to every tool so you can identify bottlenecks:

import time
import logging
from mcp.server.fastmcp import FastMCP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-server")

mcp = FastMCP("production-server")


@mcp.tool()
async def query_database(sql: str) -> str:
    """Execute a read-only SQL query.

    Args:
        sql: The SQL SELECT query to execute.
    """
    start = time.monotonic()
    try:
        result = await execute_query(sql)
        duration = time.monotonic() - start
        logger.info(
            "tool=query_database duration=%.3fs status=success rows=%d",
            duration,
            len(result),
        )
        return format_results(result)
    except Exception as e:
        duration = time.monotonic() - start
        logger.error(
            "tool=query_database duration=%.3fs status=error error=%s",
            duration,
            str(e),
        )
        return f"Query failed: {str(e)}"

Use structured logging (key=value pairs) so logs are easy to parse with tools like Grafana Loki or CloudWatch Logs Insights.

Connection Pooling

The single biggest performance improvement for most MCP servers is connection pooling. Without it, every tool call opens a new connection to your database, API, or external service, then closes it when done. The overhead of establishing connections (TCP handshake, TLS negotiation, authentication) adds hundreds of milliseconds to every call.

Database Connection Pooling

Use a connection pool to maintain a set of reusable database connections:

import asyncpg
from contextlib import asynccontextmanager

# Global connection pool (initialized at startup)
db_pool = None


async def initialize_pool(database_url: str):
    """Create a connection pool at server startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        database_url,
        min_size=5,      # Minimum idle connections
        max_size=20,     # Maximum total connections
        max_inactive_connection_lifetime=300,  # Close idle connections after 5 min
        command_timeout=30,  # Timeout for individual queries
    )


async def execute_query(sql: str):
    """Execute a query using a pooled connection."""
    async with db_pool.acquire() as conn:
        return await conn.fetch(sql)


@mcp.tool()
async def run_query(sql: str) -> str:
    """Run a read-only SQL query against the database.

    Args:
        sql: The SQL SELECT query to execute.
    """
    if not sql.strip().upper().startswith("SELECT"):
        return "Error: Only SELECT queries are allowed."
    rows = await execute_query(sql)
    return format_as_table(rows)
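The pool must exist before the first tool call. One way to guarantee that is a startup hook; here is a minimal sketch using FastMCP's lifespan parameter (available in recent versions of the Python SDK), assuming the connection string comes from a DATABASE_URL environment variable:

import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[None]:
    """Create the pool at startup and release it at shutdown."""
    await initialize_pool(os.environ["DATABASE_URL"])
    try:
        yield
    finally:
        await db_pool.close()


mcp = FastMCP("production-server", lifespan=app_lifespan)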

HTTP Client Connection Pooling

For servers that call external APIs, reuse an HTTP client instead of creating one per request:

import httpx

# Create a shared client with connection pooling
http_client = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(
        max_connections=100,
        max_keepalive_connections=20,
        keepalive_expiry=30,
    ),
)


@mcp.tool()
async def fetch_api_data(endpoint: str) -> str:
    """Fetch data from the internal API.

    Args:
        endpoint: The API endpoint path (e.g., /users/123).
    """
    response = await http_client.get(
        f"https://api.example.com{endpoint}"
    )
    response.raise_for_status()
    return response.text
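On shutdown, close the shared client with await http_client.aclose() (for example, in the same lifespan hook sketched above) so keep-alive connections are released cleanly.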

Connection Pool Sizing Guide

| Deployment Size | DB Pool Size | HTTP Pool Size |
|---|---|---|
| Single developer | 2-5 connections | 10 connections |
| Small team (5-10) | 5-15 connections | 20-50 connections |
| Medium team (10-50) | 15-30 connections | 50-100 connections |
| Large deployment (50+) | 30-50 per instance | 100+ per instance |

Set the minimum pool size to handle your steady-state load and the maximum to handle spikes. Monitor connection usage and adjust based on actual traffic patterns.
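To ground those adjustments in data, log pool utilization on an interval. A minimal sketch using asyncpg's pool introspection methods (get_size and get_idle_size), reusing the db_pool and logger defined earlier:

import asyncio


async def log_pool_stats(interval: float = 60.0) -> None:
    """Periodically log pool utilization for capacity tuning."""
    while True:
        if db_pool is not None:
            total = db_pool.get_size()
            idle = db_pool.get_idle_size()
            logger.info(
                "pool_total=%d pool_idle=%d pool_in_use=%d",
                total, idle, total - idle,
            )
        await asyncio.sleep(interval)

Start it as a background task during startup, for example with asyncio.create_task(log_pool_stats()).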

Caching Strategies

Caching eliminates redundant work by storing the results of expensive operations and returning them directly on subsequent requests.

In-Memory Caching with TTL

For data that changes infrequently, use a simple time-based cache:

import time
from typing import Any

class TTLCache:
    """Simple in-memory cache with time-to-live expiration."""

    def __init__(self, default_ttl: int = 300):
        self._cache: dict = {}
        self._default_ttl = default_ttl

    def get(self, key: str) -> Any:
        """Get a value from cache. Returns None if expired or missing."""
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if time.monotonic() > expiry:
            del self._cache[key]
            return None
        return value

    def set(self, key: str, value: Any, ttl: int | None = None):
        """Store a value in cache with optional custom TTL."""
        expiry = time.monotonic() + (ttl or self._default_ttl)
        self._cache[key] = (value, expiry)

    def invalidate(self, key: str):
        """Remove a specific key from cache."""
        self._cache.pop(key, None)

    def clear(self):
        """Clear all cached entries."""
        self._cache.clear()


# Create caches for different data types
schema_cache = TTLCache(default_ttl=3600)   # Schema changes rarely: 1 hour
query_cache = TTLCache(default_ttl=60)       # Query results: 1 minute
config_cache = TTLCache(default_ttl=300)     # Config data: 5 minutes


@mcp.tool()
async def get_table_schema(table_name: str) -> str:
    """Get the schema of a database table.

    Args:
        table_name: The name of the table.
    """
    # Check cache first
    cached = schema_cache.get(f"schema:{table_name}")
    if cached is not None:
        return cached

    # Cache miss: query the database
    schema = await fetch_schema_from_db(table_name)
    schema_cache.set(f"schema:{table_name}", schema)
    return schema

When to Cache vs When Not to Cache

| Cache This | Do Not Cache This |
|---|---|
| Database schemas and metadata | User-specific sensitive data |
| API responses that change slowly | Write operation results |
| Configuration and reference data | Real-time metrics or live data |
| Static file contents | Unique per-request computations |
| Search results for common queries | Authentication tokens (use proper token stores) |

Redis for Shared Caching

When running multiple server instances, each instance keeps its own in-memory cache, so hit rates drop and instances can serve stale, inconsistent data. Use Redis as a shared cache:

import redis.asyncio as redis
import json

redis_client = redis.from_url(
    "redis://localhost:6379",
    decode_responses=True,
)


async def cached_query(cache_key: str, ttl: int, query_fn):
    """Generic caching wrapper using Redis.

    Args:
        cache_key: The Redis key for this cached value.
        ttl: Time-to-live in seconds.
        query_fn: Async function to call on cache miss.
    """
    # Try cache first
    cached = await redis_client.get(cache_key)
    if cached is not None:
        return json.loads(cached)

    # Cache miss: execute the query
    result = await query_fn()
    await redis_client.setex(cache_key, ttl, json.dumps(result))
    return result
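A tool then wraps its expensive call in a single line. In this sketch, list_recent_orders and fetch_orders_from_db are hypothetical examples, not SDK functions:

@mcp.tool()
async def list_recent_orders() -> str:
    """List orders placed in the last 24 hours."""
    orders = await cached_query(
        "orders:recent",  # shared key: every instance sees the same entry
        ttl=60,           # short TTL because order data changes quickly
        query_fn=fetch_orders_from_db,
    )
    return json.dumps(orders, indent=2)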

Async Tool Execution

The MCP Python SDK supports async tools natively. Use async for any tool that performs I/O (network requests, database queries, file reads) to allow the server to handle other requests while waiting for I/O to complete.

Sync vs Async Performance Impact

| Scenario | Sync Behavior | Async Behavior |
|---|---|---|
| Tool A (200ms DB query) called while Tool B (500ms API call) is running | Tool A waits for Tool B to finish. Total: 700ms. | Both run concurrently. Total: 500ms. |
| 10 concurrent tool calls, each taking 100ms | Processed sequentially. Total: 1000ms. | Processed concurrently. Total: ~100ms. |

Writing Efficient Async Tools

import asyncio


@mcp.tool()
async def analyze_repository(repo_url: str) -> str:
    """Analyze a GitHub repository's health metrics.

    Args:
        repo_url: The GitHub repository URL (e.g., owner/repo).
    """
    # Run multiple API calls concurrently instead of sequentially
    stars_task = fetch_star_count(repo_url)
    issues_task = fetch_open_issues(repo_url)
    commits_task = fetch_recent_commits(repo_url)
    license_task = fetch_license_info(repo_url)

    # Wait for all tasks to complete
    stars, issues, commits, license_info = await asyncio.gather(
        stars_task, issues_task, commits_task, license_task
    )

    return format_analysis(stars, issues, commits, license_info)

Because asyncio.gather awaits all four API calls concurrently, the total time is roughly that of the slowest call: if each takes 200ms, the tool finishes in ~200ms instead of 800ms.
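Note that a plain gather propagates the first exception and discards the other results. If partial results are acceptable, a hedged variant inside analyze_repository collects failures instead:

results = await asyncio.gather(
    stars_task, issues_task, commits_task, license_task,
    return_exceptions=True,
)
# Substitute None for any sub-call that raised, rather than failing the whole tool
stars, issues, commits, license_info = (
    None if isinstance(r, BaseException) else r for r in results
)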

Horizontal Scaling

When a single server instance cannot handle the load, scale horizontally by running multiple instances behind a load balancer.

MCP Server Scaling Architecture

A horizontally scaled MCP deployment looks like this:

Clients (Claude Desktop, Cursor, etc.)
           |
    Load Balancer (nginx / ALB / Cloud LB)
           |
    -----------------
    |       |       |
 Server  Server  Server
 Inst 1  Inst 2  Inst 3
    |       |       |
    -----------------
           |
    Shared State (Redis / Database)

Load Balancer Configuration

For MCP servers using Streamable HTTP transport, standard HTTP load balancing works well:

upstream mcp_servers {
    least_conn;
    server 10.0.1.10:8000;
    server 10.0.1.11:8000;
    server 10.0.1.12:8000;
}

server {
    listen 443 ssl;
    server_name mcp.example.com;

    ssl_certificate /etc/ssl/certs/mcp.crt;
    ssl_certificate_key /etc/ssl/private/mcp.key;

    location / {
        proxy_pass http://mcp_servers;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # SSE support (long-lived connections)
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400s;
    }

    # Health check endpoint
    location /health {
        proxy_pass http://mcp_servers;
    }
}
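The /health location assumes the server exposes a matching endpoint. A minimal sketch, assuming your SDK version supports FastMCP's custom_route decorator:

from starlette.requests import Request
from starlette.responses import PlainTextResponse


@mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> PlainTextResponse:
    # Liveness only; extend with pool or Redis checks for a readiness probe
    return PlainTextResponse("OK")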

SSE Transport Considerations

If using SSE transport (which requires persistent connections), configure sticky sessions so that a client's SSE connection and its POST requests go to the same server instance:

upstream mcp_servers_sse {
    ip_hash;  # Sticky sessions based on client IP
    server 10.0.1.10:8000;
    server 10.0.1.11:8000;
    server 10.0.1.12:8000;
}
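Keep in mind that ip_hash keys on the client's source address, so many users behind one corporate NAT will all land on the same instance. If that skews load, cookie-based stickiness (NGINX Plus's sticky directive, or session affinity on a cloud load balancer) distributes clients more evenly.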

Scaling Decision Guide

| Signal | Action |
|---|---|
| p99 response time consistently above 2 seconds | Profile and optimize the slowest tools first |
| CPU consistently above 70% | Add more server instances |
| Memory growing over time | Fix memory leaks (unclosed connections, growing caches) |
| Connection pool exhausted | Increase pool size or add instances |
| Single tool causing all latency | Optimize or cache that specific tool |

Resource Limits and Protection

Protect your server from runaway queries, oversized responses, and resource exhaustion:

import asyncio

MAX_RESPONSE_SIZE = 50_000  # Characters
TOOL_TIMEOUT = 30  # Seconds


@mcp.tool()
async def safe_query(sql: str) -> str:
    """Execute a database query with safety limits.

    Args:
        sql: The SQL SELECT query to execute.
    """
    try:
        result = await asyncio.wait_for(
            execute_query(sql),
            timeout=TOOL_TIMEOUT,
        )
    except asyncio.TimeoutError:
        return "Error: Query timed out after 30 seconds. Try a more specific query."

    response = format_results(result)

    # Truncate oversized responses
    if len(response) > MAX_RESPONSE_SIZE:
        truncated = response[:MAX_RESPONSE_SIZE]
        return truncated + "\n\n(Response truncated. Narrow your query for complete results.)"

    return response

Resource Limit Recommendations

| Resource | Recommended Limit | Why |
|---|---|---|
| Tool timeout | 30 seconds | Prevents hung connections from consuming resources |
| Response size | 50,000 characters | Keeps responses within MCP client context windows |
| Query row limit | 1,000 rows | Prevents massive result sets from overwhelming memory |
| File read size | 1 MB | Avoids loading huge files into memory |
| Concurrent requests per client | 10 | Prevents a single client from monopolizing resources (see the sketch after this table) |
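The last row, concurrent requests per client, must be enforced in application code. Below is a minimal in-process sketch; how you identify a client (API key, session ID) is deployment-specific, so client_id is a hypothetical stand-in:

import asyncio
from collections import defaultdict

MAX_CONCURRENT_PER_CLIENT = 10

# One semaphore per client key, created lazily on first use
_client_limits: defaultdict[str, asyncio.Semaphore] = defaultdict(
    lambda: asyncio.Semaphore(MAX_CONCURRENT_PER_CLIENT)
)


async def with_client_limit(client_id: str, coro):
    """Run coro only while the client holds one of its concurrency slots."""
    async with _client_limits[client_id]:
        return await coro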

Monitoring Response Times in Production

Set up structured logging that feeds into your monitoring stack:

import logging
import json

logger = logging.getLogger("mcp-metrics")


class MetricsMiddleware:
    """Track tool execution metrics."""

    def __init__(self):
        self.call_count = 0
        self.error_count = 0
        self.total_duration = 0.0

    def record(self, tool_name: str, duration: float, success: bool):
        """Record a tool execution metric."""
        self.call_count += 1
        self.total_duration += duration
        if not success:
            self.error_count += 1

        logger.info(json.dumps({
            "event": "tool_execution",
            "tool": tool_name,
            "duration_ms": round(duration * 1000, 2),
            "success": success,
            "total_calls": self.call_count,
            "error_rate": round(
                self.error_count / self.call_count * 100, 2
            ),
        }))


metrics = MetricsMiddleware()
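MetricsMiddleware only helps if every tool reports into it. A decorator keeps that wiring out of tool bodies; timed_tool below is a hypothetical helper, not part of the MCP SDK:

import functools
import time


def timed_tool(fn):
    """Record duration and success for every call to an async tool."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        start = time.monotonic()
        success = True
        try:
            return await fn(*args, **kwargs)
        except Exception:
            success = False
            raise
        finally:
            metrics.record(fn.__name__, time.monotonic() - start, success)
    return wrapper

Apply it beneath @mcp.tool() so the SDK registers the wrapped function; functools.wraps preserves the signature metadata the SDK reads.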

Connect these metrics to dashboards in Grafana, Datadog, or CloudWatch to get real-time visibility into your server's performance. Set alerts for p99 response times exceeding your targets and error rates above 1%.

For comprehensive monitoring and observability guidance, see our MCP Monitoring and Observability pillar guide.

Performance Optimization Checklist

Work through this checklist in order; each item builds on the previous:

| Priority | Optimization | Expected Impact |
|---|---|---|
| 1 | Add response time logging to all tools | Visibility (no speed improvement, but essential) |
| 2 | Implement connection pooling for databases and HTTP clients | 2-10x improvement for I/O-heavy tools |
| 3 | Convert sync tools to async | 2-5x improvement for concurrent usage |
| 4 | Add caching for frequently accessed, slowly changing data | 10-100x improvement for cached responses |
| 5 | Set resource limits (timeouts, response size caps) | Prevents cascading failures |
| 6 | Scale horizontally behind a load balancer | Linear capacity increase |
| 7 | Use Redis for shared caching across instances | Consistent cache hits across all instances |

What to Read Next