From 86e7af03981da34ce7a47064f80680480508ab13 Mon Sep 17 00:00:00 2001 From: "B. Blechschmidt" Date: Sun, 26 Mar 2023 15:21:56 +0200 Subject: [PATCH] Prevent connection error bubbling from terminating the app --- src/tun2proxy.rs | 306 +++++++++++++++++++++++++---------------------- 1 file changed, 163 insertions(+), 143 deletions(-) diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index 04d3678..88d2a20 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -384,101 +384,113 @@ impl<'a> TunToProxy<'a> { } } }; - if resolved_conn.proto == IpProtocol::Tcp { - let cm = self.get_connection_manager(&resolved_conn); - if cm.is_none() { - return Ok(()); - } - let server = cm.ok_or("no connect manager")?.get_server(); - if first_packet { - for manager in self.connection_managers.iter_mut() { - if let Some(handler) = - manager.new_connection(&resolved_conn, manager.clone())? - { - let mut socket = tcp::Socket::new( - tcp::SocketBuffer::new(vec![0; 4096]), - tcp::SocketBuffer::new(vec![0; 4096]), - ); - socket.set_ack_delay(None); - let dst = SocketAddr::try_from(connection.dst)?; - socket.listen(dst)?; - let handle = self.sockets.add(socket); - - let client = TcpStream::connect(server)?; - - let token = Token(self.next_token); - self.next_token += 1; - - let mut state = ConnectionState { - smoltcp_handle: handle, - mio_stream: client, - token, - handler, - smoltcp_socket_state: 0, - }; + let dst = connection.dst; + (|| -> Result<(), Error> { + if resolved_conn.proto == IpProtocol::Tcp { + let cm = self.get_connection_manager(&resolved_conn); + if cm.is_none() { + return Ok(()); + } + let server = cm.ok_or("no connect manager")?.get_server(); + if first_packet { + for manager in self.connection_managers.iter_mut() { + if let Some(handler) = + manager.new_connection(&resolved_conn, manager.clone())? + { + let mut socket = tcp::Socket::new( + tcp::SocketBuffer::new(vec![0; 4096]), + tcp::SocketBuffer::new(vec![0; 4096]), + ); + socket.set_ack_delay(None); + let dst = SocketAddr::try_from(dst)?; + socket.listen(dst)?; + let handle = self.sockets.add(socket); + + let client = TcpStream::connect(server)?; + + let token = Token(self.next_token); + self.next_token += 1; + + let mut state = ConnectionState { + smoltcp_handle: handle, + mio_stream: client, + token, + handler, + smoltcp_socket_state: 0, + }; + + self.token_to_connection + .insert(token, resolved_conn.clone()); + self.poll.registry().register( + &mut state.mio_stream, + token, + Interest::READABLE | Interest::WRITABLE, + )?; + + self.connections.insert(resolved_conn.clone(), state); + + info!("CONNECT {}", resolved_conn,); + break; + } + } + } else if !self.connections.contains_key(&resolved_conn) { + return Ok(()); + } - self.token_to_connection - .insert(token, resolved_conn.clone()); - self.poll.registry().register( - &mut state.mio_stream, - token, - Interest::READABLE | Interest::WRITABLE, - )?; + // Inject the packet to advance the smoltcp socket state + self.device.inject_packet(frame); - self.connections.insert(resolved_conn.clone(), state); + // Having advanced the socket state, we expect the socket to ACK + // Exfiltrate the response packets generated by the socket and inject them + // into the tunnel interface. + self.expect_smoltcp_send()?; - info!("CONNECT {}", resolved_conn,); - break; + // Read from the smoltcp socket and push the data to the connection handler. + self.tunsocket_read_and_forward(&resolved_conn)?; + + // The connection handler builds up the connection or encapsulates the data. + // Therefore, we now expect it to write data to the server. + self.write_to_server(&resolved_conn)?; + } else if resolved_conn.proto == IpProtocol::Udp && resolved_conn.dst.port == 53 { + if let Some(virtual_dns) = &mut self.options.virtdns { + let payload = &frame[_payload_offset.._payload_offset + _payload_size]; + if let Some(response) = virtual_dns.receive_query(payload) { + let rx_buffer = udp::PacketBuffer::new( + vec![udp::PacketMetadata::EMPTY], + vec![0; 4096], + ); + let tx_buffer = udp::PacketBuffer::new( + vec![udp::PacketMetadata::EMPTY], + vec![0; 4096], + ); + let mut socket = udp::Socket::new(rx_buffer, tx_buffer); + let dst = SocketAddr::try_from(dst)?; + socket.bind(dst)?; + socket + .send_slice(response.as_slice(), resolved_conn.src.into()) + .expect("failed to send DNS response"); + let handle = self.sockets.add(socket); + self.expect_smoltcp_send()?; + self.sockets.remove(handle); } } - } else if !self.connections.contains_key(&resolved_conn) { - return Ok(()); + // Otherwise, UDP is not yet supported. } - - // Inject the packet to advance the smoltcp socket state - self.device.inject_packet(frame); - - // Having advanced the socket state, we expect the socket to ACK - // Exfiltrate the response packets generated by the socket and inject them - // into the tunnel interface. - self.expect_smoltcp_send()?; - - // Read from the smoltcp socket and push the data to the connection handler. - self.tunsocket_read_and_forward(&resolved_conn)?; - - // The connection handler builds up the connection or encapsulates the data. - // Therefore, we now expect it to write data to the server. - self.write_to_server(&resolved_conn); - } else if resolved_conn.proto == IpProtocol::Udp && resolved_conn.dst.port == 53 { - if let Some(virtual_dns) = &mut self.options.virtdns { - let payload = &frame[_payload_offset.._payload_offset + _payload_size]; - if let Some(response) = virtual_dns.receive_query(payload) { - let rx_buffer = - udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]); - let tx_buffer = - udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]); - let mut socket = udp::Socket::new(rx_buffer, tx_buffer); - let dst = SocketAddr::try_from(connection.dst)?; - socket.bind(dst)?; - socket - .send_slice(response.as_slice(), resolved_conn.src.into()) - .expect("failed to send DNS response"); - let handle = self.sockets.add(socket); - self.expect_smoltcp_send()?; - self.sockets.remove(handle); - } - } - // Otherwise, UDP is not yet supported. - } + Ok(()) + })() + .or_else(|error| { + log::error! {"{error}"} + Ok::<(), Error>(()) + })?; } Ok(()) } - fn write_to_server(&mut self, connection: &Connection) { + fn write_to_server(&mut self, connection: &Connection) -> Result<(), Error> { if let Some(state) = self.connections.get_mut(connection) { let event = state.handler.peek_data(OutgoingDirection::ToServer); if event.buffer.is_empty() { - return; + return Ok(()); } let result = state.mio_stream.write(event.buffer); match result { @@ -488,13 +500,12 @@ impl<'a> TunToProxy<'a> { .consume_data(OutgoingDirection::ToServer, consumed); } Err(error) if error.kind() != std::io::ErrorKind::WouldBlock => { - panic!("Error: {:?}", error); - } - _ => { - // println!("{:?}", result); + return Err(error.into()); } + _ => {} } } + Ok(()) } fn write_to_client(&mut self, token: Token, connection: &Connection) -> Result<(), Error> { @@ -556,7 +567,11 @@ impl<'a> TunToProxy<'a> { let cloned = self.write_sockets.clone(); for token in cloned.iter() { if let Some(connection) = self.token_to_connection.get(token) { - self.write_to_client(*token, &connection.clone())?; + let connection = connection.clone(); + if let Err(error) = self.write_to_client(*token, &connection) { + self.remove_connection(&connection)?; + log::error!("Write to client: {}: ", error); + } } } Ok(()) @@ -565,70 +580,75 @@ impl<'a> TunToProxy<'a> { fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { let e = "connection not found"; let conn_ref = self.token_to_connection.get(&event.token()); - if conn_ref.is_none() { - return Ok(()); - } let connection = conn_ref.ok_or(e)?.clone(); - if event.is_readable() || event.is_read_closed() { - { - let state = self.connections.get_mut(&connection).ok_or(e)?; - - // TODO: Move this reading process to its own function. - let mut vecbuf = Vec::::new(); - let read_result = state.mio_stream.read_to_end(&mut vecbuf); - let read = match read_result { - Ok(read_result) => read_result, - Err(error) => { - if error.kind() != std::io::ErrorKind::WouldBlock { - error!("READ from proxy: {}", error); + + (|| -> Result<(), Error> { + if event.is_readable() || event.is_read_closed() { + { + let state = self.connections.get_mut(&connection).ok_or(e)?; + + // TODO: Move this reading process to its own function. + let mut vecbuf = Vec::::new(); + let read_result = state.mio_stream.read_to_end(&mut vecbuf); + let read = match read_result { + Ok(read_result) => read_result, + Err(error) => { + if error.kind() != std::io::ErrorKind::WouldBlock { + error!("Read from proxy: {}", error); + } + vecbuf.len() } - vecbuf.len() - } - }; + }; - if read == 0 { - { - let socket = self.sockets.get_mut::( - self.connections.get(&connection).ok_or(e)?.smoltcp_handle, - ); - socket.close(); + if read == 0 { + { + let socket = self.sockets.get_mut::( + self.connections.get(&connection).ok_or(e)?.smoltcp_handle, + ); + socket.close(); + } + self.expect_smoltcp_send()?; + self.remove_connection(&connection.clone())?; + return Ok(()); } - self.expect_smoltcp_send()?; - self.remove_connection(&connection.clone())?; - return Ok(()); - } - let data = vecbuf.as_slice(); - let data_event = IncomingDataEvent { - direction: IncomingDirection::FromServer, - buffer: &data[0..read], - }; - if let Err(error) = state.handler.push_data(data_event) { - state.mio_stream.shutdown(Both)?; - { - let socket = self.sockets.get_mut::( - self.connections.get(&connection).ok_or(e)?.smoltcp_handle, - ); - socket.close(); + let data = vecbuf.as_slice(); + let data_event = IncomingDataEvent { + direction: IncomingDirection::FromServer, + buffer: &data[0..read], + }; + if let Err(error) = state.handler.push_data(data_event) { + state.mio_stream.shutdown(Both)?; + { + let socket = self.sockets.get_mut::( + self.connections.get(&connection).ok_or(e)?.smoltcp_handle, + ); + socket.close(); + } + self.expect_smoltcp_send()?; + log::error! {"{error}"} + self.remove_connection(&connection.clone())?; + return Ok(()); + } + if event.is_read_closed() { + state.smoltcp_socket_state |= WRITE_CLOSED; } - self.expect_smoltcp_send()?; - log::error! {"{error}"} - self.remove_connection(&connection.clone())?; - return Ok(()); - } - if event.is_read_closed() { - state.smoltcp_socket_state |= WRITE_CLOSED; } - } - // We have read from the proxy server and pushed the data to the connection handler. - // Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client. - self.write_to_client(event.token(), &connection)?; - } - if event.is_writable() { - self.write_to_server(&connection); - } - Ok(()) + // We have read from the proxy server and pushed the data to the connection handler. + // Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client. + self.write_to_client(event.token(), &connection)?; + } + if event.is_writable() { + self.write_to_server(&connection)?; + } + Ok(()) + })() + .or_else(|error| { + self.remove_connection(&connection)?; + log::error! {"{error}"} + Ok(()) + }) } fn udp_event(&mut self, _event: &Event) {}