Skip to content
This repository was archived by the owner on Mar 7, 2026. It is now read-only.

Commit e0d08e8

Browse files
mosiddiCopilot
andcommitted
feat: add Microsoft Entra Agent ID adapter
Bridge AgentMesh DID identity with Entra Agent ID for enterprise Zero Trust governance. Includes EntraAgentIdentity, EntraAgentRegistry with sponsor accountability, lifecycle management, scope verification, blueprint templates, and audit logging. 23/23 tests passing. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 13a793d commit e0d08e8

3 files changed

Lines changed: 591 additions & 0 deletions

File tree

src/agentmesh/identity/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
- Human sponsor accountability
77
- Ephemeral credentials (15-min TTL)
88
- SPIFFE/SVID workload identity
9+
- Microsoft Entra Agent ID integration
910
"""
1011

1112
from .agent_id import AgentIdentity, AgentDID
13+
from .entra import EntraAgentIdentity, EntraAgentRegistry, EntraAgentBlueprint
1214
from .credentials import Credential, CredentialManager
1315
from .delegation import ScopeChain, DelegationLink, UserContext
1416
from .sponsor import HumanSponsor
@@ -50,4 +52,7 @@
5052
"KeyStore",
5153
"SoftwareKeyStore",
5254
"PKCS11KeyStore",
55+
"EntraAgentIdentity",
56+
"EntraAgentRegistry",
57+
"EntraAgentBlueprint",
5358
]

src/agentmesh/identity/entra.py

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
"""
2+
Microsoft Entra Agent ID adapter for AgentMesh.
3+
4+
Bridges AgentMesh DID-based identity with Microsoft Entra Agent ID,
5+
enabling enterprise Zero Trust governance for AI agents via Entra's
6+
identity lifecycle, Conditional Access, and sponsor accountability.
7+
8+
Requires: azure-identity (optional dependency)
9+
"""
10+
11+
from __future__ import annotations
12+
13+
from datetime import datetime, timezone
14+
from enum import Enum
15+
from typing import Any, Optional
16+
17+
from pydantic import BaseModel, Field
18+
19+
20+
class EntraAgentStatus(str, Enum):
21+
"""Entra Agent ID lifecycle states."""
22+
23+
ACTIVE = "active"
24+
SUSPENDED = "suspended"
25+
DISABLED = "disabled"
26+
DELETED = "deleted"
27+
28+
29+
class EntraAgentBlueprint(BaseModel):
30+
"""Template for creating Entra agent identities."""
31+
32+
display_name: str = Field(..., description="Human-readable agent name")
33+
description: str = Field(default="", description="Agent purpose description")
34+
default_capabilities: list[str] = Field(default_factory=list)
35+
require_sponsor: bool = Field(default=True)
36+
max_delegation_depth: int = Field(default=2)
37+
conditional_access_policy: Optional[str] = Field(
38+
default=None, description="Conditional Access policy ID to apply"
39+
)
40+
41+
42+
class EntraAgentIdentity(BaseModel):
43+
"""
44+
Microsoft Entra Agent ID binding for an AgentMesh agent.
45+
46+
Maps an AgentMesh DID identity to an Entra Agent ID, enabling:
47+
- Enterprise identity lifecycle management
48+
- Sponsor accountability (human owner per agent)
49+
- Conditional Access policies for agents
50+
- Unified audit trail across Entra + AgentMesh
51+
"""
52+
53+
# AgentMesh identity
54+
agent_did: str = Field(..., description="AgentMesh DID (did:mesh:...)")
55+
agent_name: str = Field(..., description="Agent display name")
56+
57+
# Entra identity
58+
entra_object_id: str = Field(..., description="Entra Agent ID object ID")
59+
entra_app_id: str = Field(default="", description="Entra application/client ID")
60+
tenant_id: str = Field(..., description="Azure AD tenant ID")
61+
62+
# Sponsor (human accountability)
63+
sponsor_email: str = Field(..., description="Human sponsor email (Entra UPN)")
64+
sponsor_object_id: str = Field(
65+
default="", description="Sponsor's Entra object ID"
66+
)
67+
68+
# Lifecycle
69+
status: EntraAgentStatus = Field(default=EntraAgentStatus.ACTIVE)
70+
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
71+
last_activity: Optional[datetime] = Field(default=None)
72+
73+
# Capabilities & access
74+
capabilities: list[str] = Field(default_factory=list)
75+
conditional_access_policy: Optional[str] = Field(default=None)
76+
scopes: list[str] = Field(
77+
default_factory=list,
78+
description="Entra API permissions/scopes granted to this agent",
79+
)
80+
81+
# Blueprint reference
82+
blueprint_name: Optional[str] = Field(default=None)
83+
84+
@classmethod
85+
def create(
86+
cls,
87+
agent_did: str,
88+
agent_name: str,
89+
entra_object_id: str,
90+
tenant_id: str,
91+
sponsor_email: str,
92+
capabilities: Optional[list[str]] = None,
93+
scopes: Optional[list[str]] = None,
94+
blueprint: Optional[EntraAgentBlueprint] = None,
95+
) -> EntraAgentIdentity:
96+
"""Create an Entra Agent ID binding for an AgentMesh agent."""
97+
caps = capabilities or (blueprint.default_capabilities if blueprint else [])
98+
ca_policy = blueprint.conditional_access_policy if blueprint else None
99+
100+
return cls(
101+
agent_did=agent_did,
102+
agent_name=agent_name,
103+
entra_object_id=entra_object_id,
104+
tenant_id=tenant_id,
105+
sponsor_email=sponsor_email,
106+
capabilities=caps,
107+
scopes=scopes or [],
108+
conditional_access_policy=ca_policy,
109+
blueprint_name=blueprint.display_name if blueprint else None,
110+
)
111+
112+
def record_activity(self) -> None:
113+
"""Update last activity timestamp."""
114+
self.last_activity = datetime.now(timezone.utc)
115+
116+
def suspend(self, reason: str = "") -> None:
117+
"""Suspend agent identity (reversible)."""
118+
self.status = EntraAgentStatus.SUSPENDED
119+
120+
def reactivate(self) -> None:
121+
"""Reactivate a suspended agent."""
122+
if self.status == EntraAgentStatus.SUSPENDED:
123+
self.status = EntraAgentStatus.ACTIVE
124+
125+
def disable(self) -> None:
126+
"""Disable agent identity (requires admin to re-enable)."""
127+
self.status = EntraAgentStatus.DISABLED
128+
129+
def is_active(self) -> bool:
130+
"""Check if the agent identity is active."""
131+
return self.status == EntraAgentStatus.ACTIVE
132+
133+
def has_scope(self, scope: str) -> bool:
134+
"""Check if agent has a specific Entra API scope."""
135+
return scope in self.scopes or f"{scope}/*" in self.scopes
136+
137+
def to_audit_record(self) -> dict[str, Any]:
138+
"""Export identity state for audit logging."""
139+
return {
140+
"agent_did": self.agent_did,
141+
"entra_object_id": self.entra_object_id,
142+
"tenant_id": self.tenant_id,
143+
"sponsor_email": self.sponsor_email,
144+
"status": self.status.value,
145+
"capabilities": self.capabilities,
146+
"scopes": self.scopes,
147+
"last_activity": (
148+
self.last_activity.isoformat() if self.last_activity else None
149+
),
150+
}
151+
152+
153+
class EntraAgentRegistry:
154+
"""
155+
Registry mapping AgentMesh DIDs to Microsoft Entra Agent IDs.
156+
157+
Provides enterprise identity management for AI agents, bridging
158+
AgentMesh's cryptographic DID identity with Entra's lifecycle
159+
governance, Conditional Access, and sponsor accountability.
160+
"""
161+
162+
def __init__(self, tenant_id: str) -> None:
163+
self.tenant_id = tenant_id
164+
self._agents: dict[str, EntraAgentIdentity] = {} # keyed by agent_did
165+
self._by_entra_id: dict[str, str] = {} # entra_object_id -> agent_did
166+
self._blueprints: dict[str, EntraAgentBlueprint] = {}
167+
self._audit_log: list[dict[str, Any]] = []
168+
169+
def register_blueprint(self, blueprint: EntraAgentBlueprint) -> None:
170+
"""Register an agent blueprint for consistent identity creation."""
171+
self._blueprints[blueprint.display_name] = blueprint
172+
173+
def register(
174+
self,
175+
agent_did: str,
176+
agent_name: str,
177+
entra_object_id: str,
178+
sponsor_email: str,
179+
capabilities: Optional[list[str]] = None,
180+
scopes: Optional[list[str]] = None,
181+
blueprint_name: Optional[str] = None,
182+
) -> EntraAgentIdentity:
183+
"""Register an agent with both AgentMesh DID and Entra Agent ID."""
184+
blueprint = self._blueprints.get(blueprint_name) if blueprint_name else None
185+
186+
if blueprint and blueprint.require_sponsor and not sponsor_email:
187+
raise ValueError(
188+
f"Blueprint '{blueprint_name}' requires a sponsor email"
189+
)
190+
191+
identity = EntraAgentIdentity.create(
192+
agent_did=agent_did,
193+
agent_name=agent_name,
194+
entra_object_id=entra_object_id,
195+
tenant_id=self.tenant_id,
196+
sponsor_email=sponsor_email,
197+
capabilities=capabilities,
198+
scopes=scopes,
199+
blueprint=blueprint,
200+
)
201+
202+
self._agents[agent_did] = identity
203+
self._by_entra_id[entra_object_id] = agent_did
204+
self._log_event("register", identity)
205+
return identity
206+
207+
def get_by_did(self, agent_did: str) -> Optional[EntraAgentIdentity]:
208+
"""Look up agent by AgentMesh DID."""
209+
return self._agents.get(agent_did)
210+
211+
def get_by_entra_id(self, entra_object_id: str) -> Optional[EntraAgentIdentity]:
212+
"""Look up agent by Entra object ID."""
213+
did = self._by_entra_id.get(entra_object_id)
214+
return self._agents.get(did) if did else None
215+
216+
def suspend_agent(self, agent_did: str, reason: str = "") -> bool:
217+
"""Suspend an agent (e.g., on anomaly detection)."""
218+
identity = self._agents.get(agent_did)
219+
if identity and identity.is_active():
220+
identity.suspend(reason)
221+
self._log_event("suspend", identity, {"reason": reason})
222+
return True
223+
return False
224+
225+
def reactivate_agent(self, agent_did: str) -> bool:
226+
"""Reactivate a suspended agent."""
227+
identity = self._agents.get(agent_did)
228+
if identity and identity.status == EntraAgentStatus.SUSPENDED:
229+
identity.reactivate()
230+
self._log_event("reactivate", identity)
231+
return True
232+
return False
233+
234+
def disable_agent(self, agent_did: str) -> bool:
235+
"""Disable an agent (admin action)."""
236+
identity = self._agents.get(agent_did)
237+
if identity:
238+
identity.disable()
239+
self._log_event("disable", identity)
240+
return True
241+
return False
242+
243+
def verify_access(
244+
self, agent_did: str, required_scope: str
245+
) -> tuple[bool, str]:
246+
"""
247+
Verify an agent has the required Entra scope and is active.
248+
249+
Returns (allowed, reason) tuple.
250+
"""
251+
identity = self._agents.get(agent_did)
252+
if not identity:
253+
return False, "Agent not registered in Entra registry"
254+
if not identity.is_active():
255+
return False, f"Agent status: {identity.status.value}"
256+
if required_scope and not identity.has_scope(required_scope):
257+
return False, f"Agent lacks scope: {required_scope}"
258+
259+
identity.record_activity()
260+
return True, "Access granted"
261+
262+
def list_agents(
263+
self, status: Optional[EntraAgentStatus] = None
264+
) -> list[EntraAgentIdentity]:
265+
"""List all registered agents, optionally filtered by status."""
266+
agents = list(self._agents.values())
267+
if status:
268+
agents = [a for a in agents if a.status == status]
269+
return agents
270+
271+
def get_sponsor_agents(self, sponsor_email: str) -> list[EntraAgentIdentity]:
272+
"""Get all agents owned by a specific sponsor."""
273+
return [
274+
a for a in self._agents.values() if a.sponsor_email == sponsor_email
275+
]
276+
277+
def get_audit_log(self) -> list[dict[str, Any]]:
278+
"""Return the full audit log."""
279+
return list(self._audit_log)
280+
281+
def _log_event(
282+
self,
283+
event_type: str,
284+
identity: EntraAgentIdentity,
285+
extra: Optional[dict[str, Any]] = None,
286+
) -> None:
287+
"""Log an identity lifecycle event."""
288+
entry = {
289+
"timestamp": datetime.now(timezone.utc).isoformat(),
290+
"event_type": event_type,
291+
"agent_did": identity.agent_did,
292+
"entra_object_id": identity.entra_object_id,
293+
"sponsor_email": identity.sponsor_email,
294+
"status": identity.status.value,
295+
}
296+
if extra:
297+
entry.update(extra)
298+
self._audit_log.append(entry)

0 commit comments

Comments
 (0)