Skip to content

Commit 95f91a3

Browse files
committed
Add the ability to attach custom data to replication messages
1 parent 4f280bf commit 95f91a3

8 files changed

Lines changed: 206 additions & 9 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- `ReplicationUserdata` and `UserdataReceived` to attach custom data to replication messages.
13+
1014
### Changed
1115

1216
- Removing `Replicated` from an entity now stops replication without despawning the entity on clients. Client-side despawns of `Remote` entities clean up their `ServerEntityMap` mappings. Despawning an entity still replicates as despawn.

src/client.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,10 @@ fn apply_update_message(
307307
};
308308

309309
match flag {
310+
UpdateFlags::USERDATA => {
311+
apply_userdata(world, message)
312+
.map_err(|e| format!("unable to apply userdata: {e}"))?;
313+
}
310314
UpdateFlags::MAPPINGS => {
311315
let len = apply_array(array_kind, message, |message| {
312316
apply_entity_mapping(world, params, message)
@@ -397,6 +401,10 @@ fn apply_mutate_message(
397401

398402
for (_, flag) in mutate.flags.iter_names() {
399403
match flag {
404+
MutateFlags::USERDATA => {
405+
apply_userdata(world, &mut mutate.message)
406+
.map_err(|e| format!("unable to apply userdata: {e}"))?;
407+
}
400408
MutateFlags::MESSAGES_COUNT => {
401409
confirm_mutate_tick(world, params.mutate_ticks, mutate)
402410
.map_err(|e| format!("unable to confirm mutate tick: {e}"))?;
@@ -617,6 +625,22 @@ fn apply_changes(
617625
Ok(())
618626
}
619627

628+
fn apply_userdata(world: &mut World, message: &mut Bytes) -> Result<()> {
629+
let len = postcard_utils::from_buf(message)?;
630+
if len > message.len() {
631+
return Err(format!(
632+
"userdata length ({len}) exceeds remaining message length ({})",
633+
message.len()
634+
)
635+
.into());
636+
}
637+
world.trigger(UserdataReceived {
638+
bytes: message.split_to(len),
639+
});
640+
641+
Ok(())
642+
}
643+
620644
fn apply_array(
621645
kind: ArrayKind,
622646
message: &mut Bytes,
@@ -952,3 +976,12 @@ pub struct ClientReplicationStats {
952976
#[derive(Component, Default, Reflect, Debug, Clone, Copy)]
953977
#[reflect(Component)]
954978
pub struct Remote;
979+
980+
/// Triggered when user-defined bytes are received in a replication message.
981+
///
982+
/// This is emitted for data sent through [`ReplicationUserdata`](crate::server::ReplicationUserdata).
983+
#[derive(Event)]
984+
pub struct UserdataReceived {
985+
/// Raw user-defined bytes received from the server.
986+
pub bytes: Bytes,
987+
}

src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,11 @@ It's also possible to override despawns received from the server via
641641
[`ReplicationRegistry::despawn`](shared::replication::registry::ReplicationRegistry::despawn), which is also often used together
642642
with receive markers.
643643
644+
### Replication userdata
645+
646+
It's possible to attach arbitrary bytes to replication messages by writing to the [`ReplicationUserdata`](server::ReplicationUserdata)
647+
resource. When the client receives a replication message containing userdata, [`UserdataReceived`](client::UserdataReceived) is triggered.
648+
644649
### Ticks information
645650
646651
This requires an understanding of how replication works. See the documentation on

src/server.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ impl Plugin for ServerPlugin {
135135
.init_resource::<ServerTick>()
136136
.init_resource::<ServerChangeTick>()
137137
.init_resource::<ReplicatedArchetypes>()
138+
.init_resource::<ReplicationUserdata>()
138139
.init_resource::<MessageBuffer>()
139140
.init_resource::<RelatedEntities>()
140141
.init_resource::<FilterRegistry>()
@@ -858,6 +859,7 @@ fn send_messages(
858859
server_tick: Res<ServerTick>,
859860
change_tick: Res<ServerChangeTick>,
860861
track_mutate_messages: Res<TrackMutateMessages>,
862+
userdata: Res<ReplicationUserdata>,
861863
mut serialized: ResMut<SerializedData>,
862864
mut messages: ResMut<ServerMessages>,
863865
mut clients: Query<(
@@ -875,7 +877,13 @@ fn send_messages(
875877
let server_tick_range =
876878
serialized.write_cached_tick(&mut server_tick_range, **server_tick)?;
877879

878-
updates.send(&mut messages, client, &serialized, server_tick_range)?;
880+
updates.send(
881+
&mut messages,
882+
client,
883+
&serialized,
884+
&userdata,
885+
server_tick_range,
886+
)?;
879887
}
880888

881889
if !mutations.is_empty() || **track_mutate_messages {
@@ -889,6 +897,7 @@ fn send_messages(
889897
&mut split_buffer,
890898
&serialized,
891899
**track_mutate_messages,
900+
&userdata,
892901
server_tick_range,
893902
**server_tick,
894903
**change_tick,
@@ -1023,3 +1032,16 @@ struct TicksTracked;
10231032
/// Value of the [`ServerPlugin::track_mutate_messages`].
10241033
#[derive(Resource, Deref, Default, Debug, Clone, Copy)]
10251034
struct TrackMutateMessages(bool);
1035+
1036+
/// User-defined bytes appended to outgoing replication messages.
1037+
///
1038+
/// When this resource is non-empty, its contents are sent with every replication
1039+
/// update and mutate message. On the client, the bytes are triggered as a
1040+
/// [`UserdataReceived`](crate::client::UserdataReceived) before the rest of
1041+
/// the message is applied.
1042+
///
1043+
/// This could be useful to store the game tick for prediction/interpolaton.
1044+
///
1045+
/// The bytes are not cleared after being sent.
1046+
#[derive(Resource, Deref, DerefMut, Default)]
1047+
pub struct ReplicationUserdata(pub Vec<u8>);

src/server/replication_messages/mutations.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use super::{entity_ranges::EntityRanges, serialized_data::SerializedData};
88
use crate::{
99
postcard_utils,
1010
prelude::*,
11+
server::ReplicationUserdata,
1112
shared::{
1213
backend::channels::ServerChannel,
1314
replication::{
@@ -147,6 +148,7 @@ impl Mutations {
147148
split_buffer: &mut Vec<MutationsSplit>,
148149
serialized: &SerializedData,
149150
track_mutate_messages: bool,
151+
userdata: &ReplicationUserdata,
150152
server_tick_range: Range<usize>,
151153
server_tick: RepliconTick,
152154
system_tick: Tick,
@@ -158,6 +160,9 @@ impl Mutations {
158160
let update_tick = postcard::to_slice(&ticks.update_tick, &mut tick_buffer)?;
159161
let mut base_header_size =
160162
size_of::<MutateFlags>() + update_tick.len() + server_tick_range.len();
163+
if !userdata.is_empty() {
164+
base_header_size += serialized_size(&userdata.len())? + userdata.len();
165+
}
161166
if track_mutate_messages {
162167
// We don't know the number of messages ahead of time, so we assume the maximum
163168
// possible size during the splits calculation to avoid exceeding MTU.
@@ -237,6 +242,9 @@ impl Mutations {
237242
if track_mutate_messages {
238243
base_flags |= MutateFlags::MESSAGES_COUNT;
239244
}
245+
if !userdata.is_empty() {
246+
base_flags |= MutateFlags::USERDATA;
247+
}
240248

241249
for split in &*split_buffer {
242250
let mut message_size = split.message_size;
@@ -256,6 +264,10 @@ impl Mutations {
256264
postcard_utils::to_extend_mut(&split.mutate_index, &mut message)?;
257265
message.extend_from_slice(update_tick);
258266
message.extend_from_slice(&serialized[server_tick_range.clone()]);
267+
if !userdata.is_empty() {
268+
postcard_utils::to_extend_mut(&userdata.len(), &mut message)?;
269+
message.extend_from_slice(userdata);
270+
}
259271
if track_mutate_messages {
260272
postcard_utils::to_extend_mut(&split_buffer.len(), &mut message)?;
261273
}
@@ -487,6 +499,7 @@ mod tests {
487499
&mut Default::default(),
488500
&serialized,
489501
track_mutate_messages,
502+
&Default::default(),
490503
Default::default(),
491504
Default::default(),
492505
Default::default(),

src/server/replication_messages/updates.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use super::{entity_ranges::EntityRanges, mutations::Mutations, serialized_data::
77
use crate::{
88
postcard_utils,
99
prelude::*,
10+
server::ReplicationUserdata,
1011
shared::{
1112
backend::channels::ServerChannel,
1213
replication::{
@@ -226,15 +227,19 @@ impl Updates {
226227
messages: &mut ServerMessages,
227228
client: Entity,
228229
serialized: &SerializedData,
230+
userdata: &ReplicationUserdata,
229231
server_tick_range: Range<usize>,
230232
) -> Result<()> {
231-
let flags = self.flags();
233+
let flags = self.flags(userdata);
232234
let last_flag = flags.last();
233235

234236
// Precalculate size first to avoid extra allocations.
235237
let mut message_size = size_of::<UpdateFlags>() + server_tick_range.len();
236238
for (_, flag) in flags.iter_names() {
237239
match flag {
240+
UpdateFlags::USERDATA => {
241+
message_size += serialized_size(&userdata.len())? + userdata.len();
242+
}
238243
UpdateFlags::MAPPINGS => {
239244
if flag != last_flag {
240245
message_size += serialized_size(&self.mappings_len)?;
@@ -274,6 +279,10 @@ impl Updates {
274279
message.extend_from_slice(&serialized[server_tick_range]);
275280
for (_, flag) in flags.iter_names() {
276281
match flag {
282+
UpdateFlags::USERDATA => {
283+
postcard_utils::to_extend_mut(&userdata.len(), &mut message)?;
284+
message.extend_from_slice(userdata);
285+
}
277286
UpdateFlags::MAPPINGS => {
278287
if flag != last_flag {
279288
postcard_utils::to_extend_mut(&self.mappings_len, &mut message)?;
@@ -323,7 +332,7 @@ impl Updates {
323332
Ok(())
324333
}
325334

326-
fn flags(&self) -> UpdateFlags {
335+
fn flags(&self, userdata: &ReplicationUserdata) -> UpdateFlags {
327336
let mut flags = UpdateFlags::default();
328337

329338
if !self.mappings.is_empty() {
@@ -338,6 +347,9 @@ impl Updates {
338347
if !self.changes.is_empty() {
339348
flags |= UpdateFlags::CHANGES;
340349
}
350+
if !userdata.is_empty() {
351+
flags |= UpdateFlags::USERDATA;
352+
}
341353

342354
flags
343355
}

src/shared/replication/message_flags.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ bitflags! {
77
/// Serialized at the beginning of the message.
88
#[derive(Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Debug)]
99
pub(crate) struct UpdateFlags: u8 {
10-
const MAPPINGS = 0b00000001;
11-
const DESPAWNS = 0b00000010;
12-
const REMOVALS = 0b00000100;
13-
const CHANGES = 0b00001000;
10+
const USERDATA = 0b00000001;
11+
const MAPPINGS = 0b00000010;
12+
const DESPAWNS = 0b00000100;
13+
const REMOVALS = 0b00001000;
14+
const CHANGES = 0b00010000;
1415
}
1516
}
1617

@@ -30,8 +31,9 @@ bitflags! {
3031
/// Like [`UpdateFlags`], but for mutate messages.
3132
#[derive(Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Debug)]
3233
pub(crate) struct MutateFlags: u8 {
33-
const MESSAGES_COUNT = 0b00000001;
34-
const MUTATIONS = 0b00000010;
34+
const USERDATA = 0b00000001;
35+
const MESSAGES_COUNT = 0b00000010;
36+
const MUTATIONS = 0b00000100;
3537
}
3638
}
3739

tests/userdata.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use bevy::{prelude::*, state::app::StatesPlugin};
2+
use bevy_replicon::{
3+
client::UserdataReceived, prelude::*, server::ReplicationUserdata,
4+
shared::backend::channels::ServerChannel, test_app::ServerTestAppExt,
5+
};
6+
use serde::{Deserialize, Serialize};
7+
use test_log::test;
8+
9+
#[test]
10+
fn update_message() {
11+
let mut server_app = App::new();
12+
let mut client_app = App::new();
13+
for app in [&mut server_app, &mut client_app] {
14+
app.add_plugins((
15+
MinimalPlugins,
16+
StatesPlugin,
17+
RepliconPlugins.set(ServerPlugin::new(PostUpdate)),
18+
))
19+
.init_resource::<ReceivedUserdata>()
20+
.add_observer(receive_userdata)
21+
.replicate::<TestComponent>()
22+
.finish();
23+
}
24+
25+
server_app.connect_client(&mut client_app);
26+
27+
let mut userdata = server_app.world_mut().resource_mut::<ReplicationUserdata>();
28+
userdata.extend_from_slice(&USERDATA.to_le_bytes());
29+
30+
server_app.world_mut().spawn((Replicated, TestComponent));
31+
32+
server_app.update();
33+
server_app.exchange_with_client(&mut client_app);
34+
35+
let messages = client_app.world().resource::<ClientMessages>();
36+
assert_eq!(messages.received_count(ServerChannel::Updates), 1);
37+
assert_eq!(messages.received_count(ServerChannel::Mutations), 0);
38+
39+
client_app.update();
40+
41+
let received = client_app.world().resource::<ReceivedUserdata>();
42+
assert_eq!(received.0, USERDATA);
43+
}
44+
45+
#[test]
46+
fn mutate_message() {
47+
let mut server_app = App::new();
48+
let mut client_app = App::new();
49+
for app in [&mut server_app, &mut client_app] {
50+
app.add_plugins((
51+
MinimalPlugins,
52+
StatesPlugin,
53+
RepliconPlugins.set(ServerPlugin::new(PostUpdate)),
54+
))
55+
.init_resource::<ReceivedUserdata>()
56+
.add_observer(receive_userdata)
57+
.replicate::<TestComponent>()
58+
.finish();
59+
}
60+
61+
server_app.connect_client(&mut client_app);
62+
63+
let server_entity = server_app
64+
.world_mut()
65+
.spawn((Replicated, TestComponent))
66+
.id();
67+
68+
server_app.update();
69+
server_app.exchange_with_client(&mut client_app);
70+
client_app.update();
71+
server_app.exchange_with_client(&mut client_app);
72+
73+
let mut component = server_app
74+
.world_mut()
75+
.get_mut::<TestComponent>(server_entity)
76+
.unwrap();
77+
component.set_changed();
78+
79+
let mut userdata = server_app.world_mut().resource_mut::<ReplicationUserdata>();
80+
userdata.extend_from_slice(&USERDATA.to_le_bytes());
81+
82+
server_app.update();
83+
server_app.exchange_with_client(&mut client_app);
84+
85+
let messages = client_app.world().resource::<ClientMessages>();
86+
assert_eq!(messages.received_count(ServerChannel::Updates), 0);
87+
assert_eq!(messages.received_count(ServerChannel::Mutations), 1);
88+
89+
client_app.update();
90+
91+
let received = client_app.world().resource::<ReceivedUserdata>();
92+
assert_eq!(received.0, USERDATA);
93+
}
94+
95+
const USERDATA: u32 = 42;
96+
97+
#[derive(Component, Deserialize, Serialize)]
98+
struct TestComponent;
99+
100+
#[derive(Resource, Default)]
101+
struct ReceivedUserdata(u32);
102+
103+
fn receive_userdata(received: On<UserdataReceived>, mut storage: ResMut<ReceivedUserdata>) {
104+
let bytes = received.bytes.as_ref().try_into().unwrap();
105+
storage.0 = u32::from_le_bytes(bytes);
106+
}

0 commit comments

Comments
 (0)