11#pragma once
22
3+ #include < algorithm>
34#include < cstddef>
5+ #include < cstring>
6+ #include < endian.h>
47#include < functional>
58#include < unordered_map>
69#include < vector>
710
811#include " pomai/graph.h"
12+ #include " pomai/slice.h"
913#include " pomai/status.h"
1014#include " storage/wal/wal.h"
1115#include " core/graph/graph_key.h"
@@ -14,35 +18,75 @@ namespace pomai::core {
1418
1519/* *
1620 * @brief Internal implementation of GraphMembrane.
21+ *
22+ * Persists vertex and edge writes to a WAL using kRawKV records.
23+ * On restart, WarmUp() replays the WAL to rebuild adj_lists_.
24+ * Deletions use tombstone key prefixes ('T' for vertex, 'X' for edge)
25+ * that are also replayed during WarmUp().
1726 */
1827class GraphMembraneImpl : public pomai ::GraphMembrane {
1928public:
20- GraphMembraneImpl (std::unique_ptr<storage::Wal> wal) : wal_(std::move(wal)) {}
29+ explicit GraphMembraneImpl (std::unique_ptr<storage::Wal> wal) : wal_(std::move(wal)) {}
2130
2231 Status AddVertex (VertexId id, TagId tag, const Metadata& meta) override {
23- // 1. Persist to WAL
2432 std::string key = GraphKey::EncodeVertex (id, tag);
2533 Status st = wal_->AppendRawKV (4 /* kRawKV */ , Slice (key), Slice (meta.tenant ));
2634 if (!st.ok ()) return st;
27-
28- // 2. Update RAM index (structural only)
2935 if (adj_lists_.find (id) == adj_lists_.end ()) {
3036 adj_lists_[id] = {};
37+ bytes_used_ += sizeof (VertexId) + sizeof (std::vector<Neighbor>);
3138 }
3239 return Status::Ok ();
3340 }
3441
3542 Status AddEdge (VertexId src, VertexId dst, EdgeType type, uint32_t rank, const Metadata& meta) override {
36- // 1. Persist to WAL
3743 std::string key = GraphKey::EncodeEdge (src, type, rank, dst);
3844 Status st = wal_->AppendRawKV (4 /* kRawKV */ , Slice (key), Slice (meta.tenant ));
3945 if (!st.ok ()) return st;
40-
41- // 2. Update RAM index (structural only)
42- // Add to contiguous store
4346 Neighbor n{dst, type, rank};
44- auto & list = adj_lists_[src];
45- list.push_back (n);
47+ adj_lists_[src].push_back (n);
48+ bytes_used_ += sizeof (Neighbor);
49+ return Status::Ok ();
50+ }
51+
52+ Status DeleteVertex (VertexId id) override {
53+ // Tombstone key: 'T' (1) | VertexId (8)
54+ std::string key (1 , ' T' );
55+ uint64_t v_be = htobe64 (id);
56+ key.append (reinterpret_cast <const char *>(&v_be), 8 );
57+ Status st = wal_->AppendRawKV (4 /* kRawKV */ , Slice (key), Slice (" " ));
58+ if (!st.ok ()) return st;
59+ auto it = adj_lists_.find (id);
60+ if (it != adj_lists_.end ()) {
61+ bytes_used_ -= sizeof (VertexId) + sizeof (std::vector<Neighbor>) +
62+ it->second .size () * sizeof (Neighbor);
63+ adj_lists_.erase (it);
64+ }
65+ return Status::Ok ();
66+ }
67+
68+ Status DeleteEdge (VertexId src, VertexId dst, EdgeType type) override {
69+ // Tombstone key: 'X' (1) | SrcID (8) | EdgeType (4) | Rank=0 (4) | DstID (8)
70+ std::string key (1 , ' X' );
71+ uint64_t s_be = htobe64 (src);
72+ uint32_t t_be = htobe32 (static_cast <uint32_t >(type));
73+ uint32_t r_be = 0 ;
74+ uint64_t d_be = htobe64 (dst);
75+ key.append (reinterpret_cast <const char *>(&s_be), 8 );
76+ key.append (reinterpret_cast <const char *>(&t_be), 4 );
77+ key.append (reinterpret_cast <const char *>(&r_be), 4 );
78+ key.append (reinterpret_cast <const char *>(&d_be), 8 );
79+ Status st = wal_->AppendRawKV (4 /* kRawKV */ , Slice (key), Slice (" " ));
80+ if (!st.ok ()) return st;
81+ auto it = adj_lists_.find (src);
82+ if (it != adj_lists_.end ()) {
83+ auto & v = it->second ;
84+ auto before = v.size ();
85+ v.erase (std::remove_if (v.begin (), v.end (),
86+ [dst, type](const Neighbor& n) { return n.id == dst && n.type == type; }),
87+ v.end ());
88+ bytes_used_ -= (before - v.size ()) * sizeof (Neighbor);
89+ }
4690 return Status::Ok ();
4791 }
4892
@@ -58,36 +102,101 @@ class GraphMembraneImpl : public pomai::GraphMembrane {
58102 auto it = adj_lists_.find (src);
59103 if (it != adj_lists_.end ()) {
60104 for (const auto & n : it->second ) {
61- if (n.type == type) {
62- out->push_back (n);
63- }
105+ if (n.type == type) out->push_back (n);
64106 }
65107 }
66108 return Status::Ok ();
67109 }
68110
69- Status Flush () override {
70- return wal_->Flush ();
71- }
111+ Status Flush () override { return wal_->Flush (); }
72112
73113 Status BeginBatch () { return wal_ ? wal_->BeginBatch () : Status::Ok (); }
74- Status EndBatch () { return wal_ ? wal_->EndBatch () : Status::Ok (); }
114+ Status EndBatch () { return wal_ ? wal_->EndBatch () : Status::Ok (); }
115+
116+ /* *
117+ * Called during Database::Open() to rebuild adj_lists_ from the WAL.
118+ * Delegates to Wal::ReplayGraphInto which calls ReplayEntry() for each
119+ * kRawKV record found in the WAL segments.
120+ */
121+ Status WarmUp () { return wal_->ReplayGraphInto (this ); }
122+
123+ /* *
124+ * Called by Wal::ReplayGraphInto for each kRawKV key decoded from the WAL.
125+ * Reconstructs adj_lists_ and bytes_used_ from the encoded key bytes.
126+ */
127+ void ReplayEntry (pomai::Slice key) {
128+ if (key.size () < 1 ) return ;
129+ const auto * p = static_cast <const uint8_t *>(static_cast <const void *>(key.data ()));
130+ const uint8_t prefix = p[0 ];
131+
132+ if (prefix == GraphKey::kVertex && key.size () >= 13 ) {
133+ uint64_t vid_be;
134+ std::memcpy (&vid_be, p + 1 , 8 );
135+ VertexId vid = be64toh (vid_be);
136+ if (adj_lists_.find (vid) == adj_lists_.end ()) {
137+ adj_lists_[vid] = {};
138+ bytes_used_ += sizeof (VertexId) + sizeof (std::vector<Neighbor>);
139+ }
75140
76- // Called during Database::Open()
77- Status WarmUp () {
78- // WarmUp can replay WAL entries to rebuild adj_lists_
79- return Status::Ok ();
141+ } else if (prefix == GraphKey::kEdge && key.size () >= 25 ) {
142+ uint64_t src_be, dst_be;
143+ uint32_t type_be, rank_be;
144+ std::memcpy (&src_be, p + 1 , 8 );
145+ std::memcpy (&type_be, p + 9 , 4 );
146+ std::memcpy (&rank_be, p + 13 , 4 );
147+ std::memcpy (&dst_be, p + 17 , 8 );
148+ VertexId src = be64toh (src_be);
149+ Neighbor n{be64toh (dst_be), be32toh (type_be), be32toh (rank_be)};
150+ adj_lists_[src].push_back (n);
151+ bytes_used_ += sizeof (Neighbor);
152+
153+ } else if (prefix == ' T' && key.size () >= 9 ) {
154+ // Vertex tombstone
155+ uint64_t vid_be;
156+ std::memcpy (&vid_be, p + 1 , 8 );
157+ VertexId vid = be64toh (vid_be);
158+ auto it = adj_lists_.find (vid);
159+ if (it != adj_lists_.end ()) {
160+ bytes_used_ -= sizeof (VertexId) + sizeof (std::vector<Neighbor>) +
161+ it->second .size () * sizeof (Neighbor);
162+ adj_lists_.erase (it);
163+ }
164+
165+ } else if (prefix == ' X' && key.size () >= 25 ) {
166+ // Edge tombstone
167+ uint64_t src_be, dst_be;
168+ uint32_t type_be;
169+ std::memcpy (&src_be, p + 1 , 8 );
170+ std::memcpy (&type_be, p + 9 , 4 );
171+ // rank at +13 is ignored for tombstone matching
172+ std::memcpy (&dst_be, p + 17 , 8 );
173+ VertexId src = be64toh (src_be);
174+ VertexId dst = be64toh (dst_be);
175+ EdgeType etype = be32toh (type_be);
176+ auto it = adj_lists_.find (src);
177+ if (it != adj_lists_.end ()) {
178+ auto & v = it->second ;
179+ auto before = v.size ();
180+ v.erase (std::remove_if (v.begin (), v.end (),
181+ [dst, etype](const Neighbor& n) { return n.id == dst && n.type == etype; }),
182+ v.end ());
183+ bytes_used_ -= (before - v.size ()) * sizeof (Neighbor);
184+ }
185+ }
80186 }
81187
188+ std::size_t MemoryBytesUsed () const { return bytes_used_; }
189+
82190 void ForEachVertex (const std::function<void (pomai::VertexId id, std::size_t out_degree)>& fn) const {
83191 for (const auto & [vid, neigh] : adj_lists_) fn (vid, neigh.size ());
84192 }
85193
86194private:
87195 std::unique_ptr<storage::Wal> wal_;
88- // Contiguous Adjacency Store (Simplified for now - using map to vectors but intended for mmap)
89- // In a full implementation, this would be a single large buffer + offset index.
196+ // In-memory adjacency store. Rebuilt from WAL on open via WarmUp().
90197 std::unordered_map<VertexId, std::vector<Neighbor>> adj_lists_;
198+ // Approximate memory usage for backpressure / quota reporting.
199+ std::size_t bytes_used_ = 0 ;
91200};
92201
93202} // namespace pomai::core
0 commit comments