From 43bb1cad2221b8761eac42b9a2ceb439a3856a8d Mon Sep 17 00:00:00 2001 From: samosvalishe Date: Tue, 21 Apr 2026 21:24:50 +0300 Subject: [PATCH] feat(turn): distribute streams across TURN URLs and add TCP conn diagnostics - Pick turn URL by streamID % len(urls) instead of always urlsRaw[0] - Add countingConn to track bytes written/read for TCP TURN connections - Add classifyNetErr helper for structured error categorization - Log TCP dial failures always; verbose logs gated behind isDebug --- client/main.go | 84 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 8 deletions(-) diff --git a/client/main.go b/client/main.go index d4ad403..5becb55 100644 --- a/client/main.go +++ b/client/main.go @@ -11,6 +11,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "flag" "fmt" "io" @@ -1075,10 +1076,16 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede if !ok || len(urlsRaw) == 0 { return "", "", "", fmt.Errorf("missing or empty urls in turn_server") } - log.Printf("[STREAM %d] [VK Auth] turn_server urls: %v", streamID, urlsRaw) - urlStr, ok := urlsRaw[0].(string) + if isDebug { + log.Printf("[STREAM %d] [VK Auth] turn_server urls: %v", streamID, urlsRaw) + } + urlIdx := streamID % len(urlsRaw) + urlStr, ok := urlsRaw[urlIdx].(string) if !ok { - return "", "", "", fmt.Errorf("turn server url is not a string") + return "", "", "", fmt.Errorf("turn server url[%d] is not a string", urlIdx) + } + if isDebug { + log.Printf("[STREAM %d] [VK Auth] picked turn url[%d]: %s", streamID, urlIdx, urlStr) } clean := strings.Split(urlStr, "?")[0] @@ -1525,6 +1532,58 @@ func (c *connectedUDPConn) WriteTo(p []byte, _ net.Addr) (int, error) { return c.Write(p) } +type countingConn struct { + net.Conn + written atomic.Int64 + read atomic.Int64 +} + +func (c *countingConn) Read(p []byte) (int, error) { + n, err := c.Conn.Read(p) + if n > 0 { + c.read.Add(int64(n)) + } + return n, err +} + +func (c *countingConn) Write(p []byte) (int, error) { + n, err := c.Conn.Write(p) + if n > 0 { + c.written.Add(int64(n)) + } + return n, err +} + +func classifyNetErr(err error) string { + if err == nil { + return "nil" + } + if errors.Is(err, context.DeadlineExceeded) { + return "ctx-deadline" + } + if errors.Is(err, io.EOF) { + return "eof" + } + if errors.Is(err, syscall.ECONNRESET) { + return "rst" + } + if errors.Is(err, syscall.ECONNREFUSED) { + return "refused" + } + if errors.Is(err, syscall.EPIPE) { + return "broken-pipe" + } + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + return "net-timeout" + } + var oe *net.OpError + if errors.As(err, &oe) { + return "op:" + oe.Op + } + return "other" +} + type turnParams struct { host string port string @@ -1575,25 +1634,34 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD return } defer func() { - if err1 = conn.Close(); err1 != nil { + if err1 = conn.Close(); err1 != nil && err == nil { err = fmt.Errorf("failed to close TURN server connection: %s", err1) - return } }() turnConn = &connectedUDPConn{conn} } else { conn, err2 := d.DialContext(ctx1, "tcp", turnServerAddr) if err2 != nil { + log.Printf("[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v", + streamID, turnServerAddr, classifyNetErr(err2), err2) err = fmt.Errorf("failed to connect to TURN server: %s", err2) return } + if isDebug { + log.Printf("[STREAM %d] [TURN] tcp established %s -> %s", + streamID, conn.LocalAddr(), conn.RemoteAddr()) + } + cc := &countingConn{Conn: conn} defer func() { - if err1 = conn.Close(); err1 != nil { + if err != nil && isDebug { + log.Printf("[STREAM %d] [TURN] tcp closing after fail: written=%d read=%d", + streamID, cc.written.Load(), cc.read.Load()) + } + if err1 = conn.Close(); err1 != nil && err == nil { err = fmt.Errorf("failed to close TURN server connection: %s", err1) - return } }() - turnConn = turn.NewSTUNConn(conn) + turnConn = turn.NewSTUNConn(cc) } var addrFamily turn.RequestedAddressFamily if peer.IP.To4() != nil {