/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef PKG_STREAM_H
#define PKG_STREAM_H
#ifndef __WIN32
#include <sys/mman.h>
#endif
#include <atomic>
#include <mutex>
#include <semaphore.h>
#include "pkg_manager.h"
#include "pkg_utils.h"

namespace Updater {
class RingBuffer;
}

namespace Hpackage {
class PkgStreamImpl : public PkgStream {
public:
    explicit PkgStreamImpl(PkgManager::PkgManagerPtr pkgManager, const std::string fileName)
        : fileName_(fileName), refCount_(0), pkgManager_(pkgManager) {}

    virtual ~PkgStreamImpl() {}

    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override
    {
        UNUSED(data);
        UNUSED(start);
        UNUSED(readLen);
        UNUSED(needRead);
        return PKG_SUCCESS;
    }

    int32_t GetBuffer(PkgBuffer &buffer) const override
    {
        buffer.length = 0;
        buffer.buffer = nullptr;
        return PKG_SUCCESS;
    }

    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override
    {
        UNUSED(data);
        UNUSED(size);
        UNUSED(start);
        return PKG_SUCCESS;
    }

    virtual int32_t Seek(long int offset, int whence) = 0;

    int32_t Flush(size_t size) override
    {
        UNUSED(size);
        return PKG_SUCCESS;
    }

    const std::string GetFileName() const override;

    int32_t GetStreamType() const override
    {
        return PkgStreamType_Read;
    };

    void AddRef() override;

    void DelRef() override;

    bool IsRef() const override;

    static PkgStreamPtr ConvertPkgStream(PkgManager::StreamPtr stream);

protected:
    void PostDecodeProgress(int type, size_t writeDataLen, const void *context) const;
    std::string fileName_;

private:
    std::atomic_int refCount_;
    PkgManager::PkgManagerPtr pkgManager_ = nullptr;
};

class FileStream : public PkgStreamImpl {
public:
    FileStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, FILE *stream, int32_t streamType)
        : PkgStreamImpl(pkgManager, fileName), stream_(stream), fileLength_(0), streamType_(streamType) {}

    ~FileStream() override;

    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;

    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;

    int32_t Seek(long int offset, int whence) override;

    int32_t Flush(size_t size) override;

    size_t GetFileLength() override;

    int32_t GetStreamType() const override
    {
        return streamType_;
    }
    int64_t GetReadOffset() const override;
private:
    FILE *stream_;
    size_t fileLength_;
    int32_t streamType_;
    std::recursive_mutex fileStreamLock_;
};

#ifndef DIFF_PATCH_SDK
constexpr size_t BLOCK_NUM = 32;
constexpr size_t SINGLE_BLOCK_SIZE = 50 * 1024;
constexpr size_t RING_BUFFER_SIZE = BLOCK_NUM * SINGLE_BLOCK_SIZE;
constexpr long NS_PER_MS = 1000000L;
constexpr long NS_PER_SEC = 1000000000L;
constexpr size_t MS_PER_SEC = 1000;

struct ShmRingBuffer {
    sem_t semEmpty;                     // 空闲块信号量
    sem_t semFull;                      // 已用块信号量
    size_t head;                        // 消费者读取位置
    size_t tail;                        // 生产者写入位置
    uint8_t buffer[RING_BUFFER_SIZE];    // 环形数据存储区
    size_t efficientLen[BLOCK_NUM];      // 块有效长度
    uint8_t reserved[SINGLE_BLOCK_SIZE]; // 临时缓冲区
    size_t currLen;                      // 临时缓冲区长度
    size_t currOffset;                   // 临时缓冲区头偏移
};

struct ShmInfo {
    std::string shmId;
    size_t fileLen;
    size_t offset;
};

class ShmDataStream : public PkgStreamImpl {
public:
    ShmDataStream(PkgManager::PkgManagerPtr pkgManager, const std::string &fileName, const ShmInfo &shmInfo,
        int32_t streamType, size_t timeoutMs = 0) : PkgStreamImpl(pkgManager, fileName), shmId_(shmInfo.shmId),
        streamType_(streamType), pkgLen_(shmInfo.fileLen), offset_(shmInfo.offset), timeoutMs_(timeoutMs) {}

    ~ShmDataStream() override
    {
        Stop();
    };

    int32_t CreateShmRingBuffer();

    int32_t InitShmRingBuffer();

    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;

    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;

    void Stop() override;

    bool Exit();

    int32_t Seek(long int size, int whence) override
    {
        UNUSED(size);
        UNUSED(whence);
        return PKG_SUCCESS;
    }

    void SetOffset(size_t offset)
    {
        offset_ = offset;
    }

    int64_t GetReadOffset() const override
    {
        return static_cast<int64_t>(offset_);
    }

