broadcast/cmd/broadcast/ws_handler.go

145 lines
2.9 KiB
Go

package main
import (
"bytes"
"encoding/json"
"io"
"net/http"
"time"
"github.com/gorilla/websocket"
"git.abbiamoundominio.org/blallo/broadcast"
"git.sr.ht/~blallo/logz/interface"
)
const (
livenessInterval = 5 * time.Second
livenessTimeout = 4 * time.Second
statusStarted = "STARTED"
statusStopped = "STOPPED"
statusUnknown = "UNKNOWN"
)
var upgrader = websocket.Upgrader{}
type livenessHandler struct {
radio *broadcast.Radio
logger logz.Logger
}
func (h *livenessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.logger.Info(map[string]any{
"msg": "Starting Liveness handler",
"context": "ws",
})
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
h.logger.Err(map[string]any{
"msg": "Failed to upgrade",
"context": "ws",
"err": err.Error(),
})
return
}
defer conn.Close()
ticker := time.NewTicker(livenessInterval)
for {
<-ticker.C
timer := time.NewTimer(livenessTimeout)
select {
case <-r.Context().Done():
h.logger.Warn(map[string]any{
"msg": "Liveness context canceled",
"context": "ws",
"err": r.Context().Err(),
})
return
case <-ticker.C:
h.logger.Warn(map[string]any{
"msg": "Liveness request timed out",
"context": "ws",
})
h.writeMessage(conn, statusUnknown)
case err := <-h.radio.IsAlive():
switch err {
case nil:
h.logger.Debug(map[string]any{
"msg": "Still alive",
"context": "ws",
"status": statusStarted,
})
h.writeMessage(conn, statusStarted)
case broadcast.ErrNotRunning:
h.logger.Debug(map[string]any{
"msg": "Still alive",
"context": "ws",
"status": statusStopped,
})
h.writeMessage(conn, statusStopped)
default:
h.logger.Debug(map[string]any{
"msg": "Other error",
"context": "ws",
"err": err.(error).Error(),
"status": statusUnknown,
})
h.writeMessage(conn, statusUnknown)
}
}
timer.Stop()
}
}
func (h *livenessHandler) writeMessage(conn *websocket.Conn, status string) {
msg, err := json.Marshal(map[string]any{
"status": status,
})
if err != nil {
h.logger.Err(map[string]any{
"msg": "Failed to marshal",
"context": "ws",
"status": status,
"err": err.Error(),
})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
h.logger.Err(map[string]any{
"msg": "Failed to get next writer",
"context": "ws",
"status": status,
"err": err.Error(),
})
return
}
if _, err := io.Copy(w, bytes.NewReader(msg)); err != nil {
h.logger.Err(map[string]any{
"msg": "Failed to write message",
"context": "ws",
"status": status,
"err": err.Error(),
})
return
}
if err := w.Close(); err != nil {
h.logger.Err(map[string]any{
"msg": "Failed to close writer",
"context": "ws",
"status": status,
"err": err.Error(),
})
return
}
}