209 lines
4.2 KiB
Go
209 lines
4.2 KiB
Go
|
package broadcast
|
||
|
|
||
|
import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"io"
	"strconv"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"

	"git.sr.ht/~blallo/logz/interface"
)
|
||
|
|
||
|
// Sentinel errors produced by the container lookup.
var (
	// ErrNotFound is returned when no container matches the configured name.
	ErrNotFound = errors.New("container not found")

	// ErrTooMany is returned when more than one container matches the
	// configured name.
	ErrTooMany = errors.New("too many containers match the name")
)
|
||
|
|
||
|
// Compile-time check that *DockerContainer satisfies Runnable.
var _ Runnable = (*DockerContainer)(nil)
|
||
|
|
||
|
type DockerContainer struct {
|
||
|
name string
|
||
|
client *client.Client
|
||
|
logger logz.Logger
|
||
|
}
|
||
|
|
||
|
func NewDockerContainer(logger logz.Logger, name string) *DockerContainer {
|
||
|
return &DockerContainer{
|
||
|
name: name,
|
||
|
logger: logger,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) Init(ctx context.Context) error {
|
||
|
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
d.client = cli
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) Start(ctx context.Context) error {
|
||
|
c, err := d.getContainer(ctx)
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot find container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return ErrNotRunning
|
||
|
}
|
||
|
|
||
|
if err = d.client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}); err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot start container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return ErrNotRunning
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) Stop(ctx context.Context) error {
|
||
|
c, err := d.getContainer(ctx)
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot find container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return ErrNotRunning
|
||
|
}
|
||
|
|
||
|
if err = d.client.ContainerStop(ctx, c.ID, container.StopOptions{}); err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot stop container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return ErrCannotStop
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) Logs(ctx context.Context) []string {
|
||
|
c, err := d.getContainer(ctx)
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot find container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
conf, err := d.client.ContainerInspect(ctx, c.ID)
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot inspect container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
reader, err := d.client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{
|
||
|
ShowStdout: true,
|
||
|
ShowStderr: true,
|
||
|
Tail: strconv.Itoa(bufLines),
|
||
|
})
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot stop container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if !conf.Config.Tty {
|
||
|
return d.consumeStreams(reader)
|
||
|
}
|
||
|
|
||
|
return d.consumeStream(reader)
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) Liveness(ctx context.Context) error {
|
||
|
c, err := d.getContainer(ctx)
|
||
|
if err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Cannot find container",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return ErrNotRunning
|
||
|
}
|
||
|
|
||
|
if c.State != "running" {
|
||
|
d.logger.Debug(map[string]any{
|
||
|
"msg": "Container not running",
|
||
|
"state": c.State,
|
||
|
})
|
||
|
return ErrNotRunning
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) getContainer(ctx context.Context) (*types.Container, error) {
|
||
|
containers, err := d.client.ContainerList(ctx, types.ContainerListOptions{
|
||
|
Filters: filters.NewArgs(filters.KeyValuePair{Key: "name", Value: d.name}),
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
d.logger.Debug(map[string]any{
|
||
|
"msg": "Containers found",
|
||
|
"containers": containers,
|
||
|
})
|
||
|
|
||
|
switch len(containers) {
|
||
|
case 0:
|
||
|
return nil, ErrNotFound
|
||
|
case 1:
|
||
|
return &containers[0], nil
|
||
|
default:
|
||
|
return nil, ErrTooMany
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) consumeStream(r io.Reader) (result []string) {
|
||
|
scanner := bufio.NewScanner(r)
|
||
|
|
||
|
for scanner.Scan() {
|
||
|
if err := scanner.Err(); err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Failed to consume stream",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
result = append(result, scanner.Text())
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (d *DockerContainer) consumeStreams(r io.Reader) (result []string) {
|
||
|
rout, wout := io.Pipe()
|
||
|
rerr, werr := io.Pipe()
|
||
|
|
||
|
if _, err := stdcopy.StdCopy(wout, werr, r); err != nil {
|
||
|
d.logger.Err(map[string]any{
|
||
|
"msg": "Failed to consume stream",
|
||
|
"err": err.Error(),
|
||
|
})
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
result = append(result, d.consumeStream(rout)...)
|
||
|
result = append(result, d.consumeStream(rerr)...)
|
||
|
|
||
|
return
|
||
|
}
|