Skip to content

Commit 88b5e95

Browse files
committed
Run parallel steps concurrently
1 parent 97ecec3 commit 88b5e95

2 files changed

Lines changed: 35 additions & 7 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ This will execute the `my_pipeline` pipeline with the initial input "hello world
238238
* **Variable Substitution:** Pipeline steps can use variables defined in the state using `${variable_name}` syntax.
239239
* **Retry Mechanism:** Steps can be configured with a retry mechanism to handle transient errors.
240240
* **State Persistence:** Pipeline state is automatically saved and loaded, allowing for seamless resumption.
241-
* **Parallel Execution:** The `Parallel` step allows for concurrent execution of steps.
241+
* **Parallel Execution:** The `Parallel` step allows for concurrent execution of steps. Nested `Parallel` blocks are also run concurrently and their results aggregated when all tasks finish.
242242
* **Timeout Mechanism:** The `Timeout` step allows for setting a time limit for a specific step.
243243

244244
### Example Pipeline

crates/fluent-engines/src/pipeline_executor.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -723,13 +723,41 @@ impl<S: StateStore + Clone + std::marker::Sync + std::marker::Send> PipelineExec
723723
}
724724
}
725725
PipelineStep::Parallel { name: _, steps } => {
726-
// For simplicity, we'll execute parallel steps sequentially in this context
727-
let mut result = HashMap::new();
728-
for sub_step in steps {
729-
let step_result = Self::execute_single_step(sub_step, state).await?;
730-
result.extend(step_result);
726+
let state_arc = Arc::new(tokio::sync::Mutex::new(state.clone()));
727+
let mut set = JoinSet::new();
728+
729+
for sub_step in steps.iter().cloned() {
730+
let state_clone = Arc::clone(&state_arc);
731+
set.spawn(async move {
732+
let mut guard = state_clone.lock().await;
733+
Self::execute_single_step(&sub_step, &mut guard).await
734+
});
731735
}
732-
Ok(result)
736+
737+
let mut combined_results = HashMap::new();
738+
while let Some(result) = set.join_next().await {
739+
match result {
740+
Ok(Ok(step_result)) => {
741+
combined_results.extend(step_result);
742+
}
743+
Ok(Err(e)) => {
744+
combined_results
745+
.insert(format!("error_{}", combined_results.len()), e.to_string());
746+
}
747+
Err(e) => {
748+
combined_results.insert(
749+
format!("join_error_{}", combined_results.len()),
750+
e.to_string(),
751+
);
752+
}
753+
}
754+
}
755+
756+
let state_guard = state_arc.lock().await;
757+
state.data.extend(state_guard.data.clone());
758+
state.data.extend(combined_results.clone());
759+
760+
Ok(combined_results)
733761
}
734762
_ => Err(anyhow!("Unknown step type")),
735763
}

0 commit comments

Comments
 (0)