package stream import ( "bytes" "io" "sync" "git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg" ) // StreamReader provides a high-level interface for reading media streams type StreamReader interface { // Read implements io.Reader - reads transcoded data Read(p []byte) (n int, err error) // MediaInfo returns media stream information MediaInfo() (*MediaInfo, error) // Close releases all resources Close() error } // streamReader implements StreamReader type streamReader struct { opts *TranscodeOptions mediaInfo *MediaInfo // FFmpeg components inputCtx *ffmpeg.FormatContext outputCtx *ffmpeg.OutputFormatContext // Decoding/Encoding contexts decCtxs map[int]*ffmpeg.Context // stream index -> decoder context encCtxs map[int]*ffmpeg.Context // stream index -> encoder context // Internal buffer for output buffer *bytes.Buffer bufferSize int // State mu sync.Mutex closed bool eof bool // For io.Reader input pipeReader *pipeReader } // NewStreamReader creates a new StreamReader with the given options func NewStreamReader(opts TranscodeOptions) (StreamReader, error) { if err := opts.Validate(); err != nil { return nil, err } sr := &streamReader{ opts: &opts, decCtxs: make(map[int]*ffmpeg.Context), encCtxs: make(map[int]*ffmpeg.Context), buffer: new(bytes.Buffer), bufferSize: opts.BufferSize, } if sr.bufferSize <= 0 { sr.bufferSize = 4096 } // Handle io.Reader input via pipe if opts.InputIO != nil { pr, err := newPipeReader(opts.InputIO) if err != nil { return nil, err } sr.pipeReader = pr // TODO: Start background goroutine to feed data to FFmpeg via pipe // This requires implementing a custom IO context in FFmpeg } // Open input inputURL := opts.InputURL if inputURL == "" { // For io.Reader input, we need to handle it differently // This is a placeholder - actual implementation would need custom IO inputURL = "pipe:0" } sr.inputCtx = ffmpeg.AllocFormatContext() if err := sr.inputCtx.OpenInput(inputURL); err != nil { sr.inputCtx.Free() return nil, err } if err := sr.inputCtx.FindStreamInfo(); err != nil { sr.inputCtx.Free() return nil, err } // Build media info sr.mediaInfo, _ = NewMediaInfo(sr.inputCtx) return sr, nil } // Read implements io.Reader func (sr *streamReader) Read(p []byte) (n int, err error) { sr.mu.Lock() defer sr.mu.Unlock() if sr.closed { return 0, ErrStreamClosed } // If buffer has data, read from it if sr.buffer.Len() > 0 { return sr.buffer.Read(p) } // Check if we're done if sr.eof { return 0, io.EOF } // Transcode more data if err := sr.transcodeNext(); err != nil { if err == io.EOF { sr.eof = true return 0, io.EOF } return 0, err } // Try reading again from buffer if sr.buffer.Len() > 0 { return sr.buffer.Read(p) } return 0, nil } // transcodeNext reads, decodes, encodes, and muxes the next packet func (sr *streamReader) transcodeNext() error { pkt := ffmpeg.AllocPacket() defer pkt.Free() err := sr.inputCtx.ReadPacket(pkt) if err != nil { return err } streamIdx := pkt.StreamIndex() streams := sr.inputCtx.Streams() if streamIdx < 0 || streamIdx >= len(streams) { pkt.Unref() return nil } stream := streams[streamIdx] _ = stream.Type() // stream type for future transcode logic // For now, implement pass-through (stream copy) mode // Full transcode would require decoding, filtering, re-encoding // Write packet directly to output buffer (simplified) data := pkt.Data() if len(data) > 0 { sr.buffer.Write(data) } pkt.Unref() return nil } // MediaInfo returns the media information func (sr *streamReader) MediaInfo() (*MediaInfo, error) { sr.mu.Lock() defer sr.mu.Unlock() return sr.mediaInfo, nil } // Close releases all resources func (sr *streamReader) Close() error { sr.mu.Lock() defer sr.mu.Unlock() if sr.closed { return nil } sr.closed = true // Close pipe reader if used if sr.pipeReader != nil { sr.pipeReader.Close() } // Close decoder contexts for _, ctx := range sr.decCtxs { ctx.Close() ctx.Free() } // Close encoder contexts for _, ctx := range sr.encCtxs { ctx.Close() ctx.Free() } // Close input context if sr.inputCtx != nil { sr.inputCtx.Free() } // Close output context if sr.outputCtx != nil { sr.outputCtx.CloseOutput() sr.outputCtx.Free() } return nil } // RawFormatContext returns the underlying input FormatContext for advanced operations // This is kept for compatibility but returns nil in this implementation func (sr *streamReader) RawFormatContext() *ffmpeg.FormatContext { return sr.inputCtx }