* Copyright (c) 2021 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "pkg_stream.h"
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifndef _WIN32
#include <sys/mman.h>
#endif
#include <unistd.h>
#include <cstdio>
#include "dump.h"
#include "pkg_manager.h"
#include "pkg_utils.h"
#include "ring_buffer/ring_buffer.h"
#include "securec.h"
#ifdef __APPLE__
#define off64_t off_t
#define fopen64 fopen
#define ftello64 ftello
#define fseeko64 fseek
#endif
using namespace Updater;
namespace Hpackage {
const std::string PkgStreamImpl::GetFileName() const
{
return fileName_;
}
PkgStreamPtr PkgStreamImpl::ConvertPkgStream(PkgManager::StreamPtr stream)
{
return (PkgStreamPtr)stream;
}
void PkgStreamImpl::AddRef()
{
refCount_++;
}
void PkgStreamImpl::DelRef()
{
refCount_--;
}
bool PkgStreamImpl::IsRef() const
{
return refCount_ == 0;
}
void PkgStreamImpl::PostDecodeProgress(int type, size_t writeDataLen, const void *context) const
{
if (pkgManager_ != nullptr) {
pkgManager_->PostDecodeProgress(type, writeDataLen, context);
}
}
FileStream::~FileStream()
{
if (stream_ != nullptr) {
fflush(stream_);
fclose(stream_);
stream_ = nullptr;
}
}
int32_t FileStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
{
Updater::UPDATER_INIT_RECORD;
std::lock_guard<std::recursive_mutex> lock(fileStreamLock_);
if (stream_ == nullptr) {
PKG_LOGE("Invalid stream");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "Invalid stream");
return PKG_INVALID_STREAM;
}
if (data.length < needRead) {
PKG_LOGE("insufficient buffer capacity");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "insufficient buffer capacity");
return PKG_INVALID_STREAM;
}
readLen = 0;
if (fseeko64(stream_, start, SEEK_SET) != 0) {
PKG_LOGE("read data fail");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "fseeko64 fail");
return PKG_INVALID_STREAM;
}
if (start > GetFileLength()) {
PKG_LOGE("Invalid start");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "Invalid start");
return PKG_INVALID_STREAM;
}
if (data.buffer == nullptr) {
data.data.resize(data.length);
data.buffer = data.data.data();
}
readLen = fread(data.buffer, 1, needRead, stream_);
if (readLen == 0) {
PKG_LOGE("read data fail %d", errno);
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "read data fail");
return PKG_INVALID_STREAM;
}
return PKG_SUCCESS;
}
int64_t FileStream::GetReadOffset() const
{
off64_t ret = ftello64(stream_);
if (ret < 0) {
PKG_LOGE("ftell64 failed %d", errno);
}
return static_cast<int64_t>(ret);
}
int32_t FileStream::Write(const PkgBuffer &data, size_t size, size_t start)
{
std::lock_guard<std::recursive_mutex> lock(fileStreamLock_);
if (streamType_ != PkgStreamType_Write) {
PKG_LOGE("Invalid stream type");
return PKG_INVALID_STREAM;
}
if (stream_ == nullptr) {
PKG_LOGE("Invalid stream");
return PKG_INVALID_STREAM;
}
if (fseeko64(stream_, start, SEEK_SET) != 0) {
PKG_LOGE("write data fail");
return PKG_INVALID_STREAM;
}
size_t len = fwrite(data.buffer, size, 1, stream_);
if (len != 1) {
PKG_LOGE("Write buffer fail");
return PKG_INVALID_STREAM;
}
PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
return PKG_SUCCESS;
}
size_t FileStream::GetFileLength()
{
std::lock_guard<std::recursive_mutex> lock(fileStreamLock_);
if (stream_ == nullptr) {
PKG_LOGE("Invalid stream");
return 0;
}
if (fileLength_ == 0) {
if (Seek(0, SEEK_END) != 0) {
PKG_LOGE("Invalid stream");
return 0;
}
off64_t ret = ftello64(stream_);
if (ret < 0) {
PKG_LOGE("ftell64 failed");
return 0;
}
fileLength_ = static_cast<size_t>(ret);
if (fseek(stream_, 0, SEEK_SET) != 0) {
PKG_LOGE("fseek failed");
return 0;
}
}
return fileLength_;
}
int32_t FileStream::Seek(long int offset, int whence)
{
std::lock_guard<std::recursive_mutex> lock(fileStreamLock_);
if (stream_ == nullptr) {
PKG_LOGE("Invalid stream");
return PKG_INVALID_STREAM;
}
return fseek(stream_, offset, whence);
}
int32_t FileStream::Flush(size_t size)
{
std::lock_guard<std::recursive_mutex> lock(fileStreamLock_);
if (stream_ == nullptr) {
PKG_LOGE("Invalid stream");
return PKG_INVALID_STREAM;
}
if (fileLength_ == 0) {
fileLength_ = size;
}
if (fseek(stream_, 0, SEEK_END) != 0) {
PKG_LOGE("fseek failed");
return PKG_INVALID_STREAM;
}
off64_t ret = ftello64(stream_);
if (ret < 0) {
PKG_LOGE("ftell64 failed");
return PKG_INVALID_STREAM;
}
fileLength_ = static_cast<size_t>(ret);
if (size != fileLength_) {
PKG_LOGE("Flush size %zu local size:%zu", size, fileLength_);
}
if (fflush(stream_) != 0) {
PKG_LOGE("Invalid stream");
return PKG_INVALID_STREAM;
}
return PKG_SUCCESS;
}
#ifndef DIFF_PATCH_SDK
int32_t ShmDataStream::CreateShmRingBuffer()
{
PKG_LOGI("CreateShmRingBuffer");
int fd = shm_open(shmId_.c_str(), O_CREAT | O_RDWR, 0666);
if (fd == -1) {
PKG_LOGE("shm_open failed");
return -1;
};
ftruncate(fd, sizeof(ShmRingBuffer));
rb_ = static_cast<ShmRingBuffer*>(mmap(nullptr, sizeof(ShmRingBuffer), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0));
close(fd);
if (rb_ == MAP_FAILED) {
PKG_LOGE("ShmRingBuffer mmap failed");
return -1;
}
sem_init(&rb_->semEmpty, 1, BLOCK_NUM);
sem_init(&rb_->semFull, 1, 0);
rb_->head = 0;
rb_->tail = 0;
rb_->currLen = 0;
rb_->currOffset = 0;
return 0;
}
int32_t ShmDataStream::InitShmRingBuffer()
{
int fd = shm_open(shmId_.c_str(), O_RDWR, 0666);
if (fd == -1) {
PKG_LOGE("shm_open failed");
return -1;
};
rb_ = static_cast<ShmRingBuffer*>(mmap(nullptr, sizeof(ShmRingBuffer), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0));
close(fd);
if (rb_ == MAP_FAILED) {
PKG_LOGE("ShmRingBuffer mmap failed");
return -1;
}
return 0;
}
int32_t ShmDataStream::WaitSemWithTimeout(sem_t &sem)
{
if (timeoutMs_ == 0) {
return sem_wait(&sem);
}
timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += static_cast<long>(timeoutMs_ / MS_PER_SEC);
ts.tv_nsec += static_cast<long>(timeoutMs_ % MS_PER_SEC) * NS_PER_MS;
if (ts.tv_nsec >= NS_PER_SEC) {
ts.tv_sec += ts.tv_nsec / NS_PER_SEC;
ts.tv_nsec %= NS_PER_SEC;
}
return sem_timedwait(&sem, &ts);
}
int32_t ShmDataStream::ReadFully(size_t &needReadLen, size_t &readLen, PkgBuffer &data)
{
if (rb_ == nullptr) {
PKG_LOGE("rb is nullptr");
return PKG_INVALID_PARAM;
}
while (needReadLen > 0) {
if (WaitSemWithTimeout(rb_->semFull) != 0) {
PKG_LOGE("WaitSemWithTimeout timeout");
return PKG_INVALID_STREAM;
}
size_t len = rb_->efficientLen[rb_->head];
if (len == 0) {
PKG_LOGE("efficient len invalid: %d", rb_->head);
return PKG_INVALID_STREAM;
}
size_t offset = rb_->head * SINGLE_BLOCK_SIZE;
if (needReadLen >= len) {
if (memcpy_s(data.buffer + readLen, len, rb_->buffer + offset, len) != 0) {
PKG_LOGE("memcpy_s fail, len: %d", len);
return PKG_INVALID_STREAM;
}
rb_->efficientLen[rb_->head] = 0;
needReadLen -= len;
readLen += len;
rb_->head = (rb_->head + 1) % BLOCK_NUM;
sem_post(&rb_->semEmpty);
continue;
}
if (memcpy_s(data.buffer + readLen, needReadLen, rb_->buffer + offset, needReadLen) != 0) {
PKG_LOGE("memcpy_s fail, needReadLen: %d", needReadLen);
return PKG_INVALID_STREAM;
}
if (memcpy_s(rb_->reserved, len - needReadLen, rb_->buffer + offset + needReadLen, len - needReadLen) != 0) {
PKG_LOGE("memcpy_s fail, needReadLen: %d", len - needReadLen);
return PKG_INVALID_STREAM;
}
rb_->currLen = len - needReadLen;
rb_->currOffset = 0;
readLen += needReadLen;
needReadLen -= needReadLen;
rb_->efficientLen[rb_->head] = 0;
rb_->head = (rb_->head + 1) % BLOCK_NUM;
sem_post(&rb_->semEmpty);
}
return PKG_SUCCESS;
}
int32_t ShmDataStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
{
if (start != offset_) {
PKG_LOGE("offset_ not matched %zu", offset_);
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "offset_ not matched");
return PKG_INVALID_STREAM;
}
if (data.length < needRead) {
PKG_LOGE("insufficient buffer capacity");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "insufficient buffer capacity");
return PKG_INVALID_STREAM;
}
if (data.buffer == nullptr) {
data.data.resize(needRead);
data.buffer = data.data.data();
}
if (rb_ == nullptr) {
PKG_LOGE("rb_ is nullptr");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "rb_ is nullptr");
return PKG_INVALID_STREAM;
}
readLen = 0;
size_t needReadLen = needRead;
if (rb_->currLen > 0) {
if (rb_->currLen >= needReadLen) {
if (memcpy_s(data.buffer, needReadLen, rb_->reserved + rb_->currOffset, needReadLen) != 0) {
PKG_LOGE("memcpy_s fail");
return PKG_INVALID_STREAM;
}
rb_->currOffset += needReadLen;
rb_->currLen -= needReadLen;
readLen = needReadLen;
offset_ += readLen;
return 0;
}
if (memcpy_s(data.buffer, rb_->currLen, rb_->reserved + rb_->currOffset, rb_->currLen) != 0) {
PKG_LOGE("memcpy_s fail");
return PKG_INVALID_STREAM;
}
needReadLen -= rb_->currLen;
readLen += rb_->currLen;
rb_->currLen = 0;
rb_->currOffset = 0;
}
if (ReadFully(needReadLen, readLen, data) != PKG_SUCCESS) {
return PKG_INVALID_STREAM;
}
offset_ += readLen;
return 0;
}
int32_t ShmDataStream::Write(const PkgBuffer &data, size_t size, size_t start)
{
if (start != offset_) {
PKG_LOGE("offset_ not matched %zu", offset_);
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "offset_ not matched");
return PKG_INVALID_STREAM;
}
if (data.length < size || data.buffer == nullptr) {
PKG_LOGE("insufficient buffer capacity");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "insufficient buffer capacity");
return PKG_INVALID_STREAM;
}
if (rb_ == nullptr) {
PKG_LOGE("rb_ is nullptr");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "rb_ is nullptr");
return PKG_INVALID_STREAM;
}
size_t needWriteLen = size;
size_t writedLen = 0;
while (needWriteLen > 0) {
if (WaitSemWithTimeout(rb_->semEmpty) != 0) {
PKG_LOGE("WaitSemWithTimeout timeout");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "WaitSemWithTimeout timeout");
return PKG_INVALID_STREAM;
}
size_t len = needWriteLen < SINGLE_BLOCK_SIZE ? needWriteLen : SINGLE_BLOCK_SIZE;
size_t offset = rb_->tail * SINGLE_BLOCK_SIZE;
if (memcpy_s(rb_->buffer + offset, len, data.buffer + writedLen, len) != 0) {
PKG_LOGE("memcpy_s fail");
UPDATER_LAST_WORD(PKG_INVALID_STREAM, "memcpy_s fail");
return PKG_INVALID_STREAM;
}
rb_->efficientLen[rb_->tail] = len;
rb_->tail = (rb_->tail + 1) % BLOCK_NUM;
needWriteLen -= len;
writedLen += len;
sem_post(&rb_->semFull);
}
offset_ += size;
return 0;
}
void ShmDataStream::Stop()
{
if (rb_ == nullptr) {
PKG_LOGE("rb_ is nullptr");
return;
}
if (munmap(rb_, sizeof(RingBuffer)) != 0) {
PKG_LOGE("munmap failed : %s", strerror(errno));
return;
}
rb_ = nullptr;
}
bool ShmDataStream::Exit()
{
if (rb_ == nullptr) {
PKG_LOGE("rb_ is nullptr");
return false;
}
sem_destroy(&rb_->semEmpty);
sem_destroy(&rb_->semFull);
if (munmap(rb_, sizeof(RingBuffer)) != 0) {
PKG_LOGE("munmap failed : %s", strerror(errno));
return false;
}
rb_ = nullptr;
if (shm_unlink(shmId_.c_str()) != 0) {
PKG_LOGE("shm_unlink failed : %s", strerror(errno));
return false;
}
return true;
}
#endif
MemoryMapStream::~MemoryMapStream()
{
if (memMap_ == nullptr) {
PKG_LOGE("Invalid memory map");
return;
}
if (streamType_ == PkgStreamType_MemoryMap) {
ReleaseMemory(memMap_, memSize_);
}
}
int32_t MemoryMapStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
{
if (memMap_ == nullptr) {
PKG_LOGE("Invalid memory map");
return PKG_INVALID_STREAM;
}
if (start > memSize_) {
PKG_LOGE("Invalid start");
return PKG_INVALID_STREAM;
}
if (data.length < needRead) {
PKG_LOGE("insufficient buffer capacity");
return PKG_INVALID_STREAM;
}
size_t copyLen = GetFileLength() - start;
readLen = ((copyLen > needRead) ? needRead : copyLen);
if (data.data.size() == 0) {
data.buffer = memMap_ + start;
} else {
if (memcpy_s(data.buffer, needRead, memMap_ + start, readLen) != EOK) {
PKG_LOGE("Memcpy failed size:%zu, start:%zu copyLen:%zu %zu", needRead, start, copyLen, readLen);
return PKG_NONE_MEMORY;
}
}
return PKG_SUCCESS;
}
int32_t MemoryMapStream::Write(const PkgBuffer &data, size_t size, size_t start)
{
if (memMap_ == nullptr) {
PKG_LOGE("Invalid memory map");
return PKG_INVALID_STREAM;
}
if (start > memSize_) {
PKG_LOGE("Invalid start");
return PKG_INVALID_STREAM;
}
currOffset_ = start;
size_t copyLen = (memSize_ - start > SECUREC_MEM_MAX_LEN) ? SECUREC_MEM_MAX_LEN : memSize_ - start;
if (copyLen < size) {
PKG_LOGE("Write fail copyLen %zu, %zu", copyLen, size);
return PKG_INVALID_STREAM;
}
int32_t ret = memcpy_s(memMap_ + currOffset_, copyLen, data.buffer, size);
if (ret != PKG_SUCCESS) {
PKG_LOGE("Write fail");
return PKG_INVALID_STREAM;
}
PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
return PKG_SUCCESS;
}
int32_t MemoryMapStream::Flush(size_t size)
{
if (size != memSize_) {
PKG_LOGE("Flush size %zu local size:%zu", size, memSize_);
}
if (streamType_ == PkgStreamType_MemoryMap) {
msync(static_cast<void *>(memMap_), memSize_, MS_ASYNC);
}
currOffset_ = size;
return PKG_SUCCESS;
}
int32_t MemoryMapStream::Seek(long int offset, int whence)
{
if (whence == SEEK_SET) {
if (offset < 0) {
PKG_LOGE("Invalid offset");
return PKG_INVALID_STREAM;
}
if (static_cast<size_t>(offset) > memSize_) {
PKG_LOGE("Invalid offset");
return PKG_INVALID_STREAM;
}
currOffset_ = static_cast<size_t>(offset);
} else if (whence == SEEK_CUR) {
if (static_cast<size_t>(offset) > (memSize_ - currOffset_)) {
PKG_LOGE("Invalid offset");
return PKG_INVALID_STREAM;
}
currOffset_ += static_cast<size_t>(offset);
} else {
if (offset > 0) {
PKG_LOGE("Invalid offset");
return PKG_INVALID_STREAM;
}
auto memSize = static_cast<long long>(memSize_);
if (memSize + offset < 0) {
PKG_LOGE("Invalid offset");
return PKG_INVALID_STREAM;
}
currOffset_ = static_cast<size_t>(memSize + offset);
}
return PKG_SUCCESS;
}
int32_t ProcessorStream::Write(const PkgBuffer &data, size_t size, size_t start)
{
if (processor_ == nullptr) {
PKG_LOGE("processor not exist");
return PKG_INVALID_STREAM;
}
int ret = processor_(data, size, start, false, context_);
PostDecodeProgress(POST_TYPE_DECODE_PKG, size, nullptr);
return ret;
}
int32_t ProcessorStream::Flush(size_t size)
{
UNUSED(size);
if (processor_ == nullptr) {
PKG_LOGE("processor not exist");
return PKG_INVALID_STREAM;
}
PkgBuffer data = {};
return processor_(data, 0, 0, true, context_);
}
int32_t FlowDataStream::Read(PkgBuffer &data, size_t start, size_t needRead, size_t &readLen)
{
if (readOffset_ != start) {
PKG_LOGE("Invalid start, readOffset_: %d, start: %d", readOffset_, start);
return PKG_INVALID_STREAM;
}
if (data.length < needRead) {
PKG_LOGE("Invalid need length");
return PKG_INVALID_STREAM;
}
if (data.buffer == nullptr) {
data.data.resize(needRead);
data.buffer = data.data.data();
}
readLen = 0;
uint8_t *buffer = nullptr;
while (needRead - readLen > 0) {
uint32_t readOnce = 0;
if (ReadFromRingBuf(buffer, needRead - readLen, readOnce) != PKG_SUCCESS) {
PKG_LOGE("Fail to read header");
return PKG_INVALID_STREAM;
}
if (buffer == nullptr || readOnce == 0) {
PKG_LOGE("Fail to read header, readOnce: %d", readOnce);
return PKG_INVALID_STREAM;
}
if (memcpy_s(data.buffer + readLen, readOnce, buffer, readOnce) != EOK) {
PKG_LOGE("Memcpy failed size:%zu, copyLen:%zu", needRead, readOnce);
return PKG_NONE_MEMORY;
}
readLen += readOnce;
}
readOffset_ += needRead;
return PKG_SUCCESS;
}
int32_t FlowDataStream::ReadFromRingBuf(uint8_t *&buff, const uint32_t needLen, uint32_t &readLen)
{
if (ringBuf_ == nullptr) {
PKG_LOGE("ringBuf_ is nullptr");
buff = nullptr;
return PKG_INVALID_STREAM;
}
if ((avail_ == 0) && !ringBuf_->Pop(buff_, MAX_FLOW_BUFFER_SIZE, avail_)) {
PKG_LOGE("read data fail");
buff = nullptr;
return PKG_INVALID_STREAM;
}
buff = buff_ + bufOffset_;
readLen = needLen <= avail_ ? needLen : avail_;
avail_ -= readLen;
bufOffset_ = avail_ == 0 ? 0 : bufOffset_ + readLen;
return PKG_SUCCESS;
}
int32_t FlowDataStream::Write(const PkgBuffer &data, size_t size, size_t start)
{
if (ringBuf_ == nullptr) {
PKG_LOGE("ringBuf_ is nullptr");
return PKG_INVALID_STREAM;
}
if (writeOffset_ != start) {
PKG_LOGE("Invalid start, writeOffset: %zu, start: %zu", writeOffset_, start);
return PKG_INVALID_STREAM;
}
if (ringBuf_->Push(data.buffer, size)) {
writeOffset_ += size;
return PKG_SUCCESS;
}
PKG_LOGE("Write ring buffer fail");
return PKG_INVALID_STREAM;
}
void FlowDataStream::Stop()
{
PKG_LOGI("FlowDataStream stop");
if (ringBuf_ != nullptr) {
ringBuf_->Stop();
}
}
}