master
blallo 2023-02-13 23:49:45 +01:00
commit 68b117c054
Signed by: blallo
GPG Key ID: C530464EEDCF489A
8 changed files with 678 additions and 0 deletions

1
.gitignore vendored 100644
View File

@ -0,0 +1 @@
/bin

View File

@ -0,0 +1,154 @@
package main
import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"
"git.abbiamoundominio.org/blallo/broadcast"
"git.sr.ht/~blallo/logz/interface"
"git.sr.ht/~blallo/logz/zlog"
)
var (
debug = flag.Bool("debug", false, "Enable debug logging")
)
func main() {
logger := zlog.NewConsoleLogger()
flag.Parse()
if flag.NArg() < 1 {
logger.Err(map[string]any{
"msg": "Wrong number of arguments",
})
os.Exit(1)
}
if *debug {
logger.SetLevel(logz.LogDebug)
}
prog := flag.Arg(0)
cmdLine := flag.Args()[1:]
radio, err := broadcast.NewRadio(logger, prog, cmdLine...)
if err != nil {
logger.Err(map[string]any{
"msg": "Failed to start",
"err": err,
})
os.Exit(2)
}
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
start := make(chan os.Signal)
stop := make(chan os.Signal)
status := make(chan os.Signal)
signal.Notify(start, syscall.SIGUSR1)
signal.Notify(stop, syscall.SIGTSTP)
signal.Notify(status, syscall.SIGUSR2)
go func() {
for {
select {
case <-ctx.Done():
return
case <-start:
resp := <-radio.Start()
if resp != nil {
logger.Warn(map[string]any{
"msg": "Failed to start",
"err": resp.(error).Error(),
})
} else {
logger.Info(map[string]any{
"msg": "Started",
})
}
}
}
}()
go func() {
for {
select {
case <-ctx.Done():
return
case <-stop:
resp := <-radio.Stop()
if resp != nil {
logger.Warn(map[string]any{
"msg": "Failed to stop",
"err": resp.(error).Error(),
})
} else {
logger.Info(map[string]any{
"msg": "Stopped",
})
}
}
}
}()
go func() {
for {
select {
case <-ctx.Done():
return
case <-status:
resp := <-radio.Status()
for i, line := range resp.([]string) {
logger.Info(map[string]any{
"msg": line,
"lineNum": i,
})
}
}
}
}()
go func() {
resp, err := withTimeout(ctx, radio.Start())
if err != nil {
logger.Err(map[string]any{
"msg": "Cannot start",
"err": err.Error(),
})
} else {
if resp != nil {
logger.Info(map[string]any{
"msg": "Started",
"resp": resp.(error).Error(),
})
}
}
}()
if err := radio.Run(ctx); err != nil {
logger.Err(map[string]any{
"msg": "Execution failed",
"err": err.Error(),
})
os.Exit(2)
}
}
func withTimeout[T any](ctx context.Context, respCh <-chan T) (zero T, err error) {
shortCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
select {
case <-shortCtx.Done():
return zero, shortCtx.Err()
case resp := <-respCh:
return resp, nil
}
}

25
cmd/echo/main.go 100644
View File

@ -0,0 +1,25 @@
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
)
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
ticker := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
fmt.Println("Bye...")
return
case <-ticker.C:
fmt.Println("Still alive")
}
}
}

17
go.mod 100644
View File

