goffmpeg/pkg/ffmpeg/stream/reader.go

226 lines
4.6 KiB
Go

package stream
import (
"bytes"
"io"
"sync"
"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
)
// StreamReader provides a high-level interface for reading media streams
type StreamReader interface {
// Read implements io.Reader - reads transcoded data
Read(p []byte) (n int, err error)
// MediaInfo returns media stream information
MediaInfo() (*MediaInfo, error)
// Close releases all resources
Close() error
}
// streamReader implements StreamReader
type streamReader struct {
opts *TranscodeOptions
mediaInfo *MediaInfo
// FFmpeg components
inputCtx *ffmpeg.FormatContext
outputCtx *ffmpeg.OutputFormatContext
// Decoding/Encoding contexts
decCtxs map[int]*ffmpeg.Context // stream index -> decoder context
encCtxs map[int]*ffmpeg.Context // stream index -> encoder context
// Internal buffer for output
buffer *bytes.Buffer
bufferSize int
// State
mu sync.Mutex
closed bool
eof bool
// For io.Reader input
pipeReader *pipeReader
}
// NewStreamReader creates a new StreamReader with the given options
func NewStreamReader(opts TranscodeOptions) (StreamReader, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
sr := &streamReader{
opts: &opts,
decCtxs: make(map[int]*ffmpeg.Context),
encCtxs: make(map[int]*ffmpeg.Context),
buffer: new(bytes.Buffer),
bufferSize: opts.BufferSize,
}
if sr.bufferSize <= 0 {
sr.bufferSize = 4096
}
// Handle io.Reader input via pipe
if opts.InputIO != nil {
pr, err := newPipeReader(opts.InputIO)
if err != nil {
return nil, err
}
sr.pipeReader = pr
// TODO: Start background goroutine to feed data to FFmpeg via pipe
// This requires implementing a custom IO context in FFmpeg
}
// Open input
inputURL := opts.InputURL
if inputURL == "" {
// For io.Reader input, we need to handle it differently
// This is a placeholder - actual implementation would need custom IO
inputURL = "pipe:0"
}
sr.inputCtx = ffmpeg.AllocFormatContext()
if err := sr.inputCtx.OpenInput(inputURL); err != nil {
sr.inputCtx.Free()
return nil, err
}
if err := sr.inputCtx.FindStreamInfo(); err != nil {
sr.inputCtx.Free()
return nil, err
}
// Build media info
sr.mediaInfo, _ = NewMediaInfo(sr.inputCtx)
return sr, nil
}
// Read implements io.Reader
func (sr *streamReader) Read(p []byte) (n int, err error) {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.closed {
return 0, ErrStreamClosed
}
// If buffer has data, read from it
if sr.buffer.Len() > 0 {
return sr.buffer.Read(p)
}
// Check if we're done
if sr.eof {
return 0, io.EOF
}
// Transcode more data
if err := sr.transcodeNext(); err != nil {
if err == io.EOF {
sr.eof = true
return 0, io.EOF
}
return 0, err
}
// Try reading again from buffer
if sr.buffer.Len() > 0 {
return sr.buffer.Read(p)
}
return 0, nil
}
// transcodeNext reads, decodes, encodes, and muxes the next packet
func (sr *streamReader) transcodeNext() error {
pkt := ffmpeg.AllocPacket()
defer pkt.Free()
err := sr.inputCtx.ReadPacket(pkt)
if err != nil {
return err
}
streamIdx := pkt.StreamIndex()
streams := sr.inputCtx.Streams()
if streamIdx < 0 || streamIdx >= len(streams) {
pkt.Unref()
return nil
}
stream := streams[streamIdx]
_ = stream.Type() // stream type for future transcode logic
// For now, implement pass-through (stream copy) mode
// Full transcode would require decoding, filtering, re-encoding
// Write packet directly to output buffer (simplified)
data := pkt.Data()
if len(data) > 0 {
sr.buffer.Write(data)
}
pkt.Unref()
return nil
}
// MediaInfo returns the media information
func (sr *streamReader) MediaInfo() (*MediaInfo, error) {
sr.mu.Lock()
defer sr.mu.Unlock()
return sr.mediaInfo, nil
}
// Close releases all resources
func (sr *streamReader) Close() error {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.closed {
return nil
}
sr.closed = true
// Close pipe reader if used
if sr.pipeReader != nil {
sr.pipeReader.Close()
}
// Close decoder contexts
for _, ctx := range sr.decCtxs {
ctx.Close()
ctx.Free()
}
// Close encoder contexts
for _, ctx := range sr.encCtxs {
ctx.Close()
ctx.Free()
}
// Close input context
if sr.inputCtx != nil {
sr.inputCtx.Free()
}
// Close output context
if sr.outputCtx != nil {
sr.outputCtx.CloseOutput()
sr.outputCtx.Free()
}
return nil
}
// RawFormatContext returns the underlying input FormatContext for advanced operations
// This is kept for compatibility but returns nil in this implementation
func (sr *streamReader) RawFormatContext() *ffmpeg.FormatContext {
return sr.inputCtx
}