broadcast/broadcast/stream.go

331 lines
7.2 KiB
Go

package main
import (
"bufio"
"encoding/base64"
"errors"
"log"
"net"
"os/exec"
"strconv"
"time"
"github.com/stunndard/goicy/mpeg"
)
var Connected bool = false
var csock net.Conn
type Config struct {
ServerType string
StreamBitRate int
StreamSamplerate int
StreamChannels int
Mount string
Password string
StreamName string
StreamURL string
StreamGenre string
StreamDescription string
BufferSize int
}
func Connect(host string, port int) (net.Conn, error) {
h := host + ":" + strconv.Itoa(int(port))
sock, err := net.Dial("tcp", h)
if err != nil {
Connected = false
}
return sock, err
}
func send(sock net.Conn, buf []byte) error {
n, err := sock.Write(buf)
if err != nil {
Connected = false
return err
}
if n != len(buf) {
Connected = false
return errors.New("send() error")
}
return nil
}
func recv(sock net.Conn) ([]byte, error) {
var buf []byte = make([]byte, 1024)
n, err := sock.Read(buf)
//fmt.Println(n, err, string(buf), len(buf))
if err != nil {
log.Fatal(err.Error())
return nil, err
}
return buf[0:n], err
}
func close(sock net.Conn) {
Connected = false
sock.Close()
}
func ConnectServer(config Config, host string, port int) (net.Conn, error) {
var sock net.Conn
if Connected {
return csock, nil
}
log.Printf("Connecting to %s at %s:%d...", config.ServerType, host, strconv.Itoa(port))
sock, err := Connect(host, port)
if err != nil {
Connected = false
return sock, err
}
//fmt.Println("connected ok")
time.Sleep(time.Second)
headers := ""
bitrate := 0
samplerate := 0
channels := 0
bitrate = config.StreamBitRate / 1000
samplerate = config.StreamSamplerate
channels = config.StreamChannels
contenttype := "audio/mpeg" // change for other stream formats
headers = "SOURCE /" + config.Mount + " HTTP/1.0\r\n" +
"Content-Type: " + contenttype + "\r\n" +
"Authorization: Basic " + base64.StdEncoding.EncodeToString([]byte("source:"+config.Password)) + "\r\n" +
"User-Agent: broadcast/" + "\r\n" +
"ice-name: " + config.StreamName + "\r\n" +
"ice-public: 0\r\n" +
"ice-url: " + config.StreamURL + "\r\n" +
"ice-genre: " + config.StreamGenre + "\r\n" +
"ice-description: " + config.StreamDescription + "\r\n" +
"ice-audio-info: bitrate=" + strconv.Itoa(bitrate) +
";channels=" + strconv.Itoa(channels) +
";samplerate=" + strconv.Itoa(samplerate) + "\r\n" +
"\r\n"
err = send(sock, []byte(headers))
if err != nil {
log.Fatal("Error sending headers")
Connected = false
return sock, err
}
time.Sleep(time.Second)
resp, err := recv(sock)
if err != nil {
Connected = false
return sock, err
}
if string(resp[9:12]) != "200" {
Connected = false
return sock, errors.New("Invalid Icecast response: " + string(resp))
}
log.Println("Server connect successful")
Connected = true
csock = sock
return sock, nil
}
func StreamFFMPEG(config Config, filename string) error {
var (
sock net.Conn
res error
cmd *exec.Cmd
totalFramesSent uint64
)
cleanUp := func(err error) {
log.Println("Killing ffmpeg..")
cmd.Process.Kill()
close(sock)
totalFramesSent = 0
res = err
}
var err error
sock, err = ConnectServer("abbiamoundominio.org", 8000, 128000, 48, ch)
if err != nil {
log.Println("Cannot connect to server")
return err
}
cmdArgs := []string{}
// if config.StreamReencode {
// cmdArgs = []string{
// "-i", filename,
// "-c:a", "libmp3lame",
// "-b:a", strconv.Itoa(config.Cfg.StreamBitrate),
// "-cutoff", "20000",
// "-ar", strconv.Itoa(config.Cfg.StreamSamplerate),
// "-ac", strconv.Itoa(config.Cfg.StreamChannels),
// "-f", "mp3",
// "-write_xing", "0",
// "-id3v2_version", "0",
// "-loglevel", "fatal",
// "-",
// }
// } else {
cmdArgs = []string{ // We don't want to re-encode everything
"-i", filename,
"-c:a", "copy",
"-f", "mp3",
"-write_xing", "0",
"-id3v2_version", "0",
"-loglevel", "fatal",
"-",
}
log.Println("Starting ffmpeg: " + config.FFMPEGPath)
log.Println("Format : source, no reencoding")
cmd = exec.Command(config.FFMPEGPath, cmdArgs...)
f, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
if err := cmd.Start(); err != nil {
log.Println("Error starting ffmpeg")
log.Println(err.Error())
return err
}
// log stderr output from ffmpeg
go func() {
in := bufio.NewScanner(stderr)
for in.Scan() {
log.Println("FFMPEG: " + in.Text())
}
}()
frames := 0
timeFileBegin := time.Now()
sr := 0
spf := 0
framesToRead := 1
for {
sendBegin := time.Now()
var lbuf []byte
lbuf, err = mpeg.GetFramesStdin(f, framesToRead)
if framesToRead == 1 {
if len(lbuf) < 4 {
log.Println("Error reading data stream")
cleanUp(err)
break
}
sr = mpeg.GetSR(lbuf[0:4])
if sr == 0 {
log.Println("Erroneous MPEG sample rate from data stream")
cleanUp(err)
break
}
spf = mpeg.GetSPF(lbuf[0:4])
framesToRead = (sr / spf) + 1
mbuf, _ := mpeg.GetFramesStdin(f, framesToRead-1)
lbuf = append(lbuf, mbuf...)
}
if err != nil {
log.Println("Error reading data stream")
cleanUp(err)
break
}
if len(lbuf) <= 0 {
log.Println("STDIN from ffmpeg ended")
break
}
if totalFramesSent == 0 {
totalTimeBegin = time.Now()
//stdoutFramesSent = 0
}
if err := send(sock, lbuf); err != nil {
log.Println("Error sending data stream")
cleanUp(err)
break
}
totalFramesSent = totalFramesSent + uint64(framesToRead)
frames = frames + framesToRead
timeElapsed := int(float64((time.Now().Sub(totalTimeBegin)).Seconds()) * 1000)
timeSent := int(float64(totalFramesSent) * float64(spf) / float64(sr) * 1000)
timeFileElapsed := int(float64((time.Now().Sub(timeFileBegin)).Seconds()) * 1000)
bufferSent := 0
if timeSent > timeElapsed {
bufferSent = timeSent - timeElapsed
}
// if config.UpdateMetadata {
// cuesheet.Update(uint32(timeFileElapsed))
// }
// calculate the send lag
sendLag := int(float64((time.Now().Sub(sendBegin)).Seconds()) * 1000)
if timeElapsed > 1500 {
log.Println("Frames: " + strconv.Itoa(frames) + "/" + strconv.Itoa(int(totalFramesSent)) + " Time: " +
strconv.Itoa(int(timeElapsed/1000)) + "/" + strconv.Itoa(int(timeSent/1000)) + "s Buffer: " +
strconv.Itoa(int(bufferSent)) + "ms Frames/Bytes: " + strconv.Itoa(framesToRead) + "/" + strconv.Itoa(len(lbuf)))
}
// regulate sending rate
timePause := 0
if bufferSent < (config.BufferSize - 100) {
timePause = 900 - sendLag
} else {
if bufferSent > config.BufferSize {
timePause = 1100 - sendLag
} else {
timePause = 975 - sendLag
}
}
// if Abort {
// err := errors.New("Aborted by user")
// cleanUp(err)
// break
// }
time.Sleep(time.Duration(time.Millisecond) * time.Duration(timePause))
}
cmd.Wait()
return res
}
func streamBuffer(buffer <-chan []byte) error {
config := Config{}
config.ServerType = "Icecast"
config.StreamBitRate = 128000
config.StreamSamplerate = 48000
config.Mount = "bitume"
config.StreamName = "mumble"
config.StreamURL = "http://abbiamoundominio.org:8000"
config.StreamGenre = "acaro"
config.StreamDescription = "Biiiitume!"
config.BufferSize = 65536 // TODO: tweak this around
err := StreamFFMPEG(config)
if err != nil {
log.Println("StreamFFMPEG failed")
}
}