* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: blocing queue test.
*/
#include <vector>
#include "ut/common.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/queue/blocking_queue.h"
#include "datasystem/common/util/thread_pool.h"
#include "datasystem/common/util/random_data.h"
namespace datasystem {
namespace ut {
class BlockingQueueTest : public CommonTest {};
TEST_F(BlockingQueueTest, TestPushPop)
{
int sendNum = 10;
std::vector<int> nums(sendNum);
RandomData rand;
int gtSum = 0;
for (auto &num : nums) {
num = rand.GetRandomUint64(10, 1000);
gtSum += num;
}
ThreadPool pool(1);
ThreadPool pool2(1);
BlockingQueue<int> q;
ASSERT_EQ(q.Size(), size_t(0));
ASSERT_EQ(q.Empty(), true);
double waitTime = 0;
auto sumFut = pool.Submit([sendNum, &q, &waitTime]() {
double tmpTime = 0;
int sum = 0;
for (int i = 0; i < sendNum; i++) {
int num = 0;
q.Pop(num, tmpTime);
sum += num;
waitTime += tmpTime;
}
return sum;
});
auto fut = pool2.Submit([&nums, &q]() {
for (auto num : nums) {
q.Push(num);
}
});
auto sum = sumFut.get();
fut.get();
LOG(INFO) << FormatString("Wait time: [%.6lf]s, Sum: [%d]", waitTime, sum);
ASSERT_EQ(sum, gtSum);
fut = pool.Submit([&q]() {
int num;
q.Pop(num);
});
q.Push(1);
fut.get();
}
TEST_F(BlockingQueueTest, TestAbortQueue)
{
auto func = [] {
std::atomic<bool> start = { false };
BlockingQueue<int> queue;
std::thread t([&queue, &start] {
int v;
start = true;
(void)queue.Pop(v);
});
while (!start.load()) {
std::this_thread::yield();
}
queue.Abort();
t.join();
};
int testCount = 1000;
for (int i = 0; i < testCount; i++) {
func();
}
}
}
}