/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* vechashtable.h
*
*
* IDENTIFICATION
* src/include/vecexecutor/vechashtable.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef VECHASHTABLE_H_
#define VECHASHTABLE_H_
#include "vecexecutor/vectorbatch.h"
#include "nodes/execnodes.h"
#include "access/hash.h"
#include "storage/buf/buffile.h"
#include "utils/memutils.h"
#include "utils/batchsort.h"
#include "utils/memprot.h"
/*
 * HASH_BASED_DEBUG(A) expands to A when USE_ASSERT_CHECKING is defined and to
 * nothing otherwise, so the wrapped statement only exists in assert-enabled builds.
 */
#ifdef USE_ASSERT_CHECKING
#define HASH_BASED_DEBUG(A) A
#else
#define HASH_BASED_DEBUG(A)
#endif
/* Type used for hash keys handed to the spill-file writers declared in this header. */
typedef uint32 HashKey;
/* Forward declaration; the class itself is defined further down in this header. */
class vechashtable;
/*
 * hashVal: one stored column value.
 * The hashing helpers in this header test `flag` with NOT_NULL() before using `val`.
 */
struct hashVal {
ScalarValue val;
uint8 flag;
};
/*
 * hashCell: a variable-length cell made of a small header union followed by a
 * flexible array of hashVal entries.
 * The header holds either a pointer to another cell (m_next) or an integer
 * (m_rows); which member is live is decided by the code using the cell, not here.
 * Because m_val is a flexible array member, consecutive cells are stepped through
 * with a byte stride (m_cellSize) rather than ordinary pointer arithmetic.
 */
struct hashCell {
union {
hashCell* m_next;
int m_rows;
} flag;
hashVal m_val[FLEXIBLE_ARRAY_MEMBER];
};
/*
 * HashSegTbl: a size plus an array of hashCell pointers.
 * NOTE(review): presumably tbl_size is the number of entries in tbl_data - confirm
 * against the code that allocates it.
 */
typedef struct HashSegTbl {
int tbl_size;
hashCell** tbl_data;
} HashSegTbl;
/*
 * Cell access helpers.
 *
 * GET_NTH_CELL yields the i-th cell of a packed cell array. It relies on a
 * variable named m_cellSize (the byte stride of one cell) being in scope at the
 * point of use. Arguments and the whole expansion are parenthesized so that
 * calls such as GET_NTH_CELL(head, a + b) or GET_NTH_CELL(head, i)->m_val
 * evaluate as written.
 */
#define GET_NTH_CELL(cellHead, i) ((hashCell*)((char*)(cellHead) + (i) * m_cellSize))
#define CELL_NTH_VAL(cell, i) (cell)->m_val[i].val
#define CELL_NTH_FLAG(cell, i) (cell)->m_val[i].flag

/* Tuning constants for hash table sizing. */
#define FILL_FACTOR 1.2

/* Hash function selectors. NOTE(review): meaning of M/D is not visible here - confirm. */
#define MHASH_FUN 0
#define DHASH_FUN 1

/* Identifiers for the two join inputs. */
#define INNER_SIDE 0
#define OUTER_SIDE 1

#define MIN_HASH_TABLE_SIZE 4096
/* Largest bucket count whose hashCell* array still fits in one MaxAllocSize allocation. */
#define MAX_BUCKET_NUM (MaxAllocSize / sizeof(hashCell*) - 1)
#define MAX_LOG_LEN 1024
#define HASH_PREFETCH_DISTANCE 10
/* 2 MB; parenthesized so the value survives use inside division or modulo. */
#define HASH_VARBUFSIZE (2 * 1024 * 1024)

/* Indexes into two-entry read/write function tables (see hashFileSource::m_write / m_read). */
#define SCALAR_FUN 0
#define VAR_FUN 1

/* Hash strategy states. */
#define HASH_IN_MEMORY 0
#define HASH_IN_DISK 1
#define HASH_RESPILL 2

/* Hash table expansion parameters. */
#define HASH_EXPAND_THRESHOLD 2
#define HASH_EXPAND_SIZE 2
/*
 * hashSource: polymorphic interface for an input source used by the hash operators.
 *
 * Every accessor has a default body that trips Assert(false) and, where a value
 * is returned, returns NULL. The derived classes in this header each override
 * only the accessors they support.
 */
