Skip to content

Commit a3ddb46

Browse files
tcconnally, tcconnally, claude
authored
feat: coldness-driven consolidation ("local dreaming") wired into autocohere (#350)
mimir_consolidate was similarity-driven over the RECENT window, never retired its sources, and only ran when an agent thought to call it — so it compressed the memories least in need of compression, doubled storage for everything it touched, and wasn't actually background.

- cold_first param: scan longest-idle entities first (last_accessed ASC, id tie-break) — compress what decay is about to claim, before it is lost one entity at a time.
- archive_sources param: retire merged sources once the observation exists; archive_reason names the observation (traceable, reversible) and their FTS rows are cleaned. Verified or importance-floored sources are NEVER archived — the same exemption promise decay makes.
- autocohere step 4: bounded cold_first+archive_sources pass per category (5 observations max each), skipping 'observation' (no meta-observations / runaway recursion) and 'memories' (adapter files must never be similarity-merged). Reports observations_created + consolidate_sources_archived.
- Defaults unchanged: a plain mimir_consolidate call behaves exactly as before.

docs/retention.md gains a consolidation section.

Tests: consolidate_archive_sources_retires_merged_but_exempts_verified_and_scored, consolidate_cold_first_targets_longest_idle_window. Suite: 182 passed / 0 failed.

Co-authored-by: tcconnally <hermes@perseus.observer>
Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
1 parent fdd5ae6 commit a3ddb46

5 files changed

Lines changed: 283 additions & 5 deletions

File tree

docs/retention.md

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -104,6 +104,26 @@ Deletion is explicit and two-step:
104104
- **`purge`** — permanently delete entities that are **already archived**.
105105
Supports `dry_run`. This is the only way memory leaves the database.
106106

107+
## Consolidation ("local dreaming")
108+
109+
Decay forgets one memory at a time; consolidation compresses instead of
110+
losing. `mimir_consolidate` merges overlapping same-category entities into a
111+
single evidence-tracked *observation* (category `observation`, linked to each
112+
source via `evidence_for`, carrying a `proof_count`). Two opt-in flags shape
113+
it into background forgetting:
114+
115+
- `cold_first: true` scans the longest-idle entities first — the ones decay
116+
is about to claim — so fading knowledge is compressed before it is lost.
117+
- `archive_sources: true` retires the merged sources once the observation
118+
exists (`archive_reason` names the observation, so the merge is traceable
119+
and reversible). **Verified or importance-floored sources are never
120+
archived** — the same exemption promise decay makes.
121+
122+
`mimir_autocohere` runs a bounded pass automatically (a few observations per
123+
category per run, cold-first, archiving sources), skipping the `observation`
124+
category (no meta-observations) and `memories` (files from the /memories
125+
adapter are never similarity-merged).
126+
107127
## Semantic recall and reinforcement
108128

109129
By default, retrieval reinforcement fires only on the keyword (`fts5`) recall

src/db.rs

Lines changed: 199 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3869,9 +3869,16 @@ impl Database {
38693869
params: &crate::models::ConsolidateParams,
38703870
) -> Result<crate::models::ConsolidateReport, Box<dyn std::error::Error>> {
38713871
let conn = self.conn()?;
3872+
// cold_first scans the entities decay is about to claim (ASC = coldest
3873+
// first) — "local dreaming" compresses fading memories into durable
3874+
// observations instead of losing them one by one. Default (DESC)
3875+
// preserves the original recent-window behavior.
3876+
let order = if params.cold_first { "ASC" } else { "DESC" };
38723877
let mut stmt = conn.prepare(&format!(
3873-
"SELECT id, key, body_json, certainty FROM entities WHERE category = ?1 AND archived = 0
3874-
ORDER BY last_accessed_unix_ms DESC LIMIT {} OFFSET ?2",
3878+
"SELECT id, key, body_json, certainty, verified, importance
3879+
FROM entities WHERE category = ?1 AND archived = 0
3880+
ORDER BY last_accessed_unix_ms {}, id ASC LIMIT {} OFFSET ?2",
3881+
order,
38753882
Self::CONFLICT_SCAN_WINDOW
38763883
))?;
38773884
let rows = stmt.query_map(params![params.category, params.offset], |r| {
@@ -3880,9 +3887,12 @@ impl Database {
38803887
r.get::<_, String>(1)?,
38813888
r.get::<_, String>(2)?,
38823889
r.get::<_, f64>(3).unwrap_or(0.5),
3890+
r.get::<_, bool>(4).unwrap_or(false),
3891+
r.get::<_, Option<f64>>(5).unwrap_or(None).unwrap_or(0.0),
38833892
))
38843893
})?;
3885-
let entities: Vec<(String, String, String, f64)> = rows.filter_map(|r| r.ok()).collect();
3894+
let entities: Vec<(String, String, String, f64, bool, f64)> =
3895+
rows.filter_map(|r| r.ok()).collect();
38863896
drop(stmt);
38873897

38883898
// Union-find over entity indices, joining any pair whose trigram
@@ -3933,6 +3943,7 @@ impl Database {
39333943

39343944
let mut observations = Vec::new();
39353945
let mut source_entities_merged: i64 = 0;
3946+
let mut sources_archived: i64 = 0;
39363947
let now = now_ms();
39373948

39383949
// Deterministic order: sort clusters by their lowest member index so
@@ -3945,7 +3956,7 @@ impl Database {
39453956
continue;
39463957
}
39473958

3948-
let members: Vec<&(String, String, String, f64)> =
3959+
let members: Vec<&(String, String, String, f64, bool, f64)> =
39493960
cluster.iter().map(|&i| &entities[i]).collect();
39503961
// The highest-certainty member's body becomes the summary (most
39513962
// reliable source), ties broken by entity id for determinism.
@@ -4022,6 +4033,40 @@ impl Database {
40224033
last_accessed_unix_ms: now,
40234034
};
40244035
self.remember(&entity)?;
4036+
4037+
// Local dreaming: retire the merged sources now that their
4038+
// content lives in the observation (which links back to each
4039+
// via evidence_for, and the archive_reason names the
4040+
// observation — traceable and reversible). Verified or
4041+
// importance-floored sources keep the decay exemption promise
4042+
// and stay live alongside the observation.
4043+
if params.archive_sources {
4044+
let tx = conn.unchecked_transaction()?;
4045+
for m in &members {
4046+
let (id, _, _, _, verified, importance) =
4047+
(&m.0, &m.1, &m.2, m.3, m.4, m.5);
4048+
if verified || importance > 0.0 {
4049+
continue;
4050+
}
4051+
let affected = tx.execute(
4052+
"UPDATE entities SET archived = 1, archive_reason = ?1,
4053+
last_accessed_unix_ms = ?2 WHERE id = ?3 AND archived = 0",
4054+
params![
4055+
format!("consolidated into {}", entity_id),
4056+
now,
4057+
id
4058+
],
4059+
)?;
4060+
if affected > 0 {
4061+
sources_archived += 1;
4062+
let _ = tx.execute(
4063+
"DELETE FROM entities_fts WHERE rowid = (SELECT rowid FROM entities WHERE id = ?1)",
4064+
params![id],
4065+
);
4066+
}
4067+
}
4068+
tx.commit()?;
4069+
}
40254070
}
40264071

40274072
source_entities_merged += proof_count;
@@ -4033,6 +4078,7 @@ impl Database {
40334078
entities_examined: n as i64,
40344079
observations_created: observations.len() as i64,
40354080
source_entities_merged,
4081+
sources_archived,
40364082
dry_run: params.dry_run,
40374083
observations,
40384084
})
@@ -8491,6 +8537,8 @@ mod tests {
84918537
limit: 50,
84928538
offset: 0,
84938539
dry_run: false,
8540+
cold_first: false,
8541+
archive_sources: false,
84948542
};
84958543
let report = db.consolidate(&params).unwrap();
84968544

