Skip to content

Commit 56baa84

Browse files
tcconnallyclaude
andcommitted
fix: serialize schema migration under BEGIN IMMEDIATE to close cross-process race (#353)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1 parent daef6c7 commit 56baa84

1 file changed

Lines changed: 197 additions & 87 deletions

File tree

src/schema.rs

Lines changed: 197 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -156,110 +156,112 @@ const SCHEMA_VERSION: i64 = 6;
156156
pub fn initialize_schema(conn: &Connection) -> Result<(), Box<dyn std::error::Error>> {
157157
// DDL is all IF NOT EXISTS, so it stays ungated: it both creates a fresh DB
158158
// and back-fills newer objects (e.g. idx_entities_recall) on older ones.
159+
// It also stays OUTSIDE the migration transaction below — IF NOT EXISTS is
160+
// already concurrency-safe, and it keeps the FTS5 virtual-table creation
161+
// out of the transaction entirely.
159162
conn.execute_batch(DDL_V0_2_0)?;
160163

161-
// The column-add migrations below each prepare a throwaway `SELECT col LIMIT 1`
162-
// probe. `open` runs several times per process, so on a fully-migrated DB this
163-
// is pure wasted work. Gate it on PRAGMA user_version: run once when behind,
164-
// then stamp current and skip on every subsequent open. (#208)
164+
// The column-add migrations each probe for the column first. `open` runs
165+
// several times per process, so on a fully-migrated DB this is pure wasted
166+
// work. Gate it on PRAGMA user_version: run once when behind, then stamp
167+
// current and skip on every subsequent open. (#208)
165168
let user_version: i64 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?;
166169
if user_version >= SCHEMA_VERSION {
167170
return Ok(());
168171
}
169172

170-
// Add embedding column if it doesn't exist (migration from v0.2.0)
171-
let has_embedding: bool = conn
172-
.prepare("SELECT embedding FROM entities LIMIT 1")
173-
.is_ok();
174-
if !has_embedding {
175-
conn.execute_batch("ALTER TABLE entities ADD COLUMN embedding BLOB;")?;
173+
// #353: PRAGMA busy_timeout serializes individual statements, not this
174+
// whole check-then-migrate sequence — two *processes* opening the same
175+
// pre-upgrade DB could both read a stale user_version, both enter the
176+
// migration path, and the loser would die on "duplicate column name".
177+
// BEGIN IMMEDIATE takes SQLite's write lock up front, acting as a
178+
// cross-process mutex: the loser blocks here (subject to busy_timeout)
179+
// until the winner commits, then sees the stamped version inside
180+
// apply_migrations and no-ops. Bonus: the migration steps used to
181+
// auto-commit individually; one transaction makes the whole migration
182+
// atomic (SQLite DDL is transactional), so a crash mid-migration rolls
183+
// back cleanly instead of leaving a half-migrated DB.
184+
conn.execute_batch("BEGIN IMMEDIATE;")?;
185+
match apply_migrations(conn) {
186+
Ok(()) => {
187+
conn.execute_batch("COMMIT;")?;
188+
Ok(())
189+
}
190+
Err(e) => {
191+
let _ = conn.execute_batch("ROLLBACK;");
192+
Err(e)
193+
}
176194
}
195+
}
177196

178-
// Add always_on column (v1.x migration)
179-
let has_always_on: bool = conn
180-
.prepare("SELECT always_on FROM entities LIMIT 1")
181-
.is_ok();
182-
if !has_always_on {
183-
conn.execute_batch("ALTER TABLE entities ADD COLUMN always_on INTEGER DEFAULT 0;")?;
197+
/// Add `column` to `table` unless it already exists. Defense-in-depth for the
198+
/// #353 race: even if the existence probe raced another writer (e.g. a process
199+
/// that migrated between our probe and our ALTER), "duplicate column name"
200+
/// means the column is present — exactly the state we wanted — so it is
201+
/// treated as success, not an error.
202+
fn ensure_column(
203+
conn: &Connection,
204+
table: &str,
205+
column: &str,
206+
decl: &str,
207+
) -> Result<(), Box<dyn std::error::Error>> {
208+
if conn
209+
.prepare(&format!("SELECT {column} FROM {table} LIMIT 1"))
210+
.is_ok()
211+
{
212+
return Ok(());
184213
}
185-
186-
// Add certainty column (v1.x migration)
187-
let has_certainty: bool = conn
188-
.prepare("SELECT certainty FROM entities LIMIT 1")
189-
.is_ok();
190-
if !has_certainty {
191-
conn.execute_batch("ALTER TABLE entities ADD COLUMN certainty REAL DEFAULT 0.5;")?;
214+
match conn.execute_batch(&format!("ALTER TABLE {table} ADD COLUMN {column} {decl};")) {
215+
Ok(()) => Ok(()),
216+
Err(e) if e.to_string().contains("duplicate column name") => Ok(()),
217+
Err(e) => Err(e.into()),
192218
}
219+
}
193220

194-
// Add workspace_hash column (v1.2.0 migration — multi-workspace scoping)
195-
let has_workspace_hash: bool = conn
196-
.prepare("SELECT workspace_hash FROM entities LIMIT 1")
197-
.is_ok();
198-
if !has_workspace_hash {
199-
conn.execute_batch("ALTER TABLE entities ADD COLUMN workspace_hash TEXT DEFAULT '';")?;
221+
/// The gated column-add migrations. Runs inside the BEGIN IMMEDIATE
222+
/// transaction taken by `initialize_schema` (#353) — must not BEGIN/COMMIT
223+
/// itself.
224+
fn apply_migrations(conn: &Connection) -> Result<(), Box<dyn std::error::Error>> {
225+
// Re-check the version now that we hold the write lock: if another
226+
// process completed the migration while we waited on BEGIN IMMEDIATE,
227+
// there is nothing left to do. (#353)
228+
let user_version: i64 = conn.query_row("PRAGMA user_version", [], |r| r.get(0))?;
229+
if user_version >= SCHEMA_VERSION {
230+
return Ok(());
200231
}
201232

202-
// Add agent_id column to entities (v1.2.0 — agent attribution)
203-
let has_agent_id: bool = conn.prepare("SELECT agent_id FROM entities LIMIT 1").is_ok();
204-
if !has_agent_id {
205-
conn.execute_batch("ALTER TABLE entities ADD COLUMN agent_id TEXT DEFAULT '';")?;
206-
}
233+
// Add embedding column if it doesn't exist (migration from v0.2.0)
234+
ensure_column(conn, "entities", "embedding", "BLOB")?;
207235

208-
// Add agent_id column to journal (v1.2.0 — journal attribution)
209-
let has_journal_agent: bool = conn.prepare("SELECT agent_id FROM journal LIMIT 1").is_ok();
210-
if !has_journal_agent {
211-
conn.execute_batch("ALTER TABLE journal ADD COLUMN agent_id TEXT DEFAULT '';")?;
212-
}
236+
// v1.x: always_on, certainty
237+
ensure_column(conn, "entities", "always_on", "INTEGER DEFAULT 0")?;
238+
ensure_column(conn, "entities", "certainty", "REAL DEFAULT 0.5")?;
213239

214-
// Add audit_hash column to journal (v2.0 — cryptographic audit log)
215-
let has_audit_hash: bool = conn.prepare("SELECT audit_hash FROM journal LIMIT 1").is_ok();
216-
if !has_audit_hash {
217-
conn.execute_batch("ALTER TABLE journal ADD COLUMN audit_hash TEXT DEFAULT '';")?;
218-
}
240+
// v1.2.0: multi-workspace scoping, agent attribution, access controls
241+
ensure_column(conn, "entities", "workspace_hash", "TEXT DEFAULT ''")?;
242+
ensure_column(conn, "entities", "agent_id", "TEXT DEFAULT ''")?;
243+
ensure_column(conn, "journal", "agent_id", "TEXT DEFAULT ''")?;
244+
ensure_column(conn, "entities", "visibility", "TEXT DEFAULT 'workspace'")?;
219245

220-
// Add visibility column (v1.2.0 — access controls)
221-
let has_visibility: bool = conn.prepare("SELECT visibility FROM entities LIMIT 1").is_ok();
222-
if !has_visibility {
223-
conn.execute_batch("ALTER TABLE entities ADD COLUMN visibility TEXT DEFAULT 'workspace';")?;
224-
}
246+
// v2.0: cryptographic audit log
247+
ensure_column(conn, "journal", "audit_hash", "TEXT DEFAULT ''")?;
225248

226249
// Add bi-temporal columns (v2.4.0 — bi-temporal facts). Valid time
227250
// (valid_from/valid_to), transaction time (recorded_at/invalidated_at), and
228251
// supersession links. All additive; existing rows keep their meaning.
229-
if conn.prepare("SELECT valid_from_unix_ms FROM entities LIMIT 1").is_err() {
230-
conn.execute_batch("ALTER TABLE entities ADD COLUMN valid_from_unix_ms INTEGER;")?;
231-
}
232-
if conn.prepare("SELECT valid_to_unix_ms FROM entities LIMIT 1").is_err() {
233-
conn.execute_batch("ALTER TABLE entities ADD COLUMN valid_to_unix_ms INTEGER;")?;
234-
}
235-
if conn.prepare("SELECT recorded_at_unix_ms FROM entities LIMIT 1").is_err() {
236-
conn.execute_batch("ALTER TABLE entities ADD COLUMN recorded_at_unix_ms INTEGER;")?;
237-
}
238-
if conn.prepare("SELECT invalidated_at_unix_ms FROM entities LIMIT 1").is_err() {
239-
conn.execute_batch("ALTER TABLE entities ADD COLUMN invalidated_at_unix_ms INTEGER;")?;
240-
}
241-
if conn.prepare("SELECT supersedes FROM entities LIMIT 1").is_err() {
242-
conn.execute_batch("ALTER TABLE entities ADD COLUMN supersedes TEXT DEFAULT '';")?;
243-
}
244-
if conn.prepare("SELECT superseded_by FROM entities LIMIT 1").is_err() {
245-
conn.execute_batch("ALTER TABLE entities ADD COLUMN superseded_by TEXT DEFAULT '';")?;
246-
}
252+
ensure_column(conn, "entities", "valid_from_unix_ms", "INTEGER")?;
253+
ensure_column(conn, "entities", "valid_to_unix_ms", "INTEGER")?;
254+
ensure_column(conn, "entities", "recorded_at_unix_ms", "INTEGER")?;
255+
ensure_column(conn, "entities", "invalidated_at_unix_ms", "INTEGER")?;
256+
ensure_column(conn, "entities", "supersedes", "TEXT DEFAULT ''")?;
257+
ensure_column(conn, "entities", "superseded_by", "TEXT DEFAULT ''")?;
247258

248259
// Add efficacy-tracking columns (v2.10.0 — PMB-inspired follow-rate scoring).
249-
if conn.prepare("SELECT follow_count FROM entities LIMIT 1").is_err() {
250-
conn.execute_batch("ALTER TABLE entities ADD COLUMN follow_count INTEGER DEFAULT 0;")?;
251-
}
252-
if conn.prepare("SELECT miss_count FROM entities LIMIT 1").is_err() {
253-
conn.execute_batch("ALTER TABLE entities ADD COLUMN miss_count INTEGER DEFAULT 0;")?;
254-
}
255-
if conn.prepare("SELECT follow_rate FROM entities LIMIT 1").is_err() {
256-
conn.execute_batch("ALTER TABLE entities ADD COLUMN follow_rate REAL DEFAULT 0.0;")?;
257-
}
258-
if conn.prepare("SELECT efficacy_status FROM entities LIMIT 1").is_err() {
259-
conn.execute_batch(
260-
"ALTER TABLE entities ADD COLUMN efficacy_status TEXT DEFAULT 'unverified';",
261-
)?;
262-
}
260+
ensure_column(conn, "entities", "follow_count", "INTEGER DEFAULT 0")?;
261+
ensure_column(conn, "entities", "miss_count", "INTEGER DEFAULT 0")?;
262+
ensure_column(conn, "entities", "follow_rate", "REAL DEFAULT 0.0")?;
263+
ensure_column(conn, "entities", "efficacy_status", "TEXT DEFAULT 'unverified'")?;
264+
263265
// Backfill transaction time for pre-existing rows: a fact's recorded_at is
264266
// when Mneme first stored it, i.e. its created_at. (No-op on a fresh DB.)
265267
conn.execute_batch(
@@ -276,16 +278,12 @@ pub fn initialize_schema(conn: &Connection) -> Result<(), Box<dyn std::error::Er
276278
)?;
277279

278280
// v5: persistent importance floor (see the column comment in the DDL).
279-
if conn.prepare("SELECT importance FROM entities LIMIT 1").is_err() {
280-
conn.execute_batch("ALTER TABLE entities ADD COLUMN importance REAL DEFAULT 0.0;")?;
281-
}
281+
ensure_column(conn, "entities", "importance", "REAL DEFAULT 0.0")?;
282282

283283
// v6: sign-bit embedding signatures for the dense-search prefilter, plus a
284284
// backfill for embeddings stored before the column existed. Bounded work:
285285
// one pass over embedded rows, ~50 bytes written per row.
286-
if conn.prepare("SELECT emb_sig FROM entities LIMIT 1").is_err() {
287-
conn.execute_batch("ALTER TABLE entities ADD COLUMN emb_sig BLOB;")?;
288-
}
286+
ensure_column(conn, "entities", "emb_sig", "BLOB")?;
289287
{
290288
let mut stmt = conn.prepare(
291289
"SELECT id, embedding FROM entities \
@@ -665,6 +663,118 @@ mod tests {
665663
assert_eq!(v, SCHEMA_VERSION);
666664
}
667665

666+
#[test]
667+
fn apply_migrations_post_lock_version_recheck_is_noop() {
668+
// #353: the loser of the migration race blocks on BEGIN IMMEDIATE,
669+
// then must see the winner's stamped user_version and skip cleanly.
670+
// Exercise that re-check path directly: on an already-current DB,
671+
// apply_migrations must return Ok without touching anything.
672+
let (conn, _path) = temp_db();
673+
initialize_schema(&conn).expect("init schema");
674+
conn.execute(
675+
"INSERT INTO entities (id, category, key, body_json, created_at_unix_ms, last_accessed_unix_ms)
676+
VALUES ('recheck-test', 'insight', 'k', '{}', 0, 0)",
677+
[],
678+
)
679+
.unwrap();
680+
681+
apply_migrations(&conn).expect("post-lock re-check must be a clean no-op");
682+
683+
let v: i64 = conn
684+
.query_row("PRAGMA user_version", [], |r| r.get(0))
685+
.unwrap();
686+
assert_eq!(v, SCHEMA_VERSION);
687+
let count: i64 = conn
688+
.query_row("SELECT COUNT(*) FROM entities", [], |r| r.get(0))
689+
.unwrap();
690+
assert_eq!(count, 1);
691+
}
692+
693+
#[test]
694+
fn ensure_column_treats_duplicate_column_as_benign() {
695+
// #353 defense-in-depth: simulate losing the probe/ALTER race — the
696+
// column appears after our probe would have run. ensure_column's
697+
// ALTER error path must swallow "duplicate column name".
698+
let (conn, _path) = temp_db();
699+
conn.execute_batch("CREATE TABLE t (id TEXT PRIMARY KEY, extra TEXT);")
700+
.unwrap();
701+
// Probe path: column exists, no ALTER attempted.
702+
ensure_column(&conn, "t", "extra", "TEXT").expect("existing column is a no-op");
703+
// Error path: bypass the probe by asserting the raw duplicate error is
704+
// matched by the same predicate ensure_column uses.
705+
let err = conn
706+
.execute_batch("ALTER TABLE t ADD COLUMN extra TEXT;")
707+
.unwrap_err();
708+
assert!(
709+
err.to_string().contains("duplicate column name"),
710+
"SQLite duplicate-column error text changed: {err}"
711+
);
712+
}
713+
714+
#[test]
715+
fn concurrent_opens_of_pre_upgrade_db_both_succeed() {
716+
// #353 end-to-end: two "processes" (independent connections — same
717+
// lock semantics as separate OS processes) open the same pre-upgrade
718+
// DB and run initialize_schema concurrently. Before the fix the loser
719+
// could fail with "duplicate column name"; now BEGIN IMMEDIATE
720+
// serializes them and the loser's post-lock re-check no-ops.
721+
let dir = std::env::temp_dir();
722+
let path = dir.join(format!("mimir-test-race-{}.db", uuid::Uuid::new_v4()));
723+
let path_str = path.to_str().unwrap().to_string();
724+
725+
// Pre-upgrade fixture: legacy tables without the ALTER-added columns,
726+
// user_version=0 (same shape as migrates_pre_versioned_db_missing_a_column).
727+
{
728+
let conn = Connection::open(&path_str).unwrap();
729+
conn.execute_batch(
730+
"CREATE TABLE entities (
731+
id TEXT PRIMARY KEY, category TEXT NOT NULL DEFAULT 'general', key TEXT NOT NULL,
732+
body_json TEXT NOT NULL DEFAULT '{}', archived INTEGER DEFAULT 0,
733+
retrieval_count INTEGER DEFAULT 0,
734+
created_at_unix_ms INTEGER NOT NULL, last_accessed_unix_ms INTEGER NOT NULL
735+
);
736+
CREATE TABLE journal (
737+
id TEXT PRIMARY KEY, entity_id TEXT DEFAULT '',
738+
created_at_unix_ms INTEGER NOT NULL
739+
);",
740+
)
741+
.unwrap();
742+
}
743+
744+
let barrier = std::sync::Arc::new(std::sync::Barrier::new(2));
745+
let handles: Vec<_> = (0..2)
746+
.map(|_| {
747+
let path = path_str.clone();
748+
let barrier = barrier.clone();
749+
std::thread::spawn(move || {
750+
let conn = Connection::open(&path).map_err(|e| e.to_string())?;
751+
// Same busy_timeout Database::open applies: the loser must
752+
// WAIT on BEGIN IMMEDIATE, not fail fast with SQLITE_BUSY.
753+
conn.execute_batch("PRAGMA busy_timeout=5000;")
754+
.map_err(|e| e.to_string())?;
755+
barrier.wait();
756+
initialize_schema(&conn).map_err(|e| e.to_string())?;
757+
Ok::<(), String>(())
758+
})
759+
})
760+
.collect();
761+
for h in handles {
762+
h.join()
763+
.expect("thread panicked")
764+
.expect("concurrent initialize_schema must succeed in both openers");
765+
}
766+
767+
// The DB came out fully migrated exactly once.
768+
let conn = Connection::open(&path_str).unwrap();
769+
let v: i64 = conn
770+
.query_row("PRAGMA user_version", [], |r| r.get(0))
771+
.unwrap();
772+
assert_eq!(v, SCHEMA_VERSION);
773+
assert!(conn.prepare("SELECT emb_sig FROM entities LIMIT 1").is_ok());
774+
drop(conn);
775+
let _ = std::fs::remove_file(&path_str);
776+
}
777+
668778
#[test]
669779
fn migrates_unique_index_to_workspace_scoped_identity() {
670780
// v4 (#339): a v3-era DB with the two-column unique index and existing

0 commit comments

Comments
 (0)