-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
332 lines (286 loc) · 9.09 KB
/
Copy pathdatabase.py
File metadata and controls
332 lines (286 loc) · 9.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""SQLite storage layer for AURA, an AI-powered personal memory system."""
from __future__ import annotations
import json
import re
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any
import os
import sys
from pathlib import Path
def get_base_path():
if getattr(sys, "frozen", False):
return Path(sys.executable).parent
return Path(__file__).parent
BASE_DIR = get_base_path()
DB_PATH = BASE_DIR / "aura_memory.db"
SEARCH_STOPWORDS = {
"a",
"about",
"all",
"an",
"and",
"anything",
"can",
"did",
"do",
"for",
"give",
"how",
"i",
"list",
"me",
"of",
"on",
"recall",
"search",
"show",
"tell",
"the",
"to",
"what",
"when",
"where",
"who",
}
def _now() -> str:
return datetime.now().isoformat(timespec="seconds")
def _connect() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
return dict(row)
def _memory_from_row(conn: sqlite3.Connection, row: sqlite3.Row) -> dict[str, Any]:
memory = _row_to_dict(row)
memory["tags"] = [
tag_row["tag_name"]
for tag_row in conn.execute(
"""
SELECT t.tag_name
FROM tags AS t
JOIN memory_tags AS mt ON mt.tag_id = t.tag_id
WHERE mt.memory_id = ?
ORDER BY t.tag_name
""",
(memory["memory_id"],),
)
]
return memory
def init_db() -> dict[str, Any]:
"""Create all AURA database tables if they do not already exist."""
with _connect() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS memories (
memory_id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
topic TEXT,
category TEXT,
emotion TEXT,
importance INTEGER
);
CREATE TABLE IF NOT EXISTS tags (
tag_id INTEGER PRIMARY KEY AUTOINCREMENT,
tag_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS memory_tags (
memory_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (memory_id, tag_id),
FOREIGN KEY (memory_id) REFERENCES memories(memory_id)
ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(tag_id)
ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS retrieval_logs (
log_id INTEGER PRIMARY KEY AUTOINCREMENT,
query TEXT NOT NULL,
timestamp TEXT NOT NULL,
results TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS knowledge_triples (
triple_id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
relation TEXT NOT NULL,
target TEXT NOT NULL,
memory_id INTEGER,
timestamp TEXT NOT NULL,
FOREIGN KEY (memory_id) REFERENCES memories(memory_id) ON DELETE CASCADE
);
"""
)
return {"success": True, "database": str(DB_PATH)}
def save_memory(
content: str,
topic: str | None,
category: str | None,
emotion: str | None,
importance: int | None,
tags_list: list[str] | tuple[str, ...] | None,
) -> dict[str, Any]:
"""Insert a memory and attach any provided tags."""
timestamp = _now()
normalized_tags = sorted(
{
str(tag).strip()
for tag in (tags_list or [])
if str(tag).strip()
}
)
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO memories (
content, timestamp, topic, category, emotion, importance
)
VALUES (?, ?, ?, ?, ?, ?)
""",
(content, timestamp, topic, category, emotion, importance),
)
memory_id = cursor.lastrowid
for tag_name in normalized_tags:
conn.execute(
"INSERT OR IGNORE INTO tags (tag_name) VALUES (?)",
(tag_name,),
)
tag_row = conn.execute(
"SELECT tag_id FROM tags WHERE tag_name = ?",
(tag_name,),
).fetchone()
conn.execute(
"""
INSERT OR IGNORE INTO memory_tags (memory_id, tag_id)
VALUES (?, ?)
""",
(memory_id, tag_row["tag_id"]),
)
row = conn.execute(
"SELECT * FROM memories WHERE memory_id = ?",
(memory_id,),
).fetchone()
return _memory_from_row(conn, row)
def get_all_memories() -> list[dict[str, Any]]:
"""Return every saved memory as a clean dictionary."""
with _connect() as conn:
rows = conn.execute(
"SELECT * FROM memories ORDER BY timestamp DESC, memory_id DESC"
).fetchall()
return [_memory_from_row(conn, row) for row in rows]
def search_memories(query: str) -> list[dict[str, Any]]:
"""Search memory content, topic, and category columns using LIKE."""
terms = [
term
for term in re.findall(r"[A-Za-z0-9_]+", query.lower())
if len(term) > 1 and term not in SEARCH_STOPWORDS
]
if not terms:
terms = [query.strip()]
with _connect() as conn:
where_parts = []
params = []
for term in terms:
like_query = f"%{term}%"
where_parts.append(
"(LOWER(content) LIKE ? OR LOWER(topic) LIKE ? OR LOWER(category) LIKE ?)"
)
params.extend([like_query, like_query, like_query])
rows = conn.execute(
f"""
SELECT *
FROM memories
WHERE {" OR ".join(where_parts)}
ORDER BY timestamp DESC, memory_id DESC
""",
params,
).fetchall()
results = [_memory_from_row(conn, row) for row in rows]
return results
def get_stats() -> dict[str, Any]:
"""Return total memory count, top topic, and most active day."""
with _connect() as conn:
total_count = conn.execute(
"SELECT COUNT(*) AS count FROM memories"
).fetchone()["count"]
top_topic_row = conn.execute(
"""
SELECT topic, COUNT(*) AS count
FROM memories
WHERE topic IS NOT NULL AND TRIM(topic) != ''
GROUP BY topic
ORDER BY count DESC, topic ASC
LIMIT 1
"""
).fetchone()
most_active_day_row = conn.execute(
"""
SELECT DATE(timestamp) AS day, COUNT(*) AS count
FROM memories
GROUP BY day
ORDER BY count DESC, day DESC
LIMIT 1
"""
).fetchone()
return {
"total_count": total_count,
"top_topic": top_topic_row["topic"] if top_topic_row else None,
"top_topic_count": top_topic_row["count"] if top_topic_row else 0,
"most_active_day": most_active_day_row["day"] if most_active_day_row else None,
"most_active_day_count": (
most_active_day_row["count"] if most_active_day_row else 0
),
}
def log_retrieval(query: str, results: Any) -> dict[str, Any]:
"""Log a retrieval query and its results as JSON."""
timestamp = _now()
serialized_results = json.dumps(results, ensure_ascii=True, default=str)
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO retrieval_logs (query, timestamp, results)
VALUES (?, ?, ?)
""",
(query, timestamp, serialized_results),
)
log_id = cursor.lastrowid
return {
"log_id": log_id,
"query": query,
"timestamp": timestamp,
"results": results,
}
def save_triple(
source: str,
relation: str,
target: str,
memory_id: int | None = None,
) -> dict[str, Any]:
"""Insert a knowledge triple relationship into the database."""
timestamp = _now()
with _connect() as conn:
cursor = conn.execute(
"""
INSERT INTO knowledge_triples (source, relation, target, memory_id, timestamp)
VALUES (?, ?, ?, ?, ?)
""",
(source.strip(), relation.strip(), target.strip(), memory_id, timestamp),
)
triple_id = cursor.lastrowid
return {
"triple_id": triple_id,
"source": source,
"relation": relation,
"target": target,
"memory_id": memory_id,
"timestamp": timestamp,
}
def get_all_triples() -> list[dict[str, Any]]:
"""Return all saved knowledge triples."""
with _connect() as conn:
rows = conn.execute(
"SELECT * FROM knowledge_triples ORDER BY timestamp DESC, triple_id DESC"
).fetchall()
return [_row_to_dict(row) for row in rows]