Design and implement a security AI agent that orchestrates multiple MCP servers (Sentinel, Defender XDR, and custom tools) to perform autonomous security investigations, correlate findings, and execute response actions.
In this lab you will build an AI agent that orchestrates multiple MCP servers (Sentinel, Defender XDR, and custom tools) to perform autonomous, end-to-end security investigations. The agent discovers available tools at runtime, plans multi-step investigations using an LLM, correlates findings across data sources, and executes response actions with built-in safety controls.
A SOC team needs an AI agent that can correlate data across Microsoft Sentinel, Defender XDR, and custom internal tools to perform complete security investigations, from initial alert triage through evidence collection to recommended (or automated) response actions.
By connecting multiple MCP servers to a single agent, the team eliminates manual context-switching between portals and enables faster, more thorough investigations that span the entire security stack.
This lab is the culmination of the MCP series, combining everything you have learned into a fully autonomous security agent. By orchestrating multiple MCP servers behind an LLM-powered planning loop, you can handle end-to-end investigations that would otherwise require analysts to manually pivot across multiple tools and portals, dramatically accelerating detection and response.
Prerequisites: the openai, mcp, and httpx packages.

A multi-server MCP architecture connects a single AI agent to multiple MCP servers, each providing specialised tools. The agent discovers all tools across servers and orchestrates them for complex, multi-step security investigations.
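The core of this architecture is the server-qualified naming scheme: each tool is exposed to the LLM as `server.tool`, and calls are routed back by splitting on the first dot. A minimal sketch (the helper names `qualify` and `route` are illustrative, not part of the lab's modules):

```python
def qualify(server: str, tool: str) -> str:
    """Build the server-qualified tool name the agent presents to the LLM."""
    return f"{server}.{tool}"

def route(qualified: str) -> tuple[str, str]:
    """Split a qualified name back into (server, tool) for dispatch."""
    server, tool = qualified.split(".", 1)
    return server, tool

print(qualify("sentinel", "run_kql_query"))  # sentinel.run_kql_query
print(route("xdr.get_incident"))             # ('xdr', 'get_incident')
```

Splitting on only the first dot (`split(".", 1)`) means tool names themselves may contain dots without breaking routing.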
Create the project structure for the multi-server security AI agent.
# Create the multi-server agent project
mkdir security-ai-agent && cd security-ai-agent
python -m venv .venv && source .venv/bin/activate
# Install dependencies:
# mcp[cli] - MCP client SDK for connecting to remote MCP servers
# openai - LLM API for planning and reasoning (GPT-4.1)
# httpx - Async HTTP client for SSE transport connections
# python-dotenv - Load configuration from .env files
pip install "mcp[cli]" openai httpx python-dotenv
# Project structure: separate concerns into focused modules
mkdir -p src tests
touch src/__init__.py src/agent.py src/registry.py
touch src/memory.py src/planner.py src/config.py
touch .env requirements.txt

import os
from dotenv import load_dotenv
load_dotenv()
# MCP Server registry: each entry defines a remote MCP server endpoint
# The agent connects to all servers at startup and discovers their tools
# Tools are namespaced by server name (e.g., sentinel.run_kql_query)
MCP_SERVERS = {
"sentinel": {
"url": os.environ.get("SENTINEL_MCP_URL", "http://localhost:8000/sse"),
"token": os.environ.get("SENTINEL_MCP_TOKEN", ""),
"description": "Sentinel KQL queries and incident retrieval"
},
"xdr": {
"url": os.environ.get("XDR_MCP_URL", "http://localhost:8001/sse"),
"token": os.environ.get("XDR_MCP_TOKEN", ""),
"description": "Defender XDR incident management and hunting"
}
}
# LLM configuration for the planning/reasoning engine
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4.1")
# Safety limit: max tool calls per investigation to prevent runaway agents
MAX_INVESTIGATION_STEPS = int(os.environ.get("MAX_STEPS", "20"))
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

The tool registry discovers tools from all connected MCP servers and builds a unified catalog with server-qualified names (e.g., sentinel.run_kql_query, xdr.get_incident).
import json
from mcp import ClientSession
from mcp.client.sse import sse_client
class ToolRegistry:
"""Unified tool registry across multiple MCP servers.
Connects to each server, discovers its tools, and creates
a merged catalog with server-qualified names.
Example: sentinel server's 'run_kql_query' becomes 'sentinel.run_kql_query'.
"""
def __init__(self):
self.tools: dict[str, dict] = {} # qualified_name → tool metadata
self.sessions: dict[str, ClientSession] = {} # server_name → MCP session
self._tool_to_server: dict[str, str] = {} # qualified_name → server_name
async def connect_server(self, name: str, url: str, token: str = ""):
"""Connect to an MCP server via SSE and register its tools.
Performs the MCP handshake, discovers tools, and adds them
to the unified registry with server-qualified names.
"""
# Establish SSE connection with optional Bearer token auth
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
# Connect to the SSE endpoint, enter the session, then run the MCP handshake
transport = sse_client(url, headers=headers)
read, write = await transport.__aenter__()
session = ClientSession(read, write)
await session.__aenter__() # start the session's message loop before initializing
await session.initialize() # MCP protocol handshake
self.sessions[name] = session
# Discover all tools from this server and register with qualified names
# e.g., server "sentinel" + tool "run_kql_query" = "sentinel.run_kql_query"
tools_response = await session.list_tools()
for tool in tools_response.tools:
qualified_name = f"{name}.{tool.name}"
self.tools[qualified_name] = {
"name": tool.name,
"server": name,
# Prefix description with server name for AI disambiguation
"description": f"[{name}] {tool.description}",
"input_schema": tool.inputSchema
}
self._tool_to_server[qualified_name] = name
print(f"Connected to {name}: {len(tools_response.tools)} tools")
async def execute(self, qualified_name: str, arguments: dict):
"""Execute a tool on the appropriate MCP server.
Routes the call to the correct server based on the qualified name prefix.
Example: 'sentinel.run_kql_query' → calls run_kql_query on sentinel server.
"""
server_name = self._tool_to_server.get(qualified_name)
if not server_name:
return {"error": f"Unknown tool: {qualified_name}"}
tool_name = qualified_name.split(".", 1)[1]
session = self.sessions[server_name]
result = await session.call_tool(tool_name, arguments)
return result.content[0].text if result.content else ""
def get_openai_tools(self) -> list[dict]:
"""Format all discovered tools for OpenAI function calling.
Converts MCP tool schemas to the OpenAI tools format
so the LLM can select and invoke tools by qualified name.
"""
return [
{
"type": "function",
"function": {
"name": qname,
"description": info["description"],
"parameters": info["input_schema"]
}
}
for qname, info in self.tools.items()
]
async def disconnect_all(self):
"""Clean up all server connections gracefully."""
for session in self.sessions.values():
await session.__aexit__(None, None, None)

The memory system tracks the investigation objective, steps taken, tool results, findings, and current hypotheses. It is included in every LLM prompt to maintain context.
from datetime import datetime, timezone
from dataclasses import dataclass, field
@dataclass
class MemoryEntry:
"""A single step in the investigation timeline."""
timestamp: str
type: str # observation, finding, hypothesis, action
tool_used: str # Qualified tool name (e.g., sentinel.run_kql_query)
summary: str # Brief description of what was learned
raw_data: str = "" # Truncated tool output for context window efficiency
@dataclass
class InvestigationMemory:
"""Tracks the full state of an ongoing investigation.
Included in every LLM prompt to maintain context across steps.
Stores: objective, steps taken, key findings, and affected entities.
"""
objective: str # What we're investigating
entries: list[MemoryEntry] = field(default_factory=list) # Timeline of steps
findings: list[str] = field(default_factory=list) # Confirmed findings
affected_entities: list[str] = field(default_factory=list) # Users, devices, IPs
def add_step(self, tool_name: str, arguments: dict,
result: str, summary: str, entry_type: str = "observation"):
"""Record a tool invocation step in the investigation timeline."""
self.entries.append(MemoryEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
type=entry_type,
tool_used=tool_name,
summary=summary,
raw_data=result[:500] # truncate raw output for context window efficiency
))
def add_finding(self, finding: str):
self.findings.append(finding)
def add_entity(self, entity: str):
if entity not in self.affected_entities:
self.affected_entities.append(entity)
def get_context(self, max_entries: int = 15) -> str:
"""Build context string for the LLM prompt.
Includes: objective, recent steps, key findings, affected entities.
Truncates to max_entries to fit within the LLM context window.
This is injected into every planning prompt to maintain state.
"""
recent = self.entries[-max_entries:]
steps = "\n".join([
f" [{e.timestamp}] ({e.type}) {e.tool_used}: {e.summary}"
for e in recent
])
return f"""INVESTIGATION OBJECTIVE: {self.objective}
STEPS COMPLETED ({len(self.entries)}):
{steps}
KEY FINDINGS: {'; '.join(self.findings) if self.findings else 'None yet'}
AFFECTED ENTITIES: {', '.join(self.affected_entities) if self.affected_entities else 'None identified yet'}
"""

The system prompt guides the agent's behaviour during investigations. Include guardrails: never take destructive actions without approval, verify findings with multiple sources, and document every step.
from openai import AsyncOpenAI
from config import OPENAI_MODEL, OPENAI_API_KEY
# Initialize the OpenAI client for LLM-powered planning
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
# System prompt: the agent's operating manual
# Defines investigation methodology, safety rules, and tool naming
# This is the most critical piece - it determines agent behavior
SYSTEM_PROMPT = """You are a Security AI Agent with access to multiple
Microsoft security products via MCP tools. Your role is to investigate
security incidents methodically and thoroughly.
INVESTIGATION METHODOLOGY:
1. Start by understanding the scope: list recent incidents or run a broad query
2. Gather details on the most critical finding
3. Enrich affected entities (users, devices, IPs) across all available sources
4. Correlate findings across Sentinel AND Defender XDR for complete picture
5. Assess severity and business impact
6. Recommend containment and remediation actions
7. Generate a comprehensive investigation report
SAFETY RULES:
- NEVER execute destructive actions (isolate, disable) without explicit approval
- Always verify critical findings with at least 2 data sources
- If confidence is below 70%, escalate to a human analyst
- Document every step taken with reasoning
- Use read-only tools before write tools
- When unsure which tool to use, check available tools first
TOOL NAMING: Tools are prefixed with their server name:
- sentinel.*: Sentinel KQL queries, table listing, incidents
- xdr.*: Defender XDR incidents, hunting, device actions, entity enrichment
"""
async def plan_next_step(context: str, tools: list[dict]):
"""Ask the LLM to decide the next investigation step.
The LLM receives: system prompt + investigation context + available tools.
It returns either a tool call (continue investigating) or a text
response (investigation complete, deliver findings).
Temperature=0.1 ensures consistent, deterministic reasoning.
"""
response = await client.chat.completions.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": context}
],
tools=tools,
tool_choice="auto",
temperature=0.1 # Low temperature for consistent reasoning
)
return response.choices[0]

The agent loop is the core: receive objective → plan next step → execute tool → store result → repeat until investigation is complete or the step budget is exhausted.
import json
import asyncio
from registry import ToolRegistry
from memory import InvestigationMemory
from planner import plan_next_step
from config import MCP_SERVERS, MAX_INVESTIGATION_STEPS
class SecurityAgent:
"""Multi-server security AI agent.
Orchestrates multiple MCP servers (Sentinel, Defender XDR)
using an LLM-powered planning loop for autonomous investigations.
"""
def __init__(self):
self.registry = ToolRegistry()
async def connect(self):
"""Connect to all configured MCP servers and discover tools."""
for name, config in MCP_SERVERS.items():
try:
await self.registry.connect_server(
name, config["url"], config.get("token", "")
)
print(f"✓ Connected to {name}")
except Exception as e:
print(f"✗ Failed to connect to {name}: {e}")
async def investigate(self, objective: str,
max_steps: int = MAX_INVESTIGATION_STEPS) -> dict:
"""Run an autonomous, multi-step investigation.
Core agent loop: plan โ execute โ observe โ repeat.
Continues until the LLM signals completion or the step budget runs out.
Returns: dict with objective, steps taken, findings, and entities.
"""
memory = InvestigationMemory(objective=objective)
print(f"\nStarting investigation: {objective}\n")
for step_num in range(1, max_steps + 1):
print(f"--- Step {step_num}/{max_steps} ---")
# Ask the LLM to plan the next step based on current context
# The LLM sees: system prompt, investigation memory, available tools
context = memory.get_context()
tools = self.registry.get_openai_tools()
choice = await plan_next_step(context, tools)
# Check if the LLM decided the investigation is complete
# finish_reason="stop" means no more tool calls needed
if choice.finish_reason == "stop":
print("✓ Investigation complete")
break
# Execute the tool call(s) the LLM selected
# The LLM may request multiple parallel tool calls
if choice.message.tool_calls:
for tc in choice.message.tool_calls:
tool_name = tc.function.name
arguments = json.loads(tc.function.arguments)
print(f" → Calling {tool_name}")
try:
result = await self.registry.execute(tool_name, arguments)
# Summarise the result for memory
summary = self._summarise_result(result)
memory.add_step(tool_name, arguments, result, summary)
print(f" ✓ {summary[:100]}")
except Exception as e:
memory.add_step(tool_name, arguments, "", f"Error: {e}")
print(f" ✗ Error: {e}")
return {
"objective": objective,
"steps_taken": len(memory.entries),
"findings": memory.findings,
"affected_entities": memory.affected_entities,
"complete": choice.finish_reason == "stop"
}
def _summarise_result(self, result, max_len: int = 200) -> str:
"""Create a brief summary of a tool result."""
if not isinstance(result, str):
result = json.dumps(result) # execute() may return an error dict instead of text
try:
data = json.loads(result)
if "incident_count" in data:
return f"Found {data['incident_count']} incidents"
if "row_count" in data:
return f"Query returned {data['row_count']} rows"
if "status" in data:
return f"Status: {data['status']}"
except (json.JSONDecodeError, TypeError):
pass
return result[:max_len]
async def disconnect(self):
await self.registry.disconnect_all()
# Entry point
async def main():
agent = SecurityAgent()
await agent.connect()
try:
result = await agent.investigate(
"Investigate the most critical active incident in our environment. "
"Identify affected users and devices, determine the attack vector, "
"and recommend containment actions."
)
print("\nInvestigation Summary:")
print(json.dumps(result, indent=2))
finally:
await agent.disconnect()
if __name__ == "__main__":
asyncio.run(main())

The agent enforces a step budget (max_steps=20 by default). This prevents runaway investigations that consume excessive tokens and API calls. Start with 20 and adjust based on typical investigation complexity.

Classify tools by risk level and enforce approval workflows for high-impact actions.
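The step budget described above can be exercised standalone. This sketch stubs out the planner (the names `run_with_budget` and `plan_step` are illustrative, not part of the lab's modules): the stub planner would only finish on its third step, so a budget of 2 stops the loop first.

```python
import asyncio

async def run_with_budget(plan_step, max_steps: int) -> int:
    """Run the plan/execute loop, never exceeding max_steps iterations."""
    steps = 0
    for _ in range(max_steps):
        done = await plan_step()  # stands in for one plan -> execute cycle
        steps += 1
        if done:
            break
    return steps

async def demo():
    calls = 0
    async def plan_step():
        nonlocal calls
        calls += 1
        return calls >= 3  # this planner would only signal completion on step 3
    return await run_with_budget(plan_step, max_steps=2)

print(asyncio.run(demo()))  # 2 - the budget stopped the loop first
```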
# Tool risk classification - determines safety controls per tool
# LOW: Read-only, no side effects → execute automatically
# MEDIUM: State-changing but reversible → log and execute
# HIGH: Destructive or impactful → require human approval
# IMPORTANT: Unknown tools default to HIGH for safety!
TOOL_RISK_LEVELS = {
# LOW: Read-only tools - safe for autonomous execution
"sentinel.run_kql_query": "low",
"sentinel.list_sentinel_tables": "low",
"sentinel.get_recent_incidents": "low",
"xdr.list_incidents": "low",
"xdr.get_incident": "low",
"xdr.run_hunting_query": "low",
"xdr.enrich_user": "low",
# MEDIUM: Reversible state changes - log but auto-execute
"xdr.update_incident": "medium",
# HIGH: Destructive actions - ALWAYS require human approval
"xdr.isolate_device": "high",
"xdr.release_device": "high",
}
import json

async def execute_with_safety(registry, tool_name, arguments):
"""Execute a tool with risk-appropriate safety controls.
Low-risk: auto-execute. Medium: log and execute. High: human approval.
Unknown tools default to HIGH risk - never trust unclassified tools.
"""
risk = TOOL_RISK_LEVELS.get(tool_name, "high") # Default to high for safety
if risk == "low":
return await registry.execute(tool_name, arguments)
elif risk == "medium":
print(f" Medium-risk action: {tool_name}")
return await registry.execute(tool_name, arguments)
elif risk == "high":
print("\n HIGH-RISK ACTION REQUIRES APPROVAL")
print(f" Tool: {tool_name}")
print(f" Args: {json.dumps(arguments, indent=2)}")
approval = input(" Approve? (yes/no): ").strip().lower()
if approval == "yes":
return await registry.execute(tool_name, arguments)
else:
return json.dumps({
"status": "blocked",
"message": "Action blocked by human operator."
})

When the agent needs to enrich multiple entities, call all enrichment tools concurrently rather than sequentially to dramatically speed up investigations.
import asyncio
async def execute_parallel(registry, tool_calls: list) -> list:
"""Execute multiple independent tool calls concurrently.
Dramatically speeds up investigations when enriching multiple entities.
Each call runs independently - one failure doesn't block others.
Returns: a list of {tool, status, result/error} dicts, in call order.
A list is used rather than a dict keyed by tool name so that repeated
calls to the same tool (e.g. enriching three users) don't overwrite
each other's results.
"""
async def _safe_execute(tc):
"""Wrapper that catches errors per-tool instead of failing all."""
try:
result = await registry.execute(tc["name"], tc["arguments"])
return {"tool": tc["name"], "status": "success", "result": result}
except Exception as e:
return {"tool": tc["name"], "status": "error", "error": str(e)}
# asyncio.gather runs all tasks concurrently and waits for all to finish;
# _safe_execute already catches exceptions, so return_exceptions is unnecessary
tasks = [_safe_execute(tc) for tc in tool_calls]
return await asyncio.gather(*tasks)
# Example: enrich 3 users simultaneously instead of sequentially
# This turns a 3-second serial operation into a ~1-second parallel one
parallel_calls = [
{"name": "xdr.enrich_user", "arguments": {"user_principal_name": "alice@contoso.com"}},
{"name": "xdr.enrich_user", "arguments": {"user_principal_name": "bob@contoso.com"}},
{"name": "xdr.enrich_user", "arguments": {"user_principal_name": "carol@contoso.com"}},
]
results = await execute_parallel(registry, parallel_calls)

Pre-built templates accelerate investigations by providing a structured starting point with the right objective, context, and preferred tool sequence.
# Pre-built investigation templates for common security scenarios
# Each template provides: a structured objective, preferred starting tools,
# and sample KQL queries that the LLM can use as starting points
# Usage: agent.investigate(INVESTIGATION_TEMPLATES["phishing"]["objective"])
INVESTIGATION_TEMPLATES = {
# Template 1: Phishing campaign investigation
"phishing": {
"objective": "Investigate a phishing campaign targeting our organization. "
"Identify affected users, compromised credentials, and "
"malicious infrastructure.",
"initial_tools": ["xdr.run_hunting_query", "sentinel.run_kql_query"],
"suggested_queries": [
"EmailEvents | where ThreatTypes has 'Phish' | take 50",
"SigninLogs | where ResultType != '0' | summarize by UserPrincipalName"
]
},
# Template 2: Malware infection response
"malware": {
"objective": "Investigate a malware infection. Identify patient zero, "
"lateral movement, and all compromised systems.",
"initial_tools": ["xdr.list_incidents", "xdr.run_hunting_query"],
"suggested_queries": [
"DeviceProcessEvents | where FileName in ('powershell.exe','cmd.exe')"
]
},
# Template 3: Compromised identity investigation
"identity_compromise": {
"objective": "Investigate a compromised identity. Determine scope of "
"access, data exposure, and persistence mechanisms.",
"initial_tools": ["xdr.enrich_user", "sentinel.run_kql_query"],
"suggested_queries": [
"SigninLogs | where RiskLevelDuringSignIn != 'none'"
]
}
}
# Usage:
# result = await agent.investigate(
# INVESTIGATION_TEMPLATES["phishing"]["objective"]
# )

After the investigation completes, generate a structured report with the executive summary, timeline, findings, and recommended actions.
import json
from memory import InvestigationMemory
from planner import client, OPENAI_MODEL
async def generate_report(memory: InvestigationMemory) -> str:
"""Generate a structured investigation report using the LLM.
Takes the complete investigation memory and produces a formatted
report with executive summary, timeline, findings, MITRE mapping,
and prioritized remediation steps.
Returns: Markdown-formatted investigation report string.
"""
report_prompt = f"""Generate a detailed security investigation report:
OBJECTIVE: {memory.objective}
STEPS TAKEN: {len(memory.entries)}
FINDINGS: {json.dumps(memory.findings)}
AFFECTED ENTITIES: {json.dumps(memory.affected_entities)}
FULL INVESTIGATION LOG:
{memory.get_context(max_entries=50)}
Format the report with these sections:
1. Executive Summary (2-3 sentences for leadership)
2. Investigation Timeline (chronological steps with timestamps)
3. Key Findings (with severity and confidence level)
4. Affected Entities (users, devices, IPs with context)
5. MITRE ATT&CK Mapping (techniques observed)
6. Recommended Actions (prioritized remediation steps)
7. Evidence References (tool outputs supporting findings)
"""
response = await client.chat.completions.create(
model=OPENAI_MODEL,
messages=[{"role": "user", "content": report_prompt}],
temperature=0.2
)
return response.choices[0].message.content

Run the agent against your environment and evaluate its investigation quality.
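One lightweight way to gauge investigation quality is a post-run check over the summary dict returned by `SecurityAgent.investigate`. The field names below match that return value; the three-point rubric itself is an assumption for illustration, not part of the lab.

```python
def score_investigation(result: dict) -> int:
    """Score 0-3: did the run complete, produce findings, and identify entities?"""
    score = 0
    if result.get("complete"):           # LLM signalled completion within budget
        score += 1
    if result.get("findings"):           # at least one confirmed finding
        score += 1
    if result.get("affected_entities"):  # at least one user/device/IP identified
        score += 1
    return score

sample = {
    "objective": "Investigate the most critical active incident",
    "steps_taken": 7,
    "findings": ["Credential phishing confirmed for alice@contoso.com"],
    "affected_entities": ["alice@contoso.com"],
    "complete": True,
}
print(score_investigation(sample))  # 3
```

Runs that score below 3 are candidates for manual review, a larger step budget, or a sharper objective.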
python src/agent.py --template phishing

Deploy the agent as an Azure Container App and establish ongoing operational processes.
# Deploy the multi-server agent as a Container App
# It connects to the MCP servers via their SSE endpoints
# All MCP server URLs and secrets are passed as environment variables
az containerapp create \
--name security-ai-agent \
--resource-group rg-mcp-servers \
--environment mcp-environment \
--image mcpserversacr.azurecr.io/security-agent:v1 \
--target-port 8000 \
--min-replicas 1 \
--env-vars \
SENTINEL_MCP_URL=https://sentinel-mcp.azurecontainerapps.io/sse \
XDR_MCP_URL=https://xdr-mcp.azurecontainerapps.io/sse \
OPENAI_API_KEY=secretref:openai-key

| Resource | Description |
|---|---|
| MCP Architecture | Multi-server client-host-server architecture patterns |
| MCP Sampling | Enable AI model inference from within MCP servers |
| Azure OpenAI Service | Foundation models for building security AI agents |
| Semantic Kernel overview | AI orchestration framework for multi-tool agents |
| MCP Tools | Tool discovery and invocation across multiple servers |
| Function calling with Azure OpenAI | Connect AI models to external tools and APIs |