@@ -1668,15 +1668,22 @@ impl Database {
16681668 if let Some(ex_id) = existing_id {
16691669 // Update existing entity — compute decay + boost (it's being remembered)
16701670 id = ex_id;
1671+ // #379: this branch is an audited read-decide-write (the #371
1672+ // re-assert audit below) — take the writer lock BEFORE the reads
1673+ // so a concurrent set_valid_to/status flip/re-assert can't
1674+ // invalidate them mid-flight. See audited_write_tx. The inversion
1675+ // guards' early Err returns drop the tx (rollback): a rejected
1676+ // write still mutates nothing.
1677+ let tx = Self::audited_write_tx(&conn)?;
16711678 let now = now_ms();
1672- let old_decay: f64 = conn
1679+ let old_decay: f64 = tx
16731680 .query_row(
16741681 "SELECT decay_score FROM entities WHERE id = ?1",
16751682 params![id],
16761683 |r| r.get(0),
16771684 )
16781685 .unwrap_or(1.0);
1679- let old_count: i64 = conn
1686+ let old_count: i64 = tx
16801687 .query_row(
16811688 "SELECT retrieval_count FROM entities WHERE id = ?1",
16821689 params![id],
@@ -1691,7 +1698,7 @@ impl Database {
16911698 // before overwriting, so history is kept for as-of queries. An
16921699 // identical re-assertion is NOT a new version (no spurious history).
16931700 // Compare on plaintext — GCM ciphertext differs every call.
1694- let old_raw_body: String = conn
1701+ let old_raw_body: String = tx
16951702 .query_row(
16961703 "SELECT body_json FROM entities WHERE id = ?1",
16971704 params![id],
@@ -1737,7 +1744,7 @@ impl Database {
17371744 let audit_reassert_from: Option<i64> = if !content_changed
17381745 && (valid_from.is_some() || valid_to.is_some())
17391746 {
1740- let (stored_eff_from, stored_to): (i64, Option<i64>) = conn .query_row(
1747+ let (stored_eff_from, stored_to): (i64, Option<i64>) = tx .query_row(
17411748 "SELECT COALESCE(valid_from_unix_ms, recorded_at_unix_ms, created_at_unix_ms), \
17421749 valid_to_unix_ms \
17431750 FROM entities WHERE id = ?1",
@@ -1765,7 +1772,7 @@ impl Database {
17651772 // `invalidated_at_unix_ms > ?` can never match for any timestamp
17661773 // -- permanently unreachable despite mimir_history still listing it.
17671774 let now = if content_changed || audit_period_change {
1768- let old_recorded_or_created: i64 = conn
1775+ let old_recorded_or_created: i64 = tx
17691776 .query_row(
17701777 "SELECT COALESCE(recorded_at_unix_ms, created_at_unix_ms) FROM entities WHERE id = ?1",
17711778 params![id],
@@ -1790,13 +1797,13 @@ impl Database {
17901797 // period that valid_at can never match while still shadowing older
17911798 // versions in bitemporal_at — the exact "unanswerable fact" state
17921799 // set_valid_to already refuses for the identical defaulted-from
1793- // case. Checked BEFORE the transaction so a rejected write mutates
1794- // nothing.
1800+ // case. Checked before any write lands, so a rejected write
1801+ // mutates nothing (the Err return drops the tx — rollback) .
17951802 if let Some(vt) = valid_to {
17961803 let eff_from: i64 = match valid_from {
17971804 Some(vf) => vf,
17981805 None if content_changed => now,
1799- None => conn .query_row(
1806+ None => tx .query_row(
18001807 "SELECT COALESCE(valid_from_unix_ms, recorded_at_unix_ms, created_at_unix_ms) \
18011808 FROM entities WHERE id = ?1",
18021809 params![id],
@@ -1820,7 +1827,7 @@ impl Database {
18201827 // UPDATE re-sets valid_to to the caller's value (NULL here,
18211828 // i.e. [vf, ∞)), which cannot invert.
18221829 if !content_changed {
1823- let stored_to: Option<i64> = conn .query_row(
1830+ let stored_to: Option<i64> = tx .query_row(
18241831 "SELECT valid_to_unix_ms FROM entities WHERE id = ?1",
18251832 params![id],
18261833 |r| r.get(0),
@@ -1837,8 +1844,9 @@ impl Database {
18371844 }
18381845 }
18391846
1840- // M-1: wrap entity UPDATE + FTS UPDATE in a transaction
1841- let tx = conn.unchecked_transaction()?;
1847+ // M-1: entity UPDATE + FTS UPDATE ride the same audited-writer
1848+ // transaction opened at the top of this branch (#379), so a
1849+ // failure in one can't leave the other orphaned.
18421850
18431851 // Snapshot the OLD row BEFORE the UPDATE overwrites it. invalidated_at
18441852 // = now (transaction time it was retired); superseded_by = the live id
@@ -3610,7 +3618,9 @@ impl Database {
36103618 reason: &str,
36113619 ) -> Result<(), Box<dyn std::error::Error>> {
36123620 let conn = self.conn()?;
3613- let (cur_status, eff_from, old_rec): (Option<String>, i64, i64) = conn.query_row(
3621+ // #379: writer lock BEFORE the precondition read — see audited_write_tx.
3622+ let tx = Self::audited_write_tx(&conn)?;
3623+ let (cur_status, eff_from, old_rec): (Option<String>, i64, i64) = tx.query_row(
36143624 "SELECT status, \
36153625 COALESCE(valid_from_unix_ms, recorded_at_unix_ms, created_at_unix_ms), \
36163626 COALESCE(recorded_at_unix_ms, created_at_unix_ms) \
@@ -3619,10 +3629,11 @@ impl Database {
36193629 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
36203630 )?;
36213631 if cur_status.as_deref() == Some(status) {
3622- conn .execute(
3632+ tx .execute(
36233633 "UPDATE entities SET archive_reason = ?1, last_accessed_unix_ms = ?2 WHERE id = ?3",
36243634 params![reason, now_ms(), id],
36253635 )?;
3636+ tx.commit()?;
36263637 return Ok(());
36273638 }
36283639 // Audited flip — same shape as set_valid_to's close (#373): snapshot
@@ -3636,7 +3647,6 @@ impl Database {
36363647 "hist-{}",
36373648 uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string()
36383649 );
3639- let tx = conn.unchecked_transaction()?;
36403650 Self::snapshot_live_row_to_history(&tx, &history_id, now, id)?;
36413651 tx.execute(
36423652 "UPDATE entities SET status = ?1, archive_reason = ?2, last_accessed_unix_ms = ?3,
@@ -3649,6 +3659,27 @@ impl Database {
36493659 Ok(())
36503660 }
36513661
3662+ /// Begin an IMMEDIATE transaction for an audited temporal writer (#379).
3663+ ///
3664+ /// The audited writers (the #371 re-assert path in remember, the #373
3665+ /// set_valid_to close, the #377 status flip) are read-decide-write: they
3666+ /// read a precondition (stored body/period/status, old recorded_at),
3667+ /// decide whether and how to snapshot, then write. Reading on the bare
3668+ /// pooled connection — or inside a DEFERRED transaction — lets two
3669+ /// concurrent writers on the same id both pass their checks against the
3670+ /// same stale read and interleave: double snapshots, zero/inverted
3671+ /// history windows, and a live recorded_at that moves backwards.
3672+ /// IMMEDIATE takes the writer lock up front, so the precondition read
3673+ /// happens under the same lock as the write; concurrent writers
3674+ /// serialize on the connection's busy_timeout instead of corrupting.
3675+ /// Every exit path must either commit or let the transaction drop
3676+ /// (rollback) — a rejected write mutates nothing.
3677+ fn audited_write_tx(
3678+ conn: &rusqlite::Connection,
3679+ ) -> rusqlite::Result<rusqlite::Transaction<'_>> {
3680+ rusqlite::Transaction::new_unchecked(conn, rusqlite::TransactionBehavior::Immediate)
3681+ }
3682+
36523683 /// Snapshot the current live row of `id` into `entity_history`, retired at
36533684 /// transaction time `invalidated_at` and linked back to the live id via
36543685 /// `superseded_by`. All other columns (incl. the prior recorded_at) are
@@ -3710,7 +3741,10 @@ impl Database {
37103741 /// stored close that was kept).
37113742 pub fn set_valid_to(&self, id: &str, valid_to: i64) -> Result<i64, Box<dyn std::error::Error>> {
37123743 let conn = self.conn()?;
3713- let (eff_from, cur_to, old_rec): (i64, Option<i64>, i64) = conn.query_row(
3744+ // #379: writer lock BEFORE the precondition read — see audited_write_tx.
3745+ // The refusal/no-op returns below drop the tx (rollback): nothing written.
3746+ let tx = Self::audited_write_tx(&conn)?;
3747+ let (eff_from, cur_to, old_rec): (i64, Option<i64>, i64) = tx.query_row(
37143748 "SELECT COALESCE(valid_from_unix_ms, recorded_at_unix_ms, created_at_unix_ms), \
37153749 valid_to_unix_ms, \
37163750 COALESCE(recorded_at_unix_ms, created_at_unix_ms) \
@@ -3745,7 +3779,6 @@ impl Database {
37453779 "hist-{}",
37463780 uuid::Uuid::new_v4().to_string().replace('-', "")[..16].to_string()
37473781 );
3748- let tx = conn.unchecked_transaction()?;
37493782 Self::snapshot_live_row_to_history(&tx, &history_id, now, id)?;
37503783 tx.execute(
37513784 "UPDATE entities SET valid_to_unix_ms = ?1, recorded_at_unix_ms = ?2,
@@ -7661,6 +7694,112 @@ mod tests {
76617694 let _ = fs::remove_file(&path);
76627695 }
76637696
7697+ #[test]
7698+ fn concurrent_audited_writers_serialize_without_corrupting_history() {
7699+ // #379: the audited writers are read-decide-write. Before the
7700+ // IMMEDIATE writer lock (audited_write_tx), concurrent writers on the
7701+ // same id could both pass their precondition checks against the same
7702+ // stale read and interleave — double snapshots, zero/inverted history
7703+ // windows, a live recorded_at moving backwards. Hammer one id from
7704+ // several threads across two writer kinds and then assert the
7705+ // partition invariants that as_of/bitemporal reconstruction depends
7706+ // on: every window strictly positive, windows contiguous, and the
7707+ // last snapshot handing off exactly at the live row's recorded_at.
7708+ use std::sync::Arc;
7709+ use std::thread;
7710+
7711+ let (db, path) = temp_db();
7712+ let e = make_entity("e-379", "facts", "race-key", r#"{"note":"v0"}"#);
7713+ db.remember(&e).unwrap();
7714+ let id: String = {
7715+ let conn = db.conn().unwrap();
7716+ conn.query_row(
7717+ "SELECT id FROM entities WHERE category='facts' AND key='race-key'",
7718+ [],
7719+ |r| r.get(0),
7720+ )
7721+ .unwrap()
7722+ };
7723+
7724+ let db = Arc::new(db);
7725+ let mut handles = Vec::new();
7726+ // Two threads of status flips (#377 writer), phase-shifted so most
7727+ // calls are actual changes.
7728+ for t in 0..2usize {
7729+ let db = Arc::clone(&db);
7730+ let id = id.clone();
7731+ handles.push(thread::spawn(move || {
7732+ for i in 0..20 {
7733+ let s = if (t + i) % 2 == 0 { "deprecated" } else { "active" };
7734+ db.update_entity_status(&id, s, "race").expect("status flip");
7735+ }
7736+ }));
7737+ }
7738+ // One thread of content-changing re-asserts (the remember writer).
7739+ {
7740+ let db = Arc::clone(&db);
7741+ handles.push(thread::spawn(move || {
7742+ for i in 0..20 {
7743+ let e = make_entity(
7744+ "ignored",
7745+ "facts",
7746+ "race-key",
7747+ &format!(r#"{{"note":"v{}"}}"#, i + 1),
7748+ );
7749+ db.remember(&e).expect("re-assert");
7750+ }
7751+ }));
7752+ }
7753+ for h in handles {
7754+ h.join().unwrap();
7755+ }
7756+
7757+ let conn = db.conn().unwrap();
7758+ let mut stmt = conn
7759+ .prepare(
7760+ "SELECT recorded_at_unix_ms, invalidated_at_unix_ms FROM entity_history \
7761+ WHERE id = ?1 ORDER BY recorded_at_unix_ms ASC",
7762+ )
7763+ .unwrap();
7764+ let windows: Vec<(i64, i64)> = stmt
7765+ .query_map(params![id], |r| Ok((r.get(0)?, r.get(1)?)))
7766+ .unwrap()
7767+ .map(|r| r.unwrap())
7768+ .collect();
7769+ let live_rec: i64 = conn
7770+ .query_row(
7771+ "SELECT COALESCE(recorded_at_unix_ms, created_at_unix_ms) FROM entities WHERE id = ?1",
7772+ params![id],
7773+ |r| r.get(0),
7774+ )
7775+ .unwrap();
7776+
7777+ assert!(
7778+ windows.len() >= 20,
7779+ "the actual-change writes must have snapshotted (got {})",
7780+ windows.len()
7781+ );
7782+ for (rec, inv) in &windows {
7783+ assert!(
7784+ rec < inv,
7785+ "zero/inverted history window [{rec}, {inv}) — writers interleaved"
7786+ );
7787+ }
7788+ for w in windows.windows(2) {
7789+ assert_eq!(
7790+ w[0].1, w[1].0,
7791+ "history windows must partition transaction time contiguously"
7792+ );
7793+ }
7794+ assert_eq!(
7795+ windows.last().unwrap().1,
7796+ live_rec,
7797+ "the last snapshot must hand off exactly at the live row's recorded_at"
7798+ );
7799+
7800+ let _ = fs::remove_file(&path);
7801+ }
7802+
76647803 #[test]
76657804 fn remember_identical_content_creates_no_history() {
76667805 let (db, path) = temp_db();
0 commit comments