package broadcast

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"io"
	"strconv"

	"git.sr.ht/~blallo/logz/interface"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

var (
	// ErrNotFound is returned when no container matches the configured name.
	ErrNotFound = errors.New("container not found")
	// ErrTooMany is returned when more than one container matches the
	// configured name.
	ErrTooMany = errors.New("too many containers match the name")
)

// Compile-time check that *DockerContainer satisfies Runnable.
var _ Runnable = (*DockerContainer)(nil)

// DockerContainer is a Runnable backed by a Docker container that is looked
// up by name on every operation.
//
// The client field is populated by Init; the methods that talk to the Docker
// daemon use it without checking, so Init has to succeed before they are
// called.
type DockerContainer struct {
	name   string
	client *client.Client
	logger logz.Logger
}

// NewDockerContainer returns a DockerContainer for the container called name
// that reports through logger. No Docker client is created here; see Init.
func NewDockerContainer(logger logz.Logger, name string) *DockerContainer {
	return &DockerContainer{
		name:   name,
		logger: logger,
	}
}

// Init creates the Docker client from the process environment
// (client.FromEnv) and stores it on the receiver. The ctx argument is
// currently unused.
func (d *DockerContainer) Init(ctx context.Context) error {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return err
	}

	d.client = cli
	return nil
}

// Start unpauses the container. It returns ErrNotRunning when the container
// cannot be resolved or cannot be unpaused; the underlying error is logged.
func (d *DockerContainer) Start(ctx context.Context) error {
	c, err := d.getContainer(ctx)
	if err != nil {
		d.logErr("Cannot find container", err)
		return ErrNotRunning
	}

	if err := d.client.ContainerUnpause(ctx, c.ID); err != nil {
		d.logErr("Cannot start container", err)
		return ErrNotRunning
	}

	return nil
}

// Stop pauses the container. It returns ErrNotRunning when the container
// cannot be resolved and ErrCannotStop when pausing fails; the underlying
// error is logged.
func (d *DockerContainer) Stop(ctx context.Context) error {
	c, err := d.getContainer(ctx)
	if err != nil {
		d.logErr("Cannot find container", err)
		return ErrNotRunning
	}

	if err := d.client.ContainerPause(ctx, c.ID); err != nil {
		d.logErr("Cannot stop container", err)
		return ErrCannotStop
	}

	return nil
}

// Logs returns the container's stdout and stderr as a slice of lines,
// requesting a tail of bufLines lines from the daemon. Any failure is logged
// and reported to the caller as a nil slice.
func (d *DockerContainer) Logs(ctx context.Context) []string {
	c, err := d.getContainer(ctx)
	if err != nil {
		d.logErr("Cannot find container", err)
		return nil
	}

	// The inspect result tells us whether the log stream is raw (TTY) or
	// multiplexed, which determines how it must be decoded below.
	conf, err := d.client.ContainerInspect(ctx, c.ID)
	if err != nil {
		d.logErr("Cannot inspect container", err)
		return nil
	}

	reader, err := d.client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Tail:       strconv.Itoa(bufLines),
	})
	if err != nil {
		d.logErr("Cannot get container logs", err)
		return nil
	}
	// The log stream holds a connection to the daemon until it is closed.
	defer reader.Close()

	// A missing Config is treated as non-TTY, i.e. a multiplexed stream.
	if conf.Config != nil && conf.Config.Tty {
		return d.consumeStream(reader)
	}

	return d.consumeStreams(reader)
}

// Liveness returns nil when the container's state is "running" and
// ErrNotRunning otherwise, including when the container cannot be resolved.
func (d *DockerContainer) Liveness(ctx context.Context) error {
	c, err := d.getContainer(ctx)
	if err != nil {
		d.logErr("Cannot find container", err)
		return ErrNotRunning
	}

	if c.State != "running" {
		d.logger.Debug(map[string]any{
			"msg":   "Container not running",
			"state": c.State,
		})
		return ErrNotRunning
	}

	return nil
}

// getContainer lists containers filtered by the configured name and returns
// the single match. It returns ErrNotFound when nothing matches, ErrTooMany
// when several containers match, and the list error unchanged otherwise.
//
// NOTE(review): the Docker "name" filter is documented as matching part of a
// name, so a name that is a substring of another container's name would
// presumably produce ErrTooMany - confirm whether an anchored filter is wanted.
func (d *DockerContainer) getContainer(ctx context.Context) (*types.Container, error) {
	containers, err := d.client.ContainerList(ctx, types.ContainerListOptions{
		Filters: filters.NewArgs(filters.KeyValuePair{Key: "name", Value: d.name}),
	})
	if err != nil {
		return nil, err
	}

	d.logger.Debug(map[string]any{
		"msg":        "Containers found",
		"containers": containers,
	})

	switch len(containers) {
	case 0:
		return nil, ErrNotFound
	case 1:
		return &containers[0], nil
	default:
		return nil, ErrTooMany
	}
}

// consumeStream reads r to the end and returns its content split into lines.
// A read error is logged and reported as a nil slice.
func (d *DockerContainer) consumeStream(r io.Reader) []string {
	var lines []string

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}

	// Scan reports failure only by returning false, so the error has to be
	// inspected once the loop is over.
	if err := scanner.Err(); err != nil {
		d.logErr("Failed to consume stream", err)
		return nil
	}

	return lines
}

// consumeStreams demultiplexes a Docker log stream read from r and returns
// all stdout lines followed by all stderr lines. A demultiplexing error is
// logged and reported as a nil slice.
func (d *DockerContainer) consumeStreams(r io.Reader) []string {
	// StdCopy writes synchronously, so its destinations must accept writes
	// without a concurrent reader; in-memory buffers do, unbuffered pipes
	// would block forever.
	var stdout, stderr bytes.Buffer
	if _, err := stdcopy.StdCopy(&stdout, &stderr, r); err != nil {
		d.logErr("Failed to consume stream", err)
		return nil
	}

	lines := d.consumeStream(&stdout)
	return append(lines, d.consumeStream(&stderr)...)
}

// logErr emits an error-level log entry carrying msg and the text of err.
func (d *DockerContainer) logErr(msg string, err error) {
	d.logger.Err(map[string]any{
		"msg": msg,
		"err": err.Error(),
	})
}