910e62b5创建于 1月15日历史提交
// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "components/commerce/core/subscriptions/subscriptions_manager.h"

#include <queue>
#include <string>

#include "base/metrics/histogram_functions.h"
#include "base/task/sequenced_task_runner.h"
#include "base/time/time.h"
#include "components/commerce/core/commerce_feature_list.h"
#include "components/commerce/core/feature_utils.h"
#include "components/commerce/core/subscriptions/commerce_subscription.h"
#include "components/commerce/core/subscriptions/subscriptions_observer.h"
#include "components/commerce/core/subscriptions/subscriptions_server_proxy.h"
#include "components/commerce/core/subscriptions/subscriptions_storage.h"
#include "components/session_proto_db/session_proto_storage.h"
#include "services/network/public/cpp/shared_url_loader_factory.h"

namespace commerce {

namespace {
const int kDefaultTimeoutMs = 10000;
const char kTimeoutParam[] = "subscriptions_request_timeout";
constexpr base::FeatureParam<int> kTimeoutMs{&commerce::kShoppingList,
                                             kTimeoutParam, kDefaultTimeoutMs};
}  // namespace

const char kTrackResultHistogramName[] = "Commerce.Subscriptions.TrackResult";
const char kUntrackResultHistogramName[] =
    "Commerce.Subscriptions.UntrackResult";

SubscriptionsManager::SubscriptionsManager(
    signin::IdentityManager* identity_manager,
    scoped_refptr<network::SharedURLLoaderFactory> url_loader_factory,
    SessionProtoStorage<
        commerce_subscription_db::CommerceSubscriptionContentProto>*
        subscription_proto_db,
    AccountChecker* account_checker,
    signin::ConsentLevel consent_level)
    : SubscriptionsManager(
          identity_manager,
          std::make_unique<SubscriptionsServerProxy>(
              identity_manager,
              std::move(url_loader_factory),
              consent_level),
          std::make_unique<SubscriptionsStorage>(subscription_proto_db),
          account_checker) {}

SubscriptionsManager::SubscriptionsManager(
    signin::IdentityManager* identity_manager,
    std::unique_ptr<SubscriptionsServerProxy> server_proxy,
    std::unique_ptr<SubscriptionsStorage> storage,
    AccountChecker* account_checker)
    : server_proxy_(std::move(server_proxy)),
      storage_(std::move(storage)),
      account_checker_(account_checker),
      observers_(base::ObserverListPolicy::EXISTING_ONLY) {
  SyncSubscriptions();
  scoped_identity_manager_observation_.Observe(identity_manager);
}

SubscriptionsManager::SubscriptionsManager() = default;

SubscriptionsManager::~SubscriptionsManager() = default;

SubscriptionsManager::Request::Request(AsyncOperation operation,
                                       base::OnceCallback<void()> callback)
    : operation(operation), callback(std::move(callback)) {}
SubscriptionsManager::Request::Request(Request&&) = default;
SubscriptionsManager::Request::~Request() = default;

void SubscriptionsManager::Subscribe(
    std::unique_ptr<std::vector<CommerceSubscription>> subscriptions,
    base::OnceCallback<void(bool)> callback) {
  if (!IsSubscriptionsApiEnabled(account_checker_)) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback), false));
    return;
  }

  CHECK(subscriptions->size() > 0);

  SyncIfNeeded();

  pending_requests_.emplace(
      AsyncOperation::kSubscribe,
      base::BindOnce(&SubscriptionsManager::HandleSubscribe,
                     weak_ptr_factory_.GetWeakPtr(), std::move(subscriptions),
                     std::move(callback)));
  CheckAndProcessRequest();
}

void SubscriptionsManager::Unsubscribe(
    std::unique_ptr<std::vector<CommerceSubscription>> subscriptions,
    base::OnceCallback<void(bool)> callback) {
  if (!IsSubscriptionsApiEnabled(account_checker_)) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback), false));
    return;
  }

  CHECK(subscriptions->size() > 0);

  SyncIfNeeded();

  pending_requests_.emplace(
      AsyncOperation::kUnsubscribe,
      base::BindOnce(&SubscriptionsManager::HandleUnsubscribe,
                     weak_ptr_factory_.GetWeakPtr(), std::move(subscriptions),
                     std::move(callback)));
  CheckAndProcessRequest();
}

