Browse Source

fix(client): stability — timer leaks, identity store growth, ws loop bound

- sleepCtx helper (NewTimer+Stop) replaces time.After in long backoff sites:
  DTLS reconnect (10-30s), captcha 60s ban backoff, lockout sleep. Avoids
  long-lived timer leak when ctx cancels mid-wait.
- startIdentityJanitor: prunes expired identityStore entries every 5 min.
  Two-phase with TryLock so an in-flight acquireVkIdentity (which can hold
  entry.mu for tens of seconds during captcha) never blocks the janitor or
  other acquires.
- getYandexCreds: cap ws read loop at 64 messages so a chatty/malformed peer
  cannot keep us reading until the 15s deadline burns down.
pull/151/head
samosvalishe 2 months ago
parent
commit
e115e4de8c
  1. 95
      client/main.go

95
client/main.go

@ -697,6 +697,24 @@ func vkDelayRandom(minMs, maxMs int) {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
// sleepCtx waits d or until ctx cancels, whichever comes first. Returns false
// on cancellation. Uses NewTimer+Stop so the timer is reclaimed immediately on
// ctx cancel, not after d expires (avoids long-lived timer leak under repeated
// cancellation/restart cycles).
func sleepCtx(ctx context.Context, d time.Duration) bool {
if d <= 0 {
return ctx.Err() == nil
}
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
var credentialsStore = struct {
mu sync.RWMutex
caches map[int]*StreamCredentialsCache
@ -863,6 +881,65 @@ var identityStore = struct {
m map[identityCacheKey]*identityEntry
}{m: make(map[identityCacheKey]*identityEntry)}
// startIdentityJanitor prunes expired identityEntry every period. Long-running
// clients hopping across many distinct (link, client_id) pairs would otherwise
// grow identityStore.m without bound.
//
// Two-phase to avoid blocking acquires: phase 1 snapshots keys under store
// lock; phase 2 inspects each entry via TryLock. If an entry is busy
// (acquireVkIdentity in progress), skip it this round — it will be revisited
// next tick. Only entries with nil or expired ident are deleted.
func startIdentityJanitor(ctx context.Context, period time.Duration) {
go func() {
ticker := time.NewTicker(period)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
identityStore.mu.Lock()
keys := make([]identityCacheKey, 0, len(identityStore.m))
for k := range identityStore.m {
keys = append(keys, k)
}
identityStore.mu.Unlock()
now := time.Now()
for _, key := range keys {
identityStore.mu.Lock()
entry, ok := identityStore.m[key]
identityStore.mu.Unlock()
if !ok {
continue
}
if !entry.mu.TryLock() {
continue // busy — revisit next tick
}
expired := entry.ident == nil || now.After(entry.ident.expiresAt)
entry.mu.Unlock()
if !expired {
continue
}
identityStore.mu.Lock()
// Re-check under store lock + entry lock to avoid racing a
// concurrent acquire that just refilled the entry.
if cur, stillThere := identityStore.m[key]; stillThere && cur == entry {
if entry.mu.TryLock() {
if entry.ident == nil || now.After(entry.ident.expiresAt) {
delete(identityStore.m, key)
}
entry.mu.Unlock()
}
}
identityStore.mu.Unlock()
}
}
}()
}
func getOrAcquireIdentity(ctx context.Context, link string, streamID int, creds VKCredentials) (*vkIdentity, error) {
key := identityCacheKey{link: link, clientID: creds.ClientID}
@ -1574,7 +1651,8 @@ func getYandexCreds(link string) (string, string, string, error) {
return "", "", "", fmt.Errorf("ws set read deadline: %w", err)
}
for {
const maxWsMessages = 64
for i := 0; i < maxWsMessages; i++ {
_, msg, err := conn.ReadMessage()
if err != nil {
return "", "", "", fmt.Errorf("ws read: %w", err)
@ -1611,6 +1689,7 @@ func getYandexCreds(link string) (string, string, string, error) {
}
}
}
return "", "", "", fmt.Errorf("ws read: no serverHello with TURN urls after %d messages", maxWsMessages)
}
func dtlsFunc(ctx context.Context, conn net.PacketConn, peer *net.UDPAddr) (net.Conn, error) {
@ -2119,10 +2198,8 @@ func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConn ne
if time.Now().Unix() < globalCaptchaLockout.Load() && strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
select {
case <-ctx.Done():
if !sleepCtx(ctx, time.Duration(10+rand.Intn(20))*time.Second) {
return
case <-time.After(time.Duration(10+rand.Intn(20)) * time.Second):
}
}
}
@ -2154,10 +2231,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne
if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") {
if !strings.Contains(err.Error(), "global lockout active") {
log.Printf("[STREAM %d] Backing off for 60 seconds to avoid IP ban...", streamID)
select {
case <-ctx.Done():
if !sleepCtx(ctx, 60*time.Second) {
return
case <-time.After(60 * time.Second):
}
} else {
lockoutEnd := globalCaptchaLockout.Load()
@ -2165,10 +2240,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne
if sleepDuration < 0 {
sleepDuration = 5 * time.Second
}
select {
case <-ctx.Done():
if !sleepCtx(ctx, sleepDuration) {
return
case <-time.After(sleepDuration):
}
}
} else {
@ -2244,6 +2317,8 @@ func main() {
}
udpMode = *udp
startIdentityJanitor(ctx, 5*time.Minute)
var link string
var getCreds getCredsFunc
if *vklink != "" {

Loading…
Cancel
Save