Skip to content

Commit 6ca0b53

Browse files
tcconnally and claude committed
fix(pool): resolve link/unlink ids on the held connection; make cohere's transaction drop-safe (closes #387, closes #388)
link()/unlink() resolved the source entity via get_entity(), which draws a SECOND pooled connection while the caller already holds one — at >= pool-size concurrent linkers every slot was held by a thread blocking on the nested draw, and calls failed with an opaque Error(None) after the ~30s r2d2 acquire timeout. Ids are now resolved by a resolve_entity_id helper on the caller's own connection (mirroring get_entity's deterministic #342 multi-workspace pick, without the needless body decrypt). While verifying #388 (cohere links RMW): the pair-scan links read already runs INSIDE the BEGIN IMMEDIATE at the top of cohere's write section, so the filed clobber scenario cannot occur — but the raw BEGIN/COMMIT had no rollback guard: any `?` error in between returned the pooled connection with the transaction still open, and the next checkout of that slot failed with "cannot start a transaction within a transaction" forever. cohere now uses the drop-safe Transaction (audited_write_tx); every statement rides tx, errors roll back. Test: many_concurrent_linkers_do_not_starve_the_pool (48 linkers = 3x pool size) — fails on pre-fix main with the exact Error(None) signature (verified in a scratch worktree at 4faaa95), passes in <1s post-fix. Suite 264/264. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent 2f5490b commit 6ca0b53

1 file changed

Lines changed: 90 additions & 20 deletions

File tree

src/db.rs

Lines changed: 90 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -2857,6 +2857,27 @@ impl Database {
28572857
Ok(affected > 0)
28582858
}
28592859

