package ws import ( "context" "encoding/json" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" "synctv/server/internal/room" ) type RoomService interface { Join(ctx context.Context, code, deviceID string, onlineCount int) (room.Room, bool, error) SetOnlineCount(ctx context.Context, code string, count int) error SetSource(ctx context.Context, code, deviceID string, src room.Source) (room.Room, error) UpdatePlayback(ctx context.Context, code string, p room.Playback) (room.Room, error) } type Event struct { Type string `json:"type"` Payload json.RawMessage `json:"payload,omitempty"` } type Hub struct { service RoomService upgrader websocket.Upgrader pingInterval time.Duration pongTimeout time.Duration writeTimeout time.Duration mu sync.RWMutex rooms map[string]map[*Client]struct{} } type Client struct { hub *Hub conn *websocket.Conn roomCode string deviceID string send chan any } func NewHub(service RoomService, allowedOrigins []string, pingInterval, pongTimeout, writeTimeout time.Duration) *Hub { originAllowed := map[string]struct{}{} allowAll := false for _, origin := range allowedOrigins { if origin == "*" { allowAll = true break } originAllowed[origin] = struct{}{} } return &Hub{ service: service, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { if allowAll { return true } _, ok := originAllowed[r.Header.Get("Origin")] return ok }, }, pingInterval: pingInterval, pongTimeout: pongTimeout, writeTimeout: writeTimeout, rooms: make(map[string]map[*Client]struct{}), } } func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request) { roomCode := r.URL.Query().Get("roomCode") deviceID := r.URL.Query().Get("deviceId") if roomCode == "" || deviceID == "" { http.Error(w, "roomCode and deviceId are required", http.StatusBadRequest) return } conn, err := h.upgrader.Upgrade(w, r, nil) if err != nil { return } c := &Client{ hub: h, conn: conn, roomCode: roomCode, deviceID: deviceID, send: make(chan any, 32), } h.register(c) count := h.count(roomCode) rm, isOwner, err := h.service.Join(r.Context(), roomCode, deviceID, count) if err != nil { c.writeJSON(map[string]any{"type": "error", "message": err.Error()}) h.unregister(c) _ = conn.Close() return } c.send <- map[string]any{ "type": "roomSnapshot", "payload": room.Snapshot{ Room: rm.PublicFor(deviceID), IsOwner: isOwner, DeviceID: deviceID, ServerNow: room.NowMS(), }, } h.broadcast(roomCode, map[string]any{"type": "presenceChanged", "payload": map[string]any{"onlineCount": count}}, c) go c.writePump() go c.readPump() } func (h *Hub) register(c *Client) { h.mu.Lock() defer h.mu.Unlock() if h.rooms[c.roomCode] == nil { h.rooms[c.roomCode] = make(map[*Client]struct{}) } h.rooms[c.roomCode][c] = struct{}{} } func (h *Hub) unregister(c *Client) { h.mu.Lock() if clients := h.rooms[c.roomCode]; clients != nil { delete(clients, c) if len(clients) == 0 { delete(h.rooms, c.roomCode) } } h.mu.Unlock() close(c.send) count := h.count(c.roomCode) if err := h.service.SetOnlineCount(context.Background(), c.roomCode, count); err != nil { log.Printf("update online count: %v", err) } h.broadcast(c.roomCode, map[string]any{"type": "presenceChanged", "payload": map[string]any{"onlineCount": count}}, nil) } func (h *Hub) count(roomCode string) int { h.mu.RLock() defer h.mu.RUnlock() return len(h.rooms[roomCode]) } func (h *Hub) broadcast(roomCode string, msg any, except *Client) { h.mu.RLock() defer h.mu.RUnlock() for c := range h.rooms[roomCode] { if c == except { continue } select { case c.send <- msg: default: } } } func (c *Client) readPump() { defer func() { c.hub.unregister(c) _ = c.conn.Close() }() c.conn.SetReadLimit(1 << 20) _ = c.conn.SetReadDeadline(time.Now().Add(c.hub.pongTimeout)) c.conn.SetPongHandler(func(string) error { return c.conn.SetReadDeadline(time.Now().Add(c.hub.pongTimeout)) }) for { var event Event if err := c.conn.ReadJSON(&event); err != nil { return } c.handle(event) } } func (c *Client) writePump() { ticker := time.NewTicker(c.hub.pingInterval) defer func() { ticker.Stop() _ = c.conn.Close() }() for { select { case msg, ok := <-c.send: _ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout)) if !ok { _ = c.conn.WriteMessage(websocket.CloseMessage, nil) return } if err := c.conn.WriteJSON(msg); err != nil { return } case <-ticker.C: _ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } func (c *Client) writeJSON(v any) { _ = c.conn.SetWriteDeadline(time.Now().Add(c.hub.writeTimeout)) _ = c.conn.WriteJSON(v) } func (c *Client) handle(event Event) { switch event.Type { case "setSource": var src room.Source if err := json.Unmarshal(event.Payload, &src); err != nil { c.sendError(err) return } rm, err := c.hub.service.SetSource(context.Background(), c.roomCode, c.deviceID, src) if err != nil { c.sendError(err) return } c.hub.broadcastSnapshot(c.roomCode, "sourceChanged", rm) case "play", "pause", "seek", "syncProgress", "syncToLive": var p room.Playback if err := json.Unmarshal(event.Payload, &p); err != nil { c.sendError(err) return } if event.Type == "play" { p.State = room.PlaybackPlaying } if event.Type == "pause" { p.State = room.PlaybackPaused } rm, err := c.hub.service.UpdatePlayback(context.Background(), c.roomCode, p) if err != nil { c.sendError(err) return } c.hub.broadcast(c.roomCode, map[string]any{ "type": "playbackChanged", "payload": map[string]any{ "playback": rm.Playback, "serverNow": room.NowMS(), }, }, nil) case "heartbeat": c.send <- map[string]any{"type": "heartbeatAck", "payload": map[string]any{"serverNow": room.NowMS()}} default: c.sendErrorString("unknown event type: " + event.Type) } } func (c *Client) sendError(err error) { c.sendErrorString(err.Error()) } func (c *Client) sendErrorString(message string) { c.send <- map[string]any{"type": "error", "payload": map[string]any{"message": message}} } func (h *Hub) broadcastSnapshot(roomCode, eventType string, rm room.Room) { h.mu.RLock() defer h.mu.RUnlock() for c := range h.rooms[roomCode] { c.send <- map[string]any{ "type": eventType, "payload": room.Snapshot{ Room: rm.PublicFor(c.deviceID), IsOwner: rm.OwnerDeviceID == c.deviceID, DeviceID: c.deviceID, ServerNow: room.NowMS(), }, } } }