#ifndef MOJO_CORE_NODE_CONTROLLER_H_
#define MOJO_CORE_NODE_CONTROLLER_H_
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "base/containers/queue.h"
#include "base/containers/span.h"
#include "base/functional/callback.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/writable_shared_memory_region.h"
#include "base/process/process.h"
#include "base/task/single_thread_task_runner.h"
#include "build/build_config.h"
#include "mojo/core/atomic_flag.h"
#include "mojo/core/node_channel.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/ports/node.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/system_impl_export.h"
#include "mojo/public/cpp/platform/platform_handle.h"
namespace mojo {
namespace core {
class Broker;
class Core;
class BoundedPeerSet {
public:
BoundedPeerSet();
BoundedPeerSet(const BoundedPeerSet&) = delete;
BoundedPeerSet& operator=(const BoundedPeerSet&) = delete;
~BoundedPeerSet();
void Insert(const ports::NodeName& name);
bool Contains(const ports::NodeName& name);
private:
static constexpr int kHalfSize = 50000;
std::unordered_set<ports::NodeName> old_set_;
std::unordered_set<ports::NodeName> new_set_;
};
class MOJO_SYSTEM_IMPL_EXPORT NodeController : public ports::NodeDelegate,
public NodeChannel::Delegate {
public:
class PortObserver : public ports::UserData {
public:
virtual void OnPortStatusChanged() = 0;
protected:
~PortObserver() override = default;
};
NodeController();
NodeController(const NodeController&) = delete;
NodeController& operator=(const NodeController&) = delete;
~NodeController() override;
const ports::NodeName& name() const { return name_; }
ports::Node* node() const { return node_.get(); }
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner() const {
return io_task_runner_;
}
void SetIOTaskRunner(
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner);
void SendBrokerClientInvitation(
base::Process target_process,
ConnectionParams connection_params,
const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports,
const ProcessErrorCallback& process_error_callback);
void AcceptBrokerClientInvitation(ConnectionParams connection_params);
void ConnectIsolated(ConnectionParams connection_params,
const ports::PortRef& port,
std::string_view connection_name);
void SetPortObserver(const ports::PortRef& port,
scoped_refptr<PortObserver> observer);
void ClosePort(const ports::PortRef& port);
int SendUserMessage(const ports::PortRef& port_ref,
std::unique_ptr<ports::UserMessageEvent> message);
void MergePortIntoInviter(const std::string& name,
const ports::PortRef& port);
int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
base::WritableSharedMemoryRegion CreateSharedBuffer(size_t num_bytes);
void RequestShutdown(base::OnceClosure callback);
void NotifyBadMessageFrom(const ports::NodeName& source_node,
const std::string& error);
bool HasBadMessageHandler(const ports::NodeName& source_node);
void ForceDisconnectProcessForTesting(base::ProcessId process_id);
static void DeserializeRawBytesAsEventForFuzzer(
base::span<const unsigned char> data);
static void DeserializeMessageAsEventForFuzzer(Channel::MessagePtr message);
scoped_refptr<NodeChannel> GetBrokerChannel();
private:
friend Core;
using NodeMap =
std::unordered_map<ports::NodeName, scoped_refptr<NodeChannel>>;
using OutgoingMessageQueue = base::queue<Channel::MessagePtr>;
using PortMap = std::map<std::string, ports::PortRef>;
struct IsolatedConnection {
IsolatedConnection();
IsolatedConnection(const IsolatedConnection& other);
IsolatedConnection(IsolatedConnection&& other);
IsolatedConnection(scoped_refptr<NodeChannel> channel,
const ports::PortRef& local_port,
std::string_view name);
~IsolatedConnection();
IsolatedConnection& operator=(const IsolatedConnection& other);
IsolatedConnection& operator=(IsolatedConnection&& other);
scoped_refptr<NodeChannel> channel;
ports::PortRef local_port;
std::string name;
};
void SendBrokerClientInvitationOnIOThread(
base::Process target_process,
ConnectionParams connection_params,
ports::NodeName temporary_node_name,
const ProcessErrorCallback& process_error_callback);
void FinishSendBrokerClientInvitationOnIOThread(
base::Process target_process,
ConnectionParams connection_params,
ports::NodeName temporary_node_name,
Channel::HandlePolicy handle_policy,
const ProcessErrorCallback& process_error_callback);
void AcceptBrokerClientInvitationOnIOThread(
ConnectionParams connection_params,
std::optional<PlatformHandle> broker_host_handle);
void ConnectIsolatedOnIOThread(ConnectionParams connection_params,
ports::PortRef port,
const std::string& connection_name);
scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
scoped_refptr<NodeChannel> GetInviterChannel();
void AddPeer(const ports::NodeName& name,
scoped_refptr<NodeChannel> channel,
bool start_channel,
bool allow_name_reuse = false);
void DropPeer(const ports::NodeName& name, NodeChannel* channel);
void SendPeerEvent(const ports::NodeName& name, ports::ScopedEvent event);
void DropAllPeers();
void ForwardEvent(const ports::NodeName& node,
ports::ScopedEvent event) override;
void BroadcastEvent(ports::ScopedEvent event) override;
void PortStatusChanged(const ports::PortRef& port) override;
void OnAcceptInvitee(const ports::NodeName& from_node,
const ports::NodeName& inviter_name,
const ports::NodeName& token) override;
void OnAcceptInvitation(const ports::NodeName& from_node,
const ports::NodeName& token,
const ports::NodeName& invitee_name) override;
void OnAddBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& client_name,
base::ProcessHandle process_handle) override;
void OnBrokerClientAdded(const ports::NodeName& from_node,
const ports::NodeName& client_name,
PlatformHandle broker_channel) override;
void OnAcceptBrokerClient(const ports::NodeName& from_node,
const ports::NodeName& broker_name,
PlatformHandle broker_channel,
const uint64_t broker_capabilities) override;
void OnEventMessage(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
void OnRequestPortMerge(const ports::NodeName& from_node,
const ports::PortName& connector_port_name,
const std::string& token) override;
void OnRequestIntroduction(const ports::NodeName& from_node,
const ports::NodeName& name) override;
void OnIntroduce(const ports::NodeName& from_node,
const ports::NodeName& name,
PlatformHandle channel_handle,
const uint64_t remote_capailities) override;
void OnBroadcast(const ports::NodeName& from_node,
Channel::MessagePtr message) override;
#if BUILDFLAG(IS_WIN)
void OnRelayEventMessage(const ports::NodeName& from_node,
base::ProcessHandle from_process,
const ports::NodeName& destination,
Channel::MessagePtr message) override;
void OnEventMessageFromRelay(const ports::NodeName& from_node,
const ports::NodeName& source_node,
Channel::MessagePtr message) override;
#endif
void OnAcceptPeer(const ports::NodeName& from_node,
const ports::NodeName& token,
const ports::NodeName& peer_name,
const ports::PortName& port_name) override;
void OnChannelError(const ports::NodeName& from_node,
NodeChannel* channel) override;
void CancelPendingPortMerges();
void DestroyOnIOThreadShutdown();
void AttemptShutdownIfRequested();
void ForceDisconnectProcessForTestingOnIOThread(base::ProcessId process_id);
void RecordPendingPortMerge(const ports::PortRef& port);
const ports::NodeName name_;
const std::unique_ptr<ports::Node> node_;
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
base::Lock peers_lock_;
NodeMap peers_;
BoundedPeerSet dropped_peers_;
std::unordered_map<ports::NodeName, OutgoingMessageQueue>
pending_peer_messages_;
base::Lock reserved_ports_lock_;
std::map<ports::NodeName, PortMap> reserved_ports_;
base::Lock pending_port_merges_lock_;
std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
bool reject_pending_merges_ = false;
base::Lock inviter_lock_;
ports::NodeName inviter_name_;
scoped_refptr<NodeChannel> bootstrap_inviter_channel_;
base::Lock broker_lock_;
ports::NodeName broker_name_;
base::queue<ports::NodeName> pending_broker_clients_;
std::unordered_map<ports::NodeName, OutgoingMessageQueue>
pending_relay_messages_;
base::Lock shutdown_lock_;
base::OnceClosure shutdown_callback_;
AtomicFlag shutdown_callback_flag_;
NodeMap pending_invitations_;
std::map<ports::NodeName, IsolatedConnection> pending_isolated_connections_;
std::map<std::string, ports::NodeName> named_isolated_connections_;
bool destroy_on_io_thread_shutdown_ = false;
#if !BUILDFLAG(IS_APPLE) && !BUILDFLAG(IS_FUCHSIA)
std::unique_ptr<Broker> broker_;
#endif
};
}
}
#endif