Build a Model Context Protocol server that integrates with Microsoft Defender XDR APIs, exposing incident management, threat hunting, and device isolation capabilities as MCP tools for AI-powered security operations.
In this lab you will build a fully functional MCP server that integrates with Microsoft Defender XDR APIs. The server exposes incident management, advanced threat hunting, and device isolation capabilities as MCP tools, enabling AI clients to interact with your XDR environment programmatically for security operations.
A SOC team wants their AI assistant to go beyond read-only queries and actively manage security incidents. Analysts need the AI to retrieve and update incidents, run KQL hunting queries across the XDR data lake, and isolate compromised devices, all through natural-language conversations.
By wrapping these Defender XDR capabilities in MCP tools, the team enables autonomous triage, faster mean-time-to-respond, and consistent incident handling across shifts.
This lab extends MCP beyond read-only queries into active incident response. By exposing investigation and remediation actions as MCP tools, you enable AI agents to perform autonomous security operations, dramatically reducing response times and freeing analysts to focus on complex, high-judgment tasks.
You will need:

- A working `run_kql_query` tool from the previous lab
- An app registration with `SecurityIncident.ReadWrite.All`, `SecurityAlert.ReadWrite.All`, and `ThreatHunting.Read.All` permissions
- Python `mcp[cli]` or the Node.js `@modelcontextprotocol/sdk`
- `msgraph-sdk` for Python or `@microsoft/microsoft-graph-client` for Node.js

Before building the MCP server, review the Microsoft Graph Security API and map each endpoint to an MCP tool. Each API endpoint that provides value for AI-driven security operations should become an MCP tool with a clear name, description, and schema.
Key endpoints:

- `/security/incidents`: list, get, and update incidents
- `/security/alerts_v2`: alert management and enrichment
- `/security/runHuntingQuery`: advanced hunting with KQL
- `/security/tiIndicators`: threat intelligence indicators

# API Endpoint → MCP Tool Mapping
# Each Microsoft Graph Security API endpoint becomes an MCP tool
# Read-only endpoints = low-risk tools (auto-execute)
# Write endpoints = medium/high-risk tools (require confirmation)
#
# GET /security/incidents → list_incidents (read-only)
# GET /security/incidents/{id} → get_incident (read-only)
# PATCH /security/incidents/{id} → update_incident (state-changing!)
# POST /security/runHuntingQuery → run_hunting_query (read-only)
# POST /devices/{id}/isolate → isolate_device (destructive!)
# POST /devices/{id}/unisolate → release_device (state-changing)
# GET /users/{id} → enrich_user (read-only)
# GET /devices/{id} → enrich_device (read-only)

Configure Microsoft Graph API authentication using OAuth 2.0 client credentials flow. The app registration requires higher-privilege permissions than the Sentinel server because this server can modify incidents and take device actions.
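Before wiring up the SDK, it helps to see what the client credentials flow actually is: a single POST to the tenant's token endpoint. The sketch below only builds that request with placeholder values; in the server code, `azure-identity` performs the real exchange for you.

```python
from urllib.parse import urlencode

def build_token_request(tenant_id: str, client_id: str, client_secret: str):
    """Build the Entra ID token endpoint URL and form body for the
    OAuth 2.0 client credentials grant (app-only, no user context)."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # .default requests all application permissions granted to the app
        "scope": "https://graph.microsoft.com/.default",
    })
    return url, body
```

The access token returned by this exchange is what the SDK attaches as a Bearer header on every Graph call.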
In the Microsoft Entra admin center, create a new app registration:

- Name: `xdr-mcp-server`, Account type: Single tenant, click Register
- Grant application permissions (admin consent required):
  - `SecurityIncident.ReadWrite.All`
  - `SecurityAlert.ReadWrite.All`
  - `ThreatHunting.Read.All`
  - `Machine.Isolate` (from WindowsDefenderATP)
  - `User.Read.All`

import os
from dotenv import load_dotenv
from azure.identity import ClientSecretCredential
from msgraph import GraphServiceClient
# Load Azure credentials from .env file
load_dotenv()
def get_graph_client() -> GraphServiceClient:
"""Create an authenticated Microsoft Graph client.
Uses OAuth 2.0 client credentials flow (app-only, no user context).
This is appropriate for background services like MCP servers
that act on behalf of the application, not a specific user.
Returns: GraphServiceClient ready to call Graph Security APIs.
"""
# Authenticate using the Entra ID app registration credentials
credential = ClientSecretCredential(
tenant_id=os.environ['AZURE_TENANT_ID'],
client_id=os.environ['AZURE_CLIENT_ID'],
client_secret=os.environ['AZURE_CLIENT_SECRET']
)
# .default scope requests all permissions granted to the app
scopes = ['https://graph.microsoft.com/.default']
return GraphServiceClient(credential, scopes)
# Singleton client instance - reuse across all tool invocations
# Creating new clients repeatedly wastes auth token requests
graph_client = get_graph_client()

The SecurityIncident.ReadWrite.All permission allows the MCP server to close or reassign incidents. Follow the principle of least privilege: only grant write permissions if your AI workflows require incident updates.

Build a list_incidents tool that returns active incidents with filtering support. Design the output to be AI-friendly: include all relevant context in a single response so the AI doesn't need multiple tool calls.
import json
from datetime import datetime, timedelta, timezone
from mcp.server import Server
import mcp.types as types
# Create XDR MCP server - exposes Defender XDR capabilities as MCP tools
server = Server("xdr-mcp-server")
# MCP Tool Discovery: define all tools the server exposes to AI clients
@server.list_tools()
async def list_tools() -> list[types.Tool]:
return [
# Tool: list_incidents - retrieve active security incidents
# AI uses this as the entry point for investigation workflows
types.Tool(
name="list_incidents",
description="List active security incidents from Defender XDR "
"with severity, status, affected entities, and alert counts.",
inputSchema={
"type": "object",
"properties": {
"severity": {
"type": "string",
"description": "Filter: high, medium, low, informational",
"enum": ["high", "medium", "low", "informational"]
},
"status": {
"type": "string",
"description": "Filter: active, resolved, redirected",
"enum": ["active", "resolved", "redirected"]
},
"top": {
"type": "integer",
"description": "Number of incidents to return (default: 20, max: 50)",
"default": 20
}
},
"required": []
}
),
# ... other tools defined below
]
async def handle_list_incidents(arguments: dict) -> list[types.TextContent]:
"""List incidents from Defender XDR via Microsoft Graph Security API.
Supports filtering by severity and status with OData query parameters.
Returns: JSON with incident metadata formatted for AI consumption.
"""
# Cap results to prevent oversized responses
top = min(int(arguments.get("top", 20)), 50)
severity = arguments.get("severity")
status = arguments.get("status", "active")
# Build OData $filter string from provided parameters
# OData filters are passed to Graph API for server-side filtering
filters = []
if severity:
filters.append(f"severity eq '{severity}'")
if status:
filters.append(f"status eq '{status}'")
filter_str = " and ".join(filters) if filters else None
try:
# Build typed query parameters - the SDK expects a request configuration
# object, not a bare lambda
from msgraph.generated.security.incidents.incidents_request_builder import IncidentsRequestBuilder
query_params = IncidentsRequestBuilder.IncidentsRequestBuilderGetQueryParameters(
top=top, filter=filter_str, orderby=["createdDateTime desc"]
)
result = await graph_client.security.incidents.get(
request_configuration=IncidentsRequestBuilder.IncidentsRequestBuilderGetRequestConfiguration(
query_parameters=query_params
)
)
# Extract key fields from each incident for AI-friendly output
# Include enough context for triage without overwhelming the response
incidents = []
for inc in (result.value or []):
incidents.append({
"id": inc.id,
"display_name": inc.display_name,
"severity": str(inc.severity),
"status": str(inc.status),
"created": str(inc.created_date_time),
"last_modified": str(inc.last_update_date_time),
"assigned_to": inc.assigned_to or "Unassigned",
"classification": str(inc.classification) if inc.classification else None,
"alert_count": len(inc.alerts) if inc.alerts else 0,
"description": (inc.description or "")[:200] # Truncate for brevity
})
# Include a "hint" to guide the AI model to the next logical step
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"incident_count": len(incidents),
"filters_applied": {"severity": severity, "status": status},
"incidents": incidents,
"hint": "Use get_incident with an incident id for full details."
}, default=str))]
except Exception as e:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": str(e)
}))]

Include a hint field in list responses to guide the AI model to make follow-up calls. The hint “Use get_incident with an incident id for full details” teaches the model the correct workflow.

Build a get_incident tool that returns the full incident with all alerts, affected entities, MITRE ATT&CK classifications, and evidence artifacts. Structure the response hierarchically for efficient AI navigation.
async def handle_get_incident(arguments: dict) -> list[types.TextContent]:
"""Get comprehensive incident details including alerts and entities.
Builds a hierarchical response: incident → alerts → entities → evidence.
This allows AI models to navigate the data tree efficiently.
Returns: JSON with full incident details, MITRE techniques, and next actions.
"""
incident_id = arguments.get("incident_id")
if not incident_id:
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": "incident_id is required",
"suggestion": "Use list_incidents first to find active incident IDs."
}))]
try:
inc = await graph_client.security.incidents.by_incident_id(
incident_id
).get()
# Build hierarchical response for efficient AI navigation
# Structure: incident > alerts > evidence > entities
alerts_data = []
# Group entities by type for structured access
entities_by_type = {"users": [], "devices": [], "ips": [], "files": []}
if inc.alerts:
for alert in inc.alerts:
alerts_data.append({
"id": alert.id,
"title": alert.title,
"severity": str(alert.severity),
"category": alert.category,
"created": str(alert.created_date_time),
"mitre_techniques": [
t.technique_id for t in (alert.mitre_techniques or [])
],
"description": (alert.description or "")[:300]
})
# Extract entities from each alert
for ev in (alert.evidence or []):
entity_type = str(type(ev).__name__)
if "User" in entity_type:
# user_account is an SDK object, not a dict - use getattr, not .get()
account = getattr(ev, 'user_account', None)
entities_by_type["users"].append({
"name": getattr(account, 'display_name', None) or "N/A",
"upn": getattr(account, 'user_principal_name', None) or "N/A"
})
elif "Device" in entity_type:
entities_by_type["devices"].append({
"name": getattr(ev, 'device_dns_name', None) or "N/A",
"id": getattr(ev, 'mde_device_id', None) or "N/A"
})
result = {
"status": "success",
"incident": {
"id": inc.id,
"display_name": inc.display_name,
"severity": str(inc.severity),
"status": str(inc.status),
"created": str(inc.created_date_time),
"assigned_to": inc.assigned_to or "Unassigned",
"classification": str(inc.classification) if inc.classification else None,
"description": inc.description,
"alerts": alerts_data,
"entities": entities_by_type,
"alert_count": len(alerts_data)
},
"available_actions": [
"Use update_incident to change status, severity, or assignment",
"Use run_hunting_query to investigate further",
"Use isolate_device to contain compromised devices"
]
}
return [types.TextContent(type="text", text=json.dumps(result, default=str))]
except Exception as e:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": str(e)
}))]

Include available_actions in detail responses to guide the AI model on what it can do next. This is like a contextual help menu that teaches the model the security workflow: investigate → assess → contain → remediate.

Build a run_hunting_query tool that executes KQL queries against the Defender XDR advanced hunting data lake. This targets Defender-specific tables: DeviceEvents, AlertEvidence, EmailEvents, IdentityLogonEvents, etc.
from msgraph.generated.security.microsoft_graph_security_run_hunting_query.run_hunting_query_post_request_body import \
RunHuntingQueryPostRequestBody
async def handle_hunting_query(arguments: dict) -> list[types.TextContent]:
"""Execute a KQL query against the Defender XDR advanced hunting data lake.
Unlike Sentinel queries (Log Analytics), these target Defender-specific
tables: DeviceEvents, AlertEvidence, EmailEvents, IdentityLogonEvents, etc.
Returns: JSON with schema, row_count, and query results.
"""
query = arguments.get("query", "")
# Validate the query before spending an API call on it
if not query.strip():
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": "query is required and must be non-empty."
}))]
if len(query) > 10000:
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": "Query exceeds 10,000 character limit."
}))]
try:
# Submit the KQL query to the Graph Security hunting API
body = RunHuntingQueryPostRequestBody(query=query)
result = await graph_client.security \
.microsoft_graph_security_run_hunting_query \
.post(body=body)
# Convert Graph API response rows to JSON-serializable dicts
rows = []
if result and result.results:
for row in result.results:
rows.append(row.additional_data)
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"query": query,
"row_count": len(rows),
"results": rows[:500], # Cap results
"schema": [
{"name": col.name, "type": col.type}
for col in (result.schema or [])
] if result else []
}, default=str))]
except Exception as e:
error_msg = str(e)
suggestion = "Check KQL syntax."
if "BadRequest" in error_msg:
suggestion = "Invalid KQL. Verify table names and column references."
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": error_msg,
"suggestion": suggestion
}))]

// Query 1: Detect suspicious process creation on endpoints
// Looks for encoded commands, bypass flags, and hidden execution
DeviceProcessEvents
| where Timestamp > ago(24h)
| where FileName in ("powershell.exe", "cmd.exe", "wscript.exe")
| where ProcessCommandLine has_any ("encode", "bypass", "hidden", "-e ")
| project Timestamp, DeviceName, FileName, ProcessCommandLine
| take 50
// Query 2: Find alerts that fire repeatedly on the same entity
// High alert counts suggest persistent threats or noisy rules
AlertEvidence
| where Timestamp > ago(7d)
| summarize AlertCount=dcount(AlertId) by EntityType, Title
| where AlertCount > 3
| order by AlertCount desc
// Query 3: Identify phishing and malware in inbound email
// Uses Defender for Office 365 email threat detection data
EmailEvents
| where Timestamp > ago(24h)
| where ThreatTypes has "Phish" or ThreatTypes has "Malware"
| summarize Count=count() by SenderFromDomain, ThreatTypes
| order by Count desc
| take 20

Build tools that modify incident state. Implement confirmation patterns for state-changing tools to prevent AI models from making unintended changes.
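The confirmation flag only works if the model can see it, so it must be declared in the tool's input schema. A sketch of what the `update_incident` schema might look like (field names match the handler below; the exact schema wording is an assumption):

```python
# Hypothetical JSON Schema for the update_incident tool. Declaring
# "confirm" here teaches the model the two-step preview/apply pattern.
UPDATE_INCIDENT_SCHEMA = {
    "type": "object",
    "properties": {
        "incident_id": {"type": "string", "description": "Incident to update"},
        "status": {"type": "string", "enum": ["active", "resolved", "redirected"]},
        "severity": {"type": "string", "enum": ["high", "medium", "low", "informational"]},
        "assigned_to": {"type": "string", "description": "UPN of the new owner"},
        "classification": {"type": "string", "description": "e.g. truePositive, falsePositive"},
        "confirm": {
            "type": "boolean",
            "default": False,
            "description": "false returns a preview of the changes; true applies them."
        }
    },
    "required": ["incident_id"]
}
```

Only `incident_id` is required, so the model can preview with any subset of fields before committing.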
import logging
# Audit logger for all state-changing operations
# Critical for compliance: every write action must be recorded
# with timestamp, tool name, parameters, and outcome
audit_logger = logging.getLogger("mcp.audit")
audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler("mcp_audit.log")
handler.setFormatter(logging.Formatter(
'%(asctime)s | %(message)s'
))
audit_logger.addHandler(handler)
async def handle_update_incident(arguments: dict) -> list[types.TextContent]:
"""Update incident status, severity, assignment, or classification.
SAFETY: Requires explicit confirm=true to apply changes.
This two-step pattern (preview → confirm) prevents AI models
from making unintended changes to active incidents.
"""
incident_id = arguments.get("incident_id")
confirm = arguments.get("confirm", False)
# Build update payload from provided fields
# Only include fields the caller explicitly set
updates = {}
if "status" in arguments:
updates["status"] = arguments["status"]
if "severity" in arguments:
updates["severity"] = arguments["severity"]
if "assigned_to" in arguments:
updates["assignedTo"] = arguments["assigned_to"]
if "classification" in arguments:
updates["classification"] = arguments["classification"]
if not updates:
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": "No update fields provided.",
"available_fields": ["status", "severity", "assigned_to", "classification"]
}))]
# Safety gate: require explicit confirmation for state changes
# The AI must call this tool twice: once to preview, once to apply
if not confirm:
return [types.TextContent(type="text", text=json.dumps({
"status": "confirmation_required",
"message": f"About to update incident {incident_id}",
"changes": updates,
"instruction": "Call update_incident again with confirm=true to apply."
}))]
try:
# Apply the update via Graph API
from msgraph.generated.models.security.incident import Incident
patch_body = Incident()
if "status" in updates:
patch_body.status = updates["status"]
if "severity" in updates:
patch_body.severity = updates["severity"]
if "assignedTo" in updates:
patch_body.assigned_to = updates["assignedTo"]
await graph_client.security.incidents.by_incident_id(
incident_id
).patch(patch_body)
# Audit log
audit_logger.info(
f"INCIDENT_UPDATE | id={incident_id} | changes={json.dumps(updates)}"
)
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"message": f"Incident {incident_id} updated successfully.",
"applied_changes": updates
}))]
except Exception as e:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": str(e)
}))]

The explicit confirmation flag (confirm=true) is critical for production safety. Without it, an AI model might resolve every incident it encounters. This two-step flow gives humans visibility into what the AI intends to do before it does it.

Device actions are high-impact operations. Implement safety guardrails: require explicit confirmation, log all actions, enforce cooling-off periods, and validate device existence before acting.
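The cooling-off period can be a small in-memory guard checked before any device action fires. A minimal sketch (a hypothetical helper, not wired into the handler below):

```python
import time

class ActionCooldown:
    """Block repeated high-impact actions on the same target within a window.

    Prevents an AI agent from, e.g., isolating the same device twice in
    quick succession or hammering isolate/release cycles.
    """
    def __init__(self, window_seconds: float = 300.0):
        self.window = window_seconds
        self._last: dict[str, float] = {}  # target id -> last action timestamp

    def allow(self, target_id: str) -> bool:
        """Return True and record the action, or False if still cooling off."""
        now = time.time()
        last = self._last.get(target_id)
        if last is not None and now - last < self.window:
            return False
        self._last[target_id] = now
        return True
```

A handler would call `allow(device_id)` before the confirmation gate and return an error response when it is False.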
import aiohttp
# Microsoft Defender for Endpoint API base URL
# Used for device-level actions (isolate, scan, unisolate)
MDE_API_BASE = "https://api.securitycenter.microsoft.com/api"
async def handle_isolate_device(arguments: dict) -> list[types.TextContent]:
"""Isolate a device from the network (containment action).
HIGH-IMPACT: This cuts the device off from all network access
except the Defender for Endpoint cloud service channel.
Uses three-layer safety: dry_run (default) → preview → confirm.
"""
device_id = arguments.get("device_id")
comment = arguments.get("comment", "Isolated via MCP server")
isolation_type = arguments.get("type", "Full") # Full or Selective
dry_run = arguments.get("dry_run", True) # Default to dry-run!
confirm = arguments.get("confirm", False)
if not device_id:
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"message": "device_id is required.",
"suggestion": "Use enrich_device or get_incident to find device IDs."
}))]
# Layer 1: Dry-run mode (default=True) - describe what WOULD happen
# This lets the AI show its plan before any real action
if dry_run:
return [types.TextContent(type="text", text=json.dumps({
"status": "dry_run",
"message": f"Would isolate device {device_id} ({isolation_type} isolation)",
"impact": "Device will lose network connectivity except to Defender service.",
"instruction": "Set dry_run=false and confirm=true to execute."
}))]
if not confirm:
return [types.TextContent(type="text", text=json.dumps({
"status": "confirmation_required",
"message": f"About to {isolation_type} isolate device {device_id}.",
"warning": "This will disconnect the device from the network!",
"instruction": "Call isolate_device with confirm=true to proceed."
}))]
try:
# Execute isolation via the MDE Machine Actions API
# Requires Machine.Isolate permission; the MDE API has its own token
# audience, so build a credential here (the module-level one is Graph-scoped)
mde_credential = ClientSecretCredential(
tenant_id=os.environ['AZURE_TENANT_ID'],
client_id=os.environ['AZURE_CLIENT_ID'],
client_secret=os.environ['AZURE_CLIENT_SECRET']
)
token = mde_credential.get_token("https://api.securitycenter.microsoft.com/.default")
headers = {"Authorization": f"Bearer {token.token}", "Content-Type": "application/json"}
# Full isolation cuts ALL network; Selective keeps Outlook/Teams
body = {"Comment": comment, "IsolationType": isolation_type}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{MDE_API_BASE}/machines/{device_id}/isolate",
headers=headers, json=body
) as resp:
if resp.status == 201:
result = await resp.json()
audit_logger.info(
f"DEVICE_ISOLATE | device={device_id} | type={isolation_type}"
)
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"message": f"Device {device_id} isolation initiated.",
"action_id": result.get("id"),
"action_status": result.get("status")
}))]
else:
error = await resp.text()
return [types.TextContent(type="text", text=json.dumps({
"status": "error",
"http_status": resp.status,
"message": error
}))]
except Exception as e:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": str(e)
}))]

Default to dry_run=True for all destructive actions. This means the AI must explicitly opt out of dry-run mode, adding a natural safety layer. The AI describes its plan first (dry-run), then the human approves the real execution.

Entity enrichment tools aggregate data from multiple sources to build comprehensive user, device, and IP profiles. These are the most frequently called tools in security AI workflows.
async def handle_enrich_user(arguments: dict) -> list[types.TextContent]:
"""Enrich a user entity by aggregating data from multiple sources.
Combines: Entra ID profile + Identity Protection risk + sign-in history.
This multi-source aggregation gives AI a complete user picture
in a single tool call, reducing investigation steps.
Returns: JSON with user profile, risk assessment, and recent sign-ins.
"""
upn = arguments.get("user_principal_name")
if not upn:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": "user_principal_name is required."
}))]
try:
# Source 1: Entra ID user profile (department, title, account status)
user = await graph_client.users.by_user_id(upn).get()
# Source 2: Identity Protection risk detections (compromised likelihood)
from msgraph.generated.identity_protection.risky_users.risky_users_request_builder import RiskyUsersRequestBuilder
risk_params = RiskyUsersRequestBuilder.RiskyUsersRequestBuilderGetQueryParameters(
filter=f"userPrincipalName eq '{upn}'"
)
risks = await graph_client.identity_protection.risky_users.get(
request_configuration=RiskyUsersRequestBuilder.RiskyUsersRequestBuilderGetRequestConfiguration(
query_parameters=risk_params
)
)
# Source 3: Recent sign-in activity (last 10 sign-ins for anomaly review)
from msgraph.generated.audit_logs.sign_ins.sign_ins_request_builder import SignInsRequestBuilder
signin_params = SignInsRequestBuilder.SignInsRequestBuilderGetQueryParameters(
filter=f"userPrincipalName eq '{upn}'",
top=10
)
signins = await graph_client.audit_logs.sign_ins.get(
request_configuration=SignInsRequestBuilder.SignInsRequestBuilderGetRequestConfiguration(
query_parameters=signin_params
)
)
return [types.TextContent(type="text", text=json.dumps({
"status": "success",
"user": {
"display_name": user.display_name,
"upn": user.user_principal_name,
"job_title": user.job_title,
"department": user.department,
"account_enabled": user.account_enabled,
"created": str(user.created_date_time),
},
"risk": {
"level": str(risks.value[0].risk_level) if risks.value else "none",
"state": str(risks.value[0].risk_state) if risks.value else "none",
"detail": str(risks.value[0].risk_detail) if risks.value else "none"
},
"recent_signins": [
{
"date": str(s.created_date_time),
"app": s.app_display_name,
"status": "Success" if s.status.error_code == 0 else "Failed",
"ip": s.ip_address,
"location": f"{s.location.city}, {s.location.country_or_region}"
if s.location else "Unknown"
}
for s in (signins.value or [])[:5]
]
}, default=str))]
except Exception as e:
return [types.TextContent(type="text", text=json.dumps({
"status": "error", "message": str(e)
}))]

Design tools that work well together. Verify that tool outputs contain the information needed as inputs for subsequent tools: for example, incident details should include device IDs that can be passed to isolate_device.
# AI Investigation Workflow (simulated sequence)
#
# Step 1: Discover incidents
result = await call_tool("list_incidents", {"status": "active", "severity": "high"})
# → Returns incident IDs
# Step 2: Get details on the most critical incident
result = await call_tool("get_incident", {"incident_id": "INC-987654"})
# → Returns alerts, entities (user UPNs, device IDs), MITRE techniques
# Step 3: Enrich the affected user
result = await call_tool("enrich_user", {"user_principal_name": "john@contoso.com"})
# → Returns risk level, recent sign-ins, anomalies
# Step 4: Hunt for related activity
result = await call_tool("run_hunting_query", {
"query": """DeviceProcessEvents
| where Timestamp > ago(24h)
| where AccountName == 'john'
| where FileName in ('powershell.exe', 'cmd.exe')
| project Timestamp, DeviceName, FileName, ProcessCommandLine"""
})
# → Returns suspicious process activity
# Step 5: Contain the compromised device (dry-run first)
result = await call_tool("isolate_device", {
"device_id": "abc123",
"dry_run": True,
"comment": "Compromised in INC-987654"
})
# → Returns what would happen
# Step 6: Update the incident
result = await call_tool("update_incident", {
"incident_id": "INC-987654",
"status": "active",
"assigned_to": "secops-team@contoso.com",
"confirm": True
})

Protect both the Microsoft Graph API and your MCP server from overload. Respect Graph API throttling (429 responses), implement client-side rate limits, and queue requests during high-load periods.
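Client-side limits reduce but do not eliminate 429s, so wrap Graph calls in a retry that honors the server's Retry-After delay. A generic sketch that complements the client-side limiter below (`ThrottledError` is a stand-in for however your HTTP layer surfaces a 429):

```python
import asyncio

class ThrottledError(Exception):
    """Stand-in for an HTTP 429 response; carries the Retry-After delay."""
    def __init__(self, retry_after: float):
        super().__init__(f"Throttled; retry after {retry_after}s")
        self.retry_after = retry_after

async def with_throttle_retry(call_api, max_retries: int = 3):
    """Run an async API call, sleeping and retrying when throttled."""
    for attempt in range(max_retries + 1):
        try:
            return await call_api()
        except ThrottledError as e:
            if attempt == max_retries:
                raise  # give up after exhausting retries
            # Honor the server-provided backoff before retrying
            await asyncio.sleep(e.retry_after)
```

Honoring Retry-After instead of a fixed backoff matters: Graph tells you exactly how long the throttle window lasts, so retrying earlier only wastes quota.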
import asyncio
import time
class RateLimiter:
"""Token-bucket rate limiter for MCP tool calls.
Prevents exceeding Microsoft Graph API throttling limits.
Each tool gets its own limiter with appropriate call rates.
"""
def __init__(self, calls_per_minute: int = 60):
self.calls_per_minute = calls_per_minute
self.call_times: list[float] = [] # Timestamps of recent calls
self._lock = asyncio.Lock()
async def acquire(self, tool_name: str):
"""Wait until a call slot is available within the rate window."""
async with self._lock:
now = time.time()
# Remove calls older than 60 seconds
self.call_times = [t for t in self.call_times if now - t < 60]
if len(self.call_times) >= self.calls_per_minute:
wait_time = 60 - (now - self.call_times[0])
await asyncio.sleep(wait_time)
self.call_times.append(time.time())
# Per-tool rate limits - expensive or high-impact tools get lower limits
# This prevents AI agents from overwhelming the Graph API during investigations
rate_limiters = {
"list_incidents": RateLimiter(30), # 30/min - lightweight list call
"get_incident": RateLimiter(60), # 60/min - single-item retrieval
"run_hunting_query": RateLimiter(10), # 10/min - expensive KQL execution
"isolate_device": RateLimiter(5), # 5/min - high-impact action
"update_incident": RateLimiter(20), # 20/min - state-changing write
}
async def rate_limited_call(tool_name: str, handler, arguments: dict):
"""Wrap tool calls with rate limiting."""
limiter = rate_limiters.get(tool_name, RateLimiter(60))
await limiter.acquire(tool_name)
return await handler(arguments)

Log every tool invocation for compliance and forensic review. In a post-incident review, you need to show exactly what the AI did, what data it accessed, and what changes it made.
import json
import logging
import time
from datetime import datetime, timezone
from functools import wraps
# Structured JSON logging for audit compliance
# Each log entry is a self-contained JSON object on one line (JSONL format)
# This enables log aggregation tools (Azure Monitor, Splunk) to parse entries
class JsonFormatter(logging.Formatter):
def format(self, record):
# Build a structured log entry with all investigation context
log_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"tool": getattr(record, 'tool_name', 'unknown'), # Which MCP tool
"action": getattr(record, 'action', 'unknown'), # What happened
"arguments": getattr(record, 'arguments', {}), # Input parameters
"result_status": getattr(record, 'result_status', 'unknown'),
"duration_ms": getattr(record, 'duration_ms', 0), # Performance data
"message": record.getMessage()
}
return json.dumps(log_data)
# Configure the audit logger to write JSONL to a file
# Each line = one tool invocation record
audit_logger = logging.getLogger("mcp.audit")
handler = logging.FileHandler("mcp_audit.jsonl")
handler.setFormatter(JsonFormatter())
audit_logger.addHandler(handler)
audit_logger.setLevel(logging.INFO)
def audit_tool_call(func):
"""Decorator to audit all tool invocations.
Wraps every tool call with timing, logging, and error tracking.
Critical for SOC 2, ISO 27001, and GDPR compliance.
"""
@wraps(func)
async def wrapper(name: str, arguments: dict):
start = time.time()
try:
result = await func(name, arguments)
duration = (time.time() - start) * 1000
audit_logger.info(
"Tool call completed",
extra={
"tool_name": name,
"action": "call",
"arguments": {k: v for k, v in arguments.items()
if k != "confirm"}, # Don't log secrets
"result_status": "success",
"duration_ms": round(duration)
}
)
return result
except Exception as e:
duration = (time.time() - start) * 1000
audit_logger.error(
f"Tool call failed: {e}",
extra={
"tool_name": name,
"action": "call",
"arguments": arguments,
"result_status": "error",
"duration_ms": round(duration)
}
)
raise
return wrapper

Simulate a complete AI-driven investigation workflow. Test both the happy path and error scenarios to ensure robust behaviour.
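One part of the checklist is verifying the audit trail. A small helper sketch that summarizes the JSONL log written by the audit decorator above (field names match the JsonFormatter; the helper itself is an assumption, not part of the server):

```python
import json

def summarize_audit_log(path: str = "mcp_audit.jsonl"):
    """Count invocations per tool and total errors in a JSONL audit log."""
    counts: dict[str, int] = {}
    errors = 0
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            entry = json.loads(line)  # one JSON object per line
            tool = entry.get("tool", "unknown")
            counts[tool] = counts.get(tool, 0) + 1
            if entry.get("result_status") == "error":
                errors += 1
    return counts, errors
```

After a test run, compare the per-tool counts against the calls you actually made; any gap means an invocation escaped the audit decorator.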
Run the server with `mcp dev src/server.py` and work through this checklist:

- Call `list_incidents`: verify incidents are returned with IDs and metadata
- Call `get_incident` with a real incident ID: verify alerts and entities are populated
- Call `enrich_user` with a UPN from the incident: verify risk and sign-in data
- Call `run_hunting_query` with a valid KQL query: verify structured results
- Call `run_hunting_query` with invalid KQL: verify a helpful error response
- Call `update_incident` without confirm=true: verify the confirmation prompt
- Call `isolate_device` with dry_run=true: verify the dry-run response
- Inspect `mcp_audit.jsonl`: verify all calls were logged

Create comprehensive documentation for each tool. Good documentation is essential because AI models use tool descriptions to decide when and how to call your tools.
# Defender XDR MCP Server
## Tools
### list_incidents
List active security incidents from Defender XDR.
- **Input:** severity (optional), status (optional), top (optional, max: 50)
- **Output:** Array of incidents with id, display_name, severity, status
- **Rate Limit:** 30 calls/minute
- **Example:** `{"status": "active", "severity": "high"}`
### get_incident
Get full incident details including alerts, entities, and MITRE techniques.
- **Input:** incident_id (required)
- **Output:** Hierarchical incident data with alerts and entities
- **Rate Limit:** 60 calls/minute
### update_incident
Update incident status, severity, or assignment. Requires confirmation.
- **Input:** incident_id (required), status/severity/assigned_to, confirm (boolean)
- **Safety:** Requires confirm=true to apply changes
- **Rate Limit:** 20 calls/minute
### isolate_device
Isolate a compromised device from the network.
- **Input:** device_id (required), dry_run (default: true), confirm (boolean)
- **Safety:** Defaults to dry_run mode; requires confirm=true for execution
- **Rate Limit:** 5 calls/minutePackage your MCP server for deployment using Docker. Create deployment templates for common scenarios.
# Dockerfile for packaging the XDR MCP server as a container
FROM python:3.12-slim
WORKDIR /app
# Install Python dependencies first (leverages Docker layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy source code into the container
COPY src/ ./src/
COPY .env.example .
# Launch the MCP server using stdio transport (default)
# Override MCP_TRANSPORT=sse for cloud deployment
CMD ["python", "src/server.py"]

# Build the Docker image with tag 'xdr-mcp-server:latest'
docker build -t xdr-mcp-server:latest .
# Run the container with environment variables loaded from .env file
docker run --env-file .env xdr-mcp-server:latest
# Alternative: pass Azure credentials directly as environment variables
# Useful for CI/CD pipelines or when .env file isn't available
docker run \
-e AZURE_TENANT_ID=... \
-e AZURE_CLIENT_ID=... \
-e AZURE_CLIENT_SECRET=... \
xdr-mcp-server:latest

| Resource | Description |
|---|---|
| Microsoft Defender XDR API overview | API access patterns and authentication |
| Create an app to access Defender XDR APIs | App registration and permissions |
| Incidents API | List, update, and manage incidents programmatically |
| Advanced hunting API | Run KQL queries through the Defender XDR API |
| MCP Tools | Define and implement tools for AI model interaction |
| MCP Transports | Understand stdio and SSE transport mechanisms |