From 5c22bcd43630354e9509a49db7b8e1a27da53f31 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Mon, 29 Dec 2025 16:56:25 +0800 Subject: [PATCH] Add graceful shutdown to heartbeat and socket handlers - Add CancellationToken parameter to `UdpGwClient::heartbeat_task` and make sleep cancellation-aware - Update `process_socket_requests` to accept a CancellationToken and exit when cancelled - Pass shutdown token into spawned socket and heartbeat tasks and log errors from them - Add start/exit log messages for heartbeat and socket request handlers --- src/bin/main.rs | 8 ++++++-- src/lib.rs | 5 ++++- src/socket_transfer.rs | 10 ++++++++-- src/udpgw.rs | 11 +++++++++-- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index bc85f04..3269452 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -111,7 +111,7 @@ async fn main_async(args: Args) -> Result<(), BoxError> { #[cfg(target_os = "linux")] async fn namespace_proxy_main( _args: Args, - _shutdown_token: tokio_util::sync::CancellationToken, + shutdown_token: tokio_util::sync::CancellationToken, ) -> Result { use nix::fcntl::{OFlag, open}; use nix::sys::stat::Mode; @@ -153,7 +153,11 @@ async fn namespace_proxy_main( log::info!("Writing unshare pid to {pidfile}"); std::fs::write(pidfile, unshare_pid.to_string()).ok(); } - tokio::spawn(async move { tun2proxy::socket_transfer::process_socket_requests(&socket).await }); + tokio::spawn(async move { + if let Err(err) = tun2proxy::socket_transfer::process_socket_requests(&socket, shutdown_token).await { + log::error!("namespace proxy socket request handler error: {err}"); + } + }); Ok(child.wait().await?) } diff --git a/src/lib.rs b/src/lib.rs index f770c27..4529840 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,8 +253,11 @@ where addr, )); let client_keepalive = client.clone(); + let shutdown_clone = shutdown_token.clone(); tokio::spawn(async move { - let _ = client_keepalive.heartbeat_task().await; + if let Err(err) = client_keepalive.heartbeat_task(shutdown_clone).await { + log::error!("UDP Gateway heartbeat task error: {err}"); + } }); client }); diff --git a/src/socket_transfer.rs b/src/socket_transfer.rs index 4c81da7..c6f686b 100644 --- a/src/socket_transfer.rs +++ b/src/socket_transfer.rs @@ -196,11 +196,15 @@ where /// Process [`Request`]s received from `socket` /// /// Panics if called outside of tokio runtime -pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> { +pub async fn process_socket_requests(socket: &UnixDatagram, shutdown_token: tokio_util::sync::CancellationToken) -> error::Result<()> { + log::info!("socket_transfer: process_socket_requests started"); loop { let mut buf = [0_u8; REQUEST_BUFFER_SIZE]; - let len = socket.recv(&mut buf[..]).await?; + let len = tokio::select! { + _ = shutdown_token.cancelled() => break, + res = socket.recv(&mut buf[..]) => res?, + }; let request: Request = bincode::decode_from_slice(&buf[..len], bincode::config::standard()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))? @@ -239,4 +243,6 @@ pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> sendmsg::<()>(socket.as_raw_fd(), &iov, &[cmsg], MsgFlags::empty(), None)?; } + log::info!("socket_transfer: process_socket_requests exiting"); + Ok(()) } diff --git a/src/udpgw.rs b/src/udpgw.rs index 24edaad..258cfbd 100644 --- a/src/udpgw.rs +++ b/src/udpgw.rs @@ -456,9 +456,14 @@ impl UdpGwClient { } /// Heartbeat task asynchronous function to periodically check and maintain the active state of the server connection. - pub(crate) async fn heartbeat_task(&self) -> std::io::Result<()> { + pub(crate) async fn heartbeat_task(&self, shutdown_token: tokio_util::sync::CancellationToken) -> std::io::Result<()> { + log::info!("udpgw: heartbeat task started"); loop { - sleep(self.keepalive_time).await; + tokio::select! { + _ = shutdown_token.cancelled() => break, + _ = sleep(self.keepalive_time) => {} + }; + let mut streams = Vec::new(); while let Some(stream) = self.pop_server_connection_from_queue().await { @@ -506,6 +511,8 @@ impl UdpGwClient { } crate::traffic_status::traffic_status_update(tx, rx)?; } + log::info!("udpgw: heartbeat task exiting"); + Ok(()) } /// Parses the UDP response data.