Browse Source

fix: Clean up the code

pull/105/head
alexmac6574 2 months ago
parent
commit
ec5e5b7983
  1. 308
      client/main.go
  2. 10
      server/main.go

308
client/main.go

@ -59,13 +59,23 @@ type directListenConfig struct {
// Global state trackers
var (
globalClientWGAddr atomic.Value
activeLocalPeer atomic.Value
globalCaptchaLockout atomic.Int64
connectedStreams atomic.Int32
globalAppCancel context.CancelFunc
handshakeSem = make(chan struct{}, 3)
isDebug bool
)
type UDPPacket struct {
Data []byte
N int
}
var packetPool = sync.Pool{
New: func() any { return &UDPPacket{Data: make([]byte, 2048)} },
}
func newDirectNet() transport.Net {
return directNet{}
}
@ -491,7 +501,7 @@ func callCaptchaNotRobot(ctx context.Context, sessionToken, hash string, streamI
// endregion
// region VK Credentials & Caching Layer
// region VK Credentials Layer
type VKCredentials struct {
ClientID string
@ -533,11 +543,6 @@ func getCacheID(streamID int) int {
return streamID / streamsPerCache
}
var (
vkRequestMu sync.Mutex
globalLastVkFetchTime time.Time
)
func vkDelayRandom(minMs, maxMs int) {
ms := minMs + rand.Intn(maxMs-minMs+1)
time.Sleep(time.Duration(ms) * time.Millisecond)
@ -628,7 +633,9 @@ func getVkCredsCached(ctx context.Context, link string, streamID int, dialer *dn
expires := time.Until(cache.creds.ExpiresAt)
u, p, a := cache.creds.Username, cache.creds.Password, cache.creds.ServerAddr
cache.mutex.RUnlock()
log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires)
if isDebug {
log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires)
}
return u, p, a, nil
}
cache.mutex.RUnlock()
@ -636,44 +643,31 @@ func getVkCredsCached(ctx context.Context, link string, streamID int, dialer *dn
cache.mutex.Lock()
defer cache.mutex.Unlock()
// Double-check
// Double-check inside lock
if cache.creds.Link == link && time.Now().Before(cache.creds.ExpiresAt) {
expires := time.Until(cache.creds.ExpiresAt)
log.Printf("[STREAM %d] [VK Auth] Using cached credentials (cache=%d, expires in %v)", streamID, cacheID, expires)
return cache.creds.Username, cache.creds.Password, cache.creds.ServerAddr, nil
}
log.Printf("[STREAM %d] [VK Auth] Cache miss (cache=%d), starting credential fetch...", streamID, cacheID)
select {
case <-ctx.Done():
return "", "", "", ctx.Err()
default:
}
user, pass, addr, err := fetchVkCredsSerialized(ctx, link, streamID, dialer)
if err != nil {
return "", "", "", err
}
cache.creds = TurnCredentials{
Username: user,
Password: pass,
ServerAddr: addr,
ExpiresAt: time.Now().Add(credentialLifetime - cacheSafetyMargin),
Link: link,
}
log.Printf("[STREAM %d] [VK Auth] Success! Credentials cached until %v (cache=%d)", streamID, cache.creds.ExpiresAt, cacheID)
cache.creds = TurnCredentials{Username: user, Password: pass, ServerAddr: addr, ExpiresAt: time.Now().Add(credentialLifetime - cacheSafetyMargin), Link: link}
return user, pass, addr, nil
}
var (
vkRequestMu sync.Mutex
globalLastVkFetchTime time.Time
)
func fetchVkCredsSerialized(ctx context.Context, link string, streamID int, dialer *dnsdialer.Dialer) (string, string, string, error) {
vkRequestMu.Lock()
defer vkRequestMu.Unlock()
// Ensure a minimum cooldown between credential requests to avoid VK rate limits
minInterval := 10*time.Second + time.Duration(rand.Intn(30000))*time.Millisecond
minInterval := 3*time.Second + time.Duration(rand.Intn(3000))*time.Millisecond
elapsed := time.Since(globalLastVkFetchTime)
if !globalLastVkFetchTime.IsZero() && elapsed < minInterval {
@ -730,15 +724,15 @@ func fetchVkCreds(ctx context.Context, link string, streamID int, dialer *dnsdia
func getTokenChain(ctx context.Context, link string, streamID int, creds VKCredentials, dialer *dnsdialer.Dialer, jar tlsclient.CookieJar) (string, string, string, error) {
profile := Profile{
UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
SecChUa: `"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"`,
UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
SecChUa: `"Not(A:Brand";v="99", "Google Chrome";v="146", "Chromium";v="146"`,
SecChUaMobile: "?0",
SecChUaPlatform: `"Windows"`,
}
client, err := tlsclient.NewHttpClient(tlsclient.NewNoopLogger(),
tlsclient.WithTimeoutSeconds(20),
tlsclient.WithClientProfile(profiles.Chrome_120),
tlsclient.WithClientProfile(profiles.Chrome_146),
tlsclient.WithCookieJar(jar),
tlsclient.WithDialer(getCustomNetDialer()),
)
@ -808,15 +802,15 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
return "", "", "", fmt.Errorf("missing access_token in response: %v", resp)
}
vkDelayRandom(100, 200)
vkDelayRandom(100, 150)
// Token 1 -> getCallPreview
data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&fields=photo_200&access_token=%s", link, token1)
_, _ = doRequest(data, "https://api.vk.ru/method/calls.getCallPreview?v=5.275&client_id="+creds.ClientID)
vkDelayRandom(500, 1000)
vkDelayRandom(200, 400)
// Token 2 (with 2 auto attempts + 1 manual fallback)
// Token 2
data = fmt.Sprintf("vk_join_link=https://vk.com/call/join/%s&name=%s&access_token=%s", link, escapedName, token1)
urlAddr := fmt.Sprintf("https://api.vk.ru/method/calls.getAnonymousToken?v=5.275&client_id=%s", creds.ClientID)
@ -945,7 +939,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
break
}
vkDelayRandom(100, 200)
vkDelayRandom(100, 150)
// Token 3
sessionData := fmt.Sprintf(`{"version":2,"device_id":"%s","client_version":1.1,"client_type":"SDK_JS"}`, uuid.New())
@ -956,7 +950,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
}
token3 := resp["session_key"].(string)
vkDelayRandom(100, 200)
vkDelayRandom(100, 150)
// Token 4 -> TURN Creds
data = fmt.Sprintf("joinLink=%s&isVideo=false&protocolVersion=5&capabilities=2F7F&anonymToken=%s&method=vchat.joinConversationByLink&format=JSON&application_key=CGMMEJLGDIHBABABA&session_key=%s", link, token2, token3)
@ -974,15 +968,12 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede
clean := strings.Split(urlStr, "?")[0]
address := strings.TrimPrefix(strings.TrimPrefix(clean, "turn:"), "turns:")
vkDelayRandom(4000, 5000)
return user, pass, address, nil
}
// endregion
func getYandexCreds(link string) (string, string, string, error) {
const debug = false
const telemostConfHost = "cloud-api.yandex.ru"
telemostConfPath := fmt.Sprintf("%s%s%s", "/telemost_front/v2/telemost/conferences/https%3A%2F%2Ftelemost.yandex.ru%2Fj%2F", link, "/connection?next_gen_media_platform_allowed=false")
@ -1225,7 +1216,7 @@ func getYandexCreds(link string) (string, string, string, error) {
},
}
if debug {
if isDebug {
b, _ := json.MarshalIndent(req1, "", " ")
log.Printf("Sending HELLO:\n%s", string(b))
}
@ -1243,7 +1234,7 @@ func getYandexCreds(link string) (string, string, string, error) {
if err != nil {
return "", "", "", fmt.Errorf("ws read: %w", err)
}
if debug {
if isDebug {
s := string(msg)
if len(s) > 800 {
s = s[:800] + "...(truncated)"
@ -1310,14 +1301,12 @@ func dtlsFunc(ctx context.Context, conn net.PacketConn, peer *net.UDPAddr) (net.
return dtlsConn, nil
}
func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}, c chan<- error) {
func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, inboundChan <-chan *UDPPacket, connchan chan<- net.PacketConn, okchan chan<- struct{}, streamID int) error {
time.Sleep(time.Duration(rand.Intn(400)+100) * time.Millisecond)
var err error = nil
defer func() { c <- err }()
dtlsctx, dtlscancel := context.WithCancel(ctx)
defer dtlscancel()
var conn1, conn2 net.PacketConn
conn1, conn2 = connutil.AsyncPacketPipe()
conn1, conn2 := connutil.AsyncPacketPipe()
go func() {
for {
select {
@ -1329,17 +1318,15 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa
}()
dtlsConn, err1 := dtlsFunc(dtlsctx, conn1, peer)
if err1 != nil {
err = fmt.Errorf("failed to connect DTLS: %s", err1)
return
return fmt.Errorf("failed to connect DTLS: %s", err1)
}
defer func() {
if closeErr := dtlsConn.Close(); closeErr != nil {
err = fmt.Errorf("failed to close DTLS connection: %s", closeErr)
return
log.Printf("[STREAM %d] failed to close DTLS connection: %s", streamID, closeErr)
}
log.Printf("Closed DTLS connection\n")
log.Printf("[STREAM %d] Closed DTLS connection\n", streamID)
}()
log.Printf("Established DTLS connection!\n")
log.Printf("[STREAM %d] Established DTLS connection!\n", streamID)
if okchan != nil {
go func() {
@ -1351,38 +1338,20 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa
}
wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
context.AfterFunc(dtlsctx, func() {
if err := listenConn.SetDeadline(time.Now()); err != nil {
log.Printf("Failed to set listener deadline: %s", err)
}
if err := dtlsConn.SetDeadline(time.Now()); err != nil {
log.Printf("Failed to set DTLS deadline: %s", err)
}
_ = dtlsConn.SetDeadline(time.Now())
})
go func() {
defer wg.Done()
defer dtlscancel()
buf := make([]byte, 1600)
for {
select {
case <-dtlsctx.Done():
return
default:
}
n, addr1, err1 := listenConn.ReadFrom(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
globalClientWGAddr.Store(addr1)
_, err1 = dtlsConn.Write(buf[:n])
if err1 != nil {
log.Printf("Failed: %s", err1)
return
case pkt := <-inboundChan:
_, _ = dtlsConn.Write(pkt.Data[:pkt.N])
packetPool.Put(pkt)
}
}
}()
@ -1392,37 +1361,23 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa
defer dtlscancel()
buf := make([]byte, 1600)
for {
select {
case <-dtlsctx.Done():
return
default:
}
n, err1 := dtlsConn.Read(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
addr1, ok := globalClientWGAddr.Load().(net.Addr)
if !ok {
continue
}
_, err1 = listenConn.WriteTo(buf[:n], addr1)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
// Send back to the active WG client
if peerAddr := activeLocalPeer.Load(); peerAddr != nil {
_, _ = listenConn.WriteTo(buf[:n], peerAddr.(net.Addr))
}
}
}()
wg.Wait()
if err := listenConn.SetDeadline(time.Time{}); err != nil {
log.Printf("Failed to clear listener deadline: %s", err)
}
if err := dtlsConn.SetDeadline(time.Time{}); err != nil {
log.Printf("Failed to clear DTLS deadline: %s", err)
log.Printf("[STREAM %d] Failed to clear DTLS deadline: %s", streamID, err)
}
return nil
}
type connectedUDPConn struct {
@ -1542,6 +1497,9 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
return
}
// Reset error count on successful allocation
getStreamCache(streamID).errorCount.Store(0)
// Safely track active streams globally
connectedStreams.Add(1)
defer func() {
@ -1551,37 +1509,33 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
}
}()
// Reset error count on successful allocation
getStreamCache(streamID).errorCount.Store(0)
log.Printf("[STREAM %d] relayed-address=%s", streamID, relayConn.LocalAddr().String())
if isDebug {
log.Printf("[STREAM %d] relayed-address=%s", streamID, relayConn.LocalAddr().String())
}
wg := sync.WaitGroup{}
wg.Add(2)
turnctx, turncancel := context.WithCancel(context.Background())
wg.Add(1)
turnctx, turncancel := context.WithCancel(ctx)
context.AfterFunc(turnctx, func() {
if err := relayConn.SetDeadline(time.Now()); err != nil {
log.Printf("Failed to set relay deadline: %s", err)
}
if err := conn2.SetDeadline(time.Now()); err != nil {
log.Printf("Failed to set upstream deadline: %s", err)
}
// Do not set conn2 deadline (conn2 can sometimes be listenConn if direct mode is used)
})
var internalPipeAddr atomic.Value
go func() {
defer wg.Done()
defer turncancel()
buf := make([]byte, 1600)
for {
select {
case <-turnctx.Done():
if turnctx.Err() != nil {
return
default:
}
n, addr1, err1 := conn2.ReadFrom(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
if turnctx.Err() != nil {
return
}
@ -1589,7 +1543,6 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
_, err1 = relayConn.WriteTo(buf[:n], peer)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
}
@ -1600,25 +1553,17 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
defer turncancel()
buf := make([]byte, 1600)
for {
select {
case <-turnctx.Done():
return
default:
}
n, _, err1 := relayConn.ReadFrom(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
addr1, ok := internalPipeAddr.Load().(net.Addr)
if !ok {
log.Printf("Failed: no listener ip")
return
addr1 := internalPipeAddr.Load()
if addr1 == nil {
continue
}
_, err1 = conn2.WriteTo(buf[:n], addr1)
_, err1 = conn2.WriteTo(buf[:n], addr1.(net.Addr))
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
}
@ -1628,26 +1573,19 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
if err := relayConn.SetDeadline(time.Time{}); err != nil {
log.Printf("Failed to clear relay deadline: %s", err)
}
if err := conn2.SetDeadline(time.Time{}); err != nil {
log.Printf("Failed to clear upstream deadline: %s", err)
}
}
func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConnChan <-chan net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}) {
func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, inboundChan <-chan *UDPPacket, connchan chan<- net.PacketConn, okchan chan<- struct{}, streamID int) {
for {
select {
case <-ctx.Done():
return
case listenConn := <-listenConnChan:
c := make(chan error)
go oneDtlsConnection(ctx, peer, listenConn, connchan, okchan, c)
if err := <-c; err != nil {
// Suppress DTLS handshake timeout logs while a captcha lockout is active
default:
err := oneDtlsConnection(ctx, peer, listenConn, inboundChan, connchan, okchan, streamID)
if err != nil {
if time.Now().Unix() < globalCaptchaLockout.Load() && strings.Contains(err.Error(), "context deadline exceeded") {
continue
}
log.Printf("[DTLS] Handshake failed, retrying in background: %v", err)
select {
case <-ctx.Done():
return
@ -1681,10 +1619,8 @@ func oneTurnConnectionLoop(ctx context.Context, turnParams *turnParams, peer *ne
return
}
if strings.Contains(err.Error(), "CAPTCHA_WAIT_REQUIRED") {
// Only log the backoff message once (the stream that triggered it)
// For subsequently awoken streams: calculate exact remaining sleep duration and sleep silently
if !strings.Contains(err.Error(), "global lockout active") {
log.Printf("[STREAM %d] !!! VK DEMANDS SLIDER CAPTCHA. Backing off for 60 seconds to avoid IP ban...", streamID)
log.Printf("[STREAM %d] Backing off for 60 seconds to avoid IP ban...", streamID)
select {
case <-ctx.Done():
return
@ -1737,6 +1673,7 @@ func main() {
n := flag.Int("n", 0, "connections to TURN (default 10 for VK, 1 for Yandex)")
udp := flag.Bool("udp", false, "connect to TURN with UDP")
direct := flag.Bool("no-dtls", false, "connect without obfuscation. DO NOT USE")
debugFlag := flag.Bool("debug", false, "enable debug logging")
flag.Parse()
if *peerAddr == "" {
log.Panicf("Need peer address!")
@ -1749,6 +1686,8 @@ func main() {
log.Panicf("Need either vk-link or yandex-link!")
}
isDebug = *debugFlag
var link string
var getCreds getCredsFunc
if *vklink != "" {
@ -1789,69 +1728,86 @@ func main() {
getCreds: getCreds,
}
listenConnChan := make(chan net.PacketConn)
listenConn, err := net.ListenPacket("udp", *listen)
if err != nil {
log.Panicf("Failed to listen: %s", err)
}
context.AfterFunc(ctx, func() {
if closeErr := listenConn.Close(); closeErr != nil {
log.Panicf("Failed to close local connection: %s", closeErr)
log.Printf("Failed to close local connection: %s", closeErr)
}
})
numStreams := *n
if numStreams <= 0 {
numStreams = 1
}
// Shared Worker Pool Queue for Aggregation
inboundChan := make(chan *UDPPacket, 2000)
go func() {
for {
select {
case <-ctx.Done():
pkt := packetPool.Get().(*UDPPacket)
nRead, addr, err := listenConn.ReadFrom(pkt.Data)
if err != nil {
return
case listenConnChan <- listenConn:
}
// Save the local WireGuard peer address
current := activeLocalPeer.Load()
if current == nil || current.(net.Addr).String() != addr.String() {
activeLocalPeer.Store(addr)
}
pkt.N = nRead
select {
case inboundChan <- pkt:
default:
// Drop the packet only if the global queue is completely full
packetPool.Put(pkt)
}
}
}()
wg1 := sync.WaitGroup{}
t := time.Tick(200 * time.Millisecond)
if *direct {
for i := 0; i < *n; i++ {
wg1.Add(1)
go func(streamID int) {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, listenConnChan, t, streamID)
}(i)
}
} else {
okchan := make(chan struct{})
connchan := make(chan net.PacketConn)
log.Panicf("Direct mode not supported with dispatcher")
}
okchan := make(chan struct{})
connchan := make(chan net.PacketConn)
wg1.Add(1)
go func() {
defer wg1.Done()
oneDtlsConnectionLoop(ctx, peer, listenConn, inboundChan, connchan, okchan, 1)
}()
wg1.Add(1)
go func() {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, connchan, t, 1)
}()
select {
case <-okchan:
case <-ctx.Done():
}
for i := 1; i < numStreams; i++ {
cchan := make(chan net.PacketConn)
wg1.Add(1)
go func() {
go func(streamID int) {
defer wg1.Done()
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, okchan)
}()
oneDtlsConnectionLoop(ctx, peer, listenConn, inboundChan, cchan, nil, streamID)
}(i)
wg1.Add(1)
go func() {
go func(streamID int) {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, connchan, t, 0)
}()
select {
case <-okchan:
case <-ctx.Done():
}
for i := 0; i < *n-1; i++ {
connchan := make(chan net.PacketConn)
wg1.Add(1)
go func() {
defer wg1.Done()
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, nil)
}()
wg1.Add(1)
go func(streamID int) {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, connchan, t, streamID)
}(i + 1)
}
oneTurnConnectionLoop(ctx, params, peer, cchan, t, streamID)
}(i)
}
wg1.Wait()

10
server/main.go

@ -17,6 +17,8 @@ import (
"github.com/pion/dtls/v3/pkg/crypto/selfsign"
)
const idleTimeout = 2 * time.Minute
func main() {
listen := flag.String("listen", "0.0.0.0:56000", "listen on ip:port")
connect := flag.String("connect", "", "connect to ip:port")
@ -150,7 +152,7 @@ func main() {
return
default:
}
if err1 := conn.SetReadDeadline(time.Now().Add(time.Minute * 30)); err1 != nil {
if err1 := conn.SetReadDeadline(time.Now().Add(idleTimeout)); err1 != nil {
log.Printf("Failed: %s", err1)
return
}
@ -160,7 +162,7 @@ func main() {
return
}
if err1 := serverConn.SetWriteDeadline(time.Now().Add(time.Minute * 30)); err1 != nil {
if err1 := serverConn.SetWriteDeadline(time.Now().Add(idleTimeout)); err1 != nil {
log.Printf("Failed: %s", err1)
return
}
@ -181,7 +183,7 @@ func main() {
return
default:
}
if err1 := serverConn.SetReadDeadline(time.Now().Add(time.Minute * 30)); err1 != nil {
if err1 := serverConn.SetReadDeadline(time.Now().Add(idleTimeout)); err1 != nil {
log.Printf("Failed: %s", err1)
return
}
@ -191,7 +193,7 @@ func main() {
return
}
if err1 := conn.SetWriteDeadline(time.Now().Add(time.Minute * 30)); err1 != nil {
if err1 := conn.SetWriteDeadline(time.Now().Add(idleTimeout)); err1 != nil {
log.Printf("Failed: %s", err1)
return
}

Loading…
Cancel
Save