class hashSource : public BaseObject {
public:
// Default: not supported by this source (asserts).
virtual VectorBatch* getBatch()
{
Assert(false);
return NULL;
}
// Default: not supported by this source (asserts).
virtual hashCell* getCell()
{
Assert(false);
return NULL;
}
// Default: not supported by this source (asserts).
virtual void close(int idx)
{
Assert(false);
}
// Default: not supported by this source (asserts).
virtual TupleTableSlot* getTup()
{
Assert(false);
return NULL;
}
// Virtual so that derived sources can be destroyed through a hashSource pointer.
virtual ~hashSource()
{}
};
/*
 * hashOpSource: a hashSource built from a PlanState.
 * Overrides getBatch(), getTup() and close(); close() does nothing.
 * getFileSize() is an additional, non-virtual member of this class.
 */
class hashOpSource : public hashSource {
public:
hashOpSource(PlanState* op);
~hashOpSource(){};
VectorBatch* getBatch();
int64 getFileSize();
TupleTableSlot* getTup();
// Nothing to release for this source; idx is ignored.
void close(int idx)
{
return;
}
private:
// The plan state supplied to the constructor.
PlanState* m_op;
};
/* Function pointer type: takes a pointer to a ScalarValue and returns a ScalarValue. */
typedef ScalarValue (*stripValFun)(ScalarValue* val);
/*
 * hashMemSource: a hashSource built from a List; only getCell() is overridden.
 * NOTE(review): presumably m_cell is the iteration cursor over m_list - confirm in the
 * implementation file.
 */
class hashMemSource : public hashSource {
public:
hashMemSource(List* data);
~hashMemSource(){};
hashCell* getCell();
private:
List* m_list;
ListCell* m_cell;
};
/*
 * hashFileSource: a hashSource that works over a set of files, addressed by index.
 *
 * The public interface offers writers for tuples, cells and batches plus the
 * matching readers (getTup / getCell / getBatch). The private section declares
 * compress / no-compress variants of the same operations, and the public
 * member-function pointers have signatures matching those variants.
 * NOTE(review): presumably the constructors bind the pointers to one variant
 * each - confirm in the implementation file.
 */
