"""
ETL Utils - 使用 ETL 重构版本
提供向后兼容的接口,使用 ETL 的注册中心和任务类
"""
from web_apps.datamodel.services.datamodel_service import gen_datasource_model_info, gen_extract_info, gen_load_info
from etl.registry import get_reader, get_writer, get_registry
from etl.etl_task import EtlTask
from utils.common_utils import get_res_fields
# Public API of this compatibility module. Besides the names defined below
# (MyEtlTask, get_reader_model, get_writer_model), it re-exports names imported
# from etl.registry, etl.etl_task and utils.common_utils so that existing
# callers importing them from this module keep working.
__all__ = [
    'MyEtlTask',
    'get_reader_model',
    'get_writer_model',
    'get_res_fields',
    'get_reader',
    'get_writer',
    'EtlTask',
    'get_registry'
]
class MyEtlTask(EtlTask):
    """ETL task whose reader/writer are resolved through the ETL registry.

    Only ``gen_data_models`` is overridden; everything else comes from
    ``EtlTask``.
    """

    def __init__(self, task_params):
        super().__init__(task_params)

    def gen_data_models(self):
        """Build the reader and writer data models for this task.

        Sets ``self.extract_info``, ``self.extract_type``, ``self.reader`` and
        ``self.writer``. When the reader or writer cannot be built, the
        corresponding attribute is set to ``None`` and the error message is
        appended to ``self.error_list``.

        NOTE(review): ``self.params``, ``self.error_list`` and
        ``self.load_info`` are not assigned here; presumably they are set up
        by ``EtlTask.__init__`` — TODO confirm.
        """
        # ``or {}`` also covers an explicit ``'extract': None`` entry, which
        # previously crashed on the ``.get`` call below.
        self.extract_info = self.params.get('extract') or {}
        self.extract_type = self.extract_info.get('extract_type', 'once')

        ok, reader = get_reader_model(self.extract_info)
        if ok:
            self.reader = reader
        else:
            # On failure the second element is the error message.
            self.error_list.append(reader)
            self.reader = None

        if self.load_info is None:
            # No load configuration: this task has no writer.
            self.writer = None
            return

        ok, writer = get_writer_model(self.load_info)
        if ok:
            self.writer = writer
        else:
            # Record the failure like the reader branch does; it used to be
            # dropped silently, leaving only ``writer = None`` behind.
            self.error_list.append(writer)
            self.writer = None
def get_reader_model(extract_info):
    """Resolve an extract configuration into a reader via the ETL registry.

    Before lookup the configuration is normalised in up to two steps:

    1. If it contains ``'datasource_id'``, it is replaced entirely by the
       info returned from ``gen_datasource_model_info(datasource_id)``.
    2. If the (possibly replaced) configuration contains ``'model_id'``,
       it is expanded with ``gen_extract_info``.

    :param extract_info: extract configuration mapping.
    :return: ``(flag, reader_or_error)``. When a normalisation step fails,
        ``flag`` is False and the second item is whatever that helper returned.
        If ``get_reader`` raises, ``flag`` is False and the second item is
        ``str(exception)``. Otherwise the result of ``get_reader`` is returned.
    """
    info = extract_info

    if 'datasource_id' in info:
        resolved, info = gen_datasource_model_info(info['datasource_id'])
        if not resolved:
            return False, info

    if 'model_id' in info:
        resolved, info = gen_extract_info(info)
        if not resolved:
            return False, info

    try:
        resolved, reader = get_reader(info)
    except Exception as exc:
        # Registry failures are reported as a message, not raised.
        return False, str(exc)
    return resolved, reader
def get_writer_model(load_info):
    """Resolve a load configuration into a writer via the ETL registry.

    If the configuration contains ``'model_id'``, it is first expanded with
    ``gen_load_info``.

    :param load_info: load configuration mapping.
    :return: ``(flag, writer_or_error)``. When ``gen_load_info`` fails,
        ``flag`` is False and the second item is whatever it returned.
        If ``get_writer`` raises, ``flag`` is False and the second item is
        ``str(exception)``. Otherwise the result of ``get_writer`` is returned.
    """
    info = load_info

    if 'model_id' in info:
        resolved, info = gen_load_info(info)
        if not resolved:
            return False, info

    try:
        resolved, writer = get_writer(info)
    except Exception as exc:
        # Registry failures are reported as a message, not raised.
        return False, str(exc)
    return resolved, writer