CREATE OR REPLACE FUNCTION sch_chameleon.fn_process_batch(integer,integer)
RETURNS BOOLEAN AS
$BODY$
DECLARE
p_i_max_events ALIAS FOR $1;
p_i_source_id ALIAS FOR $2;
v_b_loop boolean;
v_r_rows record;
v_i_id_batch bigint;
v_t_ddl text;
v_i_replayed integer;
v_i_skipped integer;
v_i_ddl integer;
v_i_evt_replay bigint[];
v_i_evt_queue bigint[];
BEGIN
v_b_loop:=FALSE;
v_i_replayed:=0;
v_i_ddl:=0;
v_i_skipped:=0;
v_i_id_batch:= (
SELECT
i_id_batch
FROM ONLY
sch_chameleon.t_replica_batch
WHERE
b_started
AND b_processed
AND NOT b_replayed
AND i_id_source=p_i_source_id
ORDER BY
ts_created
LIMIT 1
)
;
v_i_evt_replay:=(
SELECT
i_id_event[1:p_i_max_events]
FROM
sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
);
v_i_evt_queue:=(
SELECT
i_id_event[p_i_max_events+1:array_length(i_id_event,1)]
FROM
sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
);
IF v_i_id_batch IS NULL
THEN
RETURN v_b_loop;
END IF;
RAISE DEBUG 'Found id_batch %', v_i_id_batch;
FOR v_r_rows IN
SELECT
CASE
WHEN enm_binlog_event = 'ddl'
THEN
t_query
WHEN enm_binlog_event = 'insert'
THEN
format(
'INSERT INTO %I.%I (%s) VALUES (%s);',
v_schema_name,
v_table_name,
array_to_string(t_colunm,','),
array_to_string(t_event_data,',')
)
WHEN enm_binlog_event = 'update'
THEN
format(
'UPDATE %I.%I SET %s WHERE %s;',
v_schema_name,
v_table_name,
t_update,
t_pk_update
)
WHEN enm_binlog_event = 'delete'
THEN
format(
'DELETE FROM %I.%I WHERE %s;',
v_schema_name,
v_table_name,
t_pk_data
)
END AS t_sql,
i_id_event,
i_id_batch,
enm_binlog_event
FROM
(
<<<<<<< HEAD
<<<<<<< 98364345ea29577df3de6311fc350e8f57876fe9
SELECT
=======
SELECT
>>>>>>> new function seems to work properly
=======
SELECT
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
<<<<<<< HEAD
<<<<<<< 98364345ea29577df3de6311fc350e8f57876fe9
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
t_query,
ts_event_datetime,
t_pk_data,
t_pk_update,
array_agg(quote_ident(t_column)) AS t_colunm,
string_agg(distinct format('%I=%L',t_column,jsb_event_data->>t_column),',') as t_update,
array_agg(quote_nullable(jsb_event_data->>t_column)) as t_event_data
FROM
(
SELECT
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
jsb_event_data,
jsb_event_update,
t_query,
ts_event_datetime,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_data->>v_pkey),' AND ') as t_pk_data,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_update->>v_pkey),' AND ') as t_pk_update,
(jsonb_each_text(coalesce(jsb_event_data,'{"foo":"bar"}'::jsonb))).key AS t_column
FROM
(
SELECT
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
jsb_event_data,
jsb_event_update,
t_query,
ts_event_datetime,
replace(unnest(string_to_array(v_table_pkey[1],',')),'"','') as v_pkey
FROM
(
SELECT
log.i_id_event,
log.i_id_batch,
log.v_table_name,
log.v_schema_name,
log.enm_binlog_event,
log.jsb_event_data,
log.jsb_event_update,
log.t_query,
ts_event_datetime,
v_table_pkey
FROM
sch_chameleon.t_log_replica log
INNER JOIN sch_chameleon.t_replica_tables tab
ON
tab.v_table_name=log.v_table_name
AND tab.v_schema_name=log.v_schema_name
WHERE
log.i_id_batch=v_i_id_batch
AND log.i_id_event=ANY(v_i_evt_replay)
) t_log
) t_pkey
GROUP BY
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
jsb_event_data,
jsb_event_update,
t_query,
ts_event_datetime
) t_columns
<<<<<<< HEAD
=======
array_agg(quote_ident(t_column)) AS t_colunm,
array_agg(quote_literal(jsb_event_data->>t_column)) as t_event_data,
array_agg(jsb_event_update->>t_column) as t_event_update,
string_agg(distinct format('%I=%L',t_column,jsb_event_update->>t_column),',') as t_update,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_data->>v_pkey),' AND ') as t_pk_data,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_update->>v_pkey),' AND ') as t_pk_update,
t_query
FROM
(
SELECT
log.i_id_event,
log.i_id_batch,
log.v_table_name,
log.v_schema_name,
log.enm_binlog_event,
log.jsb_event_data,
log.jsb_event_update,
log.t_query,
replace(unnest(string_to_array(v_table_pkey[1],',')),'"','') as v_pkey,
ts_event_datetime,
(jsonb_each_text(coalesce(log.jsb_event_data,'{"foo":"bar"}'::jsonb))).key AS t_column
FROM
sch_chameleon.t_log_replica log
INNER JOIN sch_chameleon.t_replica_tables tab
ON
tab.v_table_name=log.v_table_name
AND tab.v_schema_name=log.v_schema_name
WHERE
log.i_id_batch=v_i_id_batch
) t_dat
>>>>>>> new function seems to work properly
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
GROUP BY
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
t_query,
<<<<<<< HEAD
<<<<<<< 98364345ea29577df3de6311fc350e8f57876fe9
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
ts_event_datetime,
t_pk_data,
t_pk_update
) t_sql
<<<<<<< HEAD
=======
ts_event_datetime
ORDER BY ts_event_datetime
) t_query
>>>>>>> new function seems to work properly
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
LOOP
EXECUTE v_r_rows.t_sql;
IF v_r_rows.enm_binlog_event='ddl'
THEN
v_i_ddl:=v_i_ddl+1;
ELSE
v_i_replayed:=v_i_replayed+1;
END IF;
<<<<<<< HEAD
<<<<<<< 98364345ea29577df3de6311fc350e8f57876fe9
=======
DELETE FROM sch_chameleon.t_log_replica
WHERE
i_id_event=v_r_rows.i_id_event
;
>>>>>>> new function seems to work properly
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
END LOOP;
IF v_i_replayed=0 AND v_i_ddl=0
THEN
<<<<<<< HEAD
<<<<<<< 4d3438102c559b55a0e3c42a1d5fb123edec5e61
<<<<<<< 98364345ea29577df3de6311fc350e8f57876fe9
=======
>>>>>>> improve performance for the replay plpgsql function
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
DELETE FROM sch_chameleon.t_log_replica
WHERE
i_id_batch=v_i_id_batch
;
GET DIAGNOSTICS v_i_skipped = ROW_COUNT;
<<<<<<< HEAD
<<<<<<< 4d3438102c559b55a0e3c42a1d5fb123edec5e61
=======
=======
>>>>>>> improve performance for the replay plpgsql function
UPDATE ONLY sch_chameleon.t_replica_batch
SET
b_replayed=True,
i_skipped=v_i_skipped,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_i_id_batch
;
v_b_loop=False;
ELSE
UPDATE ONLY sch_chameleon.t_replica_batch
SET
i_ddl=coalesce(i_ddl,0)+v_i_ddl,
i_replayed=coalesce(i_replayed,0)+v_i_replayed,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_r_rows.i_id_batch
;
v_b_loop=True;
END IF;
>>>>>>> new function seems to work properly
=======
>>>>>>> 16ec83d0b6f3e30ab282c65489405f1ae01609e9
UPDATE ONLY sch_chameleon.t_replica_batch
SET
b_replayed=True,
i_skipped=v_i_skipped,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_i_id_batch
;
DELETE FROM sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
;
v_b_loop=False;
ELSE
UPDATE ONLY sch_chameleon.t_replica_batch
SET
i_ddl=coalesce(i_ddl,0)+v_i_ddl,
i_replayed=coalesce(i_replayed,0)+v_i_replayed,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_r_rows.i_id_batch
;
UPDATE sch_chameleon.t_batch_events
SET
i_id_event = v_i_evt_queue
WHERE
i_id_batch=v_i_id_batch
;
DELETE FROM sch_chameleon.t_log_replica
WHERE
i_id_batch=v_i_id_batch
AND i_id_event=ANY(v_i_evt_replay)
;
v_b_loop=True;
END IF;
RETURN v_b_loop;
END;
$BODY$
LANGUAGE plpgsql;