
Building an MCP Server for Snowflake: D23.io Reference Implementation

A production-ready guide to building MCP servers for Snowflake with row-level security, query budgets, and dbt metrics. Learn from the D23.io reference implementation.

The PADISO Team · 2026-05-11

Table of Contents

  1. Why MCP Servers Matter for Snowflake
  2. Understanding the D23.io Reference Implementation
  3. Architecture and Security Design
  4. Building Row-Level Security Into Your MCP Server
  5. Query Budget Management and Cost Control
  6. dbt Metric Grounding for AI Agents
  7. Authentication and Token Management
  8. Deployment and Production Hardening
  9. Monitoring, Debugging, and Observability
  10. Real-World Implementation Patterns
  11. Next Steps and Scaling Considerations

Why MCP Servers Matter for Snowflake

Snowflake has become the data warehouse of choice for mid-market and enterprise organisations across Australia and globally. Yet most teams still interact with it through legacy SQL clients, BI dashboards, or manual query execution. The gap between your data and your AI agents is real, and it’s expensive to close.

The Model Context Protocol (MCP) changes that equation. By building an MCP server for Snowflake, you give AI agents—like Claude—direct, governed access to your data warehouse. No middleware. No API layer bolted on top. Just secure, auditable, production-grade connections that respect your data governance rules.

The D23.io reference implementation proves this works at scale. Built for mid-market organisations running $500K+ annual Snowflake spend, it delivers:

  • Row-level security (RLS) that maps user identity to data access without reimplementing ACLs in application code
  • Query budgets that prevent runaway costs when agents execute complex analytical queries
  • dbt metric grounding that lets agents reason about business metrics with semantic consistency
  • Production-tested resilience across 50+ client deployments

For founders and operators at seed-to-Series-B startups, this means you can ship AI-powered analytics features in weeks, not quarters. For enterprise security leads, this means audit-ready compliance with SOC 2 and ISO 27001 frameworks baked in from day one.

If you’re modernising your data stack with agentic AI, or you need a fractional CTO to guide your platform engineering team, understanding this architecture is non-negotiable.


Understanding the D23.io Reference Implementation

What D23.io Solves

D23.io is an open-source MCP server purpose-built for Snowflake. Unlike generic database connectors, it’s engineered around three constraints that matter in production:

  1. Identity and access control must be enforced at query time, not at the API layer
  2. Costs must be predictable, even when agents run exploratory queries
  3. Metrics must be grounded in business semantics, not raw SQL

The implementation sits between your Claude agent (or any MCP client) and your Snowflake warehouse. It intercepts queries, applies security policies, enforces budgets, and enriches responses with dbt-defined metrics.

Core Architecture

The D23.io server is built on the Model Context Protocol (MCP) specification, which defines a standardised way for AI agents to discover and call tools. When you build an MCP server for Snowflake, you’re implementing a set of tools that Claude can invoke:

  • query_warehouse: Execute a SQL query with RLS and budget enforcement
  • list_tables: Discover available tables and schemas
  • get_table_schema: Inspect column names, types, and metadata
  • get_metric_definition: Retrieve dbt metric definitions and formulas
  • validate_query: Pre-flight check a query before execution

Each tool is stateless and auditable. Every invocation is logged with user identity, query text, execution time, and cost impact.
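
To make the tool surface concrete, here's a minimal sketch of how tools like these can be declared with the official MCP Python SDK's FastMCP helper. The handler bodies are placeholders, and the real D23.io code may be organised differently:

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("snowflake-governed")

@mcp.tool()
def query_warehouse(sql: str, user_id: str) -> dict:
    """Execute a SQL query with RLS and budget enforcement applied."""
    # Placeholder body: the real handler rewrites the query, checks the
    # user's budget, runs it in Snowflake, and writes an audit record.
    return {"status": "not_implemented", "sql": sql, "user_id": user_id}

@mcp.tool()
def get_table_schema(table_name: str) -> dict:
    """Inspect column names, types, and metadata for a table."""
    return {"status": "not_implemented", "table": table_name}

if __name__ == "__main__":
    mcp.run()  # stdio transport by default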

Why This Matters for Mid-Market Teams

At mid-market scale—say, 200+ employees, $10M+ ARR—your data governance requirements are real but not as rigid as enterprise. You need to prevent data leaks and cost overruns, but you also need velocity. The D23.io implementation balances both:

  • Security isn’t a gate, it’s a filter. Agents get results, but only for rows they’re authorised to see
  • Cost control is soft, not hard. You set budgets per user or team, and the server warns when approaching limits rather than hard-blocking
  • Metrics are semantic, not syntactic. Agents understand what a “monthly recurring revenue” metric means, not just the SQL that computes it

This is why the reference implementation has shipped at 50+ clients without a single audit failure or data breach.


Architecture and Security Design

The Security Model

Snowflake’s native RBAC (role-based access control) is powerful but static. It doesn’t scale to dynamic user cohorts or context-dependent access rules. The D23.io server adds a policy enforcement layer that sits above Snowflake’s native controls.

Here’s how it works:

Claude Agent
    ↓
MCP Client (Claude Desktop, SDK)
    ↓
D23.io MCP Server
    ├─ Authentication (OAuth, PAT, SAML)
    ├─ Policy Engine (RLS rules)
    ├─ Budget Checker (cost limits)
    ├─ Query Rewriter (row filtering)
    └─ Audit Logger (every action)
    ↓
Snowflake Warehouse

When Claude sends a query like “Show me revenue by region,” the D23.io server:

  1. Authenticates the user (via OAuth token or service account credentials)
  2. Resolves their permissions (which regions can this user see?)
  3. Rewrites the query to add a WHERE clause filtering to permitted regions
  4. Checks the budget (will this query exceed daily/weekly limits?)
  5. Executes the rewritten query in Snowflake
  6. Logs the full transaction with user, query, cost, and timestamp
  7. Returns results to Claude with cost metadata

This design means:

  • No data leaks: Even if Claude’s prompt is compromised, it can’t see rows outside its permissions
  • No surprise bills: Query costs are tracked in real-time and can be capped
  • No audit gaps: Every query is logged with full context

For teams pursuing SOC 2 or ISO 27001 compliance, this architecture is audit-ready. You can demonstrate data access controls, cost tracking, and comprehensive logging—controls that map directly onto several of the five SOC 2 trust service categories.
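
Put together, the per-request pipeline looks roughly like the sketch below. Every helper name here is a stand-in for the corresponding stage above, not the actual D23.io API:

def handle_query(user_id: str, sql: str) -> dict:
    # 1-2. Authenticate the caller and resolve their data permissions
    identity = authenticate(user_id)
    permissions = resolve_permissions(identity)

    # 3. Rewrite the query so RLS filters are applied before execution
    governed_sql = rewrite_with_rls(sql, permissions)

    # 4. Check the estimated cost against the user's remaining budget
    estimated_credits = estimate_query_cost(governed_sql)
    check_budget(user_id, estimated_credits)

    # 5-6. Execute in Snowflake and log the full transaction
    results, actual_credits = execute_in_snowflake(governed_sql)
    write_audit_record(user_id, sql, governed_sql, actual_credits)

    # 7. Return results to the agent with cost metadata attached
    return {"rows": results, "cost_credits": actual_credits}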

Authentication Patterns

The reference implementation supports multiple authentication schemes:

OAuth 2.0 (recommended for user-facing agents)

  • User logs in via your identity provider (Okta, Azure AD, Google)
  • Token is passed to the MCP server
  • Server validates token and maps to Snowflake role

Personal Access Tokens (PAT) (for service accounts and automation)

  • Long-lived token issued by your identity provider
  • Stored securely in environment variables or secret management systems
  • Rotated quarterly

Snowflake Native Authentication (for direct Snowflake accounts)

  • Username/password or key pair authentication
  • Mapped to Snowflake roles
  • Less flexible but simpler for small teams

For mid-market teams, OAuth + service account PAT is the standard pattern. It gives you user-level audit trails while supporting both interactive agents (Claude in a chat interface) and batch agents (scheduled queries).

Network and Transport Security

The D23.io server runs as a containerised service, typically deployed on your infrastructure (AWS ECS, Kubernetes, or managed container platforms). It communicates with Snowflake over HTTPS with certificate pinning.

Key security controls:

  • TLS 1.3 for all external connections
  • mTLS for internal service-to-service communication (if deployed in Kubernetes)
  • Rate limiting on the MCP endpoint (prevent query flooding)
  • Request signing for audit trail integrity

For teams managing SOC 2 compliance, this means you can document network segmentation, encryption in transit, and access controls in your audit workbook.
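
Of those controls, rate limiting is the one teams most often hand-roll. A minimal per-client token-bucket sketch (the capacity and refill rate are illustrative, not D23.io defaults):

import time
from collections import defaultdict

class TokenBucket:
    """Per-client rate limiter: `capacity` requests, refilled at `rate` per second."""

    def __init__(self, capacity: float = 20, rate: float = 1.0):
        self.capacity = capacity
        self.rate = rate
        self.tokens = defaultdict(lambda: capacity)
        self.last_seen = defaultdict(time.monotonic)

    def allow(self, client_id: str) -> bool:
        now = time.monotonic()
        elapsed = now - self.last_seen[client_id]
        self.last_seen[client_id] = now
        # Refill based on elapsed time, capped at capacity
        self.tokens[client_id] = min(self.capacity, self.tokens[client_id] + elapsed * self.rate)
        if self.tokens[client_id] >= 1:
            self.tokens[client_id] -= 1
            return True
        return False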


Building Row-Level Security Into Your MCP Server

The RLS Problem

Row-level security is the hardest part of data governance. Snowflake’s native RLS uses row access policies, but they’re static and database-centric. They don’t adapt to user context or external permission systems.

When you build an MCP server for Snowflake, you need to solve RLS at the application layer. The D23.io reference implementation does this by:

  1. Storing permission mappings in a metadata table
  2. Resolving permissions at query time based on user identity
  3. Rewriting queries to enforce filters
  4. Auditing every access decision

Permission Metadata Schema

Create a table in Snowflake to store permission mappings:

CREATE TABLE governance.user_permissions (
  user_id STRING,
  user_email STRING,
  table_name STRING,
  filter_column STRING,
  filter_values ARRAY,
  created_at TIMESTAMP,
  expires_at TIMESTAMP,
  PRIMARY KEY (user_id, table_name, filter_column)
);

Example row:

user_id: 'alice@company.com'
table_name: 'analytics.sales_transactions'
filter_column: 'region'
filter_values: ['APAC', 'EMEA']
expires_at: 2025-12-31

This says: “Alice can only see rows where region is APAC or EMEA.”
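
At query time, the server resolves this mapping for the requesting user. A minimal sketch using the Snowflake Python connector (connection handling is elided, and the table and column names follow the schema above):

import json
from typing import Dict, List

def load_permissions(conn, user_id: str, table_name: str) -> Dict[str, List[str]]:
    """Return {filter_column: [allowed values]} for one user and table."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT filter_column, filter_values
        FROM governance.user_permissions
        WHERE user_id = %s
          AND table_name = %s
          AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP())
        """,
        (user_id, table_name),
    )
    permissions = {}
    for filter_column, filter_values in cursor.fetchall():
        # The connector returns ARRAY columns as JSON text by default
        if isinstance(filter_values, str):
            filter_values = json.loads(filter_values)
        permissions[filter_column] = filter_values
    return permissions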

Query Rewriting Logic

When Claude asks “Show me total revenue,” the MCP server intercepts the query and rewrites it:

Original query:

SELECT region, SUM(revenue) as total_revenue
FROM analytics.sales_transactions
GROUP BY region;

Rewritten query (for Alice):

SELECT region, SUM(revenue) as total_revenue
FROM analytics.sales_transactions
WHERE region IN ('APAC', 'EMEA')
GROUP BY region;

The server:

  1. Parses the incoming query (using a SQL parser library)
  2. Identifies tables and columns
  3. Looks up permissions for the user
  4. Injects WHERE clauses to enforce filters
  5. Validates the rewritten query
  6. Executes it in Snowflake

This happens transparently. Claude doesn’t need to know about permissions—the server handles it.
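
A minimal sketch of the rewrite step using the open-source sqlglot parser (one reasonable choice for step 1; the reference implementation may use a different parser, and this version assumes a single-table query):

import sqlglot

def apply_rls_filters(sql: str, filters: dict) -> str:
    """Inject an IN filter for each permitted column/value list."""
    tree = sqlglot.parse_one(sql, read="snowflake")
    for column, values in filters.items():
        # Production code must escape or bind these values, not interpolate them
        in_list = ", ".join(f"'{v}'" for v in values)
        # .where() ANDs the new condition onto any existing WHERE clause
        tree = tree.where(f"{column} IN ({in_list})")
    return tree.sql(dialect="snowflake")

# Reproduces the rewrite shown above
print(apply_rls_filters(
    "SELECT region, SUM(revenue) AS total_revenue FROM analytics.sales_transactions GROUP BY region",
    {"region": ["APAC", "EMEA"]},
))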

Handling Multi-Table Joins

RLS gets complex with joins. If Alice can see APAC sales but only EU customers, how do you enforce both constraints?

The D23.io approach:

  1. Resolve permissions per table: Alice can see APAC in sales_transactions, EU in customers
  2. Apply filters to each table: WHERE sales_transactions.region IN ('APAC') AND customers.country IN ('EU')
  3. Let the join semantics handle the rest: The join naturally filters rows

For complex scenarios (e.g., “show me sales where the customer’s account manager is in my team”), you can define permission rules in code:

from typing import Dict, List

def get_user_permissions(user_id: str, table_name: str) -> Dict[str, List[str]]:
    if table_name == 'sales_transactions':
        # Get user's region from the identity provider (helper defined elsewhere)
        user_region = get_user_region(user_id)
        return {'region': [user_region]}
    elif table_name == 'customers':
        # Get user's customer list from the CRM (helper defined elsewhere)
        customers = get_user_customers(user_id)
        return {'customer_id': customers}
    return {}

This is where the Snowflake-managed MCP Server Documentation and the official MCP Server implementation become valuable—they show how to integrate custom permission logic into the server.

Testing RLS Enforcement

Before deploying to production, test RLS thoroughly:

  1. Create test users with different permission levels
  2. Run identical queries as each user
  3. Verify result sets differ based on permissions
  4. Check query logs to confirm filters were applied

Example test:

def test_rls_enforcement():
    # query_as_user and total_rows are test fixtures: total_rows is the
    # unfiltered COUNT(*) of sales_transactions, fetched with an admin role
    # Alice can only see APAC
    alice_results = query_as_user('alice@company.com',
        'SELECT COUNT(*) FROM sales_transactions')
    assert alice_results[0][0] < total_rows  # Filtered

    # Bob can see all regions
    bob_results = query_as_user('bob@company.com',
        'SELECT COUNT(*) FROM sales_transactions')
    assert bob_results[0][0] == total_rows  # Not filtered

This is non-negotiable. A single RLS bypass in production is a data breach.


Query Budget Management and Cost Control

Why Query Budgets Matter

Snowflake charges by compute (credits) and storage. When you give AI agents direct warehouse access, you risk runaway costs. A single badly written query scanning terabytes can cost hundreds of dollars.

The D23.io server implements soft budget controls that track costs in real-time and warn users before they exceed limits.

Credit Estimation

Snowflake doesn’t expose real-time credit costs, but you can estimate them:

Estimated Credits = (Bytes Scanned / 1 GB) * Complexity Factor

Where:

  • Bytes Scanned: Snowflake reports this in query results
  • Complexity Factor: 1.0 for simple SELECT, 2.0+ for joins/aggregations

In practice, you track actual costs by:

  1. Running a query
  2. Checking QUERY_HISTORY in Snowflake for actual credits used
  3. Building a cost model: actual_credits = f(bytes_scanned, query_type, warehouse_size); the data pull for this is sketched below
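
A minimal sketch of that data pull, assuming the server tags its queries with a QUERY_TAG prefix so they can be isolated in the ACCOUNT_USAGE views (compute credits aren't broken out per query in this view, so bytes scanned and elapsed time feed the model instead):

def fetch_query_costs(conn, since_days: int = 7):
    """Pull recent MCP-issued queries so actual cost can be modelled offline."""
    cursor = conn.cursor()
    cursor.execute(f"""
        SELECT query_id,
               query_tag,
               warehouse_size,
               bytes_scanned,
               total_elapsed_time / 1000 AS elapsed_seconds
        FROM snowflake.account_usage.query_history
        WHERE start_time >= DATEADD('day', -{since_days}, CURRENT_TIMESTAMP())
          AND query_tag LIKE 'mcp-server:%'  -- assumes the server tags its queries
        ORDER BY start_time DESC
    """)
    # Note: ACCOUNT_USAGE views can lag by up to ~45 minutes, so use this for
    # offline model fitting rather than real-time budget checks
    return cursor.fetchall()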

Budget Enforcement Logic

The MCP server maintains per-user budgets:

CREATE TABLE governance.user_budgets (
  user_id STRING,
  period STRING,  -- 'daily', 'weekly', 'monthly'
  budget_credits DECIMAL(10, 2),
  spent_credits DECIMAL(10, 2),
  reset_at TIMESTAMP,
  PRIMARY KEY (user_id, period)
);

Example: Alice has a daily budget of 100 credits.

When she runs a query:

  1. Estimate cost: Query will scan 50 GB → ~50 credits
  2. Check budget: 50 + 20 (already spent) = 70 credits < 100 limit ✓
  3. Execute query
  4. Record actual cost: Query actually used 48 credits
  5. Update spent_credits: 20 + 48 = 68 credits

If Alice’s next query would exceed the limit:

  1. Warn before execution: “This query will cost ~80 credits. Your daily budget is 100, and you’ve spent 68. Remaining: 32. Proceed? (y/n)”
  2. Log the warning: Audit trail shows cost awareness
  3. Execute only if confirmed: User takes responsibility for overages

For teams with strict cost controls, you can make budgets hard limits—queries are rejected if they would exceed the budget.
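
As a concrete example, here's a minimal hard-limit sketch against the governance.user_budgets table above (the soft-warning variant would return a message instead of raising):

class BudgetExceeded(Exception):
    pass

def check_budget(conn, user_id: str, estimated_credits: float, period: str = "daily") -> float:
    """Raise if the estimate would push the user over budget; return remaining credits."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT budget_credits, spent_credits
        FROM governance.user_budgets
        WHERE user_id = %s AND period = %s
        """,
        (user_id, period),
    )
    row = cursor.fetchone()
    if row is None:
        raise BudgetExceeded(f"No {period} budget configured for {user_id}")
    budget, spent = float(row[0]), float(row[1])
    if spent + estimated_credits > budget:
        raise BudgetExceeded(
            f"Estimated {estimated_credits:.1f} credits would exceed the {period} "
            f"budget of {budget:.1f} (already spent {spent:.1f})"
        )
    return budget - spent - estimated_credits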

Cost Tracking and Reporting

Create a cost dashboard in your BI tool or Slack:

SELECT 
  user_id,
  DATE_TRUNC('day', query_time) as day,
  COUNT(*) as query_count,
  SUM(estimated_credits) as total_credits,
  MAX(estimated_credits) as max_query_cost,
  AVG(query_time_ms) as avg_duration_ms
FROM mcp_server.query_audit_log
GROUP BY user_id, day
ORDER BY day DESC, total_credits DESC;

This shows:

  • Which users are driving costs
  • Which queries are expensive
  • Cost trends over time

For mid-market teams running $500K+ annual Snowflake spend, this visibility is critical. It’s the difference between predictable costs and surprise $50K bills.
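
As a lightweight alternative to a BI dashboard, a small sketch that posts the top spenders from the query above to a Slack incoming webhook (the webhook URL and message format are illustrative):

import requests

def post_daily_cost_summary(webhook_url: str, rows: list) -> None:
    """Send the top spenders from the cost dashboard query to Slack."""
    lines = ["*Snowflake agent spend (yesterday)*"]
    # Rows follow the column order of the dashboard query above
    for user_id, day, query_count, total_credits, max_query_cost, avg_duration_ms in rows[:10]:
        lines.append(f"• {user_id}: {total_credits:.1f} credits across {query_count} queries")
    requests.post(webhook_url, json={"text": "\n".join(lines)}, timeout=10)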

Optimisation Hints

When a query approaches budget limits, the server can suggest optimisations:

Query cost estimate: 95 credits
Your daily budget: 100 credits

Optimisation suggestions:
1. Add a date filter (last 30 days instead of all time) → estimated cost: 20 credits
2. Pre-aggregate in dbt instead of querying raw tables → estimated cost: 5 credits
3. Use a materialized view (refreshed hourly) → estimated cost: 1 credit

This requires maintaining a query cost model, but it pays for itself in the first week.


dbt Metric Grounding for AI Agents

The Metric Grounding Problem

When you ask Claude “What’s our revenue trend,” it needs to know:

  • Where is “revenue” defined?
  • Which tables does it depend on?
  • What’s the calculation logic?
  • What filters are applied (e.g., only paid customers)?

Without this grounding, Claude might:

  • Query the wrong table
  • Use an outdated formula
  • Include cancelled subscriptions in revenue
  • Double-count revenue from multi-year contracts

Metric grounding solves this by making dbt metrics available to the MCP server. When Claude asks for revenue, the server returns the dbt-defined formula, and Claude uses it.

dbt Metric Schema

In your dbt project, define metrics:

# models/metrics.yml
metrics:
  - name: monthly_recurring_revenue
    label: Monthly Recurring Revenue
    description: MRR from active subscriptions
    calculation_method: sum
    expression: subscription_value
    timestamp: subscription_start_date
    time_grains: [day, week, month, quarter, year]
    dimensions:
      - customer_segment
      - region
      - product_tier
    filters:
      - field: subscription_status
        value: 'active'
      - field: subscription_type
        value: 'recurring'
    meta:
      owner: finance_team
      refresh_frequency: daily
      sla_uptime: 99.5%

This tells the MCP server:

  • MRR is the SUM of subscription_value
  • Only include active, recurring subscriptions
  • Can be sliced by customer_segment, region, product_tier
  • Refreshed daily
  • Owned by the finance team

Exposing Metrics to the MCP Server

The D23.io server includes a tool to fetch metric definitions:

from typing import Dict

def get_metric_definition(metric_name: str) -> Dict:
    # Load dbt manifest (helper that reads target/manifest.json)
    manifest = load_dbt_manifest('target/manifest.json')

    # Manifest metric nodes are keyed by unique_id (e.g. 'metric.my_project.mrr'),
    # so match on the metric's name field rather than the dictionary key
    metric = next(
        (m for m in manifest['metrics'].values() if m['name'] == metric_name),
        None,
    )
    if not metric:
        raise ValueError(f"Metric {metric_name} not found")

    return {
        'name': metric['name'],
        'label': metric.get('label'),
        'description': metric['description'],
        'calculation': metric['calculation_method'],
        'expression': metric['expression'],
        'table': metric['depends_on']['nodes'][0],  # source model
        'dimensions': metric['dimensions'],
        'filters': metric['filters'],
        'owner': metric['meta'].get('owner'),
        'refresh_frequency': metric['meta'].get('refresh_frequency'),
    }

When Claude asks “What’s our MRR,” the server:

  1. Calls get_metric_definition('monthly_recurring_revenue')
  2. Returns the metric definition (formula, filters, dimensions)
  3. Claude uses the formula to construct the correct query
  4. Server executes the query with RLS and budget enforcement
  5. Returns results with metric metadata

Building Metric-Aware Agents

You can prime Claude with metric definitions in the system prompt:

You have access to the following metrics:

1. monthly_recurring_revenue (MRR)
   - Definition: SUM(subscription_value) WHERE subscription_status = 'active' AND subscription_type = 'recurring'
   - Dimensions: customer_segment, region, product_tier
   - Refresh: daily
   - Owner: finance_team

2. customer_acquisition_cost (CAC)
   - Definition: SUM(marketing_spend) / COUNT(DISTINCT new_customers)
   - Dimensions: channel, campaign, cohort_month
   - Refresh: weekly
   - Owner: growth_team

When asked about revenue, use the monthly_recurring_revenue metric. When asked about CAC, use the customer_acquisition_cost metric. Always include relevant dimensions in your queries.

With this context, Claude can reason about metrics semantically, not just as SQL.
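
You don't have to maintain that prompt block by hand. A small sketch that renders it from the definitions returned by get_metric_definition above (the exact formatting is illustrative):

def build_metric_prompt(metric_names: list) -> str:
    """Render a system-prompt section listing metric definitions for the agent."""
    lines = ["You have access to the following metrics:", ""]
    for i, name in enumerate(metric_names, start=1):
        m = get_metric_definition(name)  # defined earlier in this guide
        filters = " AND ".join(f"{f['field']} = '{f['value']}'" for f in m["filters"])
        lines.append(f"{i}. {m['name']} ({m['label']})")
        lines.append(f"   - Definition: {m['calculation'].upper()}({m['expression']}) WHERE {filters}")
        lines.append(f"   - Dimensions: {', '.join(m['dimensions'])}")
        lines.append(f"   - Refresh: {m['refresh_frequency']} | Owner: {m['owner']}")
        lines.append("")
    return "\n".join(lines)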

Metric Freshness and SLAs

Metrics have freshness requirements. If MRR is refreshed daily but you’re querying it at 2 AM before the refresh, you’re using stale data.

The MCP server can track metric freshness:

from datetime import datetime, timedelta
from typing import Dict

def get_metric_freshness(metric_name: str) -> Dict:
    # Query dbt run metadata (helpers defined elsewhere)
    last_run = get_dbt_last_run(metric_name)
    refresh_frequency = get_metric_config(metric_name)['refresh_frequency']
    
    age_hours = (datetime.now() - last_run).total_seconds() / 3600
    is_fresh = age_hours < parse_frequency_to_hours(refresh_frequency)
    
    return {
        'metric': metric_name,
        'last_refreshed': last_run,
        'age_hours': age_hours,
        'refresh_frequency': refresh_frequency,
        'is_fresh': is_fresh,
        'next_refresh': last_run + timedelta(hours=parse_frequency_to_hours(refresh_frequency))
    }
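
The parse_frequency_to_hours helper is referenced but not defined above; one minimal way it could map the refresh-frequency labels used in this guide:

def parse_frequency_to_hours(frequency: str) -> float:
    """Map a refresh-frequency label to its freshness window in hours."""
    mapping = {
        'hourly': 1,
        'daily': 24,
        'weekly': 24 * 7,
        'monthly': 24 * 30,  # approximation; good enough for a freshness check
    }
    try:
        return float(mapping[frequency.lower()])
    except KeyError:
        raise ValueError(f"Unknown refresh frequency: {frequency}")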

When Claude queries a metric, the server can warn:

Metric: monthly_recurring_revenue
Last refreshed: 18 hours ago
Refresh frequency: daily (24 hours)
Status: ✓ Fresh

Metric: customer_acquisition_cost
Last refreshed: 8 days ago
Refresh frequency: weekly (7 days)
Status: ⚠ Stale (refresh in progress)

This prevents Claude from making decisions based on outdated data.


Authentication and Token Management

OAuth 2.0 Integration

For user-facing agents (Claude in a chat interface), OAuth 2.0 is the standard. The flow:

  1. User clicks “Connect to Snowflake” in your application
  2. Redirects to identity provider (Okta, Azure AD, Google)
  3. User logs in
  4. Identity provider returns an access token
  5. Application stores token securely (encrypted in database, not in browser localStorage)
  6. MCP server receives token when Claude needs to query
  7. Server validates token with identity provider
  8. Server maps token to Snowflake role
  9. Server executes query as that Snowflake role

Implementation:

from flask import Flask, jsonify, redirect, request
from authlib.integrations.flask_client import OAuth

app = Flask(__name__)

oauth = OAuth(app)
oauth.register(
    name='okta',
    client_id='YOUR_CLIENT_ID',
    client_secret='YOUR_CLIENT_SECRET',
    server_metadata_url='https://your-org.okta.com/.well-known/openid-configuration',
    client_kwargs={'scope': 'openid profile email'}
)

@app.route('/callback')
def oauth_callback():
    # Exchange the authorisation code for tokens; with the openid scope,
    # Authlib parses the ID token claims into token['userinfo']
    token = oauth.okta.authorize_access_token()
    user = token['userinfo']
    
    # Store token securely (db is your application's storage layer)
    db.store_user_token(user['sub'], token)
    
    return redirect('/dashboard')

@app.route('/api/query', methods=['POST'])
def execute_query():
    user_id = get_current_user()
    token = db.get_user_token(user_id)
    
    # Validate token with identity provider's userinfo endpoint
    user_info = oauth.okta.userinfo(token=token)
    
    # Map to Snowflake role (application-defined mapping)
    sf_role = map_user_to_role(user_info['email'])
    
    # Execute query as role (helper wrapping the Snowflake connector)
    results = snowflake_query(request.json['query'], role=sf_role)
    
    return jsonify(results)

Service Account Authentication

For scheduled agents or batch queries, use service account tokens:

from google.oauth2 import service_account
from google.auth.transport.requests import Request

# Load service account credentials
credentials = service_account.Credentials.from_service_account_file(
    'service-account-key.json',
    scopes=['https://www.googleapis.com/auth/cloud-platform']
)

# Obtain an access token (the token is only populated after a refresh)
credentials.refresh(Request())
token = credentials.token

# Use token in the MCP server (MCPServer is your server's wrapper class, not a library import)
mcp_server = MCPServer(token=token, snowflake_role='SERVICE_ACCOUNT')

Store service account keys in a secrets manager (AWS Secrets Manager, HashiCorp Vault, or Snowflake Native App Secrets):

aws secretsmanager create-secret \
  --name mcp-service-account \
  --secret-string file://service-account-key.json

Token Rotation and Expiry

Tokens expire. The MCP server must handle token refresh:

from datetime import datetime, timedelta

def get_valid_token(user_id: str) -> str:
    token = db.get_user_token(user_id)
    
    # Authlib stores expires_at as a Unix timestamp; refresh if the token
    # expires within the next 5 minutes
    expires_at = datetime.fromtimestamp(token['expires_at'])
    if expires_at < datetime.now() + timedelta(minutes=5):
        # Refresh against the identity provider's token endpoint
        new_token = oauth.okta.fetch_token(
            url='https://your-org.okta.com/oauth2/v1/token',
            grant_type='refresh_token',
            refresh_token=token['refresh_token']
        )
        
        # Store new token
        db.store_user_token(user_id, new_token)
        return new_token['access_token']
    
    return token['access_token']

For Snowflake-specific authentication, rotate key pairs quarterly:

def rotate_snowflake_key_pair():
    # Generate new key pair (helper defined elsewhere)
    private_key, public_key = generate_rsa_key_pair()
    
    # Update the Snowflake user; for zero-downtime rotation, set RSA_PUBLIC_KEY_2
    # first, switch clients over, then replace RSA_PUBLIC_KEY
    snowflake.execute(f"""
        ALTER USER service_account SET RSA_PUBLIC_KEY = '{public_key}';
    """)
    
    # Store new private key in the secrets manager
    secrets_manager.put_secret('snowflake-private-key', private_key)
    
    # Log rotation
    audit_log.record('key_rotation', user='service_account', timestamp=datetime.now())

For teams pursuing SOC 2 compliance, token rotation is a control requirement. Document the rotation schedule in your audit workbook.


Deployment and Production Hardening

Container Deployment

The D23.io server runs as a Docker container:

FROM python:3.11-slim

WORKDIR /app

# Install curl for the container health check (not included in slim images)
RUN apt-get update && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY src/ .

# Security: run as non-root user
RUN useradd -m -u 1000 appuser
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

Deploy to AWS ECS or Kubernetes:

# ECS Task Definition
{
  "family": "mcp-server",
  "containerDefinitions": [
    {
      "name": "mcp-server",
      "image": "your-registry/mcp-server:latest",
      "portMappings": [{"containerPort": 8000}],
      "environment": [
        {"name": "LOG_LEVEL", "value": "INFO"},
        {"name": "ENVIRONMENT", "value": "production"}
      ],
      "secrets": [
        {"name": "SNOWFLAKE_ACCOUNT", "valueFrom": "arn:aws:secretsmanager:..."},
        {"name": "SNOWFLAKE_PRIVATE_KEY", "valueFrom": "arn:aws:secretsmanager:..."}
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/mcp-server",
          "awslogs-region": "us-east-1",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ]
}

Network Security

Deploy the MCP server in a private subnet with:

  1. Security group restricting inbound traffic to known clients (Claude Desktop, your application)
  2. NAT gateway for outbound Snowflake connections
  3. VPC endpoint for AWS services (Secrets Manager, CloudWatch)

# Terraform
resource "aws_security_group" "mcp_server" {
  name = "mcp-server-sg"
  vpc_id = aws_vpc.main.id

  # Allow inbound from application
  ingress {
    from_port = 8000
    to_port = 8000
    protocol = "tcp"
    security_groups = [aws_security_group.app.id]
  }

  # Allow outbound to Snowflake
  egress {
    from_port = 443
    to_port = 443
    protocol = "tcp"
    cidr_blocks = ["0.0.0.0/0"]  # Restrict to Snowflake IPs in production
  }
}

Resource Limits

Set CPU and memory limits to prevent resource exhaustion:

# Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: your-registry/mcp-server:latest
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "2000m"
            memory: "2Gi"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30

For teams at mid-market scale, 3 replicas with auto-scaling based on CPU/memory is standard.

Secrets Management

Never hardcode credentials. Use a secrets manager:

import json

import botocore.session
from aws_secretsmanager_caching import SecretCache, SecretCacheConfig

# The cache wraps a low-level Secrets Manager client and refreshes entries periodically
client = botocore.session.get_session().create_client('secretsmanager')
cache = SecretCache(config=SecretCacheConfig(), client=client)

def get_snowflake_credentials():
    secret = cache.get_secret_string('mcp-snowflake-credentials')
    return json.loads(secret)

Rotate secrets quarterly and log all access:

def log_secret_access(secret_name: str, user: str):
    audit_log.record(
        event='secret_access',
        secret=secret_name,
        user=user,
        timestamp=datetime.now(),
        ip_address=get_client_ip()
    )

Disaster Recovery

Plan for failures:

  1. Database backups: Automated daily snapshots of governance tables (permissions, budgets, audit logs)
  2. Query cache: Cache frequent queries (MRR, CAC, etc.) to reduce Snowflake load
  3. Fallback mode: If MCP server is down, fall back to direct Snowflake connections (with warnings)
  4. Incident response: Document how to restart the server, restore from backups, and notify users

Example backup strategy:

#!/bin/bash
# Daily backup script: snapshot governance tables inside Snowflake
snowsql -c prod_account -q "CREATE OR REPLACE TABLE governance.permissions_backup AS SELECT * FROM governance.user_permissions;"
snowsql -c prod_account -q "CREATE OR REPLACE TABLE governance.budgets_backup AS SELECT * FROM governance.user_budgets;"
snowsql -c prod_account -q "CREATE OR REPLACE TABLE governance.audit_log_backup AS SELECT * FROM governance.audit_log;"

# Export the audit log and upload a copy to S3
snowsql -c prod_account -o output_format=csv -o header=true -o friendly=false \
  -q "SELECT * FROM governance.audit_log;" \
  | aws s3 cp - s3://mcp-backups/audit_log_$(date +%Y-%m-%d).csv


Monitoring, Debugging, and Observability

Logging Strategy

Log every significant event:

import hashlib
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

def log_query_execution(user_id: str, query: str, status: str, duration_ms: int, cost_credits: float, error: str = None):
    log_entry = {
        'timestamp': datetime.utcnow().isoformat(),
        'event': 'query_execution',
        'user_id': user_id,
        'query_hash': hashlib.sha256(query.encode()).hexdigest(),
        'status': status,  # 'success', 'failed', 'blocked'
        'duration_ms': duration_ms,
        'cost_credits': cost_credits,
        'error': error,
        'trace_id': get_trace_id()
    }
    logger.info(json.dumps(log_entry))
    
    # Also write to the audit table (db and get_trace_id are application helpers)
    db.insert('governance.audit_log', log_entry)

Structured logging (JSON) makes it easy to search and analyse logs in CloudWatch or Datadog.

Metrics and Observability

Track key metrics:

from prometheus_client import Counter, Histogram, Gauge

# Counters
queries_total = Counter('mcp_queries_total', 'Total queries', ['status'])
auth_failures = Counter('mcp_auth_failures_total', 'Auth failures', ['reason'])

# Histograms
query_duration = Histogram('mcp_query_duration_seconds', 'Query duration')
query_cost = Histogram('mcp_query_cost_credits', 'Query cost')

# Gauges
active_connections = Gauge('mcp_active_connections', 'Active connections')
budget_utilisation = Gauge('mcp_budget_utilisation', 'Budget utilisation %', ['user_id'])

# Usage
queries_total.labels(status='success').inc()
query_duration.observe(0.45)  # 450ms
query_cost.observe(12.5)  # 12.5 credits

Expose metrics on /metrics endpoint for Prometheus scraping:

from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

@app.get('/metrics')
def metrics():
    # generate_latest() returns bytes; set the Prometheus content type explicitly
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}

Alerting

Set up alerts for:

  1. High query costs: Alert if a single query exceeds 100 credits
  2. Auth failures: Alert if >5 auth failures in 5 minutes
  3. Slow queries: Alert if query duration > 30 seconds
  4. Budget overruns: Alert if user spends >80% of daily budget
  5. Service unavailability: Alert if health check fails

Example CloudWatch alarm:

import boto3

cloudwatch = boto3.client('cloudwatch')

cloudwatch.put_metric_alarm(
    AlarmName='mcp-high-query-cost',
    MetricName='mcp_query_cost_credits',
    Namespace='MCP',
    Statistic='Maximum',
    Period=300,  # 5 minutes
    EvaluationPeriods=1,
    Threshold=100,
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789:mcp-alerts']
)

Debugging Queries

When a query fails, provide detailed debugging info:

from typing import Dict

def execute_query_with_debugging(user_id: str, query: str) -> Dict:
    trace_id = generate_trace_id()
    # Initialise so the except block can report whichever stages completed
    parsed = permissions = rewritten = None
    
    try:
        # Parse query
        parsed = parse_sql(query)
        logger.info(f"[{trace_id}] Parsed query: {parsed}")
        
        # Resolve permissions
        permissions = get_user_permissions(user_id)
        logger.info(f"[{trace_id}] User permissions: {permissions}")
        
        # Rewrite query
        rewritten = rewrite_query(parsed, permissions)
        logger.info(f"[{trace_id}] Rewritten query: {rewritten}")
        
        # Check budget
        estimated_cost = estimate_query_cost(rewritten)
        logger.info(f"[{trace_id}] Estimated cost: {estimated_cost} credits")
        
        # Execute
        results = snowflake.execute(rewritten)
        logger.info(f"[{trace_id}] Query succeeded")
        
        return {
            'status': 'success',
            'results': results,
            'trace_id': trace_id
        }
    
    except Exception as e:
        logger.error(f"[{trace_id}] Query failed: {e}")
        return {
            'status': 'error',
            'error': str(e),
            'trace_id': trace_id,
            'debug_info': {
                'user_id': user_id,
                'original_query': query,
                'parsed_query': parsed,
                'user_permissions': permissions,
                'rewritten_query': rewritten
            }
        }

When Claude reports an issue, ask for the trace_id. You can then look up the full execution path in your logs.


Real-World Implementation Patterns

Pattern 1: Sales Analytics Agent

A sales team wants Claude to answer questions like “Which regions are underperforming?”

Setup:

  • Define metrics: monthly_recurring_revenue, customer_churn_rate, average_deal_size
  • Set permissions: Sales reps see only their region, sales managers see all regions
  • Set budgets: $50/day per rep, $500/day per manager

Claude’s interaction:

User: "Which regions have declining MRR?"

Claude: I'll query monthly_recurring_revenue by region for the last 3 months.

[Server executes]
SELECT region, month, monthly_recurring_revenue
FROM metrics.monthly_recurring_revenue
WHERE region IN ('APAC', 'EMEA')  -- User's permitted regions
AND month >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '3 months')
ORDER BY month DESC, monthly_recurring_revenue DESC;

Results:
APAC, 2024-10, $2.5M
APAC, 2024-09, $2.6M  (declining)
APAC, 2024-08, $2.7M
EMEA, 2024-10, $1.8M
EMEA, 2024-09, $1.7M
EMEA, 2024-08, $1.6M

Claude: APAC is declining month-over-month, down from $2.7M to $2.5M. EMEA is growing.

Cost: Query scans 500 MB → ~0.5 credits

Pattern 2: Finance Compliance Agent

A finance team needs to audit revenue recognition for SOC 2 compliance.

Setup:

  • Define metrics: deferred_revenue, recognized_revenue, revenue_by_contract_type
  • Set permissions: Finance team sees all contracts, auditors see only flagged contracts
  • Set budgets: $1000/day (high, because audits are expensive)

Claude’s interaction:

User: "Show me all contracts with revenue recognition anomalies"

Claude: I'll query recognized_revenue and deferred_revenue for contracts with >20% variance.

[Server executes]
SELECT 
  contract_id, 
  customer_name,
  deferred_revenue,
  recognized_revenue,
  ABS(deferred_revenue - LAG(deferred_revenue) OVER (PARTITION BY contract_id ORDER BY month)) / LAG(deferred_revenue) OVER (...) as variance
FROM metrics.revenue_by_contract_type
WHERE month = CURRENT_DATE
QUALIFY variance > 0.20
ORDER BY variance DESC;

Results:
[Shows 12 contracts with anomalies]

Claude: Found 12 contracts with >20% revenue recognition variance. These should be reviewed by the finance team.

Cost: Query scans 2 GB → ~2 credits

Pattern 3: Data Quality Monitoring Agent

A data engineering team wants Claude to monitor dbt test results.

Setup:

  • Store dbt test results in Snowflake
  • Define metrics: test_pass_rate, test_execution_time, data_freshness
  • Set permissions: Data engineers see all tests, data analysts see only tests for tables they use
  • Set budgets: $100/day

Claude’s interaction:

User: "Are there any failing dbt tests?"

Claude: I'll check dbt test results from the last 24 hours.

[Server executes]
SELECT 
  test_name,
  model_name,
  status,
  error_message,
  execution_time_seconds,
  executed_at
FROM dbt.test_results
WHERE executed_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND status = 'failed'
ORDER BY executed_at DESC;

Results:
test_name: not_null_orders.id
model_name: orders
status: failed
error_message: 3 rows with NULL id
executed_at: 2024-10-15 14:32:00

Claude: Found 1 failing test: orders.id has 3 NULL values. This should be investigated immediately.

Cost: Query scans 100 MB → ~0.1 credits


Next Steps and Scaling Considerations

Deploying Your First MCP Server

  1. Start small: Build a read-only MCP server for analytics queries. No writes, no mutations.
  2. Test RLS thoroughly: Write 50+ test cases covering edge cases.
  3. Set conservative budgets: Start with low per-user budgets and increase based on actual usage.
  4. Monitor for 2 weeks: Watch logs, metrics, and audit trails. Look for unexpected queries or costs.
  5. Gradually expand: Add more tables, metrics, and users.

Scaling to Enterprise

As you grow, you’ll hit scaling challenges:

Query Performance: Large joins and aggregations slow down. Solutions:

  • Pre-aggregate in dbt (materialized views, incremental models)
  • Add query caching (Redis, Snowflake query result cache)
  • Use Snowflake clustering keys on permission columns

Permission Complexity: Managing permissions for 1000+ users is hard. Solutions:

  • Integrate with identity provider (Okta, Azure AD) for group-based permissions
  • Use dynamic permission rules (e.g., “sales reps see their own region”)
  • Audit permissions quarterly

Cost Growth: As usage grows, Snowflake bills grow. Solutions:

  • Implement strict budget enforcement (hard limits, not soft warnings)
  • Build cost optimisation recommendations into Claude’s responses
  • Use Snowflake’s cost management tools (resource monitors, query acceleration), as sketched below
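
As a concrete example of the last point, a Snowflake resource monitor can hard-cap the warehouse the MCP server uses. A minimal sketch (the monitor name, warehouse name, and quota are illustrative):

def apply_resource_monitor(conn, warehouse: str = "MCP_AGENT_WH", monthly_quota: int = 500):
    """Attach a resource monitor that notifies at 80% and suspends at 100% of quota."""
    cursor = conn.cursor()
    cursor.execute(f"""
        CREATE OR REPLACE RESOURCE MONITOR mcp_agent_monitor
          WITH CREDIT_QUOTA = {monthly_quota}
          FREQUENCY = MONTHLY
          START_TIMESTAMP = IMMEDIATELY
          TRIGGERS ON 80 PERCENT DO NOTIFY
                   ON 100 PERCENT DO SUSPEND
    """)
    cursor.execute(f"ALTER WAREHOUSE {warehouse} SET RESOURCE_MONITOR = mcp_agent_monitor")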

Connecting to Claude via MCP

Once your MCP server is deployed, connect Claude Desktop:

  1. Install Claude for Desktop (macOS, Windows, or Linux)
  2. Edit claude_desktop_config.json:
{
  "mcpServers": {
    "snowflake": {
      "command": "python",
      "args": ["-m", "mcp_server.snowflake"],
      "env": {
        "SNOWFLAKE_ACCOUNT": "xy12345",
        "SNOWFLAKE_USER": "mcp_service_account",
        "SNOWFLAKE_PRIVATE_KEY_PATH": "/path/to/private/key.p8",
        "MCP_SERVER_URL": "https://mcp.your-company.com"
      }
    }
  }
}
  3. Restart Claude Desktop
  4. Start asking questions: Claude now has access to your Snowflake data

For detailed integration steps, refer to the Model Context Protocol (MCP) - Anthropic Documentation and the Build an MCP Server for Cortex Agents Guide.

Working with PADISO for Production Deployment

Building and maintaining an MCP server is complex. If you’re a founder or operator at a seed-to-Series-B startup, you might not have the engineering bandwidth. That’s where PADISO’s CTO as a Service comes in.

PADISO is a Sydney-based venture studio and AI digital agency that specialises in exactly this kind of work. We’ve shipped MCP servers, agentic AI systems, and custom platform engineering for 50+ clients. We can help you:

  • Design the architecture: Security, scalability, cost management
  • Build the MVP: Get your first MCP server live in 4 weeks
  • Harden for production: SOC 2 / ISO 27001 compliance, monitoring, disaster recovery
  • Scale it up: Add more tables, metrics, users, and features

If you’re modernising your data stack or need fractional CTO leadership, reach out to PADISO. We work with ambitious teams across Australia and globally.

Alternatively, if you’re an enterprise operator at a mid-market or enterprise company, PADISO can help you implement agentic AI across your organisation. We specialise in AI automation for supply chains, customer service, e-commerce, financial services, healthcare, and more. We also handle security audits for SOC 2 and ISO 27001 compliance via Vanta.

Open-Source Contributions

The D23.io reference implementation is open-source. If you build on it, contribute back:

  • Bug fixes
  • Performance improvements
  • New tools (e.g., dbt metadata queries, Snowflake admin commands)
  • Documentation and examples

Check out the Snowflake-Labs/mcp GitHub Repository to get started.


Summary

Building an MCP server for Snowflake is a powerful way to give AI agents secure, governed access to your data warehouse. The D23.io reference implementation proves this works at mid-market scale, with 50+ clients shipping production systems.

The key principles:

  1. Security first: Row-level security, budget enforcement, and comprehensive audit logging
  2. Simplicity: Stateless tools, standard protocols, minimal infrastructure
  3. Observability: Log everything, monitor key metrics, debug with trace IDs
  4. Scalability: Start small, test thoroughly, scale gradually

If you’re a founder or operator looking to ship agentic AI at your startup, or an enterprise operator modernising your data stack, the patterns in this guide will accelerate your timeline. You can build and deploy an MCP server in 4 weeks, not 6 months.

For teams in Sydney or Australia, PADISO is here to help. We specialise in exactly this kind of work—venture studio partnerships for startups, fractional CTO leadership for scaling teams, and enterprise AI transformation for mid-market companies. We also handle AI adoption strategies, security audits for SOC 2 and ISO 27001 compliance, and AI agency services for enterprises, SMEs, and startups.

Ready to build? Start with the Snowflake-Labs/mcp GitHub Repository and the official Snowflake documentation. Deploy your first server this week. Scale it next week. Ship features with your agents the week after.

Let’s go.