package main import ( "encoding/json" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" ) // --- Message types --- // // Server -> client: // {"type": "user:connected", "username": "nak"} // {"type": "user:disconnected", "username": "nak"} // {"type": "ping"} // // Client -> server: // {"type": "pong"} // {"type": "recall", "username": "nak"} (domain script relays to all clients) // // Any connected client can send a message; the hub broadcasts it to all others. // This keeps the server simple and lets the domain script act as a relay for // Overte-side events without needing separate channels for each use case. type Message struct { Type string `json:"type"` Username string `json:"username,omitempty"` Payload string `json:"payload,omitempty"` } // --- Hub --- type Hub struct { mu sync.RWMutex clients map[*Client]bool } func newHub() *Hub { return &Hub{clients: make(map[*Client]bool)} } func (h *Hub) register(c *Client) { h.mu.Lock() h.clients[c] = true h.mu.Unlock() log.Printf("[WS] client connected (total: %d)", h.count()) } func (h *Hub) unregister(c *Client) { h.mu.Lock() delete(h.clients, c) h.mu.Unlock() log.Printf("[WS] client disconnected (total: %d)", h.count()) } func (h *Hub) count() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // Broadcast sends a message to all connected clients except the sender. // Pass nil sender to broadcast to everyone. func (h *Hub) Broadcast(msg Message, sender *Client) { data, err := json.Marshal(msg) if err != nil { return } h.mu.RLock() defer h.mu.RUnlock() for c := range h.clients { if c == sender { continue } select { case c.send <- data: default: // slow client — drop the message rather than block log.Printf("[WS] dropped message to slow client") } } } // --- Client --- type Client struct { hub *Hub conn *websocket.Conn send chan []byte } const ( writeWait = 10 * time.Second pongWait = 60 * time.Second pingPeriod = 45 * time.Second maxMessageSize = 4096 ) func (c *Client) readPump() { defer func() { c.hub.unregister(c) c.conn.Close() }() c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { _, data, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("[WS] read error: %v", err) } break } var msg Message if err := json.Unmarshal(data, &msg); err != nil { log.Printf("[WS] bad message: %s", data) continue } // Relay to all other clients — domain script and entity scripts // all share the same hub, so a recall from the tablet entity script // arrives at the domain script automatically. if msg.Type != "pong" { c.hub.Broadcast(msg, c) } } } func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.conn.Close() }() for { select { case data, ok := <-c.send: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if !ok { c.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil { return } case <-ticker.C: c.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // --- Upgrader & handler --- var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, // Allow all origins — Overte scripts connect from localhost-ish contexts CheckOrigin: func(r *http.Request) bool { return true }, } var hub = newHub() func handleWS(w http.ResponseWriter, r *http.Request) { log.Printf("[WS] new connection from %s (total after: %d)", r.RemoteAddr, hub.count()+1) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println("[WS] upgrade error:", err) return } client := &Client{ hub: hub, conn: conn, send: make(chan []byte, 64), } hub.register(client) go client.writePump() client.readPump() // blocks until disconnect } // --- Presence loop --- // // Replaces the domain script's polling. The server watches nodes.json, // detects connects/disconnects, pushes events to all WS clients, and // continues to tick balances. Single source of truth for who is online. func presenceLoop() { ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() prev := map[string]bool{} for range ticker.C { users, err := getConnectedUsers() if err != nil { log.Println("[presence] error fetching nodes:", err) continue } curr := map[string]bool{} for _, u := range users { curr[u] = true } // Detect joins for u := range curr { if !prev[u] { log.Printf("[presence] connected: %s", u) hub.Broadcast(Message{Type: "user:connected", Username: u}, nil) } } // Detect leaves for u := range prev { if !curr[u] { log.Printf("[presence] disconnected: %s", u) hub.Broadcast(Message{Type: "user:disconnected", Username: u}, nil) } } // Tick balances for connected users for u := range curr { if _, err := rdb.HIncrByFloat(ctx, "balances", u, 1.0).Result(); err != nil { log.Printf("[presence] tick error for %s: %v", u, err) } } prev = curr } }