2860+
/// Resolve the id that link()/unlink() operate on, on the CALLER's
2861+
/// connection. Mirrors get_entity's deterministic multi-workspace pick
2862+
/// (#342: global '' row first, then lexicographically-first workspace)
2863+
/// without drawing a SECOND pooled connection while the first is held —
2864+
/// that nested draw let >= pool-size concurrent linkers starve the pool
2865+
/// into r2d2 acquire timeouts (#387) — and without decrypting the body,
2866+
/// which id resolution never needed.
2867+
fn resolve_entity_id(
2868+
conn: &rusqlite::Connection,
2869+
category: &str,
2870+
key: &str,
2871+
) -> Option<String> {
2872+
conn.query_row(
2873+
"SELECT id FROM entities WHERE category = ?1 AND key = ?2 \
2874+
ORDER BY workspace_hash ASC, id ASC LIMIT 1",
2875+
params![category, key],
2876+
|r| r.get(0),
2877+
)
2878+
.ok()
2879+
}
2880+
28602881
/// Create a link from one entity to another.
28612882
pub fn link(
28622883
&self,
@@ -2867,10 +2888,10 @@ impl Database {
28672888
) -> Result<(), Box<dyn std::error::Error>> {
28682889
let conn = self.conn()?;
28692890
// Verify both entities exist (id resolution only — ids are immutable,
2870-
// so these reads don't need the writer lock).
2871-
let from = self
2872-
.get_entity(from_category, from_key)?
2873-
.ok_or("Source entity not found")?;
2891+
// so these reads don't need the writer lock; and they run on THIS
2892+
// connection, see resolve_entity_id / #387).
2893+
let from_id =
2894+
Self::resolve_entity_id(&conn, from_category, from_key).ok_or("Source entity not found")?;
28742895
let _to: String = conn
28752896
.query_row(
28762897
"SELECT id FROM entities WHERE id = ?1",
@@ -2888,7 +2909,7 @@ impl Database {
28882909
let links_str: String = tx
28892910
.query_row(
28902911
"SELECT links FROM entities WHERE id = ?1",
2891-
params![from.id],
2912+
params![from_id],
28922913
|r| r.get(0),
28932914
)
28942915
.unwrap_or_else(|_| "[]".to_string());
@@ -2905,7 +2926,7 @@ impl Database {
29052926
let new_links = serde_json::to_string(&links)?;
29062927
tx.execute(
29072928
"UPDATE entities SET links = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
2908-
params![new_links, now_ms(), from.id],
2929+
params![new_links, now_ms(), from_id],
29092930
)?;
29102931
tx.commit()?;
29112932

@@ -2920,8 +2941,8 @@ impl Database {
29202941
to_id: &str,
29212942
) -> Result<(), Box<dyn std::error::Error>> {
29222943
let conn = self.conn()?;
2923-
let from = self
2924-
.get_entity(from_category, from_key)?
2944+
// #387: id resolution on the already-held connection — see link().
2945+
let from_id = Self::resolve_entity_id(&conn, from_category, from_key)
29252946
.ok_or("Source entity not found")?;
29262947

29272948
// #382: same writer-lock discipline as link() — an unlink racing a
@@ -2930,7 +2951,7 @@ impl Database {
29302951
let tx = Self::audited_write_tx(&conn)?;
29312952
let links_str: String = tx.query_row(
29322953
"SELECT links FROM entities WHERE id = ?1",
2933-
params![from.id],
2954+
params![from_id],
29342955
|r| r.get(0),
29352956
)?;
29362957

@@ -2945,7 +2966,7 @@ impl Database {
29452966
let new_links = serde_json::to_string(&links)?;
29462967
tx.execute(
29472968
"UPDATE entities SET links = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
2948-
params![new_links, now_ms(), from.id],
2969+
params![new_links, now_ms(), from_id],
29492970
)?;
29502971
tx.commit()?;
29512972

@@ -6287,8 +6308,13 @@ last_accessed: {}
62876308
}
62886309

62896310
// Wrap all mutations in a transaction so partial writes are not left
6290-
// behind if any step fails (cohere runs multiple statements on self.conn).
6291-
conn.execute("BEGIN IMMEDIATE", [])?;
6311+
// behind if any step fails. A drop-safe Transaction, NOT a raw
6312+
// "BEGIN IMMEDIATE" execute: any `?` error between a raw BEGIN and
6313+
// its COMMIT returned this pooled connection with the transaction
6314+
// still open, so the next checkout of that connection failed with
6315+
// "cannot start a transaction within a transaction" — one failed
6316+
// cohere run poisoned that pool slot permanently (#388).
6317+
let tx = Self::audited_write_tx(&conn)?;
62926318
// Default promotion threshold matches the recall path's
62936319
// WORKING_THRESHOLD so buffer→working promotion happens at the same
62946320
// retrieval count everywhere. Previously cohere promoted at a literal
@@ -6300,7 +6326,7 @@ last_accessed: {}
63006326
} else {
63016327
Self::WORKING_THRESHOLD
63026328
};
6303-
promoted = conn.execute(
6329+
promoted = tx.execute(
63046330
"UPDATE entities SET layer = 'working' WHERE layer = 'buffer' AND retrieval_count >= ?1",
63056331
params![promote_threshold],
63066332
)? as i64;
@@ -6310,12 +6336,12 @@ last_accessed: {}
63106336
// so repeated standalone cohere calls can't walk them below the
63116337
// archive threshold (#298). Without the floor, ~59 cohere calls
63126338
// (0.95^59 ≈ 0.048) archived every unboosted entity, verified included.
6313-
let decayed_count: i64 = conn.query_row(
6339+
let decayed_count: i64 = tx.query_row(
63146340
"SELECT COUNT(*) FROM entities WHERE archived = 0 AND decay_score > 0.01",
63156341
[],
63166342
|r| r.get(0),
63176343
)?;
6318-
conn.execute(
6344+
tx.execute(
63196345
&format!(
63206346
"UPDATE entities SET decay_score = \
63216347
MAX(decay_score * 0.95, \
@@ -6349,7 +6375,7 @@ last_accessed: {}
63496375
let mut pending: std::collections::HashMap<String, Vec<MemoryLink>> =
63506376
std::collections::HashMap::new();
63516377
{
6352-
let mut stmt = conn.prepare(
6378+
let mut stmt = tx.prepare(
63536379
"SELECT e1.id, e1.links, e2.id as e2_id, e1.body_json, e2.body_json
63546380
FROM entities e1
63556381
JOIN entities e2 ON e1.category = e2.category AND e1.id < e2.id
@@ -6417,7 +6443,7 @@ last_accessed: {}
64176443
let link_ts = now_ms();
64186444
for (id, links) in &pending {
64196445
let new_links = serde_json::to_string(links)?;
6420-
conn.execute(
6446+
tx.execute(
64216447
"UPDATE entities SET links = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
64226448
params![new_links, link_ts, id],
64236449
)?;
@@ -6432,21 +6458,21 @@ last_accessed: {}
64326458
} else {
64336459
Self::ARCHIVE_DECAY_THRESHOLD
64346460
};
6435-
archived = conn.execute(
6461+
archived = tx.execute(
64366462
"UPDATE entities SET archived = 1, archive_reason = 'auto-archived by coherence daemon (decay < threshold)'
64376463
WHERE archived = 0 AND verified = 0 AND decay_score < ?1",
64386464
params![archive_threshold],
64396465
)? as i64;
64406466

64416467
// Clean FTS5 entries for archived entities
64426468
if archived > 0 {
6443-
conn.execute(
6469+
tx.execute(
64446470
"DELETE FROM entities_fts WHERE rowid IN (SELECT rowid FROM entities WHERE archived = 1)",
64456471
[],
64466472
)?;
64476473
}
64486474

6449-
conn.execute("COMMIT", [])?;
6475+
tx.commit()?;
64506476

64516477
Ok(crate::models::CohereReport {
64526478
promoted,
@@ -8123,6 +8149,50 @@ mod tests {
81238149
let _ = fs::remove_file(&path);
81248150
}
81258151

8152+
#[test]
fn many_concurrent_linkers_do_not_starve_the_pool() {
    // Regression test for #387. link() used to resolve the source entity
    // through get_entity(), which checks out a SECOND pooled connection
    // while the first is still held. With pool max 16, >= 16 concurrent
    // linkers each sat on one connection waiting for the nested draw,
    // ending in an r2d2 acquire timeout (~30s) and an opaque Error(None)
    // rather than a link. Ids are now resolved on the caller's own
    // connection, so 3x-pool-size linkers must every one succeed.
    use std::sync::Arc;
    use std::thread;

    const LINKERS: usize = 48;

    // One source entity plus one distinct target per linker thread.
    let (db, path) = temp_db();
    db.remember(&make_entity("e-387", "facts", "src387", r#"{"n":"v0"}"#))
        .unwrap();
    for i in 0..LINKERS {
        let target = make_entity(
            &format!("t-387-{i}"),
            "facts",
            &format!("tgt387-{i}"),
            r#"{"n":"t"}"#,
        );
        db.remember_skip_dedup(&target).unwrap();
    }

    // Start every linker before joining any, so they all contend for the
    // pool at the same time.
    let shared = Arc::new(db);
    let mut workers = Vec::with_capacity(LINKERS);
    for i in 0..LINKERS {
        let db = Arc::clone(&shared);
        workers.push(thread::spawn(move || {
            db.link("facts", "src387", &format!("t-387-{i}"), "related")
                .expect("link must not starve the pool");
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    // Each thread linked a different target, so the source must carry
    // exactly one edge per linker.
    let live = shared.get_entity("facts", "src387").unwrap().unwrap();
    assert_eq!(live.links.len(), LINKERS, "every edge must land");
    let _ = fs::remove_file(&path);
}
8195+
81268196
#[test]
81278197
fn remember_identical_content_creates_no_history() {
81288198
let (db, path) = temp_db();

0 commit comments

Comments
 (0)