Skip to content

Commit bf07728

Browse files
committed
refactor: remove duplicated WorkUnit slice extraction
1 parent 25d7d10 commit bf07728

1 file changed

Lines changed: 34 additions & 114 deletions

File tree

src/concurrent.rs

Lines changed: 34 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -393,31 +393,19 @@ impl ParallelParser {
393393
while !shutdown_flag.load(Ordering::Acquire) {
394394
match rx.recv() {
395395
Ok(work) => {
396-
match work {
397-
WorkUnit::Owned(data_vec) => {
398-
let data_len = data_vec.len();
399-
match parser.parse_all(&data_vec) {
400-
Ok(iter) => {
401-
let messages: Result<Vec<Message>> = iter.collect();
402-
match messages {
403-
Ok(msgs) => {
404-
let msg_count = msgs.len() as u64;
405-
msg_counter
406-
.fetch_add(msg_count, Ordering::Relaxed);
407-
byte_counter.fetch_add(
408-
data_len as u64,
409-
Ordering::Relaxed,
410-
);
411-
stats[worker_id]
412-
.record_batch(msg_count, data_len as u64);
413-
let _ = tx.send(msgs);
414-
}
415-
Err(_) => {
416-
err_counter.fetch_add(1, Ordering::Relaxed);
417-
stats[worker_id].record_error();
418-
let _ = tx.send(Vec::new());
419-
}
420-
}
396+
let (data_slice, data_len) = work.as_slice();
397+
match parser.parse_all(data_slice) {
398+
Ok(iter) => {
399+
let messages: Result<Vec<Message>> = iter.collect();
400+
match messages {
401+
Ok(msgs) => {
402+
let msg_count = msgs.len() as u64;
403+
msg_counter.fetch_add(msg_count, Ordering::Relaxed);
404+
byte_counter
405+
.fetch_add(data_len as u64, Ordering::Relaxed);
406+
stats[worker_id]
407+
.record_batch(msg_count, data_len as u64);
408+
let _ = tx.send(msgs);
421409
}
422410
Err(_) => {
423411
err_counter.fetch_add(1, Ordering::Relaxed);
@@ -426,38 +414,10 @@ impl ParallelParser {
426414
}
427415
}
428416
}
429-
WorkUnit::ArcSlice(arc, start, end) => {
430-
let slice = &arc[start..end];
431-
let data_len = slice.len();
432-
match parser.parse_all(slice) {
433-
Ok(iter) => {
434-
let messages: Result<Vec<Message>> = iter.collect();
435-
match messages {
436-
Ok(msgs) => {
437-
let msg_count = msgs.len() as u64;
438-
msg_counter
439-
.fetch_add(msg_count, Ordering::Relaxed);
440-
byte_counter.fetch_add(
441-
data_len as u64,
442-
Ordering::Relaxed,
443-
);
444-
stats[worker_id]
445-
.record_batch(msg_count, data_len as u64);
446-
let _ = tx.send(msgs);
447-
}
448-
Err(_) => {
449-
err_counter.fetch_add(1, Ordering::Relaxed);
450-
stats[worker_id].record_error();
451-
let _ = tx.send(Vec::new());
452-
}
453-
}
454-
}
455-
Err(_) => {
456-
err_counter.fetch_add(1, Ordering::Relaxed);
457-
stats[worker_id].record_error();
458-
let _ = tx.send(Vec::new());
459-
}
460-
}
417+
Err(_) => {
418+
err_counter.fetch_add(1, Ordering::Relaxed);
419+
stats[worker_id].record_error();
420+
let _ = tx.send(Vec::new());
461421
}
462422
}
463423
parser.reset();
@@ -485,10 +445,7 @@ impl ParallelParser {
485445

486446
pub fn submit(&self, data: Vec<u8>) -> Result<()> {
487447
self.sender.send(WorkUnit::Owned(data)).map_err(|e| {
488-
let size = match &e.0 {
489-
WorkUnit::Owned(v) => v.len(),
490-
WorkUnit::ArcSlice(_, s, e) => e - s,
491-
};
448+
let size = e.0.as_slice().1;
492449
crate::error::ParseError::BufferOverflow { size, max: 0 }
493450
})
494451
}
@@ -505,10 +462,7 @@ impl ParallelParser {
505462
self.sender
506463
.send(WorkUnit::ArcSlice(data, start, end))
507464
.map_err(|e| {
508-
let size = match &e.0 {
509-
WorkUnit::Owned(v) => v.len(),
510-
WorkUnit::ArcSlice(_, s, e) => e - s,
511-
};
465+
let size = e.0.as_slice().1;
512466
crate::error::ParseError::BufferOverflow { size, max: 0 }
513467
})
514468
}
@@ -1630,27 +1584,17 @@ impl WorkStealingParser {
16301584

16311585
match work {
16321586
Some(unit) => {
1633-
match unit {
1634-
WorkUnit::Owned(data_vec) => {
1635-
let data_len = data_vec.len();
1636-
match parser.parse_all(&data_vec) {
1637-
Ok(iter) => {
1638-
let messages: Result<Vec<Message>> = iter.collect();
1639-
match messages {
1640-
Ok(msgs) => {
1641-
let msg_count = msgs.len() as u64;
1642-
stats_ref.add_messages(msg_count);
1643-
stats_ref.add_bytes(data_len as u64);
1644-
ws[worker_id]
1645-
.record_batch(msg_count, data_len as u64);
1646-
let _ = tx.send(msgs);
1647-
}
1648-
Err(_) => {
1649-
stats_ref.add_error();
1650-
ws[worker_id].record_error();
1651-
let _ = tx.send(Vec::new());
1652-
}
1653-
}
1587+
let (data_slice, data_len) = unit.as_slice();
1588+
match parser.parse_all(data_slice) {
1589+
Ok(iter) => {
1590+
let messages: Result<Vec<Message>> = iter.collect();
1591+
match messages {
1592+
Ok(msgs) => {
1593+
let msg_count = msgs.len() as u64;
1594+
stats_ref.add_messages(msg_count);
1595+
stats_ref.add_bytes(data_len as u64);
1596+
ws[worker_id].record_batch(msg_count, data_len as u64);
1597+
let _ = tx.send(msgs);
16541598
}
16551599
Err(_) => {
16561600
stats_ref.add_error();
@@ -1659,34 +1603,10 @@ impl WorkStealingParser {
16591603
}
16601604
}
16611605
}
1662-
WorkUnit::ArcSlice(arc, start, end) => {
1663-
let slice = &arc[start..end];
1664-
let data_len = slice.len();
1665-
match parser.parse_all(slice) {
1666-
Ok(iter) => {
1667-
let messages: Result<Vec<Message>> = iter.collect();
1668-
match messages {
1669-
Ok(msgs) => {
1670-
let msg_count = msgs.len() as u64;
1671-
stats_ref.add_messages(msg_count);
1672-
stats_ref.add_bytes(data_len as u64);
1673-
ws[worker_id]
1674-
.record_batch(msg_count, data_len as u64);
1675-
let _ = tx.send(msgs);
1676-
}
1677-
Err(_) => {
1678-
stats_ref.add_error();
1679-
ws[worker_id].record_error();
1680-
let _ = tx.send(Vec::new());
1681-
}
1682-
}
1683-
}
1684-
Err(_) => {
1685-
stats_ref.add_error();
1686-
ws[worker_id].record_error();
1687-
let _ = tx.send(Vec::new());
1688-
}
1689-
}
1606+
Err(_) => {
1607+
stats_ref.add_error();
1608+
ws[worker_id].record_error();
1609+
let _ = tx.send(Vec::new());
16901610
}
16911611
}
16921612
parser.reset();

0 commit comments

Comments
 (0)