@ -67,6 +67,8 @@ var (
isDebug bool
manualCaptcha bool
autoCaptchaSliderPOC bool
allocsPerStream int
udpMode bool
)
type captchaSolveMode int
@ -621,12 +623,12 @@ type VKCredentials struct {
ClientSecret string
}
// Only client_ids that currently expose calls.getAnonymousToken.
// VKVIDEO_* and VK_ID_AUTH_APP started returning error_code:3 "Unknown method"
// (observed 2026-04-28) and only burn throttle budget if kept in rotation.
var vkCredentialsList = [ ] VKCredentials {
{ ClientID : "6287487" , ClientSecret : "QbYic1K3lEV5kTGiqlq2" } , // VK_WEB_APP_ID
{ ClientID : "7879029" , ClientSecret : "aR5NKGmm03GYrCiNKsaw" } , // VK_MVK_APP_ID
{ ClientID : "52461373" , ClientSecret : "o557NLIkAErNhakXrQ7A" } , // VK_WEB_VKVIDEO_APP_ID
{ ClientID : "52649896" , ClientSecret : "WStp4ihWG4l3nmXZgIbC" } , // VK_MVK_VKVIDEO_APP_ID
{ ClientID : "51781872" , ClientSecret : "IjjCNl4L4Tf5QZEXIHKK" } , // VK_ID_AUTH_APP
{ ClientID : "6287487" , ClientSecret : "QbYic1K3lEV5kTGiqlq2" } , // VK_WEB_APP_ID
{ ClientID : "7879029" , ClientSecret : "aR5NKGmm03GYrCiNKsaw" } , // VK_MVK_APP_ID
}
type TurnCredentials struct {
@ -649,7 +651,10 @@ const (
cacheSafetyMargin = 60 * time . Second
maxCacheErrors = 3
errorWindow = 10 * time . Second
streamsPerCache = 10
// streamsPerCache=1: each stream caches its own slot creds because
// acquireVkTurnSlot mints a unique (username, password) per call.
streamsPerCache = 1
identityLifetime = 8 * time . Minute
)
func getCacheID ( streamID int ) int {
@ -761,7 +766,7 @@ func getVkCredsCached(ctx context.Context, link string, streamID int) (string, s
return cache . creds . Username , cache . creds . Password , cache . creds . ServerAddr , nil
}
user , pass , addr , err := fetchVkCredsSerialized ( ctx , link , streamID )
user , pass , addr , err := fetchVkCreds ( ctx , link , streamID )
if err != nil {
return "" , "" , "" , err
}
@ -775,58 +780,111 @@ var (
globalLastVkFetchTime time . Time
)
func fetchVkCredsSerialized ( ctx context . Context , link string , streamID int ) ( string , string , string , error ) {
vkRequestMu . Lock ( )
defer vkRequestMu . Unlock ( )
// vkIdentity caches the captcha-gated portion of a VK auth chain (steps 1-3:
// anonym_token + getCallPreview + getAnonymousToken). Once acquired it can be
// replayed via acquireVkTurnSlot to mint independent TURN credentials, each
// with a unique username — bypassing per-username throttling at the cost of a
// single captcha solve per (link, client_id) pair.
type vkIdentity struct {
creds VKCredentials
profile Profile
name string
token1 string
token2 string
client tlsclient . HttpClient
expiresAt time . Time
urlCounter atomic . Uint64 // round-robin index across turn_server.urls
}
type identityCacheKey struct {
link string
clientID string
}
// Ensure a minimum cooldown between credential requests to avoid VK rate limits
minInterval := 3 * time . Second + time . Duration ( rand . Intn ( 3000 ) ) * time . Millisecond
elapsed := time . Since ( globalLastVkFetchTime )
type identityEntry struct {
mu sync . Mutex
ident * vkIdentity
}
if ! globalLastVkFetchTime . IsZero ( ) && elapsed < minInterval {
wait := minInterval - elapsed
log . Printf ( "[STREAM %d] [VK Auth] Throttling: waiting %v to prevent rate limit..." , streamID , wait . Truncate ( time . Millisecond ) )
select {
case <- ctx . Done ( ) :
return "" , "" , "" , ctx . Err ( )
case <- time . After ( wait ) :
}
var identityStore = struct {
mu sync . Mutex
m map [ identityCacheKey ] * identityEntry
} { m : make ( map [ identityCacheKey ] * identityEntry ) }
func getOrAcquireIdentity ( ctx context . Context , link string , streamID int , creds VKCredentials ) ( * vkIdentity , error ) {
key := identityCacheKey { link : link , clientID : creds . ClientID }
identityStore . mu . Lock ( )
entry , ok := identityStore . m [ key ]
if ! ok {
entry = & identityEntry { }
identityStore . m [ key ] = entry
}
identityStore . mu . Unlock ( )
defer func ( ) {
globalLastVkFetchTime = time . Now ( )
} ( )
entry . mu . Lock ( )
defer entry . mu . Unlock ( )
if entry . ident != nil && time . Now ( ) . Before ( entry . ident . expiresAt ) {
return entry . ident , nil
}
return fetchVkCreds ( ctx , link , streamID )
ident , err := acquireVkIdentity ( ctx , link , streamID , creds )
if err != nil {
return nil , err
}
entry . ident = ident
return ident , nil
}
func invalidateIdentity ( link , clientID string ) {
identityStore . mu . Lock ( )
entry , ok := identityStore . m [ identityCacheKey { link : link , clientID : clientID } ]
identityStore . mu . Unlock ( )
if ! ok {
return
}
entry . mu . Lock ( )
entry . ident = nil
entry . mu . Unlock ( )
}
func fetchVkCreds ( ctx context . Context , link string , streamID int ) ( string , string , string , error ) {
// Check Global Lockout to prevent API bans
if time . Now ( ) . Unix ( ) < globalCaptchaLockout . Load ( ) {
return "" , "" , "" , fmt . Errorf ( "CAPTCHA_WAIT_REQUIRED: global lockout active" )
}
var lastErr error
jar := tlsclient . NewCookieJar ( )
n := len ( vkCredentialsList )
startIdx := streamID % n
for _ , creds := range vkCredentialsList {
var lastErr error
for offset := 0 ; offset < n ; offset ++ {
creds := vkCredentialsList [ ( startIdx + offset ) % n ]
log . Printf ( "[STREAM %d] [VK Auth] Trying credentials: client_id=%s" , streamID , creds . ClientID )
user , pass , addr , err := getTokenChain ( ctx , link , streamID , creds , jar )
ident , err := getOrAcquireIdentity ( ctx , link , streamID , creds )
if err != nil {
lastErr = err
log . Printf ( "[STREAM %d] [VK Auth] identity acquire failed (client_id=%s): %v" , streamID , creds . ClientID , err )
if strings . Contains ( err . Error ( ) , "CAPTCHA_WAIT_REQUIRED" ) || strings . Contains ( err . Error ( ) , "FATAL_CAPTCHA" ) {
return "" , "" , "" , err
}
continue
}
user , pass , addr , err := acquireVkTurnSlot ( ctx , link , streamID , ident )
if err == nil {
log . Printf ( "[STREAM %d] [VK Auth] Success with client_id=%s" , streamID , creds . ClientID )
return user , pass , addr , nil
}
lastErr = err
log . Printf ( "[STREAM %d] [VK Auth] Failed with client_id=%s: %v" , streamID , creds . ClientID , err )
log . Printf ( "[STREAM %d] [VK Auth] slot acquire failed (client_id=%s): %v" , streamID , creds . ClientID , err )
invalidateIdentity ( link , creds . ClientID )
// Hard abort on captcha/fatal conditions instead of trying next creds
if strings . Contains ( err . Error ( ) , "CAPTCHA_WAIT_REQUIRED" ) || strings . Contains ( err . Error ( ) , "FATAL_CAPTCHA" ) {
return "" , "" , "" , err
}
if strings . Contains ( err . Error ( ) , "error_code:29" ) || strings . Contains ( err . Error ( ) , "error_code: 29" ) || strings . Contains ( err . Error ( ) , "Rate limit" ) {
log . Printf ( "[STREAM %d] [VK Auth] Rate limit detected, trying next credentials..." , streamID )
}
@ -835,7 +893,79 @@ func fetchVkCreds(ctx context.Context, link string, streamID int) (string, strin
return "" , "" , "" , fmt . Errorf ( "all VK credentials failed: %w" , lastErr )
}
func getTokenChain ( ctx context . Context , link string , streamID int , creds VKCredentials , jar tlsclient . CookieJar ) ( string , string , string , error ) {
func vkDoRequest ( ctx context . Context , client tlsclient . HttpClient , profile Profile , data , url string ) ( map [ string ] interface { } , error ) {
parsedURL , err := neturl . Parse ( url )
if err != nil {
return nil , fmt . Errorf ( "parse request URL: %w" , err )
}
domain := parsedURL . Hostname ( )
req , err := fhttp . NewRequestWithContext ( ctx , "POST" , url , bytes . NewBuffer ( [ ] byte ( data ) ) )
if err != nil {
return nil , err
}
req . Host = domain
applyBrowserProfileFhttp ( req , profile )
req . Header . Set ( "Content-Type" , "application/x-www-form-urlencoded" )
req . Header . Set ( "Accept" , "*/*" )
req . Header . Set ( "Origin" , "https://vk.ru" )
req . Header . Set ( "Referer" , "https://vk.ru/" )
req . Header . Set ( "Sec-Fetch-Site" , "same-site" )
req . Header . Set ( "Sec-Fetch-Mode" , "cors" )
req . Header . Set ( "Sec-Fetch-Dest" , "empty" )
req . Header . Set ( "Priority" , "u=1, i" )
httpResp , err := client . Do ( req )
if err != nil {
return nil , err
}
defer func ( ) {
if closeErr := httpResp . Body . Close ( ) ; closeErr != nil {
log . Printf ( "close response body: %s" , closeErr )
}
} ( )
body , err := io . ReadAll ( httpResp . Body )
if err != nil {
return nil , err
}
var resp map [ string ] interface { }
if err := json . Unmarshal ( body , & resp ) ; err != nil {
return nil , err
}
return resp , nil
}
// acquireVkIdentity runs the heavy + captcha-gated portion of the VK auth chain
// (steps 1-3: get_anonym_token, calls.getCallPreview, calls.getAnonymousToken).
// The result is cached and reused across many TURN slot acquisitions.
//
// Globally serialised via vkRequestMu + 3-6s cooldown to avoid VK API bans.
func acquireVkIdentity ( ctx context . Context , link string , streamID int , creds VKCredentials ) ( * vkIdentity , error ) {
vkRequestMu . Lock ( )
defer vkRequestMu . Unlock ( )
minInterval := 3 * time . Second + time . Duration ( rand . Intn ( 3000 ) ) * time . Millisecond
elapsed := time . Since ( globalLastVkFetchTime )
if ! globalLastVkFetchTime . IsZero ( ) && elapsed < minInterval {
wait := minInterval - elapsed
log . Printf ( "[STREAM %d] [VK Auth] Throttling: waiting %v to prevent rate limit..." , streamID , wait . Truncate ( time . Millisecond ) )
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
case <- time . After ( wait ) :
}
}
defer func ( ) {
globalLastVkFetchTime = time . Now ( )
} ( )
if time . Now ( ) . Unix ( ) < globalCaptchaLockout . Load ( ) {
return nil , fmt . Errorf ( "CAPTCHA_WAIT_REQUIRED: global lockout active" )
}
profile := Profile {
UserAgent : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36" ,
SecChUa : ` "Not(A:Brand";v="99", "Google Chrome";v="146", "Chromium";v="146" ` ,
@ -843,6 +973,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
SecChUaPlatform : ` "Windows" ` ,
}
jar := tlsclient . NewCookieJar ( )
client , err := tlsclient . NewHttpClient ( tlsclient . NewNoopLogger ( ) ,
tlsclient . WithTimeoutSeconds ( 20 ) ,
tlsclient . WithClientProfile ( profiles . Chrome_146 ) ,
@ -850,94 +981,58 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
tlsclient . WithDialer ( appDialer ( ) ) ,
)
if err != nil {
return "" , "" , "" , fmt . Errorf ( "failed to initialize tls_client: %w" , err )
return nil , fmt . Errorf ( "failed to initialize tls_client: %w" , err )
}
name := generateName ( )
escapedName := neturl . QueryEscape ( name )
log . Printf ( "[STREAM %d] [VK Auth] Connecting Identity - Name: %s | User-Agent: %s" , streamID , name , profile . UserAgent )
log . Printf ( "[STREAM %d] [VK Auth] Connecting Identity - Name: %s | client_id= %s" , streamID , name , creds . ClientID )
doRequest := func ( data string , url string ) ( resp map [ string ] interface { } , err error ) {
parsedURL , err := neturl . Parse ( url )
if err != nil {
return nil , fmt . Errorf ( "parse request URL: %w" , err )
}
domain := parsedURL . Hostname ( )
req , err := fhttp . NewRequestWithContext ( ctx , "POST" , url , bytes . NewBuffer ( [ ] byte ( data ) ) )
if err != nil {
return nil , err
}
req . Host = domain
applyBrowserProfileFhttp ( req , profile )
req . Header . Set ( "Content-Type" , "application/x-www-form-urlencoded" )
req . Header . Set ( "Accept" , "*/*" )
req . Header . Set ( "Origin" , "https://vk.ru" )
req . Header . Set ( "Referer" , "https://vk.ru/" )
req . Header . Set ( "Sec-Fetch-Site" , "same-site" )
req . Header . Set ( "Sec-Fetch-Mode" , "cors" )
req . Header . Set ( "Sec-Fetch-Dest" , "empty" )
req . Header . Set ( "Priority" , "u=1, i" )
httpResp , err := client . Do ( req )
if err != nil {
return nil , err
}
defer func ( ) {
if closeErr := httpResp . Body . Close ( ) ; closeErr != nil {
log . Printf ( "close response body: %s" , closeErr )
}
} ( )
body , err := io . ReadAll ( httpResp . Body )
if err != nil {
return nil , err
}
err = json . Unmarshal ( body , & resp )
if err != nil {
return nil , err
}
return resp , nil
}
// Token 1
// Step 1: anonym_token
data := fmt . Sprintf ( "client_id=%s&token_type=messages&client_secret=%s&version=1&app_id=%s" , creds . ClientID , creds . ClientSecret , creds . ClientID )
resp , err := doRequest ( data , "https://login.vk.ru/?act=get_anonym_token" )
resp , err := vkDoRequest ( ctx , client , profile , data , "https://login.vk.ru/?act=get_anonym_token" )
if err != nil {
return "" , "" , "" , err
return nil , err
}
dataMap , ok := resp [ "data" ] . ( map [ string ] interface { } )
if ! ok {
return "" , "" , "" , fmt . Errorf ( "unexpected anon token response: %v" , resp )
return nil , fmt . Errorf ( "unexpected anon token response: %v" , resp )
}
token1 , ok := dataMap [ "access_token" ] . ( string )
if ! ok {
return "" , "" , "" , fmt . Errorf ( "missing access_token in response: %v" , resp )
return nil , fmt . Errorf ( "missing access_token in response: %v" , resp )
}
vkDelayRandom ( 100 , 150 )
// Token 1 -> getCallPreview
// Step 2: getCallPreview (best-effort)
data = fmt . Sprintf ( "vk_join_link=https://vk.com/call/join/%s&fields=photo_200&access_token=%s" , link , token1 )
_ , err = doRequest ( data , "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id=" + creds . ClientID )
_ , err = vkDoRequest ( ctx , client , profile , data , "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id=" + creds . ClientID )
if err != nil {
log . Printf ( "[STREAM %d] [VK Auth] Warning: getCallPreview failed: %v" , streamID , err )
}
vkDelayRandom ( 200 , 400 )
// Token 2
// Step 3: getAnonymousToken (captcha-gated)
data = fmt . Sprintf ( "vk_join_link=https://vk.com/call/join/%s&name=%s&access_token=%s" , link , escapedName , token1 )
urlAddr := fmt . Sprintf ( "https://api.vk.ru/method/calls.getAnonymousToken?v=5.275&client_id=%s" , creds . ClientID )
exhaustedCaptcha := func ( ) error {
globalCaptchaLockout . Store ( time . Now ( ) . Add ( 60 * time . Second ) . Unix ( ) )
if connectedStreams . Load ( ) == 0 {
log . Printf ( "[STREAM %d] [FATAL] 0 connected streams and captcha solve modes exhausted." , streamID )
return fmt . Errorf ( "FATAL_CAPTCHA_FAILED_NO_STREAMS" )
}
return fmt . Errorf ( "CAPTCHA_WAIT_REQUIRED" )
}
var token2 string
for attempt := 0 ; ; attempt ++ {
resp , err = doRequest ( data , urlAddr )
resp , err = vkDoRequest ( ctx , client , profile , data , urlAddr )
if err != nil {
return "" , "" , "" , err
return nil , err
}
if errObj , hasErr := resp [ "error" ] . ( map [ string ] interface { } ) ; hasErr {
@ -946,16 +1041,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
solveMode , hasSolveMode := captchaSolveModeForAttempt ( attempt , manualCaptcha , autoCaptchaSliderPOC )
if ! hasSolveMode {
log . Printf ( "[STREAM %d] [Captcha] No more solve modes available (attempt %d)" , streamID , attempt + 1 )
// Engage global lockout to protect API
globalCaptchaLockout . Store ( time . Now ( ) . Add ( 60 * time . Second ) . Unix ( ) )
if connectedStreams . Load ( ) == 0 {
log . Printf ( "[STREAM %d] [FATAL] 0 connected streams and captcha solve modes exhausted." , streamID )
return "" , "" , "" , fmt . Errorf ( "FATAL_CAPTCHA_FAILED_NO_STREAMS" )
}
return "" , "" , "" , fmt . Errorf ( "CAPTCHA_WAIT_REQUIRED" )
return nil , exhaustedCaptcha ( )
}
var successToken string
@ -1027,7 +1113,6 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
manualCancel ( )
}
// If solving failed (auto or manual) or timed out
if solveErr != nil {
log . Printf ( "[STREAM %d] [Captcha] %s failed (attempt %d): %v" , streamID , captchaSolveModeLabel ( solveMode ) , attempt + 1 , solveErr )
@ -1036,17 +1121,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
log . Printf ( "[STREAM %d] [Captcha] Falling back to %s..." , streamID , captchaSolveModeLabel ( nextSolveMode ) )
continue
}
// Engage global lockout to protect API
globalCaptchaLockout . Store ( time . Now ( ) . Add ( 60 * time . Second ) . Unix ( ) )
// If we have 0 streams alive, this is fatal
if connectedStreams . Load ( ) == 0 {
log . Printf ( "[STREAM %d] [FATAL] 0 connected streams and manual captcha failed/timed out." , streamID )
return "" , "" , "" , fmt . Errorf ( "FATAL_CAPTCHA_FAILED_NO_STREAMS" )
}
return "" , "" , "" , fmt . Errorf ( "CAPTCHA_WAIT_REQUIRED" )
return nil , exhaustedCaptcha ( )
}
if captchaErr . CaptchaAttempt == "0" || captchaErr . CaptchaAttempt == "" {
@ -1062,26 +1137,41 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
}
continue
}
return "" , "" , "" , fmt . Errorf ( "VK API error: %v" , errObj )
return nil , fmt . Errorf ( "VK API error: %v" , errObj )
}
respMap , okLoop := resp [ "response" ] . ( map [ string ] interface { } )
if ! okLoop {
return "" , "" , "" , fmt . Errorf ( "unexpected getAnonymousToken response: %v" , resp )
return nil , fmt . Errorf ( "unexpected getAnonymousToken response: %v" , resp )
}
token2 , okLoop = respMap [ "token" ] . ( string )
if ! okLoop {
return "" , "" , "" , fmt . Errorf ( "missing token in response: %v" , resp )
return nil , fmt . Errorf ( "missing token in response: %v" , resp )
}
break
}
vkDelayRandom ( 100 , 150 )
// Token 3
return & vkIdentity {
creds : creds ,
profile : profile ,
name : name ,
token1 : token1 ,
token2 : token2 ,
client : client ,
expiresAt : time . Now ( ) . Add ( identityLifetime ) ,
} , nil
}
// acquireVkTurnSlot runs the lightweight portion of the chain (steps 4-5):
// auth.anonymLogin (with a fresh device_id) followed by vchat.joinConversationByLink.
// Each call returns a distinct (username, password) pair from VK, which lets us
// run multiple parallel TURN allocations under the same identity — bypassing
// per-username throttling without re-solving captcha.
func acquireVkTurnSlot ( ctx context . Context , link string , streamID int , ident * vkIdentity ) ( string , string , string , error ) {
// Step 4: auth.anonymLogin with fresh device_id → fresh session_key
sessionData := fmt . Sprintf ( ` { "version":2,"device_id":"%s","client_version":1.1,"client_type":"SDK_JS"} ` , uuid . New ( ) )
data = fmt . Sprintf ( "session_data=%s&method=auth.anonymLogin&format=JSON&application_key=CGMMEJLGDIHBABABA" , neturl . QueryEscape ( sessionData ) )
resp , err = doRequest ( data , "https://calls.okcdn.ru/fb.do" )
data : = fmt . Sprintf ( "session_data=%s&method=auth.anonymLogin&format=JSON&application_key=CGMMEJLGDIHBABABA" , neturl . QueryEscape ( sessionData ) )
resp , err := vkDoRequest ( ctx , ident . client , ident . profile , data , "https://calls.okcdn.ru/fb.do" )
if err != nil {
return "" , "" , "" , err
}
@ -1092,9 +1182,9 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
vkDelayRandom ( 100 , 150 )
// Token 4 -> TURN C reds
data = fmt . Sprintf ( "joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s" , link , token2 , token3 )
resp , err = doRequest ( data , "https://calls.okcdn.ru/fb.do" )
// Step 5: vchat.joinConversationByLink → turn_server c reds
data = fmt . Sprintf ( "joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s" , link , ident . token2 , token3 )
resp , err = vkDoRequest ( ctx , ident . client , ident . profile , data , "https://calls.okcdn.ru/fb.do" )
if err != nil {
return "" , "" , "" , err
}
@ -1118,15 +1208,41 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
if isDebug {
log . Printf ( "[STREAM %d] [VK Auth] turn_server urls: %v" , streamID , urlsRaw )
}
urlIdx := streamID % len ( urlsRaw )
urlStr , ok := urlsRaw [ urlIdx ] . ( string )
if ! ok {
return "" , "" , "" , fmt . Errorf ( "turn server url[%d] is not a string" , urlIdx )
// Prefer URLs whose transport matches the requested mode (udpMode).
// Per RFC 7065, "?transport=tcp" → TCP, missing or "transport=udp" → UDP.
// Fall back to the full list if nothing matches — this preserves the
// -port override path where the user intentionally dials a port not
// advertised in the URL list.
all := make ( [ ] string , 0 , len ( urlsRaw ) )
preferred := make ( [ ] string , 0 , len ( urlsRaw ) )
for _ , raw := range urlsRaw {
s , ok := raw . ( string )
if ! ok {
continue
}
all = append ( all , s )
isTCP := strings . Contains ( s , "transport=tcp" )
if udpMode == ! isTCP {
preferred = append ( preferred , s )
}
}
if isDebug {
log . Printf ( "[STREAM %d] [VK Auth] picked turn url[%d]: %s" , streamID , urlIdx , urlStr )
if len ( all ) == 0 {
return "" , "" , "" , fmt . Errorf ( "turn_server urls list contained no strings: %v" , urlsRaw )
}
pool := preferred
if len ( pool ) == 0 {
pool = all
log . Printf ( "[STREAM %d] [VK Auth] no urls match transport (udp=%v), falling back to full list (relying on -port override). urls=%v" , streamID , udpMode , all )
}
// Round-robin within the identity. streamID%len(pool) collapses every
// stream of the identity onto the same parity, so use a counter instead.
urlIdx := int ( ident . urlCounter . Add ( 1 ) - 1 ) % len ( pool )
urlStr := pool [ urlIdx ]
log . Printf ( "[STREAM %d] [VK Auth] turn_server urls=%d (preferred=%d), picked[%d]: %s" , streamID , len ( all ) , len ( preferred ) , urlIdx , urlStr )
clean := strings . Split ( urlStr , "?" ) [ 0 ]
address := strings . TrimPrefix ( strings . TrimPrefix ( clean , "turn:" ) , "turns:" )
@ -1631,6 +1747,115 @@ type turnParams struct {
getCreds getCredsFunc
}
// turnAllocation bundles a single TURN session: dial socket, TURN client, relay PacketConn.
type turnAllocation struct {
dialConn io . Closer
client * turn . Client
relay net . PacketConn
}
func ( a * turnAllocation ) close ( ) {
if a . relay != nil {
_ = a . relay . Close ( )
}
if a . client != nil {
a . client . Close ( )
}
if a . dialConn != nil {
_ = a . dialConn . Close ( )
}
}
// dialTurn opens a fresh TURN session under the given (user, pass). Each call
// produces an independent 5-tuple (own source UDP/TCP port) and an independent
// TURN allocation. VK may or may not allow multiple allocations under the same
// credentials — caller must tolerate failures on additional sessions.
func dialTurn ( ctx context . Context , useUDP bool , turnServerAddr string , turnServerUDPAddr * net . UDPAddr , addrFamily turn . RequestedAddressFamily , user , pass string , streamID int ) ( * turnAllocation , error ) {
var dialCloser io . Closer
var turnConn net . PacketConn
if useUDP {
conn , err := net . DialUDP ( "udp" , nil , turnServerUDPAddr )
if err != nil {
return nil , fmt . Errorf ( "failed to connect to TURN server: %w" , err )
}
dialCloser = conn
turnConn = & connectedUDPConn { conn }
} else {
ctx1 , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
var d net . Dialer
conn , err := d . DialContext ( ctx1 , "tcp" , turnServerAddr )
if err != nil {
log . Printf ( "[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v" ,
streamID , turnServerAddr , classifyNetErr ( err ) , err )
return nil , fmt . Errorf ( "failed to connect to TURN server: %w" , err )
}
if isDebug {
log . Printf ( "[STREAM %d] [TURN] tcp established %s -> %s" ,
streamID , conn . LocalAddr ( ) , conn . RemoteAddr ( ) )
}
dialCloser = conn
turnConn = turn . NewSTUNConn ( & countingConn { Conn : conn } )
}
cfg := & turn . ClientConfig {
STUNServerAddr : turnServerAddr ,
TURNServerAddr : turnServerAddr ,
Conn : turnConn ,
Net : newDirectNet ( ) ,
Username : user ,
Password : pass ,
RequestedAddressFamily : addrFamily ,
LoggerFactory : logging . NewDefaultLoggerFactory ( ) ,
}
client , err := turn . NewClient ( cfg )
if err != nil {
_ = dialCloser . Close ( )
return nil , fmt . Errorf ( "failed to create TURN client: %w" , err )
}
if err := client . Listen ( ) ; err != nil {
client . Close ( )
_ = dialCloser . Close ( )
return nil , fmt . Errorf ( "failed to listen: %w" , err )
}
relay , err := client . Allocate ( )
if err != nil {
client . Close ( )
_ = dialCloser . Close ( )
return nil , fmt . Errorf ( "failed to allocate: %w" , err )
}
return & turnAllocation { dialConn : dialCloser , client : client , relay : relay } , nil
}
// relayPool is a concurrent ring of live relay PacketConns. Reads (pick) are
// non-blocking and lock-free on the hot path; mutation (add) is rare.
type relayPool struct {
mu sync . RWMutex
relays [ ] net . PacketConn
counter atomic . Uint64
}
func ( p * relayPool ) add ( r net . PacketConn ) {
p . mu . Lock ( )
p . relays = append ( p . relays , r )
p . mu . Unlock ( )
}
func ( p * relayPool ) pick ( ) net . PacketConn {
p . mu . RLock ( )
defer p . mu . RUnlock ( )
n := len ( p . relays )
if n == 0 {
return nil
}
idx := int ( p . counter . Add ( 1 ) - 1 ) % n
return p . relays [ idx ]
}
func oneTurnConnection ( ctx context . Context , turnParams * turnParams , peer * net . UDPAddr , conn2 net . PacketConn , streamID int , c chan <- error ) {
time . Sleep ( time . Duration ( rand . Intn ( 400 ) + 100 ) * time . Millisecond )
var err error
@ -1651,8 +1876,7 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
if turnParams . port != "" {
urlport = turnParams . port
}
var turnServerAddr string
turnServerAddr = net . JoinHostPort ( urlhost , urlport )
turnServerAddr := net . JoinHostPort ( urlhost , urlport )
log . Printf ( "[STREAM %d] [TURN] dialing %s (udp=%v)" , streamID , turnServerAddr , turnParams . udp )
turnServerUDPAddr , err1 := net . ResolveUDPAddr ( "udp" , turnServerAddr )
if err1 != nil {
@ -1661,47 +1885,7 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
}
turnServerAddr = turnServerUDPAddr . String ( )
fmt . Println ( turnServerUDPAddr . IP )
var cfg * turn . ClientConfig
var turnConn net . PacketConn
var d net . Dialer
ctx1 , cancel := context . WithTimeout ( ctx , 5 * time . Second )
defer cancel ( )
if turnParams . udp {
conn , err2 := net . DialUDP ( "udp" , nil , turnServerUDPAddr ) // nolint: noctx
if err2 != nil {
err = fmt . Errorf ( "failed to connect to TURN server: %s" , err2 )
return
}
defer func ( ) {
if err1 = conn . Close ( ) ; err1 != nil && err == nil {
err = fmt . Errorf ( "failed to close TURN server connection: %s" , err1 )
}
} ( )
turnConn = & connectedUDPConn { conn }
} else {
conn , err2 := d . DialContext ( ctx1 , "tcp" , turnServerAddr )
if err2 != nil {
log . Printf ( "[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v" ,
streamID , turnServerAddr , classifyNetErr ( err2 ) , err2 )
err = fmt . Errorf ( "failed to connect to TURN server: %s" , err2 )
return
}
if isDebug {
log . Printf ( "[STREAM %d] [TURN] tcp established %s -> %s" ,
streamID , conn . LocalAddr ( ) , conn . RemoteAddr ( ) )
}
cc := & countingConn { Conn : conn }
defer func ( ) {
if err != nil && isDebug {
log . Printf ( "[STREAM %d] [TURN] tcp closing after fail: written=%d read=%d" ,
streamID , cc . written . Load ( ) , cc . read . Load ( ) )
}
if err1 = conn . Close ( ) ; err1 != nil && err == nil {
err = fmt . Errorf ( "failed to close TURN server connection: %s" , err1 )
}
} ( )
turnConn = turn . NewSTUNConn ( cc )
}
var addrFamily turn . RequestedAddressFamily
if peer . IP . To4 ( ) != nil {
addrFamily = turn . RequestedAddressFamilyIPv4
@ -1709,66 +1893,83 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
addrFamily = turn . RequestedAddressFamilyIPv6
}
cfg = & turn . ClientConfig {
STUNServerAddr : turnServerAddr ,
TURNServerAddr : turnServerAddr ,
Conn : turnConn ,
Net : newDirectNet ( ) ,
Username : user ,
Password : pass ,
RequestedAddressFamily : addrFamily ,
LoggerFactory : logging . NewDefaultLoggerFactory ( ) ,
}
client , err1 := turn . NewClient ( cfg )
if err1 != nil {
err = fmt . Errorf ( "failed to create TURN client: %s" , err1 )
return
}
defer client . Close ( )
err1 = client . Listen ( )
if err1 != nil {
err = fmt . Errorf ( "failed to listen: %s" , err1 )
return
}
relayConn , err1 := client . Allocate ( )
primary , err1 := dialTurn ( ctx , turnParams . udp , turnServerAddr , turnServerUDPAddr , addrFamily , user , pass , streamID )
if err1 != nil {
if isAuthError ( err1 ) {
handleAuthError ( streamID )
}
err = fmt . Errorf ( "failed to allocate: %s" , err1 )
err = err1
return
}
// Reset error count on successful allocation
getStreamCache ( streamID ) . errorCount . Store ( 0 )
// Safely track active streams globally
connectedStreams . Add ( 1 )
defer func ( ) {
connectedStreams . Add ( - 1 )
if err1 := relayConn . Close ( ) ; err1 != nil {
err = fmt . Errorf ( "failed to close TURN allocated connection: %s" , err1 )
}
} ( )
defer connectedStreams . Add ( - 1 )
if isDebug {
log . Printf ( "[STREAM %d] relayed-address=%s" , streamID , relayConn . LocalAddr ( ) . String ( ) )
log . Printf ( "[STREAM %d] relayed-address=%s" , streamID , primary . relay . LocalAddr ( ) . String ( ) )
}
wg := sync . WaitGroup { }
wg . Add ( 1 )
pool := & relayPool { }
pool . add ( primary . relay )
turnctx , turncancel := context . WithCancel ( ctx )
defer turncancel ( )
// Track all allocations for clean shutdown.
allocs := [ ] * turnAllocation { primary }
var allocsMu sync . Mutex
defer func ( ) {
allocsMu . Lock ( )
toClose := allocs
allocs = nil
allocsMu . Unlock ( )
for _ , a := range toClose {
a . close ( )
}
} ( )
context . AfterFunc ( turnctx , func ( ) {
if err := relayConn . SetDeadline ( time . Now ( ) ) ; err != nil {
log . Printf ( "Failed to set relay deadline: %s" , err )
allocsMu . Lock ( )
defer allocsMu . Unlock ( )
for _ , a := range allocs {
if a . relay != nil {
_ = a . relay . SetDeadline ( time . Now ( ) )
}
}
// Do not set conn2 deadline (conn2 can sometimes be listenConn if direct mode is used)
} )
var internalPipeAddr atomic . Value
// Per-relay inbound goroutine: read from its own relay, forward to conn2.
var inboundWg sync . WaitGroup
spawnInbound := func ( relay net . PacketConn ) {
inboundWg . Add ( 1 )
go func ( ) {
defer inboundWg . Done ( )
defer turncancel ( )
buf := make ( [ ] byte , 1600 )
for {
n , _ , err1 := relay . ReadFrom ( buf )
if err1 != nil {
return
}
addr1 := internalPipeAddr . Load ( )
if addr1 == nil {
continue
}
if addr , ok := addr1 . ( net . Addr ) ; ok {
if _ , err := conn2 . WriteTo ( buf [ : n ] , addr ) ; err != nil {
return
}
}
}
} ( )
}
spawnInbound ( primary . relay )
// Outbound: read from conn2, send via round-robin across the relay pool.
go func ( ) {
defer turncancel ( )
buf := make ( [ ] byte , 1600 )
@ -1783,42 +1984,51 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
if turnctx . Err ( ) != nil {
return
}
internalPipeAddr . Store ( addr1 )
_ , err1 = relayConn . WriteTo ( buf [ : n ] , peer )
if err1 != nil {
r := pool . pick ( )
if r == nil {
return
}
if _ , err1 = r . WriteTo ( buf [ : n ] , peer ) ; err1 != nil {
return
}
}
} ( )
go func ( ) {
defer wg . Done ( )
defer turncancel ( )
buf := make ( [ ] byte , 1600 )
for {
n , _ , err1 := relayConn . ReadFrom ( buf )
if err1 != nil {
// Open extra allocations under the same creds. DTLS handshake completes
// over the primary first; deferring extras lets the server install the
// Connection ID so subsequent multi-path packets are matched to the
// existing session via CID rather than 5-tuple.
extras := allocsPerStream - 1
if extras > 0 {
go func ( ) {
select {
case <- turnctx . Done ( ) :
return
case <- time . After ( 3 * time . Second ) :
}
addr1 := internalPipeAddr . Load ( )
if addr1 == nil {
continue
}
if addr , ok := addr1 . ( net . Addr ) ; ok {
if _ , err := conn2 . WriteTo ( buf [ : n ] , addr ) ; err != nil {
for i := 0 ; i < extras ; i ++ {
if turnctx . Err ( ) != nil {
return
}
extra , err := dialTurn ( ctx , turnParams . udp , turnServerAddr , turnServerUDPAddr , addrFamily , user , pass , streamID )
if err != nil {
log . Printf ( "[STREAM %d] [TURN] extra alloc %d/%d failed: %v" , streamID , i + 1 , extras , err )
continue
}
log . Printf ( "[STREAM %d] [TURN] extra alloc %d/%d OK relay=%s" , streamID , i + 1 , extras , extra . relay . LocalAddr ( ) )
allocsMu . Lock ( )
allocs = append ( allocs , extra )
allocsMu . Unlock ( )
pool . add ( extra . relay )
spawnInbound ( extra . relay )
time . Sleep ( 200 * time . Millisecond )
}
}
} ( )
wg . Wait ( )
if err := relayConn . SetDeadline ( time . Time { } ) ; err != nil {
log . Printf ( "Failed to clear relay deadline: %s" , err )
} ( )
}
inboundWg . Wait ( )
}
func oneDtlsConnectionLoop ( ctx context . Context , peer * net . UDPAddr , listenConn net . PacketConn , inboundChan <- chan * UDPPacket , connchan chan <- net . PacketConn , okchan chan <- struct { } , streamID int ) {
@ -1923,6 +2133,7 @@ func main() {
debugFlag := flag . Bool ( "debug" , false , "enable debug logging" )
manualCaptchaFlag := flag . Bool ( "manual-captcha" , false , "skip auto captcha solving, use manual mode immediately" )
dnsFlag := flag . String ( "dns" , DNSModeAuto , "DNS resolution mode: udp | doh | auto (auto tries UDP/53 first, sticky-fallback to DoH on total failure)" )
allocsFlag := flag . Int ( "allocs-per-stream" , 1 , "open this many TURN allocations per stream under shared creds (only useful if VK throttles per-allocation)" )
flag . Parse ( )
switch * dnsFlag {
case DNSModeUDP , DNSModeDoH , DNSModeAuto :
@ -1945,6 +2156,11 @@ func main() {
isDebug = * debugFlag
manualCaptcha = * manualCaptchaFlag
autoCaptchaSliderPOC = ! manualCaptcha
allocsPerStream = * allocsFlag
if allocsPerStream < 1 {
allocsPerStream = 1
}
udpMode = * udp
var link string
var getCreds getCredsFunc