void SubscriptionsManager::SyncSubscriptions() {
  if (!IsSubscriptionsApiEnabled(account_checker_)) {
    return;
  }

  pending_requests_.emplace(AsyncOperation::kSync,
                            base::BindOnce(&SubscriptionsManager::HandleSync,
                                           weak_ptr_factory_.GetWeakPtr()));
  CheckAndProcessRequest();
}

void SubscriptionsManager::IsSubscribed(
    CommerceSubscription subscription,
    base::OnceCallback<void(bool)> callback) {
  if (!IsSubscriptionsApiEnabled(account_checker_)) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback), false));
    return;
  }

  SyncIfNeeded();

  pending_requests_.emplace(
      AsyncOperation::kLookupOne,
      base::BindOnce(&SubscriptionsManager::HandleLookup,
                     weak_ptr_factory_.GetWeakPtr(), std::move(subscription),
                     std::move(callback)));
  CheckAndProcessRequest();
}

bool SubscriptionsManager::IsSubscribedFromCache(
    const CommerceSubscription& subscription) {
  return storage_->IsSubscribedFromCache(subscription);
}

void SubscriptionsManager::GetAllSubscriptions(
    SubscriptionType type,
    base::OnceCallback<void(std::vector<CommerceSubscription>)> callback) {
  if (!IsSubscriptionsApiEnabled(account_checker_)) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback),
                                  std::vector<CommerceSubscription>()));
    return;
  }

  SyncIfNeeded();

  pending_requests_.emplace(AsyncOperation::kGetAll,
                            base::BindOnce(&SubscriptionsManager::HandleGetAll,
                                           weak_ptr_factory_.GetWeakPtr(), type,
                                           std::move(callback)));
  CheckAndProcessRequest();
}

void SubscriptionsManager::CheckTimestampOnBookmarkChange(
    int64_t bookmark_subscription_change_time) {
  pending_requests_.emplace(
      AsyncOperation::kCheckOnBookmarkChange,
      base::BindOnce(
          &SubscriptionsManager::HandleCheckTimestampOnBookmarkChange,
          weak_ptr_factory_.GetWeakPtr(), bookmark_subscription_change_time));
  CheckAndProcessRequest();
}

void SubscriptionsManager::CheckAndProcessRequest() {
  if (HasRequestRunning() || pending_requests_.empty())
    return;

  // If there is no request running, we can start processing next request in the
  // queue.
  has_request_running_ = true;
  last_request_started_time_ = base::Time::Now();
  Request request = std::move(pending_requests_.front());
  pending_requests_.pop();
  last_request_operation_ = request.operation;
  std::move(request.callback).Run();
}

void SubscriptionsManager::SyncIfNeeded() {
  if (!last_sync_succeeded_ && !HasRequestRunning()) {
    SyncSubscriptions();
  }
}

void SubscriptionsManager::OnRequestCompletion() {
  has_request_running_ = false;
  CheckAndProcessRequest();
}

void SubscriptionsManager::UpdateSyncStates(bool sync_succeeded) {
  last_sync_succeeded_ = sync_succeeded;
  if (sync_succeeded) {
    // Always use the Windows epoch to keep consistency with the timestamp in
    // bookmark.
    last_sync_time_ =
        base::Time::Now().ToDeltaSinceWindowsEpoch().InMicroseconds();
  }
}

void SubscriptionsManager::HandleSync() {
  if (account_checker_ && account_checker_->IsSignedIn() &&
      account_checker_->IsAnonymizedUrlDataCollectionEnabled()) {
    GetRemoteSubscriptionsAndUpdateStorage(
        SubscriptionType::kPriceTrack,
        base::BindOnce(&SubscriptionsManager::OnSyncStatusFetched,
                       weak_ptr_factory_.GetWeakPtr()));
  }
}

void SubscriptionsManager::OnSyncStatusFetched(
    SubscriptionsRequestStatus result) {
  UpdateSyncStates(result == SubscriptionsRequestStatus::kSuccess);
  OnRequestCompletion();
}

