Skip to content

Commit 4faaa95

Browse files
tcconnally and claude
authored
fix(links): writer-lock the links read-modify-write; remember unions instead of clobbering (closes #382) (#384)
link/unlink read the entity's links JSON on the bare pooled connection, modified it in memory, and wrote it back — two concurrent link() calls could read the same base array and the second write silently dropped the first edge. Both now run their read-modify-write under audited_write_tx (#380).

Worse than filed: remember's full-row UPDATE (links = ?10) writes the caller's array wholesale, and the MCP remember tool always constructs the entity with links: [] — so ANY re-remember of a linked entity deterministically erased its graph edges (remember -> link -> remember lost the edge single-threaded). Owner decision: remember now UNIONS caller links with stored links (dedup by target_id) inside the writer transaction; unlink is the only removal path.

Tests (all fail 5/5 on pre-fix main, verified in a scratch worktree at 13be00c): remember_reassert_preserves_stored_links (deterministic MCP-path erase), remember_unions_caller_links_with_stored (+ unlink still removes), concurrent_links_and_remembers_do_not_lose_edges (12 parallel linkers racing a re-assert writer).

Co-authored-by: tcconnally <hermes@perseus.observer>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent 054c28f commit 4faaa95

1 file changed

Lines changed: 186 additions & 5 deletions

File tree

src/db.rs

Lines changed: 186 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1859,6 +1859,30 @@ impl Database {
18591859
Self::snapshot_live_row_to_history(&tx, &history_id, now, &id)?;
18601860
}
18611861

1862+
// #382: remember must not clobber the stored link graph. Callers
1863+
// construct the Entity without its stored links (the MCP remember
1864+
// tool always passes []), so a wholesale `links = caller` erased
1865+
// every edge on each re-assert — and even a caller that read
1866+
// first could lose a concurrently added edge. Union stored ∪
1867+
// caller under the writer lock; unlink is the only removal path.
1868+
let links_json = {
1869+
let stored: String = tx
1870+
.query_row(
1871+
"SELECT links FROM entities WHERE id = ?1",
1872+
params![id],
1873+
|r| r.get(0),
1874+
)
1875+
.unwrap_or_else(|_| "[]".to_string());
1876+
let mut merged: Vec<MemoryLink> =
1877+
serde_json::from_str(&stored).unwrap_or_default();
1878+
for l in &entity.links {
1879+
if !merged.iter().any(|m| m.target_id == l.target_id) {
1880+
merged.push(l.clone());
1881+
}
1882+
}
1883+
serde_json::to_string(&merged)?
1884+
};
1885+
18621886
tx.execute(
18631887
"UPDATE entities SET
18641888
body_json = ?1, status = ?2, type = ?3, tags = ?4,
@@ -2815,7 +2839,8 @@ impl Database {
28152839
relationship: &str,
28162840
) -> Result<(), Box<dyn std::error::Error>> {
28172841
let conn = self.conn()?;
2818-
// Verify both entities exist
2842+
// Verify both entities exist (id resolution only — ids are immutable,
2843+
// so these reads don't need the writer lock).
28192844
let from = self
28202845
.get_entity(from_category, from_key)?
28212846
.ok_or("Source entity not found")?;
@@ -2827,8 +2852,13 @@ impl Database {
28272852
)
28282853
.map_err(|_| "Target entity not found")?;
28292854

2855+
// #382: the links read-modify-write must hold the writer lock — two
2856+
// concurrent link() calls reading the same base array on the bare
2857+
// pooled connection would both write back, silently dropping the
2858+
// first edge. See audited_write_tx (#380).
2859+
let tx = Self::audited_write_tx(&conn)?;
28302860
// Get existing links (default to empty array if missing)
2831-
let links_str: String = conn
2861+
let links_str: String = tx
28322862
.query_row(
28332863
"SELECT links FROM entities WHERE id = ?1",
28342864
params![from.id],
@@ -2846,10 +2876,11 @@ impl Database {
28462876
});
28472877
}
28482878
let new_links = serde_json::to_string(&links)?;
2849-
conn.execute(
2879+
tx.execute(
28502880
"UPDATE entities SET links = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
28512881
params![new_links, now_ms(), from.id],
28522882
)?;
2883+
tx.commit()?;
28532884

28542885
Ok(())
28552886
}
@@ -2866,7 +2897,11 @@ impl Database {
28662897
.get_entity(from_category, from_key)?
28672898
.ok_or("Source entity not found")?;
28682899

2869-
let links_str: String = conn.query_row(
2900+
// #382: same writer-lock discipline as link() — an unlink racing a
2901+
// link on the same entity must not clobber the other's edit. The
2902+
// no-op early return drops the tx (rollback): nothing written.
2903+
let tx = Self::audited_write_tx(&conn)?;
2904+
let links_str: String = tx.query_row(
28702905
"SELECT links FROM entities WHERE id = ?1",
28712906
params![from.id],
28722907
|r| r.get(0),
@@ -2881,10 +2916,11 @@ impl Database {
28812916
}
28822917

28832918
let new_links = serde_json::to_string(&links)?;
2884-
conn.execute(
2919+
tx.execute(
28852920
"UPDATE entities SET links = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
28862921
params![new_links, now_ms(), from.id],
28872922
)?;
2923+
tx.commit()?;
28882924

28892925
Ok(())
28902926
}
@@ -7908,6 +7944,151 @@ mod tests {
79087944
let _ = fs::remove_file(&path);
79097945
}
79107946

7947+
#[test]
7948+
fn remember_reassert_preserves_stored_links() {
7949+
// #382 (deterministic half): callers construct the Entity without its
7950+
// stored links — the MCP remember tool always passes [] — so the
7951+
// wholesale `links = caller` UPDATE erased every edge on each
7952+
// re-assert: remember → link → remember lost the link even
7953+
// single-threaded.
7954+
let (db, path) = temp_db();
7955+
db.remember(&make_entity("e-382a", "facts", "src382a", r#"{"n":"v1"}"#))
7956+
.unwrap();
7957+
db.remember(&make_entity("e-382t", "facts", "tgt382", r#"{"n":"t"}"#))
7958+
.unwrap();
7959+
db.link("facts", "src382a", "e-382t", "related").unwrap();
7960+
7961+
// Identical-body re-assert (the common MCP path) …
7962+
db.remember(&make_entity("ignored", "facts", "src382a", r#"{"n":"v1"}"#))
7963+
.unwrap();
7964+
let e = db.get_entity("facts", "src382a").unwrap().unwrap();
7965+
assert_eq!(
7966+
e.links.len(),
7967+
1,
7968+
"identical re-assert must not erase stored links"
7969+
);
7970+
7971+
// … and a content-changing one.
7972+
db.remember(&make_entity("ignored", "facts", "src382a", r#"{"n":"v2"}"#))
7973+
.unwrap();
7974+
let e = db.get_entity("facts", "src382a").unwrap().unwrap();
7975+
assert_eq!(
7976+
e.links.len(),
7977+
1,
7978+
"content-changing re-assert must not erase stored links"
7979+
);
7980+
assert_eq!(e.links[0].target_id, "e-382t");
7981+
let _ = fs::remove_file(&path);
7982+
}
7983+
7984+
#[test]
7985+
fn remember_unions_caller_links_with_stored() {
7986+
// #382 semantics decision: remember merges caller links with stored
7987+
// links (dedup by target_id); unlink is the only removal path.
7988+
let (db, path) = temp_db();
7989+
db.remember(&make_entity("e-382b", "facts", "src382b", r#"{"n":"v1"}"#))
7990+
.unwrap();
7991+
db.remember_skip_dedup(&make_entity("e-382u", "facts", "tgtu", r#"{"n":"t"}"#))
7992+
.unwrap();
7993+
db.remember_skip_dedup(&make_entity("e-382v", "facts", "tgtv", r#"{"n":"t"}"#))
7994+
.unwrap();
7995+
db.link("facts", "src382b", "e-382u", "related").unwrap();
7996+
7997+
let mut e = make_entity("ignored", "facts", "src382b", r#"{"n":"v2"}"#);
7998+
e.links.push(MemoryLink {
7999+
target_id: "e-382v".to_string(),
8000+
relationship: "caused-by".to_string(),
8001+
weight: 0.9,
8002+
});
8003+
db.remember(&e).unwrap();
8004+
8005+
let live = db.get_entity("facts", "src382b").unwrap().unwrap();
8006+
let targets: Vec<&str> = live.links.iter().map(|l| l.target_id.as_str()).collect();
8007+
assert!(
8008+
targets.contains(&"e-382u") && targets.contains(&"e-382v"),
8009+
"remember must union caller links with stored links, got {targets:?}"
8010+
);
8011+
assert_eq!(live.links.len(), 2, "no duplicate edges");
8012+
8013+
// unlink still removes.
8014+
db.unlink("facts", "src382b", "e-382u").unwrap();
8015+
let live = db.get_entity("facts", "src382b").unwrap().unwrap();
8016+
assert_eq!(live.links.len(), 1);
8017+
assert_eq!(live.links[0].target_id, "e-382v");
8018+
let _ = fs::remove_file(&path);
8019+
}
8020+
8021+
#[test]
8022+
fn concurrent_links_and_remembers_do_not_lose_edges() {
8023+
// #382 (concurrent half): link()'s read-modify-write on the links
8024+
// JSON ran on the bare pooled connection — two concurrent link()
8025+
// calls could read the same base array and the second write dropped
8026+
// the first edge; remember's full-row UPDATE clobbered edges added
8027+
// after the caller's read. Hammer one source entity with parallel
8028+
// link() calls AND content-changing re-asserts; every edge must
8029+
// survive.
8030+
use std::sync::Arc;
8031+
use std::thread;
8032+
8033+
let (db, path) = temp_db();
8034+
db.remember(&make_entity("e-382c", "facts", "src382c", r#"{"n":"v0"}"#))
8035+
.unwrap();
8036+
const TARGETS: usize = 12;
8037+
for i in 0..TARGETS {
8038+
// skip_dedup: the identical tiny bodies would otherwise
8039+
// near-dup-merge into one row and the links would have no target.
8040+
db.remember_skip_dedup(&make_entity(
8041+
&format!("t-382-{i}"),
8042+
"facts",
8043+
&format!("tgt382c-{i}"),
8044+
r#"{"n":"t"}"#,
8045+
))
8046+
.unwrap();
8047+
}
8048+
8049+
let db = Arc::new(db);
8050+
let mut handles = Vec::new();
8051+
// Parallel linkers, one distinct target each.
8052+
for i in 0..TARGETS {
8053+
let db = Arc::clone(&db);
8054+
handles.push(thread::spawn(move || {
8055+
db.link("facts", "src382c", &format!("t-382-{i}"), "related")
8056+
.expect("link");
8057+
}));
8058+
}
8059+
// A re-assert writer racing them (the remember clobber path).
8060+
{
8061+
let db = Arc::clone(&db);
8062+
handles.push(thread::spawn(move || {
8063+
for i in 0..10 {
8064+
let e = make_entity(
8065+
"ignored",
8066+
"facts",
8067+
"src382c",
8068+
&format!(r#"{{"n":"v{}"}}"#, i + 1),
8069+
);
8070+
db.remember(&e).expect("re-assert");
8071+
}
8072+
}));
8073+
}
8074+
for h in handles {
8075+
h.join().unwrap();
8076+
}
8077+
8078+
let live = db.get_entity("facts", "src382c").unwrap().unwrap();
8079+
assert_eq!(
8080+
live.links.len(),
8081+
TARGETS,
8082+
"edges lost under concurrency: expected {TARGETS}, got {} ({:?})",
8083+
live.links.len(),
8084+
live.links
8085+
.iter()
8086+
.map(|l| l.target_id.as_str())
8087+
.collect::<Vec<_>>()
8088+
);
8089+
let _ = fs::remove_file(&path);
8090+
}
8091+
79118092
#[test]
79128093
fn remember_identical_content_creates_no_history() {
79138094
let (db, path) = temp_db();

0 commit comments

Comments
 (0)