* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: The interface of file cache worker descriptor.
*/
#ifndef DATASYSTEM_COMMON_STREAM_CACHE_CONSUMER_META_H
#define DATASYSTEM_COMMON_STREAM_CACHE_CONSUMER_META_H
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/net_util.h"
#include "datasystem/protos/worker_stream.pb.h"
#include "datasystem/stream/stream_config.h"
namespace datasystem {
inline SubscriptionType ToSubscriptionType(SubscriptionTypePb typePb)
{
switch (typePb) {
case SubscriptionTypePb::STREAM_PB:
return SubscriptionType::STREAM;
case SubscriptionTypePb::KEY_PARTITIONS_PB:
return SubscriptionType::KEY_PARTITIONS;
case SubscriptionTypePb::ROUND_ROBIN_PB:
return SubscriptionType::ROUND_ROBIN;
default:
return SubscriptionType::UNKNOWN;
}
}
inline SubscriptionTypePb ToSubscriptionTypePb(SubscriptionType type)
{
switch (type) {
case SubscriptionType::STREAM:
return SubscriptionTypePb::STREAM_PB;
case SubscriptionType::KEY_PARTITIONS:
return SubscriptionTypePb::KEY_PARTITIONS_PB;
case SubscriptionType::ROUND_ROBIN:
return SubscriptionTypePb::ROUND_ROBIN_PB;
default:
return SubscriptionTypePb::SubscriptionTypePb_INT_MIN_SENTINEL_DO_NOT_USE_;
}
}
* @brief Consumer metadata.
* @details Consisting of stream name, consumer id, worker address, subscription configuration and consumer last acked
* cursor. This class is used in both worker and master components.
*/
class ConsumerMeta {
public:
ConsumerMeta(std::string streamName, std::string consumerId, HostPort workerAddress,
SubscriptionConfig subConfig, uint64_t lastAckCursor)
: streamName_(std::move(streamName)),
consumerId_(std::move(consumerId)),
workerAddress_(std::move(workerAddress)),
subConfig_(std::move(subConfig)),
lastAckCursor_(lastAckCursor)
{
}
std::string ToString() const
{
std::string typeName = (subConfig_.subscriptionType == SubscriptionType::STREAM) ? "stream" : "queue";
return FormatString("Stream: <%s>, Worker:<%s>, Subscription:<%s>, mode:<%s>, Consumer:<%s>", streamName_,
workerAddress_.ToString(), subConfig_.subscriptionName, typeName, consumerId_);
}
std::string StreamName() const
{
return streamName_;
}
HostPort WorkerAddress() const
{
return workerAddress_;
}
std::string ConsumerId() const
{
return consumerId_;
}
SubscriptionConfig SubConfig() const
{
return subConfig_;
}
uint64_t LastAckCursor() const
{
return lastAckCursor_;
}
inline bool operator<(const ConsumerMeta &other) const
{
return this->consumerId_ < other.consumerId_;
}
ConsumerMetaPb SerializeToPb() const
{
ConsumerMetaPb consumerMetaPb;
consumerMetaPb.set_stream_name(streamName_);
consumerMetaPb.mutable_worker_address()->set_host(workerAddress_.Host());
consumerMetaPb.mutable_worker_address()->set_port(workerAddress_.Port());
consumerMetaPb.set_consumer_id(consumerId_);
consumerMetaPb.mutable_sub_config()->set_subscription_name(subConfig_.subscriptionName);
consumerMetaPb.mutable_sub_config()->set_subscription_type(ToSubscriptionTypePb(subConfig_.subscriptionType));
consumerMetaPb.set_last_ack_cursor(lastAckCursor_);
return consumerMetaPb;
}
void ParseFromPb(const ConsumerMetaPb &metaPb)
{
streamName_ = metaPb.stream_name();
workerAddress_ = HostPort(metaPb.worker_address().host(), metaPb.worker_address().port());
consumerId_ = metaPb.consumer_id();
subConfig_ = SubscriptionConfig(metaPb.sub_config().subscription_name(),
ToSubscriptionType(metaPb.sub_config().subscription_type()));
lastAckCursor_ = metaPb.last_ack_cursor();
}
private:
std::string streamName_;
std::string consumerId_;
HostPort workerAddress_;
SubscriptionConfig subConfig_;
uint64_t lastAckCursor_ = 0;
};
}
#endif