@@ -8576,6 +8624,8 @@ mod tests {
85768624
limit: 50,
85778625
offset: 0,
85788626
dry_run: true,
8627+
cold_first: false,
8628+
archive_sources: false,
85798629
};
85808630
let report = db.consolidate(&params).unwrap();
85818631
assert_eq!(report.observations_created, 1);
@@ -8611,6 +8661,8 @@ mod tests {
86118661
limit: 50,
86128662
offset: 0,
86138663
dry_run: false,
8664+
cold_first: false,
8665+
archive_sources: false,
86148666
};
86158667
let report = db.consolidate(&params).unwrap();
86168668
assert_eq!(
@@ -8621,6 +8673,149 @@ mod tests {
86218673
let _ = fs::remove_file(&path);
86228674
}
86238675

8676+
#[test]
8677+
fn consolidate_archive_sources_retires_merged_but_exempts_verified_and_scored() {
8678+
// Local dreaming: with archive_sources set, the plain merged sources are
8679+
// archived (archive_reason names the observation), while verified or
8680+
// importance-floored sources keep the decay exemption and stay live.
8681+
let (db, path) = temp_db();
8682+
let ins = |id: &str, key: &str, verified: i64, importance: f64| {
8683+
db.conn()
8684+
.unwrap()
8685+
.execute(
8686+
"INSERT INTO entities (id, category, key, body_json, status, type, tags, \
8687+
decay_score, retrieval_count, layer, topic_path, archived, archive_reason, \
8688+
links, verified, source, certainty, importance, created_at_unix_ms, last_accessed_unix_ms) \
8689+
VALUES (?1, 'lore', ?2, \
8690+
'{\"note\":\"the gateway service handles authentication and rate limiting\"}', \
8691+
'active', 'insight', '[]', 1.0, 0, 'working', '', 0, '', '[]', ?3, 'agent', 0.5, ?4, 0, 0)",
8692+
params![id, key, verified, importance],
8693+
)
8694+
.unwrap();
8695+
db.conn()
8696+
.unwrap()
8697+
.execute(
8698+
"INSERT INTO entities_fts (rowid, body_json) \
8699+
VALUES ((SELECT rowid FROM entities WHERE id = ?1), \
8700+
'{\"note\":\"the gateway service handles authentication and rate limiting\"}')",
8701+
params![id],
8702+
)
8703+
.unwrap();
8704+
};
8705+
// Identical bodies → one cluster of four: two plain, one verified, one with importance 0.8.
8706+
ins("cs-plain-a", "gw-a", 0, 0.0);
8707+
ins("cs-plain-b", "gw-b", 0, 0.0);
8708+
ins("cs-verified", "gw-c", 1, 0.0);
8709+
ins("cs-scored", "gw-d", 0, 0.8);
8710+
8711+
let report = db
8712+
.consolidate(&crate::models::ConsolidateParams {
8713+
category: "lore".to_string(),
8714+
similarity_threshold: 0.6,
8715+
limit: 50,
8716+
offset: 0,
8717+
dry_run: false,
8718+
cold_first: true,
8719+
archive_sources: true,
8720+
})
8721+
.unwrap();
8722+
assert_eq!(report.observations_created, 1);
8723+
assert_eq!(report.source_entities_merged, 4);
8724+
assert_eq!(
8725+
report.sources_archived, 2,
8726+
"only the two plain sources may be archived"
8727+
);
8728+
8729+
let archived_reason: String = db
8730+
.conn()
8731+
.unwrap()
8732+
.query_row(
8733+
"SELECT archive_reason FROM entities WHERE id = 'cs-plain-a'",
8734+
[],
8735+
|r| r.get(0),
8736+
)
8737+
.unwrap();
8738+
assert!(
8739+
archived_reason.starts_with("consolidated into obs-"),
8740+
"archive reason must name the observation, got: {archived_reason}"
8741+
);
8742+
let live: i64 = db
8743+
.conn()
8744+
.unwrap()
8745+
.query_row(
8746+
"SELECT COUNT(*) FROM entities WHERE id IN ('cs-verified','cs-scored') AND archived = 0",
8747+
[],
8748+
|r| r.get(0),
8749+
)
8750+
.unwrap();
8751+
assert_eq!(live, 2, "verified and importance-floored sources must stay live");
8752+
8753+
// Archived sources drop out of FTS, so a keyword recall must not return them.
8754+
let hits = db
8755+
.recall(&crate::models::RecallParams {
8756+
query: "authentication".to_string(),
8757+
skip_side_effects: true,
8758+
..crate::models::RecallParams::default()
8759+
})
8760+
.unwrap();
8761+
assert!(
8762+
hits.iter().all(|e| e.id != "cs-plain-a" && e.id != "cs-plain-b"),
8763+
"archived sources must not be recallable"
8764+
);
8765+
8766+
let _ = fs::remove_file(&path);
8767+
}
8768+
8769+
#[test]
8770+
fn consolidate_cold_first_targets_longest_idle_window() {
8771+
// cold_first orders the scan by last access ascending, so the two cold
8772+
// rows are seen before the two hot ones. With limit = 1 only a single
8773+
// observation may be reported, and it must come from the cold pair —
8774+
// that is the behavioral contract the autocohere step relies on. Only
8775+
// four rows are inserted, so the scan window (presumably far larger —
8776+
// confirm CONFLICT_SCAN_WINDOW) never clips; dry_run inspects the report only.
8777+
let (db, path) = temp_db();
8778+
let ins = |id: &str, key: &str, body: &str, last_access: i64| {
8779+
db.conn()
8780+
.unwrap()
8781+
.execute(
8782+
"INSERT INTO entities (id, category, key, body_json, status, type, tags, \
8783+
decay_score, retrieval_count, layer, topic_path, archived, archive_reason, \
8784+
links, verified, source, certainty, created_at_unix_ms, last_accessed_unix_ms) \
8785+
VALUES (?1, 'coldcat', ?2, ?3, 'active', 'insight', '[]', 1.0, 0, \
8786+
'working', '', 0, '', '[]', 0, 'agent', 0.5, 0, ?4)",
8787+
params![id, key, body, last_access],
8788+
)
8789+
.unwrap();
8790+
};
8791+
// Two cold near-duplicates and two hot near-duplicates (different topic).
8792+
ins("cold-a", "ka", r#"{"note":"legacy billing cron runs at midnight utc"}"#, 1000);
8793+
ins("cold-b", "kb", r#"{"note":"legacy billing cron runs at midnight utc daily"}"#, 2000);
8794+
ins("hot-a", "kc", r#"{"note":"new search cluster deployed in frankfurt region"}"#, 9_000_000);
8795+
ins("hot-b", "kd", r#"{"note":"new search cluster deployed in frankfurt region today"}"#, 9_100_000);
8796+
8797+
let report = db
8798+
.consolidate(&crate::models::ConsolidateParams {
8799+
category: "coldcat".to_string(),
8800+
similarity_threshold: 0.6,
8801+
limit: 1, // only ONE observation allowed this run
8802+
offset: 0,
8803+
dry_run: true,
8804+
cold_first: true,
8805+
archive_sources: false,
8806+
})
8807+
.unwrap();
8808+
assert_eq!(report.observations_created, 1);
8809+
let obs = &report.observations[0];
8810+
assert!(
8811+
obs.source_ids.contains(&"cold-a".to_string()),
8812+
"cold_first with limit 1 must consolidate the COLD cluster first, got {:?}",
8813+
obs.source_ids
8814+
);
8815+
8816+
let _ = fs::remove_file(&path);
8817+
}
8818+
86248819
#[test]
86258820
fn graph_expand_returns_empty_for_no_seeds_or_no_links() {
86268821
let (db, path) = temp_db();

src/mcp.rs

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2002,7 +2002,7 @@ fn list_tools(id: Option<Value>) -> JsonRpcResponse {
20022002
},
20032003
{
20042004
"name": "mimir_consolidate",
2005-
"description": "Merge overlapping/duplicative entities in the same category into durable, evidence-tracked 'observations' — the mirror image of mimir_conflicts, which flags dissimilar (contradictory) pairs. Groups entities whose pairwise trigram similarity meets similarity_threshold, then creates one new entity per group (category='observation') whose body carries a summary (the highest-certainty source's content), the full list of source entity ids as evidence, and a proof_count. Source entities are NOT deleted or archived — they remain independently accessible, and the new observation links back to each of them (relationship='evidence_for') for full audit. Read-only preview with dry_run=true.",
2005+
"description": "Merge overlapping/duplicative entities in the same category into durable, evidence-tracked 'observations' — the mirror image of mimir_conflicts, which flags dissimilar (contradictory) pairs. Groups entities whose pairwise trigram similarity meets similarity_threshold, then creates one new entity per group (category='observation') whose body carries a summary (the highest-certainty source's content), the full list of source entity ids as evidence, and a proof_count. The observation links back to each source (relationship='evidence_for') for full audit. By default sources stay live; set archive_sources=true to retire merged sources ('local dreaming' — verified or importance-floored sources are never archived), and cold_first=true to target the memories decay is about to claim. mimir_autocohere runs a bounded cold_first+archive_sources pass automatically. Read-only preview with dry_run=true.",
20062006
"inputSchema": {
20072007
"type": "object",
20082008
"properties": {
@@ -2029,6 +2029,16 @@ fn list_tools(id: Option<Value>) -> JsonRpcResponse {
20292029
"type": "boolean",
20302030
"default": false,
20312031
"description": "Preview which observations would be created without writing anything"
2032+
},
2033+
"cold_first": {
2034+
"type": "boolean",
2035+
"default": false,
2036+
"description": "Scan the COLDEST entities first (longest since last access) instead of the most recent — compress memories that are fading anyway, before decay archives them individually"
2037+
},
2038+
"archive_sources": {
2039+
"type": "boolean",
2040+
"default": false,
2041+
"description": "Archive merged source entities after the observation is created (archive_reason names the observation; reversible). Verified or importance-floored sources are never archived."
20322042
}
20332043
},
20342044
"required": [
@@ -2053,6 +2063,10 @@ fn list_tools(id: Option<Value>) -> JsonRpcResponse {
20532063
"type": "integer",
20542064
"description": "Total count of source entities folded into the created observations"
20552065
},
2066+
"sources_archived": {
2067+
"type": "integer",
2068+
"description": "Sources archived because archive_sources was set (verified/importance-floored sources are exempt)"
2069+
},
20562070
"dry_run": {
20572071
"type": "boolean"
20582072
},

src/models.rs

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -673,6 +673,18 @@ pub struct ConsolidateParams {
673673
pub offset: i64,
674674
#[serde(default)]
675675
pub dry_run: bool,
676+
/// When true, scan the COLDEST entities first (last_accessed ASC) instead
677+
/// of the most recent — "local dreaming": compress memories that are
678+
/// fading anyway, before decay archives them individually. Default false
679+
/// preserves the original recent-window behavior.
680+
#[serde(default)]
681+
pub cold_first: bool,
682+
/// When true, archive the merged source entities after the observation is
683+
/// created (archive_reason names the observation, so the merge is
684+
/// traceable and reversible). Verified or importance-floored sources are
685+
/// never archived — same exemption policy as decay. Default false.
686+
#[serde(default)]
687+
pub archive_sources: bool,
676688
}
677689

678690
fn default_consolidate_threshold() -> f64 {
@@ -706,6 +718,9 @@ pub struct ConsolidateReport {
706718
pub entities_examined: i64,
707719
pub observations_created: i64,
708720
pub source_entities_merged: i64,
721+
/// Sources archived because archive_sources was set. Always <=
722+
/// source_entities_merged: verified/importance-floored sources stay live.
723+
pub sources_archived: i64,
709724
pub dry_run: bool,
710725
pub observations: Vec<Observation>,
711726
}

0 commit comments

Comments
 (0)