Skip to content

Commit 4a20cad

Browse files
committed
Use a dedicated type for the index
Previously, we used `u64`, which is unlikely to ever overflow. Even at 60 Hz, it would take ~9.7 billion years. But the code used checked operations, even though they would not help. I was going to switch to regular operations, but realized that if we handle overflows properly, we could save some traffic. By default, integers in postcard are serialized with varint encoding. > 128 takes 2 bytes, > 16384 takes 3 bytes, and so on. But we can switch to `u16` with overflow handling and fixint encoding, which always takes 2 bytes. This also simplifies the logic inside `batches_after`. I also had to replace `BTreeMap` with `HashMap`, but we don't need to preserve the ordering since we iterate over indices, so it's fine.
1 parent 897d0b0 commit 4a20cad

7 files changed

Lines changed: 104 additions & 42 deletions

File tree

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,7 @@ pub mod prelude {
750750
protocol::{ProtocolHash, ProtocolHasher, ProtocolMismatch},
751751
replication::{
752752
Replicated,
753-
diff::{DiffEntityExt, Diffable, PatchHistory, PatchIndex},
753+
diff::{DiffEntityExt, Diffable, PatchHistory, patch_index::PatchIndex},
754754
receive_markers::AppMarkerExt,
755755
registry::rule_fns::RuleFns,
756756
rules::{AppRuleExt, component::ReplicationMode},

src/server/replication_messages/mutations.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
backend::channels::ServerChannel,
1414
replication::{
1515
client_ticks::{ClientTicks, MutateInfo, MutatedEntityInfo, PatchCursors},
16-
diff::PatchIndex,
16+
diff::patch_index::PatchIndex,
1717
mutate_index::MutateIndex,
1818
registry::{ComponentIndex, component_mask::ComponentMask},
1919
},

src/server/replication_messages/serialized_data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
postcard_utils,
77
prelude::*,
88
shared::replication::{
9-
diff::{DiffFns, PatchIndex},
9+
diff::{DiffFns, patch_index::PatchIndex},
1010
registry::{FnsId, ctx::SerializeCtx, serde_fns::SerdeFns},
1111
},
1212
};

src/shared/replication/client_ticks.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::mutate_index::MutateIndex;
1212
use crate::{
1313
prelude::*,
1414
shared::replication::{
15-
diff::PatchIndex,
15+
diff::patch_index::PatchIndex,
1616
registry::{ComponentIndex, component_mask::ComponentMask},
1717
},
1818
};
@@ -183,15 +183,17 @@ impl EntityTicks {
183183
/// Advances the acknowledged diff patch cursor for `component`.
184184
///
185185
/// Mutation ACKs can arrive late relative to newer mutation ACKs. Keep the
186-
/// greatest cursor so an older ACK cannot make the sender resend patches
186+
/// newest cursor so an older ACK cannot make the sender resend patches
187187
/// already acknowledged by a newer packet.
188188
fn set_patch_cursor(&mut self, component: ComponentIndex, cursor: PatchIndex) {
189189
if let Some((_, existing)) = self
190190
.patch_cursors
191191
.iter_mut()
192192
.find(|(index, _)| *index == component)
193193
{
194-
*existing = (*existing).max(cursor);
194+
if cursor.is_newer_than(*existing) {
195+
*existing = cursor;
196+
}
195197
} else {
196198
self.patch_cursors.push((component, cursor));
197199
}

src/shared/replication/diff.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
//!
33
//! See [`Diffable`] for the main user-facing API and example.
44
5+
pub mod patch_index;
6+
57
use core::{iter, mem};
68

79
use alloc::{
8-
collections::{BTreeMap, VecDeque, vec_deque},
10+
collections::{VecDeque, vec_deque},
911
format,
1012
vec::Vec,
1113
};
@@ -16,6 +18,7 @@ use bevy::{
1618
system::EntityCommands,
1719
world::EntityWorldMut,
1820
},
21+
platform::collections::HashMap,
1922
prelude::*,
2023
ptr::Ptr,
2124
};
@@ -33,9 +36,7 @@ use crate::{
3336
},
3437
},
3538
};
36-
37-
/// Monotonic index assigned to a sent diff batch.
38-
pub type PatchIndex = u64;
39+
use patch_index::PatchIndex;
3940

4041
/// Component whose mutations can be represented as an ordered history of patches.
4142
///
@@ -159,9 +160,7 @@ impl<C: Diffable> PatchHistory<C> {
159160
return self.last_index;
160161
}
161162

