* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <arrow/io/api.h>
#include "shuffle/PartitionWriter.h"
#include "shuffle/ShuffleWriter.h"
#include "utils/Macros.h"
namespace gluten {
// PartitionWriter implementation that is constructed with a single data file
// path plus a list of local directories. Only declarations live in this
// header; the notes below describe what the signatures show and hedge
// anything that depends on the out-of-view implementation.
class LocalPartitionWriter : public PartitionWriter {
 public:
  // Nested helper types, forward-declared here and defined elsewhere. They
  // must be declared before the shared_ptr members further down name them.
  class LocalSpiller;
  class PayloadMerger;
  class PayloadCache;

  // numPartitions: number of partitions handled by this writer.
  // options:       writer options, taken by value.
  // pool:          Arrow memory pool (non-owning raw pointer).
  // dataFile:      path of the data file.
  // localDirs:     local directories available to the writer.
  explicit LocalPartitionWriter(
      uint32_t numPartitions,
      PartitionWriterOptions options,
      arrow::MemoryPool* pool,
      const std::string& dataFile,
      const std::vector<std::string>& localDirs);

  // Eviction entry point taking an in-memory payload for `partitionId`.
  // Ownership of `inMemoryPayload` is transferred to the callee.
  arrow::Status hashEvict(
      uint32_t partitionId,
      std::unique_ptr<InMemoryPayload> inMemoryPayload,
      Evict::type evictType,
      bool reuseBuffers,
      bool hasComplexType) override;

  // Eviction entry point that additionally receives a shared `compressed`
  // buffer and an `isFinal` flag.
  // NOTE(review): exact role of `compressed` is defined by the implementation.
  arrow::Status sortEvict(
      uint32_t partitionId,
      std::unique_ptr<InMemoryPayload> inMemoryPayload,
      std::shared_ptr<arrow::Buffer> compressed,
      bool isFinal) override;

  // Eviction entry point for an already-built BlockPayload; ownership of
  // `blockPayload` is transferred to the callee.
  arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) override;

  // Stops the writer. `metrics` is an output-style pointer
  // (presumably filled via populateMetrics() - confirm in the implementation).
  arrow::Status stop(ShuffleWriterMetrics* metrics) override;

  // Reclaim request for `size`; the amount actually reclaimed is reported
  // through `actual` (units presumably bytes - TODO confirm).
  arrow::Status reclaimFixedSize(int64_t size, int64_t* actual) override;

 private:
  // --- setup / file helpers ---
  void init();
  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const std::string& file);
  std::string nextSpilledFileDir();

  // --- spill handling ---
  arrow::Status requestSpill(bool isFinal);
  arrow::Status finishSpill(bool close);
  arrow::Status mergeSpills(uint32_t partitionId);

  // --- teardown / reporting ---
  arrow::Status populateMetrics(ShuffleWriterMetrics* metrics);
  arrow::Status clearResource();

  // Data members. Their declaration order is kept unchanged on purpose: it
  // determines the order in which the constructor initializes them.

  // Data file path and local directories (owned copies).
  std::string dataFile_;
  std::vector<std::string> localDirs_;

  // State flags, both initially false.
  bool stopped_ = false;
  bool useSpillFileAsDataFile_ = false;

  // Helper objects; all start out null.
  std::shared_ptr<LocalSpiller> spiller_;
  std::shared_ptr<PayloadMerger> merger_;
  std::shared_ptr<PayloadCache> payloadCache_;

  // Spill handles; starts out empty.
  std::list<std::shared_ptr<Spill>> spills_;

  // Directory / sub-directory selection state
  // (presumably consumed by nextSpilledFileDir() - confirm).
  int32_t dirSelection_ = 0;
  std::vector<int32_t> subDirSelection_;

  // Output stream for the data file; null until assigned.
  std::shared_ptr<arrow::io::OutputStream> dataFileOs_;

  // Running eviction counters, starting at zero.
  int64_t totalBytesToEvict_ = 0;
  int64_t totalBytesEvicted_ = 0;

  // Per-partition length bookkeeping
  // (presumably indexed by partition id - TODO confirm).
  std::vector<int64_t> partitionLengths_;
  std::vector<int64_t> rawPartitionLengths_;

  // Partition id of the most recent eviction; -1 is the initial value.
  int32_t lastEvictPid_ = -1;
};
}