Browse Source

Add multi-user sessions and session ID support

Introduce multi-user proxy sessions: add an English README and update the Russian README. Server: implement SessionManager and UserSession to aggregate DTLS streams by a 16-byte session ID, create a single UDP backend per session, and deliver backend->client traffic using round-robin across active DTLS streams. Simplify per-connection logic (read session ID after DTLS handshake, reuse backend, improved accept/handshake/error handling). Client: add -session-id flag, generate or parse a 16-byte UUID, send the session ID immediately after establishing each DTLS stream, and propagate sessionID through connection loops. These changes improve stability (prevent endpoint thrashing) and enable multiple simultaneous users per proxy server.
pull/26/head
kiper292 4 months ago
parent
commit
b4d5a05556
  1. 90
      README.en.md
  2. 12
      README.md
  3. 62
      client/main.go
  4. 279
      server/main.go

90
README.en.md

@ -0,0 +1,90 @@
# Good TURN
[Russian version](README.md)
Tunnels WireGuard/Hysteria traffic through VK Calls or Yandex Telemost TURN servers. Packets are encrypted with DTLS 1.2 and then sent in parallel streams via TCP or UDP to the TURN server using the STUN ChannelData protocol. From there, they are forwarded via UDP to your server, decrypted, and passed to WireGuard. TURN credentials are generated from the meeting link.
**Update: Multi-user Proxy Server**
The current implementation supports multiple simultaneous users through a single proxy server.
- **Session Identification:** The client generates a unique 16-byte UUID at startup.
- **Stream Aggregation:** The server groups all incoming DTLS connections from a single client by its UUID.
- **Stable Backend:** For each session, exactly one UDP connection is created to the WireGuard server. This prevents the "endpoint thrashing" issue and increases stability.
- **Load Balancing:** Outgoing traffic from the server to the client is distributed among all active DTLS streams of the user (Round-Robin).
For educational purposes only!
## Setup
You will need:
1. A link to an active VK call: create your own (requires a VK account) or search for `"https://vk.com/call/join/"`. Links are valid forever unless "end call for all" is clicked.
2. Or a link to a Yandex Telemost call: `"https://telemost.yandex.ru/j/"`. Better not to search for these as conference participants are visible.
3. A VPS with WireGuard installed.
4. For Android: Download Termux from F-Droid.
### Server
```bash
./server -listen 0.0.0.0:56000 -connect 127.0.0.1:<wg_port>
```
### Client
#### Android
**Recommended method:**
Use the native Android app [vk-turn-proxy-android](https://github.com/MYSOREZ/vk-turn-proxy-android).
- In the WireGuard client config, change the server address to `127.0.0.1:9000` and set MTU to 1280.
- **Add the app to WireGuard exceptions (Excluded Applications). Click "Save".**
**Alternative method (via Termux):**
- In the WireGuard client config, change the server address to `127.0.0.1:9000` and set MTU to 1280.
- **Add Termux to WireGuard exceptions. Click "Save".**
In Termux:
```bash
termux-wake-lock
```
The phone will not enter deep sleep. To disable:
```bash
termux-wake-unlock
```
Copy the binary to a local folder and grant execution rights:
```bash
cp /sdcard/Download/client-android ./
chmod 777 ./client-android
```
Run:
```bash
./client-android -peer <wg_server_ip>:56000 -vk-link <VK_link> -listen 127.0.0.1:9000
```
Additional flags:
- `-session-id <hex>`: set a fixed session ID (32 hex characters).
Or:
```bash
./client-android -udp -turn 5.255.211.241 -peer <wg_server_ip>:56000 -yandex-link <Ya_link> -listen 127.0.0.1:9000
```
#### Linux
In the WireGuard client config, change the server address to `127.0.0.1:9000` and set MTU to 1280.
The script will add routes to the necessary IPs:
```bash
./client-linux -peer <wg_server_ip>:56000 -vk-link <VK_link> -listen 127.0.0.1:9000 | sudo routes.sh
```
#### Windows
In the WireGuard client config, change the server address to `127.0.0.1:9000` and set MTU to 1280.
In PowerShell as Administrator (so the script can add routes):
```powershell
./client.exe -peer <wg_server_ip>:56000 -vk-link <VK_link> -listen 127.0.0.1:9000 | routes.ps1
```
### If it doesn't work
Use the `-turn` option to manually specify a TURN server address.
If TCP doesn't work, try adding the `-udp` flag.
Add `-n 1` for a more stable single-stream connection (limited to 5 Mbps for VK).
## Yandex Telemost
**UPD. TELEMOST IS CLOSED**
Unlike VK, Yandex servers do not limit speed, so the default is `-n 1`.
## Direct mode
With the `-no-dtls` flag, you can send packets without DTLS obfuscation and connect to regular WireGuard servers. This may result in a ban from VK/Yandex.

12
README.md

@ -1,6 +1,15 @@
# Good TURN
[English version](README.en.md)
Проброс трафика WireGuard/Hysteria через TURN сервера VK звонков или Яндекс телемоста. Пакеты шифруются DTLS 1.2, затем параллельными потоками через TCP или UDP отправляются на TURN сервер по протоколу STUN ChannelData. Оттуда по UDP отправляются на ваш сервер, где расшифровываются и передаются в WireGuard. Логин/пароль от TURN генерируются из ссылки на звонок.
**Обновление: Многопользовательский прокси-сервер**
Текущая реализация поддерживает одновременную работу нескольких пользователей через один прокси-сервер.
- **Идентификация сессий:** Клиент генерирует уникальный 16-байтный UUID при старте.
- **Агрегация потоков:** Сервер группирует все входящие DTLS-соединения от одного клиента по его UUID.
- **Стабильный бэкенд:** Для каждой сессии создается ровно одно UDP-соединение с WireGuard-сервером. Это предотвращает проблему "прыгающих портов" (endpoint thrashing) и повышает стабильность.
- **Балансировка:** Исходящий трафик от сервера к клиенту распределяется между всеми активными DTLS-потоками пользователя (Round-Robin).
Только для учебных целей!
## Настройка
Нам понадобится:
@ -41,6 +50,9 @@ chmod 777 ./client-android
```
./client-android -peer <ip сервера wg>:56000 -vk-link <VK ссылка> -listen 127.0.0.1:9000
```
Дополнительные флаги:
- `-session-id <hex>`: установить фиксированный ID сессии (32 символа hex).
Или
```
./client-android -udp -turn 5.255.211.241 -peer <ip сервера wg>:56000 -yandex-link <Ya ссылка> -listen 127.0.0.1:9000

62
client/main.go

@ -453,7 +453,7 @@ func dtlsFunc(ctx context.Context, conn net.PacketConn, peer *net.UDPAddr) (net.
return dtlsConn, nil
}
func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}, c chan<- error) {
func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}, c chan<- error, sessionID []byte) {
var err error = nil
defer func() { c <- err }()
dtlsctx, dtlscancel := context.WithCancel(ctx)
@ -481,7 +481,15 @@ func oneDtlsConnection(ctx context.Context, peer *net.UDPAddr, listenConn net.Pa
}
log.Printf("Closed DTLS connection\n")
}()
log.Printf("Established DTLS connection!\n")
// Phase 1: Send Session ID
dtlsConn.SetWriteDeadline(time.Now().Add(time.Second * 5))
if _, err1 = dtlsConn.Write(sessionID); err1 != nil {
err = fmt.Errorf("failed to send session ID: %s", err1)
return
}
log.Printf("Established DTLS connection and sent session ID!\n")
go func() {
for {
select {
@ -758,14 +766,14 @@ func oneTurnConnection(ctx context.Context, turnParams *turnParams, peer *net.UD
conn2.SetDeadline(time.Time{})
}
func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConnChan <-chan net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}) {
func oneDtlsConnectionLoop(ctx context.Context, peer *net.UDPAddr, listenConnChan <-chan net.PacketConn, connchan chan<- net.PacketConn, okchan chan<- struct{}, sessionID []byte) {
for {
select {
case <-ctx.Done():
return
case listenConn := <-listenConnChan:
c := make(chan error)
go oneDtlsConnection(ctx, peer, listenConn, connchan, okchan, c)
go oneDtlsConnection(ctx, peer, listenConn, connchan, okchan, c, sessionID)
if err := <-c; err != nil {
log.Printf("%s", err)
}
@ -817,6 +825,7 @@ func main() { //nolint:cyclop
n := flag.Int("n", 0, "connections to TURN (default 16 for VK, 1 for Yandex)")
udp := flag.Bool("udp", false, "connect to TURN with UDP")
direct := flag.Bool("no-dtls", false, "connect without obfuscation. DO NOT USE")
sessionIDFlag := flag.String("session-id", "", "override session ID (hex, 32 chars)")
flag.Parse()
if *peerAddr == "" {
log.Panicf("Need peer address!")
@ -856,6 +865,17 @@ func main() { //nolint:cyclop
getCreds,
}
var sessionID []byte
if *sessionIDFlag != "" {
sessionID = make([]byte, 16)
if _, err := fmt.Sscanf(*sessionIDFlag, "%x", &sessionID); err != nil {
log.Panicf("Invalid session ID: %v", err)
}
} else {
sessionID, _ = uuid.New().MarshalBinary()
}
log.Printf("Session ID: %x", sessionID)
listenConnChan := make(chan net.PacketConn)
listenConn, err := net.ListenPacket("udp", *listen) // nolint: noctx
if err != nil {
@ -880,21 +900,27 @@ func main() { //nolint:cyclop
t := time.Tick(100 * time.Millisecond)
if *direct {
for i := 0; i < *n; i++ {
wg1.Go(func() {
wg1.Add(1)
go func() {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, listenConnChan, t)
})
}()
}
} else {
okchan := make(chan struct{})
connchan := make(chan net.PacketConn)
wg1.Go(func() {
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, okchan)
})
wg1.Add(1)
go func() {
defer wg1.Done()
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, okchan, sessionID)
}()
wg1.Go(func() {
wg1.Add(1)
go func() {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, connchan, t)
})
}()
select {
case <-okchan:
@ -902,12 +928,16 @@ func main() { //nolint:cyclop
}
for i := 0; i < *n-1; i++ {
connchan := make(chan net.PacketConn)
wg1.Go(func() {
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, nil)
})
wg1.Go(func() {
wg1.Add(1)
go func() {
defer wg1.Done()
oneDtlsConnectionLoop(ctx, peer, listenConnChan, connchan, nil, sessionID)
}()
wg1.Add(1)
go func() {
defer wg1.Done()
oneTurnConnectionLoop(ctx, params, peer, connchan, t)
})
}()
}
}

279
server/main.go

@ -5,11 +5,13 @@ import (
"crypto/tls"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
@ -17,6 +19,121 @@ import (
"github.com/pion/dtls/v3/pkg/crypto/selfsign"
)
type UserSession struct {
ID string
Conns []net.Conn
BackendConn net.Conn
LastUsed uint32
Lock sync.RWMutex
Ctx context.Context
Cancel context.CancelFunc
Manager *SessionManager
}
type SessionManager struct {
Sessions map[string]*UserSession
Lock sync.RWMutex
}
func (s *SessionManager) GetOrCreate(ctx context.Context, id string, connectAddr string) (*UserSession, error) {
s.Lock.Lock()
defer s.Lock.Unlock()
if session, ok := s.Sessions[id]; ok {
return session, nil
}
backendConn, err := net.Dial("udp", connectAddr)
if err != nil {
return nil, err
}
sessionCtx, cancel := context.WithCancel(ctx)
session := &UserSession{
ID: id,
BackendConn: backendConn,
Manager: s,
Ctx: sessionCtx,
Cancel: cancel,
}
s.Sessions[id] = session
go session.backendReaderLoop()
return session, nil
}
func (s *UserSession) backendReaderLoop() {
defer s.Cleanup()
buf := make([]byte, 1600)
for {
select {
case <-s.Ctx.Done():
return
default:
}
s.BackendConn.SetReadDeadline(time.Now().Add(time.Minute * 5))
n, err := s.BackendConn.Read(buf)
if err != nil {
log.Printf("Session %s backend read error: %v", s.ID, err)
return
}
s.Lock.RLock()
if len(s.Conns) == 0 {
s.Lock.RUnlock()
continue
}
// Round-robin selection of DTLS connection
idx := atomic.AddUint32(&s.LastUsed, 1) % uint32(len(s.Conns))
conn := s.Conns[idx]
s.Lock.RUnlock()
conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
_, err = conn.Write(buf[:n])
if err != nil {
log.Printf("Session %s DTLS write error: %v", s.ID, err)
// Connection will be removed by its own reader loop
}
}
}
func (s *UserSession) AddConn(conn net.Conn) {
s.Lock.Lock()
defer s.Lock.Unlock()
s.Conns = append(s.Conns, conn)
}
func (s *UserSession) RemoveConn(conn net.Conn) {
s.Lock.Lock()
defer s.Lock.Unlock()
for i, c := range s.Conns {
if c == conn {
s.Conns = append(s.Conns[:i], s.Conns[i+1:]...)
break
}
}
// If all connections are gone, we might want to start a timer to cleanup the session
// but for now we'll keep it alive until backendReaderLoop fails or context is cancelled.
}
func (s *UserSession) Cleanup() {
s.Cancel()
s.BackendConn.Close()
s.Manager.Lock.Lock()
delete(s.Manager.Sessions, s.ID)
s.Manager.Lock.Unlock()
s.Lock.Lock()
for _, c := range s.Conns {
c.Close()
}
s.Conns = nil
s.Lock.Unlock()
}
func main() {
listen := flag.String("listen", "0.0.0.0:56000", "listen on ip:port")
connect := flag.String("connect", "", "connect to ip:port")
@ -41,17 +158,12 @@ func main() {
if len(*connect) == 0 {
log.Panicf("server address is required")
}
// Generate a certificate and private key to secure the connection
certificate, genErr := selfsign.GenerateSelfSigned()
if genErr != nil {
panic(err)
panic(genErr)
}
//
// Everything below is the pion-DTLS API! Thanks for using it ❤️.
//
// Prepare the configuration of the DTLS connection
config := &dtls.Config{
Certificates: []tls.Certificate{certificate},
ExtendedMasterSecret: dtls.RequireExtendedMasterSecret,
@ -59,131 +171,86 @@ func main() {
ConnectionIDGenerator: dtls.RandomCIDGenerator(8),
}
// Connect to a DTLS server
listener, err := dtls.Listen("udp", addr, config)
if err != nil {
panic(err)
}
context.AfterFunc(ctx, func() {
if err = listener.Close(); err != nil {
panic(err)
}
listener.Close()
})
fmt.Println("Listening")
manager := &SessionManager{
Sessions: make(map[string]*UserSession),
}
log.Printf("Listening on %s, forwarding to %s", *listen, *connect)
wg1 := sync.WaitGroup{}
for {
select {
case <-ctx.Done():
wg1.Wait()
return
default:
}
// Wait for a connection.
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
select {
case <-ctx.Done():
return
default:
log.Println("Accept error:", err)
continue
}
}
wg1.Add(1)
go func(conn net.Conn) {
defer wg1.Done()
defer conn.Close() // graceful shutdown
var err error = nil
log.Printf("Connection from %s\n", conn.RemoteAddr())
// `conn` is of type `net.Conn` but may be casted to `dtls.Conn`
// using `dtlsConn := conn.(*dtls.Conn)` in order to to expose
// functions like `ConnectionState` etc.
// Perform the handshake with a 30-second timeout
ctx1, cancel1 := context.WithTimeout(ctx, 30*time.Second)
defer conn.Close()
dtlsConn, ok := conn.(*dtls.Conn)
if !ok {
log.Println("Type error")
cancel1()
return
}
log.Println("Start handshake")
if err = dtlsConn.HandshakeContext(ctx1); err != nil {
log.Println(err)
cancel1()
handshakeCtx, hCancel := context.WithTimeout(ctx, 30*time.Second)
defer hCancel()
if err := dtlsConn.HandshakeContext(handshakeCtx); err != nil {
log.Println("Handshake failed:", err)
return
}
// Phase 1: Read Session ID (16 bytes)
idBuf := make([]byte, 16)
conn.SetReadDeadline(time.Now().Add(time.Second * 5))
_, err := io.ReadFull(conn, idBuf)
if err != nil {
log.Println("Failed to read session ID:", err)
return
}
cancel1()
log.Println("Handshake done")
sessionID := fmt.Sprintf("%x", idBuf)
serverConn, err := net.Dial("udp", *connect)
session, err := manager.GetOrCreate(ctx, sessionID, *connect)
if err != nil {
log.Println(err)
log.Println("Failed to get/create session:", err)
return
}
defer func() {
if err = serverConn.Close(); err != nil {
log.Printf("failed to close outgoing connection: %s", err)
session.AddConn(conn)
defer session.RemoveConn(conn)
log.Printf("New stream for session %s from %s", sessionID, conn.RemoteAddr())
// Upstream Loop: DTLS -> Backend
buf := make([]byte, 1600)
for {
conn.SetReadDeadline(time.Now().Add(time.Minute * 10))
n, err := conn.Read(buf)
if err != nil {
log.Printf("Stream %s closed: %v", sessionID, err)
return
}
}()
var wg sync.WaitGroup
wg.Add(2)
ctx2, cancel2 := context.WithCancel(ctx)
context.AfterFunc(ctx2, func() {
conn.SetDeadline(time.Now())
serverConn.SetDeadline(time.Now())
})
go func() {
defer wg.Done()
defer cancel2()
buf := make([]byte, 1600)
for {
select {
case <-ctx2.Done():
return
default:
}
conn.SetReadDeadline(time.Now().Add(time.Minute * 30))
n, err1 := conn.Read(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
serverConn.SetWriteDeadline(time.Now().Add(time.Minute * 30))
_, err1 = serverConn.Write(buf[:n])
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
}
}()
go func() {
defer wg.Done()
defer cancel2()
buf := make([]byte, 1600)
for {
select {
case <-ctx2.Done():
return
default:
}
serverConn.SetReadDeadline(time.Now().Add(time.Minute * 30))
n, err1 := serverConn.Read(buf)
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
conn.SetWriteDeadline(time.Now().Add(time.Minute * 30))
_, err1 = conn.Write(buf[:n])
if err1 != nil {
log.Printf("Failed: %s", err1)
return
}
session.BackendConn.SetWriteDeadline(time.Now().Add(time.Second * 5))
_, err = session.BackendConn.Write(buf[:n])
if err != nil {
log.Printf("Session %s backend write error: %v", sessionID, err)
return
}
}()
wg.Wait()
log.Printf("Connection closed: %s\n", conn.RemoteAddr())
}
}(conn)
}
}

Loading…
Cancel
Save