package broadcast import ( "container/ring" "context" "errors" "fmt" "io" "os" "sync" "time" "github.com/cenkalti/backoff/v4" "git.sr.ht/~blallo/logz/interface" ) const ( bufSize = 10 minBHWait = time.Millisecond maxBHWait = 500 * time.Millisecond factor = 2 ) type LineRing struct { mu sync.Mutex pipeIn *os.File pipeOut *os.File ring *ring.Ring logger logz.Logger fmt func(string) string } func NewLineRing(size int) (*LineRing, error) { r, w, err := os.Pipe() if err != nil { return nil, err } return &LineRing{ pipeOut: r, pipeIn: w, ring: ring.New(size), fmt: defaultFormatter, }, nil } func (r *LineRing) WithLogger(logger logz.Logger) { r.logger = logger } func (r *LineRing) WithFormatter(fmt func(string) string) { r.fmt = fmt } func (r *LineRing) File() *os.File { return r.pipeIn } func (r *LineRing) Run(ctx context.Context) error { lines := make(chan string) defer close(lines) go func() { var line []byte buf := make([]byte, bufSize) backoff := newBackoff(ctx) Outer: for { n, err := r.pipeOut.Read(buf) if err != nil && !errors.Is(err, io.EOF) { if r.logger != nil { r.logger.Debug(map[string]any{ "msg": "Pipe errored", "err": err, }) } time.Sleep(backoff.NextBackOff()) continue Outer } backoff.Reset() Inner: for i := 0; i < n; i++ { c := buf[i] if c == 0xA { // check if newline lines <- string(line) line = nil continue Inner } line = append(line, c) } } }() for { select { case <-ctx.Done(): if err := r.Close(); err != nil { return err } return ctx.Err() case line := <-lines: r.mu.Lock() r.ring.Value = r.fmt(line) r.ring = r.ring.Next() r.mu.Unlock() } } } func (r *LineRing) Close() error { if err := r.pipeIn.Close(); err != nil { return err } if err := r.pipeOut.Close(); err != nil { return err } return nil } func (r *LineRing) Lines() (res []string) { r.mu.Lock() defer r.mu.Unlock() r.ring.Do(func(elem any) { if elem == nil { return } res = append(res, elem.(string)) }) return } func newBackoff(ctx context.Context) backoff.BackOffContext { bh := backoff.NewExponentialBackOff() bh.InitialInterval = minBHWait bh.Multiplier = factor bh.MaxInterval = maxBHWait return backoff.WithContext(bh, ctx) } func defaultFormatter(line string) string { return line } func timestampFormatter(line string) string { return fmt.Sprintf("%s: %s", time.Now().Format(time.RFC3339), line) }