Advanced Usage

This guide covers advanced usage patterns and techniques for openapi-mcp, including extending functionality, customizing behavior, and optimizing performance.

Custom Response Processing

You can use the --post-hook-cmd flag to process responses with custom scripts or tools. This is useful for:

  • Transforming API responses into different formats
  • Filtering sensitive information from responses
  • Adding context or metadata to responses
  • Custom logging or analytics

Example: Response Transformation

bin/openapi-mcp --post-hook-cmd='./transform.sh' examples/api-spec.yaml

Where transform.sh could be a script that processes the JSON responses and applies custom transformations.

Combining Multiple OpenAPI Specs

You can combine multiple OpenAPI specifications by first merging them and then using the merged spec with openapi-mcp:

Step 1: Merge Specs

Use a tool such as swagger-merger or openapi-merge-cli to combine multiple specs. For example, with openapi-merge-cli:

npx openapi-merge-cli --config merge-config.json

Where merge-config.json contains:

{
  "inputs": [
    { "inputFile": "./specs/api1.yaml" },
    { "inputFile": "./specs/api2.yaml" }
  ],
  "output": "./merged-api.yaml"
}

Step 2: Use the Merged Spec

bin/openapi-mcp ./merged-api.yaml

Custom Authentication Flows

For APIs that require complex authentication flows (OAuth 2.0, etc.), you can implement custom logic:

OAuth 2.0 with Token Refresh

package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, attaching an OAuth 2.0 bearer token to outgoing
// requests through a custom HTTP header provider.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// tokenTimeout bounds a single token acquisition so that a slow or
	// unreachable OAuth endpoint cannot stall a request indefinitely.
	const tokenTimeout = 10 * time.Second

	// Create a token provider that handles OAuth token refresh
	tokenProvider := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, tokenTimeout)
		defer cancel()

		// Implement OAuth token acquisition/refresh logic here, passing ctx to
		// the call made to the OAuth token endpoint so that it honors both the
		// caller's cancellation and tokenTimeout.
		if err := ctx.Err(); err != nil {
			return "", err
		}
		return "refreshed-oauth-token", nil
	}

	// Set up custom HTTP headers for each request. A failure to obtain a
	// token is returned to the caller rather than sending an unauthenticated
	// request.
	httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
		token, err := tokenProvider(ctx)
		if err != nil {
			return nil, err
		}
		return map[string]string{
			"Authorization": "Bearer " + token,
		}, nil
	}

	// Create server options with the custom header provider
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Custom Error Handling

You can implement custom error handling to provide more context or alternative actions:

Example: Custom Error Handler

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, installing an error handler that logs API errors and
// replaces them with a structured, user-friendly result.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Custom error handler function. When err is non-nil it returns a
	// replacement result and a nil error, so the failure is reported as data
	// instead of being propagated.
	errorHandler := func(ctx context.Context, req *http.Request, err error) (interface{}, error) {
		// Nothing failed, so there is nothing to translate.
		if err == nil {
			return nil, nil
		}

		// Log the error
		log.Printf("API Error: %v", err)

		// Provide a user-friendly response
		return map[string]interface{}{
			"error":      "An error occurred while processing your request.",
			"suggestion": "Please check your parameters and try again.",
			"details":    fmt.Sprintf("%v", err),
		}, nil
	}

	// Create server options with the custom error handler
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithErrorHandler(errorHandler),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Response Caching

For performance optimization, you can implement response caching for frequently accessed endpoints:

Example: Simple Cache Implementation

package main