@ -0,0 +1,17 @@
module git.abbiamoundominio.org/blallo/broadcast
go 1.20
require (
git.sr.ht/~blallo/logz v0.0.0-20220324191132-95d94ae8e337 // indirect
git.sr.ht/~blallo/logz/interface v0.0.0-20220324191132-95d94ae8e337 // indirect
git.sr.ht/~blallo/logz/testlogger v0.0.0-20230212191205-53d5ce2c0d54 // indirect
git.sr.ht/~blallo/logz/zlog v0.0.0-20220324191132-95d94ae8e337 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/zerolog v1.26.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

59
go.sum 100644
View File

@ -0,0 +1,59 @@
git.sr.ht/~blallo/logz v0.0.0-20220324191132-95d94ae8e337 h1:QYgqHKnaExscRlUrQvpm9AiHnpOVJnoQBw/W+VCZaK8=
git.sr.ht/~blallo/logz v0.0.0-20220324191132-95d94ae8e337/go.mod h1:W/OSkm9pxF84geA9tK+A9Ys028UF1ayD41BJlCDC4Io=
git.sr.ht/~blallo/logz/interface v0.0.0-20220324191132-95d94ae8e337 h1:a62rvbRTBnosIciC9Bg7i8XlpYDmMDGecj1WCUgg8Uo=
git.sr.ht/~blallo/logz/interface v0.0.0-20220324191132-95d94ae8e337/go.mod h1:V1e+pLie6GMc2iEdyhB3+bSfFBvwY0qDcQmyIQ3Jr3I=
git.sr.ht/~blallo/logz/testlogger v0.0.0-20230212191205-53d5ce2c0d54 h1:CHxlq5zroyZORIG9mFvAxLTNuMyUFZZGStevHvAF+ek=
git.sr.ht/~blallo/logz/testlogger v0.0.0-20230212191205-53d5ce2c0d54/go.mod h1:qOrf0RE20YqvShbkT+uZ1YfTOp8DYVIldF2Q9umhpOo=
git.sr.ht/~blallo/logz/zlog v0.0.0-20220324191132-95d94ae8e337 h1:4oSLCsOXU0sRwEwn5+VIQQ0E2YhUFrnNuZkv+umipo0=
git.sr.ht/~blallo/logz/zlog v0.0.0-20220324191132-95d94ae8e337/go.mod h1:9fUP8ioBX7rzNIBYPvi7irK/ZSug4gLDInl0CJvHgys=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

158
lines_ring.go 100644
View File

@ -0,0 +1,158 @@
package broadcast
import (
"container/ring"
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"git.sr.ht/~blallo/logz/interface"
)
const (
bufSize = 10
minBHWait = time.Millisecond
maxBHWait = 500 * time.Millisecond
factor = 2
)
type LineRing struct {
mu sync.Mutex
pipeIn *os.File
pipeOut *os.File
ring *ring.Ring
logger logz.Logger
fmt func(string) string
}
func NewLineRing(size int) (*LineRing, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
return &LineRing{
pipeOut: r,
pipeIn: w,
ring: ring.New(size),
fmt: defaultFormatter,
}, nil
}
func (r *LineRing) WithLogger(logger logz.Logger) {
r.logger = logger
}
func (r *LineRing) WithFormatter(fmt func(string) string) {
r.fmt = fmt
}
func (r *LineRing) File() *os.File {
return r.pipeIn
}
func (r *LineRing) Run(ctx context.Context) error {
lines := make(chan string)
defer close(lines)
go func() {
var line []byte
buf := make([]byte, bufSize)
backoff := newBackoff(ctx)
Outer:
for {
n, err := r.pipeOut.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
if r.logger != nil {
r.logger.Debug(map[string]any{
"msg": "Pipe errored",
"err": err,
})
}
time.Sleep(backoff.NextBackOff())
continue Outer
}
backoff.Reset()
Inner:
for i := 0; i < n; i++ {
c := buf[i]
if c == 0xA { // check if newline
lines <- string(line)
line = nil
continue Inner
}
line = append(line, c)
}
}
}()
for {
select {
case <-ctx.Done():
if err := r.Close(); err != nil {
return err
}
return ctx.Err()
case line := <-lines:
r.mu.Lock()
r.ring.Value = r.fmt(line)
r.ring = r.ring.Next()
r.mu.Unlock()
}
}
}
func (r *LineRing) Close() error {
if err := r.pipeIn.Close(); err != nil {
return err
}
if err := r.pipeOut.Close(); err != nil {
return err
}
return nil
}
func (r *LineRing) Lines() (res []string) {
r.mu.Lock()
defer r.mu.Unlock()
r.ring.Do(func(elem any) {
if elem == nil {
return
}
res = append(res, elem.(string))
})
return
}
func newBackoff(ctx context.Context) backoff.BackOffContext {
bh := backoff.NewExponentialBackOff()
bh.InitialInterval = minBHWait
bh.Multiplier = factor
bh.MaxInterval = maxBHWait
return backoff.WithContext(bh, ctx)
}
func defaultFormatter(line string) string {
return line
}
func timestampFormatter(line string) string {
return fmt.Sprintf("%s: %s", time.Now().Format(time.RFC3339), line)
}

65
lines_ring_test.go 100644
View File

