diff --git a/client/main.go b/client/main.go index fd3f885..59bd2e6 100644 --- a/client/main.go +++ b/client/main.go @@ -697,6 +697,24 @@ func vkDelayRandom(minMs, maxMs int) { time.Sleep(time.Duration(ms) * time.Millisecond) } +// sleepCtx waits d or until ctx cancels, whichever comes first. Returns false +// on cancellation. Uses NewTimer+Stop so the timer is reclaimed immediately on +// ctx cancel, not after d expires (avoids long-lived timer leak under repeated +// cancellation/restart cycles). +func sleepCtx(ctx context.Context, d time.Duration) bool { + if d <= 0 { + return ctx.Err() == nil + } + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-ctx.Done(): + return false + case <-timer.C: + return true + } +} + var credentialsStore = struct { mu sync.RWMutex caches map[int]*StreamCredentialsCache @@ -863,6 +881,65 @@ var identityStore = struct { m map[identityCacheKey]*identityEntry }{m: make(map[identityCacheKey]*identityEntry)} +// startIdentityJanitor prunes expired identityEntry every period. Long-running +// clients hopping across many distinct (link, client_id) pairs would otherwise +// grow identityStore.m without bound. +// +// Two-phase to avoid blocking acquires: phase 1 snapshots keys under store +// lock; phase 2 inspects each entry via TryLock. If an entry is busy +// (acquireVkIdentity in progress), skip it this round — it will be revisited +// next tick. Only entries with nil or expired ident are deleted. +func startIdentityJanitor(ctx context.Context, period time.Duration) { + go func() { + ticker := time.NewTicker(period) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + identityStore.mu.Lock() + keys := make([]identityCacheKey, 0, len(identityStore.m)) + for k := range identityStore.m { + keys = append(keys, k) + } + identityStore.mu.Unlock() + + now := time.Now() + for _, key := range keys { + identityStore.mu.Lock() + entry, ok := identityStore.m[key] + identityStore.mu.Unlock() + if !ok { + continue + } + if !entry.mu.TryLock() { + continue // busy — revisit next tick + } + expired := entry.ident == nil || now.After(entry.ident.expiresAt) + entry.mu.Unlock() + if !expired { + continue + } + identityStore.mu.Lock() + // Re-check under store lock + entry lock to avoid racing a + // concurrent acquire that just refilled the entry. + if cur, stillThere := identityStore.m[key]; stillThere && cur == entry { + if entry.mu.TryLock() { + if entry.ident == nil || now.After(entry.ident.expiresAt) { + delete(identityStore.m, key) + } + entry.mu.Unlock() + } + } + identityStore.mu.Unlock() + } + } + }() +} + func getOrAcquireIdentity(ctx context.Context, link string, streamID int, creds VKCredentials) (*vkIdentity, error) { key := identityCacheKey{link: link, clientID: creds.ClientID} @@ -1574,7 +1651,8 @@ func getYandexCreds(link string) (string, string, string, error) { return "", "", "", fmt.Errorf("ws set read deadline: %w", err) } - for { + const maxWsMessages = 64 + for i := 0; i < maxWsMessages; i++ { _, msg, err := conn.ReadMessage() if err != nil { return "", "", "", fmt.Errorf("ws read: %w", err) @@ -1611,6 +1689,7 @@ func getYandexCreds(link string) (string, string, string, error) { } } } + return "", "", "", fmt.Errorf("ws read: no serverHello with TURN urls after %d messages", maxWsMessages) } func dtlsFunc(ctx context.Context, conn net.PacketConn, peer *net.UDPAddr) (net.Conn, error) { @@ -2119,10 +2198,8 @@ func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConn ne if time.Now().Unix() < globalCaptchaLockout.Load() && strings.Contains(err.Error(), "context deadline exceeded") { continue } - select { - case <-ctx.Done(): + if !sleepCtx(ctx, time.Duration(10+rand.Intn(20))*time.Second) { return - case <-time.After(time.Duration(10+rand.Intn(20)) * time.Second): } } } @@ -2154,10 +2231,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") { if !strings.Contains(err.Error(), "global lockout active") { log.Printf("[STREAM %d] Backing off for 60 seconds to avoid IP ban...", streamID) - select { - case <-ctx.Done(): + if !sleepCtx(ctx, 60*time.Second) { return - case <-time.After(60 * time.Second): } } else { lockoutEnd := globalCaptchaLockout.Load() @@ -2165,10 +2240,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne if sleepDuration < 0 { sleepDuration = 5 * time.Second } - select { - case <-ctx.Done(): + if !sleepCtx(ctx, sleepDuration) { return - case <-time.After(sleepDuration): } } } else { @@ -2244,6 +2317,8 @@ func main() { } udpMode = *udp + startIdentityJanitor(ctx, 5*time.Minute) + var link string var getCreds getCredsFunc if *vklink != "" {