15670430创建于 2020年12月28日历史提交
/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * ---------------------------------------------------------------------------------------
 *
 * init.h
 *        Head file for streaming engine init.
 *
 *
 * IDENTIFICATION
 *        src/include/streaming/init.h
 *
 * ---------------------------------------------------------------------------------------
 */

#ifndef SRC_INCLUDE_STREAMING_INIT_H_
#define SRC_INCLUDE_STREAMING_INIT_H_

#include "gs_thread.h"
#include "streaming/launcher.h"
#include "access/tupdesc.h"

typedef int StreamingThreadSeqNum;

typedef struct StreamingBatchStats {
    union {
        struct { // coordinator stats
            volatile uint64 worker_in_rows; /* worker input rows */
            volatile uint64 worker_in_bytes; /* worker input bytes */
            volatile uint64 worker_out_rows; /* worker output rows */
            volatile uint64 worker_out_bytes; /* worker output bytes */
            volatile uint64 worker_pending_times; /* worker pending times */
            volatile uint64 worker_error_times; /* worker error times */
        };
        struct { // datanode stats
            volatile uint64 router_in_rows; /* router input rows */
            volatile uint64 router_in_bytes; /* router input bytes */
            volatile uint64 router_out_rows; /* router output rows */
            volatile uint64 router_out_bytes; /* router output bytes */
            volatile uint64 router_error_times; /* router error times */
            volatile uint64 collector_in_rows; /* collector input rows */
            volatile uint64 collector_in_bytes; /* collector input bytes */
            volatile uint64 collector_out_rows; /* collector output rows */
            volatile uint64 collector_out_bytes; /* collector output bytes */
            volatile uint64 collector_pending_times; /* collector pending times */
            volatile uint64 collector_error_times; /* collector error times */
        };
    };
} StreamingBatchStats;

typedef struct StreamingSharedMetaData {
    volatile uint32 client_push_conn_atomic; /* round robin client push connection atomic */
    void *conn_hash_tbl; /* connection hash table for streaming threads */
    StreamingBatchStats *batch_stats; /* streaming engine microbatch statistics */
}StreamingSharedMetaData;

typedef struct StreamingThreadMetaData {
    ThreadId tid;
    knl_thread_role subrole;
    StreamingThreadSeqNum tseq;
}StreamingThreadMetaData;

typedef struct DictDesc
{
    char *nspname;
    char *relname;
    char *indname;
    Oid relid;
    Oid indrelid;
    TupleDesc desc;
    int nkeys;
    int key;
} DictDesc;

#define DICT_CACHE_SIZE 1024

typedef struct knl_t_streaming_context {
    volatile bool is_streaming_engine;
    volatile bool loaded;    /* streaming engine loaded flag */
    void *save_utility_hook;
    void *save_post_parse_analyze_hook;
    StreamingBackendServerLoopFunc streaming_backend_serverloop_hook;
    StreamingBackendShutdownFunc streaming_backend_shutdown_hook;
    void *streaming_planner_hook;
    volatile bool got_SIGHUP;
    volatile bool got_SIGTERM;
    int client_push_conn_id;
    StreamingThreadMetaData *thread_meta; /* streaming current thread meta */
    unsigned int streaming_context_flags;
    TransactionId cont_query_cache_xid;
    MemoryContext cont_query_cache_cxt;
    void *cont_query_cache;
    int current_cont_query_id;
    Oid streaming_exec_lock_oid;
    MemoryContext ContQueryTransactionContext;
    MemoryContext ContQueryBatchContext;

    HTAB *dict_htable[DICT_CACHE_SIZE];
    MemoryContext dict_context;
    bool dict_inited;
    DictDesc dictdesc[DICT_CACHE_SIZE];
} knl_t_streaming_context;

typedef struct knl_g_streaming_context {
    MemoryContext meta_cxt;    /* streaming engine meta context */
    MemoryContext conn_cxt;    /* streaming engine conn context */
    StreamingSharedMetaData *shared_meta; /* streaming shared meta */
    StreamingThreadMetaData *thread_metas; /* streaming thread metas */
    char *krb_server_keyfile; /* kerberos server keyfile */
    volatile bool got_SIGHUP; /* SIGHUP comm with nanomsg auth */
    bool enable;   /* streaming engine enable flag */
    int router_port; /* the port router thread listens on */
    int routers;   /* number of router threads */
    int workers;   /* number of worker threads */
    int combiners;   /* number of combiner threads */
    int queues;   /* number of queue threads */
    int reapers;   /* number of reaper threads */
    int batch_size; /* max number of tuples for streaming microbatch */
    int batch_mem; /* max size (KB) for streaming microbatch */
    int batch_wait; /* receive timeout (ms) for streaming microbatch */
    int flush_mem; /* max size (KB) for streaming disk flush */
    int flush_wait; /* receive timeout (ms) for streaming disk flush */
    volatile bool exec_lock_flag; /* get exec lock flag */
    int gather_window_interval; /* interval (min) of gather window */
}knl_g_streaming_context;

bool is_streaming_engine_available();
void validate_streaming_engine_status(Node *stmt);

#endif /* SRC_INCLUDE_STREAMING_INIT_H_ */