class hashFileSource : public hashSource {
public:
// Batch/cell flavored constructor.
hashFileSource(VectorBatch* batch, MemoryContext context, int cellSize, hashCell* cellArray, bool complicateJoin,
int m_write_cols, int fileNum, TupleDesc tupleDescritor);
// Tuple-slot flavored constructor.
hashFileSource(TupleTableSlot* hashslot, int fileNum);
~hashFileSource(){};
TupleTableSlot* getTup();
void writeTup(MinimalTupleData* Tup, int idx);
void rewind(int idx);
void close(int idx);
void closeAll();
void freeFileSource();
void setCurrentIdx(int idx);
int getCurrentIdx();
int64 getCurrentIdxRownum(int64 rows_in_mem);
bool next();
void enlargeFileSource(int fileNum);
void writeCell(hashCell* cell, HashKey key);
void writeBatch(VectorBatch* batch, int idx, HashKey key);
void writeBatchToFile(VectorBatch* batch, int idx, int fileIdx);
void writeBatchWithHashval(VectorBatch* batch, int idx, HashKey key);
void writeBatchWithHashval(VectorBatch* batch, int idx, HashKey key, int fileIdx);
void resetFileSource();
void initFileSource(int fileNum);
VectorBatch* getBatch();
int64 getFileSize();
hashCell* getCell();
void resetVariableMemberIfNecessary(int fileNum);
void ReleaseFileHandlerBuffer(int fileIdx);
void ReleaseAllFileHandlerBuffer();
void PrepareFileHandlerBuffer(int fileIdx);
public:
// Public state. NOTE(review): the pointer members (m_rownum, m_spill_size,
// m_fileSize, m_file) look like per-file arrays - confirm in the implementation file.
int m_cellSize;
int64* m_rownum;
int64 m_total_filesize;
int64* m_spill_size;
int64* m_fileSize;
int m_cols;
int m_write_cols;
int* m_funType;
VectorBatch* m_batch;
hashCell* m_cellArray;
MemoryContext m_context;
void** m_file;
TupleTableSlot* m_hashTupleSlot;
MinimalTuple m_tuple;
uint32 m_tupleSize;
int m_currentFileIdx;
// Read by hashBasedOperator::getFileNum().
int m_fileNum;
Datum* m_values;
bool* m_isnull;
uint32 m_varSpaceLen;
stripValFun* m_stripFunArray;
// Two-entry tables of value writers/readers; the SCALAR_FUN / VAR_FUN macros
// (0 and 1) match the table size - presumably they are the indexes, confirm.
size_t (hashFileSource::*m_write[2])(ScalarValue val, uint8 flag, int idx);
size_t (hashFileSource::*m_read[2])(ScalarValue* val, uint8* flag);
// Member-function pointers with signatures matching the private variants below.
size_t (hashFileSource::*m_writeTuple)(MinimalTupleData* Tup, int idx);
TupleTableSlot* (hashFileSource::*m_getTuple)();
hashCell* (hashFileSource::*m_getCell)();
void (hashFileSource::*m_rewind)(int idx);
void (hashFileSource::*m_close)(int idx);
size_t (hashFileSource::*m_writeCell)(hashCell* cell, int idx);
size_t (hashFileSource::*m_writeBatch)(VectorBatch* batch, int idx, int fileIdx);
VectorBatch* (hashFileSource::*m_getBatch)();
size_t (hashFileSource::*m_writeBatchWithHashval)(VectorBatch* batch, int idx, HashKey key, int fileIdx);
private:
// Per-value writers and readers, templated on whether spill compression is used.
template <bool compress_spill>
size_t writeScalar(ScalarValue val, uint8 flag, int fileIdx);
template <bool compress_spill>
size_t writeVar(ScalarValue val, uint8 flag, int fileIdx);
template <bool compress_spill>
size_t readScalar(ScalarValue* val, uint8* flag);
template <bool compress_spill>
size_t readVar(ScalarValue* val, uint8* flag);
// Batch-level variants.
size_t writeBatchNoCompress(VectorBatch* batch, int idx, int fileIdx);
size_t writeBatchCompress(VectorBatch* batch, int idx, int fileIdx);
void assembleBatch(TupleTableSlot* slot, int idx);
template <bool get_hashval>
VectorBatch* getBatchCompress();
template <bool get_hashval>
VectorBatch* getBatchNoCompress();
// File positioning and closing variants.
void rewindCompress(int idx);
void rewindNoCompress(int idx);
void closeCompress(int idx);
void closeNoCompress(int idx);
// Cell-level variants, templated on whether a hash value accompanies the cell.
template <bool write_hashval>
size_t writeCellNoCompress(hashCell* cell, int fileIdx);
template <bool write_hashval>
size_t writeCellCompress(hashCell* cell, int fileIdx);
template <bool write_hashval>
hashCell* getCellNoCompress();
template <bool write_hashval>
hashCell* getCellCompress();
// Tuple-level variants.
template <bool compress_spill>
size_t writeTupCompress(MinimalTupleData* Tup, int idx);
template <bool compress_spill>
TupleTableSlot* getTupCompress();
template <bool compress_spill>
size_t writeBatchWithHashvalCompress(VectorBatch* batch, int idx, HashKey key, int fileIdx);
};
/*
 * hashSortSource: a hashSource built from a Batchsortstate and a VectorBatch;
 * only getBatch() is overridden.
 */
class hashSortSource : public hashSource {
public:
hashSortSource(Batchsortstate* batchSortState, VectorBatch* sortBatch);
~hashSortSource(){};
VectorBatch* getBatch();
// Public batch member. NOTE(review): presumably the batch getBatch() fills - confirm.
VectorBatch* m_SortBatch;
private:
Batchsortstate* m_batchSortStateIn;
};
/*
 * getPower2LessNum
 *     Return the largest power of two that does not exceed num.
 *
 * The result is 1 << k, where k is the number of times num can be halved
 * (integer division) before reaching zero, minus the final step. Values of
 * num that are zero or one therefore yield 1.
 */
