// Package broadcast runs an external program under the control of a
// command loop and exposes start/stop/status/liveness operations on it.
package broadcast

import (
	"context"
	"errors"
	"os"
	"syscall"

	"git.sr.ht/~blallo/logz/interface"
)

const (
	// bufLines is the size handed to NewLineRing when a Radio is created.
	bufLines = 100
)

var (
	// ErrAlreadyRunning is returned by a start request issued while a
	// process handle is already held.
	ErrAlreadyRunning = errors.New("The process is already running")
	// ErrNotRunning is returned by stop and liveness requests issued while
	// no process handle is held.
	ErrNotRunning = errors.New("The process is not running")
)

// Command is a request handled by Radio.Run. The outcome of the request is
// sent exactly once on Resp.
type Command struct {
	CommandType
	Resp chan any
}

// CommandType identifies the operation carried by a Command.
type CommandType int

// Operations understood by Radio.Run.
const (
	CommandStart = 1 << iota
	CommandStop
	CommandStatus
	CommandLiveness
)

// Radio supervises a single external process. All mutable state (proc) is
// owned by the goroutine executing Run; other goroutines interact with it
// only through the commands channel.
type Radio struct {
	Program string
	CmdLine []string

	lines    *LineRing
	proc     *os.Process
	errs     chan error
	quit     chan struct{}
	commands chan Command
	logger   logz.Logger
}

// NewRadio builds a Radio for the program prog with argument vector cmdLine.
// It returns the error reported by NewLineRing, if any.
func NewRadio(logger logz.Logger, prog string, cmdLine ...string) (*Radio, error) {
	lines, err := NewLineRing(bufLines)
	if err != nil {
		return nil, err
	}
	lines.WithFormatter(timestampFormatter)
	lines.WithLogger(logger)

	return &Radio{
		Program:  prog,
		CmdLine:  cmdLine,
		lines:    lines,
		commands: make(chan Command, 100),
		errs:     make(chan error),
		quit:     make(chan struct{}),
		logger:   logger,
	}, nil
}

// Run launches the line ring and then serves commands until ctx is done, at
// which point it returns ctx.Err(). Each command's result is sent on its
// Resp channel; since the callers below create unbuffered channels, Run
// blocks on that send until the requester receives the value.
func (r *Radio) Run(ctx context.Context) error {
	go r.lines.Run(ctx)

	for {
		select {
		case <-ctx.Done():
			r.logger.Warn(map[string]any{
				"msg": "Context terminated. Exiting...",
			})
			return ctx.Err()
		case cmd := <-r.commands:
			switch cmd.CommandType {
			case CommandStart:
				r.logger.Info(map[string]any{
					"msg": "Received Start command",
				})
				cmd.Resp <- r.start()
			case CommandStop:
				r.logger.Info(map[string]any{
					"msg": "Received Stop command",
				})
				cmd.Resp <- r.stop()
				r.proc = nil
			case CommandStatus:
				r.logger.Info(map[string]any{
					"msg": "Received Status command",
				})
				cmd.Resp <- r.printLogs()
			case CommandLiveness:
				r.logger.Info(map[string]any{
					"msg": "Received Liveness command",
				})
				cmd.Resp <- r.liveness()
			}
		}
	}
}

// enqueue submits a command of the given kind to the Run loop and returns
// the channel on which the result will be delivered.
func (r *Radio) enqueue(kind CommandType) <-chan any {
	resp := make(chan any)
	r.commands <- Command{
		CommandType: kind,
		Resp:        resp,
	}
	return resp
}

// Start asks the Run loop to start the process. The returned channel yields
// the resulting error value (nil on success).
func (r *Radio) Start() <-chan any {
	return r.enqueue(CommandStart)
}

// Stop asks the Run loop to kill the process. The returned channel yields
// the resulting error value (nil on success).
func (r *Radio) Stop() <-chan any {
	return r.enqueue(CommandStop)
}

// Status asks the Run loop for the lines currently held by the line ring.
// The returned channel yields a []string.
func (r *Radio) Status() <-chan any {
	return r.enqueue(CommandStatus)
}

// IsAlive asks the Run loop to probe the process. The returned channel
// yields the resulting error value (nil when the probe succeeds).
func (r *Radio) IsAlive() <-chan any {
	return r.enqueue(CommandLiveness)
}

// start spawns Program with CmdLine in the current working directory,
// inheriting the environment, with stdout and stderr both attached to the
// line ring's file and no stdin. It returns ErrAlreadyRunning when a process
// handle is already held.
func (r *Radio) start() error {
	if r.proc != nil {
		return ErrAlreadyRunning
	}

	cwd, err := os.Getwd()
	if err != nil {
		return err
	}

	r.logger.Debug(map[string]any{
		"msg":     "Starting process",
		"cwd":     cwd,
		"exec":    r.Program,
		"cmdLine": r.CmdLine,
	})

	proc, err := os.StartProcess(r.Program, r.CmdLine, &os.ProcAttr{
		Dir: cwd,
		Env: os.Environ(),
		Files: []*os.File{
			nil,
			r.lines.File(),
			r.lines.File(),
		},
	})
	if err != nil {
		return err
	}
	r.proc = proc

	// Buffered so the waiter below can always deliver its error and exit,
	// even if the watcher has already been dismissed by a stop request.
	waitErr := make(chan error, 1)

	// The goroutines use the local proc handle rather than r.proc: the
	// latter is reset by the Run loop and must not be read concurrently.
	go func() {
		state, err := proc.Wait()
		if err != nil {
			// state is not usable when Wait fails; do not touch it.
			waitErr <- err
			return
		}
		r.logger.Info(map[string]any{
			"msg":       "Process exited",
			"exit_code": state.ExitCode(),
			"sys":       state.SystemTime(),
			"usr":       state.UserTime(),
		})
	}()

	go func() {
		select {
		case <-r.quit:
		case err := <-waitErr:
			r.logger.Err(map[string]any{
				"msg": "Process exited with error",
				"err": err.Error(),
			})
			// Keep serving the quit signal so that a later stop request
			// does not block forever waiting for a receiver.
			<-r.quit
		}
		r.errs <- proc.Kill()
	}()

	return nil
}

// stop signals the watcher goroutine created by start to kill the process
// and returns the error reported by the kill. It returns ErrNotRunning when
// no process handle is held, because in that case nothing is listening on
// the quit channel and sending on it would block the Run loop indefinitely.
func (r *Radio) stop() error {
	if r.proc == nil {
		return ErrNotRunning
	}
	r.quit <- struct{}{}
	return <-r.errs
}

// printLogs returns the lines currently held by the line ring.
func (r *Radio) printLogs() []string {
	return r.lines.Lines()
}

// liveness probes the process by sending it signal 0. It returns
// ErrNotRunning when no process handle is held, otherwise the error returned
// by Signal.
func (r *Radio) liveness() error {
	if r.proc == nil {
		return ErrNotRunning
	}
	return r.proc.Signal(syscall.Signal(0))
}