Browse Source

feat(turn): distribute streams across TURN URLs and add TCP conn diagnostics

- Pick turn URL by streamID % len(urls) instead of always urlsRaw[0]
- Add countingConn to track bytes written/read for TCP TURN connections
- Add classifyNetErr helper for structured error categorization
- Log TCP dial failures always; verbose logs gated behind isDebug
pull/151/head
samosvalishe 2 months ago
parent
commit
43bb1cad22
  1. 84
      client/main.go

84
client/main.go

@ -11,6 +11,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
@ -1075,10 +1076,16 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
if !ok || len(urlsRaw) == 0 {
return "", "", "", fmt.Errorf("missing or empty urls in turn_server")
}
log.Printf("[STREAM %d] [VK Auth] turn_server urls: %v", streamID, urlsRaw)
urlStr, ok := urlsRaw[0].(string)
if isDebug {
log.Printf("[STREAM %d] [VK Auth] turn_server urls: %v", streamID, urlsRaw)
}
urlIdx := streamID % len(urlsRaw)
urlStr, ok := urlsRaw[urlIdx].(string)
if !ok {
return "", "", "", fmt.Errorf("turn server url is not a string")
return "", "", "", fmt.Errorf("turn server url[%d] is not a string", urlIdx)
}
if isDebug {
log.Printf("[STREAM %d] [VK Auth] picked turn url[%d]: %s", streamID, urlIdx, urlStr)
}
clean := strings.Split(urlStr, "?")[0]
@ -1525,6 +1532,58 @@ func (c *connectedUDPConn) WriteTo(p []byte, _ net.Addr) (int, error) {
return c.Write(p)
}
type countingConn struct {
net.Conn
written atomic.Int64
read atomic.Int64
}
func (c *countingConn) Read(p []byte) (int, error) {
n, err := c.Conn.Read(p)
if n > 0 {
c.read.Add(int64(n))
}
return n, err
}
func (c *countingConn) Write(p []byte) (int, error) {
n, err := c.Conn.Write(p)
if n > 0 {
c.written.Add(int64(n))
}
return n, err
}
func classifyNetErr(err error) string {
if err == nil {
return "nil"
}
if errors.Is(err, context.DeadlineExceeded) {
return "ctx-deadline"
}
if errors.Is(err, io.EOF) {
return "eof"
}
if errors.Is(err, syscall.ECONNRESET) {
return "rst"
}
if errors.Is(err, syscall.ECONNREFUSED) {
return "refused"
}
if errors.Is(err, syscall.EPIPE) {
return "broken-pipe"
}
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
return "net-timeout"
}
var oe *net.OpError
if errors.As(err, &oe) {
return "op:" + oe.Op
}
return "other"
}
type turnParams struct {
host string
port string
@ -1575,25 +1634,34 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
return
}
defer func() {
if err1 = conn.Close(); err1 != nil {
if err1 = conn.Close(); err1 != nil && err == nil {
err = fmt.Errorf("failed to close TURN server connection: %s", err1)
return
}
}()
turnConn = &connectedUDPConn{conn}
} else {
conn, err2 := d.DialContext(ctx1, "tcp", turnServerAddr)
if err2 != nil {
log.Printf("[STREAM %d] [TURN] tcp dial %s failed: class=%s err=%v",
streamID, turnServerAddr, classifyNetErr(err2), err2)
err = fmt.Errorf("failed to connect to TURN server: %s", err2)
return
}
if isDebug {
log.Printf("[STREAM %d] [TURN] tcp established %s -> %s",
streamID, conn.LocalAddr(), conn.RemoteAddr())
}
cc := &countingConn{Conn: conn}
defer func() {
if err1 = conn.Close(); err1 != nil {
if err != nil && isDebug {
log.Printf("[STREAM %d] [TURN] tcp closing after fail: written=%d read=%d",
streamID, cc.written.Load(), cc.read.Load())
}
if err1 = conn.Close(); err1 != nil && err == nil {
err = fmt.Errorf("failed to close TURN server connection: %s", err1)
return
}
}()
turnConn = turn.NewSTUNConn(conn)
turnConn = turn.NewSTUNConn(cc)
}
var addrFamily turn.RequestedAddressFamily
if peer.IP.To4() != nil {

Loading…
Cancel
Save