void SubscriptionsManager::HandleSubscribe(
    std::unique_ptr<std::vector<CommerceSubscription>> subscriptions,
    base::OnceCallback<void(bool)> callback) {
  SubscriptionType type = (*subscriptions)[0].type;
  // Make a copy of subscriptions to notify observers later.
  std::vector<CommerceSubscription> notified_subscriptions = *subscriptions;

  SubscriptionsRequestCallback wrapped_callback =
      base::BindOnce(&SubscriptionsManager::OnSubscribeStatusFetched,
                     weak_ptr_factory_.GetWeakPtr(),
                     std::move(notified_subscriptions), std::move(callback));

  if (!last_sync_succeeded_) {
    std::move(wrapped_callback)
        .Run(SubscriptionsRequestStatus::kLastSyncFailed);
    return;
  }
  storage_->GetUniqueNonExistingSubscriptions(
      std::move(subscriptions),
      base::BindOnce(
          &SubscriptionsManager::OnIncomingSubscriptionsFilteredForSubscribe,
          weak_ptr_factory_.GetWeakPtr(), type, std::move(wrapped_callback)));
}

void SubscriptionsManager::OnSubscribeStatusFetched(
    std::vector<CommerceSubscription> notified_subscriptions,
    base::OnceCallback<void(bool)> callback,
    SubscriptionsRequestStatus result) {
  base::UmaHistogramEnumeration(kTrackResultHistogramName, result);
  bool succeeded = result == SubscriptionsRequestStatus::kSuccess ||
                   result == SubscriptionsRequestStatus::kNoOp;
  OnSubscribe(notified_subscriptions, succeeded);
  std::move(callback).Run(succeeded);
  // We sync local cache with server only when the product is successfully added
  // on server. The sync states should be updated after notifying all observers
  // and running the callback so |last_sync_time_| is larger than external
  // timestamp (e.g. in bookmarks).
  if (result == SubscriptionsRequestStatus::kSuccess) {
    UpdateSyncStates(true);
  }
  OnRequestCompletion();
}

void SubscriptionsManager::OnIncomingSubscriptionsFilteredForSubscribe(
    SubscriptionType type,
    SubscriptionsRequestCallback callback,
    std::unique_ptr<std::vector<CommerceSubscription>> unique_subscriptions) {
  if (unique_subscriptions->size() == 0) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(std::move(callback), SubscriptionsRequestStatus::kNoOp));
    return;
  }
  server_proxy_->Create(
      std::move(unique_subscriptions),
      base::BindOnce(&SubscriptionsManager::HandleManageSubscriptionsResponse,
                     weak_ptr_factory_.GetWeakPtr(), type,
                     std::move(callback)));
}

void SubscriptionsManager::HandleUnsubscribe(
    std::unique_ptr<std::vector<CommerceSubscription>> subscriptions,
    base::OnceCallback<void(bool)> callback) {
  SubscriptionType type = (*subscriptions)[0].type;
  // Make a copy of subscriptions to notify observers later.
  std::vector<CommerceSubscription> notified_subscriptions = *subscriptions;

  SubscriptionsRequestCallback wrapped_callback =
      base::BindOnce(&SubscriptionsManager::OnUnsubscribeStatusFetched,
                     weak_ptr_factory_.GetWeakPtr(),
                     std::move(notified_subscriptions), std::move(callback));

  if (!last_sync_succeeded_) {
    std::move(wrapped_callback)
        .Run(SubscriptionsRequestStatus::kLastSyncFailed);
    return;
  }
  storage_->GetUniqueExistingSubscriptions(
      std::move(subscriptions),
      base::BindOnce(
          &SubscriptionsManager::OnIncomingSubscriptionsFilteredForUnsubscribe,
          weak_ptr_factory_.GetWeakPtr(), type, std::move(wrapped_callback)));
}

