diff --git a/README.md b/README.md index 634bf7f..0ba4fa0 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,6 @@ ## Содержание - [Good TURN](#good-turn) - - [Режимы работы](#режимы-работы) - [Похожие проекты](#похожие-проекты) - [Server & Cli](#server--cli) - [Android](#android) @@ -31,20 +30,13 @@ - [Сервер (VPS)](#сервер-vps) - [Docker](#docker-1) - [Клиент](#клиент-1) - - [Telemost DataChannel](#telemost-datachannel) - - [Сервер](#сервер-1) - - [Клиент](#клиент-2) - - [VLESS через Telemost DataChannel](#vless-через-telemost-datachannel) + - [DataChannel](#datachannel) + - [Telemost сервер](#telemost-сервер) + - [Telemost клиент](#telemost-клиент) + - [SaluteJazz сервер](#salutejazz-сервер) + - [SaluteJazz клиент](#salutejazz-клиент) - [Direct mode](#direct-mode) -## Режимы работы - -| Режим | Клиент | Сервер | Полезная нагрузка | Транспорт | -|-----------------------------------------------|---------------------------------|---------------------------------|------------------------|------------------------------------------------------------------------------| -| VK TURN | `-vk-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp` | -| Telemost TURN _(не работает)_ | `-yandex-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp`; `-n` по умолчанию `1` | -| [VLESS](#vless-режим) | `-vless` | `-vless` | TCP-потоки | TURN поверх TCP по умолчанию, либо UDP с `-udp`, дальше `DTLS -> KCP + smux` | -| [Telemost DataChannel](#telemost-datachannel) | `-telemost-dc -yandex-link ...` | `-telemost-dc -yandex-link ...` | UDP или TCP (`-vless`) | WebRTC DataChannel, без TURN | ## Похожие проекты @@ -576,40 +568,42 @@ Xray сервер (config.json) -## Telemost DataChannel +## DataChannel + +UPD: Яндекс Телемост не работает. -Для Яндекс Телемоста теперь есть альтернативный режим без TURN: `-telemost-dc`. +Для Яндекс Телемоста и SaluteJazz теперь есть альтернативный режим без TURN: `-dc`. Режим работает как для обычного UDP/WireGuard-сценария, так и для `-vless`. -### Сервер +### Telemost сервер ``` -./server -connect 127.0.0.1:<порт WG> -yandex-link https://telemost.yandex.ru/j/... -telemost-dc +./server -connect 127.0.0.1:<порт WG/VLESS> -yandex-link https://telemost.yandex.ru/j/... -dc ``` Вместо ссылки можно использовать просто ID звонка, ссылка может быть `.ru` и `.com`. -### Клиент +### Telemost клиент ``` -./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc +./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -dc ``` В этом режиме флаг `-peer` не нужен: клиент и сервер встречаются внутри одной конференции Telemost по одной и той же ссылке. -### VLESS через Telemost DataChannel - -Сервер: +### SaluteJazz сервер ``` -./server -connect 127.0.0.1:<порт VLESS> -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless +./server -connect 127.0.0.1:<порт WG/VLESS> -jazz-room any -dc ``` -Клиент: +Сервер создаёт комнату и пишет в лог `room:password`. Вместо `any` можно внести самому существующую комнату. + +### SaluteJazz клиент ``` -./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless +./client -listen 127.0.0.1:9000 -jazz-room -dc ``` diff --git a/client/dc.go b/client/dc.go new file mode 100644 index 0000000..b9f6013 --- /dev/null +++ b/client/dc.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "errors" + "log" + "net" + "sync/atomic" + + "github.com/cacggghp/vk-turn-proxy/internal/jazz" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +type dataChannelPeer interface { + Send([]byte) error + Close() error +} + +type dataChannelConnectFunc func(context.Context, string, func([]byte), func()) (dataChannelPeer, error) + +func connectTelemostDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) { + return telemost.NewConnectedPeer(ctx, room, onData, onReconnect) +} + +func connectJazzDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) { + return jazz.NewConnectedPeer(ctx, room, onData, onReconnect) +} + +func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error { + return runDataChannelMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, listenAddr) +} + +func runJazzDataChannelMode(ctx context.Context, room, listenAddr string) error { + return runDataChannelMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, listenAddr) +} + +func runDataChannelMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, listenAddr string) error { + listenConn, err := net.ListenPacket("udp", listenAddr) + if err != nil { + return err + } + defer func(listenConn net.PacketConn) { + err := listenConn.Close() + if err != nil { + log.Println(err) + } + }(listenConn) + + var activeLocalPeer atomic.Value + peer, err := connectPeer(ctx, room, func(data []byte) { + addr, ok := activeLocalPeer.Load().(net.Addr) + if !ok || addr == nil { + return + } + if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil { + log.Printf("%s DataChannel: failed to write local packet: %v", providerName, writeErr) + } + }, nil) + if err != nil { + return err + } + defer func(peer dataChannelPeer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + closeOnContextDone(ctx, listenConn) + + log.Printf("%s DataChannel mode: listening on %s", providerName, listenAddr) + + buf := make([]byte, 2048) + for { + n, addr, err := listenConn.ReadFrom(buf) + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + + activeLocalPeer.Store(addr) + if err := peer.Send(buf[:n]); err != nil { + log.Printf("%s DataChannel: dropped outbound packet (%d bytes): %v", providerName, n, err) + } + } +} diff --git a/client/telemostdc_vless.go b/client/dc_vless.go similarity index 72% rename from client/telemostdc_vless.go rename to client/dc_vless.go index 37d782e..67a0a68 100644 --- a/client/telemostdc_vless.go +++ b/client/dc_vless.go @@ -9,10 +9,17 @@ import ( "time" "github.com/cacggghp/vk-turn-proxy/internal/dcmux" - "github.com/cacggghp/vk-turn-proxy/internal/telemost" ) func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr string) error { + return runDataChannelVLESSMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, listenAddr) +} + +func runJazzDataChannelVLESSMode(ctx context.Context, room, listenAddr string) error { + return runDataChannelVLESSMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, listenAddr) +} + +func runDataChannelVLESSMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, listenAddr string) error { var ( connMu sync.Mutex conns = make(map[uint16]net.Conn) @@ -26,22 +33,20 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr } } - var peer *telemost.Peer + var peer dataChannelPeer clientID := uint32(time.Now().UnixNano()) mux := dcmux.New(clientID, func(frame []byte) error { return peer.Send(frame) }) - peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { - if telemost.DebugEnabled() { - log.Printf("Telemost DataChannel VLESS: peer reconnected, closing active TCP streams") - } + peer, err := connectPeer(ctx, room, mux.HandleFrame, func() { + log.Printf("%s DataChannel VLESS: peer reconnected, closing active TCP streams", providerName) closeAll() mux.Reset() }) if err != nil { return err } - defer func(peer *telemost.Peer) { + defer func(peer dataChannelPeer) { err := peer.Close() if err != nil { log.Println(err) @@ -60,7 +65,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr }(listener) closeOnContextDone(ctx, listener) - log.Printf("Telemost DataChannel VLESS mode: listening on %s", listenAddr) + log.Printf("%s DataChannel VLESS mode: listening on %s", providerName, listenAddr) for { conn, err := listener.Accept() @@ -69,7 +74,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr closeAll() return nil } - log.Printf("Telemost DataChannel VLESS accept error: %v", err) + log.Printf("%s DataChannel VLESS accept error: %v", providerName, err) continue } @@ -84,7 +89,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr delete(conns, streamID) connMu.Unlock() if err := mux.CloseStream(streamID); err != nil { - log.Printf("Telemost DataChannel VLESS: failed to close mux stream %d: %v", streamID, err) + log.Printf("%s DataChannel VLESS: failed to close mux stream %d: %v", providerName, streamID, err) } _ = tcpConn.Close() mux.CleanupStream(streamID) diff --git a/client/main.go b/client/main.go index 8d5e82d..2e63169 100644 --- a/client/main.go +++ b/client/main.go @@ -37,6 +37,7 @@ import ( "github.com/bschaatsbergen/dnsdialer" "github.com/cacggghp/vk-turn-proxy/internal/cliutil" + "github.com/cacggghp/vk-turn-proxy/internal/jazz" "github.com/cacggghp/vk-turn-proxy/internal/namegen" "github.com/cacggghp/vk-turn-proxy/internal/telemost" "github.com/cacggghp/vk-turn-proxy/tcputil" @@ -91,12 +92,13 @@ type clientOptions struct { listen string vklink string yalink string + jazzRoom string peerAddr string n int udp bool direct bool vlessMode bool - telemostDC bool + dc bool debug bool manualCaptcha bool } @@ -111,13 +113,13 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO fs.StringVar(&opts.listen, "listen", "127.0.0.1:9000", "listen on ip:port") fs.StringVar(&opts.vklink, "vk-link", "", "VK calls invite link \"https://vk.com/call/join/...\"") fs.StringVar(&opts.yalink, "yandex-link", "", "Yandex Telemost invite link \"https://telemost.yandex.ru/j/...\"") + fs.StringVar(&opts.jazzRoom, "jazz-room", "", "SaluteJazz room \"roomId[:password]\"") fs.StringVar(&opts.peerAddr, "peer", "", "peer server address (host:port)") fs.IntVar(&opts.n, "n", 0, "connections to TURN (default 10 for VK, 1 for Yandex)") fs.BoolVar(&opts.udp, "udp", false, "connect to TURN with UDP") fs.BoolVar(&opts.direct, "no-dtls", false, "connect without obfuscation. DO NOT USE") fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets") - fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of TURN") - fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of TURN") + fs.BoolVar(&opts.dc, "dc", false, "use WebRTC DataChannel instead of TURN") fs.BoolVar(&opts.debug, "debug", false, "enable debug logging") fs.BoolVar(&opts.manualCaptcha, "manual-captcha", false, "skip auto captcha solving, use manual mode immediately") fs.Usage = func() { @@ -125,7 +127,8 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO cliutil.Fprintln(fs.Output(), "Examples:") cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -peer 203.0.113.10:56000 -vk-link https://vk.com/call/join/...\n", program) cliutil.Fprintf(fs.Output(), " %s -udp -turn 5.255.211.241 -peer 203.0.113.10:56000 -yandex-link https://telemost.yandex.ru/j/... -listen 127.0.0.1:9000\n", program) - cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program) + cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -dc\n", program) + cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -jazz-room room:password -dc\n\n", program) cliutil.Fprintln(fs.Output(), "Flags:") fs.PrintDefaults() } @@ -135,17 +138,24 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO func parseClientOptions(args []string, program string, stdout, stderr io.Writer) (clientOptions, int) { return cliutil.Parse(args, program, stdout, stderr, newClientFlagSet, func(opts *clientOptions) error { - if !opts.telemostDC && opts.peerAddr == "" { + if !opts.dc && opts.peerAddr == "" { return fmt.Errorf("-peer is required") } - if (opts.vklink == "") == (opts.yalink == "") { - return fmt.Errorf("exactly one of -vk-link or -yandex-link is required") - } - if opts.telemostDC { - if opts.yalink == "" { - return fmt.Errorf("-telemost-dc requires -yandex-link") + linkCount := 0 + for _, link := range []string{opts.vklink, opts.yalink, opts.jazzRoom} { + if link != "" { + linkCount++ } } + if linkCount != 1 { + return fmt.Errorf("exactly one of -vk-link, -yandex-link, or -jazz-room is required") + } + if opts.jazzRoom != "" && !opts.dc { + return fmt.Errorf("-jazz-room requires -dc") + } + if opts.dc && opts.yalink == "" && opts.jazzRoom == "" { + return fmt.Errorf("-dc requires -yandex-link or -jazz-room") + } return nil }) } @@ -158,6 +168,14 @@ func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, listenA return runTelemostDataChannelMode(ctx, inviteLink, listenAddr) } +func runSelectedJazzDataChannelMode(ctx context.Context, room, listenAddr string, vlessMode bool) error { + if vlessMode { + return runJazzDataChannelVLESSMode(ctx, room, listenAddr) + } + + return runJazzDataChannelMode(ctx, room, listenAddr) +} + func closeOnContextDone(ctx context.Context, closer io.Closer) { go func() { <-ctx.Done() @@ -2054,15 +2072,22 @@ func main() { isDebug = opts.debug telemost.SetDebug(opts.debug) + jazz.SetDebug(opts.debug) manualCaptcha = opts.manualCaptcha autoCaptchaSliderPOC = !manualCaptcha - if opts.telemostDC { + if opts.dc && opts.yalink != "" { if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.listen, opts.vlessMode); err != nil { log.Fatalf("Telemost DataChannel mode failed: %v", err) } return } + if opts.dc && opts.jazzRoom != "" { + if err := runSelectedJazzDataChannelMode(ctx, opts.jazzRoom, opts.listen, opts.vlessMode); err != nil { + log.Fatalf("SaluteJazz DataChannel mode failed: %v", err) + } + return + } peer, err := net.ResolveUDPAddr("udp", opts.peerAddr) if err != nil { diff --git a/client/main_test.go b/client/main_test.go index 910930a..dc60265 100644 --- a/client/main_test.go +++ b/client/main_test.go @@ -100,7 +100,7 @@ func TestParseClientOptionsParsesTelemostDataChannelArgs(t *testing.T) { opts, exitCode := parseClientOptions([]string{ "-yandex-link", "https://telemost.yandex.ru/j/test", "-listen", "127.0.0.1:9002", - "-telemost-dc", + "-dc", }, "client", &stdout, &stderr) if exitCode != cliutil.ContinueExecution { t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) @@ -108,15 +108,15 @@ func TestParseClientOptionsParsesTelemostDataChannelArgs(t *testing.T) { if stderr.Len() != 0 { t.Fatalf("expected no stderr output, got %q", stderr.String()) } - if !opts.telemostDC { - t.Fatal("telemostDC = false, want true") + if !opts.dc { + t.Fatal("dc = false, want true") } if opts.peerAddr != "" { - t.Fatalf("peerAddr = %q, want empty for telemost-dc mode", opts.peerAddr) + t.Fatalf("peerAddr = %q, want empty for dc mode", opts.peerAddr) } } -func TestParseClientOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) { +func TestParseClientOptionsRejectsDataChannelWithoutDataChannelRoom(t *testing.T) { t.Parallel() var stdout bytes.Buffer @@ -124,13 +124,13 @@ func TestParseClientOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testin _, exitCode := parseClientOptions([]string{ "-vk-link", "https://vk.com/call/join/test", - "-telemost-dc", + "-dc", }, "client", &stdout, &stderr) if exitCode != 2 { t.Fatalf("parseClientOptions() exitCode = %d, want 2", exitCode) } - if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") { - t.Fatalf("expected telemost-dc validation error, got %q", got) + if got := stderr.String(); !strings.Contains(got, "-dc requires -yandex-link or -jazz-room") { + t.Fatalf("expected dc validation error, got %q", got) } } @@ -142,7 +142,7 @@ func TestParseClientOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { opts, exitCode := parseClientOptions([]string{ "-yandex-link", "https://telemost.yandex.ru/j/test", - "-telemost-dc", + "-dc", "-vless", }, "client", &stdout, &stderr) if exitCode != cliutil.ContinueExecution { @@ -151,8 +151,54 @@ func TestParseClientOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { if stderr.Len() != 0 { t.Fatalf("expected no stderr output, got %q", stderr.String()) } - if !opts.telemostDC || !opts.vlessMode { - t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode) + if !opts.dc || !opts.vlessMode { + t.Fatalf("expected dc and vlessMode to be true, got dc=%v vlessMode=%v", opts.dc, opts.vlessMode) + } +} + +func TestParseClientOptionsParsesJazzDataChannelArgs(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseClientOptions([]string{ + "-jazz-room", "room:password", + "-listen", "127.0.0.1:9002", + "-dc", + }, "client", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.dc { + t.Fatal("dc = false, want true") + } + if opts.jazzRoom != "room:password" { + t.Fatalf("jazzRoom = %q, want room:password", opts.jazzRoom) + } + if opts.peerAddr != "" { + t.Fatalf("peerAddr = %q, want empty for dc mode", opts.peerAddr) + } +} + +func TestParseClientOptionsRejectsJazzRoomWithoutDataChannel(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + _, exitCode := parseClientOptions([]string{ + "-peer", "127.0.0.1:56000", + "-jazz-room", "room:password", + }, "client", &stdout, &stderr) + if exitCode != 2 { + t.Fatalf("parseClientOptions() exitCode = %d, want 2", exitCode) + } + if got := stderr.String(); !strings.Contains(got, "-jazz-room requires -dc") { + t.Fatalf("expected jazz room validation error, got %q", got) } } diff --git a/client/telemostdc.go b/client/telemostdc.go deleted file mode 100644 index 7c83b16..0000000 --- a/client/telemostdc.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "context" - "errors" - "log" - "net" - "sync/atomic" - - "github.com/cacggghp/vk-turn-proxy/internal/telemost" -) - -func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error { - listenConn, err := net.ListenPacket("udp", listenAddr) - if err != nil { - return err - } - defer func(listenConn net.PacketConn) { - err := listenConn.Close() - if err != nil { - log.Println(err) - } - }(listenConn) - - var activeLocalPeer atomic.Value - peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { - addr, ok := activeLocalPeer.Load().(net.Addr) - if !ok || addr == nil { - return - } - if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil { - log.Printf("Telemost DataChannel: failed to write local packet: %v", writeErr) - } - }, nil) - if err != nil { - return err - } - defer func(peer *telemost.Peer) { - err := peer.Close() - if err != nil { - log.Println(err) - } - }(peer) - - closeOnContextDone(ctx, listenConn) - - log.Printf("Telemost DataChannel mode: listening on %s", listenAddr) - - buf := make([]byte, 2048) - for { - n, addr, err := listenConn.ReadFrom(buf) - if err != nil { - if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { - return nil - } - return err - } - - activeLocalPeer.Store(addr) - if err := peer.Send(buf[:n]); err != nil { - log.Printf("Telemost DataChannel: dropped outbound packet (%d bytes): %v", n, err) - } - } -} diff --git a/internal/jazz/api.go b/internal/jazz/api.go new file mode 100644 index 0000000..fa8f6dc --- /dev/null +++ b/internal/jazz/api.go @@ -0,0 +1,205 @@ +package jazz + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/cacggghp/vk-turn-proxy/internal/namegen" + "github.com/google/uuid" +) + +const defaultAPIBaseURL = "https://bk.salutejazz.ru" + +var ( + apiBaseURL = defaultAPIBaseURL + apiHTTPClient = &http.Client{Timeout: 15 * time.Second} +) + +var ( + errCreateRoomFailed = errors.New("create room failed") + errPreconnectFailed = errors.New("preconnect failed") +) + +type RoomInfo struct { + RoomID string `json:"roomId"` + Password string `json:"password"` + ConnectorURL string `json:"connectorUrl"` +} + +func CreateRoom(ctx context.Context) (*RoomInfo, error) { + headers := jazzHeaders() + + createResp, err := createMeeting(ctx, headers) + if err != nil { + return nil, fmt.Errorf("create meeting: %w", err) + } + + connectorURL, err := preconnect(ctx, createResp.RoomID, createResp.Password, headers) + if err != nil { + return nil, fmt.Errorf("preconnect: %w", err) + } + + return &RoomInfo{ + RoomID: createResp.RoomID, + Password: createResp.Password, + ConnectorURL: connectorURL, + }, nil +} + +func JoinRoom(ctx context.Context, roomInput string) (*RoomInfo, error) { + roomID, password := parseRoomInput(roomInput) + if roomID == "" { + return nil, fmt.Errorf("jazz room is required") + } + + connectorURL, err := preconnect(ctx, roomID, password, jazzHeaders()) + if err != nil { + return nil, err + } + + return &RoomInfo{ + RoomID: roomID, + Password: password, + ConnectorURL: connectorURL, + }, nil +} + +func parseRoomInput(roomInput string) (string, string) { + roomInput = strings.TrimSpace(roomInput) + roomInput = strings.TrimPrefix(roomInput, "https://salutejazz.ru/") + roomInput = strings.TrimPrefix(roomInput, "http://salutejazz.ru/") + roomInput = strings.TrimPrefix(roomInput, "https://jazz.sber.ru/") + roomInput = strings.TrimPrefix(roomInput, "http://jazz.sber.ru/") + if idx := strings.IndexAny(roomInput, "/?#"); idx != -1 { + roomInput = roomInput[:idx] + } + + roomID, password, _ := strings.Cut(roomInput, ":") + return strings.TrimSpace(roomID), strings.TrimSpace(password) +} + +func jazzHeaders() map[string]string { + return map[string]string{ + "X-Jazz-ClientId": uuid.New().String(), + "X-Jazz-AuthType": "ANONYMOUS", + "X-Client-AuthType": "ANONYMOUS", + "Content-Type": "application/json", + } +} + +type createResponse struct { + RoomID string `json:"roomId"` + Password string `json:"password"` +} + +func createMeeting(ctx context.Context, headers map[string]string) (*createResponse, error) { + createPayload := map[string]any{ + "title": namegen.Generate() + " ДР", + "guestEnabled": true, + "lobbyEnabled": false, + "serverVideoRecordAutoStartEnabled": false, + "sipEnabled": false, + "moderatorEmails": []string{}, + "summarizationEnabled": false, + "room3dEnabled": false, + "room3dScene": "XRLobby", + } + + body, err := json.Marshal(createPayload) + if err != nil { + return nil, fmt.Errorf("marshal create payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiBaseURL+"/room/create-meeting", bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + setHeaders(req, headers) + + resp, err := apiHTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("do create request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return nil, statusError(errCreateRoomFailed, resp) + } + + var res createResponse + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, fmt.Errorf("decode create response: %w", err) + } + + return &res, nil +} + +func preconnect(ctx context.Context, roomID, password string, headers map[string]string) (string, error) { + preconnectPayload := map[string]any{ + "password": password, + "jazzNextMigration": map[string]any{ + "b2bBaseRoomSupport": true, + "demoRoomBaseSupport": true, + "demoRoomVersionSupport": 2, + "mediaWithoutAutoSubscribeSupport": true, + "webinarSpeakerSupport": true, + "webinarViewerSupport": true, + "sdkRoomSupport": true, + "sberclassRoomSupport": true, + }, + } + + body, err := json.Marshal(preconnectPayload) + if err != nil { + return "", fmt.Errorf("marshal preconnect payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/room/%s/preconnect", apiBaseURL, roomID), bytes.NewReader(body)) + if err != nil { + return "", fmt.Errorf("create preconnect request: %w", err) + } + setHeaders(req, headers) + + resp, err := apiHTTPClient.Do(req) + if err != nil { + return "", fmt.Errorf("do preconnect request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return "", statusError(errPreconnectFailed, resp) + } + + var preconnectResp struct { + ConnectorURL string `json:"connectorUrl"` + } + if err := json.NewDecoder(resp.Body).Decode(&preconnectResp); err != nil { + return "", fmt.Errorf("decode preconnect response: %w", err) + } + if preconnectResp.ConnectorURL == "" { + return "", fmt.Errorf("preconnect response missing connector URL") + } + + return preconnectResp.ConnectorURL, nil +} + +func setHeaders(req *http.Request, headers map[string]string) { + for key, value := range headers { + req.Header.Set(key, value) + } +} + +func statusError(base error, resp *http.Response) error { + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("%w: status %s and response body read failed: %v", base, resp.Status, err) + } + return fmt.Errorf("%w: status %s: %s", base, resp.Status, strings.TrimSpace(string(body))) +} diff --git a/internal/jazz/datapacket.go b/internal/jazz/datapacket.go new file mode 100644 index 0000000..2339758 --- /dev/null +++ b/internal/jazz/datapacket.go @@ -0,0 +1,137 @@ +package jazz + +import ( + "encoding/binary" + "fmt" + "io" + + "github.com/google/uuid" +) + +func encodeVarint(value uint64) []byte { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, value) + return buf[:n] +} + +func encodeField(fieldNumber int, wireType int, data []byte) []byte { + tag := encodeVarint(uint64(fieldNumber)<<3 | uint64(wireType)) //nolint:gosec + switch wireType { + case 2: + length := encodeVarint(uint64(len(data))) + result := make([]byte, 0, len(tag)+len(length)+len(data)) + result = append(result, tag...) + result = append(result, length...) + result = append(result, data...) + return result + default: + result := make([]byte, 0, len(tag)+len(data)) + result = append(result, tag...) + result = append(result, data...) + return result + } +} + +func EncodeDataPacket(payload []byte) []byte { + userFields := encodeField(2, 2, payload) + userFields = append(userFields, encodeField(8, 2, []byte(uuid.New().String()))...) + + packet := encodeField(1, 0, encodeVarint(0)) + packet = append(packet, encodeField(2, 2, userFields)...) + + return packet +} + +func DecodeDataPacket(raw []byte) ([]byte, bool) { + userData, ok := parseFields(raw, 2) + if !ok { + return nil, false + } + + payload, ok := parseFields(userData, 2) + return payload, ok +} + +func parseFields(data []byte, targetField int) ([]byte, bool) { + reader := &byteReader{data: data} + var result []byte + + for reader.pos < len(reader.data) { + tagVal, err := readVarint(reader) + if err != nil { + break + } + + fieldNumber := int(tagVal >> 3) + wireType := int(tagVal & 0x07) + + fieldData, ok := handleWireType(reader, wireType, len(data)) + if !ok { + return result, len(result) > 0 + } + + if fieldNumber == targetField && wireType == 2 { + result = fieldData + } + } + + return result, len(result) > 0 +} + +func readVarint(r io.ByteReader) (uint64, error) { + val, err := binary.ReadUvarint(r) + if err != nil { + return 0, fmt.Errorf("read uvarint: %w", err) + } + return val, nil +} + +func handleWireType(reader *byteReader, wireType int, dataLen int) ([]byte, bool) { + switch wireType { + case 0: + _, err := readVarint(reader) + return nil, err == nil + case 1: + reader.pos += 8 + return nil, reader.pos <= dataLen + case 2: + length, err := readVarint(reader) + if err != nil { + return nil, false + } + if length > uint64(dataLen)-uint64(reader.pos) { //nolint:gosec + return nil, false + } + fieldData := make([]byte, length) + n, err := reader.Read(fieldData) + return fieldData, err == nil && uint64(n) == length //nolint:gosec + case 5: + reader.pos += 4 + return nil, reader.pos <= dataLen + default: + return nil, false + } +} + +type byteReader struct { + data []byte + pos int +} + +func (b *byteReader) ReadByte() (byte, error) { + if b.pos >= len(b.data) { + return 0, io.EOF + } + c := b.data[b.pos] + b.pos++ + return c, nil +} + +func (b *byteReader) Read(p []byte) (int, error) { + if b.pos >= len(b.data) { + return 0, io.EOF + } + n := copy(p, b.data[b.pos:]) + b.pos += n + return n, nil +} diff --git a/internal/jazz/peer.go b/internal/jazz/peer.go new file mode 100644 index 0000000..f6e8d64 --- /dev/null +++ b/internal/jazz/peer.go @@ -0,0 +1,769 @@ +package jazz + +import ( + "context" + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/cacggghp/vk-turn-proxy/internal/namegen" + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" +) + +const ( + dataChannelLabel = "_reliable" + maxDataChannelMessageSize = 12288 +) + +var debugLogging atomic.Bool + +type Peer struct { + roomInput string + name string + onData func([]byte) + + roomMu sync.Mutex + roomInfo *RoomInfo + + sessionMu sync.RWMutex + session *peerSession + + reconnectCbMu sync.RWMutex + reconnectCb func() + wsMu sync.Mutex + sendMu sync.Mutex + reconnectCh chan struct{} + closeCh chan struct{} + closed atomic.Bool +} + +type peerSession struct { + roomInfo *RoomInfo + ws *websocket.Conn + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + groupID string + connected atomic.Bool +} + +func NewPeer(roomInput, name string, onData func([]byte)) *Peer { + return &Peer{ + roomInput: roomInput, + name: name, + onData: onData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + } +} + +func NewConnectedPeer(ctx context.Context, roomInput string, onData func([]byte), onReconnect func()) (*Peer, error) { + peer := NewPeer(roomInput, namegen.Generate(), onData) + if onReconnect != nil { + peer.SetReconnectCallback(onReconnect) + } + if err := peer.Connect(ctx); err != nil { + if closeErr := peer.Close(); closeErr != nil { + log.Printf("SaluteJazz DataChannel peer cleanup failed after connect error: %v", closeErr) + } + return nil, err + } + + go peer.WatchConnection(ctx) + + return peer, nil +} + +func SetDebug(enabled bool) { + debugLogging.Store(enabled) +} + +func DebugEnabled() bool { + return debugLogging.Load() +} + +func debugf(format string, args ...any) { + if DebugEnabled() { + log.Printf(format, args...) + } +} + +func (p *Peer) Connect(ctx context.Context) error { + return p.connectOnce(ctx) +} + +func (p *Peer) WatchConnection(ctx context.Context) { + const ( + maxReconnects = 10 + reconnectWindow = 5 * time.Minute + ) + + var lastReconnect time.Time + reconnectCount := 0 + + for { + select { + case <-ctx.Done(): + return + case <-p.closeCh: + return + case <-p.reconnectCh: + } + + now := time.Now() + if now.Sub(lastReconnect) > reconnectWindow { + reconnectCount = 0 + } + if reconnectCount >= maxReconnects { + log.Printf("SaluteJazz DataChannel: max reconnect attempts (%d) reached", maxReconnects) + return + } + reconnectCount++ + lastReconnect = now + + backoff := time.Duration(reconnectCount) * 2 * time.Second + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + + for { + if ctx.Err() != nil || p.closed.Load() { + return + } + + debugf("SaluteJazz DataChannel: reconnecting in %v", backoff) + select { + case <-ctx.Done(): + return + case <-p.closeCh: + return + case <-time.After(backoff): + } + + if err := p.reconnect(ctx); err != nil { + debugf("SaluteJazz DataChannel: reconnect failed: %v", err) + continue + } + + debugf("SaluteJazz DataChannel: reconnected") + break + } + } +} + +func (p *Peer) Send(data []byte) error { + if len(data) > maxDataChannelMessageSize { + return fmt.Errorf("datachannel message too large: %d > %d", len(data), maxDataChannelMessageSize) + } + + p.sessionMu.RLock() + session := p.session + p.sessionMu.RUnlock() + + if session == nil || session.dc == nil || session.dc.ReadyState() != webrtc.DataChannelStateOpen { + return fmt.Errorf("datachannel not ready") + } + + p.sendMu.Lock() + defer p.sendMu.Unlock() + + encoded := EncodeDataPacket(data) + start := time.Now() + for session.dc.BufferedAmount() > 256*1024 { + if time.Since(start) > 2*time.Second { + return fmt.Errorf("datachannel buffer is full") + } + if session.dc.ReadyState() != webrtc.DataChannelStateOpen { + return fmt.Errorf("datachannel not ready") + } + time.Sleep(10 * time.Millisecond) + } + + return session.dc.Send(encoded) +} + +func (p *Peer) Close() error { + if p.closed.Swap(true) { + return nil + } + + select { + case <-p.closeCh: + default: + close(p.closeCh) + } + + p.cleanupCurrentSession() + return nil +} + +func (p *Peer) SetReconnectCallback(cb func()) { + p.reconnectCbMu.Lock() + p.reconnectCb = cb + p.reconnectCbMu.Unlock() +} + +func (p *Peer) reconnect(ctx context.Context) error { + p.cleanupCurrentSession() + if err := p.connectOnce(ctx); err != nil { + return err + } + + p.reconnectCbMu.RLock() + cb := p.reconnectCb + p.reconnectCbMu.RUnlock() + if cb != nil { + cb() + } + + return nil +} + +func (p *Peer) connectOnce(ctx context.Context) error { + roomInfo, err := p.connectionInfo(ctx) + if err != nil { + return err + } + + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{}, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } + + api := webrtc.NewAPI() + + pcSub, err := api.NewPeerConnection(config) + if err != nil { + return err + } + pcPub, err := api.NewPeerConnection(config) + if err != nil { + _ = pcSub.Close() + return err + } + + session := &peerSession{ + roomInfo: roomInfo, + pcSub: pcSub, + pcPub: pcPub, + } + + pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + debugf("SaluteJazz subscriber state: %s", state.String()) + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + p.signalReconnectIfCurrent(session, "subscriber state "+state.String()) + } + }) + pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + debugf("SaluteJazz publisher state: %s", state.String()) + if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + p.signalReconnectIfCurrent(session, "publisher state "+state.String()) + } + }) + + dc, err := pcPub.CreateDataChannel(dataChannelLabel, &webrtc.DataChannelInit{ + Ordered: ptr(true), + }) + if err != nil { + p.cleanupSession(session) + return err + } + session.dc = dc + + dcReady := make(chan struct{}, 1) + dc.OnOpen(func() { + if session.connected.CompareAndSwap(false, true) { + log.Printf("SaluteJazz DataChannel connected") + } + select { + case dcReady <- struct{}{}: + default: + } + }) + dc.OnClose(func() { + p.signalReconnectIfCurrent(session, "publisher datachannel closed") + }) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + p.handleIncomingMessage(msg.Data) + }) + + pcSub.OnDataChannel(func(remoteDC *webrtc.DataChannel) { + if remoteDC.Label() != dataChannelLabel { + debugf("SaluteJazz remote datachannel ignored: %s", remoteDC.Label()) + return + } + debugf("SaluteJazz remote datachannel opened: %s", remoteDC.Label()) + remoteDC.OnClose(func() { + p.signalReconnectIfCurrent(session, "remote datachannel "+remoteDC.Label()+" closed") + }) + remoteDC.OnMessage(func(msg webrtc.DataChannelMessage) { + p.handleIncomingMessage(msg.Data) + }) + }) + + ws, resp, err := websocket.DefaultDialer.Dial(roomInfo.ConnectorURL, nil) + if err != nil { + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + p.cleanupSession(session) + return err + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + session.ws = ws + + ws.SetPongHandler(func(string) error { + return ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + }) + if err := ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + p.cleanupSession(session) + return err + } + + p.sessionMu.Lock() + p.session = session + p.sessionMu.Unlock() + + go p.keepAlive(session) + go p.handleSignaling(session) + + if err := p.sendJoin(session); err != nil { + p.cleanupCurrentSession() + return err + } + + select { + case <-ctx.Done(): + p.cleanupCurrentSession() + return ctx.Err() + case <-p.closeCh: + p.cleanupCurrentSession() + return context.Canceled + case <-time.After(30 * time.Second): + p.cleanupCurrentSession() + return fmt.Errorf("datachannel timeout") + case <-dcReady: + return nil + } +} + +func (p *Peer) connectionInfo(ctx context.Context) (*RoomInfo, error) { + p.roomMu.Lock() + defer p.roomMu.Unlock() + + if p.roomInfo != nil { + roomInfo, err := JoinRoom(ctx, p.roomInfo.RoomID+":"+p.roomInfo.Password) + if err != nil { + return nil, fmt.Errorf("refresh room connection: %w", err) + } + p.roomInfo.ConnectorURL = roomInfo.ConnectorURL + return p.roomInfo, nil + } + + roomID, _ := parseRoomInput(p.roomInput) + if roomID == "" || roomID == "any" || roomID == "dummy" { + roomInfo, err := CreateRoom(ctx) + if err != nil { + return nil, fmt.Errorf("create room: %w", err) + } + p.roomInfo = roomInfo + log.Printf("SaluteJazz room created: %s:%s", roomInfo.RoomID, roomInfo.Password) + return roomInfo, nil + } + + roomInfo, err := JoinRoom(ctx, p.roomInput) + if err != nil { + return nil, fmt.Errorf("join room: %w", err) + } + p.roomInfo = roomInfo + log.Printf("SaluteJazz joining room: %s", roomInfo.RoomID) + return roomInfo, nil +} + +func (p *Peer) keepAlive(session *peerSession) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-p.closeCh: + return + case <-ticker.C: + if err := p.writeControl(session, websocket.PingMessage, []byte{}); err != nil { + p.signalReconnectIfCurrent(session, "websocket ping failed") + return + } + } + } +} + +func (p *Peer) handleIncomingMessage(data []byte) { + payload, ok := DecodeDataPacket(data) + if !ok { + debugf("SaluteJazz DataChannel: failed to decode packet, using raw payload") + payload = data + } + + if p.onData != nil && len(payload) > 0 { + p.onData(payload) + } +} + +func (p *Peer) handleSignaling(session *peerSession) { + for { + var msg map[string]any + if err := session.ws.ReadJSON(&msg); err != nil { + if p.isCurrentSession(session) && !p.closed.Load() { + debugf("SaluteJazz signaling read error: %v", err) + p.signalReconnectIfCurrent(session, "signaling read failed") + } + return + } + if err := session.ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { + if p.isCurrentSession(session) && !p.closed.Load() { + debugf("SaluteJazz signaling deadline update failed: %v", err) + p.signalReconnectIfCurrent(session, "signaling deadline update failed") + } + return + } + + event := stringValue(msg, "event") + payload := mapValue(msg, "payload") + + switch event { + case "join-response": + p.handleJoinResponse(session, payload) + case "media-out": + p.handleMediaOut(session, payload) + } + } +} + +func (p *Peer) handleJoinResponse(session *peerSession, payload map[string]any) { + group := mapValue(payload, "participantGroup") + session.groupID = stringValue(group, "groupId") + debugf("SaluteJazz peer joined: groupId=%s", session.groupID) +} + +func (p *Peer) handleMediaOut(session *peerSession, payload map[string]any) { + method := stringValue(payload, "method") + + switch method { + case "rtc:config": + p.handleRTCConfig(session, payload) + case "rtc:offer": + p.handleSubscriberOffer(session, payload) + case "rtc:answer": + p.handlePublisherAnswer(session, payload) + case "rtc:ice": + p.handleICE(session, payload) + } +} + +func (p *Peer) handleRTCConfig(session *peerSession, payload map[string]any) { + config := mapValue(payload, "configuration") + servers := sliceValue(config, "iceServers") + + iceServers := make([]webrtc.ICEServer, 0, len(servers)) + for _, rawServer := range servers { + server, ok := rawServer.(map[string]any) + if !ok { + continue + } + rawURLs := sliceValue(server, "urls") + username := stringValue(server, "username") + credential := stringValue(server, "credential") + + urls := make([]string, 0, len(rawURLs)) + for _, rawURL := range rawURLs { + if url, ok := rawURL.(string); ok && url != "" { + urls = append(urls, url) + } + } + if len(urls) > 0 { + iceServers = append(iceServers, webrtc.ICEServer{ + URLs: urls, + Username: username, + Credential: credential, + }) + } + } + + if len(iceServers) == 0 { + return + } + + newConfig := webrtc.Configuration{ + ICEServers: iceServers, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } + if err := session.pcSub.SetConfiguration(newConfig); err != nil { + debugf("SaluteJazz subscriber SetConfiguration failed: %v", err) + } + if err := session.pcPub.SetConfiguration(newConfig); err != nil { + debugf("SaluteJazz publisher SetConfiguration failed: %v", err) + } +} + +func (p *Peer) handleSubscriberOffer(session *peerSession, payload map[string]any) { + desc := mapValue(payload, "description") + sdp := stringValue(desc, "sdp") + if sdp == "" { + debugf("SaluteJazz subscriber offer missing SDP") + return + } + + if err := session.pcSub.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sdp}); err != nil { + debugf("SaluteJazz subscriber SetRemoteDescription failed: %v", err) + return + } + + answer, err := session.pcSub.CreateAnswer(nil) + if err != nil { + debugf("SaluteJazz subscriber CreateAnswer failed: %v", err) + return + } + if err := session.pcSub.SetLocalDescription(answer); err != nil { + debugf("SaluteJazz subscriber SetLocalDescription failed: %v", err) + return + } + + if err := p.writeJSON(session, map[string]any{ + "roomId": session.roomInfo.RoomID, + "event": "media-in", + "groupId": session.groupID, + "requestId": uuid.New().String(), + "payload": map[string]any{ + "method": "rtc:answer", + "description": map[string]any{ + "type": "answer", + "sdp": answer.SDP, + }, + }, + }); err != nil { + debugf("SaluteJazz subscriber answer send failed: %v", err) + return + } + + time.Sleep(300 * time.Millisecond) + p.sendPublisherOffer(session) +} + +func (p *Peer) sendPublisherOffer(session *peerSession) { + offer, err := session.pcPub.CreateOffer(nil) + if err != nil { + debugf("SaluteJazz publisher CreateOffer failed: %v", err) + return + } + + if err := session.pcPub.SetLocalDescription(offer); err != nil { + debugf("SaluteJazz publisher SetLocalDescription failed: %v", err) + return + } + + if err := p.writeJSON(session, map[string]any{ + "roomId": session.roomInfo.RoomID, + "event": "media-in", + "groupId": session.groupID, + "requestId": uuid.New().String(), + "payload": map[string]any{ + "method": "rtc:offer", + "description": map[string]any{ + "type": "offer", + "sdp": offer.SDP, + }, + }, + }); err != nil { + debugf("SaluteJazz publisher offer send failed: %v", err) + } +} + +func (p *Peer) handlePublisherAnswer(session *peerSession, payload map[string]any) { + desc := mapValue(payload, "description") + sdp := stringValue(desc, "sdp") + if sdp == "" { + debugf("SaluteJazz publisher answer missing SDP") + return + } + if err := session.pcPub.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: sdp}); err != nil { + debugf("SaluteJazz publisher SetRemoteDescription failed: %v", err) + } +} + +func (p *Peer) handleICE(session *peerSession, payload map[string]any) { + candidates := sliceValue(payload, "rtcIceCandidates") + + for _, rawCandidate := range candidates { + cand, ok := rawCandidate.(map[string]any) + if !ok { + continue + } + candidate := stringValue(cand, "candidate") + target := stringValue(cand, "target") + sdpMid := stringValue(cand, "sdpMid") + sdpMLineIndex, ok := floatValue(cand, "sdpMLineIndex") + if candidate == "" || target == "" || !ok { + continue + } + + index := uint16(sdpMLineIndex) + init := webrtc.ICECandidateInit{ + Candidate: candidate, + SDPMid: &sdpMid, + SDPMLineIndex: &index, + } + + switch target { + case "SUBSCRIBER": + if err := session.pcSub.AddICECandidate(init); err != nil { + debugf("SaluteJazz subscriber ICE apply failed: %v", err) + } + case "PUBLISHER": + if err := session.pcPub.AddICECandidate(init); err != nil { + debugf("SaluteJazz publisher ICE apply failed: %v", err) + } + } + } +} + +func (p *Peer) sendJoin(session *peerSession) error { + return p.writeJSON(session, map[string]any{ + "roomId": session.roomInfo.RoomID, + "event": "join", + "requestId": uuid.New().String(), + "payload": map[string]any{ + "password": session.roomInfo.Password, + "participantName": p.name, + "supportedFeatures": map[string]any{ + "attachedRooms": true, + "sessionGroups": true, + "transcription": true, + }, + "isSilent": false, + }, + }) +} + +func (p *Peer) writeJSON(session *peerSession, payload any) error { + p.wsMu.Lock() + defer p.wsMu.Unlock() + + if session.ws == nil { + return fmt.Errorf("websocket is closed") + } + + return session.ws.WriteJSON(payload) +} + +func (p *Peer) writeControl(session *peerSession, messageType int, data []byte) error { + p.wsMu.Lock() + defer p.wsMu.Unlock() + + if session.ws == nil { + return fmt.Errorf("websocket is closed") + } + + return session.ws.WriteControl(messageType, data, time.Now().Add(10*time.Second)) +} + +func (p *Peer) cleanupCurrentSession() { + p.sessionMu.Lock() + session := p.session + p.session = nil + p.sessionMu.Unlock() + + p.cleanupSession(session) +} + +func (p *Peer) cleanupSession(session *peerSession) { + if session == nil { + return + } + + if session.connected.Swap(false) { + log.Printf("SaluteJazz DataChannel closed") + } + + if session.dc != nil { + _ = session.dc.Close() + } + if session.pcPub != nil { + _ = session.pcPub.Close() + } + if session.pcSub != nil { + _ = session.pcSub.Close() + } + if session.ws != nil { + p.wsMu.Lock() + if err := session.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)); err != nil { + debugf("SaluteJazz websocket close control failed: %v", err) + } + _ = session.ws.Close() + p.wsMu.Unlock() + } +} + +func (p *Peer) isCurrentSession(session *peerSession) bool { + p.sessionMu.RLock() + defer p.sessionMu.RUnlock() + return p.session == session +} + +func (p *Peer) signalReconnectIfCurrent(session *peerSession, reason string) { + if p.closed.Load() || !p.isCurrentSession(session) { + return + } + + if session.connected.Swap(false) { + log.Printf("SaluteJazz DataChannel closed") + } + if reason != "" { + debugf("SaluteJazz DataChannel disconnect reason: %s", reason) + } + + select { + case p.reconnectCh <- struct{}{}: + default: + } +} + +func ptr[T any](value T) *T { + return &value +} + +func stringValue(values map[string]any, key string) string { + value, ok := values[key].(string) + if !ok { + return "" + } + return value +} + +func mapValue(values map[string]any, key string) map[string]any { + value, ok := values[key].(map[string]any) + if !ok { + return nil + } + return value +} + +func sliceValue(values map[string]any, key string) []any { + value, ok := values[key].([]any) + if !ok { + return nil + } + return value +} + +func floatValue(values map[string]any, key string) (float64, bool) { + value, ok := values[key].(float64) + return value, ok +} diff --git a/server/dc.go b/server/dc.go new file mode 100644 index 0000000..dc0de5f --- /dev/null +++ b/server/dc.go @@ -0,0 +1,81 @@ +package main + +import ( + "context" + "errors" + "log" + "net" + + "github.com/cacggghp/vk-turn-proxy/internal/jazz" + "github.com/cacggghp/vk-turn-proxy/internal/telemost" +) + +type dataChannelPeer interface { + Send([]byte) error + Close() error +} + +type dataChannelConnectFunc func(context.Context, string, func([]byte), func()) (dataChannelPeer, error) + +func connectTelemostDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) { + return telemost.NewConnectedPeer(ctx, room, onData, onReconnect) +} + +func connectJazzDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) { + return jazz.NewConnectedPeer(ctx, room, onData, onReconnect) +} + +func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error { + return runDataChannelMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, connectAddr) +} + +func runJazzDataChannelMode(ctx context.Context, room, connectAddr string) error { + return runDataChannelMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, connectAddr) +} + +func runDataChannelMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, connectAddr string) error { + backendConn, err := net.Dial("udp", connectAddr) + if err != nil { + return err + } + defer func(backendConn net.Conn) { + err := backendConn.Close() + if err != nil { + log.Println(err) + } + }(backendConn) + + peer, err := connectPeer(ctx, room, func(data []byte) { + if _, writeErr := backendConn.Write(data); writeErr != nil { + log.Printf("%s DataChannel: failed to write backend packet: %v", providerName, writeErr) + } + }, nil) + if err != nil { + return err + } + defer func(peer dataChannelPeer) { + err := peer.Close() + if err != nil { + log.Println(err) + } + }(peer) + + closeOnContextDone(ctx, backendConn) + + log.Printf("%s DataChannel mode: forwarding to %s", providerName, connectAddr) + + buf := make([]byte, 2048) + for { + n, err := backendConn.Read(buf) + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return nil + } + return err + } + + if err := peer.Send(buf[:n]); err != nil { + log.Printf("%s DataChannel: dropped backend packet (%d bytes): %v", providerName, n, err) + } + } +} diff --git a/server/telemostdc_vless.go b/server/dc_vless.go similarity index 66% rename from server/telemostdc_vless.go rename to server/dc_vless.go index 38abcd6..4cca8c7 100644 --- a/server/telemostdc_vless.go +++ b/server/dc_vless.go @@ -12,10 +12,9 @@ import ( "time" "github.com/cacggghp/vk-turn-proxy/internal/dcmux" - "github.com/cacggghp/vk-turn-proxy/internal/telemost" ) -type telemostBackendStream struct { +type dcBackendStream struct { conn net.Conn ctx context.Context cancel context.CancelFunc @@ -25,9 +24,9 @@ type telemostBackendStream struct { closed atomic.Bool } -func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBackendStream { +func newDCBackendStream(parent context.Context, conn net.Conn) *dcBackendStream { ctx, cancel := context.WithCancel(parent) - return &telemostBackendStream{ + return &dcBackendStream{ conn: conn, ctx: ctx, cancel: cancel, @@ -35,7 +34,7 @@ func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBa } } -func (s *telemostBackendStream) Close() { +func (s *dcBackendStream) Close() { s.closeOnce.Do(func() { s.closed.Store(true) s.cancel() @@ -45,7 +44,7 @@ func (s *telemostBackendStream) Close() { }) } -func enqueueBackendData(stream *telemostBackendStream, data []byte) error { +func enqueueBackendData(stream *dcBackendStream, data []byte) error { if stream.closed.Load() { return context.Canceled } @@ -60,7 +59,7 @@ func enqueueBackendData(stream *telemostBackendStream, data []byte) error { } } -func (s *telemostBackendStream) write(data []byte) error { +func (s *dcBackendStream) write(data []byte) error { if s.closed.Load() { return net.ErrClosed } @@ -82,7 +81,7 @@ func (s *telemostBackendStream) write(data []byte) error { return nil } -func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) { +func handleDCBackendStream(streamID uint16, stream *dcBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) { defer closeStream(streamID) defer closeMuxStream(streamID) @@ -97,12 +96,8 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, for { n, readErr := stream.conn.Read(buf) if readErr != nil { - if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) { - if telemost.DebugEnabled() { - log.Printf("Telemost DataChannel VLESS backend stream closed: %v", readErr) - } - } else { - log.Printf("Telemost DataChannel VLESS backend read error: %v", readErr) + if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, net.ErrClosed) { + log.Printf("DataChannel VLESS backend read error: %v", readErr) } return } @@ -125,7 +120,7 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, if errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) || stream.closed.Load() { return } - log.Printf("Telemost DataChannel VLESS backend write error: %v", err) + log.Printf("DataChannel VLESS backend write error: %v", err) return } } @@ -136,12 +131,20 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, } func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAddr string) error { + return runDataChannelVLESSMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, connectAddr) +} + +func runJazzDataChannelVLESSMode(ctx context.Context, room, connectAddr string) error { + return runDataChannelVLESSMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, connectAddr) +} + +func runDataChannelVLESSMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, connectAddr string) error { var ( connMu sync.Mutex - conns = make(map[uint16]*telemostBackendStream) + conns = make(map[uint16]*dcBackendStream) ) - var peer *telemost.Peer + var peer dataChannelPeer mux := dcmux.New(0, func(frame []byte) error { return peer.Send(frame) }) @@ -158,7 +161,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd closeAll := func() { connMu.Lock() - streams := make([]*telemostBackendStream, 0, len(conns)) + streams := make([]*dcBackendStream, 0, len(conns)) for sid, stream := range conns { streams = append(streams, stream) delete(conns, sid) @@ -175,11 +178,11 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd return } if err := mux.CloseStream(sid); err != nil { - log.Printf("Telemost DataChannel VLESS server: failed to close mux stream %d: %v", sid, err) + log.Printf("%s DataChannel VLESS server: failed to close mux stream %d: %v", providerName, sid, err) } } - getOrCreateBackendStream := func(sid uint16) (*telemostBackendStream, error) { + getOrCreateBackendStream := func(sid uint16) (*dcBackendStream, error) { connMu.Lock() stream := conns[sid] connMu.Unlock() @@ -193,7 +196,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd return nil, err } - stream = newTelemostBackendStream(ctx, conn) + stream = newDCBackendStream(ctx, conn) connMu.Lock() if existing := conns[sid]; existing != nil { @@ -204,28 +207,26 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd conns[sid] = stream connMu.Unlock() - go handleTelemostBackendStream(sid, stream, mux, closeStream, closeMuxStream) + go handleDCBackendStream(sid, stream, mux, closeStream, closeMuxStream) return stream, nil } - peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { - if telemost.DebugEnabled() { - log.Printf("Telemost DataChannel VLESS server: peer reconnected, closing active backend streams") - } + peer, err := connectPeer(ctx, room, mux.HandleFrame, func() { + log.Printf("%s DataChannel VLESS server: peer reconnected, closing active backend streams", providerName) closeAll() mux.Reset() }) if err != nil { return err } - defer func(peer *telemost.Peer) { + defer func(peer dataChannelPeer) { err := peer.Close() if err != nil { log.Println(err) } }(peer) - log.Printf("Telemost DataChannel VLESS server: forwarding to %s", connectAddr) + log.Printf("%s DataChannel VLESS server: forwarding to %s", providerName, connectAddr) activityCh := mux.WaitForActivity() for { @@ -241,13 +242,13 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd if len(data) > 0 { stream, err := getOrCreateBackendStream(sid) if err != nil { - log.Printf("Telemost DataChannel VLESS backend dial error: %v", err) + log.Printf("%s DataChannel VLESS backend dial error: %v", providerName, err) closeMuxStream(sid) continue } if err := enqueueBackendData(stream, data); err != nil { if !errors.Is(err, context.Canceled) { - log.Printf("Telemost DataChannel VLESS backend stream %d stalled: %v", sid, err) + log.Printf("%s DataChannel VLESS backend stream %d stalled: %v", providerName, sid, err) } closeStream(sid) closeMuxStream(sid) diff --git a/server/telemostdc_vless_test.go b/server/dc_vless_test.go similarity index 82% rename from server/telemostdc_vless_test.go rename to server/dc_vless_test.go index 6c31348..8cd8fd2 100644 --- a/server/telemostdc_vless_test.go +++ b/server/dc_vless_test.go @@ -16,7 +16,7 @@ func TestEnqueueBackendDataReturnsWhenQueueIsFull(t *testing.T) { _ = serverConn.Close() }() - stream := newTelemostBackendStream(context.Background(), clientConn) + stream := newDCBackendStream(context.Background(), clientConn) defer stream.Close() stream.writeCh = make(chan []byte, 1) @@ -44,7 +44,7 @@ func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) { _ = serverConn.Close() }() - stream := newTelemostBackendStream(context.Background(), clientConn) + stream := newDCBackendStream(context.Background(), clientConn) stream.Close() err := enqueueBackendData(stream, []byte("next")) @@ -53,14 +53,14 @@ func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) { } } -func TestTelemostBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) { +func TestDCBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) { clientConn, serverConn := net.Pipe() defer func() { _ = clientConn.Close() _ = serverConn.Close() }() - stream := newTelemostBackendStream(context.Background(), clientConn) + stream := newDCBackendStream(context.Background(), clientConn) stream.Close() err := stream.write([]byte("next")) diff --git a/server/main.go b/server/main.go index 8f08bf1..c14ea84 100644 --- a/server/main.go +++ b/server/main.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cacggghp/vk-turn-proxy/internal/cliutil" + "github.com/cacggghp/vk-turn-proxy/internal/jazz" "github.com/cacggghp/vk-turn-proxy/internal/telemost" "github.com/cacggghp/vk-turn-proxy/tcputil" "github.com/pion/dtls/v3" @@ -24,12 +25,13 @@ import ( ) type serverOptions struct { - listen string - connect string - yalink string - vlessMode bool - telemostDC bool - debug bool + listen string + connect string + yalink string + jazzRoom string + vlessMode bool + dc bool + debug bool } func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverOptions) { @@ -40,16 +42,17 @@ func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverO fs.StringVar(&opts.listen, "listen", "0.0.0.0:56000", "listen on ip:port") fs.StringVar(&opts.connect, "connect", "", "connect to ip:port") fs.StringVar(&opts.yalink, "yandex-link", "", "Yandex Telemost invite link \"https://telemost.yandex.ru/j/...\"") + fs.StringVar(&opts.jazzRoom, "jazz-room", "", "SaluteJazz room \"roomId[:password]\" (use \"any\" to create)") fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets") - fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of DTLS listener") - fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of DTLS listener") + fs.BoolVar(&opts.dc, "dc", false, "use WebRTC DataChannel instead of DTLS listener") fs.BoolVar(&opts.debug, "debug", false, "enable debug logging") fs.Usage = func() { cliutil.Fprintf(fs.Output(), "Usage:\n %s -connect [flags]\n\n", program) cliutil.Fprintln(fs.Output(), "Examples:") cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820\n", program) cliutil.Fprintf(fs.Output(), " %s -listen 0.0.0.0:56000 -connect 127.0.0.1:51820 -vless\n", program) - cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program) + cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -dc\n", program) + cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -jazz-room any -dc\n\n", program) cliutil.Fprintln(fs.Output(), "Flags:") fs.PrintDefaults() } @@ -62,10 +65,11 @@ func parseServerOptions(args []string, program string, stdout, stderr io.Writer) if opts.connect == "" { return fmt.Errorf("-connect is required") } - if opts.telemostDC { - if opts.yalink == "" { - return fmt.Errorf("-telemost-dc requires -yandex-link") - } + if opts.dc && (opts.yalink == "") == (opts.jazzRoom == "") { + return fmt.Errorf("-dc requires exactly one of -yandex-link or -jazz-room") + } + if opts.jazzRoom != "" && !opts.dc { + return fmt.Errorf("-jazz-room requires -dc") } return nil }) @@ -79,6 +83,14 @@ func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, connect return runTelemostDataChannelMode(ctx, inviteLink, connectAddr) } +func runSelectedJazzDataChannelMode(ctx context.Context, room, connectAddr string, vlessMode bool) error { + if vlessMode { + return runJazzDataChannelVLESSMode(ctx, room, connectAddr) + } + + return runJazzDataChannelMode(ctx, room, connectAddr) +} + func closeOnContextDone(ctx context.Context, closer io.Closer) { go func() { <-ctx.Done() @@ -105,13 +117,20 @@ func main() { }() telemost.SetDebug(opts.debug) + jazz.SetDebug(opts.debug) - if opts.telemostDC { + if opts.dc && opts.yalink != "" { if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.connect, opts.vlessMode); err != nil { log.Fatalf("Telemost DataChannel mode failed: %v", err) } return } + if opts.dc && opts.jazzRoom != "" { + if err := runSelectedJazzDataChannelMode(ctx, opts.jazzRoom, opts.connect, opts.vlessMode); err != nil { + log.Fatalf("SaluteJazz DataChannel mode failed: %v", err) + } + return + } listenAddr, err := net.ResolveUDPAddr("udp", opts.listen) if err != nil { diff --git a/server/main_test.go b/server/main_test.go index 0e7b368..e44bff9 100644 --- a/server/main_test.go +++ b/server/main_test.go @@ -98,7 +98,7 @@ func TestParseServerOptionsParsesTelemostDataChannelArgs(t *testing.T) { opts, exitCode := parseServerOptions([]string{ "-connect", "127.0.0.1:51820", "-yandex-link", "https://telemost.yandex.ru/j/test", - "-telemost-dc", + "-dc", }, "server", &stdout, &stderr) if exitCode != cliutil.ContinueExecution { t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) @@ -106,15 +106,15 @@ func TestParseServerOptionsParsesTelemostDataChannelArgs(t *testing.T) { if stderr.Len() != 0 { t.Fatalf("expected no stderr output, got %q", stderr.String()) } - if !opts.telemostDC { - t.Fatal("telemostDC = false, want true") + if !opts.dc { + t.Fatal("dc = false, want true") } if opts.yalink == "" { t.Fatal("yalink = empty, want yandex link") } } -func TestParseServerOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) { +func TestParseServerOptionsRejectsDataChannelWithoutRoom(t *testing.T) { t.Parallel() var stdout bytes.Buffer @@ -122,13 +122,13 @@ func TestParseServerOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testin _, exitCode := parseServerOptions([]string{ "-connect", "127.0.0.1:51820", - "-telemost-dc", + "-dc", }, "server", &stdout, &stderr) if exitCode != 2 { t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode) } - if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") { - t.Fatalf("expected telemost-dc validation error, got %q", got) + if got := stderr.String(); !strings.Contains(got, "-dc requires exactly one of -yandex-link or -jazz-room") { + t.Fatalf("expected dc validation error, got %q", got) } } @@ -141,7 +141,7 @@ func TestParseServerOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { opts, exitCode := parseServerOptions([]string{ "-connect", "127.0.0.1:51820", "-yandex-link", "https://telemost.yandex.ru/j/test", - "-telemost-dc", + "-dc", "-vless", }, "server", &stdout, &stderr) if exitCode != cliutil.ContinueExecution { @@ -150,7 +150,68 @@ func TestParseServerOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) { if stderr.Len() != 0 { t.Fatalf("expected no stderr output, got %q", stderr.String()) } - if !opts.telemostDC || !opts.vlessMode { - t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode) + if !opts.dc || !opts.vlessMode { + t.Fatalf("expected dc and vlessMode to be true, got dc=%v vlessMode=%v", opts.dc, opts.vlessMode) + } +} + +func TestParseServerOptionsParsesJazzDataChannelArgs(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + opts, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-jazz-room", "any", + "-dc", + }, "server", &stdout, &stderr) + if exitCode != cliutil.ContinueExecution { + t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution) + } + if stderr.Len() != 0 { + t.Fatalf("expected no stderr output, got %q", stderr.String()) + } + if !opts.dc { + t.Fatal("dc = false, want true") + } + if opts.jazzRoom != "any" { + t.Fatalf("jazzRoom = %q, want any", opts.jazzRoom) + } +} + +func TestParseServerOptionsRejectsJazzDataChannelWithoutRoom(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + _, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-dc", + }, "server", &stdout, &stderr) + if exitCode != 2 { + t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode) + } + if got := stderr.String(); !strings.Contains(got, "-dc requires exactly one of -yandex-link or -jazz-room") { + t.Fatalf("expected dc validation error, got %q", got) + } +} + +func TestParseServerOptionsRejectsJazzRoomWithoutDataChannel(t *testing.T) { + t.Parallel() + + var stdout bytes.Buffer + var stderr bytes.Buffer + + _, exitCode := parseServerOptions([]string{ + "-connect", "127.0.0.1:51820", + "-jazz-room", "any", + }, "server", &stdout, &stderr) + if exitCode != 2 { + t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode) + } + if got := stderr.String(); !strings.Contains(got, "-jazz-room requires -dc") { + t.Fatalf("expected jazz room validation error, got %q", got) } } diff --git a/server/telemostdc.go b/server/telemostdc.go deleted file mode 100644 index 1446061..0000000 --- a/server/telemostdc.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "context" - "errors" - "log" - "net" - - "github.com/cacggghp/vk-turn-proxy/internal/telemost" -) - -func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error { - backendConn, err := net.Dial("udp", connectAddr) - if err != nil { - return err - } - defer func(backendConn net.Conn) { - err := backendConn.Close() - if err != nil { - log.Println(err) - } - }(backendConn) - - peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { - if _, writeErr := backendConn.Write(data); writeErr != nil { - log.Printf("Telemost DataChannel: failed to write backend packet: %v", writeErr) - } - }, nil) - if err != nil { - return err - } - defer func(peer *telemost.Peer) { - err := peer.Close() - if err != nil { - log.Println(err) - } - }(peer) - - closeOnContextDone(ctx, backendConn) - - log.Printf("Telemost DataChannel mode: forwarding to %s", connectAddr) - - buf := make([]byte, 2048) - for { - n, err := backendConn.Read(buf) - if err != nil { - if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { - return nil - } - return err - } - - if err := peer.Send(buf[:n]); err != nil { - log.Printf("Telemost DataChannel: dropped backend packet (%d bytes): %v", n, err) - } - } -}