From 1221f5cc40d781270eb64c4259ea51944482d9f3 Mon Sep 17 00:00:00 2001 From: bretello Date: Sun, 8 Nov 2020 21:23:40 +0100 Subject: [PATCH] add stream.go draft --- broadcast/go.mod | 4 + broadcast/go.sum | 19 +++ broadcast/stream.go | 330 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 353 insertions(+) create mode 100644 broadcast/stream.go diff --git a/broadcast/go.mod b/broadcast/go.mod index 057b627..0f5eca4 100644 --- a/broadcast/go.mod +++ b/broadcast/go.mod @@ -3,7 +3,11 @@ module git.abbiamoundominio.org/unit/broadcast go 1.15 require ( + github.com/go-ini/ini v1.62.0 // indirect github.com/golang/protobuf v1.4.2 // indirect + github.com/smartystreets/goconvey v1.6.4 // indirect + github.com/stunndard/goicy v0.0.0-20180703001534-f76a17f16bb0 github.com/sunicy/go-lame v0.0.0-20200422031049-1c192eaafa39 + gopkg.in/ini.v1 v1.62.0 // indirect layeh.com/gumble v0.0.0-20200818122324-146f9205029b ) diff --git a/broadcast/go.sum b/broadcast/go.sum index c89c663..0a75a29 100644 --- a/broadcast/go.sum +++ b/broadcast/go.sum @@ -1,4 +1,6 @@ github.com/dchote/go-openal v0.0.0-20171116030048-f4a9a141d372/go.mod h1:74z+CYu2/mx4N+mcIS/rsvfAxBPBV9uv8zRAnwyFkdI= +github.com/go-ini/ini v1.62.0 h1:7VJT/ZXjzqSrvtraFp4ONq80hTcRQth1c9ZnQ3uNQvU= +github.com/go-ini/ini v1.62.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -12,8 +14,23 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stunndard/goicy v0.0.0-20180703001534-f76a17f16bb0 h1:cHajnSNE+njIb93GSwzr8Pf74FTcdolQOureoDq9wGQ= +github.com/stunndard/goicy v0.0.0-20180703001534-f76a17f16bb0/go.mod h1:zH/pbokcMUtJqlNgkuNQKuiDbt8U4MMef6cmI6TSzn4= github.com/sunicy/go-lame v0.0.0-20200422031049-1c192eaafa39 h1:P/6L4pZMkHutxyefALLAiXCPkcD+5NcvJRGayZmtBmY= github.com/sunicy/go-lame v0.0.0-20200422031049-1c192eaafa39/go.mod h1:H5mJP3sFKpUGaeckgSaMVXcTgnSgImhx54qyQXbpTVY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -23,6 +40,8 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= +gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= layeh.com/gopus v0.0.0-20161224163843-0ebf989153aa h1:WNU4LYsgD2UHxgKgB36mL6iMAMOvr127alafSlgBbiA= layeh.com/gopus v0.0.0-20161224163843-0ebf989153aa/go.mod h1:AOef7vHz0+v4sWwJnr0jSyHiX/1NgsMoaxl+rEPz/I0= layeh.com/gumble v0.0.0-20200818122324-146f9205029b h1:Kne6wkHqbqrygRsqs5XUNhSs84DFG5TYMeCkCbM56sY= diff --git a/broadcast/stream.go b/broadcast/stream.go new file mode 100644 index 0000000..9b1076d --- /dev/null +++ b/broadcast/stream.go @@ -0,0 +1,330 @@ +package main + +import ( + "bufio" + "encoding/base64" + "errors" + "log" + "net" + "os/exec" + "strconv" + "time" + + "github.com/stunndard/goicy/mpeg" +) + +var Connected bool = false +var csock net.Conn + +type Config struct { + ServerType string + StreamBitRate int + StreamSamplerate int + StreamChannels int + + Mount string + Password string + StreamName string + StreamURL string + StreamGenre string + StreamDescription string + + BufferSize int +} + +func Connect(host string, port int) (net.Conn, error) { + h := host + ":" + strconv.Itoa(int(port)) + sock, err := net.Dial("tcp", h) + if err != nil { + Connected = false + } + return sock, err +} + +func send(sock net.Conn, buf []byte) error { + n, err := sock.Write(buf) + if err != nil { + Connected = false + return err + } + if n != len(buf) { + Connected = false + return errors.New("send() error") + } + return nil +} + +func recv(sock net.Conn) ([]byte, error) { + var buf []byte = make([]byte, 1024) + + n, err := sock.Read(buf) + //fmt.Println(n, err, string(buf), len(buf)) + if err != nil { + log.Fatal(err.Error()) + return nil, err + } + return buf[0:n], err +} + +func close(sock net.Conn) { + Connected = false + sock.Close() +} + +func ConnectServer(config Config, host string, port int) (net.Conn, error) { + var sock net.Conn + + if Connected { + return csock, nil + } + + log.Printf("Connecting to %s at %s:%d...", config.ServerType, host, strconv.Itoa(port)) + + sock, err := Connect(host, port) + + if err != nil { + Connected = false + return sock, err + } + + //fmt.Println("connected ok") + time.Sleep(time.Second) + + headers := "" + bitrate := 0 + samplerate := 0 + channels := 0 + + bitrate = config.StreamBitRate / 1000 + samplerate = config.StreamSamplerate + channels = config.StreamChannels + + contenttype := "audio/mpeg" // change for other stream formats + headers = "SOURCE /" + config.Mount + " HTTP/1.0\r\n" + + "Content-Type: " + contenttype + "\r\n" + + "Authorization: Basic " + base64.StdEncoding.EncodeToString([]byte("source:"+config.Password)) + "\r\n" + + "User-Agent: broadcast/" + "\r\n" + + "ice-name: " + config.StreamName + "\r\n" + + "ice-public: 0\r\n" + + "ice-url: " + config.StreamURL + "\r\n" + + "ice-genre: " + config.StreamGenre + "\r\n" + + "ice-description: " + config.StreamDescription + "\r\n" + + "ice-audio-info: bitrate=" + strconv.Itoa(bitrate) + + ";channels=" + strconv.Itoa(channels) + + ";samplerate=" + strconv.Itoa(samplerate) + "\r\n" + + "\r\n" + + err = send(sock, []byte(headers)) + if err != nil { + log.Fatal("Error sending headers") + Connected = false + return sock, err + } + + time.Sleep(time.Second) + resp, err := recv(sock) + if err != nil { + Connected = false + return sock, err + } + if string(resp[9:12]) != "200" { + Connected = false + return sock, errors.New("Invalid Icecast response: " + string(resp)) + } + + log.Println("Server connect successful") + Connected = true + csock = sock + + return sock, nil +} + +func StreamFFMPEG(config Config, filename string) error { + var ( + sock net.Conn + res error + cmd *exec.Cmd + totalFramesSent uint64 + ) + + cleanUp := func(err error) { + log.Println("Killing ffmpeg..") + cmd.Process.Kill() + close(sock) + totalFramesSent = 0 + res = err + } + + var err error + sock, err = ConnectServer("abbiamoundominio.org", 8000, 128000, 48, ch) + if err != nil { + log.Println("Cannot connect to server") + return err + } + + cmdArgs := []string{} + // if config.StreamReencode { + // cmdArgs = []string{ + // "-i", filename, + // "-c:a", "libmp3lame", + // "-b:a", strconv.Itoa(config.Cfg.StreamBitrate), + // "-cutoff", "20000", + // "-ar", strconv.Itoa(config.Cfg.StreamSamplerate), + // "-ac", strconv.Itoa(config.Cfg.StreamChannels), + // "-f", "mp3", + // "-write_xing", "0", + // "-id3v2_version", "0", + // "-loglevel", "fatal", + // "-", + // } + // } else { + cmdArgs = []string{ // We don't want to re-encode everything + "-i", filename, + "-c:a", "copy", + "-f", "mp3", + "-write_xing", "0", + "-id3v2_version", "0", + "-loglevel", "fatal", + "-", + } + + log.Println("Starting ffmpeg: " + config.FFMPEGPath) + log.Println("Format : source, no reencoding") + + cmd = exec.Command(config.FFMPEGPath, cmdArgs...) + + f, _ := cmd.StdoutPipe() + stderr, _ := cmd.StderrPipe() + + if err := cmd.Start(); err != nil { + log.Println("Error starting ffmpeg") + log.Println(err.Error()) + return err + } + + // log stderr output from ffmpeg + go func() { + in := bufio.NewScanner(stderr) + for in.Scan() { + log.Println("FFMPEG: " + in.Text()) + } + }() + + frames := 0 + timeFileBegin := time.Now() + + sr := 0 + spf := 0 + framesToRead := 1 + + for { + sendBegin := time.Now() + + var lbuf []byte + lbuf, err = mpeg.GetFramesStdin(f, framesToRead) + if framesToRead == 1 { + if len(lbuf) < 4 { + log.Println("Error reading data stream") + cleanUp(err) + break + } + sr = mpeg.GetSR(lbuf[0:4]) + if sr == 0 { + log.Println("Erroneous MPEG sample rate from data stream") + cleanUp(err) + break + } + spf = mpeg.GetSPF(lbuf[0:4]) + framesToRead = (sr / spf) + 1 + mbuf, _ := mpeg.GetFramesStdin(f, framesToRead-1) + lbuf = append(lbuf, mbuf...) + } + + if err != nil { + log.Println("Error reading data stream") + cleanUp(err) + break + } + + if len(lbuf) <= 0 { + log.Println("STDIN from ffmpeg ended") + break + } + + if totalFramesSent == 0 { + totalTimeBegin = time.Now() + //stdoutFramesSent = 0 + } + + if err := send(sock, lbuf); err != nil { + log.Println("Error sending data stream") + cleanUp(err) + break + } + + totalFramesSent = totalFramesSent + uint64(framesToRead) + frames = frames + framesToRead + + timeElapsed := int(float64((time.Now().Sub(totalTimeBegin)).Seconds()) * 1000) + timeSent := int(float64(totalFramesSent) * float64(spf) / float64(sr) * 1000) + timeFileElapsed := int(float64((time.Now().Sub(timeFileBegin)).Seconds()) * 1000) + + bufferSent := 0 + if timeSent > timeElapsed { + bufferSent = timeSent - timeElapsed + } + + // if config.UpdateMetadata { + // cuesheet.Update(uint32(timeFileElapsed)) + // } + + // calculate the send lag + sendLag := int(float64((time.Now().Sub(sendBegin)).Seconds()) * 1000) + + if timeElapsed > 1500 { + log.Println("Frames: " + strconv.Itoa(frames) + "/" + strconv.Itoa(int(totalFramesSent)) + " Time: " + + strconv.Itoa(int(timeElapsed/1000)) + "/" + strconv.Itoa(int(timeSent/1000)) + "s Buffer: " + + strconv.Itoa(int(bufferSent)) + "ms Frames/Bytes: " + strconv.Itoa(framesToRead) + "/" + strconv.Itoa(len(lbuf))) + } + + // regulate sending rate + timePause := 0 + if bufferSent < (config.BufferSize - 100) { + timePause = 900 - sendLag + } else { + if bufferSent > config.BufferSize { + timePause = 1100 - sendLag + } else { + timePause = 975 - sendLag + } + } + + // if Abort { + // err := errors.New("Aborted by user") + // cleanUp(err) + // break + // } + + time.Sleep(time.Duration(time.Millisecond) * time.Duration(timePause)) + } + cmd.Wait() + return res +} + +func streamBuffer(buffer <-chan []byte) error { + config := Config{} + config.ServerType = "Icecast" + config.StreamBitRate = 128000 + config.StreamSamplerate = 48000 + config.Mount = "bitume" + config.StreamName = "mumble" + config.StreamURL = "http://abbiamoundominio.org:8000" + config.StreamGenre = "acaro" + config.StreamDescription = "Biiiitume!" + config.BufferSize = 65536 // TODO: tweak this around + + err := StreamFFMPEG(config) + if err != nil { + log.Println("StreamFFMPEG failed") + } +}