void SubscriptionsManager::OnUnsubscribeStatusFetched(
    std::vector<CommerceSubscription> notified_subscriptions,
    base::OnceCallback<void(bool)> callback,
    SubscriptionsRequestStatus result) {
  base::UmaHistogramEnumeration(kUntrackResultHistogramName, result);
  bool succeeded = result == SubscriptionsRequestStatus::kSuccess ||
                   result == SubscriptionsRequestStatus::kNoOp;
  OnUnsubscribe(notified_subscriptions, succeeded);
  std::move(callback).Run(succeeded);
  // We sync local cache with server only when the product is successfully
  // removed on server. The sync states should be updated after notifying all
  // observers and running the callback so |last_sync_time_| is larger than
  // external timestamp (e.g. in bookmarks).
  if (result == SubscriptionsRequestStatus::kSuccess) {
    UpdateSyncStates(true);
  }
  OnRequestCompletion();
}

void SubscriptionsManager::OnIncomingSubscriptionsFilteredForUnsubscribe(
    SubscriptionType type,
    SubscriptionsRequestCallback callback,
    std::unique_ptr<std::vector<CommerceSubscription>> unique_subscriptions) {
  if (unique_subscriptions->size() == 0) {
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE,
        base::BindOnce(std::move(callback), SubscriptionsRequestStatus::kNoOp));
    return;
  }
  server_proxy_->Delete(
      std::move(unique_subscriptions),
      base::BindOnce(&SubscriptionsManager::HandleManageSubscriptionsResponse,
                     weak_ptr_factory_.GetWeakPtr(), type,
                     std::move(callback)));
}

void SubscriptionsManager::GetRemoteSubscriptionsAndUpdateStorage(
    SubscriptionType type,
    SubscriptionsRequestCallback callback) {
  server_proxy_->Get(
      type, base::BindOnce(
                &SubscriptionsManager::HandleGetSubscriptionsResponse,
                weak_ptr_factory_.GetWeakPtr(), type, std::move(callback)));
}

void SubscriptionsManager::HandleGetSubscriptionsResponse(
    SubscriptionType type,
    SubscriptionsRequestCallback callback,
    SubscriptionsRequestStatus status,
    std::unique_ptr<std::vector<CommerceSubscription>> remote_subscriptions) {
  if (status != SubscriptionsRequestStatus::kSuccess) {
    std::move(callback).Run(status);
  } else {
    storage_->UpdateStorage(type, std::move(callback),
                            std::move(remote_subscriptions));
  }
}

void SubscriptionsManager::HandleManageSubscriptionsResponse(
    SubscriptionType type,
    SubscriptionsRequestCallback callback,
    SubscriptionsRequestStatus status,
    std::unique_ptr<std::vector<CommerceSubscription>> remote_subscriptions) {
  if (status != SubscriptionsRequestStatus::kSuccess) {
    VLOG(1) << "Fail to create or delete subscriptions on server";
    std::move(callback).Run(status);
  } else {
    storage_->UpdateStorage(type, std::move(callback),
                            std::move(remote_subscriptions));
  }
}

