Commit 1b3820a

feat(#273): mimir-persist — persistent FastMCP EventStore over SQLite
Greenfield Python package implementing the real mcp.server.streamable_http EventStore ABC (store_event / replay_events_after), backed by a dedicated SQLite events table. It gives any FastMCP server SSE stream resumability across restarts. Payloads are stored as plaintext in a separate DB file; the store does NOT share Mimir's Rust column-level AES-256-GCM on entities.body_json.

- integrations/mimir-persist/: pyproject (pins mcp==1.28.1), store.py (MimirEventStore with lazy WAL init + cleanup_before retention), README, pytest-asyncio suite (append/replay-in-order, restart sim, cleanup).
- Wired into integrations.yml CI matrix and integrations/README.md.

MCP infrastructure (persistent EventStore backend), not a Mimir Python library. No Rust changes. Closes #273.
1 parent 2fd2f47 commit 1b3820a

7 files changed

Lines changed: 538 additions & 0 deletions

.github/workflows/integrations.yml

Lines changed: 4 additions & 0 deletions
@@ -28,6 +28,10 @@ jobs:
           # so the real package must be present to collect the tests.
           - adapter: langgraph
             extra_deps: "langgraph"
+          # mimir-persist imports mcp.server.streamable_http at module load and
+          # pins the SDK version (the EventStore ABC signature is version-specific).
+          - adapter: mimir-persist
+            extra_deps: "mcp==1.28.1"
     steps:
       - uses: actions/checkout@v4
 

integrations/README.md

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@ Ready-to-use adapters that connect Mimir to popular AI agent frameworks.
 | **LangGraph** (LangChain) | `BaseStore` implementation | [`langgraph/`](langgraph/) |
 | **CrewAI** | Agent Tool | [`crewai/`](crewai/) |
 | **AutoGen** (AG2 / autogen-core) | `Memory` implementation | [`autogen/`](autogen/) |
+| **FastMCP EventStore** (MCP SDK) | `EventStore` implementation | [`mimir-persist/`](mimir-persist/) |
 | **Claude Code** (Anthropic) | MCP server config | [`../docs/integration/claude-code.md`](../docs/integration/claude-code.md) |
 | **Cursor** | MCP server config | [`../docs/integration/cursor.md`](../docs/integration/cursor.md) |
 

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# mimir-persist

**Persistent FastMCP `EventStore` over SQLite.** Drop-in SSE stream
resumability for any [FastMCP](https://github.com/modelcontextprotocol/python-sdk)
/ Streamable-HTTP server, so clients can reconnect after a server restart and
replay the events they missed.

This is **MCP infrastructure**: a persistent `EventStore` backend that lives
*below* the tool layer. It is **not** a "Mimir Python library" and does not
expose Mimir's memory tools. It implements the MCP Python SDK's real
`EventStore` ABC and stores events in its own dedicated SQLite file.

## Install

```bash
pip install -e integrations/mimir-persist/
```

This pulls in the pinned `mcp==1.28.1` SDK (the `EventStore` ABC signature is
SDK-version specific).

## Quick Start

```python
from mimir_persist import MimirEventStore
from mcp.server.fastmcp import FastMCP

# Persist SSE events to a dedicated SQLite file.
store = MimirEventStore(db_path="~/.mimir/data/mcp_events.db")

mcp = FastMCP("my-server", event_store=store)
```

Now when a client reconnects with a `Last-Event-ID`, the transport replays the
events that occurred after it, even if the server process was restarted in
between.

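The restart behaviour can be sketched with the standard library alone. This is not the package's code: `open_db`, `append`, and `simulate_restart` are hypothetical helpers that mirror the documented schema, and closing and reopening the connection stands in for a server restart.

```python
import os
import sqlite3
import tempfile
import uuid

# Same columns as the documented events table.
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    event_id   TEXT PRIMARY KEY,
    stream_id  TEXT NOT NULL,
    seq        INTEGER NOT NULL,
    payload    TEXT,
    created_at REAL NOT NULL
)
"""


def open_db(path: str) -> sqlite3.Connection:
    """Open the events file, creating the schema if needed (hypothetical helper)."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    conn.commit()
    return conn


def append(conn: sqlite3.Connection, stream_id: str, payload: str) -> str:
    """Insert one event with the next global seq and return its opaque id."""
    event_id = uuid.uuid4().hex
    seq = conn.execute("SELECT COALESCE(MAX(seq), 0) FROM events").fetchone()[0] + 1
    conn.execute(
        "INSERT INTO events VALUES (?, ?, ?, ?, 0.0)",
        (event_id, stream_id, seq, payload),
    )
    conn.commit()
    return event_id


def simulate_restart() -> list[str]:
    """Write events, close the connection, reopen, and read what the client missed."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "mcp_events.db")

        # "First process": the client has only seen the first event.
        conn = open_db(path)
        last_seen = append(conn, "stream-a", '{"n": 1}')
        append(conn, "stream-a", '{"n": 2}')
        append(conn, "stream-a", '{"n": 3}')
        conn.close()  # stand-in for the server process exiting

        # "Second process": a fresh connection to the same file.
        conn = open_db(path)
        stream_id, seq = conn.execute(
            "SELECT stream_id, seq FROM events WHERE event_id = ?", (last_seen,)
        ).fetchone()
        missed = [
            row[0]
            for row in conn.execute(
                "SELECT payload FROM events WHERE stream_id = ? AND seq > ? ORDER BY seq",
                (stream_id, seq),
            )
        ]
        conn.close()
        return missed


missed_payloads = simulate_restart()
```

The events written before the close are still available to the second connection, which is the property that lets a reconnecting client pick up from its `Last-Event-ID`.
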
## Interface

`MimirEventStore` implements the real
`mcp.server.streamable_http.EventStore` ABC:

| Method | Purpose |
|---|---|
| `store_event(stream_id, message) -> event_id` | Persist one JSON-RPC event (or a `None` priming event), return its id |
| `replay_events_after(last_event_id, send_callback) -> stream_id \| None` | Replay everything after `last_event_id` on its stream, in order |

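To make the replay contract concrete, here is a minimal sketch using only `sqlite3` and `asyncio`. It is an illustration under assumptions, not the package's implementation: `replay_after` and the two-argument `send` callback are hypothetical simplifications (the real callback receives an `EventMessage`), and the table omits `created_at`.

```python
import asyncio
import sqlite3
from typing import Awaitable, Callable, Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_id TEXT NOT NULL, "
    "seq INTEGER NOT NULL, payload TEXT)"
)
# Two interleaved streams sharing one global seq counter.
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        ("a1", "stream-a", 1, '{"n": 1}'),
        ("b1", "stream-b", 2, '{"n": 10}'),
        ("a2", "stream-a", 3, None),  # NULL payload stands for a priming event
        ("b2", "stream-b", 4, '{"n": 11}'),
        ("a3", "stream-a", 5, '{"n": 2}'),
    ],
)
conn.commit()

SendFn = Callable[[str, Optional[str]], Awaitable[None]]


async def replay_after(last_event_id: str, send: SendFn) -> Optional[str]:
    """Resolve the anchor event, then send later events of the same stream in seq order."""
    anchor = conn.execute(
        "SELECT stream_id, seq FROM events WHERE event_id = ?", (last_event_id,)
    ).fetchone()
    if anchor is None:
        return None  # unknown id: nothing to replay
    stream_id, seq = anchor
    rows = conn.execute(
        "SELECT event_id, payload FROM events "
        "WHERE stream_id = ? AND seq > ? ORDER BY seq ASC",
        (stream_id, seq),
    ).fetchall()
    for event_id, payload in rows:
        await send(event_id, payload)
    return stream_id


async def demo():
    seen = []

    async def send(event_id: str, payload: Optional[str]) -> None:
        seen.append((event_id, payload))

    resolved = await replay_after("a1", send)
    unknown = await replay_after("missing", send)
    return resolved, unknown, seen


resolved, unknown, seen = asyncio.run(demo())
```

Only `stream-a` events after `a1` reach the callback, even though `stream-b` events sit between them in the global `seq` order, and an unknown id yields `None` without any callback.
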
Plus a retention helper that is **not** part of the ABC:

| Method | Purpose |
|---|---|
| `cleanup_before(cutoff=None, *, max_age_seconds=None) -> int` | Delete events whose `created_at` is older than the retention boundary, return rows removed. `max_age_seconds` wins if both are given; with neither, it is a no-op returning 0 |

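The retention rules can be sketched as a plain function over `sqlite3`. This is a hedged illustration of the documented semantics, not the method itself: the synchronous signature and the `now` parameter are assumptions added here so the example is deterministic.

```python
import sqlite3
import time
from typing import Optional


def cleanup_before(
    conn: sqlite3.Connection,
    cutoff: Optional[float] = None,
    *,
    max_age_seconds: Optional[float] = None,
    now: Optional[float] = None,  # hypothetical: injectable clock for the example
) -> int:
    """Delete events with created_at < cutoff and return the number of rows removed."""
    if max_age_seconds is not None:
        current = time.time() if now is None else now
        cutoff = current - max_age_seconds  # relative age overrides an absolute cutoff
    if cutoff is None:
        return 0  # neither argument given: no-op
    cur = conn.execute("DELETE FROM events WHERE created_at < ?", (cutoff,))
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, created_at REAL NOT NULL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("old", 100.0), ("mid", 200.0), ("new", 300.0)],
)
conn.commit()

noop = cleanup_before(conn)
removed_by_cutoff = cleanup_before(conn, cutoff=150.0)
# Both given: max_age_seconds wins, so the effective cutoff is 300 - 50 = 250.
removed_by_age = cleanup_before(conn, cutoff=0.0, max_age_seconds=50.0, now=300.0)
remaining = [row[0] for row in conn.execute("SELECT event_id FROM events")]
```

Note that once an event is deleted, a client presenting its id as `Last-Event-ID` can no longer be resumed, so the retention window should comfortably exceed the longest expected disconnect.
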
> The MCP SDK's method names evolve across versions. The names above
> (`store_event` / `replay_events_after`) are what `mcp==1.28.1` actually
> exposes; older proposals used `append_event` / `get_events_after`. This
> package is implemented against the installed ABC, not against documentation.

## Storage model
60+
61+
Events are stored in a **dedicated `events` table** in their **own** SQLite
62+
file — never in Mimir's Rust `entities` table.
63+
64+
```sql
65+
CREATE TABLE events (
66+
event_id TEXT PRIMARY KEY, -- opaque UUID returned to the client
67+
stream_id TEXT NOT NULL, -- the SSE stream the event belongs to
68+
seq INTEGER NOT NULL, -- global monotonic replay cursor
69+
payload TEXT, -- JSON of the JSONRPCMessage, NULL = priming
70+
created_at REAL NOT NULL -- unix seconds, used by cleanup_before
71+
);
72+
CREATE INDEX idx_events_stream_seq ON events(stream_id, seq);
73+
```
74+
75+
The connection is opened lazily in **WAL mode** (one writer, concurrent
76+
readers) and writes are serialised behind an `asyncio.Lock` so the monotonic
77+
`seq` counter is race-free within a process.
78+
79+
## ⚠️ Encryption: plaintext payloads
80+
81+
**Event payloads in this database are PLAINTEXT.** This store does **NOT**
82+
share Mimir's AES-256-GCM encryption.
83+
84+
Mimir's AES-256-GCM is implemented in the **Rust core** and applies only at the
85+
**column level** to `entities.body_json`. It does **not** extend to this
86+
separate, Python-side events database. If your JSON-RPC traffic is sensitive,
87+
encrypt the DB file at the OS/volume level. Optional Python-side payload
88+
encryption is a possible future enhancement, intentionally out of scope here.
89+
90+
## Requirements
91+
92+
- Python 3.10+
93+
- `mcp==1.28.1`
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
"""mimir-persist: a persistent FastMCP ``EventStore`` backed by SQLite.

MCP infrastructure: gives any FastMCP server SSE stream resumability across
restarts. See :class:`mimir_persist.store.MimirEventStore`.
"""

from mimir_persist.store import MimirEventStore

__all__ = ["MimirEventStore"]
__version__ = "0.1.0"
Lines changed: 238 additions & 0 deletions
@@ -0,0 +1,238 @@
"""
MimirEventStore: a persistent FastMCP ``EventStore`` backed by SQLite.

This is **MCP infrastructure**, not a "Mimir Python library". It gives any
FastMCP / Streamable-HTTP server SSE *stream resumability* across restarts by
durably persisting the JSON-RPC events that flow over a session's SSE streams.
When a client reconnects with a ``Last-Event-ID``, the transport asks this store
to replay everything that came after it.

It implements the real ``mcp.server.streamable_http.EventStore`` ABC:

* ``store_event(stream_id, message) -> event_id``
* ``replay_events_after(last_event_id, send_callback) -> stream_id | None``

plus a ``cleanup_before(...)`` retention helper (not part of the ABC).

Usage::

    from mimir_persist import MimirEventStore
    from mcp.server.fastmcp import FastMCP

    mcp = FastMCP("my-server", event_store=MimirEventStore())

Storage model
-------------
Events live in a **dedicated** ``events`` table in their **own** SQLite file
(default ``~/.mimir/data/mcp_events.db``). This is deliberately separate from
Mimir's Rust ``entities`` table; the two never share a table or a file.

    CREATE TABLE events (
        event_id   TEXT PRIMARY KEY,  -- opaque UUID handed back to the client
        stream_id  TEXT NOT NULL,     -- the SSE stream the event belongs to
        seq        INTEGER NOT NULL,  -- global monotonic order (replay cursor)
        payload    TEXT,              -- JSON of the JSONRPCMessage, or NULL (priming)
        created_at REAL NOT NULL      -- unix seconds, used by retention cleanup
    );
    CREATE INDEX idx_events_stream_seq ON events(stream_id, seq);

.. warning::

    **Payloads are stored as PLAINTEXT.** This store does **NOT** share Mimir's
    AES-256-GCM encryption. That encryption is implemented in Mimir's Rust core
    and only ever applies at the *column* level to ``entities.body_json``; it
    does not extend to this separate events database. If your JSON-RPC traffic
    carries sensitive data, encrypt the database file at the OS/volume level, or
    wrap this store (Python-side encryption of ``payload`` is an optional
    stretch, intentionally out of scope here).

Concurrency
-----------
The store opens one SQLite connection in WAL mode (one writer, many readers) and
serialises writes behind an ``asyncio.Lock`` so the monotonic ``seq`` counter is
assigned without races inside a single server process.
"""

from __future__ import annotations

import asyncio
import sqlite3
import time
import uuid
from pathlib import Path
from typing import Optional

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage

__all__ = ["MimirEventStore"]


class MimirEventStore(EventStore):
    """Durable SQLite-backed :class:`EventStore` for FastMCP SSE resumability.

    Args:
        db_path: Path to the dedicated events SQLite file. Created (with parent
            dirs) on first use. Defaults to ``~/.mimir/data/mcp_events.db``.
            This file is **separate** from Mimir's entity database.
    """

    def __init__(self, db_path: str = "~/.mimir/data/mcp_events.db") -> None:
        self.db_path = str(Path(db_path).expanduser())
        self._conn: Optional[sqlite3.Connection] = None
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------ #
    # Lazy connection / schema
    # ------------------------------------------------------------------ #
    def _connect(self) -> sqlite3.Connection:
        """Open (once) the SQLite connection, enabling WAL and the schema.

        Initialisation is lazy so constructing the store never touches disk;
        the file and schema appear on the first ``store_event`` / replay call.
        """
        if self._conn is not None:
            return self._conn

        if self.db_path != ":memory:":
            Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        conn = sqlite3.connect(self.db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        # WAL: one writer + concurrent readers; committed events stay in the
        # file across process restarts. An in-memory database cannot use WAL
        # (SQLite ignores the pragma), so only set it for real files.
        if self.db_path != ":memory:":
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS events (
                event_id   TEXT PRIMARY KEY,
                stream_id  TEXT NOT NULL,
                seq        INTEGER NOT NULL,
                payload    TEXT,
                created_at REAL NOT NULL
            )
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_events_stream_seq "
            "ON events(stream_id, seq)"
        )
        conn.commit()
        self._conn = conn
        return conn

    def _next_seq(self, conn: sqlite3.Connection) -> int:
        """Return the next global monotonic sequence value.

        Called under ``self._lock`` so concurrent ``store_event`` coroutines in
        the same process can't hand out a duplicate ``seq``.
        """
        row = conn.execute("SELECT COALESCE(MAX(seq), 0) AS m FROM events").fetchone()
        return int(row["m"]) + 1

    # ------------------------------------------------------------------ #
    # EventStore ABC
    # ------------------------------------------------------------------ #
    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage | None
    ) -> EventId:
        """Persist one event for ``stream_id`` and return its new event id.

        ``message`` may be ``None`` for transport "priming" events; that is
        stored as a NULL payload and replayed back as ``None``.
        """
        event_id = uuid.uuid4().hex
        payload = message.model_dump_json() if message is not None else None
        created_at = time.time()

        async with self._lock:
            conn = self._connect()
            seq = self._next_seq(conn)
            conn.execute(
                "INSERT INTO events (event_id, stream_id, seq, payload, created_at) "
                "VALUES (?, ?, ?, ?, ?)",
                (event_id, stream_id, seq, payload, created_at),
            )
            conn.commit()
        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replay every event that followed ``last_event_id`` on its stream.

        Resolves ``last_event_id`` to its ``(stream_id, seq)``, then streams all
        later events of that same stream, in ``seq`` order, through
        ``send_callback``. Returns the resolved stream id, or ``None`` when the
        event id is unknown (e.g. already cleaned up, or never existed).
        """
        async with self._lock:
            conn = self._connect()
            anchor = conn.execute(
                "SELECT stream_id, seq FROM events WHERE event_id = ?",
                (last_event_id,),
            ).fetchone()
            if anchor is None:
                return None

            stream_id = anchor["stream_id"]
            rows = conn.execute(
                "SELECT event_id, payload FROM events "
                "WHERE stream_id = ? AND seq > ? ORDER BY seq ASC",
                (stream_id, anchor["seq"]),
            ).fetchall()

        # Fire callbacks outside the lock: send_callback is user/transport code.
        for row in rows:
            payload = row["payload"]
            message = (
                JSONRPCMessage.model_validate_json(payload)
                if payload is not None
                else None
            )
            await send_callback(EventMessage(message=message, event_id=row["event_id"]))

        return stream_id

    # ------------------------------------------------------------------ #
    # Retention (not part of the ABC)
    # ------------------------------------------------------------------ #
    async def cleanup_before(
        self,
        cutoff: float | None = None,
        *,
        max_age_seconds: float | None = None,
    ) -> int:
        """Delete events older than a retention boundary; return rows removed.

        Provide either an absolute ``cutoff`` (unix seconds: delete events with
        ``created_at < cutoff``) or ``max_age_seconds`` (delete events older than
        that, relative to now). ``max_age_seconds`` wins if both are given.
        Calling with neither is a no-op that returns 0.
        """
        if max_age_seconds is not None:
            cutoff = time.time() - max_age_seconds
        if cutoff is None:
            return 0

        async with self._lock:
            conn = self._connect()
            cur = conn.execute("DELETE FROM events WHERE created_at < ?", (cutoff,))
            conn.commit()
            return cur.rowcount

    def close(self) -> None:
        """Close the underlying SQLite connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None