inline int getPower2LessNum(int num)
{
    int shift = 0;

    /* Count the halvings that still leave a non-zero remainder. */
    for (int rest = num / 2; rest != 0; rest /= 2) {
        ++shift;
    }

    return 1 << shift;
}
/*
 * getPower2NextNum
 *     Round num up to a power of two.
 *
 * For num >= 2 the result is the smallest power of two that is >= num.
 * Inputs larger than INT_MAX / 2 are clamped to INT_MAX / 2 first, so the
 * largest possible result is 2^30. The result is never smaller than 2
 * (num == 0 and num == 1 both give 2); negative inputs get no special handling.
 */
inline int getPower2NextNum(int64 num)
{
    int64 rest = (num > INT_MAX / 2) ? (int64)(INT_MAX / 2) : num;
    rest -= 1;

    /* Start at 1: the result is one shift beyond the highest set bit of (num - 1). */
    int shift = 1;
    while ((rest /= 2) != 0) {
        ++shift;
    }

    return 1 << shift;
}
/* hash based operator common structure. */
class hashBasedOperator : public BaseObject {
public:
hashBasedOperator() : m_spillToDisk(false), m_rows(0), m_totalMem(0), m_availmems(0)
{
m_filesource = NULL;
m_innerHashFuncs = NULL;
m_key = 0;
m_cols = 0;
m_sysBusy = false;
m_outerHashFuncs = NULL;
m_overflowsource = NULL;
m_hashContext = NULL;
m_keyDesc = NULL;
m_tupleCount = 0;
m_hashTbl = NULL;
m_colWidth = 0;
m_cellSize = 0;
m_tmpContext = NULL;
m_colDesc = NULL;
m_spreadNum = 0;
m_okeyIdx = NULL;
m_fill_table_rows = 0;
m_keyIdx = NULL;
m_keySimple = false;
m_eqfunctions = NULL;
m_strategy = 0;
m_maxMem = 0;
}
virtual ~hashBasedOperator()
{}
virtual void Build() = 0;
virtual VectorBatch* Probe() = 0;
hashFileSource* CreateTempFile(VectorBatch* batch, int fileNum, PlanState* planstate);
void closeFile();
FORCE_INLINE
int64 getRows()
{
return m_rows;
}
int getFileNum()
{
if (m_filesource != NULL)
return m_filesource->m_fileNum;
else
return 0;
}
int calcFileNum(long numGroups);
void ReplaceEqfunc();
void JudgeMemoryOverflow(char* opname, int planid, int dop, Instrumentation* instrument = NULL,
bool isRack = false);
bool JudgeMemoryAllowExpand();
void freeMemoryContext();
template <bool reHash>
void hashCellT(hashCell* cell, int keyIdx, FmgrInfo* hashFmgr, int nval, ScalarValue* hashRes);
template <bool reHash>
void hashColT(ScalarVector* val, FmgrInfo* hashFmgr, int nval, ScalarValue* hashRes);
inline void hashBatch(
VectorBatch* batch, int* keyIdx, ScalarValue* hashRes, FmgrInfo* hashFmgr, bool needSpill = false);
inline void hashCellArray(
hashCell* cell, int nrows, int* keyIdx, ScalarValue* hashRes, FmgrInfo* hashFmgr, bool needSpill = false);
public:
vechashtable* m_hashTbl;
MemoryContext m_hashContext;
MemoryContext m_tmpContext;
* need not free and apply memory again, only do reset when rescan.
*/
ScalarValue m_cacheLoc[BatchMaxSize];
hashCell* m_cellCache[BatchMaxSize];
bool m_keyMatch[BatchMaxSize];
FmgrInfo* m_eqfunctions;
FmgrInfo* m_outerHashFuncs;
FmgrInfo* m_innerHashFuncs;
hashFileSource* m_filesource;
hashFileSource* m_overflowsource;
bool m_spillToDisk;
int m_strategy;
int64 m_rows;
int64 m_totalMem;
int64 m_availmems;
int m_cols;
int m_key;
int* m_keyIdx;
int* m_okeyIdx;
ScalarDesc* m_keyDesc;
ScalarDesc* m_colDesc;
bool m_keySimple;
int m_cellSize;
int m_fill_table_rows;
int64 m_tupleCount;
int64 m_colWidth;
int64 m_maxMem;
int m_spreadNum;
bool m_sysBusy;
};
/*
 * vechashtable
 *
 * A flat bucket array: m_size slots, each a hashCell pointer, allocated
 * zero-filled with palloc0().
 */
