-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathturbo_index.py
More file actions
107 lines (86 loc) · 3.73 KB
/
Copy pathturbo_index.py
File metadata and controls
107 lines (86 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import hashlib
import json
import requests
import lancedb
import pandas as pd
import time
from datetime import datetime
from fastembed import TextEmbedding
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
load_dotenv()

# Config
# Each setting may be overridden through the environment (or the .env file
# loaded just above); an unset or empty variable falls back to the original
# hard-coded default, so existing setups behave exactly as before.
TANA_TOKEN = os.getenv("TANA_TOKEN")  # NOTE(review): None if unset -> requests are sent as "Bearer None"
TANA_URL = os.environ.get("TANA_URL") or "http://127.0.0.1:8262/mcp"
WORKSPACE_ID = os.environ.get("TANA_WORKSPACE_ID") or "--D3QJHnLgSk"
LANCE_DB_PATH = (
    os.environ.get("LANCE_DB_PATH")
    or "/Users/krshirkoohi/Documents/AI Workspace/projects/MCP Servers/tana-embeddings/vector_store"
)
TABLE_NAME = os.environ.get("TANA_TABLE_NAME") or "tana_nodes"

# Setup Local BGE Model
# Loaded once at import time; main() reuses this instance for every chunk.
# The model name is deliberately NOT configurable: stored vectors must all come
# from the same model, and the per-node hash does not record which model was used.
print("Turbo Mode: Warming up Local BGE...")
model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
def get_node_hash(n):
    """Return a SHA-256 hex digest fingerprinting a node's text fields.

    The digest covers the node's ``name`` and ``description`` values, joined by
    ``|`` (a missing key contributes an empty string). It therefore changes
    whenever either field changes, which is what decides whether a node needs
    re-embedding.
    """
    fingerprint = "|".join((f"{n.get('name', '')}", f"{n.get('description', '')}"))
    digest = hashlib.sha256(fingerprint.encode())
    return digest.hexdigest()
def call_tana(method, params):
    """Send one JSON-RPC 2.0 request to the Tana MCP endpoint and return the decoded body.

    Args:
        method: JSON-RPC method name (this script uses ``"tools/call"``).
        params: JSON-serialisable ``params`` object for the request.

    Returns:
        The parsed JSON response body, or ``None`` if the HTTP request failed
        (connection error, timeout, ...) or the body was not valid JSON.
        Callers must be prepared for ``None``.
    """
    headers = {
        "Authorization": f"Bearer {TANA_TOKEN}",
        "Content-Type": "application/json",
        # NOTE(review): the server is told an event stream is acceptable, but the
        # body is decoded as plain JSON below; an SSE reply would surface as a
        # decode failure. Confirm the endpoint answers with application/json.
        "Accept": "application/json, text/event-stream",
    }
    payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    try:
        res = requests.post(TANA_URL, headers=headers, json=payload, timeout=60)
        return res.json()
    except (requests.RequestException, ValueError) as err:
        # Best-effort as before (return None), but only for network/decoding
        # failures, and visibly. The old bare ``except:`` also swallowed
        # KeyboardInterrupt and genuine programming errors.
        print(f"  [!] Tana call '{method}' failed: {err}")
        return None
def get_tags():
    """Return the workspace's tags, as reported by the MCP ``list_tags`` tool.

    Returns:
        The JSON value decoded from ``result.content[0].text`` of the tool
        reply. Elements are later read as ``tag['id']`` / ``tag['name']``
        (presumably a list of dicts; confirm against the Tana MCP server).

    Raises:
        RuntimeError: if the call failed (``call_tana`` returned ``None``) or
            the reply does not have the expected shape. Nothing can be indexed
            without tags, so the run aborts with a readable message instead
            of an opaque ``TypeError`` on ``None``.
    """
    res = call_tana("tools/call", {"name": "list_tags", "arguments": {"workspaceId": WORKSPACE_ID}})
    try:
        return json.loads(res["result"]["content"][0]["text"])
    except (TypeError, KeyError, IndexError, ValueError) as err:
        # Truncate: an unexpected reply could be large.
        raise RuntimeError(f"Could not list Tana tags; unexpected response: {repr(res)[:300]}") from err
def fetch_tag_nodes(tag):
    """Return the nodes carrying *tag*, via the MCP ``search_nodes`` tool.

    Best-effort: if the call fails or the reply is malformed, the problem is
    logged and an empty list is returned, so one bad tag does not abort the
    whole parallel fetch.

    Args:
        tag: A tag record with at least ``'id'`` and ``'name'`` keys.

    Returns:
        The JSON value decoded from ``result.content[0].text`` (expected to be
        a list of node dicts), or ``[]`` on failure.
    """
    limit = 1000
    print(f" Fetching nodes for #{tag['name']}...")
    try:
        res = call_tana("tools/call", {"name": "search_nodes", "arguments": {"query": {"hasType": tag['id']}, "limit": limit}})
        nodes = json.loads(res['result']['content'][0]['text'])
    except (TypeError, KeyError, IndexError, ValueError) as err:
        # TypeError covers call_tana returning None; the others cover a reply
        # without result/content/text or with non-JSON text.
        print(f"  [!] Skipping #{tag['name']}: unexpected response ({err!r})")
        return []
    if isinstance(nodes, list) and len(nodes) >= limit:
        # NOTE(review): results are capped at `limit`. Nodes beyond it are not
        # indexed unless search_nodes supports paging; confirm with the server.
        print(f"  [!] #{tag['name']} returned {len(nodes)} nodes (limit {limit}); results may be truncated.")
    return nodes
def _load_existing_hashes(table):
    """Return ``{node_id: hash}`` for every row currently stored in *table*."""
    existing_df = table.to_pandas()
    # Zipping the two columns avoids building a Series per row, as iterrows() does.
    return dict(zip(existing_df['id'], existing_df['hash']))


def _fetch_all_nodes(tags):
    """Fetch every tag's nodes in parallel and merge them by node id.

    A node reachable from several tags is kept once. Records without an
    ``'id'`` are skipped rather than aborting the run.
    """
    all_nodes = {}
    with ThreadPoolExecutor(max_workers=8) as executor:
        for nodes in executor.map(fetch_tag_nodes, tags):
            for n in nodes:
                node_id = n.get('id')
                if node_id is None:
                    continue
                all_nodes[node_id] = n
    return all_nodes


def _sql_string(value):
    """Quote *value* as a SQL string literal, doubling any embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def _upsert_chunk(table, chunk, existing_map):
    """Embed one chunk of ``(node, hash)`` pairs and write the rows to *table*.

    Rows whose id already exists (per *existing_map*) are removed with one
    filtered ``delete``; then the whole chunk is appended with one ``add``.
    """
    texts = [f"{node.get('name','')}\n{node.get('description','')}" for node, _ in chunk]
    vectors = list(model.embed(texts))
    now = datetime.now().isoformat()
    updates = [
        {
            "vector": vector,
            "id": node['id'],
            "name": node.get('name', ''),
            "description": node.get('description', ''),
            "hash": node_hash,
            "last_updated": now,
        }
        for (node, node_hash), vector in zip(chunk, vectors)
    ]
    stale_ids = [u['id'] for u in updates if u['id'] in existing_map]
    if stale_ids:
        # One delete per chunk instead of one delete call per node. Ids are
        # quoted with embedded ' doubled, so an id containing a quote can no
        # longer break (or rewrite) the filter expression.
        id_list = ", ".join(_sql_string(node_id) for node_id in stale_ids)
        table.delete(f"id IN ({id_list})")
    table.add(updates)


def main():
    """Incrementally sync Tana nodes into the LanceDB vector table.

    Steps:
        1. Read the ``id -> hash`` map of rows already in the table.
        2. Fetch every tag's nodes from Tana in parallel.
        3. Keep only nodes that are new or whose name/description hash changed.
        4. Embed them in chunks and replace/insert their rows.

    Nodes that no longer exist in Tana are not removed from the table.
    """
    db = lancedb.connect(LANCE_DB_PATH)
    table = db.open_table(TABLE_NAME)
    existing_map = _load_existing_hashes(table)
    print(f"STARTING TURBO INDEX. Current Size: {len(existing_map)} nodes.")

    tags = get_tags()
    # Fast Parallel Fetching
    print(f"Parallel Fetching across {len(tags)} tags...")
    all_nodes = _fetch_all_nodes(tags)
    print(f"Fetch Complete. Found {len(all_nodes)} total unique nodes.")

    # Deduplicate: only (re-)embed nodes that are new or whose text hash
    # changed. The hash is computed once here and reused when the row is written.
    to_embed = []
    for node_id, node in all_nodes.items():
        h = get_node_hash(node)
        if existing_map.get(node_id) != h:
            to_embed.append((node, h))

    if not to_embed:
        print("Brain is already fully saturated. SUCCESS.")
        return

    print(f"TURBO EMBEDDING: Processing {len(to_embed)} nodes using local CPU cores...")
    # Process in large chunks of 500 for maximum throughput
    chunk_size = 500
    for i in range(0, len(to_embed), chunk_size):
        chunk = to_embed[i:i + chunk_size]
        print(f" [>] Processing Chunk {i//chunk_size + 1}...")
        _upsert_chunk(table, chunk, existing_map)
        print(f" [✓] Secured {i + len(chunk)} nodes total.")

    # count_rows() reports the size without loading every vector into pandas.
    print(f"TURBO INDEX FINISHED. Final Brain Size: {table.count_rows()}")
if __name__ == "__main__":
    # Executed as a script rather than imported: run one incremental sync.
    main()