Add liveness websocket notification and test ui
This commit is contained in:
parent
60cb7a55c6
commit
a9c604102b
|
@ -3,9 +3,11 @@ package main
|
|||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -172,10 +174,35 @@ func setupHandler(radio *broadcast.Radio, logger logz.Logger, addr string) error
|
|||
logger: logger,
|
||||
}
|
||||
|
||||
wsHandler := &livenessHandler{
|
||||
radio: radio,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
testUI := &testUIHandler{
|
||||
baseAddr: getBaseAddr(addr),
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
router := chi.NewRouter()
|
||||
router.Post("/start", handler.Start)
|
||||
router.Post("/stop", handler.Stop)
|
||||
router.Get("/status", handler.Status)
|
||||
router.Handle("/liveness", wsHandler)
|
||||
router.Get("/test_ui", testUI.TestUI)
|
||||
|
||||
return http.ListenAndServe(addr, router)
|
||||
}
|
||||
|
||||
func getBaseAddr(addr string) string {
|
||||
parts := strings.Split(addr, ":")
|
||||
if parts[0] == "" {
|
||||
parts[0] = "localhost"
|
||||
}
|
||||
|
||||
if len(parts) == 2 {
|
||||
return fmt.Sprintf("%s:%s", parts[0], parts[1])
|
||||
}
|
||||
|
||||
return parts[0]
|
||||
}
|
||||
|
|
52
cmd/broadcast/test_ui.go
Normal file
52
cmd/broadcast/test_ui.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"html/template"
|
||||
"net/http"
|
||||
|
||||
"git.sr.ht/~blallo/logz/interface"
|
||||
)
|
||||
|
||||
var uiTmpl = `
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>WebSocket test</title>
|
||||
<script>
|
||||
const socket = new WebSocket("ws://{{ .BaseAddr }}/liveness");
|
||||
|
||||
socket.onmessage = (event) => {
|
||||
console.log(event.data);
|
||||
};
|
||||
</script>
|
||||
</head>
|
||||
<body>
|
||||
Open the console...
|
||||
</body>
|
||||
</html>
|
||||
`
|
||||
var tmpl *template.Template
|
||||
|
||||
func init() {
|
||||
tmpl = template.Must(template.New("testUI").Parse(uiTmpl))
|
||||
}
|
||||
|
||||
type testUIHandler struct {
|
||||
baseAddr string
|
||||
logger logz.Logger
|
||||
}
|
||||
|
||||
func (h *testUIHandler) TestUI(w http.ResponseWriter, r *http.Request) {
|
||||
h.logger.Debug(map[string]any{
|
||||
"msg": "Serving test ui",
|
||||
"context": "http",
|
||||
})
|
||||
|
||||
if err := tmpl.ExecuteTemplate(w, "testUI", map[string]string{"BaseAddr": h.baseAddr}); err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to render testUI template",
|
||||
"context": "http",
|
||||
"err": err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
137
cmd/broadcast/ws_handler.go
Normal file
137
cmd/broadcast/ws_handler.go
Normal file
|
@ -0,0 +1,137 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"git.abbiamoundominio.org/blallo/broadcast"
|
||||
"git.sr.ht/~blallo/logz/interface"
|
||||
)
|
||||
|
||||
const (
|
||||
livenessInterval = 5 * time.Second
|
||||
livenessTimeout = 4 * time.Second
|
||||
|
||||
statusStarted = "STARTED"
|
||||
statusStopped = "STOPPED"
|
||||
statusUnknown = "UNKNOWN"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{}
|
||||
|
||||
type livenessHandler struct {
|
||||
radio *broadcast.Radio
|
||||
logger logz.Logger
|
||||
}
|
||||
|
||||
func (h *livenessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
h.logger.Info(map[string]any{
|
||||
"msg": "Starting Liveness handler",
|
||||
"context": "ws",
|
||||
})
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to upgrade",
|
||||
"context": "ws",
|
||||
"err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
ticker := time.NewTicker(livenessInterval)
|
||||
for {
|
||||
<-ticker.C
|
||||
timer := time.NewTimer(livenessTimeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
h.logger.Warn(map[string]any{
|
||||
"msg": "Liveness request timed out",
|
||||
"context": "ws",
|
||||
})
|
||||
|
||||
h.writeMessage(conn, statusUnknown)
|
||||
case err := <-h.radio.IsAlive():
|
||||
switch err {
|
||||
case nil:
|
||||
h.logger.Debug(map[string]any{
|
||||
"msg": "Still alive",
|
||||
"context": "ws",
|
||||
"status": statusStarted,
|
||||
})
|
||||
|
||||
h.writeMessage(conn, statusStarted)
|
||||
case broadcast.ErrNotRunning:
|
||||
h.logger.Debug(map[string]any{
|
||||
"msg": "Still alive",
|
||||
"context": "ws",
|
||||
"status": statusStopped,
|
||||
})
|
||||
|
||||
h.writeMessage(conn, statusStopped)
|
||||
default:
|
||||
h.logger.Debug(map[string]any{
|
||||
"msg": "Other error",
|
||||
"context": "ws",
|
||||
"err": err.(error).Error(),
|
||||
"status": statusUnknown,
|
||||
})
|
||||
|
||||
h.writeMessage(conn, statusUnknown)
|
||||
}
|
||||
}
|
||||
timer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *livenessHandler) writeMessage(conn *websocket.Conn, status string) {
|
||||
msg, err := json.Marshal(map[string]any{
|
||||
"status": status,
|
||||
})
|
||||
if err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to marshal",
|
||||
"context": "ws",
|
||||
"status": status,
|
||||
"err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
w, err := conn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to get next writer",
|
||||
"context": "ws",
|
||||
"status": status,
|
||||
"err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := io.Copy(w, bytes.NewReader(msg)); err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to write message",
|
||||
"context": "ws",
|
||||
"status": status,
|
||||
"err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
h.logger.Err(map[string]any{
|
||||
"msg": "Failed to close writer",
|
||||
"context": "ws",
|
||||
"status": status,
|
||||
"err": err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
2
go.mod
2
go.mod
|
@ -13,9 +13,11 @@ require (
|
|||
git.sr.ht/~blallo/logz/zlog v0.0.0-20220324191132-95d94ae8e337 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/go-chi/chi/v5 v5.0.8 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/rs/zerolog v1.26.1 // indirect
|
||||
github.com/stretchr/objx v0.5.0 // indirect
|
||||
github.com/stretchr/testify v1.8.1 // indirect
|
||||
golang.org/x/net v0.6.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
|
4
go.sum
4
go.sum
|
@ -15,6 +15,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
|
|||
github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0=
|
||||
github.com/go-chi/chi/v5 v5.0.8/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
|
||||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
@ -38,6 +40,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
|
|||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
|
||||
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"git.sr.ht/~blallo/logz/interface"
|
||||
)
|
||||
|
@ -28,6 +29,7 @@ const (
|
|||
CommandStart = 1 << iota
|
||||
CommandStop
|
||||
CommandStatus
|
||||
CommandLiveness
|
||||
)
|
||||
|
||||
type Radio struct {
|
||||
|
@ -90,6 +92,11 @@ func (r *Radio) Run(ctx context.Context) error {
|
|||
"msg": "Received Status command",
|
||||
})
|
||||
cmd.Resp <- r.printLogs()
|
||||
case CommandLiveness:
|
||||
r.logger.Info(map[string]any{
|
||||
"msg": "Received Liveness command",
|
||||
})
|
||||
cmd.Resp <- r.liveness()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -125,6 +132,16 @@ func (r *Radio) Status() <-chan any {
|
|||
return resp
|
||||
}
|
||||
|
||||
func (r *Radio) IsAlive() <-chan any {
|
||||
resp := make(chan any)
|
||||
r.commands <- Command{
|
||||
CommandType: CommandLiveness,
|
||||
Resp: resp,
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func (r *Radio) start() error {
|
||||
if r.proc != nil {
|
||||
return ErrAlreadyRunning
|
||||
|
@ -197,3 +214,11 @@ func (r *Radio) stop() error {
|
|||
func (r *Radio) printLogs() []string {
|
||||
return r.lines.Lines()
|
||||
}
|
||||
|
||||
func (r *Radio) liveness() error {
|
||||
if r.proc == nil {
|
||||
return ErrNotRunning
|
||||
}
|
||||
|
||||
return r.proc.Signal(syscall.Signal(0))
|
||||
}
|
Loading…
Reference in New Issue
Block a user