diff --git a/source/gosling/crates/tor-interface/src/arti_process.rs b/source/gosling/crates/tor-interface/src/arti_process.rs index 89a82277..ffa8631d 100644 --- a/source/gosling/crates/tor-interface/src/arti_process.rs +++ b/source/gosling/crates/tor-interface/src/arti_process.rs @@ -1,11 +1,12 @@ // standard use std::fs; use std::fs::File; -use std::io::Write; +use std::io::{BufRead, BufReader, Write}; use std::ops::Drop; use std::process; use std::process::{Child, ChildStdout, Command, Stdio}; use std::path::Path; +use std::sync::{Mutex, Weak}; use std::time::{Duration, Instant}; #[derive(thiserror::Error, Debug)] @@ -19,20 +20,26 @@ pub enum Error { #[error("provided data directory '{0}' must be an absolute path")] ArtiDataDirectoryPathNotAbsolute(String), - #[error("failed to create data directory")] + #[error("failed to create data directory: {0}")] ArtiDataDirectoryCreationFailed(#[source] std::io::Error), #[error("file exists in provided data directory path '{0}'")] ArtiDataDirectoryPathExistsAsFile(String), - #[error("failed to create arti.toml file")] + #[error("failed to create arti.toml file: {0}")] ArtiTomlFileCreationFailed(#[source] std::io::Error), - #[error("failed to write arti.toml file")] + #[error("failed to write arti.toml file: {0}")] ArtiTomlFileWriteFailed(#[source] std::io::Error), - #[error("failed to start arti process")] + #[error("failed to start arti process: {0}")] ArtiProcessStartFailed(#[source] std::io::Error), + + #[error("unable to take arti process stdout")] + ArtiProcessStdoutTakeFailed(), + + #[error("failed to spawn arti process stdout read thread: {0}")] + ArtiStdoutReadThreadSpawnFailed(#[source] std::io::Error), } pub(crate) struct ArtiProcess { @@ -41,7 +48,7 @@ pub(crate) struct ArtiProcess { } impl ArtiProcess { - pub fn new(arti_bin_path: &Path, data_directory: &Path) -> Result { + pub fn new(arti_bin_path: &Path, data_directory: &Path, stdout_lines: Weak>>) -> Result { // verify provided paths are absolute if arti_bin_path.is_relative() { return Err(Error::ArtiBinPathNotAbsolute(format!( @@ -94,9 +101,8 @@ impl ArtiProcess { .write_all(arti_toml_content.as_bytes()) .map_err(Error::ArtiTomlFileWriteFailed)?; - let process = Command::new(arti_bin_path.as_os_str()) - // TODO: make this pipe() and fwd log events - .stdout(Stdio::inherit()) + let mut process = Command::new(arti_bin_path.as_os_str()) + .stdout(Stdio::piped()) .stdin(Stdio::null()) .stderr(Stdio::null()) // set working directory to data directory @@ -109,6 +115,18 @@ impl ArtiProcess { .spawn() .map_err(Error::ArtiProcessStartFailed)?; + // spawn a task to read stdout lines and forward to list + let stdout = BufReader::new(match process.stdout.take() { + Some(stdout) => stdout, + None => return Err(Error::ArtiProcessStdoutTakeFailed()), + }); + std::thread::Builder::new() + .name("arti_stdout_reader".to_string()) + .spawn(move || { + ArtiProcess::read_stdout_task(&stdout_lines, stdout); + }) + .map_err(Error::ArtiStdoutReadThreadSpawnFailed)?; + let connect_string = format!("unix:{rpc_listen}"); Ok(ArtiProcess { process, connect_string }) @@ -117,6 +135,26 @@ impl ArtiProcess { pub fn connect_string(&self) -> &str { self.connect_string.as_str() } + + fn read_stdout_task( + stdout_lines: &std::sync::Weak>>, + mut stdout: BufReader, + ) { + while let Some(stdout_lines) = stdout_lines.upgrade() { + let mut line = String::default(); + // read line + if stdout.read_line(&mut line).is_ok() { + // remove trailing '\n' + line.pop(); + // then acquire the lock on the line buffer + let mut stdout_lines = match stdout_lines.lock() { + Ok(stdout_lines) => stdout_lines, + Err(_) => unreachable!(), + }; + stdout_lines.push(line); + } + } + } } impl Drop for ArtiProcess { diff --git a/source/gosling/crates/tor-interface/src/arti_tor_client.rs b/source/gosling/crates/tor-interface/src/arti_tor_client.rs index 28d6701d..585ecc46 100644 --- a/source/gosling/crates/tor-interface/src/arti_tor_client.rs +++ b/source/gosling/crates/tor-interface/src/arti_tor_client.rs @@ -56,6 +56,7 @@ pub enum ArtiTorClientConfig { pub struct ArtiTorClient { daemon: Option, rpc_conn: RpcConn, + pending_log_lines: Arc>>, pending_events: Arc>>, bootstrapped: bool, // our list of circuit tokens for the arti daemon @@ -65,14 +66,17 @@ pub struct ArtiTorClient { impl ArtiTorClient { pub fn new(config: ArtiTorClientConfig) -> Result { + let pending_log_lines: Arc>> = Default::default(); + let (daemon, rpc_conn) = match &config { ArtiTorClientConfig::BundledArti { arti_bin_path, data_directory, } => { + // launch arti let daemon = - ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path()) + ArtiProcess::new(arti_bin_path.as_path(), data_directory.as_path(), Arc::downgrade(&pending_log_lines)) .map_err(Error::ArtiProcessCreationFailed)?; let builder = RpcConnBuilder::from_connect_string(daemon.connect_string()).unwrap(); @@ -109,6 +113,7 @@ impl ArtiTorClient { Ok(Self { daemon: Some(daemon), rpc_conn, + pending_log_lines, pending_events, bootstrapped: false, circuit_token_counter: 0, @@ -120,12 +125,28 @@ impl ArtiTorClient { impl TorProvider for ArtiTorClient { fn update(&mut self) -> Result, tor_provider::Error> { std::thread::sleep(std::time::Duration::from_millis(16)); - match self.pending_events.lock() { - Ok(mut pending_events) => Ok(std::mem::take(pending_events.deref_mut())), + let mut tor_events = match self.pending_events.lock() { + Ok(mut pending_events) => std::mem::take(pending_events.deref_mut()), Err(_) => { unreachable!("another thread panicked while holding this pending_events mutex") } + }; + // take our log lines + let mut log_lines = match self.pending_log_lines.lock() { + Ok(mut pending_log_lines) => std::mem::take(pending_log_lines.deref_mut()), + Err(_) => { + unreachable!("another thread panicked while holding this pending_log_lines mutex") + } + }; + + // append raw lines as TorEvent + for log_line in log_lines.iter_mut() { + tor_events.push(TorEvent::LogReceived { + line: std::mem::take(log_line), + }); } + + Ok(tor_events) } fn bootstrap(&mut self) -> Result<(), tor_provider::Error> {