Browse Source

Prevent connection error bubbling from terminating the app

pull/19/head
B. Blechschmidt 3 years ago
parent
commit
86e7af0398
  1. 306
      src/tun2proxy.rs

306
src/tun2proxy.rs

@ -384,101 +384,113 @@ impl<'a> TunToProxy<'a> {
}
}
};
if resolved_conn.proto == IpProtocol::Tcp {
let cm = self.get_connection_manager(&resolved_conn);
if cm.is_none() {
return Ok(());
}
let server = cm.ok_or("no connect manager")?.get_server();
if first_packet {
for manager in self.connection_managers.iter_mut() {
if let Some(handler) =
manager.new_connection(&resolved_conn, manager.clone())?
{
let mut socket = tcp::Socket::new(
tcp::SocketBuffer::new(vec![0; 4096]),
tcp::SocketBuffer::new(vec![0; 4096]),
);
socket.set_ack_delay(None);
let dst = SocketAddr::try_from(connection.dst)?;
socket.listen(dst)?;
let handle = self.sockets.add(socket);
let client = TcpStream::connect(server)?;
let token = Token(self.next_token);
self.next_token += 1;
let mut state = ConnectionState {
smoltcp_handle: handle,
mio_stream: client,
token,
handler,
smoltcp_socket_state: 0,
};
let dst = connection.dst;
(|| -> Result<(), Error> {
if resolved_conn.proto == IpProtocol::Tcp {
let cm = self.get_connection_manager(&resolved_conn);
if cm.is_none() {
return Ok(());
}
let server = cm.ok_or("no connect manager")?.get_server();
if first_packet {
for manager in self.connection_managers.iter_mut() {
if let Some(handler) =
manager.new_connection(&resolved_conn, manager.clone())?
{
let mut socket = tcp::Socket::new(
tcp::SocketBuffer::new(vec![0; 4096]),
tcp::SocketBuffer::new(vec![0; 4096]),
);
socket.set_ack_delay(None);
let dst = SocketAddr::try_from(dst)?;
socket.listen(dst)?;
let handle = self.sockets.add(socket);
let client = TcpStream::connect(server)?;
let token = Token(self.next_token);
self.next_token += 1;
let mut state = ConnectionState {
smoltcp_handle: handle,
mio_stream: client,
token,
handler,
smoltcp_socket_state: 0,
};
self.token_to_connection
.insert(token, resolved_conn.clone());
self.poll.registry().register(
&mut state.mio_stream,
token,
Interest::READABLE | Interest::WRITABLE,
)?;
self.connections.insert(resolved_conn.clone(), state);
info!("CONNECT {}", resolved_conn,);
break;
}
}
} else if !self.connections.contains_key(&resolved_conn) {
return Ok(());
}
self.token_to_connection
.insert(token, resolved_conn.clone());
self.poll.registry().register(
&mut state.mio_stream,
token,
Interest::READABLE | Interest::WRITABLE,
)?;
// Inject the packet to advance the smoltcp socket state
self.device.inject_packet(frame);
self.connections.insert(resolved_conn.clone(), state);
// Having advanced the socket state, we expect the socket to ACK
// Exfiltrate the response packets generated by the socket and inject them
// into the tunnel interface.
self.expect_smoltcp_send()?;
info!("CONNECT {}", resolved_conn,);
break;
// Read from the smoltcp socket and push the data to the connection handler.
self.tunsocket_read_and_forward(&resolved_conn)?;
// The connection handler builds up the connection or encapsulates the data.
// Therefore, we now expect it to write data to the server.
self.write_to_server(&resolved_conn)?;
} else if resolved_conn.proto == IpProtocol::Udp && resolved_conn.dst.port == 53 {
if let Some(virtual_dns) = &mut self.options.virtdns {
let payload = &frame[_payload_offset.._payload_offset + _payload_size];
if let Some(response) = virtual_dns.receive_query(payload) {
let rx_buffer = udp::PacketBuffer::new(
vec![udp::PacketMetadata::EMPTY],
vec![0; 4096],
);
let tx_buffer = udp::PacketBuffer::new(
vec![udp::PacketMetadata::EMPTY],
vec![0; 4096],
);
let mut socket = udp::Socket::new(rx_buffer, tx_buffer);
let dst = SocketAddr::try_from(dst)?;
socket.bind(dst)?;
socket
.send_slice(response.as_slice(), resolved_conn.src.into())
.expect("failed to send DNS response");
let handle = self.sockets.add(socket);
self.expect_smoltcp_send()?;
self.sockets.remove(handle);
}
}
} else if !self.connections.contains_key(&resolved_conn) {
return Ok(());
// Otherwise, UDP is not yet supported.
}
// Inject the packet to advance the smoltcp socket state
self.device.inject_packet(frame);
// Having advanced the socket state, we expect the socket to ACK
// Exfiltrate the response packets generated by the socket and inject them
// into the tunnel interface.
self.expect_smoltcp_send()?;
// Read from the smoltcp socket and push the data to the connection handler.
self.tunsocket_read_and_forward(&resolved_conn)?;
// The connection handler builds up the connection or encapsulates the data.
// Therefore, we now expect it to write data to the server.
self.write_to_server(&resolved_conn);
} else if resolved_conn.proto == IpProtocol::Udp && resolved_conn.dst.port == 53 {
if let Some(virtual_dns) = &mut self.options.virtdns {
let payload = &frame[_payload_offset.._payload_offset + _payload_size];
if let Some(response) = virtual_dns.receive_query(payload) {
let rx_buffer =
udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]);
let tx_buffer =
udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]);
let mut socket = udp::Socket::new(rx_buffer, tx_buffer);
let dst = SocketAddr::try_from(connection.dst)?;
socket.bind(dst)?;
socket
.send_slice(response.as_slice(), resolved_conn.src.into())
.expect("failed to send DNS response");
let handle = self.sockets.add(socket);
self.expect_smoltcp_send()?;
self.sockets.remove(handle);
}
}
// Otherwise, UDP is not yet supported.
}
Ok(())
})()
.or_else(|error| {
log::error! {"{error}"}
Ok::<(), Error>(())
})?;
}
Ok(())
}
fn write_to_server(&mut self, connection: &Connection) {
fn write_to_server(&mut self, connection: &Connection) -> Result<(), Error> {
if let Some(state) = self.connections.get_mut(connection) {
let event = state.handler.peek_data(OutgoingDirection::ToServer);
if event.buffer.is_empty() {
return;
return Ok(());
}
let result = state.mio_stream.write(event.buffer);
match result {
@ -488,13 +500,12 @@ impl<'a> TunToProxy<'a> {
.consume_data(OutgoingDirection::ToServer, consumed);
}
Err(error) if error.kind() != std::io::ErrorKind::WouldBlock => {
panic!("Error: {:?}", error);
}
_ => {
// println!("{:?}", result);
return Err(error.into());
}
_ => {}
}
}
Ok(())
}
fn write_to_client(&mut self, token: Token, connection: &Connection) -> Result<(), Error> {
@ -556,7 +567,11 @@ impl<'a> TunToProxy<'a> {
let cloned = self.write_sockets.clone();
for token in cloned.iter() {
if let Some(connection) = self.token_to_connection.get(token) {
self.write_to_client(*token, &connection.clone())?;
let connection = connection.clone();
if let Err(error) = self.write_to_client(*token, &connection) {
self.remove_connection(&connection)?;
log::error!("Write to client: {}: ", error);
}
}
}
Ok(())
@ -565,70 +580,75 @@ impl<'a> TunToProxy<'a> {
fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> {
let e = "connection not found";
let conn_ref = self.token_to_connection.get(&event.token());
if conn_ref.is_none() {
return Ok(());
}
let connection = conn_ref.ok_or(e)?.clone();
if event.is_readable() || event.is_read_closed() {
{
let state = self.connections.get_mut(&connection).ok_or(e)?;
// TODO: Move this reading process to its own function.
let mut vecbuf = Vec::<u8>::new();
let read_result = state.mio_stream.read_to_end(&mut vecbuf);
let read = match read_result {
Ok(read_result) => read_result,
Err(error) => {
if error.kind() != std::io::ErrorKind::WouldBlock {
error!("READ from proxy: {}", error);
(|| -> Result<(), Error> {
if event.is_readable() || event.is_read_closed() {
{
let state = self.connections.get_mut(&connection).ok_or(e)?;
// TODO: Move this reading process to its own function.
let mut vecbuf = Vec::<u8>::new();
let read_result = state.mio_stream.read_to_end(&mut vecbuf);
let read = match read_result {
Ok(read_result) => read_result,
Err(error) => {
if error.kind() != std::io::ErrorKind::WouldBlock {
error!("Read from proxy: {}", error);
}
vecbuf.len()
}
vecbuf.len()
}
};
};
if read == 0 {
{
let socket = self.sockets.get_mut::<tcp::Socket>(
self.connections.get(&connection).ok_or(e)?.smoltcp_handle,
);
socket.close();
if read == 0 {
{
let socket = self.sockets.get_mut::<tcp::Socket>(
self.connections.get(&connection).ok_or(e)?.smoltcp_handle,
);
socket.close();
}
self.expect_smoltcp_send()?;
self.remove_connection(&connection.clone())?;
return Ok(());
}
self.expect_smoltcp_send()?;
self.remove_connection(&connection.clone())?;
return Ok(());
}
let data = vecbuf.as_slice();
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &data[0..read],
};
if let Err(error) = state.handler.push_data(data_event) {
state.mio_stream.shutdown(Both)?;
{
let socket = self.sockets.get_mut::<tcp::Socket>(
self.connections.get(&connection).ok_or(e)?.smoltcp_handle,
);
socket.close();
let data = vecbuf.as_slice();
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &data[0..read],
};
if let Err(error) = state.handler.push_data(data_event) {
state.mio_stream.shutdown(Both)?;
{
let socket = self.sockets.get_mut::<tcp::Socket>(
self.connections.get(&connection).ok_or(e)?.smoltcp_handle,
);
socket.close();
}
self.expect_smoltcp_send()?;
log::error! {"{error}"}
self.remove_connection(&connection.clone())?;
return Ok(());
}
if event.is_read_closed() {
state.smoltcp_socket_state |= WRITE_CLOSED;
}
self.expect_smoltcp_send()?;
log::error! {"{error}"}
self.remove_connection(&connection.clone())?;
return Ok(());
}
if event.is_read_closed() {
state.smoltcp_socket_state |= WRITE_CLOSED;
}
}
// We have read from the proxy server and pushed the data to the connection handler.
// Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client.
self.write_to_client(event.token(), &connection)?;
}
if event.is_writable() {
self.write_to_server(&connection);
}
Ok(())
// We have read from the proxy server and pushed the data to the connection handler.
// Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client.
self.write_to_client(event.token(), &connection)?;
}
if event.is_writable() {
self.write_to_server(&connection)?;
}
Ok(())
})()
.or_else(|error| {
self.remove_connection(&connection)?;
log::error! {"{error}"}
Ok(())
})
}
fn udp_event(&mut self, _event: &Event) {}

Loading…
Cancel
Save