broadcast/process.go

142 lines
2.4 KiB
Go

package broadcast
import (
"context"
"os"
"sync"
"syscall"
"git.sr.ht/~blallo/logz/interface"
)
// Compile-time assertion that *Process satisfies Runnable. A typed nil pointer
// is used so that no Process value has to be constructed for the check.
var _ Runnable = (*Process)(nil)
// Process represents a [Runnable] backed by an operating-system process that
// is started and supervised directly by this package.
type Process struct {
// program is the executable name handed to os.StartProcess.
program string
// cmdLine is passed verbatim as the argv argument of os.StartProcess.
// NOTE(review): argv[0] is conventionally the program name — confirm that
// callers include it.
cmdLine []string
// lines is the ring whose File is given to the child as both stdout and
// stderr, and whose Lines are returned by Logs.
lines *LineRing
// mu is taken by Start and by the supervising goroutine around their reads
// and writes of proc.
mu sync.RWMutex
// proc is the handle returned by os.StartProcess. It is nil before Start and
// is reset to nil once the process has been killed through Stop.
proc *os.Process
// errs carries the result of killing the process from the supervising
// goroutine back to Stop.
errs chan error
// quit is sent to by Stop to ask the supervising goroutine to kill the
// process.
quit chan struct{}
// logger receives the structured debug/info/error events emitted by Start.
logger logz.Logger
}
// NewProcess builds a Process that will execute program with cmdLine as its
// argument vector, logging through logger.
//
// A LineRing of bufLines capacity is created for the process and configured
// with timestampFormatter and the same logger. The only failure mode is an
// error from NewLineRing, which is returned unchanged. The process itself is
// not started here; see Start.
func NewProcess(logger logz.Logger, program string, cmdLine ...string) (*Process, error) {
	ring, err := NewLineRing(bufLines)
	if err != nil {
		return nil, err
	}
	ring.WithFormatter(timestampFormatter)
	ring.WithLogger(logger)

	// Both channels are unbuffered: Stop and the supervising goroutine hand
	// off to each other synchronously.
	proc := &Process{
		logger:  logger,
		program: program,
		cmdLine: cmdLine,
		lines:   ring,
		quit:    make(chan struct{}),
		errs:    make(chan error),
	}
	return proc, nil
}
// Init starts the line ring's Run loop in a background goroutine, passing it
// ctx. It never fails and always returns nil.
func (p *Process) Init(ctx context.Context) error {
	ring := p.lines
	go ring.Run(ctx)
	return nil
}
// Start launches the configured program as a child process and begins
// supervising it.
//
// The child runs in the current working directory with a copy of the current
// environment. It is given no stdin; stdout and stderr are both attached to
// the file exposed by the line ring.
//
// Start returns ErrAlreadyRunning when a process handle is already held, and
// otherwise any error from os.Getwd or os.StartProcess. The context argument
// is unused.
//
// Two goroutines are spawned on success: a waiter that reaps the child and
// logs how it exited, and a supervisor that either kills the child when Stop
// asks for it or logs a failure reported by the waiter.
//
// NOTE(review): cmdLine is handed to os.StartProcess verbatim as argv, where
// argv[0] is conventionally the program name — confirm callers include it.
func (p *Process) Start(context.Context) error {
	// Hold the write lock across both the check and the assignment so that two
	// concurrent calls cannot both observe a nil handle and spawn twice.
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.proc != nil {
		return ErrAlreadyRunning
	}

	cwd, err := os.Getwd()
	if err != nil {
		return err
	}

	p.logger.Debug(map[string]any{
		"msg":     "Starting process",
		"cwd":     cwd,
		"exec":    p.program,
		"cmdLine": p.cmdLine,
	})

	proc, err := os.StartProcess(p.program, p.cmdLine, &os.ProcAttr{
		Dir: cwd,
		Env: os.Environ(),
		Files: []*os.File{
			nil,            // stdin
			p.lines.File(), // stdout
			p.lines.File(), // stderr
		},
	})
	if err != nil {
		return err
	}
	p.proc = proc

	// Buffered so the waiter can always deliver its error and exit, even when
	// the supervisor has already left through the quit branch.
	errCh := make(chan error, 1)

	// Waiter: uses the local handle rather than p.proc, which the supervisor
	// may reset concurrently.
	go func() {
		state, err := proc.Wait()
		if err != nil {
			// The returned state is not usable when Wait fails, so report
			// the error and skip the exit log below.
			errCh <- err
			return
		}
		p.logger.Info(map[string]any{
			"msg":       "Process exited",
			"exit_code": state.ExitCode(),
			"sys":       state.SystemTime(),
			"usr":       state.UserTime(),
		})
	}()

	// Supervisor: serves at most one event for this process.
	go func() {
		select {
		case <-p.quit:
			killErr := proc.Kill()
			p.mu.Lock()
			p.proc = nil
			p.mu.Unlock()
			// Always answer Stop, and do so without holding the lock, so
			// the caller can never be left waiting on errs.
			p.errs <- killErr
		case err := <-errCh:
			p.logger.Err(map[string]any{
				"msg": "Process exited with error",
				"err": err.Error(),
			})
		}
	}()

	return nil
}
// Stop asks the supervising goroutine to kill the process and returns the
// result of that kill.
//
// It returns ErrNotRunning when no process handle is held (Start was never
// called, or the process has already been stopped); without this guard the
// send on quit would block forever because nothing is receiving. Otherwise it
// blocks until the supervisor accepts the request and reports back on errs.
// The context argument is unused.
func (p *Process) Stop(context.Context) error {
	p.mu.RLock()
	running := p.proc != nil
	p.mu.RUnlock()
	if !running {
		return ErrNotRunning
	}

	p.quit <- struct{}{}
	return <-p.errs
}
// Logs returns whatever the line ring's Lines method currently reports. The
// context argument is unused.
func (p *Process) Logs(context.Context) []string {
	captured := p.lines.Lines()
	return captured
}
// Liveness reports whether the supervised process can still be signalled.
//
// It returns ErrNotRunning when no process handle is held, and otherwise the
// result of sending signal 0 to the process (nil on success). The context
// argument is unused.
func (p *Process) Liveness(context.Context) error {
	// Snapshot the handle under the read lock: Start and the supervising
	// goroutine assign p.proc under p.mu, and reading it twice unlocked could
	// see it become nil between the check and the Signal call.
	p.mu.RLock()
	proc := p.proc
	p.mu.RUnlock()

	if proc == nil {
		return ErrNotRunning
	}
	return proc.Signal(syscall.Signal(0))
}