226 lines
4.6 KiB
Go
226 lines
4.6 KiB
Go
package stream
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"sync"
|
|
|
|
"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
|
|
)
|
|
|
|
// StreamReader provides a high-level interface for reading media streams
|
|
type StreamReader interface {
|
|
// Read implements io.Reader - reads transcoded data
|
|
Read(p []byte) (n int, err error)
|
|
|
|
// MediaInfo returns media stream information
|
|
MediaInfo() (*MediaInfo, error)
|
|
|
|
// Close releases all resources
|
|
Close() error
|
|
}
|
|
|
|
// streamReader implements StreamReader
|
|
type streamReader struct {
|
|
opts *TranscodeOptions
|
|
mediaInfo *MediaInfo
|
|
|
|
// FFmpeg components
|
|
inputCtx *ffmpeg.FormatContext
|
|
outputCtx *ffmpeg.OutputFormatContext
|
|
|
|
// Decoding/Encoding contexts
|
|
decCtxs map[int]*ffmpeg.Context // stream index -> decoder context
|
|
encCtxs map[int]*ffmpeg.Context // stream index -> encoder context
|
|
|
|
// Internal buffer for output
|
|
buffer *bytes.Buffer
|
|
bufferSize int
|
|
|
|
// State
|
|
mu sync.Mutex
|
|
closed bool
|
|
eof bool
|
|
|
|
// For io.Reader input
|
|
pipeReader *pipeReader
|
|
}
|
|
|
|
// NewStreamReader creates a new StreamReader with the given options
|
|
func NewStreamReader(opts TranscodeOptions) (StreamReader, error) {
|
|
if err := opts.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sr := &streamReader{
|
|
opts: &opts,
|
|
decCtxs: make(map[int]*ffmpeg.Context),
|
|
encCtxs: make(map[int]*ffmpeg.Context),
|
|
buffer: new(bytes.Buffer),
|
|
bufferSize: opts.BufferSize,
|
|
}
|
|
|
|
if sr.bufferSize <= 0 {
|
|
sr.bufferSize = 4096
|
|
}
|
|
|
|
// Handle io.Reader input via pipe
|
|
if opts.InputIO != nil {
|
|
pr, err := newPipeReader(opts.InputIO)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sr.pipeReader = pr
|
|
// TODO: Start background goroutine to feed data to FFmpeg via pipe
|
|
// This requires implementing a custom IO context in FFmpeg
|
|
}
|
|
|
|
// Open input
|
|
inputURL := opts.InputURL
|
|
if inputURL == "" {
|
|
// For io.Reader input, we need to handle it differently
|
|
// This is a placeholder - actual implementation would need custom IO
|
|
inputURL = "pipe:0"
|
|
}
|
|
|
|
sr.inputCtx = ffmpeg.AllocFormatContext()
|
|
if err := sr.inputCtx.OpenInput(inputURL); err != nil {
|
|
sr.inputCtx.Free()
|
|
return nil, err
|
|
}
|
|
|
|
if err := sr.inputCtx.FindStreamInfo(); err != nil {
|
|
sr.inputCtx.Free()
|
|
return nil, err
|
|
}
|
|
|
|
// Build media info
|
|
sr.mediaInfo, _ = NewMediaInfo(sr.inputCtx)
|
|
|
|
return sr, nil
|
|
}
|
|
|
|
// Read implements io.Reader
|
|
func (sr *streamReader) Read(p []byte) (n int, err error) {
|
|
sr.mu.Lock()
|
|
defer sr.mu.Unlock()
|
|
|
|
if sr.closed {
|
|
return 0, ErrStreamClosed
|
|
}
|
|
|
|
// If buffer has data, read from it
|
|
if sr.buffer.Len() > 0 {
|
|
return sr.buffer.Read(p)
|
|
}
|
|
|
|
// Check if we're done
|
|
if sr.eof {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
// Transcode more data
|
|
if err := sr.transcodeNext(); err != nil {
|
|
if err == io.EOF {
|
|
sr.eof = true
|
|
return 0, io.EOF
|
|
}
|
|
return 0, err
|
|
}
|
|
|
|
// Try reading again from buffer
|
|
if sr.buffer.Len() > 0 {
|
|
return sr.buffer.Read(p)
|
|
}
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
// transcodeNext reads, decodes, encodes, and muxes the next packet
|
|
func (sr *streamReader) transcodeNext() error {
|
|
pkt := ffmpeg.AllocPacket()
|
|
defer pkt.Free()
|
|
|
|
err := sr.inputCtx.ReadPacket(pkt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
streamIdx := pkt.StreamIndex()
|
|
streams := sr.inputCtx.Streams()
|
|
if streamIdx < 0 || streamIdx >= len(streams) {
|
|
pkt.Unref()
|
|
return nil
|
|
}
|
|
|
|
stream := streams[streamIdx]
|
|
_ = stream.Type() // stream type for future transcode logic
|
|
|
|
// For now, implement pass-through (stream copy) mode
|
|
// Full transcode would require decoding, filtering, re-encoding
|
|
|
|
// Write packet directly to output buffer (simplified)
|
|
data := pkt.Data()
|
|
if len(data) > 0 {
|
|
sr.buffer.Write(data)
|
|
}
|
|
|
|
pkt.Unref()
|
|
return nil
|
|
}
|
|
|
|
// MediaInfo returns the media information
|
|
func (sr *streamReader) MediaInfo() (*MediaInfo, error) {
|
|
sr.mu.Lock()
|
|
defer sr.mu.Unlock()
|
|
return sr.mediaInfo, nil
|
|
}
|
|
|
|
// Close releases all resources
|
|
func (sr *streamReader) Close() error {
|
|
sr.mu.Lock()
|
|
defer sr.mu.Unlock()
|
|
|
|
if sr.closed {
|
|
return nil
|
|
}
|
|
|
|
sr.closed = true
|
|
|
|
// Close pipe reader if used
|
|
if sr.pipeReader != nil {
|
|
sr.pipeReader.Close()
|
|
}
|
|
|
|
// Close decoder contexts
|
|
for _, ctx := range sr.decCtxs {
|
|
ctx.Close()
|
|
ctx.Free()
|
|
}
|
|
|
|
// Close encoder contexts
|
|
for _, ctx := range sr.encCtxs {
|
|
ctx.Close()
|
|
ctx.Free()
|
|
}
|
|
|
|
// Close input context
|
|
if sr.inputCtx != nil {
|
|
sr.inputCtx.Free()
|
|
}
|
|
|
|
// Close output context
|
|
if sr.outputCtx != nil {
|
|
sr.outputCtx.CloseOutput()
|
|
sr.outputCtx.Free()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RawFormatContext returns the underlying input FormatContext for advanced operations
|
|
// This is kept for compatibility but returns nil in this implementation
|
|
func (sr *streamReader) RawFormatContext() *ffmpeg.FormatContext {
|
|
return sr.inputCtx
|
|
}
|