From e125f1a541b25086a121d62ce5ef343e1a1063b3 Mon Sep 17 00:00:00 2001 From: Blallo Date: Mon, 13 Mar 2023 01:08:13 +0100 Subject: [PATCH] Refactor execution logic as an inteface --- process.go | 141 ++++++++++++++++++++++++++++++++++++++++++++++++++++ radio.go | 125 ++++------------------------------------------ runnable.go | 20 ++++++++ 3 files changed, 172 insertions(+), 114 deletions(-) create mode 100644 process.go create mode 100644 runnable.go diff --git a/process.go b/process.go new file mode 100644 index 0000000..8943a5f --- /dev/null +++ b/process.go @@ -0,0 +1,141 @@ +package broadcast + +import ( + "context" + "os" + "sync" + "syscall" + + "git.sr.ht/~blallo/logz/interface" +) + +var _ Runnable = &Process{} + +// Process represent a [Runnable] whose underlying is a system process that is +// directly supervised. +type Process struct { + program string + cmdLine []string + + lines *LineRing + mu sync.RWMutex + proc *os.Process + errs chan error + quit chan struct{} + logger logz.Logger +} + +func NewProcess(logger logz.Logger, program string, cmdLine ...string) (*Process, error) { + lines, err := NewLineRing(bufLines) + if err != nil { + return nil, err + } + lines.WithFormatter(timestampFormatter) + lines.WithLogger(logger) + + return &Process{ + program: program, + cmdLine: cmdLine, + lines: lines, + logger: logger, + errs: make(chan error), + quit: make(chan struct{}), + }, nil +} + +func (p *Process) Init(ctx context.Context) error { + go p.lines.Run(ctx) + + return nil +} + +func (p *Process) Start(context.Context) error { + p.mu.RLock() + if p.proc != nil { + defer p.mu.RUnlock() + return ErrAlreadyRunning + } + p.mu.RUnlock() + + cwd, err := os.Getwd() + if err != nil { + return err + } + + p.logger.Debug(map[string]any{ + "msg": "Starting process", + "cwd": cwd, + "exec": p.program, + "cmdLine": p.cmdLine, + }) + + p.mu.Lock() + proc, err := os.StartProcess(p.program, p.cmdLine, &os.ProcAttr{ + Dir: cwd, + Env: os.Environ(), + Files: []*os.File{ + nil, + p.lines.File(), + p.lines.File(), + }, + }) + if err != nil { + defer p.mu.Unlock() + return err + } + + p.proc = proc + p.mu.Unlock() + + errCh := make(chan error) + + go func() { + state, err := p.proc.Wait() + if err != nil { + errCh <- err + } + p.logger.Info(map[string]any{ + "msg": "Process exited", + "exit_code": state.ExitCode(), + "sys": state.SystemTime(), + "usr": state.UserTime(), + }) + }() + + go func() { + select { + case <-p.quit: + p.mu.Lock() + if p.proc != nil { + p.errs <- p.proc.Kill() + } + p.proc = nil + p.mu.Unlock() + case err := <-errCh: + p.logger.Err(map[string]any{ + "msg": "Process exited with error", + "err": err.Error(), + }) + close(errCh) + } + }() + + return nil +} + +func (p *Process) Stop(context.Context) error { + p.quit <- struct{}{} + return <-p.errs +} + +func (p *Process) Logs(context.Context) []string { + return p.lines.Lines() +} + +func (p *Process) Liveness(context.Context) error { + if p.proc == nil { + return ErrNotRunning + } + + return p.proc.Signal(syscall.Signal(0)) +} diff --git a/radio.go b/radio.go index 4e52274..7b2af44 100644 --- a/radio.go +++ b/radio.go @@ -2,9 +2,6 @@ package broadcast import ( "context" - "errors" - "os" - "syscall" "git.sr.ht/~blallo/logz/interface" ) @@ -13,11 +10,6 @@ const ( bufLines = 100 ) -var ( - ErrAlreadyRunning = errors.New("The process is already running") - ErrNotRunning = errors.New("The process is not running") -) - type Command struct { CommandType Resp chan any @@ -36,36 +28,23 @@ type Radio struct { Program string CmdLine []string - lines *LineRing - proc *os.Process - errs chan error - quit chan struct{} + runnable Runnable commands chan Command logger logz.Logger } -func NewRadio(logger logz.Logger, prog string, cmdLine ...string) (*Radio, error) { - lines, err := NewLineRing(bufLines) - if err != nil { - return nil, err - } - lines.WithFormatter(timestampFormatter) - lines.WithLogger(logger) - +func NewRadio(logger logz.Logger, runnable Runnable) (*Radio, error) { return &Radio{ - Program: prog, - CmdLine: cmdLine, - - lines: lines, - commands: make(chan Command, 100), - errs: make(chan error), - quit: make(chan struct{}), + runnable: runnable, logger: logger, + commands: make(chan Command), }, nil } func (r *Radio) Run(ctx context.Context) error { - go r.lines.Run(ctx) + if err := r.runnable.Init(ctx); err != nil { + return err + } for { select { @@ -80,23 +59,22 @@ func (r *Radio) Run(ctx context.Context) error { r.logger.Info(map[string]any{ "msg": "Received Start command", }) - cmd.Resp <- r.start() + cmd.Resp <- r.runnable.Start(ctx) case CommandStop: r.logger.Info(map[string]any{ "msg": "Received Stop command", }) - cmd.Resp <- r.stop() - r.proc = nil + cmd.Resp <- r.runnable.Stop(ctx) case CommandStatus: r.logger.Info(map[string]any{ "msg": "Received Status command", }) - cmd.Resp <- r.printLogs() + cmd.Resp <- r.runnable.Logs(ctx) case CommandLiveness: r.logger.Info(map[string]any{ "msg": "Received Liveness command", }) - cmd.Resp <- r.liveness() + cmd.Resp <- r.runnable.Liveness(ctx) } } } @@ -141,84 +119,3 @@ func (r *Radio) IsAlive() <-chan any { return resp } - -func (r *Radio) start() error { - if r.proc != nil { - return ErrAlreadyRunning - } - - cwd, err := os.Getwd() - if err != nil { - return err - } - - r.logger.Debug(map[string]any{ - "msg": "Starting process", - "cwd": cwd, - "exec": r.Program, - "cmdLine": r.CmdLine, - }) - - proc, err := os.StartProcess(r.Program, r.CmdLine, &os.ProcAttr{ - Dir: cwd, - Env: os.Environ(), - Files: []*os.File{ - nil, - r.lines.File(), - r.lines.File(), - }, - }) - if err != nil { - return err - } - - r.proc = proc - errCh := make(chan error) - - go func() { - state, err := r.proc.Wait() - if err != nil { - errCh <- err - } - r.logger.Info(map[string]any{ - "msg": "Process exited", - "exit_code": state.ExitCode(), - "sys": state.SystemTime(), - "usr": state.UserTime(), - }) - }() - - go func() { - select { - case <-r.quit: - if r.proc != nil { - r.errs <- r.proc.Kill() - } - case err := <-errCh: - r.logger.Err(map[string]any{ - "msg": "Process exited with error", - "err": err.Error(), - }) - close(errCh) - } - }() - - return nil -} - -func (r *Radio) stop() error { - r.quit <- struct{}{} - return <-r.errs -} - -func (r *Radio) printLogs() []string { - return r.lines.Lines() -} - -func (r *Radio) liveness() error { - if r.proc == nil { - return ErrNotRunning - } - - return r.proc.Signal(syscall.Signal(0)) -} diff --git a/runnable.go b/runnable.go new file mode 100644 index 0000000..03e5faa --- /dev/null +++ b/runnable.go @@ -0,0 +1,20 @@ +package broadcast + +import ( + "context" + "errors" +) + +var ( + ErrAlreadyRunning = errors.New("The process is already running") + ErrNotRunning = errors.New("The process is not running") + ErrCannotStop = errors.New("The process cannot be stopped") +) + +type Runnable interface { + Init(context.Context) error + Start(context.Context) error + Stop(context.Context) error + Logs(context.Context) []string + Liveness(context.Context) error +}