-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhyper_sync.py
More file actions
100 lines (82 loc) · 3.86 KB
/
Copy pathhyper_sync.py
File metadata and controls
100 lines (82 loc) · 3.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import hashlib
import json
import requests
import lancedb
import pandas as pd
import time
from datetime import datetime, timedelta
from fastembed import TextEmbedding
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor
# Load environment variables via python-dotenv before they are read below.
load_dotenv()
# Config
# Bearer token placed in the Authorization header by call_tana(); os.getenv
# yields None when the variable is not set.
TANA_TOKEN = os.getenv("TANA_TOKEN")
# JSON-RPC endpoint that call_tana() posts to (loopback address, port 8262).
TANA_URL = "http://127.0.0.1:8262/mcp"
# NOTE(review): not referenced by any code visible in this file - confirm
# whether it is still needed.
WORKSPACE_ID = "--D3QJHnLgSk"
# Path handed to lancedb.connect() in main(); this is a machine-specific
# absolute path.
LANCE_DB_PATH = "/Users/krshirkoohi/Documents/AI Workspace/projects/MCP Servers/tana-embeddings/vector_store"
# Name of the table that main() opens and updates.
TABLE_NAME = "tana_nodes"
# The embedding model is constructed at module import time and is used by
# main() to embed node text.
print("Hyper-Sync: Powering up local BGE engine...")
model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
def get_node_hash(n):
    """Return a SHA-256 hex digest fingerprinting a node's name and description.

    Both fields are read with ``.get`` (a missing key counts as an empty
    string), joined with ``|`` and hashed, so the digest changes whenever
    either field's text changes.
    """
    title = n.get('name', '')
    body = n.get('description', '')
    fingerprint = "{}|{}".format(title, body)
    hasher = hashlib.sha256()
    hasher.update(fingerprint.encode())
    return hasher.hexdigest()