class vechashtable : public BaseObject {
public:
    /* Allocate a zeroed array of hashSize bucket pointers. */
    vechashtable(int hashSize)
        : m_size(hashSize), m_data((hashCell**)palloc0(hashSize * sizeof(hashCell*)))
    {}

    /* Release the bucket array (if any) and mark the table empty. */
    ~vechashtable()
    {
        if (m_data != NULL) {
            pfree(m_data);
            m_data = NULL;
        }
        m_size = 0;
    }

    /*
     * Allocate a fresh zeroed bucket array of the same size.
     * The previous array is not freed here.
     * NOTE(review): presumably the caller has already reset the owning memory
     * context - confirm at the call sites.
     */
    void Reset()
    {
        Assert(m_size != 0);
        m_data = (hashCell**)palloc0(m_size * sizeof(hashCell*));
    }

    void Profile(char* stats, bool* can_wlm_warning_statistics);

public:
    /* Number of bucket slots. */
    int m_size;

    /* Bucket array of m_size hashCell pointers. */
    hashCell** m_data;
};
/*
 * Fetch the hashTbl member of a node as a vechashtable pointer.
 * The argument is parenthesized so that expressions such as GET_HASH_TABLE(p + 1)
 * dereference the intended pointer.
 */
#define GET_HASH_TABLE(node) ((vechashtable*)((node)->hashTbl))
/*
 * hashCellT
 *     Hash column keyIdx of nval consecutive cells starting at cell.
 *
 * reHash == false: hashRes[j] receives the hash of the value, or 0 for a NULL.
 * reHash == true : hashRes[j] is rotated left by one bit (bit 31 wraps to bit 0)
 *                  and XORed with the hash of the value; a NULL leaves
 *                  hashRes[j] untouched.
 *
 * Cells are advanced by m_cellSize bytes, since hashCell is variable-length.
 */
template <bool reHash>
inline void hashBasedOperator::hashCellT(hashCell* cell, int keyIdx, FmgrInfo* hashFmgr, int nval, ScalarValue* hashRes)
{
    Datum argSlots[2];
    FunctionCallInfoData callInfo;
    callInfo.arg = &argSlots[0];
    callInfo.flinfo = hashFmgr;
    PGFunction hashFn = hashFmgr->fn_addr;

    hashCell* cur = cell;
    for (int row = 0; row < nval; row++, cur = (hashCell*)((char*)cur + m_cellSize)) {
        hashVal key = cur->m_val[keyIdx];

        if (likely(NOT_NULL(key.flag))) {
            callInfo.arg[0] = key.val;

            if (!reHash) {
                hashRes[row] = hashFn(&callInfo);
                continue;
            }

            /* Combine with the hash accumulated from the previous key columns. */
            ScalarValue mixed = hashRes[row];
            mixed = (mixed << 1) | ((mixed & 0x80000000) ? 1 : 0);
            mixed ^= hashFn(&callInfo);
            hashRes[row] = mixed;
        } else if (!reHash) {
            /* First key column is NULL: start from zero. */
            hashRes[row] = 0;
        }
    }
}
/*
 * hashColT
 *     Hash the first nval entries of one column vector.
 *
 * reHash == false: hashRes[j] receives the hash of the value, or 0 for a NULL.
 * reHash == true : hashRes[j] is rotated left by one bit (bit 31 wraps to bit 0)
 *                  and XORed with the hash of the value; a NULL leaves
 *                  hashRes[j] untouched.
 */
template <bool reHash>
inline void hashBasedOperator::hashColT(ScalarVector* val, FmgrInfo* hashFmgr, int nval, ScalarValue* hashRes)
{
    ScalarValue* colVals = val->m_vals;
    uint8* colFlags = val->m_flag;

    Datum argSlots[2];
    FunctionCallInfoData callInfo;
    callInfo.arg = &argSlots[0];
    callInfo.flinfo = hashFmgr;
    PGFunction hashFn = hashFmgr->fn_addr;

    for (int row = 0; row < nval; row++) {
        if (likely(NOT_NULL(colFlags[row]))) {
            callInfo.arg[0] = colVals[row];

            if (!reHash) {
                hashRes[row] = hashFn(&callInfo);
                continue;
            }

            /* Combine with the hash accumulated from the previous key columns. */
            ScalarValue mixed = hashRes[row];
            mixed = (mixed << 1) | ((mixed & 0x80000000) ? 1 : 0);
            mixed ^= hashFn(&callInfo);
            hashRes[row] = mixed;
        } else if (!reHash) {
            /* First key column is NULL: start from zero. */
            hashRes[row] = 0;
        }
    }
}
/*
 * hashBatch
 *     Compute one hash value per row of batch over the m_key key columns listed
 *     in keyIdx, using hashFmgr[k] for the k-th key, and store it in hashRes.
 *
 * The combined per-row value is finally scrambled with hash_new_uint32() when
 * needSpill is set and with hash_uint32() otherwise. All work runs inside
 * m_tmpContext, which is reset before returning.
 */
