#ifndef CHROME_ENTERPRISE_COMPANION_TELEMETRY_LOGGER_TELEMETRY_LOGGER_H_
#define CHROME_ENTERPRISE_COMPANION_TELEMETRY_LOGGER_TELEMETRY_LOGGER_H_
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include "base/check.h"
#include "base/containers/span.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/memory/ref_counted_delete_on_sequence.h"
#include "base/memory/scoped_refptr.h"
#include "base/sequence_checker.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "chrome/enterprise_companion/telemetry_logger/proto/log_request.pb.h"
namespace enterprise_companion::telemetry_logger {
template <typename T>
class TelemetryLogger
: public base::RefCountedDeleteOnSequence<TelemetryLogger<T>> {
public:
class Delegate {
public:
virtual void StoreNextAllowedAttemptTime(base::Time time,
base::OnceClosure callback) = 0;
virtual void DoPostRequest(
const std::string& request_body,
base::OnceCallback<void(std::optional<int> http_status,
std::optional<std::string> response_body)>
callback) = 0;
virtual int GetLogIdentifier() const = 0;
virtual std::string AggregateAndSerializeEvents(
base::span<T> events) const = 0;
virtual base::TimeDelta MinimumCooldownTime() const = 0;
virtual ~Delegate() = default;
};
TelemetryLogger(scoped_refptr<base::SequencedTaskRunner> task_runner,
std::unique_ptr<Delegate> delegate,
bool auto_flush)
: base::RefCountedDeleteOnSequence<TelemetryLogger<T>>(task_runner),
delegate_(std::move(delegate)),
auto_flush_(auto_flush) {
DETACH_FROM_SEQUENCE(sequence_checker_);
}
TelemetryLogger(const TelemetryLogger&) = delete;
TelemetryLogger& operator=(const TelemetryLogger&) = delete;
static scoped_refptr<TelemetryLogger> Create(
std::unique_ptr<Delegate> delegate,
std::optional<base::Time> first_allowed_attempt_time,
bool auto_flush) {
auto logger = base::MakeRefCounted<TelemetryLogger<T>>(
base::ThreadPool::CreateSequencedTaskRunner(
{base::MayBlock(), base::WithBaseSyncPrimitives()}),
std::move(delegate), auto_flush);
logger->owning_task_runner()->PostTask(
FROM_HERE, base::BindOnce(&TelemetryLogger::SetInitialCooldownIfExists,
logger, first_allowed_attempt_time));
return logger;
}
void Log(const T& event) {
VLOG(2) << __func__;
owning_task_runner()->PostTask(
FROM_HERE, base::BindOnce(&TelemetryLogger<T>::DoLog,
base::WrapRefCounted(this), event));
}
void Flush(base::OnceClosure callback) {
VLOG(2) << __func__;
owning_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(
&TelemetryLogger::Transmit, base::WrapRefCounted(this),
base::BindPostTaskToCurrentDefault(std::move(callback))));
}
void CancelCooldownTimer() {
owning_task_runner()->PostTask(
FROM_HERE, base::BindOnce(
[](scoped_refptr<TelemetryLogger<T>> logger) {
logger->cooldown_timer_.Stop();
},
base::WrapRefCounted(this)));
}
private:
friend class base::RefCountedDeleteOnSequence<TelemetryLogger<T>>;
friend class base::DeleteHelper<TelemetryLogger<T>>;
virtual ~TelemetryLogger() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
VLOG(1) << __func__;
}
base::SequencedTaskRunner* owning_task_runner() {
return base::RefCountedDeleteOnSequence<
TelemetryLogger<T>>::owning_task_runner();
}
void DoLog(const T& event) {
VLOG(2) << __func__;
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
events_.push_back(event);
}
std::string BuildRequestString(base::span<T> events) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
int64_t now_ms = base::Time::Now().InMillisecondsSinceUnixEpoch();
proto::LogRequest request;
request.set_request_time_ms(now_ms);
request.mutable_client_info()->set_client_type(
telemetry_logger::proto::
ClientInfo_ClientType_CHROME_ENTERPRISE_COMPANION);
request.set_log_source(delegate_->GetLogIdentifier());
telemetry_logger::proto::LogEvent* log_event = request.add_log_event();
log_event->set_event_time_ms(now_ms);
log_event->set_source_extension(
delegate_->AggregateAndSerializeEvents(events));
return request.SerializeAsString();
}
void Transmit(base::OnceClosure callback) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (is_transmitting_) {
VLOG(2) << "Transmit skipped when there's an active one.";
std::move(callback).Run();
return;
}
if (cooldown_timer_.IsRunning()) {
VLOG(2) << "Transmit skipped in cool down period.";
std::move(callback).Run();
return;
}
if (events_.empty() && upload_queue_.empty()) {
VLOG(2) << "No events to transmit.";
std::move(callback).Run();
return;
}
if (!events_.empty()) {
upload_queue_.insert(upload_queue_.end(),
std::make_move_iterator(events_.begin()),
std::make_move_iterator(events_.end()));
events_.clear();
}
VLOG(2) << "Transmitting " << upload_queue_.size() << " events at "
<< base::Time::Now();
is_transmitting_ = true;
delegate_->DoPostRequest(
BuildRequestString(upload_queue_),
base::BindOnce(&TelemetryLogger::OnResponseReceived,
base::WrapRefCounted(this))
.Then(std::move(callback)));
}
bool ShouldDeleteUploadQueue(std::optional<int> http_status) const {
if (!http_status) {
return false;
}
return (*http_status >= 200 && *http_status < 300) ||
(*http_status >= 400 && *http_status < 500);
}
void OnResponseReceived(std::optional<int> http_status,
std::optional<std::string> response_body) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
VLOG(1) << __func__ << ": status=" << http_status.value_or(0);
is_transmitting_ = false;
if (ShouldDeleteUploadQueue(http_status)) {
VLOG(2) << "Clearing the upload queue.";
upload_queue_.clear();
}
telemetry_logger::proto::LogResponse response;
if (!response_body || !response.ParseFromString(*response_body)) {
VLOG(1) << "Failed to parse log response proto, response body: "
<< response_body.value_or("");
SetCooldown(delegate_->MinimumCooldownTime());
return;
}
base::TimeDelta cooldown_time =
std::max(base::Milliseconds(response.next_request_wait_millis()),
delegate_->MinimumCooldownTime());
VLOG(1) << "Cooldown time received from server: "
<< response.next_request_wait_millis() << " ms";
SetCooldown(cooldown_time);
delegate_->StoreNextAllowedAttemptTime(base::Time::Now() + cooldown_time,
base::DoNothing());
}
void SetCooldown(base::TimeDelta cooldown_time) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(!cooldown_timer_.IsRunning());
if (cooldown_time.is_negative()) {
return;
}
VLOG(2) << "Set cool down time: " << cooldown_time.InMilliseconds()
<< "ms, " << base::Time::Now() + cooldown_time;
cooldown_timer_.Start(
FROM_HERE, cooldown_time,
auto_flush_
? base::BindOnce(&TelemetryLogger::Transmit,
base::WrapRefCounted(this), base::DoNothing())
: base::DoNothing());
}
void SetInitialCooldownIfExists(
std::optional<base::Time> first_allowed_attempt_time) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
CHECK(!cooldown_timer_.IsRunning());
if (!first_allowed_attempt_time) {
return;
}
VLOG(2) << __func__
<< ": next allowed attempt time: " << *first_allowed_attempt_time;
SetCooldown(*first_allowed_attempt_time - base::Time::Now());
}
bool is_transmitting_ = false;
std::unique_ptr<Delegate> delegate_;
const bool auto_flush_;
std::vector<T> events_;
std::vector<T> upload_queue_;
base::OneShotTimer cooldown_timer_;
SEQUENCE_CHECKER(sequence_checker_);
};
}
#endif