162-
let index = self
163-
.last_index
164-
.map_or(0, |last_index| last_index.saturating_add(1));
163+
let index = self.last_index.map_or(PatchIndex::new(0), |i| i + 1);
165164
self.last_index = Some(index);
166165
self.batches.push_back(mem::take(&mut self.pending));
167166
self.prune_to_limit();
@@ -178,20 +177,22 @@ impl<C: Diffable> PatchHistory<C> {
178177
return None;
179178
}
180179

181-
let first_index = last_index - (self.batches.len() as PatchIndex - 1);
182-
let next_index = cursor + 1;
183-
if next_index < first_index {
184-
// Cursor is outside of the history window.
180+
let missing_count = last_index.distance_after(cursor) as usize;
181+
if missing_count == 0 {
182+
// Client is already at the latest cursor.
183+
// The component was mutated directly.
185184
return None;
186185
}
187186

188-
if cursor >= last_index {
187+
if missing_count > self.batches.len() {
188+
// Client cursor is outside the history window.
189189
return None;
190190
}
191191

192-
let start = (next_index - first_index) as usize;
192+
let start = self.batches.len() - missing_count;
193+
193194
Some(BatchSlice {
194-
first_index: next_index,
195+
first_index: cursor + 1,
195196
batches: self.batches.range(start..),
196197
})
197198
}
@@ -233,7 +234,7 @@ impl<C: Diffable> Default for PatchHistory<C> {
233234
#[derive(Component, Debug)]
234235
pub struct PatchBuffer<C: Diffable> {
235236
last_applied: Option<PatchIndex>,
236-
pending: BTreeMap<PatchIndex, Vec<C::Patch>>,
237+
pending: HashMap<PatchIndex, Vec<C::Patch>>,
237238
}
238239

239240
impl<C: Diffable> PatchBuffer<C> {
@@ -260,29 +261,22 @@ impl<C: Diffable> PatchBuffer<C> {
260261
batches: Vec<Vec<C::Patch>>,
261262
) -> impl Iterator<Item = Vec<C::Patch>> + '_ {
262263
for (offset, batch) in batches.into_iter().enumerate() {
263-
let index = first_index + offset as PatchIndex;
264+
let index = first_index + offset as u16;
264265
if self
265266
.last_applied
266-
.is_none_or(|last_applied| index > last_applied)
267+
.is_none_or(|last_applied| index.is_newer_than(last_applied))
267268
{
268269
self.pending.entry(index).or_insert(batch);
269270
}
270271
}
271272

272273
iter::from_fn(move || {
273-
let next_index = self.next_patch_index()?;
274+
let next_index = self.last_applied.map_or(PatchIndex::new(0), |i| i + 1);
274275
let batch = self.pending.remove(&next_index)?;
275276
self.last_applied = Some(next_index);
276277
Some(batch)
277278
})
278279
}
279-
280-
fn next_patch_index(&self) -> Option<PatchIndex> {
281-
match self.last_applied {
282-
Some(index) => index.checked_add(1),
283-
None => Some(0),
284-
}
285-
}
286280
}
287281

288282
impl<C: Diffable> Default for PatchBuffer<C> {
@@ -569,8 +563,8 @@ mod tests {
569563
history.record(2);
570564
history.finish_pending();
571565

572-
let slice = history.batches_after(0).unwrap();
573-
assert_eq!(slice.first_index, 1);
566+
let slice = history.batches_after(PatchIndex::new(0)).unwrap();
567+
assert_eq!(slice.first_index.get(), 1);
574568
assert_ne!(slice.batches.len(), 0);
575569
}
576570
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use core::ops::{Add, Sub};
2+
3+
use serde::{Deserialize, Serialize};
4+
5+
/// Monotonic index assigned to a sent diff batch.
6+
///
7+
/// All operations on it are wrapping.
8+
#[derive(Debug, Default, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)]
9+
pub struct PatchIndex(#[serde(with = "postcard::fixint::le")] u16);
10+
11+
impl PatchIndex {
12+
/// The maximum wrapping distance at which an index is considered newer.
13+
pub const MAX_NEWER_DISTANCE: u16 = u16::MAX / 2;
14+
15+
#[inline]
16+
pub fn new(value: u16) -> Self {
17+
Self(value)
18+
}
19+
20+
#[inline]
21+
pub fn get(self) -> u16 {
22+
self.0
23+
}
24+
25+
/// Returns `true` if `self` is newer than `other`.
26+
///
27+
/// The value is considered newer if it is ahead of the other value
28+
/// by less than [`PatchIndex::MAX_NEWER_DISTANCE`].
29+
pub fn is_newer_than(self, other: Self) -> bool {
30+
let distance = self.distance_after(other);
31+
distance != 0 && distance <= Self::MAX_NEWER_DISTANCE
32+
}
33+
34+
/// Returns the wrapping distance from `base` to `self`.
35+
#[inline]
36+
pub fn distance_after(self, base: Self) -> u16 {
37+
self.0.wrapping_sub(base.0)
38+
}
39+
}
40+
41+
impl Add<u16> for PatchIndex {
42+
type Output = Self;
43+
44+
#[inline]
45+
fn add(self, rhs: u16) -> Self::Output {
46+
Self(self.0.wrapping_add(rhs))
47+
}
48+
}
49+
50+
impl Sub<u16> for PatchIndex {
51+
type Output = Self;
52+
53+
#[inline]
54+
fn sub(self, rhs: u16) -> Self::Output {
55+
Self(self.0.wrapping_sub(rhs))
56+
}
57+
}
58+
59+
impl Sub for PatchIndex {
60+
type Output = u16;
61+
62+
#[inline]
63+
fn sub(self, rhs: Self) -> Self::Output {
64+
self.0.wrapping_sub(rhs.0)
65+
}
66+
}

tests/diff.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ fn multiple_patches_in_same_send_share_patch_cursor() {
157157
replicate_and_ack(&mut server_app, &mut client_app);
158158

159159
assert_client_point_values(&mut client_app, 0..=2);
160-
assert_diff_cursor(&server_app, server_entity, Some(0));
160+
assert_diff_cursor(&server_app, server_entity, Some(PatchIndex::new(0)));
161161

162162
server_app
163163
.world_mut()
@@ -167,7 +167,7 @@ fn multiple_patches_in_same_send_share_patch_cursor() {
167167
replicate_and_ack(&mut server_app, &mut client_app);
168168

169169
assert_client_point_values(&mut client_app, 0..=3);
170-
assert_diff_cursor(&server_app, server_entity, Some(1));
170+
assert_diff_cursor(&server_app, server_entity, Some(PatchIndex::new(1)));
171171
}
172172

173173
#[test]
@@ -334,15 +334,15 @@ fn duplicate_patches_are_ignored_by_receiver() {
334334
);
335335
entity.apply_write(
336336
wire(DiffWire::Patches {
337-
first_index: 0,
337+
first_index: PatchIndex::new(0),
338338
patches: vec![vec![PointPatch::PushBack(Vec2::new(2.0, 2.0))]],
339339
}),
340340
fns_id,
341341
RepliconTick::default(),
342342
);
343343
entity.apply_write(
344344
wire(DiffWire::Patches {
345-
first_index: 0,
345+
first_index: PatchIndex::new(0),
346346
patches: vec![vec![PointPatch::PushBack(Vec2::new(2.0, 2.0))]],
347347
}),
348348
fns_id,
@@ -368,7 +368,7 @@ fn out_of_order_patches_wait_for_missing_predecessor() {
368368
);
369369
entity.apply_write(
370370
wire(DiffWire::Patches {
371-
first_index: 1,
371+
first_index: PatchIndex::new(1),
372372
patches: vec![vec![PointPatch::PushBack(Vec2::new(3.0, 3.0))]],
373373
}),
374374
fns_id,
@@ -378,7 +378,7 @@ fn out_of_order_patches_wait_for_missing_predecessor() {
378378

379379
entity.apply_write(
380380
wire(DiffWire::Patches {
381-
first_index: 0,
381+
first_index: PatchIndex::new(0),
382382
patches: vec![vec![PointPatch::PushBack(Vec2::new(2.0, 2.0))]],
383383
}),
384384
fns_id,
@@ -396,7 +396,7 @@ fn patches_before_snapshot_are_rejected() {
396396

397397
entity.apply_write(
398398
wire(DiffWire::Patches {
399-
first_index: 0,
399+
first_index: PatchIndex::new(0),
400400
patches: vec![vec![PointPatch::PushBack(Vec2::new(1.0, 1.0))]],
401401
}),
402402
fns_id,
@@ -520,8 +520,8 @@ fn write_point_history(
520520
// Batch N transforms state cursor N - 1 into cursor N. Batch 0
521521
// transforms the pre-patch base, represented by `None`, into
522522
// cursor `Some(0)`.
523-
let base_cursor = first_index.checked_sub(1);
524-
let cursor = Some(first_index + patches.len() as PatchIndex - 1);
523+
let base_cursor = (first_index != PatchIndex::new(0)).then_some(first_index - 1);
524+
let cursor = Some(first_index + patches.len() as u16 - 1);
525525
// The base must come from a confirmed value in the history: consumers
526526
// like prediction/interpolation may locally mutate the live component,
527527
// so it can never be used as a patch base.

0 commit comments

Comments
 (0)