import (
	"bytes"
	"context"
	"io"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// cacheEntry is a snapshot of a successful GET response. The body is stored
// as bytes because an http.Response body can only be read once; keeping the
// *http.Response itself would hand later callers an already-drained body.
type cacheEntry struct {
	status     string
	statusCode int
	header     http.Header
	body       []byte
	expiry     time.Time
}

// toResponse builds a fresh response for req from the snapshot. Each call
// gets its own header copy and body reader, so callers cannot affect each
// other or the cached data. The response is marked with "X-Cache: HIT".
func (e cacheEntry) toResponse(req *http.Request) *http.Response {
	header := e.header.Clone()
	if header == nil {
		header = make(http.Header)
	}
	header.Set("X-Cache", "HIT")

	return &http.Response{
		Status:        e.status,
		StatusCode:    e.statusCode,
		Header:        header,
		Body:          io.NopCloser(bytes.NewReader(e.body)),
		ContentLength: int64(len(e.body)),
		Request:       req,
	}
}

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, wrapping the HTTP transport with an in-memory cache
// for successful GET responses.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Create a simple in-memory cache.
	//
	// Limitations to keep in mind before using this beyond a demo:
	//   - Entries are keyed by URL only, so responses that differ per
	//     credential or per request header are shared between callers.
	//   - Expired entries are only replaced on the next successful fetch of
	//     the same URL; the map is never pruned otherwise.
	//   - Whole response bodies are held in memory.
	cache := make(map[string]cacheEntry)
	cacheMutex := &sync.RWMutex{}
	cacheExpiry := 5 * time.Minute

	// Custom HTTP transport wrapper for caching
	httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
		return func(ctx context.Context, req *http.Request) (*http.Response, error) {
			// Only cache GET requests
			if req.Method != http.MethodGet {
				return next(ctx, req)
			}

			// Check if we have a cached response
			cacheKey := req.URL.String()
			cacheMutex.RLock()
			entry, exists := cache[cacheKey]
			cacheMutex.RUnlock()

			// If we have a valid cache entry, return it
			if exists && time.Now().Before(entry.expiry) {
				return entry.toResponse(req), nil
			}

			// Otherwise, call the API
			resp, err := next(ctx, req)
			if err != nil {
				return nil, err
			}

			// Only successful responses are cached; anything else is passed
			// through untouched.
			if resp.StatusCode < 200 || resp.StatusCode >= 300 {
				return resp, nil
			}

			// Drain the body so it can be stored, then give the caller a
			// fresh reader over the same bytes.
			body, err := io.ReadAll(resp.Body)
			resp.Body.Close() // the body is replaced below; a close error changes nothing here
			if err != nil {
				return nil, err
			}
			resp.Body = io.NopCloser(bytes.NewReader(body))

			cacheMutex.Lock()
			cache[cacheKey] = cacheEntry{
				status:     resp.Status,
				statusCode: resp.StatusCode,
				header:     resp.Header.Clone(),
				body:       body,
				expiry:     time.Now().Add(cacheExpiry),
			}
			cacheMutex.Unlock()

			return resp, nil
		}
	}

	// Create server options with the transport wrapper
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Custom Parameter Validation

You can implement custom parameter validation logic beyond what's defined in the OpenAPI spec:

Example: Advanced Parameter Validation

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"regexp"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

