Skip to content

Commit 5af5bfe

Browse files
committed
Fix Windows ConPTY worker interrupts
1 parent 5842aa2 commit 5af5bfe

6 files changed

Lines changed: 365 additions & 85 deletions

File tree

src/ipc/transport.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,16 @@ impl IpcServer {
157157
}
158158

159159
#[cfg(target_family = "windows")]
160-
pub fn connect(
160+
pub fn connect<ChildExited>(
161161
self,
162162
handle: IpcHandle,
163163
handlers: IpcHandlers,
164-
child: &mut std::process::Child,
164+
child_exited: ChildExited,
165165
max_wait: Duration,
166-
) -> io::Result<()> {
166+
) -> io::Result<()>
167+
where
168+
ChildExited: FnMut() -> io::Result<bool>,
169+
{
167170
let Some(server_pipe_to_worker) = self.server_pipe_to_worker else {
168171
return Err(io::Error::other(
169172
"missing ipc named pipe handle (to-worker)",
@@ -175,9 +178,14 @@ impl IpcServer {
175178
));
176179
};
177180
let start = Instant::now();
178-
connect_named_pipe_with_process_retry(&server_pipe_to_worker, child, max_wait)?;
181+
let mut child_exited = child_exited;
182+
connect_named_pipe_with_process_retry(&server_pipe_to_worker, &mut child_exited, max_wait)?;
179183
let remaining = max_wait.saturating_sub(start.elapsed());
180-
connect_named_pipe_with_process_retry(&server_pipe_from_worker, child, remaining)?;
184+
connect_named_pipe_with_process_retry(
185+
&server_pipe_from_worker,
186+
&mut child_exited,
187+
remaining,
188+
)?;
181189
let conn = ServerIpcConnection::new(
182190
IpcTransport {
183191
reader: Box::new(server_pipe_from_worker),
@@ -507,12 +515,12 @@ fn join_connector_with_grace(connector: thread::JoinHandle<()>, max_wait: Durati
507515
#[cfg(target_family = "windows")]
508516
fn connect_named_pipe_with_process_retry(
509517
server_pipe: &File,
510-
child: &mut std::process::Child,
518+
child_exited: &mut impl FnMut() -> io::Result<bool>,
511519
max_wait: Duration,
512520
) -> io::Result<()> {
513521
connect_named_pipe_with_process_retry_impl(
514522
|timeout| connect_named_pipe(server_pipe, timeout),
515-
|| child.try_wait().map(|status| status.is_some()),
523+
child_exited,
516524
max_wait,
517525
)
518526
}

src/python_session.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ fn finish_active_request_at_next_read() {
203203
let mut guard = state.inner.lock().unwrap();
204204
guard.waiting_for_input = false;
205205
if let Some(active) = guard.active_request.as_mut() {
206+
#[cfg(windows)]
207+
active.queued_lines.clear();
206208
active.line_count = active.consumed_lines.saturating_add(1);
207209
active.fallback_prompt = None;
208210
active.skip_next_hook = false;

src/server/response.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1739,6 +1739,24 @@ fn omission_event_line_len() -> usize {
17391739
build_events_log_server_line(OUTPUT_BUNDLE_OMITTED_NOTICE).len()
17401740
}
17411741

1742+
fn terminal_mode_toggle_only(text: &str) -> bool {
1743+
let mut rest = text;
1744+
let mut saw_toggle = false;
1745+
loop {
1746+
if let Some(next) = rest.strip_prefix("\u{1b}[?9001h") {
1747+
rest = next;
1748+
saw_toggle = true;
1749+
continue;
1750+
}
1751+
if let Some(next) = rest.strip_prefix("\u{1b}[?1004h") {
1752+
rest = next;
1753+
saw_toggle = true;
1754+
continue;
1755+
}
1756+
return saw_toggle && rest.trim_matches(['\r', '\n']).is_empty();
1757+
}
1758+
}
1759+
17421760
/// Normalizes one worker reply into renderable items while preserving the split between
17431761
/// worker-originated transcript text and inline-only server notices.
17441762
fn prepare_reply_material(reply: WorkerReply, detached_prefix_item_count: usize) -> ReplyMaterial {
@@ -1772,6 +1790,13 @@ fn prepare_reply_material(reply: WorkerReply, detached_prefix_item_count: usize)
17721790
} else {
17731791
text
17741792
};
1793+
if matches!(
1794+
(origin, stream),
1795+
(ContentOrigin::Worker, TextStream::Stdout)
1796+
) && terminal_mode_toggle_only(&text)
1797+
{
1798+
continue;
1799+
}
17751800
if text.is_empty() {
17761801
continue;
17771802
}
@@ -2797,6 +2822,54 @@ mod tests {
27972822
assert_eq!(normalize_error_prompt(text.clone(), true), text);
27982823
}
27992824

2825+
#[test]
2826+
fn standalone_terminal_mode_toggles_are_dropped_from_inline_reply() {
2827+
let mut state = ResponseState::new().expect("response state should initialize");
2828+
let result = state.finalize_worker_result(
2829+
Ok(worker_reply(
2830+
vec![
2831+
WorkerContent::worker_stdout("\u{1b}[?9001h\u{1b}[?1004h"),
2832+
WorkerContent::worker_stdout("2\n"),
2833+
],
2834+
None,
2835+
)),
2836+
false,
2837+
TimeoutBundleReuse::None,
2838+
0,
2839+
);
2840+
2841+
assert_eq!(result_text(&result), "2\n");
2842+
}
2843+
2844+
#[test]
2845+
fn standalone_terminal_mode_toggles_are_dropped_from_output_bundle() {
2846+
let mut state = ResponseState::new().expect("response state should initialize");
2847+
let visible = "x".repeat(super::INLINE_TEXT_HARD_SPILL_THRESHOLD + 200);
2848+
let result = state.finalize_worker_result(
2849+
Ok(worker_reply(
2850+
vec![
2851+
WorkerContent::worker_stdout("\u{1b}[?9001h\u{1b}[?1004h"),
2852+
WorkerContent::worker_stdout(visible.clone()),
2853+
],
2854+
None,
2855+
)),
2856+
false,
2857+
TimeoutBundleReuse::None,
2858+
0,
2859+
);
2860+
2861+
let text = result_text(&result);
2862+
let transcript_path = disclosed_path(&text, "transcript.txt")
2863+
.unwrap_or_else(|| panic!("expected transcript path, got: {text:?}"));
2864+
let transcript = fs::read_to_string(&transcript_path)
2865+
.unwrap_or_else(|err| panic!("expected transcript to be readable: {err}"));
2866+
assert_eq!(transcript, visible);
2867+
assert!(
2868+
!text.contains("\u{1b}[?9001h") && !text.contains("\u{1b}[?1004h"),
2869+
"did not expect terminal mode toggles inline: {text:?}"
2870+
);
2871+
}
2872+
28002873
#[test]
28012874
fn events_log_text_rows_preserve_partial_line_state_across_images() {
28022875
let mut store = OutputStore::new().expect("output store should initialize");

src/windows_conpty.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,7 @@ pub fn run_conpty_command_with_env_map(
179179
upsert_env_case_insensitive(&mut env_map, WINDOWS_CONPTY_REQUEST_ENV, "1");
180180
upsert_env_case_insensitive(&mut env_map, WINDOWS_CONPTY_ATTACHED_ENV, "1");
181181
crate::diagnostics::startup_log("windows-conpty: conpty created");
182-
let output_read = conpty
183-
.output_read
184-
.try_clone()
185-
.map_err(|err| format!("failed to clone ConPTY output handle: {err}"))?;
182+
let output_read = conpty.take_output_reader()?;
186183
let output_forwarder = spawn_conpty_output_forwarder(output_read);
187184
crate::diagnostics::startup_log("windows-conpty: spawning child");
188185
let proc_info = spawn_conpty_process(command, cwd, &env_map, conpty.hpc)?;
@@ -270,22 +267,31 @@ pub unsafe fn spawn_conpty_process_as_user(
270267
let mut conpty = Conpty::new()?;
271268
upsert_env_case_insensitive(env_map, WINDOWS_CONPTY_REQUEST_ENV, "1");
272269
upsert_env_case_insensitive(env_map, WINDOWS_CONPTY_ATTACHED_ENV, "1");
273-
let output_read = conpty
274-
.output_read
275-
.try_clone()
276-
.map_err(|err| format!("failed to clone ConPTY output handle: {err}"))?;
270+
let output_read = conpty.take_output_reader()?;
277271
let output_forwarder = spawn_conpty_output_forwarder(output_read);
278272
let proc_info = spawn_conpty_process_with_token(token, command, cwd, env_map, conpty.hpc)?;
279273
conpty.close_child_side_handles();
280274
Ok((proc_info, conpty, output_forwarder))
281275
}
282276

277+
pub unsafe fn spawn_conpty_process_direct(
278+
command: &[String],
279+
cwd: Option<&Path>,
280+
env_map: &mut HashMap<String, String>,
281+
) -> Result<(PROCESS_INFORMATION, Conpty), String> {
282+
let mut conpty = Conpty::new()?;
283+
upsert_env_case_insensitive(env_map, WINDOWS_CONPTY_REQUEST_ENV, "1");
284+
upsert_env_case_insensitive(env_map, WINDOWS_CONPTY_ATTACHED_ENV, "1");
285+
let proc_info = spawn_conpty_process(command, cwd, env_map, conpty.hpc)?;
286+
conpty.close_child_side_handles();
287+
Ok((proc_info, conpty))
288+
}
289+
283290
pub struct Conpty {
284291
hpc: HPCON,
285292
input_read: Option<File>,
286-
#[allow(dead_code)]
287-
input_write: File,
288-
output_read: File,
293+
input_write: Option<File>,
294+
output_read: Option<File>,
289295
output_write: Option<File>,
290296
}
291297

@@ -342,12 +348,24 @@ impl Conpty {
342348
Ok(Self {
343349
hpc,
344350
input_read: Some(File::from_raw_handle(input_read as _)),
345-
input_write: File::from_raw_handle(input_write as _),
346-
output_read: File::from_raw_handle(output_read as _),
351+
input_write: Some(File::from_raw_handle(input_write as _)),
352+
output_read: Some(File::from_raw_handle(output_read as _)),
347353
output_write: Some(File::from_raw_handle(output_write as _)),
348354
})
349355
}
350356

357+
pub fn take_input_writer(&mut self) -> Result<File, String> {
358+
self.input_write
359+
.take()
360+
.ok_or_else(|| "ConPTY input writer already taken".to_string())
361+
}
362+
363+
pub fn take_output_reader(&mut self) -> Result<File, String> {
364+
self.output_read
365+
.take()
366+
.ok_or_else(|| "ConPTY output reader already taken".to_string())
367+
}
368+
351369
fn close_child_side_handles(&mut self) {
352370
self.input_read.take();
353371
self.output_write.take();

0 commit comments

Comments
 (0)