diff --git a/cmd/broadcast/main.go b/cmd/broadcast/main.go index a7df85d..3862af1 100644 --- a/cmd/broadcast/main.go +++ b/cmd/broadcast/main.go @@ -3,9 +3,11 @@ package main import ( "context" "flag" + "fmt" "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -172,10 +174,35 @@ func setupHandler(radio *broadcast.Radio, logger logz.Logger, addr string) error logger: logger, } + wsHandler := &livenessHandler{ + radio: radio, + logger: logger, + } + + testUI := &testUIHandler{ + baseAddr: getBaseAddr(addr), + logger: logger, + } + router := chi.NewRouter() router.Post("/start", handler.Start) router.Post("/stop", handler.Stop) router.Get("/status", handler.Status) + router.Handle("/liveness", wsHandler) + router.Get("/test_ui", testUI.TestUI) return http.ListenAndServe(addr, router) } + +func getBaseAddr(addr string) string { + parts := strings.Split(addr, ":") + if parts[0] == "" { + parts[0] = "localhost" + } + + if len(parts) == 2 { + return fmt.Sprintf("%s:%s", parts[0], parts[1]) + } + + return parts[0] +} diff --git a/cmd/broadcast/test_ui.go b/cmd/broadcast/test_ui.go new file mode 100644 index 0000000..63a3a16 --- /dev/null +++ b/cmd/broadcast/test_ui.go @@ -0,0 +1,52 @@ +package main + +import ( + "html/template" + "net/http" + + "git.sr.ht/~blallo/logz/interface" +) + +var uiTmpl = ` + + + + WebSocket test + + + + Open the console... + + + ` +var tmpl *template.Template + +func init() { + tmpl = template.Must(template.New("testUI").Parse(uiTmpl)) +} + +type testUIHandler struct { + baseAddr string + logger logz.Logger +} + +func (h *testUIHandler) TestUI(w http.ResponseWriter, r *http.Request) { + h.logger.Debug(map[string]any{ + "msg": "Serving test ui", + "context": "http", + }) + + if err := tmpl.ExecuteTemplate(w, "testUI", map[string]string{"BaseAddr": h.baseAddr}); err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to render testUI template", + "context": "http", + "err": err.Error(), + }) + } +} diff --git a/cmd/broadcast/ws_handler.go b/cmd/broadcast/ws_handler.go new file mode 100644 index 0000000..c8a3633 --- /dev/null +++ b/cmd/broadcast/ws_handler.go @@ -0,0 +1,137 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "time" + + "github.com/gorilla/websocket" + + "git.abbiamoundominio.org/blallo/broadcast" + "git.sr.ht/~blallo/logz/interface" +) + +const ( + livenessInterval = 5 * time.Second + livenessTimeout = 4 * time.Second + + statusStarted = "STARTED" + statusStopped = "STOPPED" + statusUnknown = "UNKNOWN" +) + +var upgrader = websocket.Upgrader{} + +type livenessHandler struct { + radio *broadcast.Radio + logger logz.Logger +} + +func (h *livenessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.logger.Info(map[string]any{ + "msg": "Starting Liveness handler", + "context": "ws", + }) + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to upgrade", + "context": "ws", + "err": err.Error(), + }) + return + } + defer conn.Close() + + ticker := time.NewTicker(livenessInterval) + for { + <-ticker.C + timer := time.NewTimer(livenessTimeout) + select { + case <-timer.C: + h.logger.Warn(map[string]any{ + "msg": "Liveness request timed out", + "context": "ws", + }) + + h.writeMessage(conn, statusUnknown) + case err := <-h.radio.IsAlive(): + switch err { + case nil: + h.logger.Debug(map[string]any{ + "msg": "Still alive", + "context": "ws", + "status": statusStarted, + }) + + h.writeMessage(conn, statusStarted) + case broadcast.ErrNotRunning: + h.logger.Debug(map[string]any{ + "msg": "Still alive", + "context": "ws", + "status": statusStopped, + }) + + h.writeMessage(conn, statusStopped) + default: + h.logger.Debug(map[string]any{ + "msg": "Other error", + "context": "ws", + "err": err.(error).Error(), + "status": statusUnknown, + }) + + h.writeMessage(conn, statusUnknown) + } + } + timer.Stop() + } +} + +func (h *livenessHandler) writeMessage(conn *websocket.Conn, status string) { + msg, err := json.Marshal(map[string]any{ + "status": status, + }) + if err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to marshal", + "context": "ws", + "status": status, + "err": err.Error(), + }) + return + } + + w, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to get next writer", + "context": "ws", + "status": status, + "err": err.Error(), + }) + return + } + + if _, err := io.Copy(w, bytes.NewReader(msg)); err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to write message", + "context": "ws", + "status": status, + "err": err.Error(), + }) + return + } + + if err := w.Close(); err != nil { + h.logger.Err(map[string]any{ + "msg": "Failed to close writer", + "context": "ws", + "status": status, + "err": err.Error(), + }) + return + } +} diff --git a/go.mod b/go.mod index f84a91c..c31ad8b 100644 --- a/go.mod +++ b/go.mod @@ -13,9 +13,11 @@ require ( git.sr.ht/~blallo/logz/zlog v0.0.0-20220324191132-95d94ae8e337 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-chi/chi/v5 v5.0.8 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/zerolog v1.26.1 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/stretchr/testify v1.8.1 // indirect + golang.org/x/net v0.6.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 1f4cde7..1f76f89 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/go-chi/chi/v5 v5.0.8 h1:lD+NLqFcAi1ovnVZpsnObHGW4xb4J8lNmoYVfECH1Y0= github.com/go-chi/chi/v5 v5.0.8/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -38,6 +40,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/runner.go b/radio.go similarity index 88% rename from runner.go rename to radio.go index 5ab2308..4e52274 100644 --- a/runner.go +++ b/radio.go @@ -4,6 +4,7 @@ import ( "context" "errors" "os" + "syscall" "git.sr.ht/~blallo/logz/interface" ) @@ -28,6 +29,7 @@ const ( CommandStart = 1 << iota CommandStop CommandStatus + CommandLiveness ) type Radio struct { @@ -90,6 +92,11 @@ func (r *Radio) Run(ctx context.Context) error { "msg": "Received Status command", }) cmd.Resp <- r.printLogs() + case CommandLiveness: + r.logger.Info(map[string]any{ + "msg": "Received Liveness command", + }) + cmd.Resp <- r.liveness() } } } @@ -125,6 +132,16 @@ func (r *Radio) Status() <-chan any { return resp } +func (r *Radio) IsAlive() <-chan any { + resp := make(chan any) + r.commands <- Command{ + CommandType: CommandLiveness, + Resp: resp, + } + + return resp +} + func (r *Radio) start() error { if r.proc != nil { return ErrAlreadyRunning @@ -197,3 +214,11 @@ func (r *Radio) stop() error { func (r *Radio) printLogs() []string { return r.lines.Lines() } + +func (r *Radio) liveness() error { + if r.proc == nil { + return ErrNotRunning + } + + return r.proc.Signal(syscall.Signal(0)) +}