Commit eadcde4

tcconnally and claude committed
fix(web,grpc): share one pooled Database, drop global mutexes, paginate /api/graph (#402)
The MCP HTTP/SSE transport got this right in #210/#217: Arc<Database> (Sync, internally r2d2-pooled) + spawn_blocking, no lock. The web dashboard and gRPC server never caught up:

- Web: Arc<Mutex<Database>>. Every handler held a std::Mutex across its full DB call inside an async handler (all dashboard requests single-lane AND a blocked tokio worker), and main.rs opened a SECOND Database (a second 16-conn pool) on the same file just for the web.
- gRPC: same Arc<Mutex<Database>>; with_db ran closures synchronously in async fns, so one slow query stalled a tonic runtime worker while holding the global lock.
- GET /api/graph full-scanned entities and returned ALL nodes+edges unpaginated: tens of MB of JSON at 100k entities, per dashboard render, under that same global mutex.

Fixes, mirroring transport.rs exactly:

- main.rs wraps the one configured Database in Arc BEFORE the web block; web + transport + stdio (mcp::run_server now takes Arc<Database>) all share it. The second Database::open / second pool / second set_encryption are gone: one process, one pool.
- web: WebState.db is Arc<Database>; every handler runs its DB work via a blocking_db helper (spawn_blocking). No handler needed the old mutex for atomicity: all six endpoints are read-only; list_entities' twin list+count reads tolerate a racing write (documented inline).
- grpc: MnemeGrpcServer.db is Arc<Database>; with_db is async and runs the closure on the blocking pool. sanitize_error (#354) now runs inside the closure (Box<dyn Error> is not Send). Typed Status passthrough (get_entity not_found) preserved. serve()/stub signatures track the change.
- /api/graph: limit (default 500, clamp 1..=5000) + offset params; deterministic newest-first ordering; response adds total_nodes / returned_nodes / truncated / limit / offset. Dashboard graph tab shows a truncation note; nodes/edges keys unchanged for compatibility. Edges dangling outside the returned node set are dropped (previously the unscoped path emitted edges to archived/deleted targets).

Tests (proven RED pre-fix where feasible):

- graph_default_is_capped_and_reports_totals: 510 seeded → pre-fix returned 510 nodes (unbounded), now 500 + total_nodes=510 + truncated.
- graph_honors_limit_param_and_reports_totals: pre-fix ?limit=2 was ignored (5 nodes); also proves offset pages are disjoint + covering.
- graph_under_cap_is_not_truncated: totals/flag correctness.
- db_closures_overlap_instead_of_serializing (web) and rpc_db_closures_overlap_instead_of_serializing (grpc): two in-flight DB closures rendezvous on a barrier INSIDE the closure. That is impossible under the old serialize-on-mutex design (it would deadlock; the timeout fails the test), so it is a deterministic proof of overlap.
- concurrent_requests_across_endpoints_all_succeed: full-stack burst against one shared Database.
- web_state_is_send_sync_and_lock_free: type-level guard against reintroducing Arc<Mutex<Database>>.

Closes #402

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 2a559bd commit eadcde4
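
The overlap tests named in the commit message rest on one idea: two DB closures can only meet inside their critical sections if nothing serializes them. The sketch below illustrates that idea with the standard library only. `FakeDb`, `rendezvous`, `overlaps_lock_free`, and `overlaps_behind_mutex` are hypothetical names, plain threads stand in for `spawn_blocking`, and a timed counter stands in for the barrier so the mutex case returns a result instead of deadlocking. It is an illustration under those assumptions, not the test code from this commit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the pooled `Database`: it is `Sync`, so a shared
/// `&FakeDb` can serve several threads at once.
struct FakeDb;

impl FakeDb {
    fn health_check(&self) -> bool {
        true
    }
}

/// Register this caller, then wait until `parties` callers have registered or
/// `timeout` elapses. Returns true if the count was reached before the deadline.
fn rendezvous(arrived: &AtomicUsize, parties: usize, timeout: Duration) -> bool {
    arrived.fetch_add(1, Ordering::SeqCst);
    let deadline = Instant::now() + timeout;
    while arrived.load(Ordering::SeqCst) < parties {
        if Instant::now() >= deadline {
            return false;
        }
        thread::yield_now();
    }
    true
}

/// Lock-free sharing: both closures use the same `Arc<FakeDb>` concurrently.
/// Returns true only if BOTH closures saw the rendezvous succeed.
fn overlaps_lock_free(timeout: Duration) -> bool {
    let db = Arc::new(FakeDb);
    let arrived = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let (db, arrived) = (Arc::clone(&db), Arc::clone(&arrived));
            thread::spawn(move || db.health_check() && rendezvous(&arrived, 2, timeout))
        })
        .collect();
    let results: Vec<bool> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    results.into_iter().all(|ok| ok)
}

/// Old design: each closure holds the global mutex for its whole DB call, so
/// the first closure can never see the second one arrive.
fn overlaps_behind_mutex(timeout: Duration) -> bool {
    let db = Arc::new(Mutex::new(FakeDb));
    let arrived = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let (db, arrived) = (Arc::clone(&db), Arc::clone(&arrived));
            thread::spawn(move || {
                let guard = db.lock().unwrap(); // held until the closure returns
                guard.health_check() && rendezvous(&arrived, 2, timeout)
            })
        })
        .collect();
    let results: Vec<bool> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    results.into_iter().all(|ok| ok)
}
```

With a generous timeout the lock-free variant reports overlap, while the mutex variant cannot: whichever closure takes the lock first times out before the other is allowed in.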

7 files changed

Lines changed: 547 additions & 167 deletions

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
@@ -33,6 +33,21 @@ All notable changes to Perseus Vault (formerly Mimir/Mneme) are documented here.
   (20 hits) recall 5.1ms → 0.08ms (~64x); dense-match queries pay a small
   fixed probe cost (common term ~33k hits: +~0.5ms, the intrinsic FTS5
   prefix-doclist materialization — they were and remain O(corpus)).
+- Web dashboard and gRPC no longer wrap the pooled `Database` in a global
+  `std::Mutex` (#402): both surfaces now share the SAME `Arc<Database>` as the
+  MCP transport (one process, one connection pool — the dashboard previously
+  opened a second 16-conn pool on the same file) and run every DB call on the
+  blocking thread pool via `tokio::task::spawn_blocking`, mirroring
+  transport.rs (#210/#217). Dashboard requests and gRPC RPCs now execute in
+  parallel instead of single-lane, and no longer stall async runtime workers.
+- `GET /api/graph` is paginated (#402): `limit` (default 500, max 5000) and
+  `offset` query params; response adds `total_nodes` / `returned_nodes` /
+  `truncated` so clients can tell a page from the whole graph (previously it
+  full-scanned and returned every node+edge unpaginated — tens of MB of JSON
+  at 100k entities, per dashboard render). The dashboard's graph tab shows a
+  truncation note when capped. Edges dangling outside the returned node set
+  are dropped (previously the unscoped path emitted edges to archived/deleted
+  targets that the renderer couldn't resolve).
 
 ## [2.14.0] - 2026-07-02
 
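
To make the paging contract in the changelog entry concrete, here is a minimal sketch of how the `limit`/`offset` clamping and the response metadata could be computed. The 500 default and the 1..=5000 clamp come from the commit message. Everything else is an assumption for illustration: the names `clamp_page`, `page_meta`, `page_offsets`, and `PageMeta`, the handling of negative offsets, and in particular the definition of `truncated` as "fewer nodes returned than exist".

```rust
/// Defaults quoted in the changelog entry (default 500, max 5000).
const DEFAULT_LIMIT: i64 = 500;
const MAX_LIMIT: i64 = 5000;

/// Hypothetical shape of the paging fields added to the response.
#[derive(Debug, PartialEq)]
struct PageMeta {
    total_nodes: i64,
    returned_nodes: i64,
    truncated: bool,
    limit: i64,
    offset: i64,
}

/// Clamp raw query params: a missing limit becomes 500, any limit is forced
/// into 1..=5000, and a missing or negative offset becomes 0 (assumed).
fn clamp_page(limit: Option<i64>, offset: Option<i64>) -> (i64, i64) {
    let limit = limit.unwrap_or(DEFAULT_LIMIT).clamp(1, MAX_LIMIT);
    let offset = offset.unwrap_or(0).max(0);
    (limit, offset)
}

/// Metadata for one page of a graph with `total_nodes` nodes.
/// ASSUMPTION: `truncated` means the page holds fewer nodes than the full graph.
fn page_meta(total_nodes: i64, limit: Option<i64>, offset: Option<i64>) -> PageMeta {
    let (limit, offset) = clamp_page(limit, offset);
    let returned_nodes = (total_nodes - offset).clamp(0, limit);
    PageMeta {
        total_nodes,
        returned_nodes,
        truncated: returned_nodes < total_nodes,
        limit,
        offset,
    }
}

/// Offsets a client would request to walk the whole graph page by page.
fn page_offsets(total_nodes: i64, limit: i64) -> Vec<i64> {
    let limit = limit.clamp(1, MAX_LIMIT);
    (0..total_nodes).step_by(limit as usize).collect()
}
```

Under these assumptions, a 510-node graph requested with no parameters yields 500 returned nodes with `truncated` set, which matches the scenario described for `graph_default_is_capped_and_reports_totals`.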
src/db.rs

Lines changed: 43 additions & 20 deletions
@@ -4468,20 +4468,43 @@ impl Database {
     /// scope are dropped rather than pointing at a node the caller never
     /// receives (the dashboard's graph tab leaked cross-workspace
     /// nodes/edges before this).
+    ///
+    /// Paginated (#402): `limit`/`offset` page over the node set in a
+    /// deterministic order (newest first, id as tiebreaker), and the returned
+    /// `total_nodes` is the full COUNT(*) under the same filter so callers can
+    /// report truncation. `limit <= 0` means "no limit" (SQLite `LIMIT -1`) —
+    /// network-facing callers must pass an explicit cap. Previously this
+    /// full-scanned and returned every node and edge unpaginated: tens of MB
+    /// of JSON at 100k entities, per dashboard render.
     pub fn get_entity_graph(
         &self,
         workspace_hash: Option<&str>,
-    ) -> Result<(Vec<GraphNode>, Vec<GraphEdge>), Box<dyn std::error::Error>> {
+        limit: i64,
+        offset: i64,
+    ) -> Result<(Vec<GraphNode>, Vec<GraphEdge>, i64), Box<dyn std::error::Error>> {
         let conn = self.conn()?;
-        let (sql, scoped) = match workspace_hash.filter(|ws| !ws.is_empty()) {
-            Some(_) => (
-                "SELECT id, category, key, links FROM entities WHERE archived = 0 AND workspace_hash = ?1",
-                true,
-            ),
-            None => (
-                "SELECT id, category, key, links FROM entities WHERE archived = 0",
-                false,
-            ),
+        let limit = if limit <= 0 { -1 } else { limit };
+        let offset = offset.max(0);
+        let scoped = workspace_hash.filter(|ws| !ws.is_empty()).is_some();
+        let (count_sql, sql) = if scoped {
+            (
+                "SELECT COUNT(*) FROM entities WHERE archived = 0 AND workspace_hash = ?1",
+                "SELECT id, category, key, links FROM entities
+                 WHERE archived = 0 AND workspace_hash = ?1
+                 ORDER BY created_at_unix_ms DESC, id ASC LIMIT ?2 OFFSET ?3",
+            )
+        } else {
+            (
+                "SELECT COUNT(*) FROM entities WHERE archived = 0",
+                "SELECT id, category, key, links FROM entities
+                 WHERE archived = 0
+                 ORDER BY created_at_unix_ms DESC, id ASC LIMIT ?1 OFFSET ?2",
+            )
+        };
+        let total_nodes: i64 = if scoped {
+            conn.query_row(count_sql, params![workspace_hash.unwrap()], |r| r.get(0))?
+        } else {
+            conn.query_row(count_sql, [], |r| r.get(0))?
         };
         let mut stmt = conn.prepare(sql)?;
         let map_row = |row: &rusqlite::Row| {
@@ -4493,10 +4516,10 @@ impl Database {
             Ok((id, category, key, links))
         };
         let rows: Vec<(String, String, String, Vec<MemoryLink>)> = if scoped {
-            stmt.query_map(params![workspace_hash.unwrap()], map_row)?
+            stmt.query_map(params![workspace_hash.unwrap(), limit, offset], map_row)?
                 .collect::<rusqlite::Result<Vec<_>>>()?
         } else {
-            stmt.query_map([], map_row)?
+            stmt.query_map(params![limit, offset], map_row)?
                 .collect::<rusqlite::Result<Vec<_>>>()?
         };
 
@@ -4520,14 +4543,14 @@ impl Database {
                 });
             }
         }
-        if scoped {
-            // Drop edges pointing outside the scoped node set: the target
-            // entity is in a different workspace, so the caller never
-            // receives that node and a dangling edge would be meaningless
-            // (or, worse, leak the existence/id of a cross-workspace entity).
-            edges.retain(|e| seen_ids.contains(&e.to));
-        }
-        Ok((nodes, edges))
+        // Drop edges pointing outside the returned node set. For workspace
+        // scoping that avoids dangling references and cross-workspace
+        // existence leaks (as before); with pagination it also keeps each
+        // page self-contained — an edge to a node the caller never received
+        // is unrenderable. (Side effect vs pre-#402: the unscoped path no
+        // longer emits dangling edges to archived/deleted targets either.)
+        edges.retain(|e| seen_ids.contains(&e.to));
+        Ok((nodes, edges, total_nodes))
     }
 
     /// Score an entity's quality (0.0–1.0). Agents rate memories as useful/wrong.
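
The paging semantics of `get_entity_graph` can be modelled in memory without SQLite. The sketch below follows the diff's contract: newest-first order with `id` as tiebreaker, `limit <= 0` meaning no limit, a negative `offset` treated as 0, and edges kept only when their target is in the returned page. The types `Node` and `Edge` and the helpers `node`, `edge`, and `graph_page` are hypothetical. Unlike the real function, which derives edges from each returned row's `links`, this sketch takes a separate edge list, so it also checks that the source node is on the page.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified node: only the fields that drive ordering.
#[derive(Clone, Debug, PartialEq)]
struct Node {
    id: String,
    created_at_unix_ms: i64,
}

/// Hypothetical, simplified edge between two node ids.
#[derive(Clone, Debug, PartialEq)]
struct Edge {
    from: String,
    to: String,
}

fn node(id: &str, created_at_unix_ms: i64) -> Node {
    Node { id: id.to_string(), created_at_unix_ms }
}

fn edge(from: &str, to: &str) -> Edge {
    Edge { from: from.to_string(), to: to.to_string() }
}

/// Return one page of nodes, the edges fully contained in that page, and the
/// total node count before paging.
fn graph_page(nodes: &[Node], edges: &[Edge], limit: i64, offset: i64) -> (Vec<Node>, Vec<Edge>, i64) {
    let total_nodes = nodes.len() as i64;

    // Deterministic order: newest first, id ascending as tiebreaker.
    let mut sorted = nodes.to_vec();
    sorted.sort_by(|a, b| {
        b.created_at_unix_ms
            .cmp(&a.created_at_unix_ms)
            .then_with(|| a.id.cmp(&b.id))
    });

    // `limit <= 0` means "no limit"; a negative offset is treated as 0.
    let skip = offset.max(0) as usize;
    let take = if limit <= 0 { usize::MAX } else { limit as usize };
    let page: Vec<Node> = sorted.into_iter().skip(skip).take(take).collect();

    // Keep only edges whose endpoints are both on this page, so every edge
    // the caller receives can be rendered.
    let seen: HashSet<&str> = page.iter().map(|n| n.id.as_str()).collect();
    let kept: Vec<Edge> = edges
        .iter()
        .filter(|e| seen.contains(e.from.as_str()) && seen.contains(e.to.as_str()))
        .cloned()
        .collect();

    (page, kept, total_nodes)
}
```

One consequence worth noting: an edge whose endpoints fall on different pages appears on neither page, which is the price of keeping each page self-contained.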

src/grpc.rs

Lines changed: 86 additions & 24 deletions
@@ -11,23 +11,32 @@
 pub mod grpc {
     tonic::include_proto!("mneme.v1");
 
-    use std::sync::{Arc, Mutex};
+    use std::sync::Arc;
     use tonic::{Request, Response, Status};
 
     use crate::db::Database;
     use crate::models;
 
+    // #402: no Mutex — `Database` is Sync (internally r2d2-pooled, see the
+    // #210 comment in transport.rs), and this is the SAME `Arc<Database>` the
+    // other surfaces use: one process, one pool. Concurrent RPCs each check
+    // out their own pooled connection instead of serializing on a global lock.
     pub struct MnemeGrpcServer {
-        db: Arc<Mutex<Database>>,
+        db: Arc<Database>,
     }
 
     impl MnemeGrpcServer {
-        pub fn new(db: Arc<Mutex<Database>>) -> Self {
+        pub fn new(db: Arc<Database>) -> Self {
             Self { db }
         }
     }
 
-    // Helper to run DB operations inside the mutex.
+    // Helper to run DB operations on the blocking thread pool.
+    //
+    // #402: DB work is synchronous rusqlite, so it must not run inline in an
+    // async fn — that stalls a tonic/tokio runtime worker for the duration of
+    // the query. Mirror the MCP HTTP transport (#217): `spawn_blocking` keeps
+    // the async workers free, and with no mutex the closures run in parallel.
     //
     // Error hygiene (#354): this module is a documented external wire contract,
     // so internal error text (rusqlite constraint/column names, file paths)
@@ -36,12 +45,22 @@ pub mod grpc {
     // return a generic INTERNAL to the client. Handlers that raise a *typed*
     // Status inside the closure (e.g. get_entity's not_found) get it passed
     // through unchanged instead of being flattened into INTERNAL.
-    fn with_db<T>(
+    // (sanitize_error runs INSIDE the blocking closure: `Box<dyn Error>` is
+    // not Send, so it must be mapped to a `Status` before crossing back.)
+    async fn with_db<T>(
         server: &MnemeGrpcServer,
-        f: impl FnOnce(&Database) -> Result<T, Box<dyn std::error::Error>>,
-    ) -> Result<T, Status> {
-        let db = server.db.lock().map_err(|_| Status::internal("lock poisoned"))?;
-        f(&db).map_err(sanitize_error)
+        f: impl FnOnce(&Database) -> Result<T, Box<dyn std::error::Error>> + Send + 'static,
+    ) -> Result<T, Status>
+    where
+        T: Send + 'static,
+    {
+        let db = Arc::clone(&server.db);
+        tokio::task::spawn_blocking(move || f(&db).map_err(sanitize_error))
+            .await
+            .map_err(|e| {
+                eprintln!("mimir grpc: blocking task join error: {e}");
+                Status::internal("internal error")
+            })?
     }
 
     /// Map a handler error to the client-facing Status: intentional `Status`
@@ -61,7 +80,7 @@ pub mod grpc {
         // ── CRUD ──
         async fn remember(&self, req: Request<RememberRequest>) -> Result<Response<RememberResponse>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 // Same id convention as the MCP surface (handle_remember):
                 // db.remember does NOT generate ids — an empty id here would be
                 // inserted verbatim, producing an entity unreachable by id.
@@ -100,11 +119,12 @@ pub mod grpc {
                 let (id, action) = db.remember(&entity)?;
                 Ok(Response::new(RememberResponse { id, action, category: entity.category, key: entity.key }))
             })
+            .await
         }
 
         async fn recall(&self, req: Request<RecallRequest>) -> Result<Response<RecallResponse>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let params = models::RecallParams {
                     query: r.query,
                     category: r.category,
@@ -136,24 +156,27 @@ pub mod grpc {
                 let total = items.len() as i64;
                 Ok(Response::new(RecallResponse { items, total }))
             })
+            .await
         }
 
         async fn get_entity(&self, req: Request<GetEntityRequest>) -> Result<Response<EntityMessage>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let entity = db.get_entity_by_id_public(&r.id)
                     .map_err(|_| Status::not_found("entity not found"))?
                     .ok_or_else(|| Status::not_found("entity not found"))?;
                 Ok(Response::new(entity_to_proto(&entity)))
             })
+            .await
         }
 
         async fn forget(&self, req: Request<ForgetRequest>) -> Result<Response<ForgetResponse>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 db.forget(&r.category, &r.key, &r.reason)?;
                 Ok(Response::new(ForgetResponse { ok: true }))
             })
+            .await
         }
 
         // ── Graph ──
@@ -170,7 +193,7 @@ pub mod grpc {
         // ── Journal ──
         async fn journal(&self, req: Request<JournalRequest>) -> Result<Response<JournalEvent>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let event = models::JournalEvent {
                     id: format!("jrn-{}", uuid::Uuid::new_v4().to_string().replace('-', "").chars().take(12).collect::<String>()),
                     event_type: r.event_type,
@@ -186,6 +209,7 @@ pub mod grpc {
                 db.journal(&event)?;
                 Ok(Response::new(journal_event_to_proto(&event)))
             })
+            .await
         }
 
         async fn timeline(&self, _req: Request<TimelineRequest>) -> Result<Response<TimelineResponse>, Status> {
@@ -195,7 +219,7 @@ pub mod grpc {
         // ── State ──
         async fn state_set(&self, req: Request<StateSetRequest>) -> Result<Response<StateSetResponse>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let now = crate::db::now_ms();
                 let entry = models::StateEntry {
                     key: r.key,
@@ -207,6 +231,7 @@ pub mod grpc {
                 db.state_set(&entry)?;
                 Ok(Response::new(StateSetResponse { ok: true }))
             })
+            .await
         }
         async fn state_get(&self, _req: Request<StateGetRequest>) -> Result<Response<StateEntry>, Status> {
             Err(Status::unimplemented("state_get"))
@@ -220,12 +245,13 @@ pub mod grpc {
 
         // ── Ops ──
         async fn health(&self, _req: Request<HealthRequest>) -> Result<Response<HealthResponse>, Status> {
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 Ok(Response::new(HealthResponse { healthy: db.health_check() }))
             })
+            .await
         }
         async fn stats(&self, _req: Request<StatsRequest>) -> Result<Response<StatsResponse>, Status> {
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let s = db.stats()?;
                 Ok(Response::new(StatsResponse {
                     total_entities: s.total_entities,
@@ -234,19 +260,22 @@ pub mod grpc {
                     db_size_bytes: s.db_file_size_bytes as i64,
                 }))
             })
+            .await
         }
         async fn context(&self, req: Request<ContextRequest>) -> Result<Response<ContextResponse>, Status> {
             let r = req.into_inner();
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let ctx = db.context(&r.categories, r.limit, r.workspace_hash.as_deref())?;
                 Ok(Response::new(ContextResponse { context: ctx }))
             })
+            .await
         }
         async fn workspace_list(&self, _req: Request<WorkspaceListRequest>) -> Result<Response<WorkspaceListResponse>, Status> {
-            with_db(self, |db| {
+            with_db(self, move |db| {
                 let cats = db.workspace_list_categories()?;
                 Ok(Response::new(WorkspaceListResponse { categories: cats }))
             })
+            .await
         }
 
         // ── AI ──
@@ -310,7 +339,7 @@ pub mod grpc {
     /// Start the gRPC server on the given address. Runs in the current thread
     /// and blocks until shutdown. For background usage, spawn via std::thread::spawn.
     pub async fn serve(
-        db: Arc<Mutex<Database>>,
+        db: Arc<Database>,
         addr: std::net::SocketAddr,
     ) -> Result<(), Box<dyn std::error::Error>> {
         use tonic::transport::Server;
@@ -333,7 +362,7 @@ pub mod grpc {
                 .join(format!("mimir-test-grpc-{}.db", uuid::Uuid::new_v4()));
             let path_str = path.to_str().unwrap().to_string();
             let db = Database::open(&path_str).expect("open test db");
-            (MnemeGrpcServer::new(Arc::new(Mutex::new(db))), path_str)
+            (MnemeGrpcServer::new(Arc::new(db)), path_str)
         }
 
         fn remember_req(key: &str) -> RememberRequest {
@@ -457,6 +486,38 @@ pub mod grpc {
             let _ = std::fs::remove_file(&path);
         }
 
+        /// Deterministic overlap proof (#402): two in-flight `with_db`
+        /// closures rendezvous on a 2-party barrier INSIDE their DB closure —
+        /// only possible if both run simultaneously. Under the old
+        /// `Arc<Mutex<Database>>` (closure executed synchronously while
+        /// holding the global lock) this rendezvous would deadlock; the
+        /// timeout turns that into a failure.
+        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+        async fn rpc_db_closures_overlap_instead_of_serializing() {
+            let (server, path) = test_server();
+            let barrier = Arc::new(std::sync::Barrier::new(2));
+            let (b1, b2) = (barrier.clone(), barrier.clone());
+            let fut = async {
+                let a = with_db(&server, move |db| {
+                    let healthy = db.health_check();
+                    b1.wait();
+                    Ok(healthy)
+                });
+                let b = with_db(&server, move |db| {
+                    let healthy = db.health_check();
+                    b2.wait();
+                    Ok(healthy)
+                });
+                tokio::join!(a, b)
+            };
+            let (ra, rb) = tokio::time::timeout(std::time::Duration::from_secs(10), fut)
+                .await
+                .expect("concurrent RPC DB closures must overlap, not serialize");
+            assert!(ra.unwrap());
+            assert!(rb.unwrap());
+            let _ = std::fs::remove_file(&path);
+        }
+
         #[tokio::test]
         async fn health_and_stats_respond() {
             let (server, path) = test_server();
@@ -480,15 +541,16 @@ pub mod grpc {
 // Non-grpc fallback
 #[cfg(not(feature = "grpc"))]
 pub mod grpc {
-    use std::sync::{Arc, Mutex};
+    use std::sync::Arc;
     use crate::db::Database;
 
     /// Stub module — gRPC is compiled out.
     // No in-crate caller in the default (non-grpc) build; kept so callers behind
     // `--features grpc` get a clear error instead of a missing symbol.
+    // (#402: signature tracks the real serve() — shared Arc<Database>, no Mutex.)
     #[allow(dead_code)]
     pub async fn serve(
-        _db: Arc<Mutex<Database>>,
+        _db: Arc<Database>,
         _addr: std::net::SocketAddr,
     ) -> Result<(), Box<dyn std::error::Error>> {
         Err("gRPC transport not compiled in. Rebuild with: cargo build --features grpc".into())
@@ -505,7 +567,7 @@ pub mod grpc {
             let path_str = path.to_str().unwrap().to_string();
             let db = Database::open(&path_str).expect("open test db");
             let err = serve(
-                Arc::new(Mutex::new(db)),
+                Arc::new(db),
                 "127.0.0.1:0".parse().unwrap(),
             )
             .await
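
The `with_db` comment in this file explains that `sanitize_error` has to run inside the blocking closure because `Box<dyn Error>` is not `Send`. The sketch below shows why that ordering works, using only the standard library. It rests on several assumptions: `std::thread::spawn` stands in for `tokio::task::spawn_blocking`, `Status` here is a hypothetical plain struct rather than `tonic::Status`, `FakeDb` replaces `Database`, and the body of `sanitize_error` (downcast to a typed `Status`, otherwise a generic INTERNAL) is a guess at the behaviour the comments describe, not the code from this repository.

```rust
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `tonic::Status`: plain data, therefore `Send`.
#[derive(Debug, Clone, PartialEq)]
struct Status {
    code: &'static str,
    message: String,
}

impl fmt::Display for Status {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}: {}", self.code, self.message)
    }
}

impl Error for Status {}

/// Hypothetical stand-in for the shared, `Sync` database handle.
struct FakeDb;

/// ASSUMED behaviour: a typed `Status` raised by the handler passes through
/// unchanged; any other error is replaced by a generic INTERNAL status so
/// internal error text never reaches the client.
fn sanitize_error(e: Box<dyn Error>) -> Status {
    match e.downcast::<Status>() {
        Ok(status) => *status,
        Err(_) => Status { code: "INTERNAL", message: "internal error".to_string() },
    }
}

/// Run `f` on another thread. The non-`Send` `Box<dyn Error>` is mapped to a
/// `Status` inside the worker, so only `Result<T, Status>` crosses threads.
fn with_db<T, F>(db: &Arc<FakeDb>, f: F) -> Result<T, Status>
where
    F: FnOnce(&FakeDb) -> Result<T, Box<dyn Error>> + Send + 'static,
    T: Send + 'static,
{
    let db = Arc::clone(db);
    let worker = thread::spawn(move || f(&db).map_err(sanitize_error));
    // A panicked worker is reported as a generic INTERNAL status as well.
    worker
        .join()
        .map_err(|_| Status { code: "INTERNAL", message: "internal error".to_string() })?
}
```

Moving the `map_err(sanitize_error)` call outside the spawned closure would not compile in this sketch, because the closure's return value would then contain a `Box<dyn Error>`, and a thread's result must be `Send`.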
