From ec5e5b7983d4cc8041532c06c15f90ae78dee17b Mon Sep 17 00:00:00 2001 From: alexmac6574 <215134852+alexmac6574@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:39:38 +0300 Subject: [PATCH] fix: Clean up the code --- client/main.go | 308 +++++++++++++++++++++---------------------------- server/main.go | 10 +- 2 files changed, 138 insertions(+), 180 deletions(-) diff --git a/client/main.go b/client/main.go index 399de87..0b5e5eb 100644 --- a/client/main.go +++ b/client/main.go @@ -59,13 +59,23 @@ type directListenConfig struct { // Global state trackers var ( - globalClientWGAddr atomic.Value + activeLocalPeer atomic.Value globalCaptchaLockout atomic.Int64 connectedStreams atomic.Int32 globalAppCancel context.CancelFunc handshakeSem = make(chan struct{}, 3) + isDebug bool ) +type UDPPacket struct { + Data []byte + N int +} + +var packetPool = sync.Pool{ + New: func() any { return &UDPPacket{Data: make([]byte, 2048)} }, +} + func newDirectNet() transport.Net { return directNet{} } @@ -491,7 +501,7 @@ func callCaptchaNotRobot(ctx context.Context, sessionToken, hash string, streamI // endregion -// region VK Credentials & Caching Layer +// region VK Credentials Layer type VKCredentials struct { ClientID string @@ -533,11 +543,6 @@ func getCacheID(streamID int) int { return streamID / streamsPerCache } -var ( - vkRequestMu sync.Mutex - globalLastVkFetchTime time.Time -) - func vkDelayRandom(minMs, maxMs int) { ms := minMs + rand.Intn(maxMs-minMs+1) time.Sleep(time.Duration(ms) * time.Millisecond) @@ -628,7 +633,9 @@ func getVkCredsCached(ctx context.Context, link string, streamID int, dialer *dn expires := time.Until(cache.creds.ExpiresAt) u, p, a := cache.creds.Username, cache.creds.Password, cache.creds.ServerAddr cache.mutex.RUnlock() - log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires) + if isDebug { + log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires) + } return u, p, a, nil } cache.mutex.RUnlock() @@ -636,44 +643,31 @@ func getVkCredsCached(ctx context.Context, link string, streamID int, dialer *dn cache.mutex.Lock() defer cache.mutex.Unlock() - // Double-check + // Double-check inside lock if cache.creds.Link == link && time.Now().Before(cache.creds.ExpiresAt) { - expires := time.Until(cache.creds.ExpiresAt) - log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires) return cache.creds.Username, cache.creds.Password, cache.creds.ServerAddr, nil } - log.Printf("[STREAM %d] [VK Auth] Cache miss (cache=%d), starting credential fetch...", streamID, cacheID) - - select { - case <-ctx.Done(): - return "", "", "", ctx.Err() - default: - } - user, pass, addr, err := fetchVkCredsSerialized(ctx, link, streamID, dialer) if err != nil { return "", "", "", err } - cache.creds = TurnCredentials{ - Username: user, - Password: pass, - ServerAddr: addr, - ExpiresAt: time.Now().Add(credentialLifetime - cacheSafetyMargin), - Link: link, - } - - log.Printf("[STREAM %d] [VK Auth] Success! Credentials cached until %v (cache=%d)", streamID, cache.creds.ExpiresAt, cacheID) + cache.creds = TurnCredentials{Username: user, Password: pass, ServerAddr: addr, ExpiresAt: time.Now().Add(credentialLifetime - cacheSafetyMargin), Link: link} return user, pass, addr, nil } +var ( + vkRequestMu sync.Mutex + globalLastVkFetchTime time.Time +) + func fetchVkCredsSerialized(ctx context.Context, link string, streamID int, dialer *dnsdialer.Dialer) (string, string, string, error) { vkRequestMu.Lock() defer vkRequestMu.Unlock() // Ensure a minimum cooldown between credential requests to avoid VK rate limits - minInterval := 10*time.Second + time.Duration(rand.Intn(30000))*time.Millisecond + minInterval := 3*time.Second + time.Duration(rand.Intn(3000))*time.Millisecond elapsed := time.Since(globalLastVkFetchTime) if !globalLastVkFetchTime.IsZero() && elapsed < minInterval { @@ -730,15 +724,15 @@ func fetchVkCreds(ctx context.Context, link string, streamID int, dialer *dnsdia func getTokenChain(ctx context.Context, link string, streamID int, creds VKCredentials, dialer *dnsdialer.Dialer, jar tlsclient.CookieJar) (string, string, string, error) { profile := Profile{ - UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", - SecChUa: `"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"`, + UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", + SecChUa: `"Not(A:Brand";v="99", "Google Chrome";v="146", "Chromium";v="146"`, SecChUaMobile: "?0", SecChUaPlatform: `"Windows"`, } client, err := tlsclient.NewHttpClient(tlsclient.NewNoopLogger(), tlsclient.WithTimeoutSeconds(20), - tlsclient.WithClientProfile(profiles.Chrome_120), + tlsclient.WithClientProfile(profiles.Chrome_146), tlsclient.WithCookieJar(jar), tlsclient.WithDialer(getCustomNetDialer()), ) @@ -808,15 +802,15 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede return "", "", "", fmt.Errorf("missing access_token in response: %v", resp) } - vkDelayRandom(100, 200) + vkDelayRandom(100, 150) // Token 1 -> getCallPreview data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&fields=photo_200&access_token=%s", link, token1) _, _ = doRequest(data, "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id="+creds.ClientID) - vkDelayRandom(500, 1000) + vkDelayRandom(200, 400) - // Token 2 (with 2 auto attempts + 1 manual fallback) + // Token 2 data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&name=%s&access_token=%s", link, escapedName, token1) urlAddr := fmt.Sprintf("https://api.vk.ru/method/calls.getAnonymousToken?v=5.275&client_id=%s", creds.ClientID) @@ -945,7 +939,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede break } - vkDelayRandom(100, 200) + vkDelayRandom(100, 150) // Token 3 sessionData := fmt.Sprintf(`{"version":2,"device_id":"%s","client_version":1.1,"client_type":"SDK_JS"}`, uuid.New()) @@ -956,7 +950,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede } token3 := resp["session_key"].(string) - vkDelayRandom(100, 200) + vkDelayRandom(100, 150) // Token 4 -> TURN Creds data = fmt.Sprintf("joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s", link, token2, token3) @@ -974,15 +968,12 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede clean := strings.Split(urlStr, "?")[0] address := strings.TrimPrefix(strings.TrimPrefix(clean, "turn:"), "turns:") - vkDelayRandom(4000, 5000) - return user, pass, address, nil } // endregion func getYandexCreds(link string) (string, string, string, error) { - const debug = false const telemostConfHost = "cloud-api.yandex.ru" telemostConfPath := fmt.Sprintf("%s%s%s", "/telemost_front/v2/telemost/conferences/https%3A%2F%2Ftelemost.yandex.ru%2Fj%2F", link, "/connection?next_gen_media_platform_allowed=false") @@ -1225,7 +1216,7 @@ func getYandexCreds(link string) (string, string, string, error) { }, } - if debug { + if isDebug { b, _ := json.MarshalIndent(req1, "", " ") log.Printf("Sending HELLO:\n%s", string(b)) } @@ -1243,7 +1234,7 @@ func getYandexCreds(link string) (string, string, string, error) { if err != nil { return "", "", "", fmt.Errorf("ws read: %w", err) } - if debug { + if isDebug { s := string(msg) if len(s) > 800 { s = s[:800] + "...(truncated)" @@ -1310,14 +1301,12 @@ func dtlsFunc(ctx context.Context, conn net.PacketConn, peer *net.UDPAddr) (net. return dtlsConn, nil } -func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}, c chan<- error) { +func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, inboundChan <-chan *UDPPacket, connchan chan<- net.PacketConn, okchan chan<- struct{}, streamID int) error { time.Sleep(time.Duration(rand.Intn(400)+100) * time.Millisecond) - var err error = nil - defer func() { c <- err }() dtlsctx, dtlscancel := context.WithCancel(ctx) defer dtlscancel() - var conn1, conn2 net.PacketConn - conn1, conn2 = connutil.AsyncPacketPipe() + + conn1, conn2 := connutil.AsyncPacketPipe() go func() { for { select { @@ -1329,17 +1318,15 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa }() dtlsConn, err1 := dtlsFunc(dtlsctx, conn1, peer) if err1 != nil { - err = fmt.Errorf("failed to connect DTLS: %s", err1) - return + return fmt.Errorf("failed to connect DTLS: %s", err1) } defer func() { if closeErr := dtlsConn.Close(); closeErr != nil { - err = fmt.Errorf("failed to close DTLS connection: %s", closeErr) - return + log.Printf("[STREAM %d] failed to close DTLS connection: %s", streamID, closeErr) } - log.Printf("Closed DTLS connection\n") + log.Printf("[STREAM %d] Closed DTLS connection\n", streamID) }() - log.Printf("Established DTLS connection!\n") + log.Printf("[STREAM %d] Established DTLS connection!\n", streamID) if okchan != nil { go func() { @@ -1351,38 +1338,20 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa } wg := sync.WaitGroup{} - wg.Add(2) + wg.Add(1) context.AfterFunc(dtlsctx, func() { - if err := listenConn.SetDeadline(time.Now()); err != nil { - log.Printf("Failed to set listener deadline: %s", err) - } - if err := dtlsConn.SetDeadline(time.Now()); err != nil { - log.Printf("Failed to set DTLS deadline: %s", err) - } + _ = dtlsConn.SetDeadline(time.Now()) }) go func() { - defer wg.Done() defer dtlscancel() - buf := make([]byte, 1600) for { select { case <-dtlsctx.Done(): return - default: - } - n, addr1, err1 := listenConn.ReadFrom(buf) - if err1 != nil { - log.Printf("Failed: %s", err1) - return - } - - globalClientWGAddr.Store(addr1) - - _, err1 = dtlsConn.Write(buf[:n]) - if err1 != nil { - log.Printf("Failed: %s", err1) - return + case pkt := <-inboundChan: + _, _ = dtlsConn.Write(pkt.Data[:pkt.N]) + packetPool.Put(pkt) } } }() @@ -1392,37 +1361,23 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa defer dtlscancel() buf := make([]byte, 1600) for { - select { - case <-dtlsctx.Done(): - return - default: - } n, err1 := dtlsConn.Read(buf) if err1 != nil { - log.Printf("Failed: %s", err1) return } - addr1, ok := globalClientWGAddr.Load().(net.Addr) - if !ok { - continue - } - - _, err1 = listenConn.WriteTo(buf[:n], addr1) - if err1 != nil { - log.Printf("Failed: %s", err1) - return + // Send back to the active WG client + if peerAddr := activeLocalPeer.Load(); peerAddr != nil { + _, _ = listenConn.WriteTo(buf[:n], peerAddr.(net.Addr)) } } }() wg.Wait() - if err := listenConn.SetDeadline(time.Time{}); err != nil { - log.Printf("Failed to clear listener deadline: %s", err) - } if err := dtlsConn.SetDeadline(time.Time{}); err != nil { - log.Printf("Failed to clear DTLS deadline: %s", err) + log.Printf("[STREAM %d] Failed to clear DTLS deadline: %s", streamID, err) } + return nil } type connectedUDPConn struct { @@ -1542,6 +1497,9 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD return } + // Reset error count on successful allocation + getStreamCache(streamID).errorCount.Store(0) + // Safely track active streams globally connectedStreams.Add(1) defer func() { @@ -1551,37 +1509,33 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD } }() - // Reset error count on successful allocation - getStreamCache(streamID).errorCount.Store(0) - - log.Printf("[STREAM %d] relayed-address=%s", streamID, relayConn.LocalAddr().String()) + if isDebug { + log.Printf("[STREAM %d] relayed-address=%s", streamID, relayConn.LocalAddr().String()) + } wg := sync.WaitGroup{} - wg.Add(2) - turnctx, turncancel := context.WithCancel(context.Background()) + wg.Add(1) + turnctx, turncancel := context.WithCancel(ctx) context.AfterFunc(turnctx, func() { if err := relayConn.SetDeadline(time.Now()); err != nil { log.Printf("Failed to set relay deadline: %s", err) } - if err := conn2.SetDeadline(time.Now()); err != nil { - log.Printf("Failed to set upstream deadline: %s", err) - } + // Do not set conn2 deadline (conn2 can sometimes be listenConn if direct mode is used) }) var internalPipeAddr atomic.Value go func() { - defer wg.Done() defer turncancel() buf := make([]byte, 1600) for { - select { - case <-turnctx.Done(): + if turnctx.Err() != nil { return - default: } n, addr1, err1 := conn2.ReadFrom(buf) if err1 != nil { - log.Printf("Failed: %s", err1) + return + } + if turnctx.Err() != nil { return } @@ -1589,7 +1543,6 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD _, err1 = relayConn.WriteTo(buf[:n], peer) if err1 != nil { - log.Printf("Failed: %s", err1) return } } @@ -1600,25 +1553,17 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD defer turncancel() buf := make([]byte, 1600) for { - select { - case <-turnctx.Done(): - return - default: - } n, _, err1 := relayConn.ReadFrom(buf) if err1 != nil { - log.Printf("Failed: %s", err1) return } - addr1, ok := internalPipeAddr.Load().(net.Addr) - if !ok { - log.Printf("Failed: no listener ip") - return + addr1 := internalPipeAddr.Load() + if addr1 == nil { + continue } - _, err1 = conn2.WriteTo(buf[:n], addr1) + _, err1 = conn2.WriteTo(buf[:n], addr1.(net.Addr)) if err1 != nil { - log.Printf("Failed: %s", err1) return } } @@ -1628,26 +1573,19 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD if err := relayConn.SetDeadline(time.Time{}); err != nil { log.Printf("Failed to clear relay deadline: %s", err) } - if err := conn2.SetDeadline(time.Time{}); err != nil { - log.Printf("Failed to clear upstream deadline: %s", err) - } } -func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConnChan <-chan net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}) { +func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, inboundChan <-chan *UDPPacket, connchan chan<- net.PacketConn, okchan chan<- struct{}, streamID int) { for { select { case <-ctx.Done(): return - case listenConn := <-listenConnChan: - c := make(chan error) - go oneDtlsConnection(ctx, peer, listenConn, connchan, okchan, c) - if err := <-c; err != nil { - // Suppress DTLS handshake timeout logs while a captcha lockout is active + default: + err := oneDtlsConnection(ctx, peer, listenConn, inboundChan, connchan, okchan, streamID) + if err != nil { if time.Now().Unix() < globalCaptchaLockout.Load() && strings.Contains(err.Error(), "context deadline exceeded") { continue } - log.Printf("[DTLS] Handshake failed, retrying in background: %v", err) - select { case <-ctx.Done(): return @@ -1681,10 +1619,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne return } if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") { - // Only log the backoff message once (the stream that triggered it) - // For subsequently awoken streams: calculate exact remaining sleep duration and sleep silently if !strings.Contains(err.Error(), "global lockout active") { - log.Printf("[STREAM %d] !!! VK DEMANDS SLIDER CAPTCHA. Backing off for 60 seconds to avoid IP ban...", streamID) + log.Printf("[STREAM %d] Backing off for 60 seconds to avoid IP ban...", streamID) select { case <-ctx.Done(): return @@ -1737,6 +1673,7 @@ func main() { n := flag.Int("n", 0, "connections to TURN (default 10 for VK, 1 for Yandex)") udp := flag.Bool("udp", false, "connect to TURN with UDP") direct := flag.Bool("no-dtls", false, "connect without obfuscation. DO NOT USE") + debugFlag := flag.Bool("debug", false, "enable debug logging") flag.Parse() if *peerAddr == "" { log.Panicf("Need peer address!") @@ -1749,6 +1686,8 @@ func main() { log.Panicf("Need either vk-link or yandex-link!") } + isDebug = *debugFlag + var link string var getCreds getCredsFunc if *vklink != "" { @@ -1789,69 +1728,86 @@ func main() { getCreds: getCreds, } - listenConnChan := make(chan net.PacketConn) listenConn, err := net.ListenPacket("udp", *listen) if err != nil { log.Panicf("Failed to listen: %s", err) } context.AfterFunc(ctx, func() { if closeErr := listenConn.Close(); closeErr != nil { - log.Panicf("Failed to close local connection: %s", closeErr) + log.Printf("Failed to close local connection: %s", closeErr) } }) + + numStreams := *n + if numStreams <= 0 { + numStreams = 1 + } + + // Shared Worker Pool Queue for Aggregation + inboundChan := make(chan *UDPPacket, 2000) + go func() { for { - select { - case <-ctx.Done(): + pkt := packetPool.Get().(*UDPPacket) + nRead, addr, err := listenConn.ReadFrom(pkt.Data) + if err != nil { return - case listenConnChan <- listenConn: + } + + // Save the local WireGuard peer address + current := activeLocalPeer.Load() + if current == nil || current.(net.Addr).String() != addr.String() { + activeLocalPeer.Store(addr) + } + + pkt.N = nRead + + select { + case inboundChan <- pkt: + default: + // Drop the packet only if the global queue is completely full + packetPool.Put(pkt) } } }() wg1 := sync.WaitGroup{} t := time.Tick(200 * time.Millisecond) + if *direct { - for i := 0; i < *n; i++ { - wg1.Add(1) - go func(streamID int) { - defer wg1.Done() - oneTurnConnectionLoop(ctx, params, peer, listenConnChan, t, streamID) - }(i) - } - } else { - okchan := make(chan struct{}) - connchan := make(chan net.PacketConn) + log.Panicf("Direct mode not supported with dispatcher") + } + + okchan := make(chan struct{}) + connchan := make(chan net.PacketConn) + wg1.Add(1) + go func() { + defer wg1.Done() + oneDtlsConnectionLoop(ctx, peer, listenConn, inboundChan, connchan, okchan, 1) + }() + wg1.Add(1) + go func() { + defer wg1.Done() + oneTurnConnectionLoop(ctx, params, peer, connchan, t, 1) + }() + select { + case <-okchan: + case <-ctx.Done(): + } + + for i := 1; i < numStreams; i++ { + cchan := make(chan net.PacketConn) wg1.Add(1) - go func() { + go func(streamID int) { defer wg1.Done() - oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, okchan) - }() - + oneDtlsConnectionLoop(ctx, peer, listenConn, inboundChan, cchan, nil, streamID) + }(i) wg1.Add(1) - go func() { + go func(streamID int) { defer wg1.Done() - oneTurnConnectionLoop(ctx, params, peer, connchan, t, 0) - }() - - select { - case <-okchan: - case <-ctx.Done(): - } - for i := 0; i < *n-1; i++ { - connchan := make(chan net.PacketConn) - wg1.Add(1) - go func() { - defer wg1.Done() - oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, nil) - }() - wg1.Add(1) - go func(streamID int) { - defer wg1.Done() - oneTurnConnectionLoop(ctx, params, peer, connchan, t, streamID) - }(i + 1) - } + oneTurnConnectionLoop(ctx, params, peer, cchan, t, streamID) + }(i) } wg1.Wait() diff --git a/server/main.go b/server/main.go index 819f9b5..effce98 100644 --- a/server/main.go +++ b/server/main.go @@ -17,6 +17,8 @@ import ( "github.com/pion/dtls/v3/pkg/crypto/selfsign" ) +const idleTimeout = 2 * time.Minute + func main() { listen := flag.String("listen", "0.0.0.0:56000", "listen on ip:port") connect := flag.String("connect", "", "connect to ip:port") @@ -150,7 +152,7 @@ func main() { return default: } - if err1 := conn.SetReadDeadline(time.Now().Add(time.Minute * 30)); err1 != nil { + if err1 := conn.SetReadDeadline(time.Now().Add(idleTimeout)); err1 != nil { log.Printf("Failed: %s", err1) return } @@ -160,7 +162,7 @@ func main() { return } - if err1 := serverConn.SetWriteDeadline(time.Now().Add(time.Minute * 30)); err1 != nil { + if err1 := serverConn.SetWriteDeadline(time.Now().Add(idleTimeout)); err1 != nil { log.Printf("Failed: %s", err1) return } @@ -181,7 +183,7 @@ func main() { return default: } - if err1 := serverConn.SetReadDeadline(time.Now().Add(time.Minute * 30)); err1 != nil { + if err1 := serverConn.SetReadDeadline(time.Now().Add(idleTimeout)); err1 != nil { log.Printf("Failed: %s", err1) return } @@ -191,7 +193,7 @@ func main() { return } - if err1 := conn.SetWriteDeadline(time.Now().Add(time.Minute * 30)); err1 != nil { + if err1 := conn.SetWriteDeadline(time.Now().Add(idleTimeout)); err1 != nil { log.Printf("Failed: %s", err1) return }