#ifndef MEDIA_REMOTING_STREAM_PROVIDER_H_
#define MEDIA_REMOTING_STREAM_PROVIDER_H_
#include "base/containers/circular_deque.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/sequenced_task_runner_helpers.h"
#include "base/task/single_thread_task_runner.h"
#include "media/base/audio_decoder_config.h"
#include "media/base/demuxer.h"
#include "media/base/demuxer_stream.h"
#include "media/base/video_decoder_config.h"
#include "media/mojo/mojom/remoting.mojom.h"
#include "mojo/public/cpp/bindings/receiver.h"
#include "third_party/openscreen/src/cast/streaming/remoting.pb.h"
namespace base {
class SingleThreadTaskRunner;
}
namespace openscreen {
namespace cast {
class RpcMessenger;
}
}
namespace media {
class MojoDecoderBufferReader;
namespace remoting {
class ReceiverController;
class StreamProvider final : public Demuxer {
public:
StreamProvider(
ReceiverController* receiver_controller,
const scoped_refptr<base::SequencedTaskRunner>& media_task_runner);
std::vector<DemuxerStream*> GetAllStreams() override;
std::string GetDisplayName() const override;
DemuxerType GetDemuxerType() const override;
void Initialize(DemuxerHost* host, PipelineStatusCallback status_cb) override;
void AbortPendingReads() override;
void StartWaitingForSeek(base::TimeDelta seek_time) override;
void CancelPendingSeek(base::TimeDelta seek_time) override;
void Seek(base::TimeDelta time, PipelineStatusCallback status_cb) override;
bool IsSeekable() const override;
void Stop() override;
base::TimeDelta GetStartTime() const override;
base::Time GetTimelineOffset() const override;
int64_t GetMemoryUsage() const override;
std::optional<container_names::MediaContainerName> GetContainerForMetrics()
const override;
void OnTracksChanged(DemuxerStream::Type track_type,
std::optional<MediaTrack::Id> track_id,
base::TimeDelta curr_time,
TrackChangeCB change_completed_cb) override;
void SetPlaybackRate(double rate) override {}
protected:
~StreamProvider() override;
private:
class MediaStream final : public DemuxerStream,
public mojom::RemotingDataStreamReceiver {
public:
using UniquePtr =
std::unique_ptr<MediaStream, std::function<void(MediaStream*)>>;
static void CreateOnMainThread(
openscreen::cast::RpcMessenger* rpc_messenger,
Type type,
int32_t handle,
const scoped_refptr<base::SequencedTaskRunner>& media_task_runner,
base::OnceCallback<void(MediaStream::UniquePtr)> callback);
static void DestructionHelper(MediaStream* stream);
MediaStream(
openscreen::cast::RpcMessenger* rpc_messenger,
Type type,
int32_t remote_handle,
const scoped_refptr<base::SequencedTaskRunner>& media_task_runner);
void Read(uint32_t count, ReadCB read_cb) override;
AudioDecoderConfig audio_decoder_config() override;
VideoDecoderConfig video_decoder_config() override;
DemuxerStream::Type type() const override;
StreamLiveness liveness() const override;
bool SupportsConfigChanges() override;
void Initialize(base::OnceClosure init_done_cb);
mojo::PendingRemote<mojom::RemotingDataStreamReceiver>
BindNewPipeAndPassRemote() {
return receiver_.BindNewPipeAndPassRemote();
}
private:
friend class base::DeleteHelper<MediaStream>;
friend class StreamProviderTest;
~MediaStream() override;
void Destroy();
void SendRpcMessageOnMainThread(
std::unique_ptr<openscreen::cast::RpcMessage> message);
void InitializeDataPipe(
mojo::ScopedDataPipeConsumerHandle data_pipe) override;
void ReceiveFrame(uint32_t count, mojom::DecoderBufferPtr buffer) override;
void FlushUntil(uint32_t count) override;
void OnReceivedRpc(std::unique_ptr<openscreen::cast::RpcMessage> message);
void OnInitializeCallback(
std::unique_ptr<openscreen::cast::RpcMessage> message);
void OnReadUntilCallback(
std::unique_ptr<openscreen::cast::RpcMessage> message);
void SendReadUntil();
void CompleteInitialize();
void AppendBuffer(uint32_t count, scoped_refptr<DecoderBuffer> buffer);
void CompleteRead(DemuxerStream::Status status);
void UpdateAudioConfig(
const openscreen::cast::AudioDecoderConfig& audio_message);
void UpdateVideoConfig(
const openscreen::cast::VideoDecoderConfig& video_message);
void OnError(const std::string& error);
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
scoped_refptr<base::SequencedTaskRunner> media_task_runner_;
const raw_ptr<openscreen::cast::RpcMessenger> rpc_messenger_;
const Type type_;
const int remote_handle_;
const int rpc_handle_;
base::OnceClosure init_done_callback_;
uint32_t current_frame_count_ = 0;
uint32_t buffered_frame_count_ = 0;
uint32_t total_received_frame_count_ = 0;
bool read_until_sent_ = false;
bool rpc_initialized_ = false;
ReadCB read_complete_callback_;
std::unique_ptr<MojoDecoderBufferReader> decoder_buffer_reader_;
base::circular_deque<scoped_refptr<DecoderBuffer>> buffers_;
AudioDecoderConfig audio_decoder_config_;
VideoDecoderConfig video_decoder_config_;
AudioDecoderConfig next_audio_decoder_config_;
VideoDecoderConfig next_video_decoder_config_;
mojo::Receiver<mojom::RemotingDataStreamReceiver> receiver_{this};
base::WeakPtr<MediaStream> media_weak_this_;
base::WeakPtrFactory<MediaStream> media_weak_factory_{this};
};
friend std::default_delete<StreamProvider>;
friend class base::DeleteHelper<StreamProvider>;
friend class StreamProviderTest;
void Destroy();
void OnReceivedRpc(std::unique_ptr<openscreen::cast::RpcMessage> message);
void OnAcquireDemuxer(std::unique_ptr<openscreen::cast::RpcMessage> message);
void InitializeDataPipe();
void OnAudioStreamCreated(MediaStream::UniquePtr stream);
void OnVideoStreamCreated(MediaStream::UniquePtr stream);
void OnAudioStreamInitialized();
void OnVideoStreamInitialized();
void CompleteInitialize();
scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
scoped_refptr<base::SequencedTaskRunner> media_task_runner_;
const raw_ptr<ReceiverController> receiver_controller_;
const raw_ptr<openscreen::cast::RpcMessenger> rpc_messenger_;
MediaStream::UniquePtr audio_stream_;
MediaStream::UniquePtr video_stream_;
bool has_audio_{false};
bool has_video_{false};
bool audio_stream_initialized_{false};
bool video_stream_initialized_{false};
PipelineStatusCallback init_done_callback_;
base::WeakPtr<StreamProvider> media_weak_this_;
base::WeakPtrFactory<StreamProvider> media_weak_factory_{this};
};
}
}
namespace std {
template <>
struct default_delete<media::remoting::StreamProvider> {
constexpr default_delete() = default;
template <typename U>
requires(std::is_convertible_v<U*, media::remoting::StreamProvider*>)
explicit default_delete(const default_delete<U>& d) {}
void operator()(media::remoting::StreamProvider* ptr) const;
};
}
#endif