/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <algorithm>
#include <memory>
#include <string>
#include <vector>

#include "velox/common/time/CpuWallTimer.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorStream.h"

#include <arrow/array/util.h>
#include <arrow/ipc/writer.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>

#include "memory/VeloxMemoryManager.h"
#include "shuffle/Options.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
#include "shuffle/ShuffleWriter.h"
#include "shuffle/Utils.h"

#include "utils/Print.h"

namespace gluten {

class VeloxShuffleWriter : public ShuffleWriter {
 public:
  static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create(
      ShuffleWriterType type,
      uint32_t numPartitions,
      std::unique_ptr<PartitionWriter> partitionWriter,
      ShuffleWriterOptions options,
      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
      arrow::MemoryPool* arrowPool);

  facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) {
    // get new row type
    auto& rowType = rv.type()->asRow();
    auto typeChildren = rowType.children();
    typeChildren.erase(typeChildren.begin());
    auto newRowType = facebook::velox::ROW(std::move(typeChildren));

    // get length
    auto length = rv.size();

    // get children
    auto children = rv.children();
    children.erase(children.begin());

    return std::make_shared<facebook::velox::RowVector>(
        rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, std::move(children));
  }

  const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
    VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");

    auto& firstChild = rv.childAt(0);
    VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding.");
    VELOX_CHECK(
        firstChild->type()->isInteger(),
        "Partition id (field 0) should be integer, but got {}",
        firstChild->type()->toString());

    // first column is partition key hash value or pid
    return firstChild->asFlatVector<int32_t>()->rawValues();
  }

  // For test only.
  virtual void setPartitionBufferSize(uint32_t newSize) {}

  virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) {
    return arrow::Status::OK();
  }

  virtual arrow::Status evictRowVector(uint32_t partitionId) {
    return arrow::Status::OK();
  }

  virtual const uint64_t cachedPayloadSize() const {
    return 0;
  }

  int32_t maxBatchSize() const {
    return maxBatchSize_;
  }

  int64_t partitionBufferSize() const {
    return partitionBufferPool_->bytes_allocated();
  }

  int64_t peakBytesAllocated() const override {
    return partitionBufferPool_->max_memory() + veloxPool_->peakBytes();
  }

 protected:
  VeloxShuffleWriter(
      uint32_t numPartitions,
      std::unique_ptr<PartitionWriter> partitionWriter,
      ShuffleWriterOptions options,
      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
      arrow::MemoryPool* pool)
      : ShuffleWriter(numPartitions, std::move(options), pool),
        partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)),
        veloxPool_(std::move(veloxPool)),
        partitionWriter_(std::move(partitionWriter)) {
    partitioner_ = Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId);
    arenas_.resize(numPartitions);
    serdeOptions_.useLosslessTimestamp = true;
  }

  virtual ~VeloxShuffleWriter() = default;

  // Memory Pool used to track memory usage of partition buffers.
  // The actual allocation is delegated to options_.memoryPool.
  std::unique_ptr<ShuffleMemoryPool> partitionBufferPool_;

  std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;

  // PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by
  // partitionBufferPool_.
  std::unique_ptr<PartitionWriter> partitionWriter_;

  std::shared_ptr<Partitioner> partitioner_;

  std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;

  facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;

  int32_t maxBatchSize_{0};

  enum EvictState { kEvictable, kUnevictable };

  // stat
  enum CpuWallTimingType {
    CpuWallTimingBegin = 0,
    CpuWallTimingCompute = CpuWallTimingBegin,
    CpuWallTimingBuildPartition,
    CpuWallTimingEvictPartition,
    CpuWallTimingHasNull,
    CpuWallTimingCalculateBufferSize,
    CpuWallTimingAllocateBuffer,
    CpuWallTimingCreateRbFromBuffer,
    CpuWallTimingMakeRB,
    CpuWallTimingCacheRB,
    CpuWallTimingFlattenRV,
    CpuWallTimingSplitRV,
    CpuWallTimingIteratePartitions,
    CpuWallTimingStop,
    CpuWallTimingEnd,
    CpuWallTimingNum = CpuWallTimingEnd - CpuWallTimingBegin
  };

  static std::string CpuWallTimingName(CpuWallTimingType type) {
    switch (type) {
      case CpuWallTimingCompute:
        return "CpuWallTimingCompute";
      case CpuWallTimingBuildPartition:
        return "CpuWallTimingBuildPartition";
      case CpuWallTimingEvictPartition:
        return "CpuWallTimingEvictPartition";
      case CpuWallTimingHasNull:
        return "CpuWallTimingHasNull";
      case CpuWallTimingCalculateBufferSize:
        return "CpuWallTimingCalculateBufferSize";
      case CpuWallTimingAllocateBuffer:
        return "CpuWallTimingAllocateBuffer";
      case CpuWallTimingCreateRbFromBuffer:
        return "CpuWallTimingCreateRbFromBuffer";
      case CpuWallTimingMakeRB:
        return "CpuWallTimingMakeRB";
      case CpuWallTimingCacheRB:
        return "CpuWallTimingCacheRB";
      case CpuWallTimingFlattenRV:
        return "CpuWallTimingFlattenRV";
      case CpuWallTimingSplitRV:
        return "CpuWallTimingSplitRV";
      case CpuWallTimingIteratePartitions:
        return "CpuWallTimingIteratePartitions";
      case CpuWallTimingStop:
        return "CpuWallTimingStop";
      default:
        return "CpuWallTimingUnknown";
    }
  }

  facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];

  EvictState evictState_{kEvictable};

  class EvictGuard {
   public:
    explicit EvictGuard(EvictState& evictState) : evictState_(evictState) {
      evictState_ = EvictState::kUnevictable;
    }

    ~EvictGuard() {
      evictState_ = EvictState::kEvictable;
    }

    // For safety and clarity.
    EvictGuard(const EvictGuard&) = delete;
    EvictGuard& operator=(const EvictGuard&) = delete;
    EvictGuard(EvictGuard&&) = delete;
    EvictGuard& operator=(EvictGuard&&) = delete;

   private:
    EvictState& evictState_;
  };
};

} // namespace gluten