void SubscriptionsManager::HandleLookup(
    CommerceSubscription subscription,
    base::OnceCallback<void(bool)> callback) {
  storage_->IsSubscribed(
      std::move(subscription),
      base::BindOnce(&SubscriptionsManager::OnLookupResult,
                     weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}

void SubscriptionsManager::OnLookupResult(
    base::OnceCallback<void(bool)> callback,
    bool is_subscribed) {
  std::move(callback).Run(is_subscribed);
  OnRequestCompletion();
}

void SubscriptionsManager::HandleGetAll(
    SubscriptionType type,
    base::OnceCallback<void(std::vector<CommerceSubscription>)> callback) {
  storage_->LoadAllSubscriptionsForType(
      type,
      base::BindOnce(&SubscriptionsManager::OnGetAllResult,
                     weak_ptr_factory_.GetWeakPtr(), std::move(callback)));
}

void SubscriptionsManager::OnGetAllResult(
    base::OnceCallback<void(std::vector<CommerceSubscription>)> callback,
    std::unique_ptr<std::vector<CommerceSubscription>> subscriptions) {
  std::move(callback).Run(std::move(*subscriptions));
  OnRequestCompletion();
}

void SubscriptionsManager::HandleCheckTimestampOnBookmarkChange(
    int64_t bookmark_subscription_change_time) {
  // Do nothing if current local cache is newer than the bookmark change.
  if (bookmark_subscription_change_time <= last_sync_time_) {
    OnRequestCompletion();
    return;
  }

  server_proxy_->Get(
      SubscriptionType::kPriceTrack,
      base::BindOnce(
          &SubscriptionsManager::HandleGetSubscriptionsResponseOnBookmarkChange,
          weak_ptr_factory_.GetWeakPtr()));
}

void SubscriptionsManager::HandleGetSubscriptionsResponseOnBookmarkChange(
    SubscriptionsRequestStatus status,
    std::unique_ptr<std::vector<CommerceSubscription>> remote_subscriptions) {
  if (status != SubscriptionsRequestStatus::kSuccess) {
    UpdateSyncStates(false);
    OnRequestCompletion();
    return;
  }

  storage_->UpdateStorageAndNotifyModifiedSubscriptions(
      SubscriptionType::kPriceTrack,
      base::BindOnce(&SubscriptionsManager::OnStorageUpdatedOnBookmarkChange,
                     weak_ptr_factory_.GetWeakPtr()),
      std::move(remote_subscriptions));
}

void SubscriptionsManager::OnStorageUpdatedOnBookmarkChange(
    SubscriptionsRequestStatus status,
    std::vector<CommerceSubscription> added_subs,
    std::vector<CommerceSubscription> removed_subs) {
  if (status == SubscriptionsRequestStatus::kSuccess) {
    if (added_subs.size() > 0) {
      OnSubscribe(added_subs, true);
    }
    if (removed_subs.size() > 0) {
      OnUnsubscribe(removed_subs, true);
    }
  }
  UpdateSyncStates(status == SubscriptionsRequestStatus::kSuccess);
  OnRequestCompletion();
}

void SubscriptionsManager::OnSubscribe(
    const std::vector<CommerceSubscription>& subscriptions,
    bool succeeded) {
  for (SubscriptionsObserver& observer : observers_) {
    for (auto& sub : subscriptions) {
      observer.OnSubscribe(sub, succeeded);
    }
  }
}

void SubscriptionsManager::OnUnsubscribe(
    const std::vector<CommerceSubscription>& subscriptions,
    bool succeeded) {
  for (SubscriptionsObserver& observer : observers_) {
    for (auto& sub : subscriptions) {
      observer.OnUnsubscribe(sub, succeeded);
    }
  }
}

void SubscriptionsManager::OnPrimaryAccountChanged(
    const signin::PrimaryAccountChangeEvent& event_details) {
  storage_->DeleteAll();
  SyncSubscriptions();
}

bool SubscriptionsManager::HasRequestRunning() {
  // Reset has_request_running_ to false if the last request is stuck somewhere.
  // TODO(crbug.com/40241090): We should still be able to get the callback when
  // the request times out. Also we should make the callback cancelable itself
  // rather than having to wait for the next request coming.
  if (has_request_running_ &&
      (base::Time::Now() - last_request_started_time_).InMilliseconds() >
          kTimeoutMs.Get()) {
    has_request_running_ = false;
    if (last_request_operation_ == AsyncOperation::kSubscribe) {
      base::UmaHistogramEnumeration(kTrackResultHistogramName,
                                    SubscriptionsRequestStatus::kLost);
    } else if (last_request_operation_ == AsyncOperation::kUnsubscribe) {
      base::UmaHistogramEnumeration(kUntrackResultHistogramName,
                                    SubscriptionsRequestStatus::kLost);
    }
  }
  return has_request_running_;
}

void SubscriptionsManager::AddObserver(SubscriptionsObserver* observer) {
  observers_.AddObserver(observer);
}

void SubscriptionsManager::RemoveObserver(SubscriptionsObserver* observer) {
  observers_.RemoveObserver(observer);
}

bool SubscriptionsManager::GetLastSyncSucceededForTesting() {
  return last_sync_succeeded_;
}

int64_t SubscriptionsManager::GetLastSyncTimeForTesting() {
  return last_sync_time_;
}

void SubscriptionsManager::SetHasRequestRunningForTesting(
    bool has_request_running) {
  has_request_running_ = has_request_running;
}

bool SubscriptionsManager::HasPendingRequestsForTesting() {
  return !pending_requests_.empty();
}

void SubscriptionsManager::SetLastRequestStartedTimeForTesting(
    base::Time time) {
  last_request_started_time_ = time;
}

}  // namespace commerce