19 changed files with 2406 additions and 65 deletions
@ -0,0 +1,64 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"log" |
|||
"net" |
|||
"sync/atomic" |
|||
|
|||
"github.com/cacggghp/vk-turn-proxy/internal/telemost" |
|||
) |
|||
|
|||
func runTelemostDataChannelMode(ctx context.Context, inviteLink, listenAddr string) error { |
|||
listenConn, err := net.ListenPacket("udp", listenAddr) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(listenConn net.PacketConn) { |
|||
err := listenConn.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(listenConn) |
|||
|
|||
var activeLocalPeer atomic.Value |
|||
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { |
|||
addr, ok := activeLocalPeer.Load().(net.Addr) |
|||
if !ok || addr == nil { |
|||
return |
|||
} |
|||
if _, writeErr := listenConn.WriteTo(data, addr); writeErr != nil { |
|||
log.Printf("Telemost DataChannel: failed to write local packet: %v", writeErr) |
|||
} |
|||
}, nil) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(peer *telemost.Peer) { |
|||
err := peer.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(peer) |
|||
|
|||
closeOnContextDone(ctx, listenConn) |
|||
|
|||
log.Printf("Telemost DataChannel mode: listening on %s", listenAddr) |
|||
|
|||
buf := make([]byte, 2048) |
|||
for { |
|||
n, addr, err := listenConn.ReadFrom(buf) |
|||
if err != nil { |
|||
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { |
|||
return nil |
|||
} |
|||
return err |
|||
} |
|||
|
|||
activeLocalPeer.Store(addr) |
|||
if err := peer.Send(buf[:n]); err != nil { |
|||
log.Printf("Telemost DataChannel: dropped outbound packet (%d bytes): %v", n, err) |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,149 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"log" |
|||
"net" |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/cacggghp/vk-turn-proxy/internal/dcmux" |
|||
"github.com/cacggghp/vk-turn-proxy/internal/telemost" |
|||
) |
|||
|
|||
func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, listenAddr string) error { |
|||
var ( |
|||
connMu sync.Mutex |
|||
conns = make(map[uint16]net.Conn) |
|||
) |
|||
closeAll := func() { |
|||
connMu.Lock() |
|||
defer connMu.Unlock() |
|||
for sid, conn := range conns { |
|||
_ = conn.Close() |
|||
delete(conns, sid) |
|||
} |
|||
} |
|||
|
|||
var peer *telemost.Peer |
|||
clientID := uint32(time.Now().UnixNano()) |
|||
mux := dcmux.New(clientID, func(frame []byte) error { |
|||
return peer.Send(frame) |
|||
}) |
|||
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { |
|||
if telemost.DebugEnabled() { |
|||
log.Printf("Telemost DataChannel VLESS: peer reconnected, closing active TCP streams") |
|||
} |
|||
closeAll() |
|||
mux.Reset() |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(peer *telemost.Peer) { |
|||
err := peer.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(peer) |
|||
|
|||
listener, err := net.Listen("tcp", listenAddr) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(listener net.Listener) { |
|||
err := listener.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(listener) |
|||
closeOnContextDone(ctx, listener) |
|||
|
|||
log.Printf("Telemost DataChannel VLESS mode: listening on %s", listenAddr) |
|||
|
|||
for { |
|||
conn, err := listener.Accept() |
|||
if err != nil { |
|||
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { |
|||
closeAll() |
|||
return nil |
|||
} |
|||
log.Printf("Telemost DataChannel VLESS accept error: %v", err) |
|||
continue |
|||
} |
|||
|
|||
sid := mux.OpenStream() |
|||
connMu.Lock() |
|||
conns[sid] = conn |
|||
connMu.Unlock() |
|||
|
|||
go func(streamID uint16, tcpConn net.Conn) { |
|||
defer func() { |
|||
connMu.Lock() |
|||
delete(conns, streamID) |
|||
connMu.Unlock() |
|||
if err := mux.CloseStream(streamID); err != nil { |
|||
log.Printf("Telemost DataChannel VLESS: failed to close mux stream %d: %v", streamID, err) |
|||
} |
|||
_ = tcpConn.Close() |
|||
mux.CleanupStream(streamID) |
|||
}() |
|||
|
|||
done := make(chan struct{}) |
|||
streamClosed := make(chan struct{}) |
|||
|
|||
go func() { |
|||
defer close(done) |
|||
buf := make([]byte, 32768) |
|||
for { |
|||
n, readErr := tcpConn.Read(buf) |
|||
if readErr != nil { |
|||
return |
|||
} |
|||
if sendErr := mux.SendData(streamID, buf[:n]); sendErr != nil { |
|||
return |
|||
} |
|||
} |
|||
}() |
|||
|
|||
go func() { |
|||
defer close(streamClosed) |
|||
for { |
|||
dataReady := mux.WaitForData(streamID) |
|||
|
|||
select { |
|||
case <-ctx.Done(): |
|||
return |
|||
case <-done: |
|||
return |
|||
case _, ok := <-dataReady: |
|||
if !ok { |
|||
return |
|||
} |
|||
} |
|||
|
|||
for { |
|||
data := mux.ReadStream(streamID) |
|||
if len(data) == 0 { |
|||
break |
|||
} |
|||
if _, writeErr := tcpConn.Write(data); writeErr != nil { |
|||
return |
|||
} |
|||
} |
|||
|
|||
if mux.StreamClosed(streamID) { |
|||
return |
|||
} |
|||
} |
|||
}() |
|||
|
|||
select { |
|||
case <-ctx.Done(): |
|||
case <-done: |
|||
case <-streamClosed: |
|||
} |
|||
}(sid, conn) |
|||
} |
|||
} |
|||
@ -0,0 +1,323 @@ |
|||
package dcmux |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"sync" |
|||
) |
|||
|
|||
type Stream struct { |
|||
ID uint16 |
|||
ClientID uint32 |
|||
recvBuf []byte |
|||
closed bool |
|||
nextSeq uint32 |
|||
outOfOrder map[uint32][]byte |
|||
} |
|||
|
|||
type Multiplexer struct { |
|||
streams map[uint16]*Stream |
|||
nextID uint16 |
|||
clientID uint32 |
|||
onSend func([]byte) error |
|||
mu sync.RWMutex |
|||
maxStreams int |
|||
maxBufferSize int |
|||
activityCh chan struct{} |
|||
dataReady map[uint16]chan struct{} |
|||
dataReadyMu sync.Mutex |
|||
sendSeq map[uint16]uint32 |
|||
sendSeqMu sync.Mutex |
|||
} |
|||
|
|||
func New(clientID uint32, onSend func([]byte) error) *Multiplexer { |
|||
return &Multiplexer{ |
|||
streams: make(map[uint16]*Stream), |
|||
nextID: 1, |
|||
clientID: clientID, |
|||
onSend: onSend, |
|||
maxStreams: 10000, |
|||
maxBufferSize: 16 * 1024 * 1024, |
|||
activityCh: make(chan struct{}, 1), |
|||
dataReady: make(map[uint16]chan struct{}), |
|||
sendSeq: make(map[uint16]uint32), |
|||
} |
|||
} |
|||
|
|||
func (m *Multiplexer) OpenStream() uint16 { |
|||
m.mu.Lock() |
|||
defer m.mu.Unlock() |
|||
|
|||
for { |
|||
sid := m.nextID |
|||
m.nextID++ |
|||
if m.nextID == 0 { |
|||
m.nextID = 1 |
|||
} |
|||
|
|||
if _, exists := m.streams[sid]; !exists { |
|||
m.streams[sid] = &Stream{ |
|||
ID: sid, |
|||
recvBuf: make([]byte, 0), |
|||
nextSeq: 0, |
|||
outOfOrder: make(map[uint32][]byte), |
|||
} |
|||
return sid |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (m *Multiplexer) SendData(sid uint16, data []byte) error { |
|||
m.mu.RLock() |
|||
stream, exists := m.streams[sid] |
|||
m.mu.RUnlock() |
|||
|
|||
if !exists || stream.closed { |
|||
return nil |
|||
} |
|||
|
|||
const chunkSize = 7168 |
|||
for i := 0; i < len(data); i += chunkSize { |
|||
end := i + chunkSize |
|||
if end > len(data) { |
|||
end = len(data) |
|||
} |
|||
|
|||
chunk := data[i:end] |
|||
|
|||
m.sendSeqMu.Lock() |
|||
seq := m.sendSeq[sid] |
|||
m.sendSeq[sid]++ |
|||
m.sendSeqMu.Unlock() |
|||
|
|||
frame := make([]byte, 12+len(chunk)) |
|||
binary.BigEndian.PutUint32(frame[0:4], m.clientID) |
|||
binary.BigEndian.PutUint16(frame[4:6], sid) |
|||
binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) |
|||
binary.BigEndian.PutUint32(frame[8:12], seq) |
|||
copy(frame[12:], chunk) |
|||
|
|||
if err := m.onSend(frame); err != nil { |
|||
return err |
|||
} |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (m *Multiplexer) CloseStream(sid uint16) error { |
|||
m.mu.Lock() |
|||
defer m.mu.Unlock() |
|||
|
|||
if stream, exists := m.streams[sid]; exists { |
|||
stream.closed = true |
|||
} |
|||
|
|||
m.sendSeqMu.Lock() |
|||
delete(m.sendSeq, sid) |
|||
m.sendSeqMu.Unlock() |
|||
|
|||
m.signalDataReady(sid) |
|||
|
|||
frame := make([]byte, 12) |
|||
binary.BigEndian.PutUint32(frame[0:4], m.clientID) |
|||
binary.BigEndian.PutUint16(frame[4:6], sid) |
|||
binary.BigEndian.PutUint16(frame[6:8], 0) |
|||
binary.BigEndian.PutUint32(frame[8:12], 0) |
|||
|
|||
return m.onSend(frame) |
|||
} |
|||
|
|||
func (m *Multiplexer) HandleFrame(frame []byte) { |
|||
if len(frame) < 12 { |
|||
return |
|||
} |
|||
|
|||
clientID := binary.BigEndian.Uint32(frame[0:4]) |
|||
sid := binary.BigEndian.Uint16(frame[4:6]) |
|||
length := binary.BigEndian.Uint16(frame[6:8]) |
|||
seq := binary.BigEndian.Uint32(frame[8:12]) |
|||
|
|||
if length == 0 { |
|||
m.mu.Lock() |
|||
if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID { |
|||
stream.closed = true |
|||
} |
|||
m.mu.Unlock() |
|||
m.signalDataReady(sid) |
|||
return |
|||
} |
|||
|
|||
if len(frame) < 12+int(length) { |
|||
return |
|||
} |
|||
|
|||
data := frame[12 : 12+length] |
|||
|
|||
m.mu.Lock() |
|||
defer m.mu.Unlock() |
|||
|
|||
stream, exists := m.streams[sid] |
|||
if !exists { |
|||
if len(m.streams) >= m.maxStreams { |
|||
return |
|||
} |
|||
stream = &Stream{ |
|||
ID: sid, |
|||
ClientID: clientID, |
|||
recvBuf: make([]byte, 0), |
|||
nextSeq: 0, |
|||
outOfOrder: make(map[uint32][]byte), |
|||
} |
|||
m.streams[sid] = stream |
|||
} else if stream.ClientID != clientID { |
|||
stream.ClientID = clientID |
|||
stream.recvBuf = make([]byte, 0) |
|||
stream.closed = false |
|||
stream.nextSeq = 0 |
|||
stream.outOfOrder = make(map[uint32][]byte) |
|||
} |
|||
|
|||
if seq == stream.nextSeq { |
|||
if len(stream.recvBuf)+len(data) > m.maxBufferSize { |
|||
stream.closed = true |
|||
m.signalDataReady(sid) |
|||
return |
|||
} |
|||
stream.recvBuf = append(stream.recvBuf, data...) |
|||
stream.nextSeq++ |
|||
|
|||
for { |
|||
nextData, ok := stream.outOfOrder[stream.nextSeq] |
|||
if !ok { |
|||
break |
|||
} |
|||
if len(stream.recvBuf)+len(nextData) > m.maxBufferSize { |
|||
stream.closed = true |
|||
m.signalDataReady(sid) |
|||
return |
|||
} |
|||
stream.recvBuf = append(stream.recvBuf, nextData...) |
|||
delete(stream.outOfOrder, stream.nextSeq) |
|||
stream.nextSeq++ |
|||
} |
|||
|
|||
m.signalDataReady(sid) |
|||
return |
|||
} |
|||
|
|||
if seq > stream.nextSeq && len(stream.outOfOrder) < 100 { |
|||
stream.outOfOrder[seq] = append([]byte(nil), data...) |
|||
} |
|||
} |
|||
|
|||
func (m *Multiplexer) ReadStream(sid uint16) []byte { |
|||
m.mu.Lock() |
|||
defer m.mu.Unlock() |
|||
|
|||
stream, exists := m.streams[sid] |
|||
if !exists || len(stream.recvBuf) == 0 { |
|||
return nil |
|||
} |
|||
|
|||
data := stream.recvBuf |
|||
stream.recvBuf = make([]byte, 0) |
|||
return data |
|||
} |
|||
|
|||
func (m *Multiplexer) StreamClosed(sid uint16) bool { |
|||
m.mu.RLock() |
|||
defer m.mu.RUnlock() |
|||
|
|||
stream, exists := m.streams[sid] |
|||
return !exists || stream.closed |
|||
} |
|||
|
|||
func (m *Multiplexer) GetStreams() []uint16 { |
|||
m.mu.RLock() |
|||
defer m.mu.RUnlock() |
|||
|
|||
sids := make([]uint16, 0, len(m.streams)) |
|||
for sid := range m.streams { |
|||
sids = append(sids, sid) |
|||
} |
|||
return sids |
|||
} |
|||
|
|||
func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { |
|||
m.dataReadyMu.Lock() |
|||
defer m.dataReadyMu.Unlock() |
|||
|
|||
if _, ok := m.dataReady[sid]; !ok { |
|||
m.dataReady[sid] = make(chan struct{}, 1) |
|||
} |
|||
return m.dataReady[sid] |
|||
} |
|||
|
|||
func (m *Multiplexer) WaitForActivity() <-chan struct{} { |
|||
return m.activityCh |
|||
} |
|||
|
|||
func (m *Multiplexer) CleanupStream(sid uint16) { |
|||
m.mu.Lock() |
|||
delete(m.streams, sid) |
|||
m.mu.Unlock() |
|||
|
|||
m.sendSeqMu.Lock() |
|||
delete(m.sendSeq, sid) |
|||
m.sendSeqMu.Unlock() |
|||
|
|||
m.dataReadyMu.Lock() |
|||
defer m.dataReadyMu.Unlock() |
|||
|
|||
if ch, ok := m.dataReady[sid]; ok { |
|||
close(ch) |
|||
delete(m.dataReady, sid) |
|||
} |
|||
} |
|||
|
|||
func (m *Multiplexer) Reset() { |
|||
m.mu.Lock() |
|||
for _, stream := range m.streams { |
|||
stream.closed = true |
|||
} |
|||
m.streams = make(map[uint16]*Stream) |
|||
m.nextID = 1 |
|||
m.mu.Unlock() |
|||
|
|||
m.sendSeqMu.Lock() |
|||
m.sendSeq = make(map[uint16]uint32) |
|||
m.sendSeqMu.Unlock() |
|||
|
|||
m.dataReadyMu.Lock() |
|||
for sid, ch := range m.dataReady { |
|||
close(ch) |
|||
delete(m.dataReady, sid) |
|||
} |
|||
m.dataReadyMu.Unlock() |
|||
|
|||
m.signalActivity() |
|||
} |
|||
|
|||
func (m *Multiplexer) signalDataReady(sid uint16) { |
|||
m.signalActivity() |
|||
|
|||
m.dataReadyMu.Lock() |
|||
defer m.dataReadyMu.Unlock() |
|||
|
|||
ch, ok := m.dataReady[sid] |
|||
if !ok { |
|||
return |
|||
} |
|||
|
|||
select { |
|||
case ch <- struct{}{}: |
|||
default: |
|||
} |
|||
} |
|||
|
|||
func (m *Multiplexer) signalActivity() { |
|||
select { |
|||
case m.activityCh <- struct{}{}: |
|||
default: |
|||
} |
|||
} |
|||
@ -0,0 +1,54 @@ |
|||
package dcmux |
|||
|
|||
import ( |
|||
"encoding/binary" |
|||
"testing" |
|||
) |
|||
|
|||
func TestCleanupStreamRemovesClosedStream(t *testing.T) { |
|||
t.Parallel() |
|||
|
|||
mux := New(1, func([]byte) error { return nil }) |
|||
mux.maxStreams = 1 |
|||
|
|||
dataReady := mux.WaitForData(7) |
|||
mux.HandleFrame(testFrame(1, 7, 0, []byte("hello"))) |
|||
if got := mux.GetStreams(); len(got) != 1 || got[0] != 7 { |
|||
t.Fatalf("GetStreams() = %v, want [7]", got) |
|||
} |
|||
select { |
|||
case <-dataReady: |
|||
default: |
|||
t.Fatal("expected dataReady signal after HandleFrame") |
|||
} |
|||
select { |
|||
case <-mux.WaitForActivity(): |
|||
default: |
|||
t.Fatal("expected activity signal after HandleFrame") |
|||
} |
|||
|
|||
mux.CleanupStream(7) |
|||
|
|||
if _, ok := <-dataReady; ok { |
|||
t.Fatal("dataReady channel stayed open after CleanupStream") |
|||
} |
|||
|
|||
if got := mux.GetStreams(); len(got) != 0 { |
|||
t.Fatalf("GetStreams() after CleanupStream = %v, want empty", got) |
|||
} |
|||
|
|||
mux.HandleFrame(testFrame(1, 8, 0, []byte("world"))) |
|||
if got := mux.GetStreams(); len(got) != 1 || got[0] != 8 { |
|||
t.Fatalf("GetStreams() after reusing capacity = %v, want [8]", got) |
|||
} |
|||
} |
|||
|
|||
func testFrame(clientID uint32, sid uint16, seq uint32, payload []byte) []byte { |
|||
frame := make([]byte, 12+len(payload)) |
|||
binary.BigEndian.PutUint32(frame[0:4], clientID) |
|||
binary.BigEndian.PutUint16(frame[4:6], sid) |
|||
binary.BigEndian.PutUint16(frame[6:8], uint16(len(payload))) |
|||
binary.BigEndian.PutUint32(frame[8:12], seq) |
|||
copy(frame[12:], payload) |
|||
return frame |
|||
} |
|||
@ -0,0 +1,193 @@ |
|||
package telemost |
|||
|
|||
import ( |
|||
"context" |
|||
"encoding/json" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"log" |
|||
"net/http" |
|||
"net/url" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/google/uuid" |
|||
) |
|||
|
|||
const defaultAPIBaseURL = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" |
|||
|
|||
var ( |
|||
apiBaseURL = defaultAPIBaseURL |
|||
apiHTTPClient = &http.Client{Timeout: 15 * time.Second} |
|||
) |
|||
|
|||
type ConnectionInfo struct { |
|||
RoomID string `json:"room_id"` |
|||
PeerID string `json:"peer_id"` |
|||
Credentials string `json:"credentials"` |
|||
ClientConfig struct { |
|||
MediaServerURL string `json:"media_server_url"` |
|||
} `json:"client_configuration"` |
|||
} |
|||
|
|||
type RoomTarget struct { |
|||
RoomID string |
|||
PreferredHost string |
|||
} |
|||
|
|||
func ParseRoomTarget(input string) (RoomTarget, error) { |
|||
input = strings.TrimSpace(input) |
|||
if input == "" { |
|||
return RoomTarget{}, fmt.Errorf("telemost invite link is required") |
|||
} |
|||
|
|||
target := RoomTarget{} |
|||
if strings.HasPrefix(input, "https://") || strings.HasPrefix(input, "http://") { |
|||
parsed, err := url.Parse(input) |
|||
if err != nil { |
|||
return RoomTarget{}, fmt.Errorf("invalid telemost invite link: %w", err) |
|||
} |
|||
parts := strings.Split(strings.Trim(parsed.Path, "/"), "/") |
|||
if len(parts) >= 2 && parts[0] == "j" && parts[1] != "" { |
|||
target.RoomID = parts[1] |
|||
target.PreferredHost = strings.ToLower(parsed.Hostname()) |
|||
} |
|||
} |
|||
|
|||
if target.RoomID == "" { |
|||
input = strings.TrimPrefix(input, "https://telemost.yandex.ru/j/") |
|||
input = strings.TrimPrefix(input, "http://telemost.yandex.ru/j/") |
|||
input = strings.TrimPrefix(input, "https://telemost.yandex.com/j/") |
|||
input = strings.TrimPrefix(input, "http://telemost.yandex.com/j/") |
|||
input = strings.TrimPrefix(input, "j/") |
|||
if idx := strings.IndexAny(input, "/?#"); idx != -1 { |
|||
input = input[:idx] |
|||
} |
|||
target.RoomID = strings.TrimSpace(input) |
|||
} |
|||
|
|||
if target.RoomID == "" { |
|||
return RoomTarget{}, fmt.Errorf("invalid telemost invite link") |
|||
} |
|||
|
|||
return target, nil |
|||
} |
|||
|
|||
func (t RoomTarget) CandidateRoomURLs() []string { |
|||
hosts := make([]string, 0, 3) |
|||
addHost := func(host string) { |
|||
host = strings.ToLower(strings.TrimSpace(host)) |
|||
if host == "" { |
|||
return |
|||
} |
|||
for _, existing := range hosts { |
|||
if existing == host { |
|||
return |
|||
} |
|||
} |
|||
hosts = append(hosts, host) |
|||
} |
|||
|
|||
addHost(t.PreferredHost) |
|||
addHost("telemost.yandex.ru") |
|||
addHost("telemost.yandex.com") |
|||
|
|||
roomURLs := make([]string, 0, len(hosts)) |
|||
for _, host := range hosts { |
|||
roomURLs = append(roomURLs, fmt.Sprintf("https://%s/j/%s", host, t.RoomID)) |
|||
} |
|||
|
|||
return roomURLs |
|||
} |
|||
|
|||
func WebOriginFromRoomURL(roomURL string) string { |
|||
parsed, err := url.Parse(roomURL) |
|||
if err != nil || parsed.Host == "" { |
|||
return "https://telemost.yandex.ru" |
|||
} |
|||
|
|||
scheme := parsed.Scheme |
|||
if scheme == "" { |
|||
scheme = "https" |
|||
} |
|||
|
|||
return scheme + "://" + parsed.Host |
|||
} |
|||
|
|||
func getConnectionInfoForRoomURL(ctx context.Context, roomURL, displayName string) (*ConnectionInfo, error) { |
|||
u := fmt.Sprintf("%s/conferences/%s/connection", apiBaseURL, url.QueryEscape(roomURL)) |
|||
origin := WebOriginFromRoomURL(roomURL) |
|||
|
|||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
q := req.URL.Query() |
|||
q.Add("next_gen_media_platform_allowed", "true") |
|||
q.Add("display_name", displayName) |
|||
q.Add("waiting_room_supported", "true") |
|||
req.URL.RawQuery = q.Encode() |
|||
|
|||
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") |
|||
req.Header.Set("Accept", "*/*") |
|||
req.Header.Set("Content-Type", "application/json") |
|||
req.Header.Set("Client-Instance-Id", uuid.New().String()) |
|||
req.Header.Set("X-Telemost-Client-Version", "187.1.0") |
|||
req.Header.Set("Idempotency-Key", uuid.New().String()) |
|||
req.Header.Set("Origin", origin) |
|||
req.Header.Set("Referer", origin+"/") |
|||
|
|||
resp, err := apiHTTPClient.Do(req) |
|||
if err != nil { |
|||
if resp != nil && resp.Body != nil { |
|||
if closeErr := resp.Body.Close(); closeErr != nil { |
|||
log.Println(closeErr) |
|||
} |
|||
} |
|||
return nil, err |
|||
} |
|||
defer func(Body io.ReadCloser) { |
|||
err := Body.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(resp.Body) |
|||
|
|||
if resp.StatusCode != http.StatusOK { |
|||
body, readErr := io.ReadAll(resp.Body) |
|||
if readErr != nil { |
|||
return nil, fmt.Errorf("telemost connection API returned %s and response body read failed: %w", resp.Status, readErr) |
|||
} |
|||
return nil, fmt.Errorf("telemost connection API returned %s: %s", resp.Status, strings.TrimSpace(string(body))) |
|||
} |
|||
|
|||
var info ConnectionInfo |
|||
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
return &info, nil |
|||
} |
|||
|
|||
func GetConnectionInfo(ctx context.Context, roomInput, displayName string) (*ConnectionInfo, string, error) { |
|||
target, err := ParseRoomTarget(roomInput) |
|||
if err != nil { |
|||
return nil, "", err |
|||
} |
|||
|
|||
var errs []string |
|||
for _, roomURL := range target.CandidateRoomURLs() { |
|||
info, err := getConnectionInfoForRoomURL(ctx, roomURL, displayName) |
|||
if err == nil { |
|||
return info, roomURL, nil |
|||
} |
|||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { |
|||
return nil, "", err |
|||
} |
|||
errs = append(errs, fmt.Sprintf("%s: %v", roomURL, err)) |
|||
} |
|||
|
|||
return nil, "", fmt.Errorf("telemost connection failed for room %s: %s", target.RoomID, strings.Join(errs, "; ")) |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
package telemost |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"net/http" |
|||
"testing" |
|||
"time" |
|||
) |
|||
|
|||
func TestGetConnectionInfoHonorsContextCancellation(t *testing.T) { |
|||
previousBaseURL := apiBaseURL |
|||
previousClient := apiHTTPClient |
|||
apiBaseURL = "https://telemost.test" |
|||
apiHTTPClient = &http.Client{ |
|||
Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) { |
|||
<-req.Context().Done() |
|||
return nil, req.Context().Err() |
|||
}), |
|||
} |
|||
t.Cleanup(func() { |
|||
apiBaseURL = previousBaseURL |
|||
apiHTTPClient = previousClient |
|||
}) |
|||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) |
|||
defer cancel() |
|||
|
|||
start := time.Now() |
|||
_, _, err := GetConnectionInfo(ctx, "https://telemost.yandex.ru/j/test-room", "tester") |
|||
if !errors.Is(err, context.DeadlineExceeded) { |
|||
t.Fatalf("GetConnectionInfo() error = %v, want context deadline exceeded", err) |
|||
} |
|||
if elapsed := time.Since(start); elapsed > time.Second { |
|||
t.Fatalf("GetConnectionInfo() took too long to abort: %v", elapsed) |
|||
} |
|||
} |
|||
|
|||
type roundTripperFunc func(*http.Request) (*http.Response, error) |
|||
|
|||
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { |
|||
return f(req) |
|||
} |
|||
@ -0,0 +1,758 @@ |
|||
package telemost |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"log" |
|||
"net/http" |
|||
"strings" |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
|
|||
"github.com/cacggghp/vk-turn-proxy/internal/namegen" |
|||
"github.com/google/uuid" |
|||
"github.com/gorilla/websocket" |
|||
"github.com/pion/webrtc/v4" |
|||
) |
|||
|
|||
const ( |
|||
dataChannelLabel = "vk-turn-proxy" |
|||
) |
|||
|
|||
var debugLogging atomic.Bool |
|||
|
|||
type Peer struct { |
|||
roomURL string |
|||
name string |
|||
onData func([]byte) |
|||
|
|||
sessionMu sync.RWMutex |
|||
session *peerSession |
|||
|
|||
reconnectCbMu sync.RWMutex |
|||
reconnectCb func() |
|||
wsMu sync.Mutex |
|||
sendMu sync.Mutex |
|||
reconnectCh chan struct{} |
|||
closeCh chan struct{} |
|||
closed atomic.Bool |
|||
} |
|||
|
|||
type peerSession struct { |
|||
conn *ConnectionInfo |
|||
ws *websocket.Conn |
|||
pcSub *webrtc.PeerConnection |
|||
pcPub *webrtc.PeerConnection |
|||
dc *webrtc.DataChannel |
|||
connected atomic.Bool |
|||
} |
|||
|
|||
func NewPeer(roomURL, name string, onData func([]byte)) *Peer { |
|||
return &Peer{ |
|||
roomURL: roomURL, |
|||
name: name, |
|||
onData: onData, |
|||
reconnectCh: make(chan struct{}, 1), |
|||
closeCh: make(chan struct{}), |
|||
} |
|||
} |
|||
|
|||
// NewConnectedPeer parses a Telemost invite link, creates a peer, connects it,
|
|||
// and starts the background reconnect watcher.
|
|||
func NewConnectedPeer(ctx context.Context, inviteLink string, onData func([]byte), onReconnect func()) (*Peer, error) { |
|||
target, err := ParseRoomTarget(inviteLink) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
peer := NewPeer(target.RoomID, namegen.Generate(), onData) |
|||
if onReconnect != nil { |
|||
peer.SetReconnectCallback(onReconnect) |
|||
} |
|||
if err := peer.Connect(ctx); err != nil { |
|||
if closeErr := peer.Close(); closeErr != nil { |
|||
log.Printf("Telemost DataChannel peer cleanup failed after connect error: %v", closeErr) |
|||
} |
|||
return nil, err |
|||
} |
|||
|
|||
go peer.WatchConnection(ctx) |
|||
|
|||
return peer, nil |
|||
} |
|||
|
|||
func SetDebug(enabled bool) { |
|||
debugLogging.Store(enabled) |
|||
} |
|||
|
|||
func DebugEnabled() bool { |
|||
return debugLogging.Load() |
|||
} |
|||
|
|||
func debugf(format string, args ...any) { |
|||
if DebugEnabled() { |
|||
log.Printf(format, args...) |
|||
} |
|||
} |
|||
|
|||
func isTransportDataChannelLabel(label string) bool { |
|||
return label == dataChannelLabel |
|||
} |
|||
|
|||
func (p *Peer) Connect(ctx context.Context) error { |
|||
return p.connectOnce(ctx) |
|||
} |
|||
|
|||
func (p *Peer) WatchConnection(ctx context.Context) { |
|||
const ( |
|||
maxReconnects = 10 |
|||
reconnectWindow = 5 * time.Minute |
|||
) |
|||
|
|||
var lastReconnect time.Time |
|||
reconnectCount := 0 |
|||
|
|||
for { |
|||
select { |
|||
case <-ctx.Done(): |
|||
return |
|||
case <-p.closeCh: |
|||
return |
|||
case <-p.reconnectCh: |
|||
} |
|||
|
|||
now := time.Now() |
|||
if now.Sub(lastReconnect) > reconnectWindow { |
|||
reconnectCount = 0 |
|||
} |
|||
if reconnectCount >= maxReconnects { |
|||
log.Printf("Telemost DataChannel: max reconnect attempts (%d) reached", maxReconnects) |
|||
return |
|||
} |
|||
reconnectCount++ |
|||
lastReconnect = now |
|||
|
|||
backoff := time.Duration(reconnectCount) * 2 * time.Second |
|||
if backoff > 30*time.Second { |
|||
backoff = 30 * time.Second |
|||
} |
|||
|
|||
for { |
|||
if ctx.Err() != nil || p.closed.Load() { |
|||
return |
|||
} |
|||
|
|||
debugf("Telemost DataChannel: reconnecting in %v", backoff) |
|||
select { |
|||
case <-ctx.Done(): |
|||
return |
|||
case <-p.closeCh: |
|||
return |
|||
case <-time.After(backoff): |
|||
} |
|||
|
|||
if err := p.reconnect(ctx); err != nil { |
|||
debugf("Telemost DataChannel: reconnect failed: %v", err) |
|||
continue |
|||
} |
|||
|
|||
debugf("Telemost DataChannel: reconnected") |
|||
break |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) Send(data []byte) error { |
|||
p.sessionMu.RLock() |
|||
session := p.session |
|||
p.sessionMu.RUnlock() |
|||
|
|||
if session == nil || session.dc == nil || session.dc.ReadyState() != webrtc.DataChannelStateOpen { |
|||
return fmt.Errorf("datachannel not ready") |
|||
} |
|||
|
|||
p.sendMu.Lock() |
|||
defer p.sendMu.Unlock() |
|||
|
|||
start := time.Now() |
|||
for session.dc.BufferedAmount() > 256*1024 { |
|||
if time.Since(start) > 2*time.Second { |
|||
return fmt.Errorf("datachannel buffer is full") |
|||
} |
|||
if session.dc.ReadyState() != webrtc.DataChannelStateOpen { |
|||
return fmt.Errorf("datachannel not ready") |
|||
} |
|||
time.Sleep(10 * time.Millisecond) |
|||
} |
|||
|
|||
return session.dc.Send(data) |
|||
} |
|||
|
|||
func (p *Peer) Close() error { |
|||
if p.closed.Swap(true) { |
|||
return nil |
|||
} |
|||
|
|||
select { |
|||
case <-p.closeCh: |
|||
default: |
|||
close(p.closeCh) |
|||
} |
|||
|
|||
p.cleanupCurrentSession() |
|||
return nil |
|||
} |
|||
|
|||
func (p *Peer) SetReconnectCallback(cb func()) { |
|||
p.reconnectCbMu.Lock() |
|||
p.reconnectCb = cb |
|||
p.reconnectCbMu.Unlock() |
|||
} |
|||
|
|||
func (p *Peer) reconnect(ctx context.Context) error { |
|||
p.cleanupCurrentSession() |
|||
if err := p.connectOnce(ctx); err != nil { |
|||
return err |
|||
} |
|||
|
|||
p.reconnectCbMu.RLock() |
|||
cb := p.reconnectCb |
|||
p.reconnectCbMu.RUnlock() |
|||
if cb != nil { |
|||
cb() |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func (p *Peer) connectOnce(ctx context.Context) error { |
|||
conn, roomURL, err := GetConnectionInfo(ctx, p.roomURL, p.name) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
origin := WebOriginFromRoomURL(roomURL) |
|||
|
|||
config := webrtc.Configuration{ |
|||
ICEServers: []webrtc.ICEServer{ |
|||
{URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, |
|||
}, |
|||
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, |
|||
} |
|||
|
|||
api := webrtc.NewAPI() |
|||
|
|||
pcSub, err := api.NewPeerConnection(config) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
pcPub, err := api.NewPeerConnection(config) |
|||
if err != nil { |
|||
_ = pcSub.Close() |
|||
return err |
|||
} |
|||
|
|||
session := &peerSession{ |
|||
conn: conn, |
|||
pcSub: pcSub, |
|||
pcPub: pcPub, |
|||
} |
|||
|
|||
pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { |
|||
debugf("Telemost subscriber state: %s", state.String()) |
|||
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { |
|||
p.signalReconnectIfCurrent(session, "subscriber state "+state.String()) |
|||
} |
|||
}) |
|||
pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { |
|||
debugf("Telemost publisher state: %s", state.String()) |
|||
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { |
|||
p.signalReconnectIfCurrent(session, "publisher state "+state.String()) |
|||
} |
|||
}) |
|||
|
|||
dc, err := pcPub.CreateDataChannel(dataChannelLabel, nil) |
|||
if err != nil { |
|||
p.cleanupSession(session) |
|||
return err |
|||
} |
|||
session.dc = dc |
|||
|
|||
dcReady := make(chan struct{}, 1) |
|||
dc.OnOpen(func() { |
|||
if session.connected.CompareAndSwap(false, true) { |
|||
log.Printf("Telemost DataChannel connected") |
|||
} |
|||
select { |
|||
case dcReady <- struct{}{}: |
|||
default: |
|||
} |
|||
}) |
|||
dc.OnClose(func() { |
|||
p.signalReconnectIfCurrent(session, "publisher datachannel closed") |
|||
}) |
|||
dc.OnMessage(func(msg webrtc.DataChannelMessage) { |
|||
if p.onData != nil && len(msg.Data) > 0 { |
|||
p.onData(msg.Data) |
|||
} |
|||
}) |
|||
|
|||
pcSub.OnDataChannel(func(remoteDC *webrtc.DataChannel) { |
|||
if !isTransportDataChannelLabel(remoteDC.Label()) { |
|||
debugf("Telemost remote datachannel ignored: %s", remoteDC.Label()) |
|||
return |
|||
} |
|||
debugf("Telemost remote datachannel opened: %s", remoteDC.Label()) |
|||
remoteDC.OnClose(func() { |
|||
p.signalReconnectIfCurrent(session, "remote datachannel "+remoteDC.Label()+" closed") |
|||
}) |
|||
remoteDC.OnMessage(func(msg webrtc.DataChannelMessage) { |
|||
if p.onData != nil && len(msg.Data) > 0 { |
|||
p.onData(msg.Data) |
|||
} |
|||
}) |
|||
}) |
|||
|
|||
headers := http.Header{} |
|||
headers.Set("Origin", origin) |
|||
headers.Set("Referer", origin+"/") |
|||
|
|||
ws, resp, err := websocket.DefaultDialer.Dial(conn.ClientConfig.MediaServerURL, headers) |
|||
if err != nil { |
|||
if resp != nil && resp.Body != nil { |
|||
if closeErr := resp.Body.Close(); closeErr != nil { |
|||
debugf("Telemost websocket response close failed: %v", closeErr) |
|||
} |
|||
} |
|||
p.cleanupSession(session) |
|||
return err |
|||
} |
|||
session.ws = ws |
|||
|
|||
ws.SetPongHandler(func(string) error { |
|||
return ws.SetReadDeadline(time.Now().Add(60 * time.Second)) |
|||
}) |
|||
if err := ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { |
|||
p.cleanupSession(session) |
|||
return err |
|||
} |
|||
|
|||
p.sessionMu.Lock() |
|||
p.session = session |
|||
p.sessionMu.Unlock() |
|||
|
|||
p.setupICEHandlers(session) |
|||
|
|||
go p.keepAlive(session) |
|||
go p.handleSignaling(session) |
|||
|
|||
if err := p.sendHello(session); err != nil { |
|||
p.cleanupCurrentSession() |
|||
return err |
|||
} |
|||
|
|||
select { |
|||
case <-ctx.Done(): |
|||
p.cleanupCurrentSession() |
|||
return ctx.Err() |
|||
case <-p.closeCh: |
|||
p.cleanupCurrentSession() |
|||
return context.Canceled |
|||
case <-time.After(15 * time.Second): |
|||
p.cleanupCurrentSession() |
|||
return fmt.Errorf("datachannel timeout") |
|||
case <-dcReady: |
|||
return nil |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) keepAlive(session *peerSession) { |
|||
wsPingTicker := time.NewTicker(30 * time.Second) |
|||
defer wsPingTicker.Stop() |
|||
appPingTicker := time.NewTicker(5 * time.Second) |
|||
defer appPingTicker.Stop() |
|||
|
|||
for { |
|||
select { |
|||
case <-p.closeCh: |
|||
return |
|||
case <-wsPingTicker.C: |
|||
if err := p.writeControl(session, websocket.PingMessage, []byte{}); err != nil { |
|||
p.signalReconnectIfCurrent(session, "websocket ping failed") |
|||
return |
|||
} |
|||
case <-appPingTicker.C: |
|||
if err := p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"ping": map[string]interface{}{}, |
|||
}); err != nil { |
|||
p.signalReconnectIfCurrent(session, "application ping failed") |
|||
return |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) handleSignaling(session *peerSession) { |
|||
pubSent := false |
|||
|
|||
for { |
|||
var msg map[string]interface{} |
|||
if err := session.ws.ReadJSON(&msg); err != nil { |
|||
if p.isCurrentSession(session) && !p.closed.Load() { |
|||
debugf("Telemost signaling read error: %v", err) |
|||
p.signalReconnectIfCurrent(session, "signaling read failed") |
|||
} |
|||
return |
|||
} |
|||
if err := session.ws.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { |
|||
if p.isCurrentSession(session) && !p.closed.Load() { |
|||
debugf("Telemost signaling read deadline update failed: %v", err) |
|||
p.signalReconnectIfCurrent(session, "signaling deadline update failed") |
|||
} |
|||
return |
|||
} |
|||
|
|||
uid := "" |
|||
if rawUID, ok := msg["uid"].(string); ok { |
|||
uid = rawUID |
|||
} |
|||
|
|||
sendAck := func() { |
|||
if err := p.sendAck(session, uid); err != nil { |
|||
debugf("Telemost signaling ack failed: %v", err) |
|||
} |
|||
} |
|||
|
|||
if _, ok := msg["serverHello"]; ok { |
|||
sendAck() |
|||
} |
|||
if _, ok := msg["updateDescription"]; ok { |
|||
sendAck() |
|||
} |
|||
if _, ok := msg["vadActivity"]; ok { |
|||
sendAck() |
|||
} |
|||
if _, ok := msg["ping"]; ok { |
|||
if err := p.sendPong(session, uid); err != nil { |
|||
debugf("Telemost signaling pong failed: %v", err) |
|||
} |
|||
continue |
|||
} |
|||
if _, ok := msg["pong"]; ok { |
|||
sendAck() |
|||
continue |
|||
} |
|||
|
|||
if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok && !pubSent { |
|||
sdp, ok := offer["sdp"].(string) |
|||
if !ok || sdp == "" { |
|||
debugf("Telemost subscriber offer missing SDP") |
|||
continue |
|||
} |
|||
|
|||
pcSeq, ok := offer["pcSeq"].(float64) |
|||
if !ok { |
|||
debugf("Telemost subscriber offer missing pcSeq") |
|||
continue |
|||
} |
|||
|
|||
if err := session.pcSub.SetRemoteDescription(webrtc.SessionDescription{ |
|||
Type: webrtc.SDPTypeOffer, |
|||
SDP: sdp, |
|||
}); err != nil { |
|||
debugf("Telemost subscriber SetRemoteDescription failed: %v", err) |
|||
continue |
|||
} |
|||
|
|||
answer, err := session.pcSub.CreateAnswer(nil) |
|||
if err != nil { |
|||
debugf("Telemost subscriber CreateAnswer failed: %v", err) |
|||
continue |
|||
} |
|||
if err := session.pcSub.SetLocalDescription(answer); err != nil { |
|||
debugf("Telemost subscriber SetLocalDescription failed: %v", err) |
|||
continue |
|||
} |
|||
if err := p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"subscriberSdpAnswer": map[string]interface{}{ |
|||
"pcSeq": int(pcSeq), |
|||
"sdp": answer.SDP, |
|||
}, |
|||
}); err != nil { |
|||
debugf("Telemost subscriber answer send failed: %v", err) |
|||
continue |
|||
} |
|||
|
|||
sendAck() |
|||
time.Sleep(300 * time.Millisecond) |
|||
|
|||
pubOffer, err := session.pcPub.CreateOffer(nil) |
|||
if err != nil { |
|||
debugf("Telemost publisher CreateOffer failed: %v", err) |
|||
continue |
|||
} |
|||
if err := session.pcPub.SetLocalDescription(pubOffer); err != nil { |
|||
debugf("Telemost publisher SetLocalDescription failed: %v", err) |
|||
continue |
|||
} |
|||
if err := p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"publisherSdpOffer": map[string]interface{}{ |
|||
"pcSeq": 1, |
|||
"sdp": pubOffer.SDP, |
|||
}, |
|||
}); err != nil { |
|||
debugf("Telemost publisher offer send failed: %v", err) |
|||
continue |
|||
} |
|||
|
|||
pubSent = true |
|||
continue |
|||
} |
|||
|
|||
if answer, ok := msg["publisherSdpAnswer"].(map[string]interface{}); ok { |
|||
sdp, ok := answer["sdp"].(string) |
|||
if !ok || sdp == "" { |
|||
debugf("Telemost publisher answer missing SDP") |
|||
continue |
|||
} |
|||
if err := session.pcPub.SetRemoteDescription(webrtc.SessionDescription{ |
|||
Type: webrtc.SDPTypeAnswer, |
|||
SDP: sdp, |
|||
}); err != nil { |
|||
debugf("Telemost publisher SetRemoteDescription failed: %v", err) |
|||
} |
|||
sendAck() |
|||
continue |
|||
} |
|||
|
|||
if cand, ok := msg["webrtcIceCandidate"].(map[string]interface{}); ok { |
|||
p.handleICE(session, cand) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) setupICEHandlers(session *peerSession) { |
|||
session.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { |
|||
if c == nil { |
|||
return |
|||
} |
|||
init := c.ToJSON() |
|||
if err := p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"webrtcIceCandidate": map[string]interface{}{ |
|||
"candidate": init.Candidate, |
|||
"sdpMid": init.SDPMid, |
|||
"sdpMlineIndex": init.SDPMLineIndex, |
|||
"target": "SUBSCRIBER", |
|||
"pcSeq": 1, |
|||
}, |
|||
}); err != nil && p.isCurrentSession(session) && !p.closed.Load() { |
|||
debugf("Telemost subscriber ICE send failed: %v", err) |
|||
} |
|||
}) |
|||
|
|||
session.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { |
|||
if c == nil { |
|||
return |
|||
} |
|||
init := c.ToJSON() |
|||
if err := p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"webrtcIceCandidate": map[string]interface{}{ |
|||
"candidate": init.Candidate, |
|||
"sdpMid": init.SDPMid, |
|||
"sdpMlineIndex": init.SDPMLineIndex, |
|||
"target": "PUBLISHER", |
|||
"pcSeq": 1, |
|||
}, |
|||
}); err != nil && p.isCurrentSession(session) && !p.closed.Load() { |
|||
debugf("Telemost publisher ICE send failed: %v", err) |
|||
} |
|||
}) |
|||
} |
|||
|
|||
func (p *Peer) handleICE(session *peerSession, cand map[string]interface{}) { |
|||
candidate, ok := cand["candidate"].(string) |
|||
if !ok || candidate == "" || len(strings.Fields(candidate)) < 8 { |
|||
return |
|||
} |
|||
|
|||
target, ok := cand["target"].(string) |
|||
if !ok || target == "" { |
|||
return |
|||
} |
|||
|
|||
sdpMid := "" |
|||
if rawSDPMid, ok := cand["sdpMid"].(string); ok { |
|||
sdpMid = rawSDPMid |
|||
} |
|||
sdpMLineIndex, ok := cand["sdpMlineIndex"].(float64) |
|||
if !ok { |
|||
return |
|||
} |
|||
|
|||
index := uint16(sdpMLineIndex) |
|||
init := webrtc.ICECandidateInit{ |
|||
Candidate: candidate, |
|||
SDPMid: &sdpMid, |
|||
SDPMLineIndex: &index, |
|||
} |
|||
|
|||
switch target { |
|||
case "SUBSCRIBER": |
|||
if err := session.pcSub.AddICECandidate(init); err != nil { |
|||
debugf("Telemost subscriber ICE apply failed: %v", err) |
|||
} |
|||
case "PUBLISHER": |
|||
if err := session.pcPub.AddICECandidate(init); err != nil { |
|||
debugf("Telemost publisher ICE apply failed: %v", err) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) sendHello(session *peerSession) error { |
|||
hello := map[string]interface{}{ |
|||
"uid": uuid.New().String(), |
|||
"hello": map[string]interface{}{ |
|||
"participantMeta": map[string]interface{}{ |
|||
"name": p.name, |
|||
"role": "SPEAKER", |
|||
"sendAudio": false, |
|||
"sendVideo": false, |
|||
}, |
|||
"participantAttributes": map[string]interface{}{ |
|||
"name": p.name, |
|||
"role": "SPEAKER", |
|||
}, |
|||
"sendAudio": false, |
|||
"sendVideo": false, |
|||
"sendSharing": false, |
|||
"participantId": session.conn.PeerID, |
|||
"roomId": session.conn.RoomID, |
|||
"serviceName": "telemost", |
|||
"credentials": session.conn.Credentials, |
|||
"capabilitiesOffer": map[string]interface{}{ |
|||
"offerAnswerMode": []string{"SEPARATE"}, |
|||
"initialSubscriberOffer": []string{"ON_HELLO"}, |
|||
"slotsMode": []string{"FROM_CONTROLLER"}, |
|||
"simulcastMode": []string{"DISABLED"}, |
|||
"selfVadStatus": []string{"FROM_SERVER"}, |
|||
"dataChannelSharing": []string{"TO_RTP"}, |
|||
}, |
|||
"sdkInfo": map[string]interface{}{ |
|||
"implementation": "go", |
|||
"version": "1.0.0", |
|||
"userAgent": "vk-turn-proxy-" + p.name, |
|||
}, |
|||
"sdkInitializationId": uuid.New().String(), |
|||
"disablePublisher": false, |
|||
"disableSubscriber": false, |
|||
}, |
|||
} |
|||
|
|||
return p.writeJSON(session, hello) |
|||
} |
|||
|
|||
func (p *Peer) sendAck(session *peerSession, uid string) error { |
|||
return p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uid, |
|||
"ack": map[string]interface{}{ |
|||
"status": map[string]interface{}{ |
|||
"code": "OK", |
|||
}, |
|||
}, |
|||
}) |
|||
} |
|||
|
|||
func (p *Peer) sendPong(session *peerSession, uid string) error { |
|||
return p.writeJSON(session, map[string]interface{}{ |
|||
"uid": uid, |
|||
"pong": map[string]interface{}{}, |
|||
}) |
|||
} |
|||
|
|||
func (p *Peer) writeJSON(session *peerSession, payload any) error { |
|||
p.wsMu.Lock() |
|||
defer p.wsMu.Unlock() |
|||
|
|||
if session.ws == nil { |
|||
return fmt.Errorf("websocket is closed") |
|||
} |
|||
|
|||
return session.ws.WriteJSON(payload) |
|||
} |
|||
|
|||
func (p *Peer) writeControl(session *peerSession, messageType int, data []byte) error { |
|||
p.wsMu.Lock() |
|||
defer p.wsMu.Unlock() |
|||
|
|||
if session.ws == nil { |
|||
return fmt.Errorf("websocket is closed") |
|||
} |
|||
|
|||
return session.ws.WriteControl(messageType, data, time.Now().Add(10*time.Second)) |
|||
} |
|||
|
|||
func (p *Peer) cleanupCurrentSession() { |
|||
p.sessionMu.Lock() |
|||
session := p.session |
|||
p.session = nil |
|||
p.sessionMu.Unlock() |
|||
|
|||
p.cleanupSession(session) |
|||
} |
|||
|
|||
func (p *Peer) cleanupSession(session *peerSession) { |
|||
if session == nil { |
|||
return |
|||
} |
|||
|
|||
if session.connected.Swap(false) { |
|||
log.Printf("Telemost DataChannel closed") |
|||
} |
|||
|
|||
if session.dc != nil { |
|||
_ = session.dc.Close() |
|||
} |
|||
if session.pcPub != nil { |
|||
_ = session.pcPub.Close() |
|||
} |
|||
if session.pcSub != nil { |
|||
_ = session.pcSub.Close() |
|||
} |
|||
if session.ws != nil { |
|||
p.wsMu.Lock() |
|||
if err := session.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)); err != nil { |
|||
debugf("Telemost websocket close control failed: %v", err) |
|||
} |
|||
_ = session.ws.Close() |
|||
p.wsMu.Unlock() |
|||
} |
|||
} |
|||
|
|||
func (p *Peer) isCurrentSession(session *peerSession) bool { |
|||
p.sessionMu.RLock() |
|||
defer p.sessionMu.RUnlock() |
|||
return p.session == session |
|||
} |
|||
|
|||
func (p *Peer) signalReconnectIfCurrent(session *peerSession, reason string) { |
|||
if p.closed.Load() || !p.isCurrentSession(session) { |
|||
return |
|||
} |
|||
|
|||
if session.connected.Swap(false) { |
|||
log.Printf("Telemost DataChannel closed") |
|||
} |
|||
if reason != "" { |
|||
debugf("Telemost DataChannel disconnect reason: %s", reason) |
|||
} |
|||
|
|||
select { |
|||
case p.reconnectCh <- struct{}{}: |
|||
default: |
|||
} |
|||
} |
|||
@ -0,0 +1,57 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"log" |
|||
"net" |
|||
|
|||
"github.com/cacggghp/vk-turn-proxy/internal/telemost" |
|||
) |
|||
|
|||
func runTelemostDataChannelMode(ctx context.Context, inviteLink, connectAddr string) error { |
|||
backendConn, err := net.Dial("udp", connectAddr) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(backendConn net.Conn) { |
|||
err := backendConn.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(backendConn) |
|||
|
|||
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, func(data []byte) { |
|||
if _, writeErr := backendConn.Write(data); writeErr != nil { |
|||
log.Printf("Telemost DataChannel: failed to write backend packet: %v", writeErr) |
|||
} |
|||
}, nil) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(peer *telemost.Peer) { |
|||
err := peer.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(peer) |
|||
|
|||
closeOnContextDone(ctx, backendConn) |
|||
|
|||
log.Printf("Telemost DataChannel mode: forwarding to %s", connectAddr) |
|||
|
|||
buf := make([]byte, 2048) |
|||
for { |
|||
n, err := backendConn.Read(buf) |
|||
if err != nil { |
|||
if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { |
|||
return nil |
|||
} |
|||
return err |
|||
} |
|||
|
|||
if err := peer.Send(buf[:n]); err != nil { |
|||
log.Printf("Telemost DataChannel: dropped backend packet (%d bytes): %v", n, err) |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,264 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"fmt" |
|||
"io" |
|||
"log" |
|||
"net" |
|||
"sync" |
|||
"sync/atomic" |
|||
"time" |
|||
|
|||
"github.com/cacggghp/vk-turn-proxy/internal/dcmux" |
|||
"github.com/cacggghp/vk-turn-proxy/internal/telemost" |
|||
) |
|||
|
|||
type telemostBackendStream struct { |
|||
conn net.Conn |
|||
ctx context.Context |
|||
cancel context.CancelFunc |
|||
writeCh chan []byte |
|||
closeOnce sync.Once |
|||
connMu sync.Mutex |
|||
closed atomic.Bool |
|||
} |
|||
|
|||
func newTelemostBackendStream(parent context.Context, conn net.Conn) *telemostBackendStream { |
|||
ctx, cancel := context.WithCancel(parent) |
|||
return &telemostBackendStream{ |
|||
conn: conn, |
|||
ctx: ctx, |
|||
cancel: cancel, |
|||
writeCh: make(chan []byte, 32), |
|||
} |
|||
} |
|||
|
|||
func (s *telemostBackendStream) Close() { |
|||
s.closeOnce.Do(func() { |
|||
s.closed.Store(true) |
|||
s.cancel() |
|||
s.connMu.Lock() |
|||
_ = s.conn.Close() |
|||
s.connMu.Unlock() |
|||
}) |
|||
} |
|||
|
|||
func enqueueBackendData(stream *telemostBackendStream, data []byte) error { |
|||
if stream.closed.Load() { |
|||
return context.Canceled |
|||
} |
|||
|
|||
select { |
|||
case <-stream.ctx.Done(): |
|||
return context.Canceled |
|||
case stream.writeCh <- data: |
|||
return nil |
|||
default: |
|||
return fmt.Errorf("backend write queue full") |
|||
} |
|||
} |
|||
|
|||
func (s *telemostBackendStream) write(data []byte) error { |
|||
if s.closed.Load() { |
|||
return net.ErrClosed |
|||
} |
|||
|
|||
s.connMu.Lock() |
|||
defer s.connMu.Unlock() |
|||
|
|||
if s.closed.Load() { |
|||
return net.ErrClosed |
|||
} |
|||
|
|||
if err := s.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil { |
|||
return fmt.Errorf("set backend write deadline: %w", err) |
|||
} |
|||
if _, err := s.conn.Write(data); err != nil { |
|||
return fmt.Errorf("write backend data: %w", err) |
|||
} |
|||
|
|||
return nil |
|||
} |
|||
|
|||
func handleTelemostBackendStream(streamID uint16, stream *telemostBackendStream, mux *dcmux.Multiplexer, closeStream func(uint16), closeMuxStream func(uint16)) { |
|||
defer closeStream(streamID) |
|||
defer closeMuxStream(streamID) |
|||
|
|||
var wg sync.WaitGroup |
|||
wg.Add(2) |
|||
|
|||
go func() { |
|||
defer wg.Done() |
|||
defer stream.cancel() |
|||
|
|||
buf := make([]byte, 32768) |
|||
for { |
|||
n, readErr := stream.conn.Read(buf) |
|||
if readErr != nil { |
|||
if errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed) { |
|||
if telemost.DebugEnabled() { |
|||
log.Printf("Telemost DataChannel VLESS backend stream closed: %v", readErr) |
|||
} |
|||
} else { |
|||
log.Printf("Telemost DataChannel VLESS backend read error: %v", readErr) |
|||
} |
|||
return |
|||
} |
|||
if sendErr := mux.SendData(streamID, buf[:n]); sendErr != nil { |
|||
return |
|||
} |
|||
} |
|||
}() |
|||
|
|||
go func() { |
|||
defer wg.Done() |
|||
defer stream.cancel() |
|||
|
|||
for { |
|||
select { |
|||
case <-stream.ctx.Done(): |
|||
return |
|||
case data := <-stream.writeCh: |
|||
if err := stream.write(data); err != nil { |
|||
if errors.Is(err, net.ErrClosed) || errors.Is(err, context.Canceled) || stream.closed.Load() { |
|||
return |
|||
} |
|||
log.Printf("Telemost DataChannel VLESS backend write error: %v", err) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
|
|||
wg.Wait() |
|||
} |
|||
|
|||
func runTelemostDataChannelVLESSMode(ctx context.Context, inviteLink, connectAddr string) error { |
|||
var ( |
|||
connMu sync.Mutex |
|||
conns = make(map[uint16]*telemostBackendStream) |
|||
) |
|||
|
|||
var peer *telemost.Peer |
|||
mux := dcmux.New(0, func(frame []byte) error { |
|||
return peer.Send(frame) |
|||
}) |
|||
|
|||
closeStream := func(sid uint16) { |
|||
connMu.Lock() |
|||
stream := conns[sid] |
|||
delete(conns, sid) |
|||
connMu.Unlock() |
|||
if stream != nil { |
|||
stream.Close() |
|||
} |
|||
} |
|||
|
|||
closeAll := func() { |
|||
connMu.Lock() |
|||
streams := make([]*telemostBackendStream, 0, len(conns)) |
|||
for sid, stream := range conns { |
|||
streams = append(streams, stream) |
|||
delete(conns, sid) |
|||
} |
|||
connMu.Unlock() |
|||
|
|||
for _, stream := range streams { |
|||
stream.Close() |
|||
} |
|||
} |
|||
|
|||
closeMuxStream := func(sid uint16) { |
|||
if mux.StreamClosed(sid) { |
|||
return |
|||
} |
|||
if err := mux.CloseStream(sid); err != nil { |
|||
log.Printf("Telemost DataChannel VLESS server: failed to close mux stream %d: %v", sid, err) |
|||
} |
|||
} |
|||
|
|||
getOrCreateBackendStream := func(sid uint16) (*telemostBackendStream, error) { |
|||
connMu.Lock() |
|||
stream := conns[sid] |
|||
connMu.Unlock() |
|||
if stream != nil { |
|||
return stream, nil |
|||
} |
|||
|
|||
dialer := &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second} |
|||
conn, err := dialer.DialContext(ctx, "tcp", connectAddr) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
stream = newTelemostBackendStream(ctx, conn) |
|||
|
|||
connMu.Lock() |
|||
if existing := conns[sid]; existing != nil { |
|||
connMu.Unlock() |
|||
stream.Close() |
|||
return existing, nil |
|||
} |
|||
conns[sid] = stream |
|||
connMu.Unlock() |
|||
|
|||
go handleTelemostBackendStream(sid, stream, mux, closeStream, closeMuxStream) |
|||
return stream, nil |
|||
} |
|||
|
|||
peer, err := telemost.NewConnectedPeer(ctx, inviteLink, mux.HandleFrame, func() { |
|||
if telemost.DebugEnabled() { |
|||
log.Printf("Telemost DataChannel VLESS server: peer reconnected, closing active backend streams") |
|||
} |
|||
closeAll() |
|||
mux.Reset() |
|||
}) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
defer func(peer *telemost.Peer) { |
|||
err := peer.Close() |
|||
if err != nil { |
|||
log.Println(err) |
|||
} |
|||
}(peer) |
|||
|
|||
log.Printf("Telemost DataChannel VLESS server: forwarding to %s", connectAddr) |
|||
activityCh := mux.WaitForActivity() |
|||
|
|||
for { |
|||
select { |
|||
case <-ctx.Done(): |
|||
closeAll() |
|||
return nil |
|||
case <-activityCh: |
|||
} |
|||
|
|||
for _, sid := range mux.GetStreams() { |
|||
data := mux.ReadStream(sid) |
|||
if len(data) > 0 { |
|||
stream, err := getOrCreateBackendStream(sid) |
|||
if err != nil { |
|||
log.Printf("Telemost DataChannel VLESS backend dial error: %v", err) |
|||
closeMuxStream(sid) |
|||
continue |
|||
} |
|||
if err := enqueueBackendData(stream, data); err != nil { |
|||
if !errors.Is(err, context.Canceled) { |
|||
log.Printf("Telemost DataChannel VLESS backend stream %d stalled: %v", sid, err) |
|||
} |
|||
closeStream(sid) |
|||
closeMuxStream(sid) |
|||
continue |
|||
} |
|||
} |
|||
|
|||
if mux.StreamClosed(sid) { |
|||
closeStream(sid) |
|||
mux.CleanupStream(sid) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,70 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"errors" |
|||
"net" |
|||
"strings" |
|||
"testing" |
|||
"time" |
|||
) |
|||
|
|||
func TestEnqueueBackendDataReturnsWhenQueueIsFull(t *testing.T) { |
|||
clientConn, serverConn := net.Pipe() |
|||
defer func() { |
|||
_ = clientConn.Close() |
|||
_ = serverConn.Close() |
|||
}() |
|||
|
|||
stream := newTelemostBackendStream(context.Background(), clientConn) |
|||
defer stream.Close() |
|||
|
|||
stream.writeCh = make(chan []byte, 1) |
|||
stream.writeCh <- []byte("busy") |
|||
|
|||
done := make(chan error, 1) |
|||
go func() { |
|||
done <- enqueueBackendData(stream, []byte("next")) |
|||
}() |
|||
|
|||
select { |
|||
case err := <-done: |
|||
if err == nil || !strings.Contains(err.Error(), "queue full") { |
|||
t.Fatalf("enqueueBackendData() error = %v, want queue full", err) |
|||
} |
|||
case <-time.After(200 * time.Millisecond): |
|||
t.Fatal("enqueueBackendData() blocked on a full queue") |
|||
} |
|||
} |
|||
|
|||
func TestEnqueueBackendDataReturnsCanceledForClosedStream(t *testing.T) { |
|||
clientConn, serverConn := net.Pipe() |
|||
defer func() { |
|||
_ = clientConn.Close() |
|||
_ = serverConn.Close() |
|||
}() |
|||
|
|||
stream := newTelemostBackendStream(context.Background(), clientConn) |
|||
stream.Close() |
|||
|
|||
err := enqueueBackendData(stream, []byte("next")) |
|||
if !errors.Is(err, context.Canceled) { |
|||
t.Fatalf("enqueueBackendData() error = %v, want context canceled", err) |
|||
} |
|||
} |
|||
|
|||
func TestTelemostBackendStreamWriteReturnsNetErrClosedAfterClose(t *testing.T) { |
|||
clientConn, serverConn := net.Pipe() |
|||
defer func() { |
|||
_ = clientConn.Close() |
|||
_ = serverConn.Close() |
|||
}() |
|||
|
|||
stream := newTelemostBackendStream(context.Background(), clientConn) |
|||
stream.Close() |
|||
|
|||
err := stream.write([]byte("next")) |
|||
if !errors.Is(err, net.ErrClosed) { |
|||
t.Fatalf("stream.write() error = %v, want net.ErrClosed", err) |
|||
} |
|||
} |
|||
Loading…
Reference in new issue