    void SetpkgLen(size_t pkgLen)
    {
        pkgLen_ = pkgLen;
    }

    size_t GetFileLength() override
    {
        return pkgLen_;
    }

    int32_t GetStreamType() const override
    {
        return streamType_;
    }

private:
    int32_t ReadFully(size_t &needReadLen, size_t &readLen, PkgBuffer &data);
    int32_t WaitSemWithTimeout(sem_t &sem);
    ShmRingBuffer* rb_ = nullptr;
    std::string shmId_;
    int32_t streamType_;
    size_t pkgLen_ = 0; // 整包大小
    size_t offset_ = 0; // 偏移
    size_t timeoutMs_ = 0; // 信号量超时时间(ms),0表示永不超时
};
#endif

class MemoryMapStream : public PkgStreamImpl {
public:
    MemoryMapStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName, const PkgBuffer &buffer,
        int32_t streamType = PkgStreamType_MemoryMap) : PkgStreamImpl(pkgManager, fileName), memMap_(buffer.buffer),
        memSize_(buffer.length), currOffset_(0), streamType_(streamType) {}
    ~MemoryMapStream() override;

    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;

    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;

    int32_t Seek(long int offset, int whence) override;

    int32_t GetStreamType() const override
    {
        return streamType_;
    }

    size_t GetFileLength() override
    {
        return memSize_;
    }

    int32_t Flush(size_t size) override;

    int32_t GetBuffer(PkgBuffer &buffer) const override
    {
        buffer.buffer = memMap_;
        buffer.length = memSize_;
        return PKG_SUCCESS;
    }

private:
    uint8_t *memMap_;
    size_t memSize_;
    size_t currOffset_;
    int32_t streamType_;
};

class ProcessorStream : public PkgStreamImpl {
public:
    ProcessorStream(PkgManager::PkgManagerPtr pkgManager, const std::string fileName,
        ExtractFileProcessor processor, const void *context)
        : PkgStreamImpl(pkgManager, fileName), processor_(processor), context_(context) {}

    ~ProcessorStream() override {}

    int32_t Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override
    {
        UNUSED(data);
        UNUSED(start);
        UNUSED(readLen);
        UNUSED(needRead);
        return PKG_INVALID_STREAM;
    }

    int32_t Write(const PkgBuffer &data, size_t size, size_t start) override;

    int32_t Seek(long int size, int whence) override
    {
        UNUSED(size);
        UNUSED(whence);
        return PKG_SUCCESS;
    }

    int32_t GetStreamType() const override
    {
        return PkgStreamType_Process;
    }

    size_t GetFileLength() override
    {
        return 0;
    }

    int32_t Flush(size_t size) override;

private:
    ExtractFileProcessor processor_ = nullptr;
    const void *context_;
};

constexpr uint32_t MAX_FLOW_BUFFER_SIZE = 4 * 1024 * 1024;

class FlowDataStream : public Hpackage::PkgStreamImpl {
public:
    FlowDataStream(Hpackage::PkgManager::PkgManagerPtr pkgManager, const std::string fileName,
        const size_t fileSize, Updater::RingBuffer *buffer, int32_t streamType =
        PkgStreamType_FlowData) : PkgStreamImpl(pkgManager, fileName), fileLength_(fileSize),
        ringBuf_(buffer), streamType_(streamType) {}
    ~FlowDataStream() override {}

    int32_t Read(Hpackage::PkgBuffer &data, size_t start, size_t needRead, size_t &readLen) override;

    int32_t Write(const Hpackage::PkgBuffer &data, size_t size, size_t start) override;

    int32_t Seek(long int offset, int whence) override
    {
        UNUSED(offset);
        UNUSED(whence);
        return Hpackage::PKG_INVALID_STREAM;
    }

    int32_t GetStreamType() const override
    {
        return streamType_;
    }

    size_t GetFileLength() override
    {
        return fileLength_;
    }

    int32_t Flush(size_t size) override
    {
        UNUSED(size);
        return Hpackage::PKG_INVALID_STREAM;
    }

    int64_t GetReadOffset() const override
    {
        return static_cast<int64_t>(readOffset_);
    }

    void Stop() override;

private:
    int32_t ReadFromRingBuf(uint8_t *&buff, const uint32_t needLen, uint32_t &readLen);

    size_t fileLength_ {};
    Updater::RingBuffer *ringBuf_ {};
    int32_t streamType_;
    uint8_t buff_[MAX_FLOW_BUFFER_SIZE] = {0};
    uint32_t avail_ {};
    uint32_t bufOffset_ {};
    size_t readOffset_ {};
    size_t writeOffset_ {};
};
} // namespace Hpackage
#endif // PKG_STREAM_H