init
This commit is contained in:
285
server/internal/ws/hub.go
Normal file
285
server/internal/ws/hub.go
Normal file
@@ -0,0 +1,285 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"synctv/server/internal/room"
|
||||
)
|
||||
|
||||
type RoomService interface {
|
||||
Join(ctx context.Context, code, deviceID string, onlineCount int) (room.Room, bool, error)
|
||||
SetOnlineCount(ctx context.Context, code string, count int) error
|
||||
SetSource(ctx context.Context, code, deviceID string, src room.Source) (room.Room, error)
|
||||
UpdatePlayback(ctx context.Context, code string, p room.Playback) (room.Room, error)
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
Type string `json:"type"`
|
||||
Payload json.RawMessage `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
type Hub struct {
|
||||
service RoomService
|
||||
upgrader websocket.Upgrader
|
||||
pingInterval time.Duration
|
||||
pongTimeout time.Duration
|
||||
writeTimeout time.Duration
|
||||
|
||||
mu sync.RWMutex
|
||||
rooms map[string]map[*Client]struct{}
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
hub *Hub
|
||||
conn *websocket.Conn
|
||||
roomCode string
|
||||
deviceID string
|
||||
send chan any
|
||||
}
|
||||
|
||||
func NewHub(service RoomService, allowedOrigins []string, pingInterval, pongTimeout, writeTimeout time.Duration) *Hub {
|
||||
originAllowed := map[string]struct{}{}
|
||||
allowAll := false
|
||||
for _, origin := range allowedOrigins {
|
||||
if origin == "*" {
|
||||
allowAll = true
|
||||
break
|
||||
}
|
||||
originAllowed[origin] = struct{}{}
|
||||
}
|
||||
|
||||
return &Hub{
|
||||
service: service,
|
||||
upgrader: websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
if allowAll {
|
||||
return true
|
||||
}
|
||||
_, ok := originAllowed[r.Header.Get("Origin")]
|
||||
return ok
|
||||
},
|
||||
},
|
||||
pingInterval: pingInterval,
|
||||
pongTimeout: pongTimeout,
|
||||
writeTimeout: writeTimeout,
|
||||
rooms: make(map[string]map[*Client]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
roomCode := r.URL.Query().Get("roomCode")
|
||||
deviceID := r.URL.Query().Get("deviceId")
|
||||
if roomCode == "" || deviceID == "" {
|
||||
http.Error(w, "roomCode and deviceId are required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := h.upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c := &Client{
|
||||
hub: h,
|
||||
conn: conn,
|
||||
roomCode: roomCode,
|
||||
deviceID: deviceID,
|
||||
send: make(chan any, 32),
|
||||
}
|
||||
|
||||
h.register(c)
|
||||
count := h.count(roomCode)
|
||||
rm, isOwner, err := h.service.Join(r.Context(), roomCode, deviceID, count)
|
||||
if err != nil {
|
||||
c.writeJSON(map[string]any{"type": "error", "message": err.Error()})
|
||||
h.unregister(c)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
c.send <- map[string]any{
|
||||
"type": "roomSnapshot",
|
||||
"payload": room.Snapshot{
|
||||
Room: rm.PublicFor(deviceID),
|
||||
IsOwner: isOwner,
|
||||
DeviceID: deviceID,
|
||||
ServerNow: room.NowMS(),
|
||||
},
|
||||
}
|
||||
h.broadcast(roomCode, map[string]any{"type": "presenceChanged", "payload": map[string]any{"onlineCount": count}}, c)
|
||||
|
||||
go c.writePump()
|
||||
go c.readPump()
|
||||
}
|
||||
|
||||
func (h *Hub) register(c *Client) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.rooms[c.roomCode] == nil {
|
||||
h.rooms[c.roomCode] = make(map[*Client]struct{})
|
||||
}
|
||||
h.rooms[c.roomCode][c] = struct{}{}
|
||||
}
|
||||
|
||||
func (h *Hub) unregister(c *Client) {
|
||||
h.mu.Lock()
|
||||
if clients := h.rooms[c.roomCode]; clients != nil {
|
||||
delete(clients, c)
|
||||
if len(clients) == 0 {
|
||||
delete(h.rooms, c.roomCode)
|
||||
}
|
||||
}
|
||||
h.mu.Unlock()
|
||||
close(c.send)
|
||||
|
||||
count := h.count(c.roomCode)
|
||||
if err := h.service.SetOnlineCount(context.Background(), c.roomCode, count); err != nil {
|
||||
log.Printf("update online count: %v", err)
|
||||
}
|
||||
h.broadcast(c.roomCode, map[string]any{"type": "presenceChanged", "payload": map[string]any{"onlineCount": count}}, nil)
|
||||
}
|
||||
|
||||
func (h *Hub) count(roomCode string) int {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
return len(h.rooms[roomCode])
|
||||
}
|
||||
|
||||
func (h *Hub) broadcast(roomCode string, msg any, except *Client) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.rooms[roomCode] {
|
||||
if c == except {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case c.send <- msg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) readPump() {
|
||||
defer func() {
|
||||
c.hub.unregister(c)
|
||||
_ = c.conn.Close()
|
||||
}()
|
||||
c.conn.SetReadLimit(1 << 20)
|
||||
_ = c.conn.SetReadDeadline(time.Now().Add(c.hub.pongTimeout))
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
return c.conn.SetReadDeadline(time.Now().Add(c.hub.pongTimeout))
|
||||
})
|
||||
|
||||
for {
|
||||
var event Event
|
||||
if err := c.conn.ReadJSON(&event); err != nil {
|
||||
return
|
||||
}
|
||||
c.handle(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writePump() {
|
||||
ticker := time.NewTicker(c.hub.pingInterval)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
_ = c.conn.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-c.send:
|
||||
_ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout))
|
||||
if !ok {
|
||||
_ = c.conn.WriteMessage(websocket.CloseMessage, nil)
|
||||
return
|
||||
}
|
||||
if err := c.conn.WriteJSON(msg); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
_ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout))
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) writeJSON(v any) {
|
||||
_ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout))
|
||||
_ = c.conn.WriteJSON(v)
|
||||
}
|
||||
|
||||
func (c *Client) handle(event Event) {
|
||||
switch event.Type {
|
||||
case "setSource":
|
||||
var src room.Source
|
||||
if err := json.Unmarshal(event.Payload, &src); err != nil {
|
||||
c.sendError(err)
|
||||
return
|
||||
}
|
||||
rm, err := c.hub.service.SetSource(context.Background(), c.roomCode, c.deviceID, src)
|
||||
if err != nil {
|
||||
c.sendError(err)
|
||||
return
|
||||
}
|
||||
c.hub.broadcastSnapshot(c.roomCode, "sourceChanged", rm)
|
||||
case "play", "pause", "seek", "syncProgress", "syncToLive":
|
||||
var p room.Playback
|
||||
if err := json.Unmarshal(event.Payload, &p); err != nil {
|
||||
c.sendError(err)
|
||||
return
|
||||
}
|
||||
if event.Type == "play" {
|
||||
p.State = room.PlaybackPlaying
|
||||
}
|
||||
if event.Type == "pause" {
|
||||
p.State = room.PlaybackPaused
|
||||
}
|
||||
rm, err := c.hub.service.UpdatePlayback(context.Background(), c.roomCode, p)
|
||||
if err != nil {
|
||||
c.sendError(err)
|
||||
return
|
||||
}
|
||||
c.hub.broadcast(c.roomCode, map[string]any{
|
||||
"type": "playbackChanged",
|
||||
"payload": map[string]any{
|
||||
"playback": rm.Playback,
|
||||
"serverNow": room.NowMS(),
|
||||
},
|
||||
}, nil)
|
||||
case "heartbeat":
|
||||
c.send <- map[string]any{"type": "heartbeatAck", "payload": map[string]any{"serverNow": room.NowMS()}}
|
||||
default:
|
||||
c.sendErrorString("unknown event type: " + event.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) sendError(err error) {
|
||||
c.sendErrorString(err.Error())
|
||||
}
|
||||
|
||||
func (c *Client) sendErrorString(message string) {
|
||||
c.send <- map[string]any{"type": "error", "payload": map[string]any{"message": message}}
|
||||
}
|
||||
|
||||
func (h *Hub) broadcastSnapshot(roomCode, eventType string, rm room.Room) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for c := range h.rooms[roomCode] {
|
||||
c.send <- map[string]any{
|
||||
"type": eventType,
|
||||
"payload": room.Snapshot{
|
||||
Room: rm.PublicFor(c.deviceID),
|
||||
IsOwner: rm.OwnerDeviceID == c.deviceID,
|
||||
DeviceID: c.deviceID,
|
||||
ServerNow: room.NowMS(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user