279 lines
6.3 KiB
Go
279 lines
6.3 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// --- Message types ---
|
|
//
|
|
// Server -> client:
|
|
// {"type": "user:connected", "username": "nak"}
|
|
// {"type": "user:disconnected", "username": "nak"}
|
|
// {"type": "ping"}
|
|
//
|
|
// Client -> server:
|
|
// {"type": "pong"}
|
|
// {"type": "recall", "username": "nak"} (domain script relays to all clients)
|
|
//
|
|
// Any connected client can send a message; the hub broadcasts it to all others.
|
|
// This keeps the server simple and lets the domain script act as a relay for
|
|
// Overte-side events without needing separate channels for each use case.
|
|
|
|
type Message struct {
|
|
Type string `json:"type"`
|
|
Username string `json:"username,omitempty"`
|
|
}
|
|
|
|
// --- Hub ---
|
|
|
|
type Hub struct {
|
|
mu sync.RWMutex
|
|
clients map[*Client]bool
|
|
}
|
|
|
|
func newHub() *Hub {
|
|
return &Hub{clients: make(map[*Client]bool)}
|
|
}
|
|
|
|
func (h *Hub) register(c *Client) {
|
|
h.mu.Lock()
|
|
h.clients[c] = true
|
|
h.mu.Unlock()
|
|
log.Printf("[WS] client connected (total: %d)", h.count())
|
|
}
|
|
|
|
func (h *Hub) unregister(c *Client) {
|
|
h.mu.Lock()
|
|
delete(h.clients, c)
|
|
h.mu.Unlock()
|
|
log.Printf("[WS] client disconnected (total: %d)", h.count())
|
|
}
|
|
|
|
func (h *Hub) count() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|
|
// SendToUser sends a message to the first client identified as username.
|
|
func (h *Hub) SendToUser(username string, msg interface{}) {
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
for c := range h.clients {
|
|
if c.username == username {
|
|
select {
|
|
case c.send <- data:
|
|
default:
|
|
log.Printf("[WS] dropped private message to %s", username)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Broadcast(msg interface{}, sender *Client) {
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
for c := range h.clients {
|
|
if c == sender {
|
|
continue
|
|
}
|
|
select {
|
|
case c.send <- data:
|
|
default:
|
|
// slow client — drop the message rather than block
|
|
log.Printf("[WS] dropped message to slow client")
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Client ---
|
|
|
|
type Client struct {
|
|
hub *Hub
|
|
conn *websocket.Conn
|
|
send chan []byte
|
|
username string // set when client identifies itself via a session token
|
|
}
|
|
|
|
const (
|
|
writeWait = 10 * time.Second
|
|
pongWait = 60 * time.Second
|
|
pingPeriod = 45 * time.Second
|
|
maxMessageSize = 4096
|
|
)
|
|
|
|
func (c *Client) readPump() {
|
|
defer func() {
|
|
c.hub.unregister(c)
|
|
c.conn.Close()
|
|
}()
|
|
c.conn.SetReadLimit(maxMessageSize)
|
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
c.conn.SetPongHandler(func(string) error {
|
|
c.conn.SetReadDeadline(time.Now().Add(pongWait))
|
|
return nil
|
|
})
|
|
for {
|
|
_, data, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err,
|
|
websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
log.Printf("[WS] read error: %v", err)
|
|
}
|
|
break
|
|
}
|
|
var msg Message
|
|
if err := json.Unmarshal(data, &msg); err != nil {
|
|
log.Printf("[WS] bad message: %s", data)
|
|
continue
|
|
}
|
|
|
|
switch {
|
|
case msg.Type == "pong":
|
|
// keepalive, no relay
|
|
|
|
case msg.Type == "identify" && msg.Username != "":
|
|
// Clients send {"type":"identify","username":"nak"} after connecting
|
|
// so the hub knows which WS connection belongs to which player.
|
|
// The domain script doesn't need to identify.
|
|
c.username = msg.Username
|
|
log.Printf("[WS] client identified as %s", c.username)
|
|
|
|
case strings.HasPrefix(msg.Type, "poker:") || msg.Type == "admin:position":
|
|
// Route poker messages to the manager with the identified username
|
|
username := c.username
|
|
if username == "" {
|
|
username = msg.Username
|
|
}
|
|
if pokerManager != nil && username != "" {
|
|
var raw map[string]interface{}
|
|
json.Unmarshal(data, &raw)
|
|
pokerManager.HandleWSMessage(username, raw)
|
|
}
|
|
|
|
default:
|
|
// Broadcast everything else (recall, presence events, etc.)
|
|
c.hub.Broadcast(msg, c)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) writePump() {
|
|
ticker := time.NewTicker(pingPeriod)
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.conn.Close()
|
|
}()
|
|
for {
|
|
select {
|
|
case data, ok := <-c.send:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if !ok {
|
|
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
return
|
|
}
|
|
if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
|
return
|
|
}
|
|
case <-ticker.C:
|
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Upgrader & handler ---
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
// Allow all origins — Overte scripts connect from localhost-ish contexts
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
var hub = newHub()
|
|
|
|
func handleWS(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Println("[WS] upgrade error:", err)
|
|
return
|
|
}
|
|
client := &Client{
|
|
hub: hub,
|
|
conn: conn,
|
|
send: make(chan []byte, 64),
|
|
}
|
|
hub.register(client)
|
|
go client.writePump()
|
|
client.readPump() // blocks until disconnect
|
|
}
|
|
|
|
// --- Presence loop ---
|
|
//
|
|
// Replaces the domain script's polling. The server watches nodes.json,
|
|
// detects connects/disconnects, pushes events to all WS clients, and
|
|
// continues to tick balances. Single source of truth for who is online.
|
|
|
|
func presenceLoop() {
|
|
ticker := time.NewTicker(2 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
prev := map[string]bool{}
|
|
|
|
for range ticker.C {
|
|
users, err := getConnectedUsers()
|
|
if err != nil {
|
|
log.Println("[presence] error fetching nodes:", err)
|
|
continue
|
|
}
|
|
|
|
curr := map[string]bool{}
|
|
for _, u := range users {
|
|
curr[u] = true
|
|
}
|
|
|
|
// Detect joins
|
|
for u := range curr {
|
|
if !prev[u] {
|
|
log.Printf("[presence] connected: %s", u)
|
|
hub.Broadcast(map[string]string{"type": "user:connected", "username": u}, nil)
|
|
}
|
|
}
|
|
|
|
// Detect leaves
|
|
for u := range prev {
|
|
if !curr[u] {
|
|
log.Printf("[presence] disconnected: %s", u)
|
|
hub.Broadcast(map[string]string{"type": "user:disconnected", "username": u}, nil)
|
|
}
|
|
}
|
|
|
|
// Tick balances for connected users
|
|
for u := range curr {
|
|
if _, err := rdb.HIncrByFloat(ctx, "balances", u, 1.0).Result(); err != nil {
|
|
log.Printf("[presence] tick error for %s: %v", u, err)
|
|
}
|
|
}
|
|
|
|
prev = curr
|
|
}
|
|
}
|