Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ext/node): windows cancel stdin read in line mode #23969

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ext/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ os_pipe.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
parking_lot.workspace = true
150 changes: 133 additions & 17 deletions ext/io/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ use winapi::um::processenv::GetStdHandle;
#[cfg(windows)]
use winapi::um::winbase;

#[cfg(windows)]
use parking_lot::Condvar;
#[cfg(windows)]
use parking_lot::Mutex;
#[cfg(windows)]
use std::sync::Arc;

pub mod fs;
mod pipe;
#[cfg(windows)]
Expand Down Expand Up @@ -106,12 +113,21 @@ deno_core::extension!(deno_io,
},
state = |state, options| {
if let Some(stdio) = options.stdio {
#[cfg(windows)]
let stdin_state = {
let st = Arc::new(Mutex::new(WinTtyState::default()));
state.put(st.clone());
st
};
#[cfg(unix)]
let stdin_state = ();

let t = &mut state.resource_table;

let rid = t.add(fs::FileResource::new(
Rc::new(match stdio.stdin.pipe {
StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdin,
StdFileResourceKind::Stdin(stdin_state),
STDIN_HANDLE.try_clone().unwrap(),
),
StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
Expand Down Expand Up @@ -317,14 +333,27 @@ impl Resource for ChildStderrResource {
}
}

#[derive(Clone, Copy)]
#[cfg(windows)]
#[derive(Default)]
pub struct WinTtyState {
pub cancelled: bool,
pub reading: bool,
pub screen_buffer_info:
Option<winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO>,
pub cvar: Arc<Condvar>,
}

#[derive(Clone)]
enum StdFileResourceKind {
File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
Stdin,
#[cfg(windows)]
Stdin(Arc<Mutex<WinTtyState>>),
#[cfg(not(windows))]
Stdin(()),
Stdout,
Stderr,
}
Expand Down Expand Up @@ -422,6 +451,84 @@ impl StdFileResourceInner {
spawn_blocking(action).await.unwrap()
}
}

#[cfg(windows)]
async fn handle_stdin_read(
&self,
state: Arc<Mutex<WinTtyState>>,
mut buf: BufMutView,
) -> FsResult<(usize, BufMutView)> {
loop {
let state = state.clone();

let fut = self.with_inner_blocking_task(move |file| {
/* Start reading, and set the reading flag to true */
state.lock().reading = true;
let nread = match file.read(&mut buf) {
Ok(nread) => nread,
Err(e) => return Err((e.into(), buf)),
};

let mut state = state.lock();
state.reading = false;

/* If we canceled the read by sending a VK_RETURN event, restore
the screen state to undo the visual effect of the VK_RETURN event */
if state.cancelled {
if let Some(screen_buffer_info) = state.screen_buffer_info {
// SAFETY: WinAPI calls to open conout$ and restore visual state.
unsafe {
let handle = winapi::um::fileapi::CreateFileW(
"conout$"
.encode_utf16()
.chain(Some(0))
.collect::<Vec<_>>()
.as_ptr(),
winapi::um::winnt::GENERIC_READ
| winapi::um::winnt::GENERIC_WRITE,
winapi::um::winnt::FILE_SHARE_READ
| winapi::um::winnt::FILE_SHARE_WRITE,
std::ptr::null_mut(),
winapi::um::fileapi::OPEN_EXISTING,
0,
std::ptr::null_mut(),
);

let mut pos = screen_buffer_info.dwCursorPosition;
/* If the cursor was at the bottom line of the screen buffer, the
VK_RETURN would have caused the buffer contents to scroll up by
one line. The right position to reset the cursor to is therefore one
line higher */
if pos.Y == screen_buffer_info.dwSize.Y - 1 {
pos.Y -= 1;
}

winapi::um::wincon::SetConsoleCursorPosition(handle, pos);
winapi::um::handleapi::CloseHandle(handle);
}
}

/* Reset the cancelled flag */
state.cancelled = false;

/* Unblock the main thread */
state.cvar.notify_one();

return Err((FsError::FileBusy, buf));
}

Ok((nread, buf))
});

match fut.await {
Err((FsError::FileBusy, b)) => {
buf = b;
continue;
}
other => return other.map_err(|(e, _)| e),
}
}
}
}

#[async_trait::async_trait(?Send)]
Expand All @@ -435,7 +542,7 @@ impl crate::fs::File for StdFileResourceInner {
// std/src/sys/windows/stdio.rs in Rust's source code).
match self.kind {
StdFileResourceKind::File => self.with_sync(|file| Ok(file.write(buf)?)),
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand All @@ -457,7 +564,7 @@ impl crate::fs::File for StdFileResourceInner {

fn read_sync(self: Rc<Self>, buf: &mut [u8]) -> FsResult<usize> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self.with_sync(|file| Ok(file.read(buf)?))
}
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
Expand All @@ -471,7 +578,7 @@ impl crate::fs::File for StdFileResourceInner {
StdFileResourceKind::File => {
self.with_sync(|file| Ok(file.write_all(buf)?))
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand All @@ -497,7 +604,7 @@ impl crate::fs::File for StdFileResourceInner {
.with_inner_blocking_task(move |file| Ok(file.write_all(&buf)?))
.await
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand Down Expand Up @@ -538,7 +645,7 @@ impl crate::fs::File for StdFileResourceInner {
})
.await
}
StdFileResourceKind::Stdin => {
StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
Expand Down Expand Up @@ -568,7 +675,7 @@ impl crate::fs::File for StdFileResourceInner {

fn read_all_sync(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
let mut buf = Vec::new();
self.with_sync(|file| Ok(file.read_to_end(&mut buf)?))?;
Ok(buf)
Expand All @@ -580,7 +687,7 @@ impl crate::fs::File for StdFileResourceInner {
}
async fn read_all_async(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
StdFileResourceKind::File | StdFileResourceKind::Stdin => {
StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self
.with_inner_blocking_task(|file| {
let mut buf = Vec::new();
Expand Down Expand Up @@ -736,19 +843,28 @@ impl crate::fs::File for StdFileResourceInner {
self: Rc<Self>,
mut buf: BufMutView,
) -> FsResult<(usize, BufMutView)> {
self
.with_inner_blocking_task(|file| {
let nread = file.read(&mut buf)?;
Ok((nread, buf))
})
.await
match &self.kind {
/* On Windows, we need to handle special read cancellation logic for stdin */
#[cfg(windows)]
StdFileResourceKind::Stdin(state) => {
self.handle_stdin_read(state.clone(), buf).await
}
_ => {
self
.with_inner_blocking_task(|file| {
let nread = file.read(&mut buf)?;
Ok((nread, buf))
})
.await
}
}
}

fn try_clone_inner(self: Rc<Self>) -> FsResult<Rc<dyn fs::File>> {
let inner: &Option<_> = &self.cell.borrow();
match inner {
Some(inner) => Ok(Rc::new(StdFileResourceInner {
kind: self.kind,
kind: self.kind.clone(),
cell: RefCell::new(Some(inner.try_clone()?)),
cell_async_task_queue: Default::default(),
handle: self.handle,
Expand Down
81 changes: 81 additions & 0 deletions runtime/ops/tty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use rustyline::KeyCode;
use rustyline::KeyEvent;
use rustyline::Modifiers;

#[cfg(windows)]
use deno_core::parking_lot::Mutex;
#[cfg(windows)]
use deno_io::WinTtyState;
#[cfg(windows)]
use std::sync::Arc;

#[cfg(unix)]
use deno_core::ResourceId;
#[cfg(unix)]
Expand Down Expand Up @@ -94,6 +101,7 @@ fn op_set_raw(
#[cfg(windows)]
{
use winapi::shared::minwindef::FALSE;

use winapi::um::consoleapi;

let handle = handle_or_fd;
Expand All @@ -116,6 +124,79 @@ fn op_set_raw(
mode_raw_input_off(original_mode)
};

let stdin_state = state.borrow::<Arc<Mutex<WinTtyState>>>();
let mut stdin_state = stdin_state.lock();

if stdin_state.reading {
let cvar = stdin_state.cvar.clone();

/* Trick to unblock an ongoing line-buffered read operation if not already pending.
See https://github.com/libuv/libuv/pull/866 for prior art */
if original_mode & COOKED_MODE != 0 && !stdin_state.cancelled {
// SAFETY: Write enter key event to force the console wait to return.
let record = unsafe {
let mut record: wincon::INPUT_RECORD = std::mem::zeroed();
record.EventType = wincon::KEY_EVENT;
record.Event.KeyEvent_mut().wVirtualKeyCode =
winapi::um::winuser::VK_RETURN as u16;
record.Event.KeyEvent_mut().bKeyDown = 1;
record.Event.KeyEvent_mut().wRepeatCount = 1;
*record.Event.KeyEvent_mut().uChar.UnicodeChar_mut() = '\r' as u16;
record.Event.KeyEvent_mut().dwControlKeyState = 0;
record.Event.KeyEvent_mut().wVirtualScanCode =
winapi::um::winuser::MapVirtualKeyW(
winapi::um::winuser::VK_RETURN as u32,
winapi::um::winuser::MAPVK_VK_TO_VSC,
) as u16;
record
};
stdin_state.cancelled = true;

// SAFETY: winapi call to open conout$ and save screen state.
let active_screen_buffer = unsafe {
/* Save screen state before sending the VK_RETURN event */
let handle = winapi::um::fileapi::CreateFileW(
"conout$"
.encode_utf16()
.chain(Some(0))
.collect::<Vec<_>>()
.as_ptr(),
winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE,
winapi::um::winnt::FILE_SHARE_READ
| winapi::um::winnt::FILE_SHARE_WRITE,
std::ptr::null_mut(),
winapi::um::fileapi::OPEN_EXISTING,
0,
std::ptr::null_mut(),
);

let mut active_screen_buffer = std::mem::zeroed();
winapi::um::wincon::GetConsoleScreenBufferInfo(
handle,
&mut active_screen_buffer,
);
winapi::um::handleapi::CloseHandle(handle);
active_screen_buffer
};
stdin_state.screen_buffer_info = Some(active_screen_buffer);

// SAFETY: winapi call to write the VK_RETURN event.
if unsafe {
winapi::um::wincon::WriteConsoleInputW(handle, &record, 1, &mut 0)
} == FALSE
{
return Err(Error::last_os_error().into());
}

/* Wait for read thread to acknowledge the cancellation to ensure that nothing
interferes with the screen state.
NOTE: `wait_while` automatically unlocks stdin_state */
cvar.wait_while(&mut stdin_state, |state: &mut WinTtyState| {
state.cancelled
});
}
}

// SAFETY: winapi call
if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE {
return Err(Error::last_os_error().into());
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/run_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3202,6 +3202,20 @@ itest!(byte_order_mark {
output: "run/byte_order_mark.out",
});

#[test]
#[cfg(windows)]
fn process_stdin_read_unblock() {
TestContext::default()
.new_command()
.args_vec(["run", "run/process_stdin_unblock.mjs"])
.with_pty(|mut console| {
console.write_raw("b");
console.human_delay();
console.write_line_raw("s");
console.expect_all(&["1", "1"]);
});
}

#[test]
fn issue9750() {
TestContext::default()
Expand Down
Loading