Skip to content

Commit 90d6207

Browse files
committed
refactor: align chunks with message boundaries
1 parent 1eba7bb commit 90d6207

1 file changed

Lines changed: 223 additions & 9 deletions

File tree

src/mmap.rs

Lines changed: 223 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,41 +93,150 @@ impl MmapParser {
9393

9494
pub struct ChunkedMmapParser {
9595
mmap: Mmap,
96-
chunk_size: usize,
96+
chunk_ranges: Vec<(usize, usize)>,
97+
target_chunk_size: usize,
9798
}
9899

99100
impl ChunkedMmapParser {
100-
pub fn open<P: AsRef<Path>>(path: P, chunk_size: usize) -> io::Result<Self> {
101+
pub fn open<P: AsRef<Path>>(path: P, target_chunk_size: usize) -> io::Result<Self> {
101102
let file = File::open(path)?;
102103
let mmap = unsafe { Mmap::map(&file)? };
104+
let target_chunk_size = target_chunk_size.max(4096);
105+
106+
let boundaries = Self::build_message_boundaries(&mmap);
107+
let chunk_ranges = Self::compute_aligned_chunks(&boundaries, mmap.len(), target_chunk_size);
108+
103109
Ok(Self {
104110
mmap,
105-
chunk_size: chunk_size.max(4096),
111+
chunk_ranges,
112+
target_chunk_size,
106113
})
107114
}
108115

116+
fn build_message_boundaries(data: &[u8]) -> Vec<usize> {
117+
let estimated = data.len() / 32;
118+
let mut boundaries = Vec::with_capacity(estimated);
119+
let mut offset = 0;
120+
121+
while offset + 2 <= data.len() {
122+
let len = u16::from_be_bytes([data[offset], data[offset + 1]]) as usize;
123+
let next = offset + 2 + len;
124+
if next > data.len() {
125+
break;
126+
}
127+
boundaries.push(next);
128+
offset = next;
129+
}
130+
131+
boundaries
132+
}
133+
134+
fn compute_aligned_chunks(
135+
boundaries: &[usize],
136+
total_len: usize,
137+
target_size: usize,
138+
) -> Vec<(usize, usize)> {
139+
if boundaries.is_empty() {
140+
return if total_len > 0 {
141+
vec![(0, total_len)]
142+
} else {
143+
Vec::new()
144+
};
145+
}
146+
147+
let mut chunks = Vec::new();
148+
let mut start = 0;
149+
150+
while start < total_len {
151+
let target_end = (start + target_size).min(total_len);
152+
153+
let end = match boundaries.binary_search(&target_end) {
154+
Ok(idx) => boundaries[idx],
155+
Err(idx) => {
156+
if idx >= boundaries.len() {
157+
*boundaries.last().unwrap_or(&total_len)
158+
} else {
159+
boundaries[idx]
160+
}
161+
}
162+
};
163+
164+
if end > start {
165+
chunks.push((start, end));
166+
start = end;
167+
} else {
168+
break;
169+
}
170+
}
171+
172+
chunks
173+
}
174+
175+
#[inline]
176+
pub fn chunk_ranges(&self) -> &[(usize, usize)] {
177+
&self.chunk_ranges
178+
}
179+
109180
#[inline]
110181
pub fn chunks(&self) -> impl Iterator<Item = &[u8]> {
111-
self.mmap.chunks(self.chunk_size)
182+
self.chunk_ranges
183+
.iter()
184+
.map(|(start, end)| &self.mmap[*start..*end])
112185
}
113186

114187
#[inline]
115188
pub fn num_chunks(&self) -> usize {
116-
self.mmap.len().div_ceil(self.chunk_size)
189+
self.chunk_ranges.len()
117190
}
118191

119192
pub fn parse_chunk(&self, chunk_idx: usize) -> Result<(Vec<ZeroCopyMessage<'_>>, usize)> {
120-
let start = chunk_idx * self.chunk_size;
121-
if start >= self.mmap.len() {
193+
if chunk_idx >= self.chunk_ranges.len() {
194+
return Ok((Vec::new(), 0));
195+
}
196+
197+
let (start, end) = self.chunk_ranges[chunk_idx];
198+
self.parse_chunk_range(start, end)
199+
}
200+
201+
pub fn parse_chunk_range(
202+
&self,
203+
start: usize,
204+
end: usize,
205+
) -> Result<(Vec<ZeroCopyMessage<'_>>, usize)> {
206+
if start >= self.mmap.len() || start >= end {
122207
return Ok((Vec::new(), 0));
123208
}
124-
let end = (start + self.chunk_size).min(self.mmap.len());
125-
let chunk = &self.mmap[start..end];
209+
210+
let chunk = &self.mmap[start..end.min(self.mmap.len())];
126211
let mut parser = ZeroCopyParser::new(chunk);
127212
let messages: Vec<_> = parser.parse_all().collect();
128213
let consumed = parser.position();
129214
Ok((messages, consumed))
130215
}
216+
217+
#[inline]
218+
pub fn data(&self) -> &[u8] {
219+
&self.mmap
220+
}
221+
222+
#[inline]
223+
pub fn len(&self) -> usize {
224+
self.mmap.len()
225+
}
226+
227+
#[inline]
228+
pub fn is_empty(&self) -> bool {
229+
self.mmap.is_empty()
230+
}
231+
232+
#[inline]
233+
pub fn target_chunk_size(&self) -> usize {
234+
self.target_chunk_size
235+
}
236+
237+
pub fn total_messages(&self) -> usize {
238+
crate::simd::count_messages_fast(&self.mmap)
239+
}
131240
}
132241

133242
pub struct MmapParserShared {
@@ -179,11 +288,29 @@ mod tests {
179288
use std::io::Write;
180289
use tempfile::NamedTempFile;
181290

291+
const VALID_TYPES: [u8; 21] = [
292+
b'S', b'R', b'H', b'Y', b'L', b'V', b'W', b'K', b'A', b'F', b'E', b'C', b'X', b'D', b'U',
293+
b'P', b'Q', b'B', b'I', b'N', b'J',
294+
];
295+
182296
fn create_test_file() -> NamedTempFile {
183297
let mut file = NamedTempFile::new().unwrap();
184298
file.write_all(&[0u8; 100]).unwrap();
185299
file
186300
}
301+
fn create_itch_test_file(messages: &[(u8, usize)]) -> NamedTempFile {
302+
let mut file = NamedTempFile::new().unwrap();
303+
for (msg_type, payload_len) in messages {
304+
let len = 1 + 10 + payload_len; // type + header + payload
305+
file.write_all(&(len as u16).to_be_bytes()).unwrap();
306+
file.write_all(&[*msg_type]).unwrap();
307+
file.write_all(&[0u8; 10]).unwrap(); // header
308+
file.write_all(&vec![0xABu8; *payload_len]).unwrap();
309+
}
310+
// Ensure all data is synced to disk before mmap reads it
311+
file.as_file().sync_all().unwrap();
312+
file
313+
}
187314

188315
#[test]
189316
fn test_mmap_parser_open() {
@@ -198,4 +325,91 @@ mod tests {
198325
let parser = MmapParser::open(file.path()).unwrap();
199326
assert_eq!(parser.len(), 100);
200327
}
328+
329+
#[test]
330+
fn test_chunked_parser_message_alignment() {
331+
let messages: Vec<(u8, usize)> = (0..500)
332+
.map(|i| (VALID_TYPES[i % VALID_TYPES.len()], (i % 50) + 15))
333+
.collect();
334+
335+
let file = create_itch_test_file(&messages);
336+
337+
let parser = ChunkedMmapParser::open(file.path(), 4096).unwrap();
338+
339+
assert!(
340+
parser.num_chunks() > 1,
341+
"Should have multiple chunks, got {}",
342+
parser.num_chunks()
343+
);
344+
345+
let mut total_messages = 0;
346+
for chunk_idx in 0..parser.num_chunks() {
347+
let (msgs, consumed) = parser.parse_chunk(chunk_idx).unwrap();
348+
total_messages += msgs.len();
349+
350+
let (start, end) = parser.chunk_ranges()[chunk_idx];
351+
assert_eq!(
352+
consumed,
353+
end - start,
354+
"Chunk {} should be fully consumed",
355+
chunk_idx
356+
);
357+
}
358+
359+
assert_eq!(total_messages, 500, "All 500 messages should be parsed");
360+
}
361+
362+
#[test]
363+
fn test_chunked_parser_no_overlap() {
364+
let messages: Vec<(u8, usize)> = (0..50)
365+
.map(|_| (b'A', 30)) // Fixed 30-byte payloads, valid 'A' (AddOrder) type
366+
.collect();
367+
368+
let file = create_itch_test_file(&messages);
369+
let parser = ChunkedMmapParser::open(file.path(), 100).unwrap();
370+
371+
let ranges = parser.chunk_ranges();
372+
for i in 1..ranges.len() {
373+
assert_eq!(
374+
ranges[i].0,
375+
ranges[i - 1].1,
376+
"Chunk {} should start where chunk {} ends",
377+
i,
378+
i - 1
379+
);
380+
}
381+
382+
if !ranges.is_empty() {
383+
assert_eq!(ranges[0].0, 0, "First chunk should start at 0");
384+
}
385+
}
386+
387+
#[test]
388+
fn test_chunked_parser_parallel_safety() {
389+
let messages: Vec<(u8, usize)> = (0..200)
390+
.map(|i| (VALID_TYPES[i % VALID_TYPES.len()], 10 + (i % 15)))
391+
.collect();
392+
393+
let file = create_itch_test_file(&messages);
394+
let parser = ChunkedMmapParser::open(file.path(), 500).unwrap();
395+
396+
let mut all_types: Vec<u8> = Vec::new();
397+
for chunk_idx in 0..parser.num_chunks() {
398+
let (msgs, _) = parser.parse_chunk(chunk_idx).unwrap();
399+
for msg in msgs {
400+
all_types.push(msg.msg_type());
401+
}
402+
}
403+
404+
assert_eq!(
405+
all_types.len(),
406+
200,
407+
"Should have 200 messages, got {}",
408+
all_types.len()
409+
);
410+
for (i, &msg_type) in all_types.iter().enumerate() {
411+
let expected = VALID_TYPES[i % VALID_TYPES.len()];
412+
assert_eq!(msg_type, expected, "Message {} has wrong type", i);
413+
}
414+
}
201415
}

0 commit comments

Comments
 (0)