#include "sql/recover_module/payload.h"
#include <algorithm>
#include <cstddef>
#include <limits>
#include <ostream>
#include <type_traits>
#include "base/check_op.h"
#include "sql/recover_module/btree.h"
#include "sql/recover_module/integers.h"
#include "sql/recover_module/pager.h"
#include "third_party/sqlite/sqlite3.h"
namespace sql {
namespace recover {
namespace {
constexpr int kPageIdSize = sizeof(int32_t);
constexpr int kMaxPageOverhead = 12;
constexpr int kMaxCellOverhead = 23;
static constexpr int kLeafPayloadFraction = 32;
static constexpr int kPayloadFractionDenominator = 255;
int MaxInlinePayloadSize(int page_size) {
DCHECK_GE(page_size, DatabasePageReader::kMinUsablePageSize);
DCHECK_LE(page_size, DatabasePageReader::kMaxPageSize);
const int max_inline_payload_size =
page_size - kMaxPageOverhead - kMaxCellOverhead;
DCHECK_GE(max_inline_payload_size, LeafPayloadReader::kMinInlineSize);
static_assert(
DatabasePageReader::kMinPageSize - kMaxPageOverhead - kMaxCellOverhead >
LeafPayloadReader::kMinInlineSize,
"The DCHECK above may fail");
return max_inline_payload_size;
}
int MinInlinePayloadSize(int page_size) {
DCHECK_GE(page_size, DatabasePageReader::kMinUsablePageSize);
DCHECK_LE(page_size, DatabasePageReader::kMaxPageSize);
const int min_inline_payload_size =
((page_size - kMaxPageOverhead) * kLeafPayloadFraction) /
kPayloadFractionDenominator -
kMaxCellOverhead;
static_assert((DatabasePageReader::kMaxPageSize - kMaxPageOverhead) *
kLeafPayloadFraction <=
std::numeric_limits<int>::max(),
"The |min_inline_payload_size| computation above may overflow");
DCHECK_GE(min_inline_payload_size, LeafPayloadReader::kMinInlineSize);
static_assert(((DatabasePageReader::kMinUsablePageSize - kMaxPageOverhead) *
kLeafPayloadFraction) /
kPayloadFractionDenominator -
kMaxCellOverhead >=
LeafPayloadReader::kMinInlineSize,
"The DCHECK above may fail");
DCHECK_LE(min_inline_payload_size, MaxInlinePayloadSize(page_size));
return min_inline_payload_size;
}
int MaxOverflowPayloadSize(int page_size) {
DCHECK_GE(page_size, DatabasePageReader::kMinUsablePageSize);
DCHECK_LE(page_size, DatabasePageReader::kMaxPageSize);
const int max_overflow_payload_size = page_size - 4;
DCHECK_GT(max_overflow_payload_size, 0);
static_assert(DatabasePageReader::kMinUsablePageSize > 4,
"The DCHECK above may fail");
DCHECK_LE(max_overflow_payload_size, DatabasePageReader::kMaxPageSize);
return max_overflow_payload_size;
}
}
LeafPayloadReader::LeafPayloadReader(DatabasePageReader* db_reader)
: db_reader_(db_reader), page_id_(DatabasePageReader::kInvalidPageId) {}
LeafPayloadReader::~LeafPayloadReader() = default;
bool LeafPayloadReader::Initialize(int64_t payload_size, int payload_offset) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
payload_size_ = payload_size;
inline_payload_offset_ = payload_offset;
page_id_ = db_reader_->page_id();
const int page_size = db_reader_->page_size();
const int max_inline_payload_size = MaxInlinePayloadSize(page_size);
if (payload_size <= max_inline_payload_size) {
inline_payload_size_ = static_cast<int>(payload_size);
overflow_page_count_ = 0;
} else {
const int min_inline_payload_size = MinInlinePayloadSize(page_size);
DCHECK_GT(payload_size, min_inline_payload_size);
static_assert(
DatabasePageReader::kMaxPageSize * 2 <= std::numeric_limits<int>::max(),
"The additions below may overflow");
max_overflow_payload_size_ = MaxOverflowPayloadSize(page_size);
const int64_t efficient_overflow_page_count =
(payload_size - min_inline_payload_size) / max_overflow_payload_size_;
const int efficient_overflow_spill =
(payload_size - min_inline_payload_size) % max_overflow_payload_size_;
const int efficient_inline_payload_size =
min_inline_payload_size + efficient_overflow_spill;
if (efficient_inline_payload_size <= max_inline_payload_size) {
inline_payload_size_ = efficient_inline_payload_size;
overflow_page_count_ = efficient_overflow_page_count;
DCHECK_EQ(
0, (payload_size - inline_payload_size_) % max_overflow_payload_size_)
<< "Overflow pages not fully packed";
} else {
inline_payload_size_ = min_inline_payload_size;
overflow_page_count_ = efficient_overflow_page_count + 1;
}
DCHECK_LE(inline_payload_size_, max_inline_payload_size);
DCHECK_EQ(overflow_page_count_, (payload_size - inline_payload_size_ +
(max_overflow_payload_size_ - 1)) /
max_overflow_payload_size_)
<< "Incorect overflow page count calculation";
}
DCHECK_LE(inline_payload_size_, payload_size);
DCHECK_LE(inline_payload_size_, page_size);
const int first_overflow_page_id_size =
(overflow_page_count_ == 0) ? 0 : kPageIdSize;
if (inline_payload_offset_ + inline_payload_size_ +
first_overflow_page_id_size >
page_size) {
page_id_ = DatabasePageReader::kInvalidPageId;
return false;
}
overflow_page_ids_.clear();
overflow_page_ids_.reserve(overflow_page_count_);
return true;
}
bool LeafPayloadReader::IsInitialized() const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
return page_id_ != DatabasePageReader::kInvalidPageId;
}
bool LeafPayloadReader::ReadPayload(int64_t offset,
int64_t size,
uint8_t* buffer) {
DCHECK(IsInitialized())
<< "Initialize() not called, or last call did not succeed";
DCHECK_GE(offset, 0);
DCHECK_LT(offset, payload_size_);
DCHECK_GT(size, 0);
DCHECK_LE(offset + size, payload_size_);
DCHECK(buffer != nullptr);
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(page_id_ != DatabasePageReader::kInvalidPageId)
<< "Initialize() not called, or last call did not succeed";
if (offset < inline_payload_size_) {
if (db_reader_->ReadPage(page_id_) != SQLITE_OK)
return false;
const int page_size = db_reader_->page_size();
const int read_offset = inline_payload_offset_ + static_cast<int>(offset);
DCHECK_LE(read_offset, page_size);
const int read_size =
(static_cast<int>(offset) + size <= inline_payload_size_)
? static_cast<int>(size)
: inline_payload_size_ - static_cast<int>(offset);
DCHECK_LE(read_offset + read_size, page_size);
std::copy(db_reader_->page_data() + read_offset,
db_reader_->page_data() + read_offset + read_size, buffer);
if (read_size == size) {
return true;
}
offset += read_size;
DCHECK_EQ(offset, inline_payload_size_);
DCHECK_GT(size, read_size);
size -= read_size;
buffer += read_size;
}
DCHECK_GE(offset, inline_payload_size_);
if (max_overflow_payload_size_ <= 0) {
return false;
}
while (size > 0) {
const int overflow_page_index =
(offset - inline_payload_size_) / max_overflow_payload_size_;
DCHECK_LT(overflow_page_index, overflow_page_count_);
const int overflow_page_offset =
(offset - inline_payload_size_) % max_overflow_payload_size_;
while (overflow_page_ids_.size() <=
static_cast<size_t>(overflow_page_index)) {
if (!PopulateNextOverflowPageId())
return false;
}
const int page_id = overflow_page_ids_[overflow_page_index];
if (db_reader_->ReadPage(page_id) != SQLITE_OK)
return false;
const int page_size = db_reader_->page_size();
const int read_offset = kPageIdSize + overflow_page_offset;
DCHECK_LE(read_offset, page_size);
const int read_size = std::min<int64_t>(page_size - read_offset, size);
DCHECK_LE(read_offset + read_size, page_size);
std::copy(db_reader_->page_data() + read_offset,
db_reader_->page_data() + read_offset + read_size, buffer);
offset += read_size;
DCHECK_GE(size, read_size);
size -= read_size;
buffer += read_size;
}
return true;
}
const uint8_t* LeafPayloadReader::ReadInlinePayload() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK(IsInitialized())
<< "Initialize() not called, or last call did not succeed";
if (db_reader_->ReadPage(page_id_) != SQLITE_OK)
return nullptr;
return db_reader_->page_data() + inline_payload_offset_;
}
bool LeafPayloadReader::PopulateNextOverflowPageId() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
DCHECK_LT(overflow_page_ids_.size(),
static_cast<size_t>(overflow_page_count_));
int page_id_offset;
if (overflow_page_ids_.empty()) {
page_id_offset = inline_payload_offset_ + inline_payload_size_;
if (db_reader_->ReadPage(page_id_) != SQLITE_OK)
return false;
} else {
page_id_offset = 0;
if (db_reader_->ReadPage(overflow_page_ids_.back()) != SQLITE_OK)
return false;
}
DCHECK_LE(page_id_offset + kPageIdSize, db_reader_->page_size());
const int next_page_id =
LoadBigEndianInt32(db_reader_->page_data() + page_id_offset);
if (!DatabasePageReader::IsValidPageId(next_page_id)) {
return false;
}
overflow_page_ids_.push_back(next_page_id);
return true;
}
}
}