Browse Source

feat: Add SaluteJazz DC

pull/135/head
alxmcp 3 months ago
parent
commit
f13b55797e
  1. 44
      README.md
  2. 88
      client/dc.go
  3. 25
      client/dc_vless.go
  4. 49
      client/main.go
  5. 68
      client/main_test.go
  6. 64
      client/telemostdc.go
  7. 205
      internal/jazz/api.go
  8. 137
      internal/jazz/datapacket.go
  9. 769
      internal/jazz/peer.go
  10. 81
      server/dc.go
  11. 61
      server/dc_vless.go
  12. 8
      server/dc_vless_test.go
  13. 47
      server/main.go
  14. 81
      server/main_test.go
  15. 57
      server/telemostdc.go

44
README.md

@ -7,7 +7,6 @@
## Содержание
- [Good TURN](#good-turn)
- [Режимы работы](#режимы-работы)
- [Похожие проекты](#похожие-проекты)
- [Server & Cli](#server--cli)
- [Android](#android)
@ -31,20 +30,13 @@
- [Сервер (VPS)](#сервер-vps)
- [Docker](#docker-1)
- [Клиент](#клиент-1)
- [Telemost DataChannel](#telemost-datachannel)
- [Сервер](#сервер-1)
- [Клиент](#клиент-2)
- [VLESS через Telemost DataChannel](#vless-через-telemost-datachannel)
- [DataChannel](#datachannel)
- [Telemost сервер](#telemost-сервер)
- [Telemost клиент](#telemost-клиент)
- [SaluteJazz сервер](#salutejazz-сервер)
- [SaluteJazz клиент](#salutejazz-клиент)
- [Direct mode](#direct-mode)
## Режимы работы
| Режим | Клиент | Сервер | Полезная нагрузка | Транспорт |
|-----------------------------------------------|---------------------------------|---------------------------------|------------------------|------------------------------------------------------------------------------|
| VK TURN | `-vk-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp` |
| Telemost TURN _(не работает)_ | `-yandex-link ...` | `-connect ...` | UDP-пакеты WireGuard | TURN поверх TCP по умолчанию, либо UDP с `-udp`; `-n` по умолчанию `1` |
| [VLESS](#vless-режим) | `-vless` | `-vless` | TCP-потоки | TURN поверх TCP по умолчанию, либо UDP с `-udp`, дальше `DTLS -> KCP + smux` |
| [Telemost DataChannel](#telemost-datachannel) | `-telemost-dc -yandex-link ...` | `-telemost-dc -yandex-link ...` | UDP или TCP (`-vless`) | WebRTC DataChannel, без TURN |
## Похожие проекты
@ -576,40 +568,42 @@ Xray сервер (config.json)
</details>
## Telemost DataChannel
## DataChannel
UPD: Яндекс Телемост не работает.
Для Яндекс Телемоста теперь есть альтернативный режим без TURN: `-telemost-dc`.
Для Яндекс Телемоста и SaluteJazz теперь есть альтернативный режим без TURN: `-dc`.
Режим работает как для обычного UDP/WireGuard-сценария, так и для `-vless`.
### Сервер
### Telemost сервер
```
./server -connect 127.0.0.1:<порт WG> -yandex-link https://telemost.yandex.ru/j/... -telemost-dc
./server -connect 127.0.0.1:<порт WG/VLESS> -yandex-link https://telemost.yandex.ru/j/... -dc
```
Вместо ссылки можно использовать просто ID звонка, ссылка может быть `.ru` и `.com`.
### Клиент
### Telemost клиент
```
./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc
./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -dc
```
В этом режиме флаг `-peer` не нужен: клиент и сервер встречаются внутри одной конференции Telemost по одной и той же ссылке.
### VLESS через Telemost DataChannel
Сервер:
### SaluteJazz сервер
```
./server -connect 127.0.0.1:<порт VLESS> -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless
./server -connect 127.0.0.1:<порт WG/VLESS> -jazz-room any -dc
```
Клиент:
Сервер создаёт комнату и пишет в лог `room:password`. Вместо `any` можно внести самому существующую комнату.
### SaluteJazz клиент
```
./client -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc -vless
./client -listen 127.0.0.1:9000 -jazz-room <room:password> -dc
```

88
client/dc.go

@ -0,0 +1,88 @@
package main
import (
"context"
"errors"
"log"
"net"
"sync/atomic"
"github.com/cacggghp/vk-turn-proxy/internal/jazz"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
type dataChannelPeer interface {
Send([]byte) error
Close() error
}
type dataChannelConnectFunc func(context.Context, string, func([]byte), func()) (dataChannelPeer, error)
func connectTelemostDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) {
return telemost.NewConnectedPeer(ctx, room, onData, onReconnect)
}
func connectJazzDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) {
return jazz.NewConnectedPeer(ctx, room, onData, onReconnect)
}
func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error {
return runDataChannelMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, listenAddr)
}
func runJazzDataChannelMode(ctx context.Context, room, listenAddr string) error {
return runDataChannelMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, listenAddr)
}
func runDataChannelMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, listenAddr string) error {
listenConn, err := net.ListenPacket("udp", listenAddr)
if err != nil {
return err
}
defer func(listenConn net.PacketConn) {
err := listenConn.Close()
if err != nil {
log.Println(err)
}
}(listenConn)
var activeLocalPeer atomic.Value
peer, err := connectPeer(ctx, room, func(data []byte) {
addr, ok := activeLocalPeer.Load().(net.Addr)
if !ok || addr == nil {
return
}
if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil {
log.Printf("%s DataChannel: failed to write local packet: %v", providerName, writeErr)
}
}, nil)
if err != nil {
return err
}
defer func(peer dataChannelPeer) {
err := peer.Close()
if err != nil {
log.Println(err)
}
}(peer)
closeOnContextDone(ctx, listenConn)
log.Printf("%s DataChannel mode: listening on %s", providerName, listenAddr)
buf := make([]byte, 2048)
for {
n, addr, err := listenConn.ReadFrom(buf)
if err != nil {
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
activeLocalPeer.Store(addr)
if err := peer.Send(buf[:n]); err != nil {
log.Printf("%s DataChannel: dropped outbound packet (%d bytes): %v", providerName, n, err)
}
}
}

25
client/telemostdc_vless.go → client/dc_vless.go

@ -9,10 +9,17 @@ import (
"time"
"github.com/cacggghp/vk-turn-proxy/internal/dcmux"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr string) error {
return runDataChannelVLESSMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, listenAddr)
}
func runJazzDataChannelVLESSMode(ctx context.Context, room, listenAddr string) error {
return runDataChannelVLESSMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, listenAddr)
}
func runDataChannelVLESSMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, listenAddr string) error {
var (
connMu sync.Mutex
conns = make(map[uint16]net.Conn)
@ -26,22 +33,20 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr
}
}
var peer *telemost.Peer
var peer dataChannelPeer
clientID := uint32(time.Now().UnixNano())
mux := dcmux.New(clientID, func(frame []byte) error {
return peer.Send(frame)
})
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() {
if telemost.DebugEnabled() {
log.Printf("Telemost DataChannel VLESS: peer reconnected, closing active TCP streams")
}
peer, err := connectPeer(ctx, room, mux.HandleFrame, func() {
log.Printf("%s DataChannel VLESS: peer reconnected, closing active TCP streams", providerName)
closeAll()
mux.Reset()
})
if err != nil {
return err
}
defer func(peer *telemost.Peer) {
defer func(peer dataChannelPeer) {
err := peer.Close()
if err != nil {
log.Println(err)
@ -60,7 +65,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr
}(listener)
closeOnContextDone(ctx, listener)
log.Printf("Telemost DataChannel VLESS mode: listening on %s", listenAddr)
log.Printf("%s DataChannel VLESS mode: listening on %s", providerName, listenAddr)
for {
conn, err := listener.Accept()
@ -69,7 +74,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr
closeAll()
return nil
}
log.Printf("Telemost DataChannel VLESS accept error: %v", err)
log.Printf("%s DataChannel VLESS accept error: %v", providerName, err)
continue
}
@ -84,7 +89,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr
delete(conns, streamID)
connMu.Unlock()
if err := mux.CloseStream(streamID); err != nil {
log.Printf("Telemost DataChannel VLESS: failed to close mux stream %d: %v", streamID, err)
log.Printf("%s DataChannel VLESS: failed to close mux stream %d: %v", providerName, streamID, err)
}
_ = tcpConn.Close()
mux.CleanupStream(streamID)

49
client/main.go

@ -37,6 +37,7 @@ import (
"github.com/bschaatsbergen/dnsdialer"
"github.com/cacggghp/vk-turn-proxy/internal/cliutil"
"github.com/cacggghp/vk-turn-proxy/internal/jazz"
"github.com/cacggghp/vk-turn-proxy/internal/namegen"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
"github.com/cacggghp/vk-turn-proxy/tcputil"
@ -91,12 +92,13 @@ type clientOptions struct {
listen string
vklink string
yalink string
jazzRoom string
peerAddr string
n int
udp bool
direct bool
vlessMode bool
telemostDC bool
dc bool
debug bool
manualCaptcha bool
}
@ -111,13 +113,13 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO
fs.StringVar(&opts.listen, "listen", "127.0.0.1:9000", "listen on ip:port")
fs.StringVar(&opts.vklink, "vk-link", "", "VK calls invite link \"https://vk.com/call/join/...\"")
fs.StringVar(&opts.yalink, "yandex-link", "", "Yandex Telemost invite link \"https://telemost.yandex.ru/j/...\"")
fs.StringVar(&opts.jazzRoom, "jazz-room", "", "SaluteJazz room \"roomId[:password]\"")
fs.StringVar(&opts.peerAddr, "peer", "", "peer server address (host:port)")
fs.IntVar(&opts.n, "n", 0, "connections to TURN (default 10 for VK, 1 for Yandex)")
fs.BoolVar(&opts.udp, "udp", false, "connect to TURN with UDP")
fs.BoolVar(&opts.direct, "no-dtls", false, "connect without obfuscation. DO NOT USE")
fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets")
fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of TURN")
fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of TURN")
fs.BoolVar(&opts.dc, "dc", false, "use WebRTC DataChannel instead of TURN")
fs.BoolVar(&opts.debug, "debug", false, "enable debug logging")
fs.BoolVar(&opts.manualCaptcha, "manual-captcha", false, "skip auto captcha solving, use manual mode immediately")
fs.Usage = func() {
@ -125,7 +127,8 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO
cliutil.Fprintln(fs.Output(), "Examples:")
cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -peer 203.0.113.10:56000 -vk-link https://vk.com/call/join/...\n", program)
cliutil.Fprintf(fs.Output(), " %s -udp -turn 5.255.211.241 -peer 203.0.113.10:56000 -yandex-link https://telemost.yandex.ru/j/... -listen 127.0.0.1:9000\n", program)
cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program)
cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -yandex-link https://telemost.yandex.ru/j/... -dc\n", program)
cliutil.Fprintf(fs.Output(), " %s -listen 127.0.0.1:9000 -jazz-room room:password -dc\n\n", program)
cliutil.Fprintln(fs.Output(), "Flags:")
fs.PrintDefaults()
}
@ -135,17 +138,24 @@ func newClientFlagSet(program string, output io.Writer) (*flag.FlagSet, *clientO
func parseClientOptions(args []string, program string, stdout, stderr io.Writer) (clientOptions, int) {
return cliutil.Parse(args, program, stdout, stderr, newClientFlagSet, func(opts *clientOptions) error {
if !opts.telemostDC && opts.peerAddr == "" {
if !opts.dc && opts.peerAddr == "" {
return fmt.Errorf("-peer is required")
}
if (opts.vklink == "") == (opts.yalink == "") {
return fmt.Errorf("exactly one of -vk-link or -yandex-link is required")
}
if opts.telemostDC {
if opts.yalink == "" {
return fmt.Errorf("-telemost-dc requires -yandex-link")
linkCount := 0
for _, link := range []string{opts.vklink, opts.yalink, opts.jazzRoom} {
if link != "" {
linkCount++
}
}
if linkCount != 1 {
return fmt.Errorf("exactly one of -vk-link, -yandex-link, or -jazz-room is required")
}
if opts.jazzRoom != "" && !opts.dc {
return fmt.Errorf("-jazz-room requires -dc")
}
if opts.dc && opts.yalink == "" && opts.jazzRoom == "" {
return fmt.Errorf("-dc requires -yandex-link or -jazz-room")
}
return nil
})
}
@ -158,6 +168,14 @@ func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, listenA
return runTelemostDataChannelMode(ctx, inviteLink, listenAddr)
}
func runSelectedJazzDataChannelMode(ctx context.Context, room, listenAddr string, vlessMode bool) error {
if vlessMode {
return runJazzDataChannelVLESSMode(ctx, room, listenAddr)
}
return runJazzDataChannelMode(ctx, room, listenAddr)
}
func closeOnContextDone(ctx context.Context, closer io.Closer) {
go func() {
<-ctx.Done()
@ -2054,15 +2072,22 @@ func main() {
isDebug = opts.debug
telemost.SetDebug(opts.debug)
jazz.SetDebug(opts.debug)
manualCaptcha = opts.manualCaptcha
autoCaptchaSliderPOC = !manualCaptcha
if opts.telemostDC {
if opts.dc && opts.yalink != "" {
if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.listen, opts.vlessMode); err != nil {
log.Fatalf("Telemost DataChannel mode failed: %v", err)
}
return
}
if opts.dc && opts.jazzRoom != "" {
if err := runSelectedJazzDataChannelMode(ctx, opts.jazzRoom, opts.listen, opts.vlessMode); err != nil {
log.Fatalf("SaluteJazz DataChannel mode failed: %v", err)
}
return
}
peer, err := net.ResolveUDPAddr("udp", opts.peerAddr)
if err != nil {

68
client/main_test.go

@ -100,7 +100,7 @@ func TestParseClientOptionsParsesTelemostDataChannelArgs(t *testing.T) {
opts, exitCode := parseClientOptions([]string{
"-yandex-link", "https://telemost.yandex.ru/j/test",
"-listen", "127.0.0.1:9002",
"-telemost-dc",
"-dc",
}, "client", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution)
@ -108,15 +108,15 @@ func TestParseClientOptionsParsesTelemostDataChannelArgs(t *testing.T) {
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.telemostDC {
t.Fatal("telemostDC = false, want true")
if !opts.dc {
t.Fatal("dc = false, want true")
}
if opts.peerAddr != "" {
t.Fatalf("peerAddr = %q, want empty for telemost-dc mode", opts.peerAddr)
t.Fatalf("peerAddr = %q, want empty for dc mode", opts.peerAddr)
}
}
func TestParseClientOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) {
func TestParseClientOptionsRejectsDataChannelWithoutDataChannelRoom(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
@ -124,13 +124,13 @@ func TestParseClientOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testin
_, exitCode := parseClientOptions([]string{
"-vk-link", "https://vk.com/call/join/test",
"-telemost-dc",
"-dc",
}, "client", &stdout, &stderr)
if exitCode != 2 {
t.Fatalf("parseClientOptions() exitCode = %d, want 2", exitCode)
}
if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") {
t.Fatalf("expected telemost-dc validation error, got %q", got)
if got := stderr.String(); !strings.Contains(got, "-dc requires -yandex-link or -jazz-room") {
t.Fatalf("expected dc validation error, got %q", got)
}
}
@ -142,7 +142,7 @@ func TestParseClientOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) {
opts, exitCode := parseClientOptions([]string{
"-yandex-link", "https://telemost.yandex.ru/j/test",
"-telemost-dc",
"-dc",
"-vless",
}, "client", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
@ -151,8 +151,54 @@ func TestParseClientOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) {
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.telemostDC || !opts.vlessMode {
t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode)
if !opts.dc || !opts.vlessMode {
t.Fatalf("expected dc and vlessMode to be true, got dc=%v vlessMode=%v", opts.dc, opts.vlessMode)
}
}
func TestParseClientOptionsParsesJazzDataChannelArgs(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
var stderr bytes.Buffer
opts, exitCode := parseClientOptions([]string{
"-jazz-room", "room:password",
"-listen", "127.0.0.1:9002",
"-dc",
}, "client", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
t.Fatalf("parseClientOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution)
}
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.dc {
t.Fatal("dc = false, want true")
}
if opts.jazzRoom != "room:password" {
t.Fatalf("jazzRoom = %q, want room:password", opts.jazzRoom)
}
if opts.peerAddr != "" {
t.Fatalf("peerAddr = %q, want empty for dc mode", opts.peerAddr)
}
}
func TestParseClientOptionsRejectsJazzRoomWithoutDataChannel(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
var stderr bytes.Buffer
_, exitCode := parseClientOptions([]string{
"-peer", "127.0.0.1:56000",
"-jazz-room", "room:password",
}, "client", &stdout, &stderr)
if exitCode != 2 {
t.Fatalf("parseClientOptions() exitCode = %d, want 2", exitCode)
}
if got := stderr.String(); !strings.Contains(got, "-jazz-room requires -dc") {
t.Fatalf("expected jazz room validation error, got %q", got)
}
}

64
client/telemostdc.go

@ -1,64 +0,0 @@
package main
import (
"context"
"errors"
"log"
"net"
"sync/atomic"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error {
listenConn, err := net.ListenPacket("udp", listenAddr)
if err != nil {
return err
}
defer func(listenConn net.PacketConn) {
err := listenConn.Close()
if err != nil {
log.Println(err)
}
}(listenConn)
var activeLocalPeer atomic.Value
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) {
addr, ok := activeLocalPeer.Load().(net.Addr)
if !ok || addr == nil {
return
}
if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil {
log.Printf("Telemost DataChannel: failed to write local packet: %v", writeErr)
}
}, nil)
if err != nil {
return err
}
defer func(peer *telemost.Peer) {
err := peer.Close()
if err != nil {
log.Println(err)
}
}(peer)
closeOnContextDone(ctx, listenConn)
log.Printf("Telemost DataChannel mode: listening on %s", listenAddr)
buf := make([]byte, 2048)
for {
n, addr, err := listenConn.ReadFrom(buf)
if err != nil {
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
activeLocalPeer.Store(addr)
if err := peer.Send(buf[:n]); err != nil {
log.Printf("Telemost DataChannel: dropped outbound packet (%d bytes): %v", n, err)
}
}
}

205
internal/jazz/api.go

@ -0,0 +1,205 @@
package jazz
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/cacggghp/vk-turn-proxy/internal/namegen"
"github.com/google/uuid"
)
const defaultAPIBaseURL = "https://bk.salutejazz.ru"
var (
apiBaseURL = defaultAPIBaseURL
apiHTTPClient = &http.Client{Timeout: 15 * time.Second}
)
var (
errCreateRoomFailed = errors.New("create room failed")
errPreconnectFailed = errors.New("preconnect failed")
)
type RoomInfo struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
ConnectorURL string `json:"connectorUrl"`
}
func CreateRoom(ctx context.Context) (*RoomInfo, error) {
headers := jazzHeaders()
createResp, err := createMeeting(ctx, headers)
if err != nil {
return nil, fmt.Errorf("create meeting: %w", err)
}
connectorURL, err := preconnect(ctx, createResp.RoomID, createResp.Password, headers)
if err != nil {
return nil, fmt.Errorf("preconnect: %w", err)
}
return &RoomInfo{
RoomID: createResp.RoomID,
Password: createResp.Password,
ConnectorURL: connectorURL,
}, nil
}
func JoinRoom(ctx context.Context, roomInput string) (*RoomInfo, error) {
roomID, password := parseRoomInput(roomInput)
if roomID == "" {
return nil, fmt.Errorf("jazz room is required")
}
connectorURL, err := preconnect(ctx, roomID, password, jazzHeaders())
if err != nil {
return nil, err
}
return &RoomInfo{
RoomID: roomID,
Password: password,
ConnectorURL: connectorURL,
}, nil
}
func parseRoomInput(roomInput string) (string, string) {
roomInput = strings.TrimSpace(roomInput)
roomInput = strings.TrimPrefix(roomInput, "https://salutejazz.ru/")
roomInput = strings.TrimPrefix(roomInput, "http://salutejazz.ru/")
roomInput = strings.TrimPrefix(roomInput, "https://jazz.sber.ru/")
roomInput = strings.TrimPrefix(roomInput, "http://jazz.sber.ru/")
if idx := strings.IndexAny(roomInput, "/?#"); idx != -1 {
roomInput = roomInput[:idx]
}
roomID, password, _ := strings.Cut(roomInput, ":")
return strings.TrimSpace(roomID), strings.TrimSpace(password)
}
func jazzHeaders() map[string]string {
return map[string]string{
"X-Jazz-ClientId": uuid.New().String(),
"X-Jazz-AuthType": "ANONYMOUS",
"X-Client-AuthType": "ANONYMOUS",
"Content-Type": "application/json",
}
}
type createResponse struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
}
func createMeeting(ctx context.Context, headers map[string]string) (*createResponse, error) {
createPayload := map[string]any{
"title": namegen.Generate() + " ДР",
"guestEnabled": true,
"lobbyEnabled": false,
"serverVideoRecordAutoStartEnabled": false,
"sipEnabled": false,
"moderatorEmails": []string{},
"summarizationEnabled": false,
"room3dEnabled": false,
"room3dScene": "XRLobby",
}
body, err := json.Marshal(createPayload)
if err != nil {
return nil, fmt.Errorf("marshal create payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiBaseURL+"/room/create-meeting", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
setHeaders(req, headers)
resp, err := apiHTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do create request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, statusError(errCreateRoomFailed, resp)
}
var res createResponse
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, fmt.Errorf("decode create response: %w", err)
}
return &res, nil
}
func preconnect(ctx context.Context, roomID, password string, headers map[string]string) (string, error) {
preconnectPayload := map[string]any{
"password": password,
"jazzNextMigration": map[string]any{
"b2bBaseRoomSupport": true,
"demoRoomBaseSupport": true,
"demoRoomVersionSupport": 2,
"mediaWithoutAutoSubscribeSupport": true,
"webinarSpeakerSupport": true,
"webinarViewerSupport": true,
"sdkRoomSupport": true,
"sberclassRoomSupport": true,
},
}
body, err := json.Marshal(preconnectPayload)
if err != nil {
return "", fmt.Errorf("marshal preconnect payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("%s/room/%s/preconnect", apiBaseURL, roomID), bytes.NewReader(body))
if err != nil {
return "", fmt.Errorf("create preconnect request: %w", err)
}
setHeaders(req, headers)
resp, err := apiHTTPClient.Do(req)
if err != nil {
return "", fmt.Errorf("do preconnect request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return "", statusError(errPreconnectFailed, resp)
}
var preconnectResp struct {
ConnectorURL string `json:"connectorUrl"`
}
if err := json.NewDecoder(resp.Body).Decode(&preconnectResp); err != nil {
return "", fmt.Errorf("decode preconnect response: %w", err)
}
if preconnectResp.ConnectorURL == "" {
return "", fmt.Errorf("preconnect response missing connector URL")
}
return preconnectResp.ConnectorURL, nil
}
func setHeaders(req *http.Request, headers map[string]string) {
for key, value := range headers {
req.Header.Set(key, value)
}
}
func statusError(base error, resp *http.Response) error {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("%w: status %s and response body read failed: %v", base, resp.Status, err)
}
return fmt.Errorf("%w: status %s: %s", base, resp.Status, strings.TrimSpace(string(body)))
}

137
internal/jazz/datapacket.go

@ -0,0 +1,137 @@
package jazz
import (
"encoding/binary"
"fmt"
"io"
"github.com/google/uuid"
)
func encodeVarint(value uint64) []byte {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, value)
return buf[:n]
}
func encodeField(fieldNumber int, wireType int, data []byte) []byte {
tag := encodeVarint(uint64(fieldNumber)<<3 | uint64(wireType)) //nolint:gosec
switch wireType {
case 2:
length := encodeVarint(uint64(len(data)))
result := make([]byte, 0, len(tag)+len(length)+len(data))
result = append(result, tag...)
result = append(result, length...)
result = append(result, data...)
return result
default:
result := make([]byte, 0, len(tag)+len(data))
result = append(result, tag...)
result = append(result, data...)
return result
}
}
func EncodeDataPacket(payload []byte) []byte {
userFields := encodeField(2, 2, payload)
userFields = append(userFields, encodeField(8, 2, []byte(uuid.New().String()))...)
packet := encodeField(1, 0, encodeVarint(0))
packet = append(packet, encodeField(2, 2, userFields)...)
return packet
}
func DecodeDataPacket(raw []byte) ([]byte, bool) {
userData, ok := parseFields(raw, 2)
if !ok {
return nil, false
}
payload, ok := parseFields(userData, 2)
return payload, ok
}
func parseFields(data []byte, targetField int) ([]byte, bool) {
reader := &byteReader{data: data}
var result []byte
for reader.pos < len(reader.data) {
tagVal, err := readVarint(reader)
if err != nil {
break
}
fieldNumber := int(tagVal >> 3)
wireType := int(tagVal & 0x07)
fieldData, ok := handleWireType(reader, wireType, len(data))
if !ok {
return result, len(result) > 0
}
if fieldNumber == targetField && wireType == 2 {
result = fieldData
}
}
return result, len(result) > 0
}
func readVarint(r io.ByteReader) (uint64, error) {
val, err := binary.ReadUvarint(r)
if err != nil {
return 0, fmt.Errorf("read uvarint: %w", err)
}
return val, nil
}
func handleWireType(reader *byteReader, wireType int, dataLen int) ([]byte, bool) {
switch wireType {
case 0:
_, err := readVarint(reader)
return nil, err == nil
case 1:
reader.pos += 8
return nil, reader.pos <= dataLen
case 2:
length, err := readVarint(reader)
if err != nil {
return nil, false
}
if length > uint64(dataLen)-uint64(reader.pos) { //nolint:gosec
return nil, false
}
fieldData := make([]byte, length)
n, err := reader.Read(fieldData)
return fieldData, err == nil && uint64(n) == length //nolint:gosec
case 5:
reader.pos += 4
return nil, reader.pos <= dataLen
default:
return nil, false
}
}
type byteReader struct {
data []byte
pos int
}
func (b *byteReader) ReadByte() (byte, error) {
if b.pos >= len(b.data) {
return 0, io.EOF
}
c := b.data[b.pos]
b.pos++
return c, nil
}
func (b *byteReader) Read(p []byte) (int, error) {
if b.pos >= len(b.data) {
return 0, io.EOF
}
n := copy(p, b.data[b.pos:])
b.pos += n
return n, nil
}

769
internal/jazz/peer.go

@ -0,0 +1,769 @@
package jazz
import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/cacggghp/vk-turn-proxy/internal/namegen"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v4"
)
const (
dataChannelLabel = "_reliable"
maxDataChannelMessageSize = 12288
)
var debugLogging atomic.Bool
type Peer struct {
roomInput string
name string
onData func([]byte)
roomMu sync.Mutex
roomInfo *RoomInfo
sessionMu sync.RWMutex
session *peerSession
reconnectCbMu sync.RWMutex
reconnectCb func()
wsMu sync.Mutex
sendMu sync.Mutex
reconnectCh chan struct{}
closeCh chan struct{}
closed atomic.Bool
}
type peerSession struct {
roomInfo *RoomInfo
ws *websocket.Conn
pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel
groupID string
connected atomic.Bool
}
func NewPeer(roomInput, name string, onData func([]byte)) *Peer {
return &Peer{
roomInput: roomInput,
name: name,
onData: onData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}
}
func NewConnectedPeer(ctx context.Context, roomInput string, onData func([]byte), onReconnect func()) (*Peer, error) {
peer := NewPeer(roomInput, namegen.Generate(), onData)
if onReconnect != nil {
peer.SetReconnectCallback(onReconnect)
}
if err := peer.Connect(ctx); err != nil {
if closeErr := peer.Close(); closeErr != nil {
log.Printf("SaluteJazz DataChannel peer cleanup failed after connect error: %v", closeErr)
}
return nil, err
}
go peer.WatchConnection(ctx)
return peer, nil
}
func SetDebug(enabled bool) {
debugLogging.Store(enabled)
}
func DebugEnabled() bool {
return debugLogging.Load()
}
func debugf(format string, args ...any) {
if DebugEnabled() {
log.Printf(format, args...)
}
}
func (p *Peer) Connect(ctx context.Context) error {
return p.connectOnce(ctx)
}
func (p *Peer) WatchConnection(ctx context.Context) {
const (
maxReconnects = 10
reconnectWindow = 5 * time.Minute
)
var lastReconnect time.Time
reconnectCount := 0
for {
select {
case <-ctx.Done():
return
case <-p.closeCh:
return
case <-p.reconnectCh:
}
now := time.Now()
if now.Sub(lastReconnect) > reconnectWindow {
reconnectCount = 0
}
if reconnectCount >= maxReconnects {
log.Printf("SaluteJazz DataChannel: max reconnect attempts (%d) reached", maxReconnects)
return
}
reconnectCount++
lastReconnect = now
backoff := time.Duration(reconnectCount) * 2 * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
for {
if ctx.Err() != nil || p.closed.Load() {
return
}
debugf("SaluteJazz DataChannel: reconnecting in %v", backoff)
select {
case <-ctx.Done():
return
case <-p.closeCh:
return
case <-time.After(backoff):
}
if err := p.reconnect(ctx); err != nil {
debugf("SaluteJazz DataChannel: reconnect failed: %v", err)
continue
}
debugf("SaluteJazz DataChannel: reconnected")
break
}
}
}
func (p *Peer) Send(data []byte) error {
if len(data) > maxDataChannelMessageSize {
return fmt.Errorf("datachannel message too large: %d > %d", len(data), maxDataChannelMessageSize)
}
p.sessionMu.RLock()
session := p.session
p.sessionMu.RUnlock()
if session == nil || session.dc == nil || session.dc.ReadyState() != webrtc.DataChannelStateOpen {
return fmt.Errorf("datachannel not ready")
}
p.sendMu.Lock()
defer p.sendMu.Unlock()
encoded := EncodeDataPacket(data)
start := time.Now()
for session.dc.BufferedAmount() > 256*1024 {
if time.Since(start) > 2*time.Second {
return fmt.Errorf("datachannel buffer is full")
}
if session.dc.ReadyState() != webrtc.DataChannelStateOpen {
return fmt.Errorf("datachannel not ready")
}
time.Sleep(10 * time.Millisecond)
}
return session.dc.Send(encoded)
}
func (p *Peer) Close() error {
if p.closed.Swap(true) {
return nil
}
select {
case <-p.closeCh:
default:
close(p.closeCh)
}
p.cleanupCurrentSession()
return nil
}
func (p *Peer) SetReconnectCallback(cb func()) {
p.reconnectCbMu.Lock()
p.reconnectCb = cb
p.reconnectCbMu.Unlock()
}
func (p *Peer) reconnect(ctx context.Context) error {
p.cleanupCurrentSession()
if err := p.connectOnce(ctx); err != nil {
return err
}
p.reconnectCbMu.RLock()
cb := p.reconnectCb
p.reconnectCbMu.RUnlock()
if cb != nil {
cb()
}
return nil
}
func (p *Peer) connectOnce(ctx context.Context) error {
roomInfo, err := p.connectionInfo(ctx)
if err != nil {
return err
}
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
api := webrtc.NewAPI()
pcSub, err := api.NewPeerConnection(config)
if err != nil {
return err
}
pcPub, err := api.NewPeerConnection(config)
if err != nil {
_ = pcSub.Close()
return err
}
session := &peerSession{
roomInfo: roomInfo,
pcSub: pcSub,
pcPub: pcPub,
}
pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
debugf("SaluteJazz subscriber state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
p.signalReconnectIfCurrent(session, "subscriber state "+state.String())
}
})
pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
debugf("SaluteJazz publisher state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
p.signalReconnectIfCurrent(session, "publisher state "+state.String())
}
})
dc, err := pcPub.CreateDataChannel(dataChannelLabel, &webrtc.DataChannelInit{
Ordered: ptr(true),
})
if err != nil {
p.cleanupSession(session)
return err
}
session.dc = dc
dcReady := make(chan struct{}, 1)
dc.OnOpen(func() {
if session.connected.CompareAndSwap(false, true) {
log.Printf("SaluteJazz DataChannel connected")
}
select {
case dcReady <- struct{}{}:
default:
}
})
dc.OnClose(func() {
p.signalReconnectIfCurrent(session, "publisher datachannel closed")
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
p.handleIncomingMessage(msg.Data)
})
pcSub.OnDataChannel(func(remoteDC *webrtc.DataChannel) {
if remoteDC.Label() != dataChannelLabel {
debugf("SaluteJazz remote datachannel ignored: %s", remoteDC.Label())
return
}
debugf("SaluteJazz remote datachannel opened: %s", remoteDC.Label())
remoteDC.OnClose(func() {
p.signalReconnectIfCurrent(session, "remote datachannel "+remoteDC.Label()+" closed")
})
remoteDC.OnMessage(func(msg webrtc.DataChannelMessage) {
p.handleIncomingMessage(msg.Data)
})
})
ws, resp, err := websocket.DefaultDialer.Dial(roomInfo.ConnectorURL, nil)
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
p.cleanupSession(session)
return err
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
session.ws = ws
ws.SetPongHandler(func(string) error {
return ws.SetReadDeadline(time.Now().Add(60 * time.Second))
})
if err := ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
p.cleanupSession(session)
return err
}
p.sessionMu.Lock()
p.session = session
p.sessionMu.Unlock()
go p.keepAlive(session)
go p.handleSignaling(session)
if err := p.sendJoin(session); err != nil {
p.cleanupCurrentSession()
return err
}
select {
case <-ctx.Done():
p.cleanupCurrentSession()
return ctx.Err()
case <-p.closeCh:
p.cleanupCurrentSession()
return context.Canceled
case <-time.After(30 * time.Second):
p.cleanupCurrentSession()
return fmt.Errorf("datachannel timeout")
case <-dcReady:
return nil
}
}
func (p *Peer) connectionInfo(ctx context.Context) (*RoomInfo, error) {
p.roomMu.Lock()
defer p.roomMu.Unlock()
if p.roomInfo != nil {
roomInfo, err := JoinRoom(ctx, p.roomInfo.RoomID+":"+p.roomInfo.Password)
if err != nil {
return nil, fmt.Errorf("refresh room connection: %w", err)
}
p.roomInfo.ConnectorURL = roomInfo.ConnectorURL
return p.roomInfo, nil
}
roomID, _ := parseRoomInput(p.roomInput)
if roomID == "" || roomID == "any" || roomID == "dummy" {
roomInfo, err := CreateRoom(ctx)
if err != nil {
return nil, fmt.Errorf("create room: %w", err)
}
p.roomInfo = roomInfo
log.Printf("SaluteJazz room created: %s:%s", roomInfo.RoomID, roomInfo.Password)
return roomInfo, nil
}
roomInfo, err := JoinRoom(ctx, p.roomInput)
if err != nil {
return nil, fmt.Errorf("join room: %w", err)
}
p.roomInfo = roomInfo
log.Printf("SaluteJazz joining room: %s", roomInfo.RoomID)
return roomInfo, nil
}
func (p *Peer) keepAlive(session *peerSession) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.closeCh:
return
case <-ticker.C:
if err := p.writeControl(session, websocket.PingMessage, []byte{}); err != nil {
p.signalReconnectIfCurrent(session, "websocket ping failed")
return
}
}
}
}
func (p *Peer) handleIncomingMessage(data []byte) {
payload, ok := DecodeDataPacket(data)
if !ok {
debugf("SaluteJazz DataChannel: failed to decode packet, using raw payload")
payload = data
}
if p.onData != nil && len(payload) > 0 {
p.onData(payload)
}
}
func (p *Peer) handleSignaling(session *peerSession) {
for {
var msg map[string]any
if err := session.ws.ReadJSON(&msg); err != nil {
if p.isCurrentSession(session) && !p.closed.Load() {
debugf("SaluteJazz signaling read error: %v", err)
p.signalReconnectIfCurrent(session, "signaling read failed")
}
return
}
if err := session.ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
if p.isCurrentSession(session) && !p.closed.Load() {
debugf("SaluteJazz signaling deadline update failed: %v", err)
p.signalReconnectIfCurrent(session, "signaling deadline update failed")
}
return
}
event := stringValue(msg, "event")
payload := mapValue(msg, "payload")
switch event {
case "join-response":
p.handleJoinResponse(session, payload)
case "media-out":
p.handleMediaOut(session, payload)
}
}
}
func (p *Peer) handleJoinResponse(session *peerSession, payload map[string]any) {
group := mapValue(payload, "participantGroup")
session.groupID = stringValue(group, "groupId")
debugf("SaluteJazz peer joined: groupId=%s", session.groupID)
}
func (p *Peer) handleMediaOut(session *peerSession, payload map[string]any) {
method := stringValue(payload, "method")
switch method {
case "rtc:config":
p.handleRTCConfig(session, payload)
case "rtc:offer":
p.handleSubscriberOffer(session, payload)
case "rtc:answer":
p.handlePublisherAnswer(session, payload)
case "rtc:ice":
p.handleICE(session, payload)
}
}
func (p *Peer) handleRTCConfig(session *peerSession, payload map[string]any) {
config := mapValue(payload, "configuration")
servers := sliceValue(config, "iceServers")
iceServers := make([]webrtc.ICEServer, 0, len(servers))
for _, rawServer := range servers {
server, ok := rawServer.(map[string]any)
if !ok {
continue
}
rawURLs := sliceValue(server, "urls")
username := stringValue(server, "username")
credential := stringValue(server, "credential")
urls := make([]string, 0, len(rawURLs))
for _, rawURL := range rawURLs {
if url, ok := rawURL.(string); ok && url != "" {
urls = append(urls, url)
}
}
if len(urls) > 0 {
iceServers = append(iceServers, webrtc.ICEServer{
URLs: urls,
Username: username,
Credential: credential,
})
}
}
if len(iceServers) == 0 {
return
}
newConfig := webrtc.Configuration{
ICEServers: iceServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
if err := session.pcSub.SetConfiguration(newConfig); err != nil {
debugf("SaluteJazz subscriber SetConfiguration failed: %v", err)
}
if err := session.pcPub.SetConfiguration(newConfig); err != nil {
debugf("SaluteJazz publisher SetConfiguration failed: %v", err)
}
}
func (p *Peer) handleSubscriberOffer(session *peerSession, payload map[string]any) {
desc := mapValue(payload, "description")
sdp := stringValue(desc, "sdp")
if sdp == "" {
debugf("SaluteJazz subscriber offer missing SDP")
return
}
if err := session.pcSub.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sdp}); err != nil {
debugf("SaluteJazz subscriber SetRemoteDescription failed: %v", err)
return
}
answer, err := session.pcSub.CreateAnswer(nil)
if err != nil {
debugf("SaluteJazz subscriber CreateAnswer failed: %v", err)
return
}
if err := session.pcSub.SetLocalDescription(answer); err != nil {
debugf("SaluteJazz subscriber SetLocalDescription failed: %v", err)
return
}
if err := p.writeJSON(session, map[string]any{
"roomId": session.roomInfo.RoomID,
"event": "media-in",
"groupId": session.groupID,
"requestId": uuid.New().String(),
"payload": map[string]any{
"method": "rtc:answer",
"description": map[string]any{
"type": "answer",
"sdp": answer.SDP,
},
},
}); err != nil {
debugf("SaluteJazz subscriber answer send failed: %v", err)
return
}
time.Sleep(300 * time.Millisecond)
p.sendPublisherOffer(session)
}
func (p *Peer) sendPublisherOffer(session *peerSession) {
offer, err := session.pcPub.CreateOffer(nil)
if err != nil {
debugf("SaluteJazz publisher CreateOffer failed: %v", err)
return
}
if err := session.pcPub.SetLocalDescription(offer); err != nil {
debugf("SaluteJazz publisher SetLocalDescription failed: %v", err)
return
}
if err := p.writeJSON(session, map[string]any{
"roomId": session.roomInfo.RoomID,
"event": "media-in",
"groupId": session.groupID,
"requestId": uuid.New().String(),
"payload": map[string]any{
"method": "rtc:offer",
"description": map[string]any{
"type": "offer",
"sdp": offer.SDP,
},
},
}); err != nil {
debugf("SaluteJazz publisher offer send failed: %v", err)
}
}
func (p *Peer) handlePublisherAnswer(session *peerSession, payload map[string]any) {
desc := mapValue(payload, "description")
sdp := stringValue(desc, "sdp")
if sdp == "" {
debugf("SaluteJazz publisher answer missing SDP")
return
}
if err := session.pcPub.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: sdp}); err != nil {
debugf("SaluteJazz publisher SetRemoteDescription failed: %v", err)
}
}
func (p *Peer) handleICE(session *peerSession, payload map[string]any) {
candidates := sliceValue(payload, "rtcIceCandidates")
for _, rawCandidate := range candidates {
cand, ok := rawCandidate.(map[string]any)
if !ok {
continue
}
candidate := stringValue(cand, "candidate")
target := stringValue(cand, "target")
sdpMid := stringValue(cand, "sdpMid")
sdpMLineIndex, ok := floatValue(cand, "sdpMLineIndex")
if candidate == "" || target == "" || !ok {
continue
}
index := uint16(sdpMLineIndex)
init := webrtc.ICECandidateInit{
Candidate: candidate,
SDPMid: &sdpMid,
SDPMLineIndex: &index,
}
switch target {
case "SUBSCRIBER":
if err := session.pcSub.AddICECandidate(init); err != nil {
debugf("SaluteJazz subscriber ICE apply failed: %v", err)
}
case "PUBLISHER":
if err := session.pcPub.AddICECandidate(init); err != nil {
debugf("SaluteJazz publisher ICE apply failed: %v", err)
}
}
}
}
func (p *Peer) sendJoin(session *peerSession) error {
return p.writeJSON(session, map[string]any{
"roomId": session.roomInfo.RoomID,
"event": "join",
"requestId": uuid.New().String(),
"payload": map[string]any{
"password": session.roomInfo.Password,
"participantName": p.name,
"supportedFeatures": map[string]any{
"attachedRooms": true,
"sessionGroups": true,
"transcription": true,
},
"isSilent": false,
},
})
}
func (p *Peer) writeJSON(session *peerSession, payload any) error {
p.wsMu.Lock()
defer p.wsMu.Unlock()
if session.ws == nil {
return fmt.Errorf("websocket is closed")
}
return session.ws.WriteJSON(payload)
}
func (p *Peer) writeControl(session *peerSession, messageType int, data []byte) error {
p.wsMu.Lock()
defer p.wsMu.Unlock()
if session.ws == nil {
return fmt.Errorf("websocket is closed")
}
return session.ws.WriteControl(messageType, data, time.Now().Add(10*time.Second))
}
func (p *Peer) cleanupCurrentSession() {
p.sessionMu.Lock()
session := p.session
p.session = nil
p.sessionMu.Unlock()
p.cleanupSession(session)
}
func (p *Peer) cleanupSession(session *peerSession) {
if session == nil {
return
}
if session.connected.Swap(false) {
log.Printf("SaluteJazz DataChannel closed")
}
if session.dc != nil {
_ = session.dc.Close()
}
if session.pcPub != nil {
_ = session.pcPub.Close()
}
if session.pcSub != nil {
_ = session.pcSub.Close()
}
if session.ws != nil {
p.wsMu.Lock()
if err := session.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)); err != nil {
debugf("SaluteJazz websocket close control failed: %v", err)
}
_ = session.ws.Close()
p.wsMu.Unlock()
}
}
func (p *Peer) isCurrentSession(session *peerSession) bool {
p.sessionMu.RLock()
defer p.sessionMu.RUnlock()
return p.session == session
}
func (p *Peer) signalReconnectIfCurrent(session *peerSession, reason string) {
if p.closed.Load() || !p.isCurrentSession(session) {
return
}
if session.connected.Swap(false) {
log.Printf("SaluteJazz DataChannel closed")
}
if reason != "" {
debugf("SaluteJazz DataChannel disconnect reason: %s", reason)
}
select {
case p.reconnectCh <- struct{}{}:
default:
}
}
func ptr[T any](value T) *T {
return &value
}
func stringValue(values map[string]any, key string) string {
value, ok := values[key].(string)
if !ok {
return ""
}
return value
}
func mapValue(values map[string]any, key string) map[string]any {
value, ok := values[key].(map[string]any)
if !ok {
return nil
}
return value
}
func sliceValue(values map[string]any, key string) []any {
value, ok := values[key].([]any)
if !ok {
return nil
}
return value
}
func floatValue(values map[string]any, key string) (float64, bool) {
value, ok := values[key].(float64)
return value, ok
}

81
server/dc.go

@ -0,0 +1,81 @@
package main
import (
"context"
"errors"
"log"
"net"
"github.com/cacggghp/vk-turn-proxy/internal/jazz"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
type dataChannelPeer interface {
Send([]byte) error
Close() error
}
type dataChannelConnectFunc func(context.Context, string, func([]byte), func()) (dataChannelPeer, error)
func connectTelemostDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) {
return telemost.NewConnectedPeer(ctx, room, onData, onReconnect)
}
func connectJazzDataChannelPeer(ctx context.Context, room string, onData func([]byte), onReconnect func()) (dataChannelPeer, error) {
return jazz.NewConnectedPeer(ctx, room, onData, onReconnect)
}
func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error {
return runDataChannelMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, connectAddr)
}
func runJazzDataChannelMode(ctx context.Context, room, connectAddr string) error {
return runDataChannelMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, connectAddr)
}
func runDataChannelMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, connectAddr string) error {
backendConn, err := net.Dial("udp", connectAddr)
if err != nil {
return err
}
defer func(backendConn net.Conn) {
err := backendConn.Close()
if err != nil {
log.Println(err)
}
}(backendConn)
peer, err := connectPeer(ctx, room, func(data []byte) {
if _, writeErr := backendConn.Write(data); writeErr != nil {
log.Printf("%s DataChannel: failed to write backend packet: %v", providerName, writeErr)
}
}, nil)
if err != nil {
return err
}
defer func(peer dataChannelPeer) {
err := peer.Close()
if err != nil {
log.Println(err)
}
}(peer)
closeOnContextDone(ctx, backendConn)
log.Printf("%s DataChannel mode: forwarding to %s", providerName, connectAddr)
buf := make([]byte, 2048)
for {
n, err := backendConn.Read(buf)
if err != nil {
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
if err := peer.Send(buf[:n]); err != nil {
log.Printf("%s DataChannel: dropped backend packet (%d bytes): %v", providerName, n, err)
}
}
}

61
server/telemostdc_vless.go → server/dc_vless.go

@ -12,10 +12,9 @@ import (
"time"
"github.com/cacggghp/vk-turn-proxy/internal/dcmux"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
type telemostBackendStream struct {
type dcBackendStream struct {
conn net.Conn
ctx context.Context
cancel context.CancelFunc
@ -25,9 +24,9 @@ type telemostBackendStream struct {
closed atomic.Bool
}
func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBackendStream {
func newDCBackendStream(parent context.Context, conn net.Conn) *dcBackendStream {
ctx, cancel := context.WithCancel(parent)
return &telemostBackendStream{
return &dcBackendStream{
conn: conn,
ctx: ctx,
cancel: cancel,
@ -35,7 +34,7 @@ func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBa
}
}
func (s *telemostBackendStream) Close() {
func (s *dcBackendStream) Close() {
s.closeOnce.Do(func() {
s.closed.Store(true)
s.cancel()
@ -45,7 +44,7 @@ func (s *telemostBackendStream) Close() {
})
}
func enqueueBackendData(stream *telemostBackendStream, data []byte) error {
func enqueueBackendData(stream *dcBackendStream, data []byte) error {
if stream.closed.Load() {
return context.Canceled
}
@ -60,7 +59,7 @@ func enqueueBackendData(stream *telemostBackendStream, data []byte) error {
}
}
func (s *telemostBackendStream) write(data []byte) error {
func (s *dcBackendStream) write(data []byte) error {
if s.closed.Load() {
return net.ErrClosed
}
@ -82,7 +81,7 @@ func (s *telemostBackendStream) write(data []byte) error {
return nil
}
func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) {
func handleDCBackendStream(streamID uint16, stream *dcBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) {
defer closeStream(streamID)
defer closeMuxStream(streamID)
@ -97,12 +96,8 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream,
for {
n, readErr := stream.conn.Read(buf)
if readErr != nil {
if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) {
if telemost.DebugEnabled() {
log.Printf("Telemost DataChannel VLESS backend stream closed: %v", readErr)
}
} else {
log.Printf("Telemost DataChannel VLESS backend read error: %v", readErr)
if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, net.ErrClosed) {
log.Printf("DataChannel VLESS backend read error: %v", readErr)
}
return
}
@ -125,7 +120,7 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream,
if errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) || stream.closed.Load() {
return
}
log.Printf("Telemost DataChannel VLESS backend write error: %v", err)
log.Printf("DataChannel VLESS backend write error: %v", err)
return
}
}
@ -136,12 +131,20 @@ func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream,
}
func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAddr string) error {
return runDataChannelVLESSMode(ctx, "Telemost", connectTelemostDataChannelPeer, inviteLink, connectAddr)
}
func runJazzDataChannelVLESSMode(ctx context.Context, room, connectAddr string) error {
return runDataChannelVLESSMode(ctx, "SaluteJazz", connectJazzDataChannelPeer, room, connectAddr)
}
func runDataChannelVLESSMode(ctx context.Context, providerName string, connectPeer dataChannelConnectFunc, room, connectAddr string) error {
var (
connMu sync.Mutex
conns = make(map[uint16]*telemostBackendStream)
conns = make(map[uint16]*dcBackendStream)
)
var peer *telemost.Peer
var peer dataChannelPeer
mux := dcmux.New(0, func(frame []byte) error {
return peer.Send(frame)
})
@ -158,7 +161,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd
closeAll := func() {
connMu.Lock()
streams := make([]*telemostBackendStream, 0, len(conns))
streams := make([]*dcBackendStream, 0, len(conns))
for sid, stream := range conns {
streams = append(streams, stream)
delete(conns, sid)
@ -175,11 +178,11 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd
return
}
if err := mux.CloseStream(sid); err != nil {
log.Printf("Telemost DataChannel VLESS server: failed to close mux stream %d: %v", sid, err)
log.Printf("%s DataChannel VLESS server: failed to close mux stream %d: %v", providerName, sid, err)
}
}
getOrCreateBackendStream := func(sid uint16) (*telemostBackendStream, error) {
getOrCreateBackendStream := func(sid uint16) (*dcBackendStream, error) {
connMu.Lock()
stream := conns[sid]
connMu.Unlock()
@ -193,7 +196,7 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd
return nil, err
}
stream = newTelemostBackendStream(ctx, conn)
stream = newDCBackendStream(ctx, conn)
connMu.Lock()
if existing := conns[sid]; existing != nil {
@ -204,28 +207,26 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd
conns[sid] = stream
connMu.Unlock()
go handleTelemostBackendStream(sid, stream, mux, closeStream, closeMuxStream)
go handleDCBackendStream(sid, stream, mux, closeStream, closeMuxStream)
return stream, nil
}
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() {
if telemost.DebugEnabled() {
log.Printf("Telemost DataChannel VLESS server: peer reconnected, closing active backend streams")
}
peer, err := connectPeer(ctx, room, mux.HandleFrame, func() {
log.Printf("%s DataChannel VLESS server: peer reconnected, closing active backend streams", providerName)
closeAll()
mux.Reset()
})
if err != nil {
return err
}
defer func(peer *telemost.Peer) {
defer func(peer dataChannelPeer) {
err := peer.Close()
if err != nil {
log.Println(err)
}
}(peer)
log.Printf("Telemost DataChannel VLESS server: forwarding to %s", connectAddr)
log.Printf("%s DataChannel VLESS server: forwarding to %s", providerName, connectAddr)
activityCh := mux.WaitForActivity()
for {
@ -241,13 +242,13 @@ func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAdd
if len(data) > 0 {
stream, err := getOrCreateBackendStream(sid)
if err != nil {
log.Printf("Telemost DataChannel VLESS backend dial error: %v", err)
log.Printf("%s DataChannel VLESS backend dial error: %v", providerName, err)
closeMuxStream(sid)
continue
}
if err := enqueueBackendData(stream, data); err != nil {
if !errors.Is(err, context.Canceled) {
log.Printf("Telemost DataChannel VLESS backend stream %d stalled: %v", sid, err)
log.Printf("%s DataChannel VLESS backend stream %d stalled: %v", providerName, sid, err)
}
closeStream(sid)
closeMuxStream(sid)

8
server/telemostdc_vless_test.go → server/dc_vless_test.go

@ -16,7 +16,7 @@ func TestEnqueueBackendDataReturnsWhenQueueIsFull(t *testing.T) {
_ = serverConn.Close()
}()
stream := newTelemostBackendStream(context.Background(), clientConn)
stream := newDCBackendStream(context.Background(), clientConn)
defer stream.Close()
stream.writeCh = make(chan []byte, 1)
@ -44,7 +44,7 @@ func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) {
_ = serverConn.Close()
}()
stream := newTelemostBackendStream(context.Background(), clientConn)
stream := newDCBackendStream(context.Background(), clientConn)
stream.Close()
err := enqueueBackendData(stream, []byte("next"))
@ -53,14 +53,14 @@ func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) {
}
}
func TestTelemostBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) {
func TestDCBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) {
clientConn, serverConn := net.Pipe()
defer func() {
_ = clientConn.Close()
_ = serverConn.Close()
}()
stream := newTelemostBackendStream(context.Background(), clientConn)
stream := newDCBackendStream(context.Background(), clientConn)
stream.Close()
err := stream.write([]byte("next"))

47
server/main.go

@ -16,6 +16,7 @@ import (
"time"
"github.com/cacggghp/vk-turn-proxy/internal/cliutil"
"github.com/cacggghp/vk-turn-proxy/internal/jazz"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
"github.com/cacggghp/vk-turn-proxy/tcputil"
"github.com/pion/dtls/v3"
@ -24,12 +25,13 @@ import (
)
type serverOptions struct {
listen string
connect string
yalink string
vlessMode bool
telemostDC bool
debug bool
listen string
connect string
yalink string
jazzRoom string
vlessMode bool
dc bool
debug bool
}
func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverOptions) {
@ -40,16 +42,17 @@ func newServerFlagSet(program string, output io.Writer) (*flag.FlagSet, *serverO
fs.StringVar(&opts.listen, "listen", "0.0.0.0:56000", "listen on ip:port")
fs.StringVar(&opts.connect, "connect", "", "connect to ip:port")
fs.StringVar(&opts.yalink, "yandex-link", "", "Yandex Telemost invite link \"https://telemost.yandex.ru/j/...\"")
fs.StringVar(&opts.jazzRoom, "jazz-room", "", "SaluteJazz room \"roomId[:password]\" (use \"any\" to create)")
fs.BoolVar(&opts.vlessMode, "vless", false, "VLESS mode: forward TCP connections (for VLESS) instead of UDP packets")
fs.BoolVar(&opts.telemostDC, "telemost-dc", false, "use Yandex Telemost DataChannel instead of DTLS listener")
fs.BoolVar(&opts.telemostDC, "telemost-datachannel", false, "use Yandex Telemost DataChannel instead of DTLS listener")
fs.BoolVar(&opts.dc, "dc", false, "use WebRTC DataChannel instead of DTLS listener")
fs.BoolVar(&opts.debug, "debug", false, "enable debug logging")
fs.Usage = func() {
cliutil.Fprintf(fs.Output(), "Usage:\n %s -connect <ip:port> [flags]\n\n", program)
cliutil.Fprintln(fs.Output(), "Examples:")
cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820\n", program)
cliutil.Fprintf(fs.Output(), " %s -listen 0.0.0.0:56000 -connect 127.0.0.1:51820 -vless\n", program)
cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -telemost-dc\n\n", program)
cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -yandex-link https://telemost.yandex.ru/j/... -dc\n", program)
cliutil.Fprintf(fs.Output(), " %s -connect 127.0.0.1:51820 -jazz-room any -dc\n\n", program)
cliutil.Fprintln(fs.Output(), "Flags:")
fs.PrintDefaults()
}
@ -62,10 +65,11 @@ func parseServerOptions(args []string, program string, stdout, stderr io.Writer)
if opts.connect == "" {
return fmt.Errorf("-connect is required")
}
if opts.telemostDC {
if opts.yalink == "" {
return fmt.Errorf("-telemost-dc requires -yandex-link")
}
if opts.dc && (opts.yalink == "") == (opts.jazzRoom == "") {
return fmt.Errorf("-dc requires exactly one of -yandex-link or -jazz-room")
}
if opts.jazzRoom != "" && !opts.dc {
return fmt.Errorf("-jazz-room requires -dc")
}
return nil
})
@ -79,6 +83,14 @@ func runSelectedTelemostDataChannelMode(ctx context.Context, inviteLink, connect
return runTelemostDataChannelMode(ctx, inviteLink, connectAddr)
}
func runSelectedJazzDataChannelMode(ctx context.Context, room, connectAddr string, vlessMode bool) error {
if vlessMode {
return runJazzDataChannelVLESSMode(ctx, room, connectAddr)
}
return runJazzDataChannelMode(ctx, room, connectAddr)
}
func closeOnContextDone(ctx context.Context, closer io.Closer) {
go func() {
<-ctx.Done()
@ -105,13 +117,20 @@ func main() {
}()
telemost.SetDebug(opts.debug)
jazz.SetDebug(opts.debug)
if opts.telemostDC {
if opts.dc && opts.yalink != "" {
if err := runSelectedTelemostDataChannelMode(ctx, opts.yalink, opts.connect, opts.vlessMode); err != nil {
log.Fatalf("Telemost DataChannel mode failed: %v", err)
}
return
}
if opts.dc && opts.jazzRoom != "" {
if err := runSelectedJazzDataChannelMode(ctx, opts.jazzRoom, opts.connect, opts.vlessMode); err != nil {
log.Fatalf("SaluteJazz DataChannel mode failed: %v", err)
}
return
}
listenAddr, err := net.ResolveUDPAddr("udp", opts.listen)
if err != nil {

81
server/main_test.go

@ -98,7 +98,7 @@ func TestParseServerOptionsParsesTelemostDataChannelArgs(t *testing.T) {
opts, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-yandex-link", "https://telemost.yandex.ru/j/test",
"-telemost-dc",
"-dc",
}, "server", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution)
@ -106,15 +106,15 @@ func TestParseServerOptionsParsesTelemostDataChannelArgs(t *testing.T) {
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.telemostDC {
t.Fatal("telemostDC = false, want true")
if !opts.dc {
t.Fatal("dc = false, want true")
}
if opts.yalink == "" {
t.Fatal("yalink = empty, want yandex link")
}
}
func TestParseServerOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testing.T) {
func TestParseServerOptionsRejectsDataChannelWithoutRoom(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
@ -122,13 +122,13 @@ func TestParseServerOptionsRejectsTelemostDataChannelWithoutYandexLink(t *testin
_, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-telemost-dc",
"-dc",
}, "server", &stdout, &stderr)
if exitCode != 2 {
t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode)
}
if got := stderr.String(); !strings.Contains(got, "-telemost-dc requires -yandex-link") {
t.Fatalf("expected telemost-dc validation error, got %q", got)
if got := stderr.String(); !strings.Contains(got, "-dc requires exactly one of -yandex-link or -jazz-room") {
t.Fatalf("expected dc validation error, got %q", got)
}
}
@ -141,7 +141,7 @@ func TestParseServerOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) {
opts, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-yandex-link", "https://telemost.yandex.ru/j/test",
"-telemost-dc",
"-dc",
"-vless",
}, "server", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
@ -150,7 +150,68 @@ func TestParseServerOptionsAllowsTelemostDataChannelWithVLESS(t *testing.T) {
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.telemostDC || !opts.vlessMode {
t.Fatalf("expected telemostDC and vlessMode to be true, got telemostDC=%v vlessMode=%v", opts.telemostDC, opts.vlessMode)
if !opts.dc || !opts.vlessMode {
t.Fatalf("expected dc and vlessMode to be true, got dc=%v vlessMode=%v", opts.dc, opts.vlessMode)
}
}
func TestParseServerOptionsParsesJazzDataChannelArgs(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
var stderr bytes.Buffer
opts, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-jazz-room", "any",
"-dc",
}, "server", &stdout, &stderr)
if exitCode != cliutil.ContinueExecution {
t.Fatalf("parseServerOptions() exitCode = %d, want %d", exitCode, cliutil.ContinueExecution)
}
if stderr.Len() != 0 {
t.Fatalf("expected no stderr output, got %q", stderr.String())
}
if !opts.dc {
t.Fatal("dc = false, want true")
}
if opts.jazzRoom != "any" {
t.Fatalf("jazzRoom = %q, want any", opts.jazzRoom)
}
}
func TestParseServerOptionsRejectsJazzDataChannelWithoutRoom(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
var stderr bytes.Buffer
_, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-dc",
}, "server", &stdout, &stderr)
if exitCode != 2 {
t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode)
}
if got := stderr.String(); !strings.Contains(got, "-dc requires exactly one of -yandex-link or -jazz-room") {
t.Fatalf("expected dc validation error, got %q", got)
}
}
func TestParseServerOptionsRejectsJazzRoomWithoutDataChannel(t *testing.T) {
t.Parallel()
var stdout bytes.Buffer
var stderr bytes.Buffer
_, exitCode := parseServerOptions([]string{
"-connect", "127.0.0.1:51820",
"-jazz-room", "any",
}, "server", &stdout, &stderr)
if exitCode != 2 {
t.Fatalf("parseServerOptions() exitCode = %d, want 2", exitCode)
}
if got := stderr.String(); !strings.Contains(got, "-jazz-room requires -dc") {
t.Fatalf("expected jazz room validation error, got %q", got)
}
}

57
server/telemostdc.go

@ -1,57 +0,0 @@
package main
import (
"context"
"errors"
"log"
"net"
"github.com/cacggghp/vk-turn-proxy/internal/telemost"
)
func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error {
backendConn, err := net.Dial("udp", connectAddr)
if err != nil {
return err
}
defer func(backendConn net.Conn) {
err := backendConn.Close()
if err != nil {
log.Println(err)
}
}(backendConn)
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) {
if _, writeErr := backendConn.Write(data); writeErr != nil {
log.Printf("Telemost DataChannel: failed to write backend packet: %v", writeErr)
}
}, nil)
if err != nil {
return err
}
defer func(peer *telemost.Peer) {
err := peer.Close()
if err != nil {
log.Println(err)
}
}(peer)
closeOnContextDone(ctx, backendConn)
log.Printf("Telemost DataChannel mode: forwarding to %s", connectAddr)
buf := make([]byte, 2048)
for {
n, err := backendConn.Read(buf)
if err != nil {
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) {
return nil
}
return err
}
if err := peer.Send(buf[:n]); err != nil {
log.Printf("Telemost DataChannel: dropped backend packet (%d bytes): %v", n, err)
}
}
}
Loading…
Cancel
Save