Refactor execution logic as an inteface

master
blallo 2023-03-13 01:08:13 +01:00
parent 85dcade750
commit e125f1a541
Signed by: blallo
GPG Key ID: C530464EEDCF489A
3 changed files with 172 additions and 114 deletions

141
process.go 100644
View File

@ -0,0 +1,141 @@
package broadcast
import (
"context"
"os"
"sync"
"syscall"
"git.sr.ht/~blallo/logz/interface"
)
var _ Runnable = &Process{}
// Process represent a [Runnable] whose underlying is a system process that is
// directly supervised.
type Process struct {
program string
cmdLine []string
lines *LineRing
mu sync.RWMutex
proc *os.Process
errs chan error
quit chan struct{}
logger logz.Logger
}
func NewProcess(logger logz.Logger, program string, cmdLine ...string) (*Process, error) {
lines, err := NewLineRing(bufLines)
if err != nil {
return nil, err
}
lines.WithFormatter(timestampFormatter)
lines.WithLogger(logger)
return &Process{
program: program,
cmdLine: cmdLine,
lines: lines,
logger: logger,
errs: make(chan error),
quit: make(chan struct{}),
}, nil
}
func (p *Process) Init(ctx context.Context) error {
go p.lines.Run(ctx)
return nil
}
func (p *Process) Start(context.Context) error {
p.mu.RLock()
if p.proc != nil {
defer p.mu.RUnlock()
return ErrAlreadyRunning
}
p.mu.RUnlock()
cwd, err := os.Getwd()
if err != nil {
return err
}
p.logger.Debug(map[string]any{
"msg": "Starting process",
"cwd": cwd,
"exec": p.program,
"cmdLine": p.cmdLine,
})
p.mu.Lock()
proc, err := os.StartProcess(p.program, p.cmdLine, &os.ProcAttr{
Dir: cwd,
Env: os.Environ(),
Files: []*os.File{
nil,
p.lines.File(),
p.lines.File(),
},
})
if err != nil {
defer p.mu.Unlock()
return err
}
p.proc = proc
p.mu.Unlock()
errCh := make(chan error)
go func() {
state, err := p.proc.Wait()
if err != nil {
errCh <- err
}
p.logger.Info(map[string]any{
"msg": "Process exited",
"exit_code": state.ExitCode(),
"sys": state.SystemTime(),
"usr": state.UserTime(),
})
}()
go func() {
select {
case <-p.quit:
p.mu.Lock()
if p.proc != nil {
p.errs <- p.proc.Kill()
}
p.proc = nil
p.mu.Unlock()
case err := <-errCh:
p.logger.Err(map[string]any{
"msg": "Process exited with error",
"err": err.Error(),
})
close(errCh)
}
}()
return nil
}
func (p *Process) Stop(context.Context) error {
p.quit <- struct{}{}
return <-p.errs
}
func (p *Process) Logs(context.Context) []string {
return p.lines.Lines()
}
func (p *Process) Liveness(context.Context) error {
if p.proc == nil {
return ErrNotRunning
}
return p.proc.Signal(syscall.Signal(0))
}

125
radio.go
View File

@ -2,9 +2,6 @@ package broadcast
import (
"context"
"errors"
"os"
"syscall"
"git.sr.ht/~blallo/logz/interface"
)
@ -13,11 +10,6 @@ const (
bufLines = 100
)
var (
ErrAlreadyRunning = errors.New("The process is already running")
ErrNotRunning = errors.New("The process is not running")
)
type Command struct {
CommandType
Resp chan any
@ -36,36 +28,23 @@ type Radio struct {
Program string
CmdLine []string
lines *LineRing
proc *os.Process
errs chan error
quit chan struct{}
runnable Runnable
commands chan Command
logger logz.Logger
}
func NewRadio(logger logz.Logger, prog string, cmdLine ...string) (*Radio, error) {
lines, err := NewLineRing(bufLines)
if err != nil {
return nil, err
}
lines.WithFormatter(timestampFormatter)
lines.WithLogger(logger)
func NewRadio(logger logz.Logger, runnable Runnable) (*Radio, error) {
return &Radio{
Program: prog,
CmdLine: cmdLine,
lines: lines,
commands: make(chan Command, 100),
errs: make(chan error),
quit: make(chan struct{}),
runnable: runnable,
logger: logger,
commands: make(chan Command),
}, nil
}
func (r *Radio) Run(ctx context.Context) error {
go r.lines.Run(ctx)
if err := r.runnable.Init(ctx); err != nil {
return err
}
for {
select {
@ -80,23 +59,22 @@ func (r *Radio) Run(ctx context.Context) error {
r.logger.Info(map[string]any{
"msg": "Received Start command",
})
cmd.Resp <- r.start()
cmd.Resp <- r.runnable.Start(ctx)
case CommandStop:
r.logger.Info(map[string]any{
"msg": "Received Stop command",
})
cmd.Resp <- r.stop()
r.proc = nil
cmd.Resp <- r.runnable.Stop(ctx)
case CommandStatus:
r.logger.Info(map[string]any{
"msg": "Received Status command",
})
cmd.Resp <- r.printLogs()
cmd.Resp <- r.runnable.Logs(ctx)
case CommandLiveness:
r.logger.Info(map[string]any{
"msg": "Received Liveness command",
})
cmd.Resp <- r.liveness()
cmd.Resp <- r.runnable.Liveness(ctx)
}
}
}
@ -141,84 +119,3 @@ func (r *Radio) IsAlive() <-chan any {
return resp
}
func (r *Radio) start() error {
if r.proc != nil {
return ErrAlreadyRunning
}
cwd, err := os.Getwd()
if err != nil {
return err
}
r.logger.Debug(map[string]any{
"msg": "Starting process",
"cwd": cwd,
"exec": r.Program,
"cmdLine": r.CmdLine,
})
proc, err := os.StartProcess(r.Program, r.CmdLine, &os.ProcAttr{
Dir: cwd,
Env: os.Environ(),
Files: []*os.File{
nil,
r.lines.File(),
r.lines.File(),
},
})
if err != nil {
return err
}
r.proc = proc
errCh := make(chan error)
go func() {
state, err := r.proc.Wait()
if err != nil {
errCh <- err
}
r.logger.Info(map[string]any{
"msg": "Process exited",
"exit_code": state.ExitCode(),
"sys": state.SystemTime(),
"usr": state.UserTime(),
})
}()
go func() {
select {
case <-r.quit:
if r.proc != nil {
r.errs <- r.proc.Kill()
}
case err := <-errCh:
r.logger.Err(map[string]any{
"msg": "Process exited with error",
"err": err.Error(),
})
close(errCh)
}
}()
return nil
}
func (r *Radio) stop() error {
r.quit <- struct{}{}
return <-r.errs
}
func (r *Radio) printLogs() []string {
return r.lines.Lines()
}
func (r *Radio) liveness() error {
if r.proc == nil {
return ErrNotRunning
}
return r.proc.Signal(syscall.Signal(0))
}

20
runnable.go 100644
View File

@ -0,0 +1,20 @@
package broadcast
import (
"context"
"errors"
)
var (
ErrAlreadyRunning = errors.New("The process is already running")
ErrNotRunning = errors.New("The process is not running")
ErrCannotStop = errors.New("The process cannot be stopped")
)
type Runnable interface {
Init(context.Context) error
Start(context.Context) error
Stop(context.Context) error
Logs(context.Context) []string
Liveness(context.Context) error
}