broadcast/docker.go

208 lines
4.1 KiB
Go

package broadcast
import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"io"
	"strconv"

	"git.sr.ht/~blallo/logz/interface"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)
// Sentinel errors returned when resolving the container by name.
var (
	// ErrNotFound is returned when no container matches the configured name.
	ErrNotFound = errors.New("container not found")

	// ErrTooMany is returned when more than one container matches the
	// configured name, so the target is ambiguous.
	ErrTooMany = errors.New("too many containers match the name")
)
// Compile-time check that *DockerContainer satisfies Runnable.
var _ Runnable = (*DockerContainer)(nil)
// DockerContainer is a Runnable backed by a Docker container that is looked
// up by name on every operation.
type DockerContainer struct {
	// name is the value passed to the Docker "name" filter when listing
	// containers.
	name string

	// client is the Docker API client; it is nil until Init succeeds.
	client *client.Client

	// logger receives structured diagnostics for every failed operation.
	logger logz.Logger
}
// NewDockerContainer builds a DockerContainer for the container identified by
// name, logging through logger. The Docker client is not created here; Init
// must be called before any other method is used.
func NewDockerContainer(logger logz.Logger, name string) *DockerContainer {
	d := new(DockerContainer)
	d.name = name
	d.logger = logger
	return d
}
// Init creates the Docker client from the process environment
// (client.FromEnv) and stores it on the receiver. On failure the receiver is
// left untouched and the client construction error is returned. ctx is
// currently unused.
func (d *DockerContainer) Init(ctx context.Context) error {
	dockerClient, err := client.NewClientWithOpts(client.FromEnv)
	if err == nil {
		d.client = dockerClient
	}
	return err
}
// Start resumes the container by unpausing it.
//
// Both failure modes (the container cannot be resolved, or the unpause call
// fails) are logged with the underlying error and reported to the caller as
// ErrNotRunning.
func (d *DockerContainer) Start(ctx context.Context) error {
	target, lookupErr := d.getContainer(ctx)
	if lookupErr != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot find container",
			"err": lookupErr.Error(),
		})
		return ErrNotRunning
	}

	unpauseErr := d.client.ContainerUnpause(ctx, target.ID)
	if unpauseErr == nil {
		return nil
	}

	d.logger.Err(map[string]any{
		"msg": "Cannot start container",
		"err": unpauseErr.Error(),
	})
	return ErrNotRunning
}
// Stop suspends the container by pausing it.
//
// If the container cannot be resolved the error is logged and ErrNotRunning
// is returned; if the pause call itself fails the error is logged and
// ErrCannotStop is returned.
func (d *DockerContainer) Stop(ctx context.Context) error {
	target, lookupErr := d.getContainer(ctx)
	if lookupErr != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot find container",
			"err": lookupErr.Error(),
		})
		return ErrNotRunning
	}

	pauseErr := d.client.ContainerPause(ctx, target.ID)
	if pauseErr == nil {
		return nil
	}

	d.logger.Err(map[string]any{
		"msg": "Cannot stop container",
		"err": pauseErr.Error(),
	})
	return ErrCannotStop
}
// Logs returns the last bufLines lines of the container's stdout and stderr.
//
// The container is resolved by name and inspected to learn whether it was
// started with a TTY: without a TTY the log stream is handed to
// consumeStreams for demultiplexing, with a TTY it is read as plain lines by
// consumeStream. Any failure is logged and results in a nil slice.
func (d *DockerContainer) Logs(ctx context.Context) []string {
	c, err := d.getContainer(ctx)
	if err != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot find container",
			"err": err.Error(),
		})
		return nil
	}

	conf, err := d.client.ContainerInspect(ctx, c.ID)
	if err != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot inspect container",
			"err": err.Error(),
		})
		return nil
	}

	reader, err := d.client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Tail:       strconv.Itoa(bufLines),
	})
	if err != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot read container logs",
			"err": err.Error(),
		})
		return nil
	}
	// The log stream holds a connection to the daemon; release it once the
	// lines have been consumed.
	defer reader.Close()

	if !conf.Config.Tty {
		return d.consumeStreams(reader)
	}
	return d.consumeStream(reader)
}
// Liveness reports whether the container is currently running.
//
// It returns nil only when the container can be resolved and its State is
// exactly "running". A lookup failure is logged at error level and any other
// state at debug level; both yield ErrNotRunning.
func (d *DockerContainer) Liveness(ctx context.Context) error {
	target, lookupErr := d.getContainer(ctx)
	if lookupErr != nil {
		d.logger.Err(map[string]any{
			"msg": "Cannot find container",
			"err": lookupErr.Error(),
		})
		return ErrNotRunning
	}

	if target.State == "running" {
		return nil
	}

	d.logger.Debug(map[string]any{
		"msg":   "Container not running",
		"state": target.State,
	})
	return ErrNotRunning
}
// getContainer resolves the configured name to exactly one container.
//
// It lists containers through the Docker API with a "name" filter set to
// d.name, logs the matches at debug level, and then requires a single hit:
// no match yields ErrNotFound and several matches yield ErrTooMany. Errors
// from the list call are returned unchanged.
//
// NOTE(review): Docker's name filter may match partial names, so a short name
// could match several containers — confirm against the daemon's behaviour.
func (d *DockerContainer) getContainer(ctx context.Context) (*types.Container, error) {
	byName := filters.NewArgs(filters.KeyValuePair{Key: "name", Value: d.name})
	listOpts := types.ContainerListOptions{Filters: byName}

	matches, err := d.client.ContainerList(ctx, listOpts)
	if err != nil {
		return nil, err
	}

	d.logger.Debug(map[string]any{
		"msg":        "Containers found",
		"containers": matches,
	})

	if len(matches) == 0 {
		return nil, ErrNotFound
	}
	if len(matches) > 1 {
		return nil, ErrTooMany
	}
	return &matches[0], nil
}
// consumeStream reads r to the end and returns its content split into lines
// (without line terminators).
//
// If the scanner stops because of an error rather than end of input, the
// error is logged and nil is returned so callers never see a silently
// truncated result.
func (d *DockerContainer) consumeStream(r io.Reader) []string {
	var lines []string

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}

	// Scanner.Err only reports a failure once Scan has returned false, so it
	// must be checked after the loop, not inside it.
	if err := scanner.Err(); err != nil {
		d.logger.Err(map[string]any{
			"msg": "Failed to consume stream",
			"err": err.Error(),
		})
		return nil
	}
	return lines
}
// consumeStreams demultiplexes a Docker log stream (stdout and stderr
// interleaved in the stdcopy framing) and returns all stdout lines followed
// by all stderr lines.
//
// The two streams are collected into in-memory buffers. StdCopy must not
// write into io.Pipe writers here: pipe writes block until a reader consumes
// them, and nothing reads until StdCopy has returned, so any output would
// deadlock. If demultiplexing fails the error is logged and nil is returned.
func (d *DockerContainer) consumeStreams(r io.Reader) []string {
	var stdout, stderr bytes.Buffer

	if _, err := stdcopy.StdCopy(&stdout, &stderr, r); err != nil {
		d.logger.Err(map[string]any{
			"msg": "Failed to consume stream",
			"err": err.Error(),
		})
		return nil
	}

	lines := d.consumeStream(&stdout)
	return append(lines, d.consumeStream(&stderr)...)
}