-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvalidate.py
More file actions
150 lines (120 loc) · 5.43 KB
/
Copy pathvalidate.py
File metadata and controls
150 lines (120 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import json
def validate_templates():
base_dir = os.path.dirname(os.path.abspath(__file__))
registry_path = os.path.join(base_dir, "registry.json")
if not os.path.exists(registry_path):
print(f"Error: registry.json not found at {registry_path}")
return False
with open(registry_path, "r", encoding="utf-8") as f:
registry = json.load(f)
templates = registry.get("templates", [])
print(f"Loaded {len(templates)} templates from registry.json.\nStarting validation...\n")
errors = 0
warnings = 0
for t in templates:
t_id = t.get("id")
t_name = t.get("name")
t_path = t.get("path")
print(f"[{t_id}] Validating '{t_name}' at '{t_path}'...")
# 1. Check path exists
full_path = os.path.join(base_dir, t_path.replace("/", os.sep))
if not os.path.exists(full_path):
print(f" -> ERROR: Directory does not exist: {t_path}")
errors += 1
continue
# 2. Check swarm.json exists
swarm_json_path = os.path.join(full_path, "swarm.json")
if not os.path.exists(swarm_json_path):
print(f" -> ERROR: swarm.json is missing in {t_path}")
errors += 1
continue
# 3. Read and validate swarm.json
try:
with open(swarm_json_path, "r", encoding="utf-8") as sf:
swarm = json.load(sf)
except Exception as e:
print(f" -> ERROR: Failed to parse swarm.json in {t_path}: {e}")
errors += 1
continue
# 4. Check mcps.json reference
required_mcps = swarm.get("required_mcps")
if required_mcps:
mcps_file_path = os.path.join(full_path, required_mcps)
if not os.path.exists(mcps_file_path):
print(f" -> WARNING: swarm.json references '{required_mcps}' but the file does not exist in {t_path}")
warnings += 1
# 5. Validate Roster agents
roster = swarm.get("roster", [])
if not roster:
print(f" -> WARNING: Swarm roster is empty in {t_path}")
warnings += 1
for agent_ref in roster:
agent_path = agent_ref.get("path")
if not agent_path:
print(f" -> ERROR: Agent reference has no 'path' in swarm.json of {t_path}")
errors += 1
continue
full_agent_path = os.path.join(full_path, agent_path.replace("/", os.sep))
if not os.path.exists(full_agent_path):
print(f" -> ERROR: Referenced agent file does not exist: {t_path}/{agent_path}")
errors += 1
else:
# Validate agent JSON parsing
try:
with open(full_agent_path, "r", encoding="utf-8") as af:
json.load(af)
except Exception as e:
print(f" -> ERROR: Failed to parse agent JSON {t_path}/{agent_path}: {e}")
errors += 1
# 6. Validate Workflows
global_workflows = swarm.get("global_workflows", [])
for wf_path in global_workflows:
full_wf_path = os.path.join(full_path, wf_path.replace("/", os.sep))
if not os.path.exists(full_wf_path):
print(f" -> ERROR: Referenced workflow file does not exist: {t_path}/{wf_path}")
errors += 1
print("\n" + "="*40)
print(f"Validation Finished: {errors} Errors, {warnings} Warnings.")
print("="*40)
return errors == 0
def validate_mcps():
base_dir = os.path.dirname(os.path.abspath(__file__))
mcp_registry_path = os.path.join(base_dir, "mcp_registry.json")
if not os.path.exists(mcp_registry_path):
print(f"Error: mcp_registry.json not found at {mcp_registry_path}")
return False
with open(mcp_registry_path, "r", encoding="utf-8") as f:
registry = json.load(f)
connectors = registry.get("connectors", [])
print(f"\nLoaded {len(connectors)} connectors from mcp_registry.json.\nStarting validation...\n")
errors = 0
warnings = 0
for c in connectors:
c_id = c.get("id")
c_name = c.get("name")
c_path = c.get("path")
print(f"[{c_id}] Validating MCP '{c_name}' at '{c_path}'...")
full_path = os.path.join(base_dir, c_path.replace("/", os.sep))
if not os.path.exists(full_path):
print(f" -> ERROR: Directory does not exist: {c_path}")
errors += 1
continue
mcps_json_path = os.path.join(full_path, "mcps.json")
if not os.path.exists(mcps_json_path):
print(f" -> ERROR: mcps.json is missing in {c_path}")
errors += 1
server_py_path = os.path.join(full_path, "server.py")
if not os.path.exists(server_py_path):
print(f" -> ERROR: server.py is missing in {c_path}")
errors += 1
print("\n" + "="*40)
print(f"MCP Validation Finished: {errors} Errors, {warnings} Warnings.")
print("="*40)
return errors == 0
if __name__ == "__main__":
templates_valid = validate_templates()
mcps_valid = validate_mcps()
import sys
if not templates_valid or not mcps_valid:
sys.exit(1)