@ -0,0 +1,65 @@
package broadcast
import (
"context"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"git.sr.ht/~blallo/logz/interface"
"git.sr.ht/~blallo/logz/testlogger"
)
func TestLineRing(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
ctx := context.TODO()
lines, err := NewLineRing(3)
require.NoError(err)
logLevel := logz.LogInfo
if os.Getenv("DEBUG") != "" {
logLevel = logz.LogDebug
}
logger := testlogger.New(t).SetLevel(logLevel)
lines.WithLogger(logger)
go lines.Run(ctx)
defer lines.Close()
testLines := []string{
"test line 1",
"test line 2",
"test line 3",
"test line 4",
"test line 5",
}
pIn := lines.File()
for _, line := range testLines[:3] {
pIn.Write([]byte(line))
pIn.Write([]byte("\n"))
}
assert.Eventually(
func() bool {
return assert.Equal(testLines[:3], lines.Lines())
}, 100*time.Millisecond, time.Millisecond,
)
for _, line := range testLines[3:] {
pIn.Write([]byte(line))
pIn.Write([]byte("\n"))
}
assert.Eventually(
func() bool {
return assert.Equal(append(testLines[2:3], testLines[3:]...), lines.Lines())
}, 100*time.Millisecond, time.Millisecond,
)
}

199
runner.go 100644
View File

@ -0,0 +1,199 @@
package broadcast
import (
"context"
"errors"
"os"
"git.sr.ht/~blallo/logz/interface"
)
const (
bufLines = 100
)
var (
ErrAlreadyRunning = errors.New("The process is already running")
ErrNotRunning = errors.New("The process is not running")
)
type Command struct {
CommandType
Resp chan any
}
type CommandType int
const (
CommandStart = 1 << iota
CommandStop
CommandStatus
)
type Radio struct {
Program string
CmdLine []string
lines *LineRing
proc *os.Process
errs chan error
quit chan struct{}
commands chan Command
logger logz.Logger
}
func NewRadio(logger logz.Logger, prog string, cmdLine ...string) (*Radio, error) {
lines, err := NewLineRing(bufLines)
if err != nil {
return nil, err
}
lines.WithFormatter(timestampFormatter)
lines.WithLogger(logger)
return &Radio{
Program: prog,
CmdLine: cmdLine,
lines: lines,
commands: make(chan Command, 100),
errs: make(chan error),
quit: make(chan struct{}),
logger: logger,
}, nil
}
func (r *Radio) Run(ctx context.Context) error {
go r.lines.Run(ctx)
for {
select {
case <-ctx.Done():
r.logger.Warn(map[string]any{
"msg": "Context terminated. Exiting...",
})
return ctx.Err()
case cmd := <-r.commands:
switch cmd.CommandType {
case CommandStart:
r.logger.Info(map[string]any{
"msg": "Received Start command",
})
cmd.Resp <- r.start()
case CommandStop:
r.logger.Info(map[string]any{
"msg": "Received Stop command",
})
cmd.Resp <- r.stop()
r.proc = nil
case CommandStatus:
r.logger.Info(map[string]any{
"msg": "Received Status command",
})
cmd.Resp <- r.printLogs()
}
}
}
}
func (r *Radio) Start() <-chan any {
resp := make(chan any)
r.commands <- Command{
CommandType: CommandStart,
Resp: resp,
}
return resp
}
func (r *Radio) Stop() <-chan any {
resp := make(chan any)
r.commands <- Command{
CommandType: CommandStop,
Resp: resp,
}
return resp
}
func (r *Radio) Status() <-chan any {
resp := make(chan any)
r.commands <- Command{
CommandType: CommandStatus,
Resp: resp,
}
return resp
}
func (r *Radio) start() error {
if r.proc != nil {
return ErrAlreadyRunning
}
cwd, err := os.Getwd()
if err != nil {
return err
}
r.logger.Debug(map[string]any{
"msg": "Starting process",
"cwd": cwd,
"exec": r.Program,
"cmdLine": r.CmdLine,
})
proc, err := os.StartProcess(r.Program, r.CmdLine, &os.ProcAttr{
Dir: cwd,
Env: os.Environ(),
Files: []*os.File{
nil,
r.lines.File(),
r.lines.File(),
},
})
if err != nil {
return err
}
r.proc = proc
errCh := make(chan error)
go func() {
state, err := r.proc.Wait()
if err != nil {
errCh <- err
}
r.logger.Info(map[string]any{
"msg": "Process exited",
"exit_code": state.ExitCode(),
"sys": state.SystemTime(),
"usr": state.UserTime(),
})
}()
go func() {
select {
case <-r.quit:
if r.proc != nil {
r.errs <- r.proc.Kill()
}
case err := <-errCh:
r.logger.Err(map[string]any{
"msg": "Process exited with error",
"err": err.Error(),
})
close(errCh)
}
}()
return nil
}
func (r *Radio) stop() error {
r.quit <- struct{}{}
return <-r.errs
}
func (r *Radio) printLogs() []string {
return r.lines.Lines()
}