broadcast/lines_ring.go

159 lines
2.5 KiB
Go
Raw Normal View History

2023-02-13 23:49:45 +01:00
package broadcast
import (
"container/ring"
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"git.sr.ht/~blallo/logz/interface"
)
const (
	// bufSize is the size, in bytes, of the scratch buffer handed to each
	// read of the pipe.
	bufSize = 10

	// Retry policy applied when reading the pipe fails: waits start at
	// minBHWait, are multiplied by factor after every consecutive failure
	// and are capped at maxBHWait.
	factor    = 2
	minBHWait = time.Millisecond
	maxBHWait = 500 * time.Millisecond
)
// LineRing keeps the most recent lines written to an OS pipe in a
// fixed-size ring buffer.
type LineRing struct {
	// pipeIn is the write end of the pipe; it is what File returns.
	pipeIn *os.File
	// pipeOut is the read end of the pipe, consumed by Run.
	pipeOut *os.File

	// logger, when non-nil, receives debug messages about pipe read errors.
	logger logz.Logger
	// fmt is applied to every line before it is stored in the ring.
	fmt func(string) string

	// mu guards ring.
	mu sync.Mutex
	// ring holds the formatted lines; it always points at the slot that
	// will be overwritten next.
	ring *ring.Ring
}
// NewLineRing creates a LineRing able to retain up to size lines, backed by
// a freshly created OS pipe. Lines are stored unmodified unless a formatter
// is installed with WithFormatter.
//
// It returns an error if size is not positive or if the pipe cannot be
// created.
func NewLineRing(size int) (*LineRing, error) {
	// ring.New returns nil for size <= 0, which would make Run panic on its
	// first stored line. Reject it here, before any file descriptor is
	// allocated.
	if size <= 0 {
		return nil, fmt.Errorf("line ring size must be positive, got %d", size)
	}

	r, w, err := os.Pipe()
	if err != nil {
		return nil, err
	}

	return &LineRing{
		pipeOut: r,
		pipeIn:  w,
		ring:    ring.New(size),
		fmt:     defaultFormatter,
	}, nil
}
// WithLogger sets the logger used to report pipe read errors at debug level.
// When no logger is set those errors are not reported.
// NOTE(review): the field is written without holding the mutex; presumably
// this is meant to be called before Run is started — confirm with callers.
func (r *LineRing) WithLogger(logger logz.Logger) {
r.logger = logger
}
// WithFormatter sets the function applied to every line before it is stored
// in the ring. Passing nil restores the default pass-through formatter, so
// that Run never ends up calling a nil function.
func (r *LineRing) WithFormatter(format func(string) string) {
	if format == nil {
		format = defaultFormatter
	}
	r.fmt = format
}
// File returns the write end of the pipe. Bytes written to it are read by
// Run from the other end and split into lines.
func (r *LineRing) File() *os.File {
return r.pipeIn
}
// Run reads the pipe, splits the byte stream on '\n' and stores every
// complete line, passed through the configured formatter, in the ring.
//
// It blocks until ctx is cancelled, then closes both ends of the pipe and
// returns the error from Close if there is one, or ctx.Err() otherwise.
//
// Reading happens on a separate goroutine which terminates when ctx is
// cancelled, when the pipe reports EOF or has been closed, or when the retry
// policy for read errors is exhausted. Bytes after the last newline are
// discarded when the reader terminates.
func (r *LineRing) Run(ctx context.Context) error {
	// The channel is deliberately never closed: the reader goroutine is the
	// only sender, and closing it from this (receiving) side could make the
	// reader panic with a send on a closed channel.
	lines := make(chan string)

	go func() {
		var line []byte
		buf := make([]byte, bufSize)
		bo := newBackoff(ctx)

		for {
			n, err := r.pipeOut.Read(buf)

			// Consume whatever was read before looking at err, as the
			// io.Reader contract requires.
			for _, c := range buf[:n] {
				if c != '\n' {
					line = append(line, c)
					continue
				}
				select {
				case lines <- string(line):
				case <-ctx.Done():
					// Nobody is receiving any more; do not block forever.
					return
				}
				// string(line) made a copy, so the backing array can be reused.
				line = line[:0]
			}

			switch {
			case err == nil:
				bo.Reset()
			case errors.Is(err, io.EOF), errors.Is(err, os.ErrClosed):
				// No more data can ever arrive: every writer is gone or the
				// read end has been closed. Retrying would only spin.
				return
			default:
				if r.logger != nil {
					r.logger.Debug(map[string]any{
						"msg": "Pipe errored",
						"err": err,
					})
				}
				wait := bo.NextBackOff()
				if wait < 0 {
					// A negative duration is the backoff "stop" signal
					// (context cancelled or retry budget exhausted).
					return
				}
				select {
				case <-time.After(wait):
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	for {
		select {
		case <-ctx.Done():
			// Closing the pipe also wakes the reader up if it is blocked in Read.
			if err := r.Close(); err != nil {
				return err
			}
			return ctx.Err()
		case line := <-lines:
			r.mu.Lock()
			r.ring.Value = r.fmt(line)
			r.ring = r.ring.Next()
			r.mu.Unlock()
		}
	}
}
// Close closes both ends of the pipe. Both closes are always attempted, so a
// failure on the write end cannot leak the read end. The first error
// encountered, if any, is returned.
func (r *LineRing) Close() error {
	inErr := r.pipeIn.Close()
	outErr := r.pipeOut.Close()
	if inErr != nil {
		return inErr
	}
	return outErr
}
// Lines returns a copy of the lines currently held in the ring. Slots that
// have not been written yet are skipped. The walk starts at the slot that
// will be overwritten next, so the result is ordered oldest first. The
// result is nil when no line has been stored.
func (r *LineRing) Lines() (res []string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	collect := func(slot any) {
		if slot != nil {
			res = append(res, slot.(string))
		}
	}
	r.ring.Do(collect)

	return res
}
// newBackoff builds the exponential retry policy used when reading the pipe
// fails: waits start at minBHWait, grow by factor and are capped at
// maxBHWait. The policy reports "stop" once ctx is cancelled.
func newBackoff(ctx context.Context) backoff.BackOffContext {
	bh := backoff.NewExponentialBackOff()
	bh.InitialInterval = minBHWait
	bh.Multiplier = factor
	bh.MaxInterval = maxBHWait
	// The constructor has already primed the current interval from the
	// library default; Reset re-primes it from the InitialInterval set above,
	// otherwise the first retries would wait the default instead of minBHWait.
	bh.Reset()
	return backoff.WithContext(bh, ctx)
}
// defaultFormatter returns line unchanged. It is the formatter installed by
// NewLineRing.
func defaultFormatter(line string) string {
return line
}
// timestampFormatter prefixes line with the current local time in RFC 3339
// format, followed by ": ".
func timestampFormatter(line string) string {
	stamp := time.Now().Format(time.RFC3339)
	return fmt.Sprintf("%s: %s", stamp, line)
}