* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* vsonicfilesource.h
* sonic file source class and class member declare
*
* IDENTIFICATION
* src/include/vectorsonic/vsonicfilesource.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef SRC_INCLUDE_VECTORSONIC_VSONICFILESOURCE_H_
#define SRC_INCLUDE_VECTORSONIC_VSONICFILESOURCE_H_
#include "postgres.h"
#include "knl/knl_variable.h"
#include "vectorsonic/vsonicarray.h"
#define CheckReadIsValid(nread, toread) \
if ((nread) != (toread)) { \
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from sonic hash-join temporary file: %m"))); \
}
typedef struct FileInfo {
uint32 partColIdx;
int typeSize;
uint8 varheadlen;
} FileInfo;
class SonicHashFileSource : public BaseObject {
public:
SonicHashFileSource(MemoryContext context);
SonicHashFileSource(MemoryContext context, DatumDesc* desc);
virtual ~SonicHashFileSource()
{}
MemoryContext m_context;
DatumDesc m_desc;
void* m_file;
uint32 m_cols;
size_t m_colWidth;
uint32 m_nextColIdx;
FileInfo* m_fileInfo;
* record the size of varible data which will not contained within the Atom's allocated space
* when loading into memory, such as SONIC_NUMERIC_COMPRESS_TYPE and SONIC_VAR_TYPE,
* whose data may stored in buffers (eg. VarBuf) other than Atom.
* The Atom only holds the pointers of the data.
*/
size_t* m_varSize;
size_t (SonicHashFileSource::*m_writeScalarValue)(ScalarValue* val, uint8 flag);
size_t (SonicHashFileSource::*m_readScalarValue)(ScalarValue* val, uint8* flag);
size_t (SonicHashFileSource::*m_readTempFile)(void* file, void* data, size_t size);
size_t (SonicHashFileSource::*m_writeTempFile)(void* file, void* data, size_t size);
void (SonicHashFileSource::*m_rewind)();
void (SonicHashFileSource::*m_close)();
void prepareFileHandlerBuffer();
void releaseFileHandlerBuffer();
size_t write(void* data, size_t size);
size_t read(void* data, size_t size);
public:
virtual void setBuffer()
{
return;
}
size_t getTypeSize(int colIdx)
{
return (size_t)m_fileInfo[colIdx].typeSize;
}
uint32 getMaxCols()
{
return m_cols;
}
size_t getColWidth()
{
return m_colWidth;
}
size_t writeVal(char* val, uint8 flag = V_NOTNULL_MASK);
size_t writeVal(ScalarValue* val, uint8 flag);
size_t readVal(char* val, uint8* flag = NULL);
size_t readVal(ScalarValue* val, uint8* flag);
void rewind();
void close();
private:
virtual size_t writeScalar(ScalarValue* val, uint8 flag)
{
return 0;
}
virtual size_t readScalar(ScalarValue* val, uint8* flag)
{
return 0;
}
virtual size_t writeScalarOpt(ScalarValue* val, uint8 flag)
{
return 0;
}
virtual size_t readScalarOpt(ScalarValue* val, uint8* flag)
{
return 0;
}
virtual void init(DatumDesc* desc, uint16 fileIdx)
{
Assert(false);
}
size_t writeCompress(void* file, void* data, size_t size);
size_t writeNoCompress(void* file, void* data, size_t size);
size_t readCompress(void* file, void* data, size_t size);
size_t readNoCompress(void* file, void* data, size_t size);
void rewindCompress();
void rewindNoCompress();
void closeCompress();
void closeNoCompress();
};
class SonicHashIntFileSource : public SonicHashFileSource {
private:
size_t (SonicHashIntFileSource::*m_writeIntScalarValue)(ScalarValue* val, uint8 flag);
size_t (SonicHashIntFileSource::*m_readIntScalarValue)(ScalarValue* val, uint8* flag);
typedef size_t (SonicHashIntFileSource::*pWriteIntScalarValue)(ScalarValue* val, uint8 flag);
typedef size_t (SonicHashIntFileSource::*pReadIntScalarValue)(ScalarValue* val, uint8* flag);
pWriteIntScalarValue* m_writeIntScalarValueFuncs;
pReadIntScalarValue* m_readIntScalarValueFuncs;
public:
SonicHashIntFileSource(MemoryContext context, DatumDesc* desc, bool hasFlag = true);
~SonicHashIntFileSource(){};
SonicHashIntFileSource(MemoryContext context, uint32 maxCols, bool hasFlag = true);
size_t writeScalar(ScalarValue* val, uint8 flag);
size_t writeScalarOpt(ScalarValue* val, uint8 flag);
size_t write1ByteScalar(ScalarValue* val, uint8 flag);
size_t write1ByteScalarOpt(ScalarValue* val, uint8 flag);
size_t write2ByteScalar(ScalarValue* val, uint8 flag);
size_t write2ByteScalarOpt(ScalarValue* val, uint8 flag);
size_t write4ByteScalar(ScalarValue* val, uint8 flag);
size_t write4ByteScalarOpt(ScalarValue* val, uint8 flag);
size_t write8ByteScalar(ScalarValue* val, uint8 flag);
size_t write8ByteScalarOpt(ScalarValue* val, uint8 flag);
size_t readScalar(ScalarValue* val, uint8* flag);
size_t readScalarOpt(ScalarValue* val, uint8* flag);
size_t read1ByteScalar(ScalarValue* val, uint8* flag);
size_t read1ByteScalarOpt(ScalarValue* val, uint8* flag);
size_t read2ByteScalar(ScalarValue* val, uint8* flag);
size_t read2ByteScalarOpt(ScalarValue* val, uint8* flag);
size_t read4ByteScalar(ScalarValue* val, uint8* flag);
size_t read4ByteScalarOpt(ScalarValue* val, uint8* flag);
size_t read8ByteScalar(ScalarValue* val, uint8* flag);
size_t read8ByteScalarOpt(ScalarValue* val, uint8* flag);
void init(DatumDesc* desc, uint16 fileIdx);
private:
size_t readValue(char* val, uint8* flag);
};
class SonicHashNumericFileSource : public SonicHashFileSource {
public:
SonicHashNumericFileSource(MemoryContext context, DatumDesc* desc);
SonicHashNumericFileSource(MemoryContext context, uint32 maxCols);
~SonicHashNumericFileSource(){};
void init(DatumDesc* desc, uint16 fileIdx);
void setBuffer();
size_t writeScalar(ScalarValue* val, uint8 flag);
size_t writeScalarOpt(ScalarValue* val, uint8 flag);
size_t readScalar(ScalarValue* val, uint8* flag);
size_t readScalarOpt(ScalarValue* val, uint8* flag);
private:
char* m_packBuf;
VarBuf* m_buf;
};
class SonicHashEncodingFileSource : public SonicHashFileSource {
public:
SonicHashEncodingFileSource(MemoryContext context, DatumDesc* desc);
SonicHashEncodingFileSource(MemoryContext context, uint32 maxCols);
~SonicHashEncodingFileSource(){};
void init(DatumDesc* desc, uint16 fileIdx);
void setBuffer();
size_t writeScalar(ScalarValue* val, uint8 flag);
size_t writeScalarOpt(ScalarValue* val, uint8 flag);
size_t readScalar(ScalarValue* val, uint8* flag);
size_t readScalarOpt(ScalarValue* val, uint8* flag);
private:
VarBuf* m_buf;
};
class SonicHashCharFileSource : public SonicHashFileSource {
private:
char* m_buf;
char* m_batchBuf;
uint8 m_varheadlen;
public:
SonicHashCharFileSource(MemoryContext context, DatumDesc* desc);
SonicHashCharFileSource(MemoryContext context, int typeSize, uint32 maxCols);
~SonicHashCharFileSource(){};
void init(DatumDesc* desc, uint16 fileIdx);
void setBuffer();
size_t writeScalar(ScalarValue* val, uint8 flag);
size_t writeScalarOpt(ScalarValue* val, uint8 flag);
size_t readScalar(ScalarValue* val, uint8* flag);
size_t readScalarOpt(ScalarValue* val, uint8* flag);
private:
void initBatchBuf();
};
class SonicHashFixLenFileSource : public SonicHashFileSource {
private:
uint8 m_varheadlen;
char* m_batchBuf;
char* m_buf;
char* m_tmpvalue;
public:
SonicHashFixLenFileSource(MemoryContext context, DatumDesc* desc);
~SonicHashFixLenFileSource(){};
SonicHashFixLenFileSource(MemoryContext context, uint32 maxCols);
void init(DatumDesc* desc, uint16 fileIdx);
void setBuffer();
size_t writeScalar(ScalarValue* val, uint8 flag);
size_t writeScalarOpt(ScalarValue* val, uint8 flag);
size_t readScalar(ScalarValue* val, uint8* flag);
size_t readScalarOpt(ScalarValue* val, uint8* flag);
private:
void initBatchBuf();
};
#endif