overte-api/hub.go
2026-03-17 18:29:04 -07:00

279 lines
6.3 KiB
Go

package main
import (
"encoding/json"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
// --- Message types ---
//
// Server -> client:
// {"type": "user:connected", "username": "nak"}
// {"type": "user:disconnected", "username": "nak"}
// {"type": "ping"}
//
// Client -> server:
// {"type": "pong"}
// {"type": "identify", "username": "nak"} (binds this connection to a player)
// {"type": "recall", "username": "nak"} (domain script relays to all clients)
//
// Any connected client can send a message; unless the hub handles it itself
// (pong, identify, poker:*, admin:position), it is broadcast to all other
// clients. This keeps the server simple and lets the domain script act as a
// relay for Overte-side events without needing separate channels for each
// use case.
// Message is the JSON envelope exchanged over the WebSocket. readPump decodes
// every inbound frame into it, and the decoded value is what gets re-encoded
// when a frame is relayed through Hub.Broadcast, so only these two fields
// survive a relay.
type Message struct {
// Type selects the handling in readPump (e.g. "pong", "identify", "recall").
Type string `json:"type"`
// Username is the player the message refers to; omitted from JSON when empty.
Username string `json:"username,omitempty"`
}
// --- Hub ---
// Hub tracks the connected WebSocket clients and fans messages out to them.
type Hub struct {
// mu guards clients: register/unregister take the write lock, while
// count, SendToUser and Broadcast take the read lock.
mu sync.RWMutex
// clients is the set of registered connections; register stores true.
clients map[*Client]bool
}
// newHub builds a Hub with an empty, ready-to-write client set.
func newHub() *Hub {
	h := new(Hub)
	h.clients = map[*Client]bool{}
	return h
}
// register adds c to the hub's client set and logs the resulting total.
func (h *Hub) register(c *Client) {
	// Scope the write lock to the map update only; count takes its own lock.
	func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		h.clients[c] = true
	}()

	total := h.count()
	log.Printf("[WS] client connected (total: %d)", total)
}
// unregister removes c from the hub and closes its send channel, so that the
// client's writePump observes the close and exits right away instead of
// lingering until its next write on the closed connection fails (which can
// take up to one ping period).
//
// The close happens under the write lock, after c has left the client set.
// Broadcast and SendToUser only send to members of the set while holding the
// read lock, so they can never send on the closed channel. Unregistering a
// client that is not in the set is a no-op, which keeps a repeated call from
// closing the channel twice.
func (h *Hub) unregister(c *Client) {
	h.mu.Lock()
	_, known := h.clients[c]
	if known {
		delete(h.clients, c)
		close(c.send)
	}
	// Take the total under the same lock so the logged number matches the
	// state this call produced.
	total := len(h.clients)
	h.mu.Unlock()

	if !known {
		return
	}
	log.Printf("[WS] client disconnected (total: %d)", total)
}
// count reports how many clients are currently registered.
func (h *Hub) count() int {
	h.mu.RLock()
	n := len(h.clients)
	h.mu.RUnlock()
	return n
}
// SendToUser JSON-encodes msg and queues it for one client whose identified
// username equals username. If several connections share that username, which
// one receives the message is unspecified (map iteration order). The send is
// non-blocking: when the client's queue is full the message is dropped and
// logged. Nothing happens when no client matches.
//
// An empty username is rejected. Clients that never identified carry the
// empty username, so an empty lookup would otherwise hand a private message
// to an arbitrary unidentified connection.
func (h *Hub) SendToUser(username string, msg interface{}) {
	if username == "" {
		log.Printf("[WS] ignored private message with empty username")
		return
	}
	data, err := json.Marshal(msg)
	if err != nil {
		log.Printf("[WS] cannot encode private message to %s: %v", username, err)
		return
	}

	h.mu.RLock()
	defer h.mu.RUnlock()
	for c := range h.clients {
		if c.username != username {
			continue
		}
		select {
		case c.send <- data:
		default:
			log.Printf("[WS] dropped private message to %s", username)
		}
		return
	}
}
// Broadcast JSON-encodes msg once and queues it for every registered client
// except sender. Pass a nil sender to reach all clients. Sends are
// non-blocking: a client whose queue is full simply misses the message, so
// one slow connection cannot stall the others. A message that cannot be
// encoded is logged and discarded.
func (h *Hub) Broadcast(msg interface{}, sender *Client) {
	data, err := json.Marshal(msg)
	if err != nil {
		log.Printf("[WS] cannot encode broadcast message: %v", err)
		return
	}

	h.mu.RLock()
	defer h.mu.RUnlock()
	for c := range h.clients {
		if c == sender {
			continue
		}
		select {
		case c.send <- data:
		default:
			// slow client — drop the message rather than block
			log.Printf("[WS] dropped message to slow client")
		}
	}
}
// --- Client ---
// Client is one WebSocket connection registered with the Hub. readPump and
// writePump each run on the connection for its lifetime.
type Client struct {
// hub is the Hub this client is registered with and relays through.
hub *Hub
// conn is the underlying WebSocket connection.
conn *websocket.Conn
// send queues outbound frames for writePump; the Hub drops a message
// rather than block when this buffer is full.
send chan []byte
username string // set by readPump from an "identify" message; empty until then
}
// Connection timing and size limits used by readPump and writePump.
const (
// writeWait is the deadline applied to each write in writePump.
writeWait = 10 * time.Second
// pongWait is the read deadline; readPump renews it whenever a pong arrives.
pongWait = 60 * time.Second
// pingPeriod is how often writePump sends a ping. It is shorter than
// pongWait, so a ping goes out before the read deadline can lapse.
pingPeriod = 45 * time.Second
// maxMessageSize is the read limit readPump sets on the connection.
maxMessageSize = 4096
)
// readPump is the per-connection read loop. It blocks until the connection
// fails or is closed, then unregisters the client and closes the socket.
//
// Each inbound frame is decoded as a Message and dispatched on its type:
//   - "pong": application-level keepalive, ignored.
//   - "identify" with a non-empty username: binds this connection to that
//     username so Hub.SendToUser can find it.
//   - "poker:*" and "admin:position": handed to pokerManager together with
//     the acting username.
//   - anything else: re-encoded and broadcast to every other client.
//
// Frames that are not valid JSON are logged and skipped.
func (c *Client) readPump() {
	defer func() {
		c.hub.unregister(c)
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	// Every pong control frame pushes the read deadline out again; a peer
	// that stops answering pings makes ReadMessage fail once it lapses.
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err,
				websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("[WS] read error: %v", err)
			}
			return
		}

		var msg Message
		if err := json.Unmarshal(data, &msg); err != nil {
			log.Printf("[WS] bad message: %s", data)
			continue
		}

		switch {
		case msg.Type == "pong":
			// keepalive, no relay
		case msg.Type == "identify" && msg.Username != "":
			// Clients send {"type":"identify","username":"nak"} after connecting
			// so the hub knows which WS connection belongs to which player.
			// The domain script doesn't need to identify.
			//
			// Hub.SendToUser reads c.username from other goroutines while
			// holding the hub lock, so the write must take that lock too.
			// NOTE(review): the username is taken from the client as-is;
			// nothing here verifies it — confirm that is intended.
			c.hub.mu.Lock()
			c.username = msg.Username
			c.hub.mu.Unlock()
			log.Printf("[WS] client identified as %s", msg.Username)
		case strings.HasPrefix(msg.Type, "poker:") || msg.Type == "admin:position":
			// Route poker messages to the manager with the identified username.
			// An unidentified connection falls back to the username carried in
			// the message itself.
			// NOTE(review): that fallback lets an unidentified client act as
			// any user — presumably meant for the domain script; verify.
			username := c.username
			if username == "" {
				username = msg.Username
			}
			if pokerManager == nil || username == "" {
				continue
			}
			// Decode again into a generic map so the manager sees every
			// field, not only the two that Message declares.
			var raw map[string]interface{}
			if err := json.Unmarshal(data, &raw); err != nil {
				log.Printf("[WS] bad poker message: %v", err)
				continue
			}
			pokerManager.HandleWSMessage(username, raw)
		default:
			// Broadcast everything else (recall, presence events, etc.)
			c.hub.Broadcast(msg, c)
		}
	}
}
// writePump is the per-connection write loop. It forwards every frame queued
// on c.send to the socket as a text message and sends a ping each pingPeriod.
// Every write gets a fresh writeWait deadline. The loop ends when a write
// fails or when c.send is found closed, in which case a close frame is sent
// first. On exit the ticker is stopped and the connection is closed.
func (c *Client) writePump() {
	pings := time.NewTicker(pingPeriod)
	// Deferred calls run last-in first-out: stop the ticker, then close.
	defer c.conn.Close()
	defer pings.Stop()

	armDeadline := func() {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	}

	for {
		select {
		case <-pings.C:
			armDeadline()
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}
		case payload, open := <-c.send:
			armDeadline()
			if !open {
				// Queue closed: tell the peer we are going away.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if c.conn.WriteMessage(websocket.TextMessage, payload) != nil {
				return
			}
		}
	}
}
// --- Upgrader & handler ---
// upgrader is used by handleWS to turn an HTTP request into a WebSocket
// connection.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Allow all origins — Overte scripts connect from localhost-ish contexts
// NOTE(review): accepting every Origin means any web page can open a socket
// to this endpoint from a visitor's browser — confirm this is acceptable.
CheckOrigin: func(r *http.Request) bool { return true },
}
// hub is the package-level Hub used by handleWS and presenceLoop.
var hub = newHub()
// handleWS upgrades the request to a WebSocket, registers the new connection
// with the package-level hub, starts its write loop in a goroutine and then
// runs its read loop on the handler goroutine until the peer disconnects.
func handleWS(w http.ResponseWriter, r *http.Request) {
	ws, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Println("[WS] upgrade error:", upgradeErr)
		return
	}

	peer := new(Client)
	peer.hub = hub
	peer.conn = ws
	peer.send = make(chan []byte, 64)

	hub.register(peer)
	go peer.writePump()
	// Blocks until disconnect.
	peer.readPump()
}
// --- Presence loop ---
//
// Replaces the domain script's polling. The server watches nodes.json,
// detects connects/disconnects, pushes events to all WS clients, and
// continues to tick balances. Single source of truth for who is online.
func presenceLoop() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
prev := map[string]bool{}
for range ticker.C {
users, err := getConnectedUsers()
if err != nil {
log.Println("[presence] error fetching nodes:", err)
continue
}
curr := map[string]bool{}
for _, u := range users {
curr[u] = true
}
// Detect joins
for u := range curr {
if !prev[u] {
log.Printf("[presence] connected: %s", u)
hub.Broadcast(map[string]string{"type": "user:connected", "username": u}, nil)
}
}
// Detect leaves
for u := range prev {
if !curr[u] {
log.Printf("[presence] disconnected: %s", u)
hub.Broadcast(map[string]string{"type": "user:disconnected", "username": u}, nil)
}
}
// Tick balances for connected users
for u := range curr {
if _, err := rdb.HIncrByFloat(ctx, "balances", u, 1.0).Result(); err != nil {
log.Printf("[presence] tick error for %s: %v", u, err)
}
}
prev = curr
}
}