func main() {
	// Custom parameter validator
	paramValidator := func(ctx context.Context, opID string, params json.RawMessage) error {
		// Parse the parameters
		var paramsMap map[string]interface{}
		if err := json.Unmarshal(params, ¶msMap); err != nil {
			return err
		}

		// Example: Validate email format
		if email, ok := paramsMap["email"].(string); ok {
			emailRegex := regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`)
			if !emailRegex.MatchString(email) {
				return fmt.Errorf("invalid email format: %s", email)
			}
		}

		// Example: Check for profanity in text fields
		if text, ok := paramsMap["description"].(string); ok {
			profanityList := []string{"badword1", "badword2", "badword3"}
			for _, word := range profanityList {
				if regexp.MustCompile(fmt.Sprintf(`(?i)\b%s\b`, word)).MatchString(text) {
					return fmt.Errorf("description contains inappropriate language")
				}
			}
		}

		return nil
	}

	// Create server options with the parameter validator
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithParameterValidator(paramValidator),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Request Rate Limiting

To protect backend APIs, you can implement rate limiting:

Example: Simple Rate Limiting

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, wrapping the HTTP transport with a token-bucket rate
// limiter. Requests made while the bucket is empty are not sent; the wrapper
// answers them itself with 429 Too Many Requests.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Rate limiter setup
	const (
		maxTokens      = 10          // bucket capacity; also the initial allowance
		refillInterval = time.Second // one token is added per interval
		retryAfter     = "1"         // Retry-After value in seconds; keep in sync with refillInterval
	)

	var (
		mu     sync.Mutex // guards tokens
		tokens = maxTokens
	)

	// takeToken consumes one token and reports whether one was available.
	takeToken := func() bool {
		mu.Lock()
		defer mu.Unlock()

		if tokens == 0 {
			return false
		}
		tokens--
		return true
	}

	// Refill the bucket up to maxTokens. The goroutine is intentionally never
	// stopped: it runs for the lifetime of the process.
	go func() {
		ticker := time.NewTicker(refillInterval)
		defer ticker.Stop()

		for range ticker.C {
			mu.Lock()
			if tokens < maxTokens {
				tokens++
			}
			mu.Unlock()
		}
	}()

	// Custom HTTP transport wrapper for rate limiting
	httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
		return func(ctx context.Context, req *http.Request) (*http.Response, error) {
			if takeToken() {
				return next(ctx, req)
			}

			// Return a rate limit response without contacting the backend.
			header := make(http.Header)
			header.Set("Retry-After", retryAfter)
			return &http.Response{
				Status:     "429 Too Many Requests",
				StatusCode: http.StatusTooManyRequests,
				Header:     header,
				Body:       http.NoBody,
				Request:    req,
			}, nil
		}
	}

	// Create server options with the transport wrapper
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Webhooks and Server-Sent Events (SSE)

For APIs that support webhooks or Server-Sent Events, you can implement custom handling:

Example: SSE Event Handling

package main

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, registering a custom "sse" transport that requests
// an event stream and reports each "data:" line it sees.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// One client is shared by every request so connections can be reused.
	// No Timeout is set on purpose: http.Client.Timeout also covers reading
	// the response body and would cut off a long-lived event stream. The
	// request context bounds the call instead.
	client := &http.Client{}

	// Custom HTTP transport for SSE
	sseTransport := func(ctx context.Context, req *http.Request) (*http.Response, error) {
		// Bind the request to ctx so cancellation stops the stream.
		req = req.WithContext(ctx)

		// Configure the request for SSE
		req.Header.Set("Accept", "text/event-stream")
		req.Header.Set("Cache-Control", "no-cache")
		req.Header.Set("Connection", "keep-alive")

		// Make the request
		resp, err := client.Do(req)
		if err != nil {
			return nil, err
		}

		// Process SSE events.
		//
		// NOTE(review): this goroutine reads resp.Body while resp is also
		// returned to the caller. If the caller reads or closes the body as
		// well, the two readers race and each sees only part of the stream.
		// Confirm how the "sse" transport's return value is consumed before
		// relying on this pattern.
		go func() {
			// bufio.Scanner rejects lines longer than its buffer (64 KiB by
			// default); call scanner.Buffer first if events can be larger.
			scanner := bufio.NewScanner(resp.Body)
			for scanner.Scan() {
				line := scanner.Text()
				if !strings.HasPrefix(line, "data:") {
					continue
				}

				// The event-stream format allows one optional space after
				// the colon; it is not part of the payload.
				eventData := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")

				// Report on stderr: the server is started with ServeStdio
				// below, so stdout is presumably reserved for protocol
				// traffic and must not receive diagnostics.
				fmt.Fprintf(os.Stderr, "Received SSE event: %s\n", eventData)
				// Process the event data
				// ...
			}
			if err := scanner.Err(); err != nil {
				log.Printf("SSE scanner error: %v", err)
			}
		}()

		return resp, nil
	}

	// Create server options with the custom SSE transport
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithCustomTransport("sse", sseTransport),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Metrics and Monitoring

Implement metrics collection to monitor API usage:

Example: Basic Metrics Collection

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, wrapping the HTTP transport to count requests,
// errors and total latency per URL path, and logging a summary every minute.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// endpointStats accumulates the figures for one URL path. A single record
	// per endpoint keeps the three counters together instead of spreading
	// them over parallel maps.
	type endpointStats struct {
		requests int
		errors   int
		total    time.Duration
	}

	var (
		mu    sync.Mutex // guards stats and the records it points to
		stats = make(map[string]*endpointStats)
	)

	// Print metrics every 60 seconds. The goroutine runs for the lifetime of
	// the process. Map iteration order is unspecified, so endpoints are
	// listed in no particular order.
	go func() {
		ticker := time.NewTicker(60 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			mu.Lock()
			log.Println("API Metrics:")
			for endpoint, s := range stats {
				// A record only exists once a request has been counted, so
				// s.requests is at least 1 and the division is safe.
				avgTime := s.total / time.Duration(s.requests)
				log.Printf("  %s: %d requests, %v avg response time, %d errors\n",
					endpoint, s.requests, avgTime, s.errors)
			}
			mu.Unlock()
		}
	}()

	// Custom HTTP transport wrapper for metrics collection
	httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
		return func(ctx context.Context, req *http.Request) (*http.Response, error) {
			endpoint := req.URL.Path
			startTime := time.Now()

			// Make the request
			resp, err := next(ctx, req)

			// Record metrics. A transport error or any status >= 400 counts
			// as an error.
			duration := time.Since(startTime)
			failed := err != nil || (resp != nil && resp.StatusCode >= 400)

			mu.Lock()
			s, ok := stats[endpoint]
			if !ok {
				s = &endpointStats{}
				stats[endpoint] = s
			}
			s.requests++
			s.total += duration
			if failed {
				s.errors++
			}
			mu.Unlock()

			return resp, err
		}
	}

	// Create server options with the transport wrapper
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Context Propagation

Pass context values through the request chain for advanced use cases:

Example: Request Tracing

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// contextKey is a private type for context keys. Using a dedicated type
// rather than a plain string keeps these keys from colliding with keys that
// other packages store in the same context.
type contextKey string
// traceIDKey is the context key under which the trace ID string is stored.
const traceIDKey contextKey = "trace-id"

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, installing a middleware that tags each operation
// with a trace ID and a header provider that forwards that ID as X-Trace-ID.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Middleware to add trace ID to context. It logs the start of every
	// operation and, once the next handler returns, its duration and outcome.
	traceMiddleware := func(next openapi2mcp.MCPHandler) openapi2mcp.MCPHandler {
		return func(ctx context.Context, opID string, params []byte) (interface{}, error) {
			// Reuse a trace ID already present in the context; otherwise
			// generate one and attach it for the handlers further down.
			traceID, ok := ctx.Value(traceIDKey).(string)
			if !ok {
				traceID = uuid.New().String()
				ctx = context.WithValue(ctx, traceIDKey, traceID)
			}

			log.Printf("[TRACE %s] Operation: %s started", traceID, opID)

			// Call the next handler
			start := time.Now()
			result, err := next(ctx, opID, params)
			duration := time.Since(start)

			if err != nil {
				log.Printf("[TRACE %s] Operation: %s failed after %v: %v", traceID, opID, duration, err)
			} else {
				log.Printf("[TRACE %s] Operation: %s completed in %v", traceID, opID, duration)
			}

			return result, err
		}
	}

	// HTTP header provider to propagate trace ID. It returns an empty header
	// set when the context carries no trace ID.
	httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
		headers := make(map[string]string)

		// Add trace ID to request headers if present in context
		if traceID, ok := ctx.Value(traceIDKey).(string); ok {
			headers["X-Trace-ID"] = traceID
		}

		return headers, nil
	}

	// Create server options with the middleware and header provider
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithMiddleware(traceMiddleware),
		openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Optimizing for Large APIs

When working with very large OpenAPI specifications, consider these optimization techniques:

  • Tool Filtering: Use the --tag, --include-desc-regex, and --exclude-desc-regex flags to reduce the number of tools exposed.
  • Lazy Loading: Implement lazy loading of schemas to reduce startup time.
  • Schema Simplification: Preprocess the OpenAPI spec to simplify complex schemas.
  • Response Streaming: Use streaming responses for large payloads.

Example: Optimized Tool Loading

package main

import (
	"context"
	"log"
	"os"
	"strings"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, exposing only the operations accepted by a custom
// tool filter.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Custom tool filter. An operation is kept only when it passes all three
	// checks below; the filter reports true to keep it and false to drop it.
	//
	// Each check rejects early. Accepting early on the version check would
	// let deprecated v1/v2 operations through and would never exclude other
	// versions.
	toolFilter := func(operationID, path, method string, tags []string) bool {
		// Skip internal operations
		if strings.HasPrefix(operationID, "internal") {
			return false
		}

		// Only include specific API versions
		if !strings.Contains(path, "/v1/") && !strings.Contains(path, "/v2/") {
			return false
		}

		// Skip deprecated operations
		for _, tag := range tags {
			if tag == "deprecated" {
				return false
			}
		}

		return true
	}

	// Create server options with the tool filter
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithToolFilter(toolFilter),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Custom API Key Management

Implement more advanced API key management for multiple services:

Example: Service-Specific API Keys

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"

	"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)

// main starts the server over stdio for the OpenAPI spec named by the first
// command-line argument, choosing the X-API-Key header per operation from
// API_KEY_<SERVICE> environment variables.
func main() {
	// Indexing os.Args[1] without this check panics when no argument is given.
	if len(os.Args) < 2 {
		log.Fatalf("usage: %s <openapi-spec>", os.Args[0])
	}

	// Load API keys from environment variables. API_KEY_PAYMENT=abc is stored
	// under "payment"; service names are lower-cased.
	apiKeys := make(map[string]string)
	for _, env := range os.Environ() {
		if !strings.HasPrefix(env, "API_KEY_") {
			continue
		}
		parts := strings.SplitN(env, "=", 2)
		if len(parts) != 2 {
			continue
		}
		service := strings.ToLower(strings.TrimPrefix(parts[0], "API_KEY_"))
		// A variable with no service name or an empty value cannot supply a
		// usable key, so ignore it instead of sending an empty header later.
		if service == "" || parts[1] == "" {
			continue
		}
		apiKeys[service] = parts[1]
	}

	// Custom HTTP header provider for service-specific API keys
	httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
		headers := make(map[string]string)

		// Get the operation ID from context. Without one there is nothing to
		// select a key by, so no key header is added.
		opID, ok := ctx.Value(openapi2mcp.OpIDContextKey).(string)
		if !ok {
			return headers, nil
		}

		// Determine which service this operation belongs to
		var service string
		switch {
		case strings.HasPrefix(opID, "payments"):
			service = "payment"
		case strings.HasPrefix(opID, "users"):
			service = "user"
		case strings.HasPrefix(opID, "analytics"):
			service = "analytics"
		default:
			service = "default"
		}

		// Add the appropriate API key, falling back to the default key when
		// the service has none of its own.
		apiKey, found := apiKeys[service]
		if !found {
			apiKey, found = apiKeys["default"]
		}
		if !found {
			return nil, fmt.Errorf("no API key found for service: %s", service)
		}
		headers["X-API-Key"] = apiKey

		return headers, nil
	}

	// Create server options with the header provider
	opts := []openapi2mcp.ServerOption{
		openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
	}

	// Create and start the server with the options
	server, err := openapi2mcp.NewServer(os.Args[1], opts...)
	if err != nil {
		log.Fatal(err)
	}

	if err := server.ServeStdio(context.Background()); err != nil {
		log.Fatal(err)
	}
}

Next Steps

Now that you've explored advanced usage patterns, you can: