diff --git a/client/main.go b/client/main.go index a8dbfce..3d14142 100644 --- a/client/main.go +++ b/client/main.go @@ -67,6 +67,8 @@ var ( isDebug bool manualCaptcha bool autoCaptchaSliderPOC bool + allocsPerStream int + udpMode bool ) type captchaSolveMode int @@ -621,12 +623,12 @@ type VKCredentials struct { ClientSecret string } +// Only client_ids that currently expose calls.getAnonymousToken. +// VKVIDEO_* and VK_ID_AUTH_APP started returning error_code:3 "Unknown method" +// (observed 2026-04-28) and only burn throttle budget if kept in rotation. var vkCredentialsList = []VKCredentials{ - {ClientID: "6287487", ClientSecret: "QbYic1K3lEV5kTGiqlq2"}, // VK_WEB_APP_ID - {ClientID: "7879029", ClientSecret: "aR5NKGmm03GYrCiNKsaw"}, // VK_MVK_APP_ID - {ClientID: "52461373", ClientSecret: "o557NLIkAErNhakXrQ7A"}, // VK_WEB_VKVIDEO_APP_ID - {ClientID: "52649896", ClientSecret: "WStp4ihWG4l3nmXZgIbC"}, // VK_MVK_VKVIDEO_APP_ID - {ClientID: "51781872", ClientSecret: "IjjCNl4L4Tf5QZEXIHKK"}, // VK_ID_AUTH_APP + {ClientID: "6287487", ClientSecret: "QbYic1K3lEV5kTGiqlq2"}, // VK_WEB_APP_ID + {ClientID: "7879029", ClientSecret: "aR5NKGmm03GYrCiNKsaw"}, // VK_MVK_APP_ID } type TurnCredentials struct { @@ -649,7 +651,10 @@ const ( cacheSafetyMargin = 60 * time.Second maxCacheErrors = 3 errorWindow = 10 * time.Second - streamsPerCache = 10 + // streamsPerCache=1: each stream caches its own slot creds because + // acquireVkTurnSlot mints a unique (username, password) per call. + streamsPerCache = 1 + identityLifetime = 8 * time.Minute ) func getCacheID(streamID int) int { @@ -761,7 +766,7 @@ func getVkCredsCached(ctx context.Context, link string, streamID int) (string, s return cache.creds.Username, cache.creds.Password, cache.creds.ServerAddr, nil } - user, pass, addr, err := fetchVkCredsSerialized(ctx, link, streamID) + user, pass, addr, err := fetchVkCreds(ctx, link, streamID) if err != nil { return "", "", "", err } @@ -775,58 +780,111 @@ var ( globalLastVkFetchTime time.Time ) -func fetchVkCredsSerialized(ctx context.Context, link string, streamID int) (string, string, string, error) { - vkRequestMu.Lock() - defer vkRequestMu.Unlock() +// vkIdentity caches the captcha-gated portion of a VK auth chain (steps 1-3: +// anonym_token + getCallPreview + getAnonymousToken). Once acquired it can be +// replayed via acquireVkTurnSlot to mint independent TURN credentials, each +// with a unique username — bypassing per-username throttling at the cost of a +// single captcha solve per (link, client_id) pair. +type vkIdentity struct { + creds VKCredentials + profile Profile + name string + token1 string + token2 string + client tlsclient.HttpClient + expiresAt time.Time + urlCounter atomic.Uint64 // round-robin index across turn_server.urls +} + +type identityCacheKey struct { + link string + clientID string +} - // Ensure a minimum cooldown between credential requests to avoid VK rate limits - minInterval := 3*time.Second + time.Duration(rand.Intn(3000))*time.Millisecond - elapsed := time.Since(globalLastVkFetchTime) +type identityEntry struct { + mu sync.Mutex + ident *vkIdentity +} - if !globalLastVkFetchTime.IsZero() && elapsed < minInterval { - wait := minInterval - elapsed - log.Printf("[STREAM %d] [VK Auth] Throttling: waiting %v to prevent rate limit...", streamID, wait.Truncate(time.Millisecond)) - select { - case <-ctx.Done(): - return "", "", "", ctx.Err() - case <-time.After(wait): - } +var identityStore = struct { + mu sync.Mutex + m map[identityCacheKey]*identityEntry +}{m: make(map[identityCacheKey]*identityEntry)} + +func getOrAcquireIdentity(ctx context.Context, link string, streamID int, creds VKCredentials) (*vkIdentity, error) { + key := identityCacheKey{link: link, clientID: creds.ClientID} + + identityStore.mu.Lock() + entry, ok := identityStore.m[key] + if !ok { + entry = &identityEntry{} + identityStore.m[key] = entry } + identityStore.mu.Unlock() - defer func() { - globalLastVkFetchTime = time.Now() - }() + entry.mu.Lock() + defer entry.mu.Unlock() + + if entry.ident != nil && time.Now().Before(entry.ident.expiresAt) { + return entry.ident, nil + } - return fetchVkCreds(ctx, link, streamID) + ident, err := acquireVkIdentity(ctx, link, streamID, creds) + if err != nil { + return nil, err + } + entry.ident = ident + return ident, nil +} + +func invalidateIdentity(link, clientID string) { + identityStore.mu.Lock() + entry, ok := identityStore.m[identityCacheKey{link: link, clientID: clientID}] + identityStore.mu.Unlock() + if !ok { + return + } + entry.mu.Lock() + entry.ident = nil + entry.mu.Unlock() } func fetchVkCreds(ctx context.Context, link string, streamID int) (string, string, string, error) { - // Check Global Lockout to prevent API bans if time.Now().Unix() < globalCaptchaLockout.Load() { return "", "", "", fmt.Errorf("CAPTCHA_WAIT_REQUIRED: global lockout active") } - var lastErr error - jar := tlsclient.NewCookieJar() + n := len(vkCredentialsList) + startIdx := streamID % n - for _, creds := range vkCredentialsList { + var lastErr error + for offset := 0; offset < n; offset++ { + creds := vkCredentialsList[(startIdx+offset)%n] log.Printf("[STREAM %d] [VK Auth] Trying credentials: client_id=%s", streamID, creds.ClientID) - user, pass, addr, err := getTokenChain(ctx, link, streamID, creds, jar) + ident, err := getOrAcquireIdentity(ctx, link, streamID, creds) + if err != nil { + lastErr = err + log.Printf("[STREAM %d] [VK Auth] identity acquire failed (client_id=%s): %v", streamID, creds.ClientID, err) + if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") || strings.Contains(err.Error(), "FATAL_CAPTCHA") { + return "", "", "", err + } + continue + } + user, pass, addr, err := acquireVkTurnSlot(ctx, link, streamID, ident) if err == nil { log.Printf("[STREAM %d] [VK Auth] Success with client_id=%s", streamID, creds.ClientID) return user, pass, addr, nil } lastErr = err - log.Printf("[STREAM %d] [VK Auth] Failed with client_id=%s: %v", streamID, creds.ClientID, err) + log.Printf("[STREAM %d] [VK Auth] slot acquire failed (client_id=%s): %v", streamID, creds.ClientID, err) + invalidateIdentity(link, creds.ClientID) - // Hard abort on captcha/fatal conditions instead of trying next creds if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") || strings.Contains(err.Error(), "FATAL_CAPTCHA") { return "", "", "", err } - if strings.Contains(err.Error(), "error_code:29") || strings.Contains(err.Error(), "error_code: 29") || strings.Contains(err.Error(), "Rate limit") { log.Printf("[STREAM %d] [VK Auth] Rate limit detected, trying next credentials...", streamID) } @@ -835,7 +893,79 @@ func fetchVkCreds(ctx context.Context, link string, streamID int) (string, strin return "", "", "", fmt.Errorf("all VK credentials failed: %w", lastErr) } -func getTokenChain(ctx context.Context, link string, streamID int, creds VKCredentials, jar tlsclient.CookieJar) (string, string, string, error) { +func vkDoRequest(ctx context.Context, client tlsclient.HttpClient, profile Profile, data, url string) (map[string]interface{}, error) { + parsedURL, err := neturl.Parse(url) + if err != nil { + return nil, fmt.Errorf("parse request URL: %w", err) + } + domain := parsedURL.Hostname() + + req, err := fhttp.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer([]byte(data))) + if err != nil { + return nil, err + } + + req.Host = domain + applyBrowserProfileFhttp(req, profile) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "*/*") + req.Header.Set("Origin", "https://vk.ru") + req.Header.Set("Referer", "https://vk.ru/") + req.Header.Set("Sec-Fetch-Site", "same-site") + req.Header.Set("Sec-Fetch-Mode", "cors") + req.Header.Set("Sec-Fetch-Dest", "empty") + req.Header.Set("Priority", "u=1, i") + + httpResp, err := client.Do(req) + if err != nil { + return nil, err + } + defer func() { + if closeErr := httpResp.Body.Close(); closeErr != nil { + log.Printf("close response body: %s", closeErr) + } + }() + + body, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, err + } + + var resp map[string]interface{} + if err := json.Unmarshal(body, &resp); err != nil { + return nil, err + } + return resp, nil +} + +// acquireVkIdentity runs the heavy + captcha-gated portion of the VK auth chain +// (steps 1-3: get_anonym_token, calls.getCallPreview, calls.getAnonymousToken). +// The result is cached and reused across many TURN slot acquisitions. +// +// Globally serialised via vkRequestMu + 3-6s cooldown to avoid VK API bans. +func acquireVkIdentity(ctx context.Context, link string, streamID int, creds VKCredentials) (*vkIdentity, error) { + vkRequestMu.Lock() + defer vkRequestMu.Unlock() + + minInterval := 3*time.Second + time.Duration(rand.Intn(3000))*time.Millisecond + elapsed := time.Since(globalLastVkFetchTime) + if !globalLastVkFetchTime.IsZero() && elapsed < minInterval { + wait := minInterval - elapsed + log.Printf("[STREAM %d] [VK Auth] Throttling: waiting %v to prevent rate limit...", streamID, wait.Truncate(time.Millisecond)) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(wait): + } + } + defer func() { + globalLastVkFetchTime = time.Now() + }() + + if time.Now().Unix() < globalCaptchaLockout.Load() { + return nil, fmt.Errorf("CAPTCHA_WAIT_REQUIRED: global lockout active") + } + profile := Profile{ UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36", SecChUa: `"Not(A:Brand";v="99", "Google Chrome";v="146", "Chromium";v="146"`, @@ -843,6 +973,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede SecChUaPlatform: `"Windows"`, } + jar := tlsclient.NewCookieJar() client, err := tlsclient.NewHttpClient(tlsclient.NewNoopLogger(), tlsclient.WithTimeoutSeconds(20), tlsclient.WithClientProfile(profiles.Chrome_146), @@ -850,94 +981,58 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede tlsclient.WithDialer(appDialer()), ) if err != nil { - return "", "", "", fmt.Errorf("failed to initialize tls_client: %w", err) + return nil, fmt.Errorf("failed to initialize tls_client: %w", err) } name := generateName() escapedName := neturl.QueryEscape(name) - log.Printf("[STREAM %d] [VK Auth] Connecting Identity - Name: %s | User-Agent: %s", streamID, name, profile.UserAgent) + log.Printf("[STREAM %d] [VK Auth] Connecting Identity - Name: %s | client_id=%s", streamID, name, creds.ClientID) - doRequest := func(data string, url string) (resp map[string]interface{}, err error) { - parsedURL, err := neturl.Parse(url) - if err != nil { - return nil, fmt.Errorf("parse request URL: %w", err) - } - domain := parsedURL.Hostname() - - req, err := fhttp.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer([]byte(data))) - if err != nil { - return nil, err - } - - req.Host = domain - applyBrowserProfileFhttp(req, profile) - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - req.Header.Set("Accept", "*/*") - req.Header.Set("Origin", "https://vk.ru") - req.Header.Set("Referer", "https://vk.ru/") - req.Header.Set("Sec-Fetch-Site", "same-site") - req.Header.Set("Sec-Fetch-Mode", "cors") - req.Header.Set("Sec-Fetch-Dest", "empty") - req.Header.Set("Priority", "u=1, i") - - httpResp, err := client.Do(req) - if err != nil { - return nil, err - } - defer func() { - if closeErr := httpResp.Body.Close(); closeErr != nil { - log.Printf("close response body: %s", closeErr) - } - }() - - body, err := io.ReadAll(httpResp.Body) - if err != nil { - return nil, err - } - - err = json.Unmarshal(body, &resp) - if err != nil { - return nil, err - } - return resp, nil - } - - // Token 1 + // Step 1: anonym_token data := fmt.Sprintf("client_id=%s&token_type=messages&client_secret=%s&version=1&app_id=%s", creds.ClientID, creds.ClientSecret, creds.ClientID) - resp, err := doRequest(data, "https://login.vk.ru/?act=get_anonym_token") + resp, err := vkDoRequest(ctx, client, profile, data, "https://login.vk.ru/?act=get_anonym_token") if err != nil { - return "", "", "", err + return nil, err } dataMap, ok := resp["data"].(map[string]interface{}) if !ok { - return "", "", "", fmt.Errorf("unexpected anon token response: %v", resp) + return nil, fmt.Errorf("unexpected anon token response: %v", resp) } token1, ok := dataMap["access_token"].(string) if !ok { - return "", "", "", fmt.Errorf("missing access_token in response: %v", resp) + return nil, fmt.Errorf("missing access_token in response: %v", resp) } vkDelayRandom(100, 150) - // Token 1 -> getCallPreview + // Step 2: getCallPreview (best-effort) data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&fields=photo_200&access_token=%s", link, token1) - _, err = doRequest(data, "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id="+creds.ClientID) + _, err = vkDoRequest(ctx, client, profile, data, "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id="+creds.ClientID) if err != nil { log.Printf("[STREAM %d] [VK Auth] Warning: getCallPreview failed: %v", streamID, err) } vkDelayRandom(200, 400) - // Token 2 + // Step 3: getAnonymousToken (captcha-gated) data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&name=%s&access_token=%s", link, escapedName, token1) urlAddr := fmt.Sprintf("https://api.vk.ru/method/calls.getAnonymousToken?v=5.275&client_id=%s", creds.ClientID) + exhaustedCaptcha := func() error { + globalCaptchaLockout.Store(time.Now().Add(60 * time.Second).Unix()) + if connectedStreams.Load() == 0 { + log.Printf("[STREAM %d] [FATAL] 0 connected streams and captcha solve modes exhausted.", streamID) + return fmt.Errorf("FATAL_CAPTCHA_FAILED_NO_STREAMS") + } + return fmt.Errorf("CAPTCHA_WAIT_REQUIRED") + } + var token2 string for attempt := 0; ; attempt++ { - resp, err = doRequest(data, urlAddr) + resp, err = vkDoRequest(ctx, client, profile, data, urlAddr) if err != nil { - return "", "", "", err + return nil, err } if errObj, hasErr := resp["error"].(map[string]interface{}); hasErr { @@ -946,16 +1041,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede solveMode, hasSolveMode := captchaSolveModeForAttempt(attempt, manualCaptcha, autoCaptchaSliderPOC) if !hasSolveMode { log.Printf("[STREAM %d] [Captcha] No more solve modes available (attempt %d)", streamID, attempt+1) - - // Engage global lockout to protect API - globalCaptchaLockout.Store(time.Now().Add(60 * time.Second).Unix()) - - if connectedStreams.Load() == 0 { - log.Printf("[STREAM %d] [FATAL] 0 connected streams and captcha solve modes exhausted.", streamID) - return "", "", "", fmt.Errorf("FATAL_CAPTCHA_FAILED_NO_STREAMS") - } - - return "", "", "", fmt.Errorf("CAPTCHA_WAIT_REQUIRED") + return nil, exhaustedCaptcha() } var successToken string @@ -1027,7 +1113,6 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede manualCancel() } - // If solving failed (auto or manual) or timed out if solveErr != nil { log.Printf("[STREAM %d] [Captcha] %s failed (attempt %d): %v", streamID, captchaSolveModeLabel(solveMode), attempt+1, solveErr) @@ -1036,17 +1121,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede log.Printf("[STREAM %d] [Captcha] Falling back to %s...", streamID, captchaSolveModeLabel(nextSolveMode)) continue } - - // Engage global lockout to protect API - globalCaptchaLockout.Store(time.Now().Add(60 * time.Second).Unix()) - - // If we have 0 streams alive, this is fatal - if connectedStreams.Load() == 0 { - log.Printf("[STREAM %d] [FATAL] 0 connected streams and manual captcha failed/timed out.", streamID) - return "", "", "", fmt.Errorf("FATAL_CAPTCHA_FAILED_NO_STREAMS") - } - - return "", "", "", fmt.Errorf("CAPTCHA_WAIT_REQUIRED") + return nil, exhaustedCaptcha() } if captchaErr.CaptchaAttempt == "0" || captchaErr.CaptchaAttempt == "" { @@ -1062,26 +1137,41 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede } continue } - return "", "", "", fmt.Errorf("VK API error: %v", errObj) + return nil, fmt.Errorf("VK API error: %v", errObj) } respMap, okLoop := resp["response"].(map[string]interface{}) if !okLoop { - return "", "", "", fmt.Errorf("unexpected getAnonymousToken response: %v", resp) + return nil, fmt.Errorf("unexpected getAnonymousToken response: %v", resp) } token2, okLoop = respMap["token"].(string) if !okLoop { - return "", "", "", fmt.Errorf("missing token in response: %v", resp) + return nil, fmt.Errorf("missing token in response: %v", resp) } break } - vkDelayRandom(100, 150) - - // Token 3 + return &vkIdentity{ + creds: creds, + profile: profile, + name: name, + token1: token1, + token2: token2, + client: client, + expiresAt: time.Now().Add(identityLifetime), + }, nil +} + +// acquireVkTurnSlot runs the lightweight portion of the chain (steps 4-5): +// auth.anonymLogin (with a fresh device_id) followed by vchat.joinConversationByLink. +// Each call returns a distinct (username, password) pair from VK, which lets us +// run multiple parallel TURN allocations under the same identity — bypassing +// per-username throttling without re-solving captcha. +func acquireVkTurnSlot(ctx context.Context, link string, streamID int, ident *vkIdentity) (string, string, string, error) { + // Step 4: auth.anonymLogin with fresh device_id → fresh session_key sessionData := fmt.Sprintf(`{"version":2,"device_id":"%s","client_version":1.1,"client_type":"SDK_JS"}`, uuid.New()) - data = fmt.Sprintf("session_data=%s&method=auth.anonymLogin&format=JSON&application_key=CGMMEJLGDIHBABABA", neturl.QueryEscape(sessionData)) - resp, err = doRequest(data, "https://calls.okcdn.ru/fb.do") + data := fmt.Sprintf("session_data=%s&method=auth.anonymLogin&format=JSON&application_key=CGMMEJLGDIHBABABA", neturl.QueryEscape(sessionData)) + resp, err := vkDoRequest(ctx, ident.client, ident.profile, data, "https://calls.okcdn.ru/fb.do") if err != nil { return "", "", "", err } @@ -1092,9 +1182,9 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede vkDelayRandom(100, 150) - // Token 4 -> TURN Creds - data = fmt.Sprintf("joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s", link, token2, token3) - resp, err = doRequest(data, "https://calls.okcdn.ru/fb.do") + // Step 5: vchat.joinConversationByLink → turn_server creds + data = fmt.Sprintf("joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s", link, ident.token2, token3) + resp, err = vkDoRequest(ctx, ident.client, ident.profile, data, "https://calls.okcdn.ru/fb.do") if err != nil { return "", "", "", err } @@ -1118,15 +1208,41 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede if isDebug { log.Printf("[STREAM %d] [VK Auth] turn_server urls: %v", streamID, urlsRaw) } - urlIdx := streamID % len(urlsRaw) - urlStr, ok := urlsRaw[urlIdx].(string) - if !ok { - return "", "", "", fmt.Errorf("turn server url[%d] is not a string", urlIdx) + + // Prefer URLs whose transport matches the requested mode (udpMode). + // Per RFC 7065, "?transport=tcp" → TCP, missing or "transport=udp" → UDP. + // Fall back to the full list if nothing matches — this preserves the + // -port override path where the user intentionally dials a port not + // advertised in the URL list. + all := make([]string, 0, len(urlsRaw)) + preferred := make([]string, 0, len(urlsRaw)) + for _, raw := range urlsRaw { + s, ok := raw.(string) + if !ok { + continue + } + all = append(all, s) + isTCP := strings.Contains(s, "transport=tcp") + if udpMode == !isTCP { + preferred = append(preferred, s) + } } - if isDebug { - log.Printf("[STREAM %d] [VK Auth] picked turn url[%d]: %s", streamID, urlIdx, urlStr) + if len(all) == 0 { + return "", "", "", fmt.Errorf("turn_server urls list contained no strings: %v", urlsRaw) } + pool := preferred + if len(pool) == 0 { + pool = all + log.Printf("[STREAM %d] [VK Auth] no urls match transport (udp=%v), falling back to full list (relying on -port override). urls=%v", streamID, udpMode, all) + } + + // Round-robin within the identity. streamID%len(pool) collapses every + // stream of the identity onto the same parity, so use a counter instead. + urlIdx := int(ident.urlCounter.Add(1)-1) % len(pool) + urlStr := pool[urlIdx] + log.Printf("[STREAM %d] [VK Auth] turn_server urls=%d (preferred=%d), picked[%d]: %s", streamID, len(all), len(preferred), urlIdx, urlStr) + clean := strings.Split(urlStr, "?")[0] address := strings.TrimPrefix(strings.TrimPrefix(clean, "turn:"), "turns:") @@ -1631,6 +1747,115 @@ type turnParams struct { getCreds getCredsFunc } +// turnAllocation bundles a single TURN session: dial socket, TURN client, relay PacketConn. +type turnAllocation struct { + dialConn io.Closer + client *turn.Client + relay net.PacketConn +} + +func (a *turnAllocation) close() { + if a.relay != nil { + _ = a.relay.Close() + } + if a.client != nil { + a.client.Close() + } + if a.dialConn != nil { + _ = a.dialConn.Close() + } +} + +// dialTurn opens a fresh TURN session under the given (user, pass). Each call +// produces an independent 5-tuple (own source UDP/TCP port) and an independent +// TURN allocation. VK may or may not allow multiple allocations under the same +// credentials — caller must tolerate failures on additional sessions. +func dialTurn(ctx context.Context, useUDP bool, turnServerAddr string, turnServerUDPAddr *net.UDPAddr, addrFamily turn.RequestedAddressFamily, user, pass string, streamID int) (*turnAllocation, error) { + var dialCloser io.Closer + var turnConn net.PacketConn + if useUDP { + conn, err := net.DialUDP("udp", nil, turnServerUDPAddr) + if err != nil { + return nil, fmt.Errorf("failed to connect to TURN server: %w", err) + } + dialCloser = conn + turnConn = &connectedUDPConn{conn} + } else { + ctx1, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var d net.Dialer + conn, err := d.DialContext(ctx1, "tcp", turnServerAddr) + if err != nil { + log.Printf("[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v", + streamID, turnServerAddr, classifyNetErr(err), err) + return nil, fmt.Errorf("failed to connect to TURN server: %w", err) + } + if isDebug { + log.Printf("[STREAM %d] [TURN] tcp established %s -> %s", + streamID, conn.LocalAddr(), conn.RemoteAddr()) + } + dialCloser = conn + turnConn = turn.NewSTUNConn(&countingConn{Conn: conn}) + } + + cfg := &turn.ClientConfig{ + STUNServerAddr: turnServerAddr, + TURNServerAddr: turnServerAddr, + Conn: turnConn, + Net: newDirectNet(), + Username: user, + Password: pass, + RequestedAddressFamily: addrFamily, + LoggerFactory: logging.NewDefaultLoggerFactory(), + } + + client, err := turn.NewClient(cfg) + if err != nil { + _ = dialCloser.Close() + return nil, fmt.Errorf("failed to create TURN client: %w", err) + } + + if err := client.Listen(); err != nil { + client.Close() + _ = dialCloser.Close() + return nil, fmt.Errorf("failed to listen: %w", err) + } + + relay, err := client.Allocate() + if err != nil { + client.Close() + _ = dialCloser.Close() + return nil, fmt.Errorf("failed to allocate: %w", err) + } + + return &turnAllocation{dialConn: dialCloser, client: client, relay: relay}, nil +} + +// relayPool is a concurrent ring of live relay PacketConns. Reads (pick) are +// non-blocking and lock-free on the hot path; mutation (add) is rare. +type relayPool struct { + mu sync.RWMutex + relays []net.PacketConn + counter atomic.Uint64 +} + +func (p *relayPool) add(r net.PacketConn) { + p.mu.Lock() + p.relays = append(p.relays, r) + p.mu.Unlock() +} + +func (p *relayPool) pick() net.PacketConn { + p.mu.RLock() + defer p.mu.RUnlock() + n := len(p.relays) + if n == 0 { + return nil + } + idx := int(p.counter.Add(1)-1) % n + return p.relays[idx] +} + func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UDPAddr, conn2 net.PacketConn, streamID int, c chan<- error) { time.Sleep(time.Duration(rand.Intn(400)+100) * time.Millisecond) var err error @@ -1651,8 +1876,7 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD if turnParams.port != "" { urlport = turnParams.port } - var turnServerAddr string - turnServerAddr = net.JoinHostPort(urlhost, urlport) + turnServerAddr := net.JoinHostPort(urlhost, urlport) log.Printf("[STREAM %d] [TURN] dialing %s (udp=%v)", streamID, turnServerAddr, turnParams.udp) turnServerUDPAddr, err1 := net.ResolveUDPAddr("udp", turnServerAddr) if err1 != nil { @@ -1661,47 +1885,7 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD } turnServerAddr = turnServerUDPAddr.String() fmt.Println(turnServerUDPAddr.IP) - var cfg *turn.ClientConfig - var turnConn net.PacketConn - var d net.Dialer - ctx1, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if turnParams.udp { - conn, err2 := net.DialUDP("udp", nil, turnServerUDPAddr) // nolint: noctx - if err2 != nil { - err = fmt.Errorf("failed to connect to TURN server: %s", err2) - return - } - defer func() { - if err1 = conn.Close(); err1 != nil && err == nil { - err = fmt.Errorf("failed to close TURN server connection: %s", err1) - } - }() - turnConn = &connectedUDPConn{conn} - } else { - conn, err2 := d.DialContext(ctx1, "tcp", turnServerAddr) - if err2 != nil { - log.Printf("[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v", - streamID, turnServerAddr, classifyNetErr(err2), err2) - err = fmt.Errorf("failed to connect to TURN server: %s", err2) - return - } - if isDebug { - log.Printf("[STREAM %d] [TURN] tcp established %s -> %s", - streamID, conn.LocalAddr(), conn.RemoteAddr()) - } - cc := &countingConn{Conn: conn} - defer func() { - if err != nil && isDebug { - log.Printf("[STREAM %d] [TURN] tcp closing after fail: written=%d read=%d", - streamID, cc.written.Load(), cc.read.Load()) - } - if err1 = conn.Close(); err1 != nil && err == nil { - err = fmt.Errorf("failed to close TURN server connection: %s", err1) - } - }() - turnConn = turn.NewSTUNConn(cc) - } + var addrFamily turn.RequestedAddressFamily if peer.IP.To4() != nil { addrFamily = turn.RequestedAddressFamilyIPv4 @@ -1709,66 +1893,83 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD addrFamily = turn.RequestedAddressFamilyIPv6 } - cfg = &turn.ClientConfig{ - STUNServerAddr: turnServerAddr, - TURNServerAddr: turnServerAddr, - Conn: turnConn, - Net: newDirectNet(), - Username: user, - Password: pass, - RequestedAddressFamily: addrFamily, - LoggerFactory: logging.NewDefaultLoggerFactory(), - } - - client, err1 := turn.NewClient(cfg) - if err1 != nil { - err = fmt.Errorf("failed to create TURN client: %s", err1) - return - } - defer client.Close() - - err1 = client.Listen() - if err1 != nil { - err = fmt.Errorf("failed to listen: %s", err1) - return - } - - relayConn, err1 := client.Allocate() + primary, err1 := dialTurn(ctx, turnParams.udp, turnServerAddr, turnServerUDPAddr, addrFamily, user, pass, streamID) if err1 != nil { if isAuthError(err1) { handleAuthError(streamID) } - err = fmt.Errorf("failed to allocate: %s", err1) + err = err1 return } - // Reset error count on successful allocation getStreamCache(streamID).errorCount.Store(0) - // Safely track active streams globally connectedStreams.Add(1) - defer func() { - connectedStreams.Add(-1) - if err1 := relayConn.Close(); err1 != nil { - err = fmt.Errorf("failed to close TURN allocated connection: %s", err1) - } - }() + defer connectedStreams.Add(-1) if isDebug { - log.Printf("[STREAM %d] relayed-address=%s", streamID, relayConn.LocalAddr().String()) + log.Printf("[STREAM %d] relayed-address=%s", streamID, primary.relay.LocalAddr().String()) } - wg := sync.WaitGroup{} - wg.Add(1) + pool := &relayPool{} + pool.add(primary.relay) + turnctx, turncancel := context.WithCancel(ctx) + defer turncancel() + + // Track all allocations for clean shutdown. + allocs := []*turnAllocation{primary} + var allocsMu sync.Mutex + defer func() { + allocsMu.Lock() + toClose := allocs + allocs = nil + allocsMu.Unlock() + for _, a := range toClose { + a.close() + } + }() + context.AfterFunc(turnctx, func() { - if err := relayConn.SetDeadline(time.Now()); err != nil { - log.Printf("Failed to set relay deadline: %s", err) + allocsMu.Lock() + defer allocsMu.Unlock() + for _, a := range allocs { + if a.relay != nil { + _ = a.relay.SetDeadline(time.Now()) + } } - // Do not set conn2 deadline (conn2 can sometimes be listenConn if direct mode is used) }) + var internalPipeAddr atomic.Value + // Per-relay inbound goroutine: read from its own relay, forward to conn2. + var inboundWg sync.WaitGroup + spawnInbound := func(relay net.PacketConn) { + inboundWg.Add(1) + go func() { + defer inboundWg.Done() + defer turncancel() + buf := make([]byte, 1600) + for { + n, _, err1 := relay.ReadFrom(buf) + if err1 != nil { + return + } + addr1 := internalPipeAddr.Load() + if addr1 == nil { + continue + } + if addr, ok := addr1.(net.Addr); ok { + if _, err := conn2.WriteTo(buf[:n], addr); err != nil { + return + } + } + } + }() + } + spawnInbound(primary.relay) + + // Outbound: read from conn2, send via round-robin across the relay pool. go func() { defer turncancel() buf := make([]byte, 1600) @@ -1783,42 +1984,51 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD if turnctx.Err() != nil { return } - internalPipeAddr.Store(addr1) - _, err1 = relayConn.WriteTo(buf[:n], peer) - if err1 != nil { + r := pool.pick() + if r == nil { + return + } + if _, err1 = r.WriteTo(buf[:n], peer); err1 != nil { return } } }() - go func() { - defer wg.Done() - defer turncancel() - buf := make([]byte, 1600) - for { - n, _, err1 := relayConn.ReadFrom(buf) - if err1 != nil { + // Open extra allocations under the same creds. DTLS handshake completes + // over the primary first; deferring extras lets the server install the + // Connection ID so subsequent multi-path packets are matched to the + // existing session via CID rather than 5-tuple. + extras := allocsPerStream - 1 + if extras > 0 { + go func() { + select { + case <-turnctx.Done(): return + case <-time.After(3 * time.Second): } - addr1 := internalPipeAddr.Load() - if addr1 == nil { - continue - } - - if addr, ok := addr1.(net.Addr); ok { - if _, err := conn2.WriteTo(buf[:n], addr); err != nil { + for i := 0; i < extras; i++ { + if turnctx.Err() != nil { return } + extra, err := dialTurn(ctx, turnParams.udp, turnServerAddr, turnServerUDPAddr, addrFamily, user, pass, streamID) + if err != nil { + log.Printf("[STREAM %d] [TURN] extra alloc %d/%d failed: %v", streamID, i+1, extras, err) + continue + } + log.Printf("[STREAM %d] [TURN] extra alloc %d/%d OK relay=%s", streamID, i+1, extras, extra.relay.LocalAddr()) + allocsMu.Lock() + allocs = append(allocs, extra) + allocsMu.Unlock() + pool.add(extra.relay) + spawnInbound(extra.relay) + time.Sleep(200 * time.Millisecond) } - } - }() - - wg.Wait() - if err := relayConn.SetDeadline(time.Time{}); err != nil { - log.Printf("Failed to clear relay deadline: %s", err) + }() } + + inboundWg.Wait() } func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, inboundChan <-chan *UDPPacket, connchan chan<- net.PacketConn, okchan chan<- struct{}, streamID int) { @@ -1923,6 +2133,7 @@ func main() { debugFlag := flag.Bool("debug", false, "enable debug logging") manualCaptchaFlag := flag.Bool("manual-captcha", false, "skip auto captcha solving, use manual mode immediately") dnsFlag := flag.String("dns", DNSModeAuto, "DNS resolution mode: udp | doh | auto (auto tries UDP/53 first, sticky-fallback to DoH on total failure)") + allocsFlag := flag.Int("allocs-per-stream", 1, "open this many TURN allocations per stream under shared creds (only useful if VK throttles per-allocation)") flag.Parse() switch *dnsFlag { case DNSModeUDP, DNSModeDoH, DNSModeAuto: @@ -1945,6 +2156,11 @@ func main() { isDebug = *debugFlag manualCaptcha = *manualCaptchaFlag autoCaptchaSliderPOC = !manualCaptcha + allocsPerStream = *allocsFlag + if allocsPerStream < 1 { + allocsPerStream = 1 + } + udpMode = *udp var link string var getCreds getCredsFunc