Skip to content

Commit 25d7d10

Browse files
committed
refactor: simplify work unit handling
1 parent 6a16e6e commit 25d7d10

1 file changed

Lines changed: 12 additions & 13 deletions

File tree

src/concurrent.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ pub struct SpscParser {
140140

141141
impl SpscParser {
142142
pub fn new() -> Self {
143-
let (input_sender, input_receiver) = bounded(4096);
143+
let (input_sender, input_receiver) = bounded::<WorkUnit>(4096);
144144
let (output_sender, output_receiver) = bounded(4096);
145145
let shutdown = Arc::new(AtomicBool::new(false));
146146
let stats = Arc::new(AtomicStats::new());
@@ -162,12 +162,7 @@ impl SpscParser {
162162
if shutdown_flag.load(Ordering::Acquire) {
163163
while let Ok(work_unit) = input_r.try_recv() {
164164
pending_ref.fetch_sub(1, Ordering::Relaxed);
165-
let (data_slice, data_len) = match &work_unit {
166-
WorkUnit::Owned(v) => (v.as_slice(), v.len()),
167-
WorkUnit::ArcSlice(arc, start, end) => {
168-
(&arc[*start..*end], end - start)
169-
}
170-
};
165+
let (data_slice, data_len) = work_unit.as_slice();
171166
if let Ok(iter) = parser.parse_all(data_slice)
172167
&& let Ok(msgs) = iter.collect::<crate::error::Result<Vec<Message>>>()
173168
{
@@ -184,12 +179,7 @@ impl SpscParser {
184179
match input_r.try_recv() {
185180
Ok(work_unit) => {
186181
pending_ref.fetch_sub(1, Ordering::Relaxed);
187-
let (data_slice, data_len) = match &work_unit {
188-
WorkUnit::Owned(v) => (v.as_slice(), v.len()),
189-
WorkUnit::ArcSlice(arc, start, end) => {
190-
(&arc[*start..*end], end - start)
191-
}
192-
};
182+
let (data_slice, data_len) = work_unit.as_slice();
193183
match parser.parse_all(data_slice) {
194184
Ok(iter) => {
195185
let messages: Result<Vec<Message>> = iter.collect();
@@ -363,6 +353,15 @@ enum WorkUnit {
363353
ArcSlice(Arc<[u8]>, usize, usize),
364354
}
365355

356+
impl WorkUnit {
357+
fn as_slice(&self) -> (&[u8], usize) {
358+
match self {
359+
WorkUnit::Owned(v) => (v.as_slice(), v.len()),
360+
WorkUnit::ArcSlice(arc, start, end) => (&arc[*start..*end], end - start),
361+
}
362+
}
363+
}
364+
366365
impl ParallelParser {
367366
pub fn new(num_workers: usize) -> Self {
368367
let num_workers = num_workers.max(1);

0 commit comments

Comments
 (0)