Advanced Usage
This guide covers advanced usage patterns and techniques for openapi-mcp, including extending functionality, customizing behavior, and optimizing performance.
Custom Response Processing
You can use the --post-hook-cmd flag to process responses with custom scripts or tools. This is useful for:
- Transforming API responses into different formats
- Filtering sensitive information from responses
- Adding additional context or metadata to responses
- Custom logging or analytics
Example: Response Transformation
bin/openapi-mcp --post-hook-cmd='./transform.sh' examples/api-spec.yaml
Where transform.sh could be a script that processes the JSON responses and applies custom transformations.
Combining Multiple OpenAPI Specs
You can combine multiple OpenAPI specifications by first merging them and then using the merged spec with openapi-mcp:
Step 1: Merge Specs
Use a tool like swagger-merger or openapi-merge to combine multiple specs:
npx openapi-merge-cli --config merge-config.json
Where merge-config.json contains:
{
"inputs": [
{ "inputFile": "./specs/api1.yaml" },
{ "inputFile": "./specs/api2.yaml" }
],
"output": "./merged-api.yaml"
}
Step 2: Use the Merged Spec
bin/openapi-mcp ./merged-api.yaml
Custom Authentication Flows
For APIs that require complex authentication flows (OAuth 2.0, etc.), you can implement custom logic:
OAuth 2.0 with Token Refresh
package main
import (
"context"
"log"
"os"
"time"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Create a token provider that handles OAuth token refresh
tokenProvider := func(ctx context.Context) (string, error) {
// Implement OAuth token acquisition/refresh logic here
// This could make a call to an OAuth token endpoint
return "refreshed-oauth-token", nil
}
// Set up custom HTTP headers for each request
httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
token, err := tokenProvider(ctx)
if err != nil {
return nil, err
}
return map[string]string{
"Authorization": "Bearer " + token,
}, nil
}
// Create server options with the custom header provider
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Custom Error Handling
You can implement custom error handling to provide more context or alternative actions:
Example: Custom Error Handler
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Custom error handler function
errorHandler := func(ctx context.Context, req *http.Request, err error) (interface{}, error) {
if err != nil {
// Log the error
log.Printf("API Error: %v", err)
// Provide a user-friendly response
return map[string]interface{}{
"error": "An error occurred while processing your request.",
"suggestion": "Please check your parameters and try again.",
"details": fmt.Sprintf("%v", err),
}, nil
}
return nil, err
}
// Create server options with the custom error handler
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithErrorHandler(errorHandler),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Response Caching
For performance optimization, you can implement response caching for frequently accessed endpoints:
Example: Simple Cache Implementation
package main
import (
"context"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
type cacheEntry struct {
response interface{}
expiry time.Time
}
func main() {
// Create a simple in-memory cache
cache := make(map[string]cacheEntry)
cacheMutex := &sync.RWMutex{}
cacheExpiry := 5 * time.Minute
// Custom HTTP transport wrapper for caching
httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
return func(ctx context.Context, req *http.Request) (*http.Response, error) {
// Only cache GET requests
if req.Method != http.MethodGet {
return next(ctx, req)
}
// Check if we have a cached response
cacheKey := req.URL.String()
cacheMutex.RLock()
entry, exists := cache[cacheKey]
cacheMutex.RUnlock()
// If we have a valid cache entry, return it
if exists && time.Now().Before(entry.expiry) {
// Create a response from the cached data
resp := &http.Response{
StatusCode: http.StatusOK,
Header: make(http.Header),
}
resp.Header.Set("X-Cache", "HIT")
// Convert cached response to http.Response...
return resp, nil
}
// Otherwise, call the API
resp, err := next(ctx, req)
if err != nil {
return nil, err
}
// Cache the response if successful
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// Extract response body...
cacheMutex.Lock()
cache[cacheKey] = cacheEntry{
response: resp,
expiry: time.Now().Add(cacheExpiry),
}
cacheMutex.Unlock()
}
return resp, nil
}
}
// Create server options with the transport wrapper
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Custom Parameter Validation
You can implement custom parameter validation logic beyond what's defined in the OpenAPI spec:
Example: Advanced Parameter Validation
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"regexp"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Custom parameter validator
paramValidator := func(ctx context.Context, opID string, params json.RawMessage) error {
// Parse the parameters
var paramsMap map[string]interface{}
if err := json.Unmarshal(params, ¶msMap); err != nil {
return err
}
// Example: Validate email format
if email, ok := paramsMap["email"].(string); ok {
emailRegex := regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`)
if !emailRegex.MatchString(email) {
return fmt.Errorf("invalid email format: %s", email)
}
}
// Example: Check for profanity in text fields
if text, ok := paramsMap["description"].(string); ok {
profanityList := []string{"badword1", "badword2", "badword3"}
for _, word := range profanityList {
if regexp.MustCompile(fmt.Sprintf(`(?i)\b%s\b`, word)).MatchString(text) {
return fmt.Errorf("description contains inappropriate language")
}
}
}
return nil
}
// Create server options with the parameter validator
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithParameterValidator(paramValidator),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Request Rate Limiting
To protect backend APIs, you can implement rate limiting:
Example: Simple Rate Limiting
package main
import (
"context"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Rate limiter setup
type rateLimiter struct {
lastRequest time.Time
tokens int
mu sync.Mutex
}
limiter := &rateLimiter{
tokens: 10, // Allow 10 requests initially
}
// Refill rate: 1 token per second
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
limiter.mu.Lock()
if limiter.tokens < 10 {
limiter.tokens++
}
limiter.mu.Unlock()
}
}()
// Custom HTTP transport wrapper for rate limiting
httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
return func(ctx context.Context, req *http.Request) (*http.Response, error) {
limiter.mu.Lock()
if limiter.tokens > 0 {
limiter.tokens--
limiter.lastRequest = time.Now()
limiter.mu.Unlock()
return next(ctx, req)
}
limiter.mu.Unlock()
// Return a rate limit error
return &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: make(http.Header),
Body: http.NoBody,
}, nil
}
}
// Create server options with the transport wrapper
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Webhooks and Server-Sent Events (SSE)
For APIs that support webhooks or Server-Sent Events, you can implement custom handling:
Example: SSE Event Handling
package main
import (
"bufio"
"context"
"fmt"
"log"
"net/http"
"os"
"strings"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Custom HTTP transport for SSE
sseTransport := func(ctx context.Context, req *http.Request) (*http.Response, error) {
// Configure the request for SSE
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set("Connection", "keep-alive")
// Make the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
// Process SSE events
go func() {
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data:") {
eventData := strings.TrimPrefix(line, "data:")
fmt.Printf("Received SSE event: %s\n", eventData)
// Process the event data
// ...
}
}
if err := scanner.Err(); err != nil {
log.Printf("SSE scanner error: %v", err)
}
}()
return resp, nil
}
// Create server options with the custom SSE transport
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithCustomTransport("sse", sseTransport),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Metrics and Monitoring
Implement metrics collection to monitor API usage:
Example: Basic Metrics Collection
package main
import (
"context"
"log"
"net/http"
"os"
"sync"
"time"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Metrics data structure
type metrics struct {
requestCount map[string]int
responseTimeTotal map[string]time.Duration
errorCount map[string]int
mu sync.Mutex
}
m := &metrics{
requestCount: make(map[string]int),
responseTimeTotal: make(map[string]time.Duration),
errorCount: make(map[string]int),
}
// Print metrics every 60 seconds
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for range ticker.C {
m.mu.Lock()
log.Println("API Metrics:")
for endpoint, count := range m.requestCount {
avgTime := time.Duration(0)
if count > 0 {
avgTime = m.responseTimeTotal[endpoint] / time.Duration(count)
}
log.Printf(" %s: %d requests, %v avg response time, %d errors\n",
endpoint, count, avgTime, m.errorCount[endpoint])
}
m.mu.Unlock()
}
}()
// Custom HTTP transport wrapper for metrics collection
httpTransportWrapper := func(next openapi2mcp.HTTPTransport) openapi2mcp.HTTPTransport {
return func(ctx context.Context, req *http.Request) (*http.Response, error) {
endpoint := req.URL.Path
startTime := time.Now()
// Make the request
resp, err := next(ctx, req)
// Record metrics
duration := time.Since(startTime)
m.mu.Lock()
m.requestCount[endpoint]++
m.responseTimeTotal[endpoint] += duration
if err != nil || (resp != nil && resp.StatusCode >= 400) {
m.errorCount[endpoint]++
}
m.mu.Unlock()
return resp, err
}
}
// Create server options with the transport wrapper
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithHTTPTransportWrapper(httpTransportWrapper),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Context Propagation
Pass context values through the request chain for advanced use cases:
Example: Request Tracing
package main
import (
"context"
"log"
"net/http"
"os"
"github.com/google/uuid"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
// Define context keys
type contextKey string
const traceIDKey contextKey = "trace-id"
func main() {
// Middleware to add trace ID to context
traceMiddleware := func(next openapi2mcp.MCPHandler) openapi2mcp.MCPHandler {
return func(ctx context.Context, opID string, params []byte) (interface{}, error) {
// Generate a trace ID if not present
traceID, ok := ctx.Value(traceIDKey).(string)
if !ok {
traceID = uuid.New().String()
ctx = context.WithValue(ctx, traceIDKey, traceID)
}
log.Printf("[TRACE %s] Operation: %s started", traceID, opID)
// Call the next handler
start := time.Now()
result, err := next(ctx, opID, params)
duration := time.Since(start)
if err != nil {
log.Printf("[TRACE %s] Operation: %s failed after %v: %v", traceID, opID, duration, err)
} else {
log.Printf("[TRACE %s] Operation: %s completed in %v", traceID, opID, duration)
}
return result, err
}
}
// HTTP header provider to propagate trace ID
httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
headers := make(map[string]string)
// Add trace ID to request headers if present in context
if traceID, ok := ctx.Value(traceIDKey).(string); ok {
headers["X-Trace-ID"] = traceID
}
return headers, nil
}
// Create server options with the middleware and header provider
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithMiddleware(traceMiddleware),
openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Optimizing for Large APIs
When working with very large OpenAPI specifications, consider these optimization techniques:
- Tool Filtering: Use the
--tag,--include-desc-regex, and--exclude-desc-regexflags to reduce the number of tools exposed. - Lazy Loading: Implement lazy loading of schemas to reduce startup time.
- Schema Simplification: Preprocess the OpenAPI spec to simplify complex schemas.
- Response Streaming: Use streaming responses for large payloads.
Example: Optimized Tool Loading
package main
import (
"context"
"log"
"os"
"strings"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Custom tool filter
toolFilter := func(operationID, path, method string, tags []string) bool {
// Skip internal operations
if strings.HasPrefix(operationID, "internal") {
return false
}
// Only include specific API versions
if strings.Contains(path, "/v1/") || strings.Contains(path, "/v2/") {
return true
}
// Skip deprecated operations
for _, tag := range tags {
if tag == "deprecated" {
return false
}
}
return true
}
// Create server options with the tool filter
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithToolFilter(toolFilter),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Custom API Key Management
Implement more advanced API key management for multiple services:
Example: Service-Specific API Keys
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strings"
"github.com/jedisct1/openapi-mcp/pkg/openapi2mcp"
)
func main() {
// Load API keys from environment variables
apiKeys := make(map[string]string)
for _, env := range os.Environ() {
if strings.HasPrefix(env, "API_KEY_") {
parts := strings.SplitN(env, "=", 2)
if len(parts) == 2 {
service := strings.TrimPrefix(parts[0], "API_KEY_")
apiKeys[strings.ToLower(service)] = parts[1]
}
}
}
// Custom HTTP header provider for service-specific API keys
httpHeaderProvider := func(ctx context.Context) (map[string]string, error) {
headers := make(map[string]string)
// Get the operation ID from context
opID, ok := ctx.Value(openapi2mcp.OpIDContextKey).(string)
if !ok {
return headers, nil
}
// Determine which service this operation belongs to
var service string
if strings.HasPrefix(opID, "payments") {
service = "payment"
} else if strings.HasPrefix(opID, "users") {
service = "user"
} else if strings.HasPrefix(opID, "analytics") {
service = "analytics"
} else {
service = "default"
}
// Add the appropriate API key
if apiKey, exists := apiKeys[service]; exists {
headers["X-API-Key"] = apiKey
} else if apiKey, exists := apiKeys["default"]; exists {
headers["X-API-Key"] = apiKey
} else {
return nil, fmt.Errorf("no API key found for service: %s", service)
}
return headers, nil
}
// Create server options with the header provider
opts := []openapi2mcp.ServerOption{
openapi2mcp.WithHTTPHeaderProvider(httpHeaderProvider),
}
// Create and start the server with the options
server, err := openapi2mcp.NewServer(os.Args[1], opts...)
if err != nil {
log.Fatal(err)
}
if err := server.ServeStdio(context.Background()); err != nil {
log.Fatal(err)
}
}
Next Steps
Now that you've explored advanced usage patterns, you can:
- Check out the examples directory for more implementation ideas
- Review the API reference for all available options
- Contribute improvements or bug fixes to the GitHub repository