Intermediate โฑ 120 min ๐Ÿ“‹ 14 Steps

Create a Defender XDR MCP Server

Build a Model Context Protocol server that integrates with Microsoft Defender XDR APIs, exposing incident management, threat hunting, and device isolation capabilities as MCP tools for AI-powered security operations.

📋 Overview

About This Lab

In this lab you will build a fully functional MCP server that integrates with Microsoft Defender XDR APIs. The server exposes incident management, advanced threat hunting, and device isolation capabilities as MCP tools, enabling AI clients to interact with your XDR environment programmatically for security operations.

๐Ÿข Enterprise Use Case

A SOC team wants their AI assistant to go beyond read-only queries and actively manage security incidents. Analysts need the AI to retrieve and update incidents, run KQL hunting queries across the XDR data lake, and isolate compromised devices, all through natural-language conversations.

By wrapping these Defender XDR capabilities in MCP tools, the team enables autonomous triage, faster mean-time-to-respond, and consistent incident handling across shifts.

🎯 What You Will Learn

  1. Review the Defender XDR API surface and permission model
  2. Configure Entra ID app registration with XDR scopes
  3. Build incident management tools (list, get, update, assign)
  4. Build advanced hunting tools that execute KQL queries
  5. Build device action tools (isolate, release, scan)
  6. Implement entity enrichment for users, IPs, and files
  7. Compose tools into logical groups with shared context
  8. Add rate limiting and retry logic for API calls
  9. Implement audit logging for all destructive actions
  10. Test the server end-to-end with the MCP Inspector

🔑 Why This Matters

This lab extends MCP beyond read-only queries into active incident response. By exposing investigation and remediation actions as MCP tools, you enable AI agents to perform autonomous security operations, dramatically reducing response times and freeing analysts to focus on complex, high-judgment tasks.

โš™๏ธ Prerequisites

  • Completed Lab 01: a working Sentinel MCP server with the run_kql_query tool
  • Microsoft Defender XDR tenant with active incidents and alerts for testing
  • Entra ID app registration with SecurityIncident.ReadWrite.All, SecurityAlert.ReadWrite.All, and ThreatHunting.Read.All permissions
  • MCP SDK installed: Python mcp[cli] or Node.js @modelcontextprotocol/sdk
  • Microsoft Graph SDK: msgraph-sdk for Python or @microsoft/microsoft-graph-client for Node.js
💡 Pro Tip: If you don’t have active incidents, you can generate test incidents by enabling the Microsoft Defender XDR evaluation lab, which provides simulated attack scenarios.

Step 1 · Review Defender XDR APIs

Before building the MCP server, review the Microsoft Graph Security API and map each endpoint to an MCP tool. Each API endpoint that provides value for AI-driven security operations should become an MCP tool with a clear name, description, and schema.

Key API Endpoints to Review

  1. Navigate to the Microsoft Graph Security API overview
  2. Review /security/incidents: list, get, and update incidents
  3. Review /security/alerts_v2: alert management and enrichment
  4. Review /security/runHuntingQuery: advanced hunting with KQL
  5. Review /security/tiIndicators: threat intelligence indicators
  6. Review device isolation APIs in MDE Machine Actions

Tool Mapping Table

# API Endpoint โ†’ MCP Tool Mapping
# Each Microsoft Graph Security API endpoint becomes an MCP tool
# Read-only endpoints = low-risk tools (auto-execute)
# Write endpoints = medium/high-risk tools (require confirmation)
#
# GET  /security/incidents           → list_incidents     (read-only)
# GET  /security/incidents/{id}      → get_incident       (read-only)
# PATCH /security/incidents/{id}     → update_incident    (state-changing!)
# POST /security/runHuntingQuery     → run_hunting_query  (read-only)
# POST /devices/{id}/isolate         → isolate_device     (destructive!)
# POST /devices/{id}/unisolate       → release_device     (state-changing)
# GET  /users/{id}                   → enrich_user        (read-only)
# GET  /devices/{id}                 → enrich_device      (read-only)
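The mapping above can also live in code, so the server has one place that decides which tools may auto-execute. The sketch below is illustrative: the names TOOL_RISK and requires_confirmation are examples, not part of the Graph API or the MCP SDK.

```python
# Illustrative risk registry derived from the mapping table above.
# Read-only tools auto-execute; anything else should require confirm=true.
TOOL_RISK = {
    "list_incidents":    "read-only",
    "get_incident":      "read-only",
    "update_incident":   "state-changing",
    "run_hunting_query": "read-only",
    "isolate_device":    "destructive",
    "release_device":    "state-changing",
    "enrich_user":       "read-only",
    "enrich_device":     "read-only",
}

def requires_confirmation(tool_name: str) -> bool:
    """Unknown tools default to the strictest treatment (fail closed)."""
    return TOOL_RISK.get(tool_name, "destructive") != "read-only"
```

Later steps implement this gate per-handler; a central registry like this keeps the policy auditable in one place.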

Step 2 ยท Set Up API Authentication

Configure Microsoft Graph API authentication using OAuth 2.0 client credentials flow. The app registration requires higher-privilege permissions than the Sentinel server because this server can modify incidents and take device actions.

Register the Application in Entra ID

  1. Sign in to entra.microsoft.com
  2. Navigate to Identity > Applications > App registrations > New registration
  3. Name: xdr-mcp-server, Account type: Single tenant, click Register
  4. Go to API permissions > Add a permission > Microsoft Graph > Application permissions
  5. Add these permissions:
    • SecurityIncident.ReadWrite.All
    • SecurityAlert.ReadWrite.All
    • ThreatHunting.Read.All
    • Machine.Isolate (from WindowsDefenderATP)
    • User.Read.All
  6. Click Grant admin consent for your tenant
  7. Navigate to Certificates & secrets > create a new client secret
  8. Copy the Application ID, Tenant ID, and Secret value

Authentication Module

import os
from dotenv import load_dotenv
from azure.identity import ClientSecretCredential
from msgraph import GraphServiceClient

# Load Azure credentials from .env file
load_dotenv()

def get_graph_client() -> GraphServiceClient:
    """Create an authenticated Microsoft Graph client.
    Uses OAuth 2.0 client credentials flow (app-only, no user context).
    This is appropriate for background services like MCP servers
    that act on behalf of the application, not a specific user.
    Returns: GraphServiceClient ready to call Graph Security APIs.
    """
    # Authenticate using the Entra ID app registration credentials
    credential = ClientSecretCredential(
        tenant_id=os.environ['AZURE_TENANT_ID'],
        client_id=os.environ['AZURE_CLIENT_ID'],
        client_secret=os.environ['AZURE_CLIENT_SECRET']
    )
    # .default scope requests all permissions granted to the app
    scopes = ['https://graph.microsoft.com/.default']
    return GraphServiceClient(credential, scopes)

# Singleton client instance - reuse across all tool invocations
# Creating new clients repeatedly wastes auth token requests
graph_client = get_graph_client()
โš ๏ธ Important: The SecurityIncident.ReadWrite.All permission allows the MCP server to close or reassign incidents. Follow the principle of least privilege. only grant write permissions if your AI workflows require incident updates.

Step 3 · Create the Incident List Tool

Build a list_incidents tool that returns active incidents with filtering support. Design the output to be AI-friendly: include all relevant context in a single response so the AI doesn’t need multiple tool calls.

Tool Definition & Handler

import json
from datetime import datetime, timedelta, timezone
from mcp.server import Server
import mcp.types as types

# Create XDR MCP server - exposes Defender XDR capabilities as MCP tools
server = Server("xdr-mcp-server")

# MCP Tool Discovery: define all tools the server exposes to AI clients
@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        # Tool: list_incidents - retrieve active security incidents
        # AI uses this as the entry point for investigation workflows
        types.Tool(
            name="list_incidents",
            description="List active security incidents from Defender XDR "
                        "with severity, status, affected entities, and alert counts.",
            inputSchema={
                "type": "object",
                "properties": {
                    "severity": {
                        "type": "string",
                        "description": "Filter: high, medium, low, informational",
                        "enum": ["high", "medium", "low", "informational"]
                    },
                    "status": {
                        "type": "string",
                        "description": "Filter: active, resolved, redirected",
                        "enum": ["active", "resolved", "redirected"]
                    },
                    "top": {
                        "type": "integer",
                        "description": "Number of incidents to return (default: 20, max: 50)",
                        "default": 20
                    }
                },
                "required": []
            }
        ),
        # ... other tools defined below
    ]

async def handle_list_incidents(arguments: dict) -> list[types.TextContent]:
    """List incidents from Defender XDR via Microsoft Graph Security API.
    Supports filtering by severity and status with OData query parameters.
    Returns: JSON with incident metadata formatted for AI consumption.
    """
    # Cap results to prevent oversized responses
    top = min(int(arguments.get("top", 20)), 50)
    severity = arguments.get("severity")
    status = arguments.get("status", "active")

    # Build OData $filter string from provided parameters
    # OData filters are passed to Graph API for server-side filtering
    filters = []
    if severity:
        filters.append(f"severity eq '{severity}'")
    if status:
        filters.append(f"status eq '{status}'")
    filter_str = " and ".join(filters) if filters else None

    try:
        def _configure(config):
            # Set OData query parameters on the request configuration
            config.query_parameters.top = top
            if filter_str:
                config.query_parameters.filter = filter_str
            config.query_parameters.orderby = ['createdDateTime desc']

        result = await graph_client.security.incidents.get(
            request_configuration=_configure
        )

        # Extract key fields from each incident for AI-friendly output
        # Include enough context for triage without overwhelming the response
        incidents = []
        for inc in (result.value or []):
            incidents.append({
                "id": inc.id,
                "display_name": inc.display_name,
                "severity": str(inc.severity),
                "status": str(inc.status),
                "created": str(inc.created_date_time),
                "last_modified": str(inc.last_update_date_time),
                "assigned_to": inc.assigned_to or "Unassigned",
                "classification": str(inc.classification) if inc.classification else None,
                "alert_count": len(inc.alerts) if inc.alerts else 0,
                "description": (inc.description or "")[:200]  # Truncate for brevity
            })

        # Include a "hint" to guide the AI model to the next logical step
        return [types.TextContent(type="text", text=json.dumps({
            "status": "success",
            "incident_count": len(incidents),
            "filters_applied": {"severity": severity, "status": status},
            "incidents": incidents,
            "hint": "Use get_incident with an incident id for full details."
        }, default=str))]

    except Exception as e:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": str(e)
        }))]
💡 Pro Tip: Include a hint field in list responses to guide the AI model to make follow-up calls. The hint “Use get_incident with an incident id for full details” teaches the model the correct workflow.

Step 4 · Create the Incident Detail Tool

Build a get_incident tool that returns the full incident with all alerts, affected entities, MITRE ATT&CK classifications, and evidence artifacts. Structure the response hierarchically for efficient AI navigation.

Implementation

async def handle_get_incident(arguments: dict) -> list[types.TextContent]:
    """Get comprehensive incident details including alerts and entities.
    Builds a hierarchical response: incident → alerts → entities → evidence.
    This allows AI models to navigate the data tree efficiently.
    Returns: JSON with full incident details, MITRE techniques, and next actions.
    """
    incident_id = arguments.get("incident_id")
    if not incident_id:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error",
            "message": "incident_id is required",
            "suggestion": "Use list_incidents first to find active incident IDs."
        }))]

    try:
        inc = await graph_client.security.incidents.by_incident_id(
            incident_id
        ).get()

        # Build hierarchical response for efficient AI navigation
        # Structure: incident > alerts > evidence > entities
        alerts_data = []
        # Group entities by type for structured access
        entities_by_type = {"users": [], "devices": [], "ips": [], "files": []}

        if inc.alerts:
            for alert in inc.alerts:
                alerts_data.append({
                    "id": alert.id,
                    "title": alert.title,
                    "severity": str(alert.severity),
                    "category": alert.category,
                    "created": str(alert.created_date_time),
                    "mitre_techniques": [
                        t.technique_id for t in (alert.mitre_techniques or [])
                    ],
                    "description": (alert.description or "")[:300]
                })

                # Extract entities from each alert's evidence items
                for ev in (alert.evidence or []):
                    entity_type = type(ev).__name__
                    if "User" in entity_type:
                        account = getattr(ev, 'user_account', None)
                        entities_by_type["users"].append({
                            "name": getattr(account, 'display_name', None) or 'N/A',
                            "upn": getattr(account, 'user_principal_name', None) or 'N/A'
                        })
                    elif "Device" in entity_type:
                        entities_by_type["devices"].append({
                            "name": getattr(ev, 'device_dns_name', None) or 'N/A',
                            "id": getattr(ev, 'mde_device_id', None) or 'N/A'
                        })

        result = {
            "status": "success",
            "incident": {
                "id": inc.id,
                "display_name": inc.display_name,
                "severity": str(inc.severity),
                "status": str(inc.status),
                "created": str(inc.created_date_time),
                "assigned_to": inc.assigned_to or "Unassigned",
                "classification": str(inc.classification) if inc.classification else None,
                "description": inc.description,
                "alerts": alerts_data,
                "entities": entities_by_type,
                "alert_count": len(alerts_data)
            },
            "available_actions": [
                "Use update_incident to change status, severity, or assignment",
                "Use run_hunting_query to investigate further",
                "Use isolate_device to contain compromised devices"
            ]
        }

        return [types.TextContent(type="text", text=json.dumps(result, default=str))]

    except Exception as e:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": str(e)
        }))]
💡 Pro Tip: Include available_actions in detail responses to guide the AI model on what it can do next. This is like a contextual help menu that teaches the model the security workflow: investigate → assess → contain → remediate.
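Because get_incident groups entities by type, a downstream consumer can pull device IDs straight out of the response and feed them to isolate_device or enrich_device. A minimal sketch, assuming the response shape shown above (extract_device_ids is an illustrative helper, not part of the server):

```python
import json

def extract_device_ids(get_incident_response: str) -> list[str]:
    """Pull MDE device IDs out of a get_incident JSON response.
    Skips placeholder 'N/A' entries produced when evidence lacks an ID."""
    data = json.loads(get_incident_response)
    devices = data.get("incident", {}).get("entities", {}).get("devices", [])
    return [d["id"] for d in devices if d.get("id") and d["id"] != "N/A"]
```

This kind of helper is also a convenient place to assert the data chain discussed in Step 9: if it returns an empty list for an incident with device evidence, the tool output schema has drifted.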

Step 5 · Create the Advanced Hunting Tool

Build a run_hunting_query tool that executes KQL queries against the Defender XDR advanced hunting data lake. This targets Defender-specific tables: DeviceEvents, AlertEvidence, EmailEvents, IdentityLogonEvents, etc.

Implementation

from msgraph.generated.security.microsoft_graph_security_run_hunting_query import \
    RunHuntingQueryPostRequestBody

async def handle_hunting_query(arguments: dict) -> list[types.TextContent]:
    """Execute a KQL query against the Defender XDR advanced hunting data lake.
    Unlike Sentinel queries (Log Analytics), these target Defender-specific
    tables: DeviceEvents, AlertEvidence, EmailEvents, IdentityLogonEvents, etc.
    Returns: JSON with schema, row_count, and query results.
    """
    query = arguments.get("query", "")

    # Validate the query
    if len(query) > 10000:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error",
            "message": "Query exceeds 10,000 character limit."
        }))]

    try:
        # Submit the KQL query to the Graph Security hunting API
        body = RunHuntingQueryPostRequestBody(query=query)
        result = await graph_client.security \
            .microsoft_graph_security_run_hunting_query \
            .post(body=body)

        # Convert Graph API response rows to JSON-serializable dicts
        rows = []
        if result and result.results:
            for row in result.results:
                rows.append(row.additional_data)

        return [types.TextContent(type="text", text=json.dumps({
            "status": "success",
            "query": query,
            "row_count": len(rows),
            "results": rows[:500],  # Cap results
            "schema": [
                {"name": col.name, "type": col.type}
                for col in (result.schema or [])
            ] if result else []
        }, default=str))]

    except Exception as e:
        error_msg = str(e)
        suggestion = "Check KQL syntax."
        if "BadRequest" in error_msg:
            suggestion = "Invalid KQL. Verify table names and column references."
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": error_msg,
            "suggestion": suggestion
        }))]

Sample Hunting Queries to Test

// Query 1: Detect suspicious process creation on endpoints
// Looks for encoded commands, bypass flags, and hidden execution
DeviceProcessEvents
| where Timestamp > ago(24h)
| where FileName in ("powershell.exe", "cmd.exe", "wscript.exe")
| where ProcessCommandLine has_any ("encode", "bypass", "hidden", "-e ")
| project Timestamp, DeviceName, FileName, ProcessCommandLine
| take 50

// Query 2: Find alerts that fire repeatedly on the same entity
// High alert counts suggest persistent threats or noisy rules
AlertEvidence
| where Timestamp > ago(7d)
| summarize AlertCount=dcount(AlertId) by EntityType, Title
| where AlertCount > 3
| order by AlertCount desc

// Query 3: Identify phishing and malware in inbound email
// Uses Defender for Office 365 email threat detection data
EmailEvents
| where Timestamp > ago(24h)
| where ThreatTypes has "Phish" or ThreatTypes has "Malware"
| summarize Count=count() by SenderFromDomain, ThreatTypes
| order by Count desc
| take 20

Step 6 · Create Incident Update Tools

Build tools that modify incident state. Implement confirmation patterns for state-changing tools to prevent AI models from making unintended changes.

Implementation with Safety Confirmation

import logging

# Audit logger for all state-changing operations
# Critical for compliance: every write action must be recorded
# with timestamp, tool name, parameters, and outcome
audit_logger = logging.getLogger("mcp.audit")
audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler("mcp_audit.log")
handler.setFormatter(logging.Formatter(
    '%(asctime)s | %(message)s'
))
audit_logger.addHandler(handler)

async def handle_update_incident(arguments: dict) -> list[types.TextContent]:
    """Update incident status, severity, assignment, or classification.
    SAFETY: Requires explicit confirm=true to apply changes.
    This two-step pattern (preview โ†’ confirm) prevents AI models
    from making unintended changes to active incidents.
    """
    incident_id = arguments.get("incident_id")
    confirm = arguments.get("confirm", False)

    # Build update payload from provided fields
    # Only include fields the caller explicitly set
    updates = {}
    if "status" in arguments:
        updates["status"] = arguments["status"]
    if "severity" in arguments:
        updates["severity"] = arguments["severity"]
    if "assigned_to" in arguments:
        updates["assignedTo"] = arguments["assigned_to"]
    if "classification" in arguments:
        updates["classification"] = arguments["classification"]

    if not updates:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error",
            "message": "No update fields provided.",
            "available_fields": ["status", "severity", "assigned_to", "classification"]
        }))]

    # Safety gate: require explicit confirmation for state changes
    # The AI must call this tool twice: once to preview, once to apply
    if not confirm:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "confirmation_required",
            "message": f"About to update incident {incident_id}",
            "changes": updates,
            "instruction": "Call update_incident again with confirm=true to apply."
        }))]

    try:
        # Apply the update via Graph API
        from msgraph.generated.models.security import Incident
        patch_body = Incident()
        if "status" in updates:
            patch_body.status = updates["status"]
        if "severity" in updates:
            patch_body.severity = updates["severity"]
        if "assignedTo" in updates:
            patch_body.assigned_to = updates["assignedTo"]

        await graph_client.security.incidents.by_incident_id(
            incident_id
        ).patch(patch_body)

        # Audit log
        audit_logger.info(
            f"INCIDENT_UPDATE | id={incident_id} | changes={json.dumps(updates)}"
        )

        return [types.TextContent(type="text", text=json.dumps({
            "status": "success",
            "message": f"Incident {incident_id} updated successfully.",
            "applied_changes": updates
        }))]

    except Exception as e:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": str(e)
        }))]
โš ๏ธ Important: The confirmation pattern (confirm=true) is critical for production safety. Without it, an AI model might resolve every incident it encounters. This two-step flow gives humans visibility into what the AI intends to do before it does it.

Step 7 · Create Device Action Tools

Device actions are high-impact operations. Implement safety guardrails: require explicit confirmation, log all actions, enforce cooling-off periods, and validate device existence before acting.

Device Isolation Tool

import aiohttp

# Microsoft Defender for Endpoint API base URL
# Used for device-level actions (isolate, scan, unisolate)
MDE_API_BASE = "https://api.securitycenter.microsoft.com/api"

async def handle_isolate_device(arguments: dict) -> list[types.TextContent]:
    """Isolate a device from the network (containment action).
    HIGH-IMPACT: This cuts the device off from all network access
    except the Defender for Endpoint cloud service channel.
    Uses three-layer safety: dry_run (default) → preview → confirm.
    """
    device_id = arguments.get("device_id")
    comment = arguments.get("comment", "Isolated via MCP server")
    isolation_type = arguments.get("type", "Full")  # Full or Selective
    dry_run = arguments.get("dry_run", True)  # Default to dry-run!
    confirm = arguments.get("confirm", False)

    if not device_id:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error",
            "message": "device_id is required.",
            "suggestion": "Use enrich_device or get_incident to find device IDs."
        }))]

    # Layer 1: Dry-run mode (default=True) - describe what WOULD happen
    # This lets the AI show its plan before any real action
    if dry_run:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "dry_run",
            "message": f"Would isolate device {device_id} ({isolation_type} isolation)",
            "impact": "Device will lose network connectivity except to Defender service.",
            "instruction": "Set dry_run=false and confirm=true to execute."
        }))]

    if not confirm:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "confirmation_required",
            "message": f"About to {isolation_type} isolate device {device_id}.",
            "warning": "This will disconnect the device from the network!",
            "instruction": "Call isolate_device with confirm=true to proceed."
        }))]

    try:
        # Execute isolation via the MDE Machine Actions API
        # Requires Machine.Isolate permission on the app registration
        # Note: MDE uses its own resource scope, separate from Graph,
        # so we acquire a token against the securitycenter endpoint
        credential = ClientSecretCredential(
            tenant_id=os.environ['AZURE_TENANT_ID'],
            client_id=os.environ['AZURE_CLIENT_ID'],
            client_secret=os.environ['AZURE_CLIENT_SECRET']
        )
        token = credential.get_token("https://api.securitycenter.microsoft.com/.default")
        headers = {"Authorization": f"Bearer {token.token}", "Content-Type": "application/json"}
        # Full isolation cuts ALL network; Selective keeps Outlook/Teams
        body = {"Comment": comment, "IsolationType": isolation_type}

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{MDE_API_BASE}/machines/{device_id}/isolate",
                headers=headers, json=body
            ) as resp:
                if resp.status == 201:
                    result = await resp.json()
                    audit_logger.info(
                        f"DEVICE_ISOLATE | device={device_id} | type={isolation_type}"
                    )
                    return [types.TextContent(type="text", text=json.dumps({
                        "status": "success",
                        "message": f"Device {device_id} isolation initiated.",
                        "action_id": result.get("id"),
                        "action_status": result.get("status")
                    }))]
                else:
                    error = await resp.text()
                    return [types.TextContent(type="text", text=json.dumps({
                        "status": "error",
                        "http_status": resp.status,
                        "message": error
                    }))]
    except Exception as e:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": str(e)
        }))]
💡 Pro Tip: Default to dry_run=True for all destructive actions. This means the AI must explicitly opt out of dry-run mode, adding a natural safety layer. The AI describes its plan first (dry-run), then the human approves the real execution.

Step 8 · Create Entity Enrichment Tools

Entity enrichment tools aggregate data from multiple sources to build comprehensive user, device, and IP profiles. These are the most frequently called tools in security AI workflows.

User Enrichment

async def handle_enrich_user(arguments: dict) -> list[types.TextContent]:
    """Enrich a user entity by aggregating data from multiple sources.
    Combines: Entra ID profile + Identity Protection risk + sign-in history.
    This multi-source aggregation gives AI a complete user picture
    in a single tool call, reducing investigation steps.
    Returns: JSON with user profile, risk assessment, and recent sign-ins.
    """
    upn = arguments.get("user_principal_name")
    if not upn:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": "user_principal_name is required."
        }))]

    try:
        # Source 1: Entra ID user profile (department, title, account status)
        user = await graph_client.users.by_user_id(upn).get()

        # Source 2: Identity Protection risk detections (compromised likelihood)
        risk_query = f"UserPrincipalName eq '{upn}'"
        risks = await graph_client.identity_protection.risky_users.get(
            request_configuration=lambda c:
                setattr(c.query_parameters, 'filter', risk_query)
        )

        # Source 3: Recent sign-in activity (last 10 sign-ins for anomaly review)
        signin_query = f"userPrincipalName eq '{upn}'"
        signins = await graph_client.audit_logs.sign_ins.get(
            request_configuration=lambda c:
                setattr(c.query_parameters, 'filter', signin_query) or
                setattr(c.query_parameters, 'top', 10)
        )

        return [types.TextContent(type="text", text=json.dumps({
            "status": "success",
            "user": {
                "display_name": user.display_name,
                "upn": user.user_principal_name,
                "job_title": user.job_title,
                "department": user.department,
                "account_enabled": user.account_enabled,
                "created": str(user.created_date_time),
            },
            "risk": {
                "level": str(risks.value[0].risk_level) if risks.value else "none",
                "state": str(risks.value[0].risk_state) if risks.value else "none",
                "detail": str(risks.value[0].risk_detail) if risks.value else "none"
            },
            "recent_signins": [
                {
                    "date": str(s.created_date_time),
                    "app": s.app_display_name,
                    "status": "Success" if s.status.error_code == 0 else "Failed",
                    "ip": s.ip_address,
                    "location": f"{s.location.city}, {s.location.country_or_region}"
                        if s.location else "Unknown"
                }
                for s in (signins.value or [])[:5]
            ]
        }, default=str))]

    except Exception as e:
        return [types.TextContent(type="text", text=json.dumps({
            "status": "error", "message": str(e)
        }))]
💡 Pro Tip: Optimize enrichment tools for performance. Cache frequently requested user/device data with a 5-minute TTL. In active investigations, the same entity is often queried multiple times across different tool calls.

Step 9 · Implement Tool Composition Patterns

Design tools that work well together. Verify that tool outputs contain the information needed as inputs for subsequent tools; for example, incident details should include device IDs that can be passed to isolate_device.

Multi-Step Investigation Workflow

# AI Investigation Workflow (simulated sequence)
#
# Step 1: Discover incidents
result = await call_tool("list_incidents", {"status": "active", "severity": "high"})
# → Returns incident IDs

# Step 2: Get details on the most critical incident
result = await call_tool("get_incident", {"incident_id": "INC-987654"})
# → Returns alerts, entities (user UPNs, device IDs), MITRE techniques

# Step 3: Enrich the affected user
result = await call_tool("enrich_user", {"user_principal_name": "john@contoso.com"})
# → Returns risk level, recent sign-ins, anomalies

# Step 4: Hunt for related activity
result = await call_tool("run_hunting_query", {
    "query": """DeviceProcessEvents
    | where Timestamp > ago(24h)
    | where AccountName == 'john'
    | where FileName in ('powershell.exe', 'cmd.exe')
    | project Timestamp, DeviceName, FileName, ProcessCommandLine"""
})
# → Returns suspicious process activity

# Step 5: Contain the compromised device (dry-run first)
result = await call_tool("isolate_device", {
    "device_id": "abc123",
    "dry_run": True,
    "comment": "Compromised in INC-987654"
})
# → Returns what would happen

# Step 6: Update the incident
result = await call_tool("update_incident", {
    "incident_id": "INC-987654",
    "status": "active",
    "assigned_to": "secops-team@contoso.com",
    "confirm": True
})
💡 Pro Tip: Test this entire workflow end-to-end with the MCP Inspector. Verify that each step’s output provides the IDs and data needed for the next step. Broken data chains are the #1 cause of AI investigation failures.

Step 10 · Add Rate Limiting and Throttling

Protect both the Microsoft Graph API and your MCP server from overload. Respect Graph API throttling (429 responses), implement client-side rate limits, and queue requests during high-load periods.

Rate Limiter Implementation

import asyncio
import time

class RateLimiter:
    """Token-bucket rate limiter for MCP tool calls.
    Prevents exceeding Microsoft Graph API throttling limits.
    Each tool gets its own limiter with appropriate call rates.
    """

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.call_times: list[float] = []  # Timestamps of recent calls
        self._lock = asyncio.Lock()

    async def acquire(self, tool_name: str):
        """Wait until a call slot is available within the rate window."""
        async with self._lock:
            now = time.time()
            # Remove calls older than 60 seconds
            self.call_times = [t for t in self.call_times if now - t < 60]
            if len(self.call_times) >= self.calls_per_minute:
                # Sleep until the oldest call ages out of the window
                wait_time = 60 - (now - self.call_times[0])
                await asyncio.sleep(max(wait_time, 0))
            self.call_times.append(time.time())

# Per-tool rate limits - expensive or high-impact tools get lower limits
# This prevents AI agents from overwhelming the Graph API during investigations
rate_limiters = {
    "list_incidents": RateLimiter(30),     # 30/min - lightweight list call
    "get_incident": RateLimiter(60),       # 60/min - single-item retrieval
    "run_hunting_query": RateLimiter(10),  # 10/min - expensive KQL execution
    "isolate_device": RateLimiter(5),      # 5/min  - high-impact action
    "update_incident": RateLimiter(20),    # 20/min - state-changing write
}

async def rate_limited_call(tool_name: str, handler, arguments: dict):
    """Wrap tool calls with rate limiting."""
    # setdefault ensures unknown tools share one persistent limiter
    # instead of getting a fresh (empty) limiter on every call
    limiter = rate_limiters.setdefault(tool_name, RateLimiter(60))
    await limiter.acquire(tool_name)
    return await handler(arguments)
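Client-side limits reduce throttling but don't eliminate it, so 429 responses still need handling. Below is a hedged sketch of a retry wrapper that honours the Retry-After header Graph returns when throttling; `send` is any coroutine returning an object with `status_code` and `headers` attributes (adapt the shape to your HTTP client, e.g. httpx):

```python
import asyncio

async def call_with_retry(send, max_retries: int = 3):
    """Retry a Graph call when throttled (HTTP 429).
    `send` is a hypothetical zero-argument coroutine; the response shape
    (.status_code, .headers) is an assumption to keep this sketch generic."""
    for attempt in range(max_retries + 1):
        response = await send()
        if response.status_code != 429:
            return response
        # Graph reports the required wait in the Retry-After header (seconds);
        # fall back to exponential backoff if the header is missing
        delay = int(response.headers.get("Retry-After", 2 ** attempt))
        await asyncio.sleep(delay)
    raise RuntimeError("Graph API still throttling after retries")
```

Combining this with the per-tool limiters above gives you defence on both sides: the limiter prevents most 429s, and the retry wrapper absorbs the ones that slip through.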

Step 11 ยท Add Audit Logging

Log every tool invocation for compliance and forensic review. In a post-incident review, you need to show exactly what the AI did, what data it accessed, and what changes it made.

Structured Audit Logging

import json
import logging
import time
from datetime import datetime, timezone
from functools import wraps

# Structured JSON logging for audit compliance
# Each log entry is a self-contained JSON object on one line (JSONL format)
# This enables log aggregation tools (Azure Monitor, Splunk) to parse entries
class JsonFormatter(logging.Formatter):
    def format(self, record):
        # Build a structured log entry with all investigation context
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "tool": getattr(record, 'tool_name', 'unknown'),   # Which MCP tool
            "action": getattr(record, 'action', 'unknown'),     # What happened
            "arguments": getattr(record, 'arguments', {}),      # Input parameters
            "result_status": getattr(record, 'result_status', 'unknown'),
            "duration_ms": getattr(record, 'duration_ms', 0),   # Performance data
            "message": record.getMessage()
        }
        return json.dumps(log_data)

# Configure the audit logger to write JSONL to a file
# Each line = one tool invocation record
audit_logger = logging.getLogger("mcp.audit")
handler = logging.FileHandler("mcp_audit.jsonl")
handler.setFormatter(JsonFormatter())
audit_logger.addHandler(handler)
audit_logger.setLevel(logging.INFO)

def audit_tool_call(func):
    """Decorator to audit all tool invocations.
    Wraps every tool call with timing, logging, and error tracking.
    Critical for SOC 2, ISO 27001, and GDPR compliance.
    """
    @wraps(func)
    async def wrapper(name: str, arguments: dict):
        start = time.time()
        try:
            result = await func(name, arguments)
            duration = (time.time() - start) * 1000
            audit_logger.info(
                "Tool call completed",
                extra={
                    "tool_name": name,
                    "action": "call",
                    "arguments": {k: v for k, v in arguments.items()
                                  if k != "confirm"},  # Omit flags; redact any secrets here
                    "result_status": "success",
                    "duration_ms": round(duration)
                }
            )
            return result
        except Exception as e:
            duration = (time.time() - start) * 1000
            audit_logger.error(
                f"Tool call failed: {e}",
                extra={
                    "tool_name": name,
                    "action": "call",
                    "arguments": arguments,
                    "result_status": "error",
                    "duration_ms": round(duration)
                }
            )
            raise
    return wrapper
โš ๏ธ Important: Audit logs for MCP servers may be required for compliance frameworks like SOC 2, ISO 27001, and GDPR. Retain these logs for the same period as your other security audit trails (typically 1–7 years).

Step 12 ยท Test Multi-Step Investigation Workflows

Simulate a complete AI-driven investigation workflow. Test both the happy path and error scenarios to ensure robust behaviour.

Test Checklist

  1. Launch the MCP Inspector: mcp dev src/server.py
  2. Call list_incidents and verify incidents are returned with IDs and metadata
  3. Call get_incident with a real incident ID and verify alerts and entities are populated
  4. Call enrich_user with a UPN from the incident and verify risk and sign-in data
  5. Call run_hunting_query with a valid KQL query and verify structured results
  6. Call run_hunting_query with invalid KQL and verify a helpful error response
  7. Call update_incident without confirm=true and verify the confirmation prompt
  8. Call isolate_device with dry_run=true and verify the dry-run response
  9. Check mcp_audit.jsonl and verify all calls were logged
  10. Verify rate limiting by rapidly calling the same tool and confirm throttling kicks in
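The safety-gate items in the checklist (7 and 8) are worth automating. This is a hedged sketch run against a mocked dispatcher; replace `mock_call_tool` with your server's real `call_tool` handler, whose response shapes here are assumptions:

```python
import asyncio

# Mock dispatcher mimicking the safety-gate behaviour under test.
# The response dicts are illustrative shapes, not the real server's.
async def mock_call_tool(name: str, args: dict) -> dict:
    if name == "update_incident" and not args.get("confirm"):
        return {"status": "confirmation_required"}
    if name == "isolate_device" and args.get("dry_run"):
        return {"status": "dry_run", "would_isolate": args["device_id"]}
    return {"status": "ok"}

async def run_safety_checks(call_tool) -> None:
    # Checklist item 7: writes without confirm=true must be refused
    r = await call_tool("update_incident", {"incident_id": "INC-987654"})
    assert r["status"] == "confirmation_required"
    # Checklist item 8: dry_run must describe, not execute, the isolation
    r = await call_tool("isolate_device", {"device_id": "abc123", "dry_run": True})
    assert r["status"] == "dry_run"

asyncio.run(run_safety_checks(mock_call_tool))
```

Wiring these assertions into CI means a regression in the confirmation or dry-run gates fails the build instead of isolating a device.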

Step 13 ยท Write Tool Documentation

Create comprehensive documentation for each tool. Good documentation is essential because AI models use tool descriptions to decide when and how to call your tools.

README Template

# Defender XDR MCP Server

## Tools

### list_incidents
List active security incidents from Defender XDR.
- **Input:** severity (optional), status (optional), top (optional, max: 50)
- **Output:** Array of incidents with id, display_name, severity, status
- **Rate Limit:** 30 calls/minute
- **Example:** `{"status": "active", "severity": "high"}`

### get_incident
Get full incident details including alerts, entities, and MITRE techniques.
- **Input:** incident_id (required)
- **Output:** Hierarchical incident data with alerts and entities
- **Rate Limit:** 60 calls/minute

### update_incident
Update incident status, severity, or assignment. Requires confirmation.
- **Input:** incident_id (required), status/severity/assigned_to, confirm (boolean)
- **Safety:** Requires confirm=true to apply changes
- **Rate Limit:** 20 calls/minute

### isolate_device
Isolate a compromised device from the network.
- **Input:** device_id (required), dry_run (default: true), confirm (boolean)
- **Safety:** Defaults to dry_run mode; requires confirm=true for execution
- **Rate Limit:** 5 calls/minute
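Because the model reads tool descriptions at call time, the README and the server's tool metadata drift apart easily. One mitigation, sketched here with plain dicts (the structure and names are illustrative, not the MCP SDK's), is to keep a single source of truth and render the README section from it:

```python
# Illustrative single source of truth for tool docs; the dict shape is
# an assumption, not an MCP SDK structure.
TOOL_DOCS = {
    "list_incidents": {
        "description": "List active security incidents from Defender XDR.",
        "rate_limit_per_min": 30,
        "inputs": {"severity": "optional", "status": "optional",
                   "top": "optional, max 50"},
    },
    "isolate_device": {
        "description": "Isolate a compromised device from the network.",
        "rate_limit_per_min": 5,
        "inputs": {"device_id": "required", "dry_run": "default true",
                   "confirm": "boolean"},
    },
}

def render_readme_section(name: str) -> str:
    """Generate the README block for one tool from its metadata."""
    doc = TOOL_DOCS[name]
    lines = [f"### {name}", doc["description"]]
    for param, note in doc["inputs"].items():
        lines.append(f"- **{param}**: {note}")
    lines.append(f"- **Rate Limit:** {doc['rate_limit_per_min']} calls/minute")
    return "\n".join(lines)
```

The same metadata can feed the descriptions you register with the MCP server, so what the AI model sees always matches what the README documents.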

Step 14 ยท Package for Distribution

Package your MCP server for deployment using Docker. Create deployment templates for common scenarios.

Dockerfile

# Dockerfile for packaging the XDR MCP server as a container
FROM python:3.12-slim

WORKDIR /app

# Install Python dependencies first (leverages Docker layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy source code into the container
COPY src/ ./src/
COPY .env.example .

# Launch the MCP server using stdio transport (default)
# Override MCP_TRANSPORT=sse for cloud deployment
CMD ["python", "src/server.py"]

Docker Commands

# Build the Docker image with tag 'xdr-mcp-server:latest'
docker build -t xdr-mcp-server:latest .

# Run the container with environment variables loaded from .env file
docker run --env-file .env xdr-mcp-server:latest

# Alternative: pass Azure credentials directly as environment variables
# Useful for CI/CD pipelines or when .env file isn't available
docker run \
  -e AZURE_TENANT_ID=... \
  -e AZURE_CLIENT_ID=... \
  -e AZURE_CLIENT_SECRET=... \
  xdr-mcp-server:latest

Clean Up & Next Steps

  • Push your Docker image to your container registry (ACR, Docker Hub, or GitHub Container Registry)
  • Set up CI/CD with GitHub Actions to automate testing and deployment
๐Ÿ’ก Pro Tip: Create separate deployment modes: single-tenant (one server per client), multi-tenant (one server with tenant isolation), and dev mode (local with mock data). This flexibility makes your MCP server suitable for both production and training environments.

๐Ÿ“š Documentation Resources

  • Microsoft Defender XDR API overview: API access patterns and authentication
  • Create an app to access Defender XDR APIs: app registration and permissions
  • Incidents API: list, update, and manage incidents programmatically
  • Advanced hunting API: run KQL queries through the Defender XDR API
  • MCP Tools: define and implement tools for AI model interaction
  • MCP Transports: understand stdio and SSE transport mechanisms

Summary

What You Accomplished

  • Configured OAuth client credentials authentication for Microsoft Graph Security API
  • Built read-only Graph Security queries for alerts, incidents, and secure scores
  • Implemented write operations with safety gates for incident updates
  • Added rate limiting to respect Graph API throttling limits
  • Integrated structured audit logging for all MCP tool invocations
  • Packaged the MCP server in a Docker container for portable deployment

Next Steps