inline void hashBasedOperator::hashBatch(
    VectorBatch* batch, int* keyIdx, ScalarValue* hashRes, FmgrInfo* hashFmgr, bool needSpill)
{
    const int rowCount = batch->m_rows;
    ScalarVector* columns = batch->m_arr;
    AutoContextSwitch memGuard(m_tmpContext);

    /* First key seeds hashRes; each further key is folded into it. */
    hashColT<false>(&columns[keyIdx[0]], hashFmgr, rowCount, hashRes);
    for (int k = 1; k < m_key; k++) {
        hashColT<true>(&columns[keyIdx[k]], (hashFmgr + k), rowCount, hashRes);
    }

    if (needSpill) {
        for (int row = 0; row < rowCount; row++) {
            hashRes[row] = hash_new_uint32(DatumGetUInt32(hashRes[row]));
        }
    } else {
        for (int row = 0; row < rowCount; row++) {
            hashRes[row] = hash_uint32(DatumGetUInt32(hashRes[row]));
        }
    }

    MemoryContextReset(m_tmpContext);
}
/*
 * hashCellArray
 *     Compute one hash value per cell for nrows consecutive cells starting at
 *     cell, over the m_key key columns listed in keyIdx, using hashFmgr[k] for
 *     the k-th key, and store it in hashRes.
 *
 * The combined per-cell value is finally scrambled with hash_new_uint32() when
 * needSpill is set and with hash_uint32() otherwise. All work runs inside
 * m_tmpContext, which is reset before returning.
 */
inline void hashBasedOperator::hashCellArray(
    hashCell* cell, int nrows, int* keyIdx, ScalarValue* hashRes, FmgrInfo* hashFmgr, bool needSpill)
{
    AutoContextSwitch memGuard(m_tmpContext);

    /* First key seeds hashRes; each further key is folded into it. */
    hashCellT<false>(cell, keyIdx[0], hashFmgr, nrows, hashRes);
    for (int k = 1; k < m_key; k++) {
        hashCellT<true>(cell, keyIdx[k], (hashFmgr + k), nrows, hashRes);
    }

    if (needSpill) {
        for (int row = 0; row < nrows; row++) {
            hashRes[row] = hash_new_uint32(DatumGetUInt32(hashRes[row]));
        }
    } else {
        for (int row = 0; row < nrows; row++) {
            hashRes[row] = hash_uint32(DatumGetUInt32(hashRes[row]));
        }
    }

    MemoryContextReset(m_tmpContext);
}
/*
 * Helpers implemented outside this header.
 * NOTE(review): the descriptions below are inferred from names and signatures
 * only - confirm against the definitions.
 */
/* Presumably stores a variable-length value in `context` and returns the resulting ScalarValue. */
extern ScalarValue addVariable(MemoryContext context, ScalarValue val);
/* Presumably replaces `oldVal` with `val` within `context` and returns the resulting ScalarValue. */
extern ScalarValue replaceVariable(MemoryContext context, ScalarValue oldVal, ScalarValue val);
/* Presumably appends `value` to the given VarBuf and returns the resulting ScalarValue. */
extern ScalarValue addToVarBuffer(VarBuf* buf, ScalarValue value);
/* Returns an opaque handle; hashFileSource::m_file stores void* entries. */
extern void* TempFileCreate();
/* Presumably converts a Datum of type `datumType` to a ScalarValue using `context`. */
extern ScalarValue DatumToScalarInContext(MemoryContext context, Datum datumVal, Oid datumType);
#endif