def call_tana(method, params):
    """Send one JSON-RPC 2.0 request to the Tana endpoint and return the reply.

    Args:
        method: JSON-RPC method name (e.g. ``"tools/call"``).
        params: Value sent as the request's ``params`` member.

    Returns:
        The decoded JSON body of the HTTP response, or ``None`` when the
        request fails (connection error, timeout, ...) or the body cannot be
        decoded as JSON.
    """
    headers = {
        "Authorization": f"Bearer {TANA_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }
    payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
    try:
        res = requests.post(TANA_URL, headers=headers, json=payload, timeout=30)
        return res.json()
    except (requests.RequestException, ValueError) as exc:
        # Best-effort by design: callers treat None as "no data". Only
        # transport and decode failures are absorbed, so Ctrl-C and genuine
        # programming errors are no longer silently swallowed.
        # ValueError covers JSON decode errors across requests versions.
        print(f" [!] Tana request failed ({method}): {exc}")
        return None
def fetch_day(date_str):
    """Fetch nodes for a specific date.

    Calls the ``search_nodes`` tool with an ``onDate`` query and decodes the
    JSON text carried in the first content item of the result.

    Args:
        date_str: Date string sent as the ``onDate`` query value (``main``
            passes ``"%Y-%m-%d"`` formatted dates).

    Returns:
        The decoded list of nodes, or ``[]`` when the call failed, the reply
        has no ``result``, or the payload is malformed or not a list.
    """
    # NOTE(review): the limit is fixed at 100; whether the server truncates
    # days holding more nodes is not visible from here - confirm.
    res = call_tana(
        "tools/call",
        {"name": "search_nodes", "arguments": {"query": {"onDate": date_str}, "limit": 100}},
    )
    if not isinstance(res, dict) or 'result' not in res:
        return []
    try:
        nodes = json.loads(res['result']['content'][0]['text'])
    except (KeyError, IndexError, TypeError, ValueError) as exc:
        # Unexpected reply shape or non-JSON text: report it and treat the
        # day as empty instead of hiding the problem entirely.
        print(f" [!] Unreadable search result for {date_str}: {exc!r}")
        return []
    if not isinstance(nodes, list):
        # The caller iterates the result as a sequence of node dicts; any
        # other JSON value would make it fail on n['id'].
        print(f" [!] Unexpected payload type for {date_str}: {type(nodes).__name__}")
        return []
    return nodes
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def main(days_back=1100, batch_size=50, max_workers=20):
    """Re-index Tana nodes that are new or changed within a rolling date window.

    Loads the ``id -> hash`` map of the existing table, then walks the window
    in batches of dates. Each batch is fetched concurrently with
    ``fetch_day``; nodes whose id is unknown or whose content hash differs
    are embedded and written back to the table.

    Args:
        days_back: Number of days before now at which the window starts
            (the default of 1100 is roughly three years).
        batch_size: Number of dates fetched per batch.
        max_workers: Thread count used to fetch the dates of one batch.
    """
    db = lancedb.connect(LANCE_DB_PATH)
    table = db.open_table(TABLE_NAME)

    print("Loading existing index map...")
    existing_df = table.to_pandas()
    # Column-wise zip builds the same map as a row loop without the per-row
    # Series construction of iterrows().
    existing_map = dict(zip(existing_df['id'], existing_df['hash']))
    print(f"Current Brain Size: {len(existing_map)} nodes.")

    # One date string per day from the start of the window up to today.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days_back)
    date_list = [
        (start_date + timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range((end_date - start_date).days + 1)
    ]
    print(f"[{datetime.now()}] STARTING HYPER-SYNC (Time-Slicing {days_back} days)...")

    # Process dates in batches to keep things moving.
    for start in range(0, len(date_list), batch_size):
        date_batch = date_list[start:start + batch_size]
        print(f" [>] Fetching dates: {date_batch[0]} to {date_batch[-1]}...")

        with ThreadPoolExecutor(max_workers=max_workers) as fetcher:
            results = list(fetcher.map(fetch_day, date_batch))

        # id -> (node, hash) for every node that is new or whose content
        # changed; keying by id de-duplicates nodes returned for several dates.
        changed = {}
        for nodes in results:
            for n in nodes:
                node_id = n.get('id')
                if not node_id:
                    # A node without an id cannot be keyed in the index.
                    continue
                node_hash = get_node_hash(n)
                if existing_map.get(node_id) != node_hash:
                    changed[node_id] = (n, node_hash)

        if not changed:
            continue

        to_embed = [node for node, _ in changed.values()]
        print(f" [+] Embedding {len(to_embed)} new nodes from this time slice...")
        texts = [f"{n.get('name','')}\n{n.get('description','')}" for n in to_embed]
        vectors = list(model.embed(texts))

        updates = []
        for (node_id, (node, node_hash)), vector in zip(changed.items(), vectors):
            updates.append({
                "vector": vector, "id": node_id, "name": node.get('name', ''),
                "description": node.get('description', ''), "hash": node_hash,
                "last_updated": datetime.now().isoformat(),
            })

        # Replace stale rows: a single delete with an escaped IN-list, then
        # one add. NOTE: these are two separate operations, not a
        # transaction; if the run dies in between, the deleted ids are
        # missing from the table and get re-embedded on the next run.
        stale_ids = [u['id'] for u in updates if u['id'] in existing_map]
        if stale_ids:
            id_list = ", ".join(_sql_quote(stale_id) for stale_id in stale_ids)
            table.delete(f"id IN ({id_list})")
        table.add(updates)

        # Update memory map
        existing_map.update((u['id'], u['hash']) for u in updates)
        print(f" [✓] Current Brain Size: {len(existing_map)} nodes.")

    # count_rows() avoids loading every row (vectors included) just to count.
    print(f"HYPER-SYNC FINISHED. Total nodes indexed: {table.count_rows()}")
# Run the sync only when this file is executed as a script.
if __name__ == "__main__":
    main()