Browse Source

Add graceful shutdown to heartbeat and socket handlers

- Add CancellationToken parameter to `UdpGwClient::heartbeat_task` and make sleep cancellation-aware
- Update `process_socket_requests` to accept a CancellationToken and exit when cancelled
- Pass shutdown token into spawned socket and heartbeat tasks and log errors from them
- Add start/exit log messages for heartbeat and socket request handlers
pull/239/head
ssrlive 5 months ago
parent
commit
5c22bcd436
  1. 8
      src/bin/main.rs
  2. 5
      src/lib.rs
  3. 10
      src/socket_transfer.rs
  4. 11
      src/udpgw.rs

8
src/bin/main.rs

@ -111,7 +111,7 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
#[cfg(target_os = "linux")]
async fn namespace_proxy_main(
_args: Args,
_shutdown_token: tokio_util::sync::CancellationToken,
shutdown_token: tokio_util::sync::CancellationToken,
) -> Result<std::process::ExitStatus, tun2proxy::Error> {
use nix::fcntl::{OFlag, open};
use nix::sys::stat::Mode;
@ -153,7 +153,11 @@ async fn namespace_proxy_main(
log::info!("Writing unshare pid to {pidfile}");
std::fs::write(pidfile, unshare_pid.to_string()).ok();
}
tokio::spawn(async move { tun2proxy::socket_transfer::process_socket_requests(&socket).await });
tokio::spawn(async move {
if let Err(err) = tun2proxy::socket_transfer::process_socket_requests(&socket, shutdown_token).await {
log::error!("namespace proxy socket request handler error: {err}");
}
});
Ok(child.wait().await?)
}

5
src/lib.rs

@ -253,8 +253,11 @@ where
addr,
));
let client_keepalive = client.clone();
let shutdown_clone = shutdown_token.clone();
tokio::spawn(async move {
let _ = client_keepalive.heartbeat_task().await;
if let Err(err) = client_keepalive.heartbeat_task(shutdown_clone).await {
log::error!("UDP Gateway heartbeat task error: {err}");
}
});
client
});

10
src/socket_transfer.rs

@ -196,11 +196,15 @@ where
/// Process [`Request`]s received from `socket`
///
/// Panics if called outside of tokio runtime
pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> {
pub async fn process_socket_requests(socket: &UnixDatagram, shutdown_token: tokio_util::sync::CancellationToken) -> error::Result<()> {
log::info!("socket_transfer: process_socket_requests started");
loop {
let mut buf = [0_u8; REQUEST_BUFFER_SIZE];
let len = socket.recv(&mut buf[..]).await?;
let len = tokio::select! {
_ = shutdown_token.cancelled() => break,
res = socket.recv(&mut buf[..]) => res?,
};
let request: Request = bincode::decode_from_slice(&buf[..len], bincode::config::standard())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?
@ -239,4 +243,6 @@ pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()>
sendmsg::<()>(socket.as_raw_fd(), &iov, &[cmsg], MsgFlags::empty(), None)?;
}
log::info!("socket_transfer: process_socket_requests exiting");
Ok(())
}

11
src/udpgw.rs

@ -456,9 +456,14 @@ impl UdpGwClient {
}
/// Heartbeat task asynchronous function to periodically check and maintain the active state of the server connection.
pub(crate) async fn heartbeat_task(&self) -> std::io::Result<()> {
pub(crate) async fn heartbeat_task(&self, shutdown_token: tokio_util::sync::CancellationToken) -> std::io::Result<()> {
log::info!("udpgw: heartbeat task started");
loop {
sleep(self.keepalive_time).await;
tokio::select! {
_ = shutdown_token.cancelled() => break,
_ = sleep(self.keepalive_time) => {}
};
let mut streams = Vec::new();
while let Some(stream) = self.pop_server_connection_from_queue().await {
@ -506,6 +511,8 @@ impl UdpGwClient {
}
crate::traffic_status::traffic_status_update(tx, rx)?;
}
log::info!("udpgw: heartbeat task exiting");
Ok(())
}
/// Parses the UDP response data.

Loading…
Cancel
Save