diff --git a/src/network/task.rs b/src/network/task.rs index 903f635f..b1a194aa 100755 --- a/src/network/task.rs +++ b/src/network/task.rs @@ -3,7 +3,10 @@ use std::fmt; use anyhow::Result; use std::time::Duration; -use tokio::sync::{mpsc, mpsc::{Permit, Receiver, Sender, UnboundedReceiver}}; +use tokio::sync::{ + mpsc, + mpsc::{Permit, Receiver, Sender, UnboundedReceiver}, +}; use tokio::task::JoinHandle; use crate::messages::{NetworkCommand, NetworkEvent, TransportCommand, TransportEvent}; diff --git a/src/network/tests.rs b/src/network/tests.rs index 7b4d1240..5dcd5525 100755 --- a/src/network/tests.rs +++ b/src/network/tests.rs @@ -1,8 +1,14 @@ use std::net::{Ipv6Addr, SocketAddr}; +use super::task::NetworkTask; +use crate::messages::{ + NetworkCommand, NetworkEvent, SmolPacket, TransportCommand, TransportEvent, TunnelInfo, +}; +use crate::shutdown; use anyhow::{anyhow, Result}; use internet_packet::InternetPacket; use smoltcp::{phy::ChecksumCapabilities, wire::*}; +use tokio::sync::watch; use tokio::{ sync::{ mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedSender}, @@ -10,12 +16,6 @@ use tokio::{ }, task::JoinHandle, }; -use tokio::sync::watch; -use crate::messages::{ - NetworkCommand, NetworkEvent, SmolPacket, TransportCommand, TransportEvent, TunnelInfo, -}; -use crate::shutdown; -use super::task::NetworkTask; struct MockNetwork { wg_to_smol_tx: Sender, diff --git a/src/network/udp.rs b/src/network/udp.rs index 50b09776..1b85c600 100644 --- a/src/network/udp.rs +++ b/src/network/udp.rs @@ -265,9 +265,9 @@ mod tests { use super::*; use crate::packet_sources::udp::UdpConf; use crate::packet_sources::{PacketSourceConf, PacketSourceTask}; + use crate::shutdown; use std::net::{IpAddr, Ipv4Addr}; use tokio::net::UdpSocket; - use crate::shutdown; #[test] fn test_connection_state_recv_recv_read_read() { diff --git a/src/packet_sources/linux.rs b/src/packet_sources/linux.rs index 894e3d7f..6030dfe0 100755 --- a/src/packet_sources/linux.rs +++ b/src/packet_sources/linux.rs @@ -15,11 +15,11 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::intercept_conf::InterceptConf; use crate::messages::{TransportCommand, TransportEvent}; use crate::packet_sources::{forward_packets, PacketSourceConf, PacketSourceTask}; +use crate::shutdown; use tempfile::{tempdir, TempDir}; use tokio::net::UnixDatagram; use tokio::process::Command; use tokio::time::timeout; -use crate::shutdown; async fn start_redirector( executable: &Path, @@ -62,7 +62,7 @@ async fn start_redirector( if shutdown2.is_shutting_down() { // We don't want to log during exit, https://github.com/vorner/pyo3-log/issues/30 eprintln!("{}", line); - continue + continue; } let new_level = line @@ -104,11 +104,11 @@ async fn start_redirector( Duration::from_secs(5), BufReader::new(stdout).lines().next_line(), ) - .await - .context("failed to establish connection to Linux redirector")? - .context("failed to read redirector stdout")? - .map(PathBuf::from) - .context("redirector did not produce stdout") + .await + .context("failed to establish connection to Linux redirector")? + .context("failed to read redirector stdout")? + .map(PathBuf::from) + .context("redirector did not produce stdout") } pub struct LinuxConf { @@ -169,7 +169,8 @@ impl PacketSourceConf for LinuxConf { let datagram_dir = tempdir().context("failed to create temp dir")?; let channel = UnixDatagram::bind(datagram_dir.path().join("mitmproxy"))?; - let dst = start_redirector(&self.executable_path, datagram_dir.path(), shutdown.clone()).await?; + let dst = + start_redirector(&self.executable_path, datagram_dir.path(), shutdown.clone()).await?; channel .connect(&dst) diff --git a/src/packet_sources/macos.rs b/src/packet_sources/macos.rs index d618c50f..824f6067 100644 --- a/src/packet_sources/macos.rs +++ b/src/packet_sources/macos.rs @@ -25,7 +25,7 @@ use crate::network::udp::ConnectionState; use tokio::process::Command; use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::{watch, oneshot}; +use tokio::sync::{oneshot, watch}; use tokio::task::JoinSet; use tokio::time::timeout; use tokio_util::codec::{Framed, LengthDelimitedCodec}; diff --git a/src/packet_sources/mod.rs b/src/packet_sources/mod.rs index d9dc68d4..45b71679 100755 --- a/src/packet_sources/mod.rs +++ b/src/packet_sources/mod.rs @@ -10,8 +10,8 @@ use prost::bytes::{Bytes, BytesMut}; use prost::Message; use std::future::Future; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use tokio::sync::mpsc::{Sender, UnboundedReceiver}; use tokio::sync::mpsc; +use tokio::sync::mpsc::{Sender, UnboundedReceiver}; #[cfg(target_os = "linux")] pub mod linux; diff --git a/src/packet_sources/tun.rs b/src/packet_sources/tun.rs index 32f20a22..638e14a9 100644 --- a/src/packet_sources/tun.rs +++ b/src/packet_sources/tun.rs @@ -3,13 +3,13 @@ use crate::messages::{ }; use crate::network::{add_network_layer, MAX_PACKET_SIZE}; use crate::packet_sources::{PacketSourceConf, PacketSourceTask}; +use crate::shutdown; use anyhow::{Context, Result}; use std::cmp::max; use std::fs; -use tokio::sync::mpsc::{Permit, Receiver, UnboundedReceiver}; use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{Permit, Receiver, UnboundedReceiver}; use tun::AbstractDevice; -use crate::shutdown; pub struct TunConf { pub tun_name: Option, diff --git a/src/packet_sources/udp.rs b/src/packet_sources/udp.rs index 5874ae8d..d654239a 100644 --- a/src/packet_sources/udp.rs +++ b/src/packet_sources/udp.rs @@ -3,14 +3,14 @@ use std::str::FromStr; use anyhow::{Context, Result}; -use socket2::{Domain, Protocol, Socket, Type}; -use tokio::sync::mpsc::{Permit, UnboundedReceiver, Sender}; -use tokio::net::UdpSocket; use crate::messages::{TransportCommand, TransportEvent, TunnelInfo}; use crate::network::udp::{UdpHandler, UdpPacket}; use crate::network::MAX_PACKET_SIZE; use crate::packet_sources::{PacketSourceConf, PacketSourceTask}; use crate::shutdown; +use socket2::{Domain, Protocol, Socket, Type}; +use tokio::net::UdpSocket; +use tokio::sync::mpsc::{Permit, Sender, UnboundedReceiver}; pub fn remote_host_closed_conn(_res: &Result) -> bool { #[cfg(windows)] diff --git a/src/packet_sources/windows.rs b/src/packet_sources/windows.rs index 0065bf04..833d18db 100755 --- a/src/packet_sources/windows.rs +++ b/src/packet_sources/windows.rs @@ -1,133 +1,133 @@ -use std::iter; -use std::os::windows::ffi::OsStrExt; -use std::path::PathBuf; - -use anyhow::{anyhow, Result}; -use tokio::net::windows::named_pipe::{NamedPipeServer, PipeMode, ServerOptions}; -use tokio::sync::watch; -use tokio::sync::mpsc::Sender; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use windows::core::w; -use windows::core::PCWSTR; -use windows::Win32::UI::Shell::ShellExecuteW; -use windows::Win32::UI::Shell::SE_ERR_ACCESSDENIED; -use windows::Win32::UI::WindowsAndMessaging::{SW_HIDE, SW_SHOWNORMAL}; - -use crate::intercept_conf::InterceptConf; -use crate::messages::{TransportCommand, TransportEvent}; -use crate::packet_sources::{forward_packets, PacketSourceConf, PacketSourceTask, IPC_BUF_SIZE}; - -pub struct WindowsConf { - pub executable_path: PathBuf, -} - -impl PacketSourceConf for WindowsConf { - type Task = WindowsTask; - type Data = UnboundedSender; - - fn name(&self) -> &'static str { - "Windows proxy" - } - - async fn build( - self, - transport_events_tx: Sender, - transport_commands_rx: UnboundedReceiver, - shutdown: shutdown::Receiver, - ) -> Result<(Self::Task, Self::Data)> { - let pipe_name = format!( - r"\\.\pipe\mitmproxy-transparent-proxy-{}", - std::process::id() - ); - - let ipc_server = ServerOptions::new() - .pipe_mode(PipeMode::Message) - .first_pipe_instance(true) - .max_instances(1) - .in_buffer_size(IPC_BUF_SIZE as u32) - .out_buffer_size(IPC_BUF_SIZE as u32) - .reject_remote_clients(true) - .create(&pipe_name)?; - - log::debug!("starting {} {}", self.executable_path.display(), pipe_name); - - let pipe_name = pipe_name - .encode_utf16() - .chain(iter::once(0)) - .collect::>(); - - let executable_path = self - .executable_path - .as_os_str() - .encode_wide() - .chain(iter::once(0)) - .collect::>(); - - let result = unsafe { - ShellExecuteW( - None, - w!("runas"), - PCWSTR::from_raw(executable_path.as_ptr()), - PCWSTR::from_raw(pipe_name.as_ptr()), - None, - if cfg!(debug_assertions) { - SW_SHOWNORMAL - } else { - SW_HIDE - }, - ) - }; - - if cfg!(debug_assertions) { - if result.0 <= 32 { - let err = windows::core::Error::from_win32(); - log::warn!("Failed to start child process: {}", err); - } - } else if result.0 == SE_ERR_ACCESSDENIED as isize { - return Err(anyhow!( - "Failed to start the interception process as administrator." - )); - } else if result.0 <= 32 { - let err = windows::core::Error::from_win32(); - return Err(anyhow!("Failed to start the executable: {}", err)); - } - - let (conf_tx, conf_rx) = unbounded_channel(); - - Ok(( - WindowsTask { - ipc_server, - transport_events_tx, - transport_commands_rx, - conf_rx, - shutdown, - }, - conf_tx, - )) - } -} - -pub struct WindowsTask { - ipc_server: NamedPipeServer, - transport_events_tx: Sender, - transport_commands_rx: UnboundedReceiver, - conf_rx: UnboundedReceiver, - shutdown: shutdown::Receiver, -} - -impl PacketSourceTask for WindowsTask { - async fn run(self) -> Result<()> { - log::debug!("Waiting for IPC connection..."); - self.ipc_server.connect().await?; - log::debug!("IPC connected!"); - - forward_packets( - self.ipc_server, - self.transport_events_tx, - self.transport_commands_rx, - self.conf_rx, - self.shutdown, - ) - .await - } -} +use std::iter; +use std::os::windows::ffi::OsStrExt; +use std::path::PathBuf; + +use anyhow::{anyhow, Result}; +use tokio::net::windows::named_pipe::{NamedPipeServer, PipeMode, ServerOptions}; +use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::watch; +use windows::core::w; +use windows::core::PCWSTR; +use windows::Win32::UI::Shell::ShellExecuteW; +use windows::Win32::UI::Shell::SE_ERR_ACCESSDENIED; +use windows::Win32::UI::WindowsAndMessaging::{SW_HIDE, SW_SHOWNORMAL}; + +use crate::intercept_conf::InterceptConf; +use crate::messages::{TransportCommand, TransportEvent}; +use crate::packet_sources::{forward_packets, PacketSourceConf, PacketSourceTask, IPC_BUF_SIZE}; + +pub struct WindowsConf { + pub executable_path: PathBuf, +} + +impl PacketSourceConf for WindowsConf { + type Task = WindowsTask; + type Data = UnboundedSender; + + fn name(&self) -> &'static str { + "Windows proxy" + } + + async fn build( + self, + transport_events_tx: Sender, + transport_commands_rx: UnboundedReceiver, + shutdown: shutdown::Receiver, + ) -> Result<(Self::Task, Self::Data)> { + let pipe_name = format!( + r"\\.\pipe\mitmproxy-transparent-proxy-{}", + std::process::id() + ); + + let ipc_server = ServerOptions::new() + .pipe_mode(PipeMode::Message) + .first_pipe_instance(true) + .max_instances(1) + .in_buffer_size(IPC_BUF_SIZE as u32) + .out_buffer_size(IPC_BUF_SIZE as u32) + .reject_remote_clients(true) + .create(&pipe_name)?; + + log::debug!("starting {} {}", self.executable_path.display(), pipe_name); + + let pipe_name = pipe_name + .encode_utf16() + .chain(iter::once(0)) + .collect::>(); + + let executable_path = self + .executable_path + .as_os_str() + .encode_wide() + .chain(iter::once(0)) + .collect::>(); + + let result = unsafe { + ShellExecuteW( + None, + w!("runas"), + PCWSTR::from_raw(executable_path.as_ptr()), + PCWSTR::from_raw(pipe_name.as_ptr()), + None, + if cfg!(debug_assertions) { + SW_SHOWNORMAL + } else { + SW_HIDE + }, + ) + }; + + if cfg!(debug_assertions) { + if result.0 <= 32 { + let err = windows::core::Error::from_win32(); + log::warn!("Failed to start child process: {}", err); + } + } else if result.0 == SE_ERR_ACCESSDENIED as isize { + return Err(anyhow!( + "Failed to start the interception process as administrator." + )); + } else if result.0 <= 32 { + let err = windows::core::Error::from_win32(); + return Err(anyhow!("Failed to start the executable: {}", err)); + } + + let (conf_tx, conf_rx) = unbounded_channel(); + + Ok(( + WindowsTask { + ipc_server, + transport_events_tx, + transport_commands_rx, + conf_rx, + shutdown, + }, + conf_tx, + )) + } +} + +pub struct WindowsTask { + ipc_server: NamedPipeServer, + transport_events_tx: Sender, + transport_commands_rx: UnboundedReceiver, + conf_rx: UnboundedReceiver, + shutdown: shutdown::Receiver, +} + +impl PacketSourceTask for WindowsTask { + async fn run(self) -> Result<()> { + log::debug!("Waiting for IPC connection..."); + self.ipc_server.connect().await?; + log::debug!("IPC connected!"); + + forward_packets( + self.ipc_server, + self.transport_events_tx, + self.transport_commands_rx, + self.conf_rx, + self.shutdown, + ) + .await + } +} diff --git a/src/packet_sources/wireguard.rs b/src/packet_sources/wireguard.rs index 5a3d4eba..5456d711 100755 --- a/src/packet_sources/wireguard.rs +++ b/src/packet_sources/wireguard.rs @@ -2,6 +2,11 @@ use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; +use crate::messages::{ + NetworkCommand, NetworkEvent, SmolPacket, TransportCommand, TransportEvent, TunnelInfo, +}; +use crate::network::{add_network_layer, MAX_PACKET_SIZE}; +use crate::packet_sources::{PacketSourceConf, PacketSourceTask}; use anyhow::{anyhow, Context, Result}; use boringtun::noise::{ errors::WireGuardError, handshake::parse_handshake_anon, Packet, Tunn, TunnResult, @@ -17,11 +22,6 @@ use tokio::{ Mutex, }, }; -use crate::messages::{ - NetworkCommand, NetworkEvent, SmolPacket, TransportCommand, TransportEvent, TunnelInfo, -}; -use crate::network::{add_network_layer, MAX_PACKET_SIZE}; -use crate::packet_sources::{PacketSourceConf, PacketSourceTask}; use crate::packet_sources::udp::remote_host_closed_conn; use crate::shutdown; diff --git a/src/shutdown.rs b/src/shutdown.rs index ff87a9c5..cd112abc 100755 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -1,5 +1,5 @@ -use std::fmt::{Debug, Formatter}; use anyhow::Result; +use std::fmt::{Debug, Formatter}; use tokio::sync::watch; use tokio::task::JoinSet; @@ -21,7 +21,7 @@ impl Receiver { impl Debug for Receiver { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_tuple("Shutdown") - .field(&self.is_shutting_down()) + .field(&self.is_shutting_down()) .finish() } } @@ -56,7 +56,6 @@ pub async fn shutdown_task(mut tasks: JoinSet>, shutdown_done: watch: shutdown_done.send(()).ok(); } - #[cfg(test)] mod tests { use super::*;