diff --git a/.github/workflows/issues.yml b/.github/workflows/issues.yml index 2eee1c7..913a7df 100644 --- a/.github/workflows/issues.yml +++ b/.github/workflows/issues.yml @@ -1,7 +1,7 @@ name: "Close Stale Issues" on: schedule: - - cron: "0 0 * * 3" + - cron: "0 0 * * 0" workflow_dispatch: jobs: @@ -20,7 +20,7 @@ jobs:

Спасибо! close-issue-message: "Issue был закрыт из-за отсутствия активности." - days-before-stale: 120 + days-before-stale: 30 days-before-close: 7 operations-per-run: 1000 ascending: true diff --git a/README.md b/README.md index cf0a746..77cdae1 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,74 @@ # Good TURN -Проброс трафика WireGuard/Hysteria через TURN сервера VK звонков или ~~Яндекс телемоста~~. Пакеты шифруются DTLS 1.2, затем параллельными потоками через TCP или UDP отправляются на TURN сервер по протоколу STUN ChannelData. Оттуда по UDP отправляются на ваш сервер, где расшифровываются и передаются в WireGuard. Логин/пароль от TURN генерируются из ссылки на звонок. +Проброс трафика WireGuard/Hysteria/VLESS через TURN сервера VK звонков или ~~Яндекс Телемоста~~, или через DC Яндекс Телемоста. Пакеты шифруются DTLS 1.2, затем параллельными потоками через TCP или UDP отправляются на TURN сервер по протоколу STUN ChannelData. Оттуда по UDP отправляются на ваш сервер, где расшифровываются и передаются в WireGuard. Логин/пароль от TURN генерируются из ссылки на звонок. Только для учебных целей! +## Содержание + +- [Good TURN](#good-turn) + - [Режимы работы](#режимы-работы) + - [Похожие проекты](#похожие-проекты) + - [Server & Cli](#server--cli) + - [Android](#android) + - [iOS](#ios) + - [macOS](#macos) + - [Настройка](#настройка) + - [Сервер](#сервер) + - [Установка демона](#установка-демона) + - [Docker](#docker) + - [Клиент](#клиент) + - [Android](#android-1) + - [iOS](#ios-1) + - [Linux](#linux) + - [macOS](#macos-1) + - [Windows](#windows) + - [Если не работает](#если-не-работает) + - [Яндекс Телемост](#яндекс-телемост) + - [v2ray](#v2ray) + - [VLESS-режим](#vless-режим) + - [Настройка](#настройка-1) + - [Сервер (VPS)](#сервер-vps) + - [Docker](#docker-1) + - [Клиент](#клиент-1) + - [Telemost DataChannel](#telemost-datachannel) + - [Сервер](#сервер-1) + - [Клиент](#клиент-2) + - [VLESS через Telemost DataChannel](#vless-через-telemost-datachannel) + - [Direct mode](#direct-mode) + +## Режимы работы + +| Режим | Клиент | Сервер | Полезная нагрузка | Транспорт | +|-----------------------------------------------|---------------------------------|---------------------------------|------------------------|------------------------------------------------------------------------------| +| VK TURN | `-vk-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp` | +| Telemost TURN _(не работает)_ | `-yandex-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp`; `-n` по умолчанию `1` | +| [VLESS](#vless-режим) | `-vless` | `-vless` | TCP-потоки | TURN поверх TCP по умолчанию, либо UDP с `-udp`, дальше `DTLS -> KCP + smux` | +| [Telemost DataChannel](#telemost-datachannel) | `-telemost-dc -yandex-link ...` | `-telemost-dc -yandex-link ...` | UDP или TCP (`-vless`) | WebRTC DataChannel, без TURN | + ## Похожие проекты > [!WARNING] > Авторы данного репозитория не несут ответственности за другие похожие проекты. -#### Server +### Server & Cli - https://github.com/Urtyom-Alyanov/turn-proxy - реализация на Rust - https://github.com/jaykaiperson/lionheart - аналог для https://stream.wb.ru (статья: https://habr.com/ru/articles/1017410/) - https://github.com/kulikov0/whitelist-bypass - проброс через медиасервер SFU ВК и Яндекс Телемоста - https://github.com/NedgNDG/vk-proxy-auto-installer - автоустановщик VK TURN Proxy (TUI) -#### Android +### Android - https://github.com/MYSOREZ/vk-turn-proxy-android - клиент для андроида - https://github.com/WINGS-N/WINGSV - клиент для андроида с One UI, WireGuard, раздачей VPN с root - https://github.com/kiper292/wireguard-turn-android - клиент для андроида интегрированный в WireGuard - https://github.com/oxsidee/vkpn - клиент для андроида (кроссплатформенный Flutter) -- https://github.com/antongospod/turn-proxy-android - клиент для андроида c Material 3 UI и автоапдейтами (Kotlin) +- https://github.com/samosvalishe/turn-proxy-android - клиент для андроида c Material 3 UI и автоапдейтами (Kotlin) - https://github.com/amurcanov/proxy-turn-vk-android - клиент для андроида с WireGuard -#### iOS +### iOS - https://github.com/nullcstring/turnbridge - клиент для iOS -#### macOS +### macOS - https://github.com/denny4-user/vk-turn-proxy-macos-gui - клиент для macOS @@ -299,15 +341,15 @@ curl -L -o client https://github.com/cacggghp/vk-turn-proxy/releases/latest/down По умолчанию капча теперь проходит так: обычная автопопытка, затем автопопытка через пазл-слайдер POC, и только потом ручной режим. -## Яндекс телемост +## Яндекс Телемост -**UPD. ТЕЛЕМОСТ ЗАКРЫЛИ** +**UPD: Не работает.** В отличие от ВК, сервера яндекса не ограничивают скорость, так что по умолчанию стоит `-n 1`. Увеличение этого числа может привести к временной блокировке по IP из-за переполнения конференции фейковыми участниками. -В режиме `-udp` скорость обычно больше +В режиме `-udp` скорость обычно больше. -Большинство диапазонов IP TURN серверов Яндекса не работают, указывайте вручную через `-turn` +Большинство диапазонов IP TURN серверов Яндекса не работают, указывайте вручную через `-turn`.
@@ -534,8 +576,43 @@ Xray сервер (config.json)
+## Telemost DataChannel + +Для Яндекс Телемоста теперь есть альтернативный режим без TURN: `-telemost-dc`. + +Режим работает как для обычного UDP/WireGuard-сценария, так и для `-vless`. + +### Сервер + +``` +./server -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc +``` + +### Клиент + +``` +./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc +``` + +В этом режиме флаг `-peer` не нужен: клиент и сервер встречаются внутри одной конференции Telemost по одной и той же ссылке. + +### VLESS через Telemost DataChannel + +Сервер: + +``` +./server -connect 127.0.0.1:443 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless +``` + +Клиент: + +``` +./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless +``` + ## Direct mode -С флагом `-no-dtls` можно отправлять пакеты без обфускации DTLS и подключаться к обычным серверам Wireguard. Может привести к бану от вк/яндекса. +**UPD: Не работает.** +С флагом `-no-dtls` можно отправлять пакеты без обфускации DTLS и подключаться к обычным серверам Wireguard. Может привести к бану от вк/яндекса. diff --git a/client/main.go b/client/main.go index 49c4751..8d5e82d 100644 --- a/client/main.go +++ b/client/main.go @@ -37,6 +37,8 @@ import ( "github.com/bschaatsbergen/dnsdialer" "github.com/cacggghp/vk-turn-proxy/internal/cliutil" + "github.com/cacggghp/vk-turn-proxy/internal/namegen" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" "github.com/cacggghp/vk-turn-proxy/tcputil" "github.com/cbeuw/connutil" "github.com/google/uuid" @@ -94,6 +96,7 @@ type clientOptions struct { udp bool direct bool vlessMode bool + telemostDC bool debug bool manualCaptcha bool } @@ -113,13 +116,16 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO fs.BoolVar(&opts.udp, "udp", false, "connect to TURN with UDP") fs.BoolVar(&opts.direct, "no-dtls", false, "connect without obfuscation. DO NOT USE") fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets") + fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of TURN") + fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of TURN") fs.BoolVar(&opts.debug, "debug", false, "enable debug logging") fs.BoolVar(&opts.manualCaptcha, "manual-captcha", false, "skip auto captcha solving, use manual mode immediately") fs.Usage = func() { cliutil.Fprintf(fs.Output(), "Usage:\n %s -peer -vk-link [flags]\n %s -peer -yandex-link [flags]\n\n", program, program) cliutil.Fprintln(fs.Output(), "Examples:") cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -peer 203.0.113.10:56000 -vk-link https://vk.com/call/join/...\n", program) - cliutil.Fprintf(fs.Output(), " %s -udp -turn 5.255.211.241 -peer 203.0.113.10:56000 -yandex-link https://telemost.yandex.ru/j/... -listen 127.0.0.1:9000\n\n", program) + cliutil.Fprintf(fs.Output(), " %s -udp -turn 5.255.211.241 -peer 203.0.113.10:56000 -yandex-link https://telemost.yandex.ru/j/... -listen 127.0.0.1:9000\n", program) + cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program) cliutil.Fprintln(fs.Output(), "Flags:") fs.PrintDefaults() } @@ -129,16 +135,36 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO func parseClientOptions(args []string, program string, stdout, stderr io.Writer) (clientOptions, int) { return cliutil.Parse(args, program, stdout, stderr, newClientFlagSet, func(opts *clientOptions) error { - if opts.peerAddr == "" { + if !opts.telemostDC && opts.peerAddr == "" { return fmt.Errorf("-peer is required") } if (opts.vklink == "") == (opts.yalink == "") { return fmt.Errorf("exactly one of -vk-link or -yandex-link is required") } + if opts.telemostDC { + if opts.yalink == "" { + return fmt.Errorf("-telemost-dc requires -yandex-link") + } + } return nil }) } +func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string, vlessMode bool) error { + if vlessMode { + return runTelemostDataChannelVLESSMode(ctx, inviteLink, listenAddr) + } + + return runTelemostDataChannelMode(ctx, inviteLink, listenAddr) +} + +func closeOnContextDone(ctx context.Context, closer io.Closer) { + go func() { + <-ctx.Done() + _ = closer.Close() + }() +} + func captchaSolveModeForAttempt(attempt int, manualOnly bool, enableSliderPOC bool) (captchaSolveMode, bool) { if manualOnly { return captchaSolveModeManual, attempt == 0 @@ -1026,7 +1052,7 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede return "", "", "", fmt.Errorf("failed to initialize tls_client: %w", err) } - name := generateName() + name := namegen.Generate() escapedName := neturl.QueryEscape(name) log.Printf("[STREAM %d] [VK Auth] Connecting Identity - Name: %s | User-Agent: %s", streamID, name, profile.UserAgent) @@ -1273,12 +1299,31 @@ func getTokenChain(ctx context.Context, link string, streamID int, creds VKCrede // endregion -func getYandexCreds(link string) (string, string, string, error) { +func getYandexCreds(roomInput string) (string, string, string, error) { + target, err := telemost.ParseRoomTarget(roomInput) + if err != nil { + return "", "", "", err + } + + var errs []string + for _, roomURL := range target.CandidateRoomURLs() { + user, pass, addr, err := getYandexCredsForRoomURL(roomURL) + if err == nil { + return user, pass, addr, nil + } + errs = append(errs, fmt.Sprintf("%s: %v", roomURL, err)) + } + + return "", "", "", fmt.Errorf("failed to get yandex turn credentials for room %s: %s", target.RoomID, strings.Join(errs, "; ")) +} + +func getYandexCredsForRoomURL(roomURL string) (string, string, string, error) { const telemostConfHost = "cloud-api.yandex.ru" - telemostConfPath := fmt.Sprintf("%s%s%s", "/telemost_front/v2/telemost/conferences/https%3A%2F%2Ftelemost.yandex.ru%2Fj%2F", link, "/connection?next_gen_media_platform_allowed=false") + telemostConfPath := fmt.Sprintf("%s%s%s", "/telemost_front/v2/telemost/conferences/", neturl.QueryEscape(roomURL), "/connection?next_gen_media_platform_allowed=false") profile := getRandomProfile() - name := generateName() + name := namegen.Generate() + origin := telemost.WebOriginFromRoomURL(roomURL) type ConferenceResponse struct { URI string `json:"uri"` @@ -1409,8 +1454,8 @@ func getYandexCreds(link string) (string, string, string, error) { applyBrowserProfile(req, profile) req.Header.Set("Content-Type", "application/json") - req.Header.Set("Referer", "https://telemost.yandex.ru/") - req.Header.Set("Origin", "https://telemost.yandex.ru") + req.Header.Set("Referer", origin+"/") + req.Header.Set("Origin", origin) req.Header.Set("Client-Instance-Id", uuid.New().String()) resp, err := client.Do(req) @@ -1441,7 +1486,8 @@ func getYandexCreds(link string) (string, string, string, error) { Wss: result.ClientConfiguration.MediaServerURL, } h := http.Header{} - h.Set("Origin", "https://telemost.yandex.ru") + h.Set("Origin", origin) + h.Set("Referer", origin+"/") h.Set("User-Agent", profile.UserAgent) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -2006,15 +2052,23 @@ func main() { log.Fatalf("Exit...\n") }() - peer, err := net.ResolveUDPAddr("udp", opts.peerAddr) - if err != nil { - panic(err) - } - isDebug = opts.debug + telemost.SetDebug(opts.debug) manualCaptcha = opts.manualCaptcha autoCaptchaSliderPOC = !manualCaptcha + if opts.telemostDC { + if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.listen, opts.vlessMode); err != nil { + log.Fatalf("Telemost DataChannel mode failed: %v", err) + } + return + } + + peer, err := net.ResolveUDPAddr("udp", opts.peerAddr) + if err != nil { + log.Fatalf("invalid peer address %q: %v", opts.peerAddr, err) + } + var link string var getCreds getCredsFunc if opts.vklink != "" { @@ -2034,8 +2088,11 @@ func main() { opts.n = 10 } } else { - parts := strings.Split(opts.yalink, "j/") - link = parts[len(parts)-1] + target, err := telemost.ParseRoomTarget(opts.yalink) + if err != nil { + log.Fatalf("invalid yandex-link: %v", err) + } + link = target.RoomID getCreds = func(ctx context.Context, s string, streamID int) (string, string, string, error) { return getYandexCreds(s) } @@ -2043,8 +2100,10 @@ func main() { opts.n = 1 } } - if idx := strings.IndexAny(link, "/?#"); idx != -1 { - link = link[:idx] + if opts.vklink != "" { + if idx := strings.IndexAny(link, "/?#"); idx != -1 { + link = link[:idx] + } } params := &turnParams{ @@ -2447,7 +2506,11 @@ func createSmuxSession(ctx context.Context, tp *turnParams, peer *net.UDPAddr, i cleanup() return nil, nil, fmt.Errorf("KCP session: %w", err) } - cleanupFns = append(cleanupFns, func() { _ = cleanupKCP() }) + cleanupFns = append(cleanupFns, func() { + if cleanupErr := cleanupKCP(); cleanupErr != nil { + log.Printf("failed to close KCP-over-DTLS transport: %v", cleanupErr) + } + }) log.Printf("KCP session established") // 6. Create smux client session over KCP diff --git a/client/main_test.go b/client/main_test.go index 9eb72cd..910930a 100644 --- a/client/main_test.go +++ b/client/main_test.go @@ -91,6 +91,71 @@ func TestParseClientOptionsParsesValidVKArgs(t *testing.T) { } } +func TestParseClientOptionsParsesTelemostDataChannelArgs(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseClientOptions([]string{ + "-yandex-link", "https://telemost.yandex.ru/j/test", + "-listen", "127.0.0.1:9002", + "-telemost-dc", + }, "client", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.telemostDC { + t.Fatal("telemostDC = false, want true") + } + if opts.peerAddr != "" { + t.Fatalf("peerAddr = %q, want empty for telemost-dc mode", opts.peerAddr) + } +} + +func TestParseClientOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + _, exitCode := parseClientOptions([]string{ + "-vk-link", "https://vk.com/call/join/test", + "-telemost-dc", + }, "client", &stdout, &stderr) + if exitCode != 2 { + t.Fatalf("parseClientOptions() exitCode = %d, want 2", exitCode) + } + if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") { + t.Fatalf("expected telemost-dc validation error, got %q", got) + } +} + +func TestParseClientOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseClientOptions([]string{ + "-yandex-link", "https://telemost.yandex.ru/j/test", + "-telemost-dc", + "-vless", + }, "client", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.telemostDC || !opts.vlessMode { + t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode) + } +} + func TestCaptchaSolveModeForAttempt(t *testing.T) { t.Parallel() diff --git a/client/telemostdc.go b/client/telemostdc.go new file mode 100644 index 0000000..7c83b16 --- /dev/null +++ b/client/telemostdc.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "errors" + "log" + "net" + "sync/atomic" + + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error { + listenConn, err := net.ListenPacket("udp", listenAddr) + if err != nil { + return err + } + defer func(listenConn net.PacketConn) { + err := listenConn.Close() + if err != nil { + log.Println(err) + } + }(listenConn) + + var activeLocalPeer atomic.Value + peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { + addr, ok := activeLocalPeer.Load().(net.Addr) + if !ok || addr == nil { + return + } + if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil { + log.Printf("Telemost DataChannel: failed to write local packet: %v", writeErr) + } + }, nil) + if err != nil { + return err + } + defer func(peer *telemost.Peer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + closeOnContextDone(ctx, listenConn) + + log.Printf("Telemost DataChannel mode: listening on %s", listenAddr) + + buf := make([]byte, 2048) + for { + n, addr, err := listenConn.ReadFrom(buf) + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + + activeLocalPeer.Store(addr) + if err := peer.Send(buf[:n]); err != nil { + log.Printf("Telemost DataChannel: dropped outbound packet (%d bytes): %v", n, err) + } + } +} diff --git a/client/telemostdc_vless.go b/client/telemostdc_vless.go new file mode 100644 index 0000000..37d782e --- /dev/null +++ b/client/telemostdc_vless.go @@ -0,0 +1,149 @@ +package main + +import ( + "context" + "errors" + "log" + "net" + "sync" + "time" + + "github.com/cacggghp/vk-turn-proxy/internal/dcmux" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr string) error { + var ( + connMu sync.Mutex + conns = make(map[uint16]net.Conn) + ) + closeAll := func() { + connMu.Lock() + defer connMu.Unlock() + for sid, conn := range conns { + _ = conn.Close() + delete(conns, sid) + } + } + + var peer *telemost.Peer + clientID := uint32(time.Now().UnixNano()) + mux := dcmux.New(clientID, func(frame []byte) error { + return peer.Send(frame) + }) + peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { + if telemost.DebugEnabled() { + log.Printf("Telemost DataChannel VLESS: peer reconnected, closing active TCP streams") + } + closeAll() + mux.Reset() + }) + if err != nil { + return err + } + defer func(peer *telemost.Peer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + listener, err := net.Listen("tcp", listenAddr) + if err != nil { + return err + } + defer func(listener net.Listener) { + err := listener.Close() + if err != nil { + log.Println(err) + } + }(listener) + closeOnContextDone(ctx, listener) + + log.Printf("Telemost DataChannel VLESS mode: listening on %s", listenAddr) + + for { + conn, err := listener.Accept() + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + closeAll() + return nil + } + log.Printf("Telemost DataChannel VLESS accept error: %v", err) + continue + } + + sid := mux.OpenStream() + connMu.Lock() + conns[sid] = conn + connMu.Unlock() + + go func(streamID uint16, tcpConn net.Conn) { + defer func() { + connMu.Lock() + delete(conns, streamID) + connMu.Unlock() + if err := mux.CloseStream(streamID); err != nil { + log.Printf("Telemost DataChannel VLESS: failed to close mux stream %d: %v", streamID, err) + } + _ = tcpConn.Close() + mux.CleanupStream(streamID) + }() + + done := make(chan struct{}) + streamClosed := make(chan struct{}) + + go func() { + defer close(done) + buf := make([]byte, 32768) + for { + n, readErr := tcpConn.Read(buf) + if readErr != nil { + return + } + if sendErr := mux.SendData(streamID, buf[:n]); sendErr != nil { + return + } + } + }() + + go func() { + defer close(streamClosed) + for { + dataReady := mux.WaitForData(streamID) + + select { + case <-ctx.Done(): + return + case <-done: + return + case _, ok := <-dataReady: + if !ok { + return + } + } + + for { + data := mux.ReadStream(streamID) + if len(data) == 0 { + break + } + if _, writeErr := tcpConn.Write(data); writeErr != nil { + return + } + } + + if mux.StreamClosed(streamID) { + return + } + } + }() + + select { + case <-ctx.Done(): + case <-done: + case <-streamClosed: + } + }(sid, conn) + } +} diff --git a/go.mod b/go.mod index e6e5929..cbdf656 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,9 @@ require ( github.com/pion/logging v0.2.4 github.com/pion/transport/v4 v4.0.1 github.com/pion/turn/v5 v5.0.3 - github.com/xtaci/kcp-go/v5 v5.6.18 - github.com/xtaci/smux v1.5.34 + github.com/pion/webrtc/v4 v4.2.11 + github.com/xtaci/kcp-go/v5 v5.6.72 + github.com/xtaci/smux v1.5.57 ) require ( @@ -27,25 +28,34 @@ require ( github.com/google/go-cmp v0.7.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/klauspost/compress v1.18.5 // indirect - github.com/klauspost/cpuid/v2 v2.2.9 // indirect - github.com/klauspost/reedsolomon v1.12.4 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/klauspost/reedsolomon v1.13.3 // indirect github.com/miekg/dns v1.1.72 // indirect + github.com/pion/datachannel v1.6.0 // indirect + github.com/pion/ice/v4 v4.2.2 // indirect + github.com/pion/interceptor v0.1.44 // indirect + github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.16 // indirect + github.com/pion/rtp v1.10.1 // indirect + github.com/pion/sctp v1.9.4 // indirect + github.com/pion/sdp/v3 v3.0.18 // indirect + github.com/pion/srtp/v3 v3.0.10 // indirect github.com/pion/stun/v3 v3.1.2 // indirect + github.com/pion/turn/v4 v4.1.4 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/quic-go/qpack v0.6.0 // indirect github.com/tam7t/hpkp v0.0.0-20160821193359-2b70b4024ed5 // indirect - github.com/templexxx/cpu v0.1.1 // indirect - github.com/templexxx/xorsimd v0.4.3 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect github.com/wlynxg/anet v0.0.5 // indirect - golang.org/x/crypto v0.49.0 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.52.0 // indirect + golang.org/x/crypto v0.50.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.53.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.35.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/text v0.36.0 // indirect + golang.org/x/time v0.15.0 // indirect + golang.org/x/tools v0.44.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect google.golang.org/grpc v1.80.0 // indirect ) diff --git a/go.sum b/go.sum index aef2d97..1b0753f 100644 --- a/go.sum +++ b/go.sum @@ -55,22 +55,48 @@ github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBF github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/klauspost/reedsolomon v1.12.4 h1:5aDr3ZGoJbgu/8+j45KtUJxzYm8k08JGtB9Wx1VQ4OA= github.com/klauspost/reedsolomon v1.12.4/go.mod h1:d3CzOMOt0JXGIFZm1StgkyF14EYr3xneR2rNWo7NcMU= +github.com/klauspost/reedsolomon v1.13.3 h1:01GwnO2xoCSaM0ShP4qwl+FsHg3csFShC6Tu/RS1ji0= +github.com/klauspost/reedsolomon v1.13.3/go.mod h1:yjqqjgMTQkBUHSG97/rm4zipffCNbCiZcB3kTqr++sQ= github.com/miekg/dns v1.1.72 h1:vhmr+TF2A3tuoGNkLDFK9zi36F2LS+hKTRW0Uf8kbzI= github.com/miekg/dns v1.1.72/go.mod h1:+EuEPhdHOsfk6Wk5TT2CzssZdqkmFhf8r+aVyDEToIs= +github.com/pion/datachannel v1.6.0 h1:XecBlj+cvsxhAMZWFfFcPyUaDZtd7IJvrXqlXD/53i0= +github.com/pion/datachannel v1.6.0/go.mod h1:ur+wzYF8mWdC+Mkis5Thosk+u/VOL287apDNEbFpsIk= github.com/pion/dtls/v3 v3.1.2 h1:gqEdOUXLtCGW+afsBLO0LtDD8GnuBBjEy6HRtyofZTc= github.com/pion/dtls/v3 v3.1.2/go.mod h1:Hw/igcX4pdY69z1Hgv5x7wJFrUkdgHwAn/Q/uo7YHRo= +github.com/pion/ice/v4 v4.2.2 h1:dQJzzcgTFHDYyV3BoCfjPeX+JEtr58BWPi4PGyo6Vjg= +github.com/pion/ice/v4 v4.2.2/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c= +github.com/pion/interceptor v0.1.44 h1:sNlZwM8dWXU9JQAkJh8xrarC0Etn8Oolcniukmuy0/I= +github.com/pion/interceptor v0.1.44/go.mod h1:4atVlBkcgXuUP+ykQF0qOCGU2j7pQzX2ofvPRFsY5RY= github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= +github.com/pion/mdns/v2 v2.1.0 h1:3IJ9+Xio6tWYjhN6WwuY142P/1jA0D5ERaIqawg/fOY= +github.com/pion/mdns/v2 v2.1.0/go.mod h1:pcez23GdynwcfRU1977qKU0mDxSeucttSHbCSfFOd9A= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo= +github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo= +github.com/pion/rtp v1.10.1 h1:xP1prZcCTUuhO2c83XtxyOHJteISg6o8iPsE2acaMtA= +github.com/pion/rtp v1.10.1/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= +github.com/pion/sctp v1.9.4 h1:cMxEu0F5tbP4qH07bKf1Zjf4rUih9LIo0qQt424e258= +github.com/pion/sctp v1.9.4/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= +github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI= +github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8= +github.com/pion/srtp/v3 v3.0.10 h1:tFirkpBb3XccP5VEXLi50GqXhv5SKPxqrdlhDCJlZrQ= +github.com/pion/srtp/v3 v3.0.10/go.mod h1:3mOTIB0cq9qlbn59V4ozvv9ClW/BSEbRp4cY0VtaR7M= github.com/pion/stun/v3 v3.1.2 h1:86IhD8wFn6IDW4b1/0QzoQS+f5PeA8OHHRn8UZW5ErY= github.com/pion/stun/v3 v3.1.2/go.mod h1:H7gDic7nNwlUL05pbs6T1dtaBehh/KjupxfWw3ZI7cA= github.com/pion/transport/v4 v4.0.1 h1:sdROELU6BZ63Ab7FrOLn13M6YdJLY20wldXW2Cu2k8o= github.com/pion/transport/v4 v4.0.1/go.mod h1:nEuEA4AD5lPdcIegQDpVLgNoDGreqM/YqmEx3ovP4jM= +github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ= +github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ= github.com/pion/turn/v5 v5.0.3 h1:I+Nw0fQgdPWF1SXDj0egWDhCkcff7gWiigdQpOK52Ak= github.com/pion/turn/v5 v5.0.3/go.mod h1:fs4SogUh/aRGQzonc4Lx3Jp4EU3j3t0PfNDEd9KcD/w= +github.com/pion/webrtc/v4 v4.2.11 h1:QUX1QZKlNIn4O7U5JxLPGP0sV5RTncZkzu9SPR3jVNU= +github.com/pion/webrtc/v4 v4.2.11/go.mod h1:s/rAiyy77GyRFrZMx+Ls6aua26dIBPudH8/ZHYbIRWY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -86,16 +112,22 @@ github.com/templexxx/cpu v0.1.1 h1:isxHaxBXpYFWnk2DReuKkigaZyrjs2+9ypIdGP4h+HI= github.com/templexxx/cpu v0.1.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= github.com/templexxx/xorsimd v0.4.3 h1:9AQTFHd7Bhk3dIT7Al2XeBX5DWOvsUPZCuhyAtNbHjU= github.com/templexxx/xorsimd v0.4.3/go.mod h1:oZQcD6RFDisW2Am58dSAGwwL6rHjbzrlu25VDqfWkQg= +github.com/templexxx/xorsimd v0.4.4 h1:vT7cBwIhJxitPSbJk+LIqOht2JIJOl/77cRxcyRNBFA= +github.com/templexxx/xorsimd v0.4.4/go.mod h1:oZQcD6RFDisW2Am58dSAGwwL6rHjbzrlu25VDqfWkQg= github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho= github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE= github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/xtaci/kcp-go/v5 v5.6.18 h1:7oV4mc272pcnn39/13BB11Bx7hJM4ogMIEokJYVWn4g= github.com/xtaci/kcp-go/v5 v5.6.18/go.mod h1:75S1AKYYzNUSXIv30h+jPKJYZUwqpfvLshu63nCNSOM= +github.com/xtaci/kcp-go/v5 v5.6.72 h1:FLaQPalgpufJYQRk0OK+gErEhXGLUPjv6FSRPrFR8Lk= +github.com/xtaci/kcp-go/v5 v5.6.72/go.mod h1:9O3D8WR+cyyUjGiTILYfg17vn72otWuXK2AFfqIe6CM= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= github.com/xtaci/smux v1.5.34 h1:OUA9JaDFHJDT8ZT3ebwLWPAgEfE6sWo2LaTy3anXqwg= github.com/xtaci/smux v1.5.34/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= +github.com/xtaci/smux v1.5.57 h1:N72VbGoSYxgcm6mPOYX0QzEZNVD3UI/JlVvAtXF+WrY= +github.com/xtaci/smux v1.5.57/go.mod h1:IGQ9QYrBphmb/4aTnLEcJby0TNr3NV+OslIOMrX825Q= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko= @@ -105,12 +137,16 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= +golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= +golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -120,6 +156,8 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20211104170005-ce137452f963/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -140,8 +178,12 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -149,6 +191,8 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= +golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c= +golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/internal/dcmux/mux.go b/internal/dcmux/mux.go new file mode 100644 index 0000000..5b327c7 --- /dev/null +++ b/internal/dcmux/mux.go @@ -0,0 +1,323 @@ +package dcmux + +import ( + "encoding/binary" + "sync" +) + +type Stream struct { + ID uint16 + ClientID uint32 + recvBuf []byte + closed bool + nextSeq uint32 + outOfOrder map[uint32][]byte +} + +type Multiplexer struct { + streams map[uint16]*Stream + nextID uint16 + clientID uint32 + onSend func([]byte) error + mu sync.RWMutex + maxStreams int + maxBufferSize int + activityCh chan struct{} + dataReady map[uint16]chan struct{} + dataReadyMu sync.Mutex + sendSeq map[uint16]uint32 + sendSeqMu sync.Mutex +} + +func New(clientID uint32, onSend func([]byte) error) *Multiplexer { + return &Multiplexer{ + streams: make(map[uint16]*Stream), + nextID: 1, + clientID: clientID, + onSend: onSend, + maxStreams: 10000, + maxBufferSize: 16 * 1024 * 1024, + activityCh: make(chan struct{}, 1), + dataReady: make(map[uint16]chan struct{}), + sendSeq: make(map[uint16]uint32), + } +} + +func (m *Multiplexer) OpenStream() uint16 { + m.mu.Lock() + defer m.mu.Unlock() + + for { + sid := m.nextID + m.nextID++ + if m.nextID == 0 { + m.nextID = 1 + } + + if _, exists := m.streams[sid]; !exists { + m.streams[sid] = &Stream{ + ID: sid, + recvBuf: make([]byte, 0), + nextSeq: 0, + outOfOrder: make(map[uint32][]byte), + } + return sid + } + } +} + +func (m *Multiplexer) SendData(sid uint16, data []byte) error { + m.mu.RLock() + stream, exists := m.streams[sid] + m.mu.RUnlock() + + if !exists || stream.closed { + return nil + } + + const chunkSize = 7168 + for i := 0; i < len(data); i += chunkSize { + end := i + chunkSize + if end > len(data) { + end = len(data) + } + + chunk := data[i:end] + + m.sendSeqMu.Lock() + seq := m.sendSeq[sid] + m.sendSeq[sid]++ + m.sendSeqMu.Unlock() + + frame := make([]byte, 12+len(chunk)) + binary.BigEndian.PutUint32(frame[0:4], m.clientID) + binary.BigEndian.PutUint16(frame[4:6], sid) + binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) + binary.BigEndian.PutUint32(frame[8:12], seq) + copy(frame[12:], chunk) + + if err := m.onSend(frame); err != nil { + return err + } + } + + return nil +} + +func (m *Multiplexer) CloseStream(sid uint16) error { + m.mu.Lock() + defer m.mu.Unlock() + + if stream, exists := m.streams[sid]; exists { + stream.closed = true + } + + m.sendSeqMu.Lock() + delete(m.sendSeq, sid) + m.sendSeqMu.Unlock() + + m.signalDataReady(sid) + + frame := make([]byte, 12) + binary.BigEndian.PutUint32(frame[0:4], m.clientID) + binary.BigEndian.PutUint16(frame[4:6], sid) + binary.BigEndian.PutUint16(frame[6:8], 0) + binary.BigEndian.PutUint32(frame[8:12], 0) + + return m.onSend(frame) +} + +func (m *Multiplexer) HandleFrame(frame []byte) { + if len(frame) < 12 { + return + } + + clientID := binary.BigEndian.Uint32(frame[0:4]) + sid := binary.BigEndian.Uint16(frame[4:6]) + length := binary.BigEndian.Uint16(frame[6:8]) + seq := binary.BigEndian.Uint32(frame[8:12]) + + if length == 0 { + m.mu.Lock() + if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID { + stream.closed = true + } + m.mu.Unlock() + m.signalDataReady(sid) + return + } + + if len(frame) < 12+int(length) { + return + } + + data := frame[12 : 12+length] + + m.mu.Lock() + defer m.mu.Unlock() + + stream, exists := m.streams[sid] + if !exists { + if len(m.streams) >= m.maxStreams { + return + } + stream = &Stream{ + ID: sid, + ClientID: clientID, + recvBuf: make([]byte, 0), + nextSeq: 0, + outOfOrder: make(map[uint32][]byte), + } + m.streams[sid] = stream + } else if stream.ClientID != clientID { + stream.ClientID = clientID + stream.recvBuf = make([]byte, 0) + stream.closed = false + stream.nextSeq = 0 + stream.outOfOrder = make(map[uint32][]byte) + } + + if seq == stream.nextSeq { + if len(stream.recvBuf)+len(data) > m.maxBufferSize { + stream.closed = true + m.signalDataReady(sid) + return + } + stream.recvBuf = append(stream.recvBuf, data...) + stream.nextSeq++ + + for { + nextData, ok := stream.outOfOrder[stream.nextSeq] + if !ok { + break + } + if len(stream.recvBuf)+len(nextData) > m.maxBufferSize { + stream.closed = true + m.signalDataReady(sid) + return + } + stream.recvBuf = append(stream.recvBuf, nextData...) + delete(stream.outOfOrder, stream.nextSeq) + stream.nextSeq++ + } + + m.signalDataReady(sid) + return + } + + if seq > stream.nextSeq && len(stream.outOfOrder) < 100 { + stream.outOfOrder[seq] = append([]byte(nil), data...) + } +} + +func (m *Multiplexer) ReadStream(sid uint16) []byte { + m.mu.Lock() + defer m.mu.Unlock() + + stream, exists := m.streams[sid] + if !exists || len(stream.recvBuf) == 0 { + return nil + } + + data := stream.recvBuf + stream.recvBuf = make([]byte, 0) + return data +} + +func (m *Multiplexer) StreamClosed(sid uint16) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + stream, exists := m.streams[sid] + return !exists || stream.closed +} + +func (m *Multiplexer) GetStreams() []uint16 { + m.mu.RLock() + defer m.mu.RUnlock() + + sids := make([]uint16, 0, len(m.streams)) + for sid := range m.streams { + sids = append(sids, sid) + } + return sids +} + +func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { + m.dataReadyMu.Lock() + defer m.dataReadyMu.Unlock() + + if _, ok := m.dataReady[sid]; !ok { + m.dataReady[sid] = make(chan struct{}, 1) + } + return m.dataReady[sid] +} + +func (m *Multiplexer) WaitForActivity() <-chan struct{} { + return m.activityCh +} + +func (m *Multiplexer) CleanupStream(sid uint16) { + m.mu.Lock() + delete(m.streams, sid) + m.mu.Unlock() + + m.sendSeqMu.Lock() + delete(m.sendSeq, sid) + m.sendSeqMu.Unlock() + + m.dataReadyMu.Lock() + defer m.dataReadyMu.Unlock() + + if ch, ok := m.dataReady[sid]; ok { + close(ch) + delete(m.dataReady, sid) + } +} + +func (m *Multiplexer) Reset() { + m.mu.Lock() + for _, stream := range m.streams { + stream.closed = true + } + m.streams = make(map[uint16]*Stream) + m.nextID = 1 + m.mu.Unlock() + + m.sendSeqMu.Lock() + m.sendSeq = make(map[uint16]uint32) + m.sendSeqMu.Unlock() + + m.dataReadyMu.Lock() + for sid, ch := range m.dataReady { + close(ch) + delete(m.dataReady, sid) + } + m.dataReadyMu.Unlock() + + m.signalActivity() +} + +func (m *Multiplexer) signalDataReady(sid uint16) { + m.signalActivity() + + m.dataReadyMu.Lock() + defer m.dataReadyMu.Unlock() + + ch, ok := m.dataReady[sid] + if !ok { + return + } + + select { + case ch <- struct{}{}: + default: + } +} + +func (m *Multiplexer) signalActivity() { + select { + case m.activityCh <- struct{}{}: + default: + } +} diff --git a/internal/dcmux/mux_test.go b/internal/dcmux/mux_test.go new file mode 100644 index 0000000..84170d9 --- /dev/null +++ b/internal/dcmux/mux_test.go @@ -0,0 +1,54 @@ +package dcmux + +import ( + "encoding/binary" + "testing" +) + +func TestCleanupStreamRemovesClosedStream(t *testing.T) { + t.Parallel() + + mux := New(1, func([]byte) error { return nil }) + mux.maxStreams = 1 + + dataReady := mux.WaitForData(7) + mux.HandleFrame(testFrame(1, 7, 0, []byte("hello"))) + if got := mux.GetStreams(); len(got) != 1 || got[0] != 7 { + t.Fatalf("GetStreams() = %v, want [7]", got) + } + select { + case <-dataReady: + default: + t.Fatal("expected dataReady signal after HandleFrame") + } + select { + case <-mux.WaitForActivity(): + default: + t.Fatal("expected activity signal after HandleFrame") + } + + mux.CleanupStream(7) + + if _, ok := <-dataReady; ok { + t.Fatal("dataReady channel stayed open after CleanupStream") + } + + if got := mux.GetStreams(); len(got) != 0 { + t.Fatalf("GetStreams() after CleanupStream = %v, want empty", got) + } + + mux.HandleFrame(testFrame(1, 8, 0, []byte("world"))) + if got := mux.GetStreams(); len(got) != 1 || got[0] != 8 { + t.Fatalf("GetStreams() after reusing capacity = %v, want [8]", got) + } +} + +func testFrame(clientID uint32, sid uint16, seq uint32, payload []byte) []byte { + frame := make([]byte, 12+len(payload)) + binary.BigEndian.PutUint32(frame[0:4], clientID) + binary.BigEndian.PutUint16(frame[4:6], sid) + binary.BigEndian.PutUint16(frame[6:8], uint16(len(payload))) + binary.BigEndian.PutUint32(frame[8:12], seq) + copy(frame[12:], payload) + return frame +} diff --git a/client/namegen.go b/internal/namegen/namegen.go similarity index 90% rename from client/namegen.go rename to internal/namegen/namegen.go index 0593c9d..6a218e1 100644 --- a/client/namegen.go +++ b/internal/namegen/namegen.go @@ -1,4 +1,4 @@ -package main +package namegen import ( "fmt" @@ -162,26 +162,25 @@ func convertToFemaleSurname(surname string) string { return surname } -func generateName() string { - // Decide gender first +func Generate() string { isFemale := rand.Intn(2) == 0 - var fn string + var firstName string if isFemale { - fn = femaleFirstNames[rand.Intn(len(femaleFirstNames))] + firstName = femaleFirstNames[rand.Intn(len(femaleFirstNames))] } else { - fn = maleFirstNames[rand.Intn(len(maleFirstNames))] + firstName = maleFirstNames[rand.Intn(len(maleFirstNames))] } // 70% chance to have a last name if rand.Float32() < 0.3 { - return fn + return firstName } - ln := lastNames[rand.Intn(len(lastNames))] + lastName := lastNames[rand.Intn(len(lastNames))] if isFemale { - ln = convertToFemaleSurname(ln) + lastName = convertToFemaleSurname(lastName) } - return fmt.Sprintf("%s %s", fn, ln) + return fmt.Sprintf("%s %s", firstName, lastName) } diff --git a/internal/telemost/api.go b/internal/telemost/api.go new file mode 100644 index 0000000..ede4f69 --- /dev/null +++ b/internal/telemost/api.go @@ -0,0 +1,193 @@ +package telemost + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "time" + + "github.com/google/uuid" +) + +const defaultAPIBaseURL = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" + +var ( + apiBaseURL = defaultAPIBaseURL + apiHTTPClient = &http.Client{Timeout: 15 * time.Second} +) + +type ConnectionInfo struct { + RoomID string `json:"room_id"` + PeerID string `json:"peer_id"` + Credentials string `json:"credentials"` + ClientConfig struct { + MediaServerURL string `json:"media_server_url"` + } `json:"client_configuration"` +} + +type RoomTarget struct { + RoomID string + PreferredHost string +} + +func ParseRoomTarget(input string) (RoomTarget, error) { + input = strings.TrimSpace(input) + if input == "" { + return RoomTarget{}, fmt.Errorf("telemost invite link is required") + } + + target := RoomTarget{} + if strings.HasPrefix(input, "https://") || strings.HasPrefix(input, "http://") { + parsed, err := url.Parse(input) + if err != nil { + return RoomTarget{}, fmt.Errorf("invalid telemost invite link: %w", err) + } + parts := strings.Split(strings.Trim(parsed.Path, "/"), "/") + if len(parts) >= 2 && parts[0] == "j" && parts[1] != "" { + target.RoomID = parts[1] + target.PreferredHost = strings.ToLower(parsed.Hostname()) + } + } + + if target.RoomID == "" { + input = strings.TrimPrefix(input, "https://telemost.yandex.ru/j/") + input = strings.TrimPrefix(input, "http://telemost.yandex.ru/j/") + input = strings.TrimPrefix(input, "https://telemost.yandex.com/j/") + input = strings.TrimPrefix(input, "http://telemost.yandex.com/j/") + input = strings.TrimPrefix(input, "j/") + if idx := strings.IndexAny(input, "/?#"); idx != -1 { + input = input[:idx] + } + target.RoomID = strings.TrimSpace(input) + } + + if target.RoomID == "" { + return RoomTarget{}, fmt.Errorf("invalid telemost invite link") + } + + return target, nil +} + +func (t RoomTarget) CandidateRoomURLs() []string { + hosts := make([]string, 0, 3) + addHost := func(host string) { + host = strings.ToLower(strings.TrimSpace(host)) + if host == "" { + return + } + for _, existing := range hosts { + if existing == host { + return + } + } + hosts = append(hosts, host) + } + + addHost(t.PreferredHost) + addHost("telemost.yandex.ru") + addHost("telemost.yandex.com") + + roomURLs := make([]string, 0, len(hosts)) + for _, host := range hosts { + roomURLs = append(roomURLs, fmt.Sprintf("https://%s/j/%s", host, t.RoomID)) + } + + return roomURLs +} + +func WebOriginFromRoomURL(roomURL string) string { + parsed, err := url.Parse(roomURL) + if err != nil || parsed.Host == "" { + return "https://telemost.yandex.ru" + } + + scheme := parsed.Scheme + if scheme == "" { + scheme = "https" + } + + return scheme + "://" + parsed.Host +} + +func getConnectionInfoForRoomURL(ctx context.Context, roomURL, displayName string) (*ConnectionInfo, error) { + u := fmt.Sprintf("%s/conferences/%s/connection", apiBaseURL, url.QueryEscape(roomURL)) + origin := WebOriginFromRoomURL(roomURL) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Add("next_gen_media_platform_allowed", "true") + q.Add("display_name", displayName) + q.Add("waiting_room_supported", "true") + req.URL.RawQuery = q.Encode() + + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") + req.Header.Set("Accept", "*/*") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Instance-Id", uuid.New().String()) + req.Header.Set("X-Telemost-Client-Version", "187.1.0") + req.Header.Set("Idempotency-Key", uuid.New().String()) + req.Header.Set("Origin", origin) + req.Header.Set("Referer", origin+"/") + + resp, err := apiHTTPClient.Do(req) + if err != nil { + if resp != nil && resp.Body != nil { + if closeErr := resp.Body.Close(); closeErr != nil { + log.Println(closeErr) + } + } + return nil, err + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + log.Println(err) + } + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("telemost connection API returned %s and response body read failed: %w", resp.Status, readErr) + } + return nil, fmt.Errorf("telemost connection API returned %s: %s", resp.Status, strings.TrimSpace(string(body))) + } + + var info ConnectionInfo + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + return nil, err + } + + return &info, nil +} + +func GetConnectionInfo(ctx context.Context, roomInput, displayName string) (*ConnectionInfo, string, error) { + target, err := ParseRoomTarget(roomInput) + if err != nil { + return nil, "", err + } + + var errs []string + for _, roomURL := range target.CandidateRoomURLs() { + info, err := getConnectionInfoForRoomURL(ctx, roomURL, displayName) + if err == nil { + return info, roomURL, nil + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, "", err + } + errs = append(errs, fmt.Sprintf("%s: %v", roomURL, err)) + } + + return nil, "", fmt.Errorf("telemost connection failed for room %s: %s", target.RoomID, strings.Join(errs, "; ")) +} diff --git a/internal/telemost/api_test.go b/internal/telemost/api_test.go new file mode 100644 index 0000000..68d6483 --- /dev/null +++ b/internal/telemost/api_test.go @@ -0,0 +1,43 @@ +package telemost + +import ( + "context" + "errors" + "net/http" + "testing" + "time" +) + +func TestGetConnectionInfoHonorsContextCancellation(t *testing.T) { + previousBaseURL := apiBaseURL + previousClient := apiHTTPClient + apiBaseURL = "https://telemost.test" + apiHTTPClient = &http.Client{ + Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { + <-req.Context().Done() + return nil, req.Context().Err() + }), + } + t.Cleanup(func() { + apiBaseURL = previousBaseURL + apiHTTPClient = previousClient + }) + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + start := time.Now() + _, _, err := GetConnectionInfo(ctx, "https://telemost.yandex.ru/j/test-room", "tester") + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("GetConnectionInfo() error = %v, want context deadline exceeded", err) + } + if elapsed := time.Since(start); elapsed > time.Second { + t.Fatalf("GetConnectionInfo() took too long to abort: %v", elapsed) + } +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go new file mode 100644 index 0000000..1a2b1d1 --- /dev/null +++ b/internal/telemost/peer.go @@ -0,0 +1,758 @@ +package telemost + +import ( + "context" + "fmt" + "log" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/cacggghp/vk-turn-proxy/internal/namegen" + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" +) + +const ( + dataChannelLabel = "vk-turn-proxy" +) + +var debugLogging atomic.Bool + +type Peer struct { + roomURL string + name string + onData func([]byte) + + sessionMu sync.RWMutex + session *peerSession + + reconnectCbMu sync.RWMutex + reconnectCb func() + wsMu sync.Mutex + sendMu sync.Mutex + reconnectCh chan struct{} + closeCh chan struct{} + closed atomic.Bool +} + +type peerSession struct { + conn *ConnectionInfo + ws *websocket.Conn + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + connected atomic.Bool +} + +func NewPeer(roomURL, name string, onData func([]byte)) *Peer { + return &Peer{ + roomURL: roomURL, + name: name, + onData: onData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + } +} + +// NewConnectedPeer parses a Telemost invite link, creates a peer, connects it, +// and starts the background reconnect watcher. +func NewConnectedPeer(ctx context.Context, inviteLink string, onData func([]byte), onReconnect func()) (*Peer, error) { + target, err := ParseRoomTarget(inviteLink) + if err != nil { + return nil, err + } + + peer := NewPeer(target.RoomID, namegen.Generate(), onData) + if onReconnect != nil { + peer.SetReconnectCallback(onReconnect) + } + if err := peer.Connect(ctx); err != nil { + if closeErr := peer.Close(); closeErr != nil { + log.Printf("Telemost DataChannel peer cleanup failed after connect error: %v", closeErr) + } + return nil, err + } + + go peer.WatchConnection(ctx) + + return peer, nil +} + +func SetDebug(enabled bool) { + debugLogging.Store(enabled) +} + +func DebugEnabled() bool { + return debugLogging.Load() +} + +func debugf(format string, args ...any) { + if DebugEnabled() { + log.Printf(format, args...) + } +} + +func isTransportDataChannelLabel(label string) bool { + return label == dataChannelLabel +} + +func (p *Peer) Connect(ctx context.Context) error { + return p.connectOnce(ctx) +} + +func (p *Peer) WatchConnection(ctx context.Context) { + const ( + maxReconnects = 10 + reconnectWindow = 5 * time.Minute + ) + + var lastReconnect time.Time + reconnectCount := 0 + + for { + select { + case <-ctx.Done(): + return + case <-p.closeCh: + return + case <-p.reconnectCh: + } + + now := time.Now() + if now.Sub(lastReconnect) > reconnectWindow { + reconnectCount = 0 + } + if reconnectCount >= maxReconnects { + log.Printf("Telemost DataChannel: max reconnect attempts (%d) reached", maxReconnects) + return + } + reconnectCount++ + lastReconnect = now + + backoff := time.Duration(reconnectCount) * 2 * time.Second + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + + for { + if ctx.Err() != nil || p.closed.Load() { + return + } + + debugf("Telemost DataChannel: reconnecting in %v", backoff) + select { + case <-ctx.Done(): + return + case <-p.closeCh: + return + case <-time.After(backoff): + } + + if err := p.reconnect(ctx); err != nil { + debugf("Telemost DataChannel: reconnect failed: %v", err) + continue + } + + debugf("Telemost DataChannel: reconnected") + break + } + } +} + +func (p *Peer) Send(data []byte) error { + p.sessionMu.RLock() + session := p.session + p.sessionMu.RUnlock() + + if session == nil || session.dc == nil || session.dc.ReadyState() != webrtc.DataChannelStateOpen { + return fmt.Errorf("datachannel not ready") + } + + p.sendMu.Lock() + defer p.sendMu.Unlock() + + start := time.Now() + for session.dc.BufferedAmount() > 256*1024 { + if time.Since(start) > 2*time.Second { + return fmt.Errorf("datachannel buffer is full") + } + if session.dc.ReadyState() != webrtc.DataChannelStateOpen { + return fmt.Errorf("datachannel not ready") + } + time.Sleep(10 * time.Millisecond) + } + + return session.dc.Send(data) +} + +func (p *Peer) Close() error { + if p.closed.Swap(true) { + return nil + } + + select { + case <-p.closeCh: + default: + close(p.closeCh) + } + + p.cleanupCurrentSession() + return nil +} + +func (p *Peer) SetReconnectCallback(cb func()) { + p.reconnectCbMu.Lock() + p.reconnectCb = cb + p.reconnectCbMu.Unlock() +} + +func (p *Peer) reconnect(ctx context.Context) error { + p.cleanupCurrentSession() + if err := p.connectOnce(ctx); err != nil { + return err + } + + p.reconnectCbMu.RLock() + cb := p.reconnectCb + p.reconnectCbMu.RUnlock() + if cb != nil { + cb() + } + + return nil +} + +func (p *Peer) connectOnce(ctx context.Context) error { + conn, roomURL, err := GetConnectionInfo(ctx, p.roomURL, p.name) + if err != nil { + return err + } + origin := WebOriginFromRoomURL(roomURL) + + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + {URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, + }, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + } + + api := webrtc.NewAPI() + + pcSub, err := api.NewPeerConnection(config) + if err != nil { + return err + } + pcPub, err := api.NewPeerConnection(config) + if err != nil { + _ = pcSub.Close() + return err + } + + session := &peerSession{ + conn: conn, + pcSub: pcSub, + pcPub: pcPub, + } + + pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + debugf("Telemost subscriber state: %s", state.String()) + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + p.signalReconnectIfCurrent(session, "subscriber state "+state.String()) + } + }) + pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + debugf("Telemost publisher state: %s", state.String()) + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + p.signalReconnectIfCurrent(session, "publisher state "+state.String()) + } + }) + + dc, err := pcPub.CreateDataChannel(dataChannelLabel, nil) + if err != nil { + p.cleanupSession(session) + return err + } + session.dc = dc + + dcReady := make(chan struct{}, 1) + dc.OnOpen(func() { + if session.connected.CompareAndSwap(false, true) { + log.Printf("Telemost DataChannel connected") + } + select { + case dcReady <- struct{}{}: + default: + } + }) + dc.OnClose(func() { + p.signalReconnectIfCurrent(session, "publisher datachannel closed") + }) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + + pcSub.OnDataChannel(func(remoteDC *webrtc.DataChannel) { + if !isTransportDataChannelLabel(remoteDC.Label()) { + debugf("Telemost remote datachannel ignored: %s", remoteDC.Label()) + return + } + debugf("Telemost remote datachannel opened: %s", remoteDC.Label()) + remoteDC.OnClose(func() { + p.signalReconnectIfCurrent(session, "remote datachannel "+remoteDC.Label()+" closed") + }) + remoteDC.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + }) + + headers := http.Header{} + headers.Set("Origin", origin) + headers.Set("Referer", origin+"/") + + ws, resp, err := websocket.DefaultDialer.Dial(conn.ClientConfig.MediaServerURL, headers) + if err != nil { + if resp != nil && resp.Body != nil { + if closeErr := resp.Body.Close(); closeErr != nil { + debugf("Telemost websocket response close failed: %v", closeErr) + } + } + p.cleanupSession(session) + return err + } + session.ws = ws + + ws.SetPongHandler(func(string) error { + return ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + }) + if err := ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + p.cleanupSession(session) + return err + } + + p.sessionMu.Lock() + p.session = session + p.sessionMu.Unlock() + + p.setupICEHandlers(session) + + go p.keepAlive(session) + go p.handleSignaling(session) + + if err := p.sendHello(session); err != nil { + p.cleanupCurrentSession() + return err + } + + select { + case <-ctx.Done(): + p.cleanupCurrentSession() + return ctx.Err() + case <-p.closeCh: + p.cleanupCurrentSession() + return context.Canceled + case <-time.After(15 * time.Second): + p.cleanupCurrentSession() + return fmt.Errorf("datachannel timeout") + case <-dcReady: + return nil + } +} + +func (p *Peer) keepAlive(session *peerSession) { + wsPingTicker := time.NewTicker(30 * time.Second) + defer wsPingTicker.Stop() + appPingTicker := time.NewTicker(5 * time.Second) + defer appPingTicker.Stop() + + for { + select { + case <-p.closeCh: + return + case <-wsPingTicker.C: + if err := p.writeControl(session, websocket.PingMessage, []byte{}); err != nil { + p.signalReconnectIfCurrent(session, "websocket ping failed") + return + } + case <-appPingTicker.C: + if err := p.writeJSON(session, map[string]interface{}{ + "uid": uuid.New().String(), + "ping": map[string]interface{}{}, + }); err != nil { + p.signalReconnectIfCurrent(session, "application ping failed") + return + } + } + } +} + +func (p *Peer) handleSignaling(session *peerSession) { + pubSent := false + + for { + var msg map[string]interface{} + if err := session.ws.ReadJSON(&msg); err != nil { + if p.isCurrentSession(session) && !p.closed.Load() { + debugf("Telemost signaling read error: %v", err) + p.signalReconnectIfCurrent(session, "signaling read failed") + } + return + } + if err := session.ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + if p.isCurrentSession(session) && !p.closed.Load() { + debugf("Telemost signaling read deadline update failed: %v", err) + p.signalReconnectIfCurrent(session, "signaling deadline update failed") + } + return + } + + uid := "" + if rawUID, ok := msg["uid"].(string); ok { + uid = rawUID + } + + sendAck := func() { + if err := p.sendAck(session, uid); err != nil { + debugf("Telemost signaling ack failed: %v", err) + } + } + + if _, ok := msg["serverHello"]; ok { + sendAck() + } + if _, ok := msg["updateDescription"]; ok { + sendAck() + } + if _, ok := msg["vadActivity"]; ok { + sendAck() + } + if _, ok := msg["ping"]; ok { + if err := p.sendPong(session, uid); err != nil { + debugf("Telemost signaling pong failed: %v", err) + } + continue + } + if _, ok := msg["pong"]; ok { + sendAck() + continue + } + + if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok && !pubSent { + sdp, ok := offer["sdp"].(string) + if !ok || sdp == "" { + debugf("Telemost subscriber offer missing SDP") + continue + } + + pcSeq, ok := offer["pcSeq"].(float64) + if !ok { + debugf("Telemost subscriber offer missing pcSeq") + continue + } + + if err := session.pcSub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + debugf("Telemost subscriber SetRemoteDescription failed: %v", err) + continue + } + + answer, err := session.pcSub.CreateAnswer(nil) + if err != nil { + debugf("Telemost subscriber CreateAnswer failed: %v", err) + continue + } + if err := session.pcSub.SetLocalDescription(answer); err != nil { + debugf("Telemost subscriber SetLocalDescription failed: %v", err) + continue + } + if err := p.writeJSON(session, map[string]interface{}{ + "uid": uuid.New().String(), + "subscriberSdpAnswer": map[string]interface{}{ + "pcSeq": int(pcSeq), + "sdp": answer.SDP, + }, + }); err != nil { + debugf("Telemost subscriber answer send failed: %v", err) + continue + } + + sendAck() + time.Sleep(300 * time.Millisecond) + + pubOffer, err := session.pcPub.CreateOffer(nil) + if err != nil { + debugf("Telemost publisher CreateOffer failed: %v", err) + continue + } + if err := session.pcPub.SetLocalDescription(pubOffer); err != nil { + debugf("Telemost publisher SetLocalDescription failed: %v", err) + continue + } + if err := p.writeJSON(session, map[string]interface{}{ + "uid": uuid.New().String(), + "publisherSdpOffer": map[string]interface{}{ + "pcSeq": 1, + "sdp": pubOffer.SDP, + }, + }); err != nil { + debugf("Telemost publisher offer send failed: %v", err) + continue + } + + pubSent = true + continue + } + + if answer, ok := msg["publisherSdpAnswer"].(map[string]interface{}); ok { + sdp, ok := answer["sdp"].(string) + if !ok || sdp == "" { + debugf("Telemost publisher answer missing SDP") + continue + } + if err := session.pcPub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + debugf("Telemost publisher SetRemoteDescription failed: %v", err) + } + sendAck() + continue + } + + if cand, ok := msg["webrtcIceCandidate"].(map[string]interface{}); ok { + p.handleICE(session, cand) + } + } +} + +func (p *Peer) setupICEHandlers(session *peerSession) { + session.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + init := c.ToJSON() + if err := p.writeJSON(session, map[string]interface{}{ + "uid": uuid.New().String(), + "webrtcIceCandidate": map[string]interface{}{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "SUBSCRIBER", + "pcSeq": 1, + }, + }); err != nil && p.isCurrentSession(session) && !p.closed.Load() { + debugf("Telemost subscriber ICE send failed: %v", err) + } + }) + + session.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + init := c.ToJSON() + if err := p.writeJSON(session, map[string]interface{}{ + "uid": uuid.New().String(), + "webrtcIceCandidate": map[string]interface{}{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "PUBLISHER", + "pcSeq": 1, + }, + }); err != nil && p.isCurrentSession(session) && !p.closed.Load() { + debugf("Telemost publisher ICE send failed: %v", err) + } + }) +} + +func (p *Peer) handleICE(session *peerSession, cand map[string]interface{}) { + candidate, ok := cand["candidate"].(string) + if !ok || candidate == "" || len(strings.Fields(candidate)) < 8 { + return + } + + target, ok := cand["target"].(string) + if !ok || target == "" { + return + } + + sdpMid := "" + if rawSDPMid, ok := cand["sdpMid"].(string); ok { + sdpMid = rawSDPMid + } + sdpMLineIndex, ok := cand["sdpMlineIndex"].(float64) + if !ok { + return + } + + index := uint16(sdpMLineIndex) + init := webrtc.ICECandidateInit{ + Candidate: candidate, + SDPMid: &sdpMid, + SDPMLineIndex: &index, + } + + switch target { + case "SUBSCRIBER": + if err := session.pcSub.AddICECandidate(init); err != nil { + debugf("Telemost subscriber ICE apply failed: %v", err) + } + case "PUBLISHER": + if err := session.pcPub.AddICECandidate(init); err != nil { + debugf("Telemost publisher ICE apply failed: %v", err) + } + } +} + +func (p *Peer) sendHello(session *peerSession) error { + hello := map[string]interface{}{ + "uid": uuid.New().String(), + "hello": map[string]interface{}{ + "participantMeta": map[string]interface{}{ + "name": p.name, + "role": "SPEAKER", + "sendAudio": false, + "sendVideo": false, + }, + "participantAttributes": map[string]interface{}{ + "name": p.name, + "role": "SPEAKER", + }, + "sendAudio": false, + "sendVideo": false, + "sendSharing": false, + "participantId": session.conn.PeerID, + "roomId": session.conn.RoomID, + "serviceName": "telemost", + "credentials": session.conn.Credentials, + "capabilitiesOffer": map[string]interface{}{ + "offerAnswerMode": []string{"SEPARATE"}, + "initialSubscriberOffer": []string{"ON_HELLO"}, + "slotsMode": []string{"FROM_CONTROLLER"}, + "simulcastMode": []string{"DISABLED"}, + "selfVadStatus": []string{"FROM_SERVER"}, + "dataChannelSharing": []string{"TO_RTP"}, + }, + "sdkInfo": map[string]interface{}{ + "implementation": "go", + "version": "1.0.0", + "userAgent": "vk-turn-proxy-" + p.name, + }, + "sdkInitializationId": uuid.New().String(), + "disablePublisher": false, + "disableSubscriber": false, + }, + } + + return p.writeJSON(session, hello) +} + +func (p *Peer) sendAck(session *peerSession, uid string) error { + return p.writeJSON(session, map[string]interface{}{ + "uid": uid, + "ack": map[string]interface{}{ + "status": map[string]interface{}{ + "code": "OK", + }, + }, + }) +} + +func (p *Peer) sendPong(session *peerSession, uid string) error { + return p.writeJSON(session, map[string]interface{}{ + "uid": uid, + "pong": map[string]interface{}{}, + }) +} + +func (p *Peer) writeJSON(session *peerSession, payload any) error { + p.wsMu.Lock() + defer p.wsMu.Unlock() + + if session.ws == nil { + return fmt.Errorf("websocket is closed") + } + + return session.ws.WriteJSON(payload) +} + +func (p *Peer) writeControl(session *peerSession, messageType int, data []byte) error { + p.wsMu.Lock() + defer p.wsMu.Unlock() + + if session.ws == nil { + return fmt.Errorf("websocket is closed") + } + + return session.ws.WriteControl(messageType, data, time.Now().Add(10*time.Second)) +} + +func (p *Peer) cleanupCurrentSession() { + p.sessionMu.Lock() + session := p.session + p.session = nil + p.sessionMu.Unlock() + + p.cleanupSession(session) +} + +func (p *Peer) cleanupSession(session *peerSession) { + if session == nil { + return + } + + if session.connected.Swap(false) { + log.Printf("Telemost DataChannel closed") + } + + if session.dc != nil { + _ = session.dc.Close() + } + if session.pcPub != nil { + _ = session.pcPub.Close() + } + if session.pcSub != nil { + _ = session.pcSub.Close() + } + if session.ws != nil { + p.wsMu.Lock() + if err := session.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)); err != nil { + debugf("Telemost websocket close control failed: %v", err) + } + _ = session.ws.Close() + p.wsMu.Unlock() + } +} + +func (p *Peer) isCurrentSession(session *peerSession) bool { + p.sessionMu.RLock() + defer p.sessionMu.RUnlock() + return p.session == session +} + +func (p *Peer) signalReconnectIfCurrent(session *peerSession, reason string) { + if p.closed.Load() || !p.isCurrentSession(session) { + return + } + + if session.connected.Swap(false) { + log.Printf("Telemost DataChannel closed") + } + if reason != "" { + debugf("Telemost DataChannel disconnect reason: %s", reason) + } + + select { + case p.reconnectCh <- struct{}{}: + default: + } +} diff --git a/server/main.go b/server/main.go index e02c3a4..8f08bf1 100644 --- a/server/main.go +++ b/server/main.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cacggghp/vk-turn-proxy/internal/cliutil" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" "github.com/cacggghp/vk-turn-proxy/tcputil" "github.com/pion/dtls/v3" "github.com/pion/dtls/v3/pkg/crypto/selfsign" @@ -23,9 +24,12 @@ import ( ) type serverOptions struct { - listen string - connect string - vlessMode bool + listen string + connect string + yalink string + vlessMode bool + telemostDC bool + debug bool } func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverOptions) { @@ -35,12 +39,17 @@ func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverO opts := &serverOptions{} fs.StringVar(&opts.listen, "listen", "0.0.0.0:56000", "listen on ip:port") fs.StringVar(&opts.connect, "connect", "", "connect to ip:port") + fs.StringVar(&opts.yalink, "yandex-link", "", "Yandex Telemost invite link \"https://telemost.yandex.ru/j/...\"") fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets") + fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of DTLS listener") + fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of DTLS listener") + fs.BoolVar(&opts.debug, "debug", false, "enable debug logging") fs.Usage = func() { cliutil.Fprintf(fs.Output(), "Usage:\n %s -connect [flags]\n\n", program) cliutil.Fprintln(fs.Output(), "Examples:") cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820\n", program) - cliutil.Fprintf(fs.Output(), " %s -listen 0.0.0.0:56000 -connect 127.0.0.1:51820 -vless\n\n", program) + cliutil.Fprintf(fs.Output(), " %s -listen 0.0.0.0:56000 -connect 127.0.0.1:51820 -vless\n", program) + cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program) cliutil.Fprintln(fs.Output(), "Flags:") fs.PrintDefaults() } @@ -53,10 +62,30 @@ func parseServerOptions(args []string, program string, stdout, stderr io.Writer) if opts.connect == "" { return fmt.Errorf("-connect is required") } + if opts.telemostDC { + if opts.yalink == "" { + return fmt.Errorf("-telemost-dc requires -yandex-link") + } + } return nil }) } +func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string, vlessMode bool) error { + if vlessMode { + return runTelemostDataChannelVLESSMode(ctx, inviteLink, connectAddr) + } + + return runTelemostDataChannelMode(ctx, inviteLink, connectAddr) +} + +func closeOnContextDone(ctx context.Context, closer io.Closer) { + go func() { + <-ctx.Done() + _ = closer.Close() + }() +} + func main() { opts, exitCode := parseServerOptions(os.Args[1:], filepath.Base(os.Args[0]), os.Stdout, os.Stderr) if exitCode != cliutil.ContinueExecution { @@ -75,31 +104,41 @@ func main() { log.Fatalf("Exit...\n") }() - addr, err := net.ResolveUDPAddr("udp", opts.listen) + telemost.SetDebug(opts.debug) + + if opts.telemostDC { + if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.connect, opts.vlessMode); err != nil { + log.Fatalf("Telemost DataChannel mode failed: %v", err) + } + return + } + + listenAddr, err := net.ResolveUDPAddr("udp", opts.listen) if err != nil { - panic(err) + log.Fatalf("invalid listen address %q: %v", opts.listen, err) } + // Generate a certificate and private key to secure the connection certificate, genErr := selfsign.GenerateSelfSigned() if genErr != nil { - panic(genErr) + log.Fatalf("failed to generate DTLS certificate: %v", genErr) } // Connect to a DTLS server listener, err := dtls.ListenWithOptions( "udp", - addr, + listenAddr, dtls.WithCertificates(certificate), dtls.WithExtendedMasterSecret(dtls.RequireExtendedMasterSecret), dtls.WithCipherSuites(dtls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256), dtls.WithConnectionIDGenerator(dtls.RandomCIDGenerator(8)), ) if err != nil { - panic(err) + log.Fatalf("failed to start DTLS listener on %s: %v", opts.listen, err) } context.AfterFunc(ctx, func() { - if err = listener.Close(); err != nil { - panic(err) + if closeErr := listener.Close(); closeErr != nil && !errors.Is(closeErr, net.ErrClosed) { + log.Printf("failed to close DTLS listener: %v", closeErr) } }) diff --git a/server/main_test.go b/server/main_test.go index 01e8707..0e7b368 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -68,7 +68,7 @@ func TestParseServerOptionsParsesValidArgs(t *testing.T) { var stdout bytes.Buffer var stderr bytes.Buffer - opts, exitCode := parseServerOptions([]string{"-connect", "127.0.0.1:51820", "-listen", "0.0.0.0:56000", "-vless"}, "server", &stdout, &stderr) + opts, exitCode := parseServerOptions([]string{"-connect", "127.0.0.1:51820", "-listen", "0.0.0.0:56000", "-vless", "-debug"}, "server", &stdout, &stderr) if exitCode != cliutil.ContinueExecution { t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) } @@ -84,4 +84,73 @@ func TestParseServerOptionsParsesValidArgs(t *testing.T) { if !opts.vlessMode { t.Fatal("vlessMode = false, want true") } + if !opts.debug { + t.Fatal("debug = false, want true") + } +} + +func TestParseServerOptionsParsesTelemostDataChannelArgs(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-yandex-link", "https://telemost.yandex.ru/j/test", + "-telemost-dc", + }, "server", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.telemostDC { + t.Fatal("telemostDC = false, want true") + } + if opts.yalink == "" { + t.Fatal("yalink = empty, want yandex link") + } +} + +func TestParseServerOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + _, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-telemost-dc", + }, "server", &stdout, &stderr) + if exitCode != 2 { + t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode) + } + if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") { + t.Fatalf("expected telemost-dc validation error, got %q", got) + } +} + +func TestParseServerOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-yandex-link", "https://telemost.yandex.ru/j/test", + "-telemost-dc", + "-vless", + }, "server", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.telemostDC || !opts.vlessMode { + t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode) + } } diff --git a/server/telemostdc.go b/server/telemostdc.go new file mode 100644 index 0000000..1446061 --- /dev/null +++ b/server/telemostdc.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "errors" + "log" + "net" + + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error { + backendConn, err := net.Dial("udp", connectAddr) + if err != nil { + return err + } + defer func(backendConn net.Conn) { + err := backendConn.Close() + if err != nil { + log.Println(err) + } + }(backendConn) + + peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { + if _, writeErr := backendConn.Write(data); writeErr != nil { + log.Printf("Telemost DataChannel: failed to write backend packet: %v", writeErr) + } + }, nil) + if err != nil { + return err + } + defer func(peer *telemost.Peer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + closeOnContextDone(ctx, backendConn) + + log.Printf("Telemost DataChannel mode: forwarding to %s", connectAddr) + + buf := make([]byte, 2048) + for { + n, err := backendConn.Read(buf) + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + + if err := peer.Send(buf[:n]); err != nil { + log.Printf("Telemost DataChannel: dropped backend packet (%d bytes): %v", n, err) + } + } +} diff --git a/server/telemostdc_vless.go b/server/telemostdc_vless.go new file mode 100644 index 0000000..38abcd6 --- /dev/null +++ b/server/telemostdc_vless.go @@ -0,0 +1,264 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/cacggghp/vk-turn-proxy/internal/dcmux" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +type telemostBackendStream struct { + conn net.Conn + ctx context.Context + cancel context.CancelFunc + writeCh chan []byte + closeOnce sync.Once + connMu sync.Mutex + closed atomic.Bool +} + +func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBackendStream { + ctx, cancel := context.WithCancel(parent) + return &telemostBackendStream{ + conn: conn, + ctx: ctx, + cancel: cancel, + writeCh: make(chan []byte, 32), + } +} + +func (s *telemostBackendStream) Close() { + s.closeOnce.Do(func() { + s.closed.Store(true) + s.cancel() + s.connMu.Lock() + _ = s.conn.Close() + s.connMu.Unlock() + }) +} + +func enqueueBackendData(stream *telemostBackendStream, data []byte) error { + if stream.closed.Load() { + return context.Canceled + } + + select { + case <-stream.ctx.Done(): + return context.Canceled + case stream.writeCh <- data: + return nil + default: + return fmt.Errorf("backend write queue full") + } +} + +func (s *telemostBackendStream) write(data []byte) error { + if s.closed.Load() { + return net.ErrClosed + } + + s.connMu.Lock() + defer s.connMu.Unlock() + + if s.closed.Load() { + return net.ErrClosed + } + + if err := s.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { + return fmt.Errorf("set backend write deadline: %w", err) + } + if _, err := s.conn.Write(data); err != nil { + return fmt.Errorf("write backend data: %w", err) + } + + return nil +} + +func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) { + defer closeStream(streamID) + defer closeMuxStream(streamID) + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + defer stream.cancel() + + buf := make([]byte, 32768) + for { + n, readErr := stream.conn.Read(buf) + if readErr != nil { + if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) { + if telemost.DebugEnabled() { + log.Printf("Telemost DataChannel VLESS backend stream closed: %v", readErr) + } + } else { + log.Printf("Telemost DataChannel VLESS backend read error: %v", readErr) + } + return + } + if sendErr := mux.SendData(streamID, buf[:n]); sendErr != nil { + return + } + } + }() + + go func() { + defer wg.Done() + defer stream.cancel() + + for { + select { + case <-stream.ctx.Done(): + return + case data := <-stream.writeCh: + if err := stream.write(data); err != nil { + if errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) || stream.closed.Load() { + return + } + log.Printf("Telemost DataChannel VLESS backend write error: %v", err) + return + } + } + } + }() + + wg.Wait() +} + +func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAddr string) error { + var ( + connMu sync.Mutex + conns = make(map[uint16]*telemostBackendStream) + ) + + var peer *telemost.Peer + mux := dcmux.New(0, func(frame []byte) error { + return peer.Send(frame) + }) + + closeStream := func(sid uint16) { + connMu.Lock() + stream := conns[sid] + delete(conns, sid) + connMu.Unlock() + if stream != nil { + stream.Close() + } + } + + closeAll := func() { + connMu.Lock() + streams := make([]*telemostBackendStream, 0, len(conns)) + for sid, stream := range conns { + streams = append(streams, stream) + delete(conns, sid) + } + connMu.Unlock() + + for _, stream := range streams { + stream.Close() + } + } + + closeMuxStream := func(sid uint16) { + if mux.StreamClosed(sid) { + return + } + if err := mux.CloseStream(sid); err != nil { + log.Printf("Telemost DataChannel VLESS server: failed to close mux stream %d: %v", sid, err) + } + } + + getOrCreateBackendStream := func(sid uint16) (*telemostBackendStream, error) { + connMu.Lock() + stream := conns[sid] + connMu.Unlock() + if stream != nil { + return stream, nil + } + + dialer := &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second} + conn, err := dialer.DialContext(ctx, "tcp", connectAddr) + if err != nil { + return nil, err + } + + stream = newTelemostBackendStream(ctx, conn) + + connMu.Lock() + if existing := conns[sid]; existing != nil { + connMu.Unlock() + stream.Close() + return existing, nil + } + conns[sid] = stream + connMu.Unlock() + + go handleTelemostBackendStream(sid, stream, mux, closeStream, closeMuxStream) + return stream, nil + } + + peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { + if telemost.DebugEnabled() { + log.Printf("Telemost DataChannel VLESS server: peer reconnected, closing active backend streams") + } + closeAll() + mux.Reset() + }) + if err != nil { + return err + } + defer func(peer *telemost.Peer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + log.Printf("Telemost DataChannel VLESS server: forwarding to %s", connectAddr) + activityCh := mux.WaitForActivity() + + for { + select { + case <-ctx.Done(): + closeAll() + return nil + case <-activityCh: + } + + for _, sid := range mux.GetStreams() { + data := mux.ReadStream(sid) + if len(data) > 0 { + stream, err := getOrCreateBackendStream(sid) + if err != nil { + log.Printf("Telemost DataChannel VLESS backend dial error: %v", err) + closeMuxStream(sid) + continue + } + if err := enqueueBackendData(stream, data); err != nil { + if !errors.Is(err, context.Canceled) { + log.Printf("Telemost DataChannel VLESS backend stream %d stalled: %v", sid, err) + } + closeStream(sid) + closeMuxStream(sid) + continue + } + } + + if mux.StreamClosed(sid) { + closeStream(sid) + mux.CleanupStream(sid) + } + } + } +} diff --git a/server/telemostdc_vless_test.go b/server/telemostdc_vless_test.go new file mode 100644 index 0000000..6c31348 --- /dev/null +++ b/server/telemostdc_vless_test.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "errors" + "net" + "strings" + "testing" + "time" +) + +func TestEnqueueBackendDataReturnsWhenQueueIsFull(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer func() { + _ = clientConn.Close() + _ = serverConn.Close() + }() + + stream := newTelemostBackendStream(context.Background(), clientConn) + defer stream.Close() + + stream.writeCh = make(chan []byte, 1) + stream.writeCh <- []byte("busy") + + done := make(chan error, 1) + go func() { + done <- enqueueBackendData(stream, []byte("next")) + }() + + select { + case err := <-done: + if err == nil || !strings.Contains(err.Error(), "queue full") { + t.Fatalf("enqueueBackendData() error = %v, want queue full", err) + } + case <-time.After(200 * time.Millisecond): + t.Fatal("enqueueBackendData() blocked on a full queue") + } +} + +func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer func() { + _ = clientConn.Close() + _ = serverConn.Close() + }() + + stream := newTelemostBackendStream(context.Background(), clientConn) + stream.Close() + + err := enqueueBackendData(stream, []byte("next")) + if !errors.Is(err, context.Canceled) { + t.Fatalf("enqueueBackendData() error = %v, want context canceled", err) + } +} + +func TestTelemostBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer func() { + _ = clientConn.Close() + _ = serverConn.Close() + }() + + stream := newTelemostBackendStream(context.Background(), clientConn) + stream.Close() + + err := stream.write([]byte("next")) + if !errors.Is(err, net.ErrClosed) { + t.Fatalf("stream.write() error = %v, want net.ErrClosed", err) + } +}