CREATE OR REPLACE FUNCTION sch_chameleon.fn_replay_mysql(integer,integer,boolean)
RETURNS sch_chameleon.ty_replay_status AS
$BODY$
DECLARE
p_i_max_events ALIAS FOR $1;
p_i_id_source ALIAS FOR $2;
p_b_exit_on_error ALIAS FOR $3;
v_ty_status sch_chameleon.ty_replay_status;
v_r_statements record;
v_i_id_batch bigint;
v_t_ddl text;
v_i_replayed integer;
v_i_skipped integer;
v_i_ddl integer;
v_i_evt_replay bigint[];
v_i_evt_queue bigint[];
v_ts_evt_source timestamp without time zone;
v_tab_enabled boolean;
BEGIN
v_tab_enabled:=TRUE;
v_ty_status.b_continue:=FALSE;
v_ty_status.b_error:=FALSE;
v_i_replayed:=0;
v_i_ddl:=0;
v_i_skipped:=0;
v_i_id_batch:= (
SELECT
bat.i_id_batch
FROM
sch_chameleon.t_replica_batch bat
INNER JOIN sch_chameleon.t_batch_events evt
ON
evt.i_id_batch=bat.i_id_batch
WHERE
bat.b_started
AND bat.b_processed
AND NOT bat.b_replayed
AND bat.i_id_source=p_i_id_source
ORDER BY
bat.ts_created
LIMIT 1
)
;
v_i_evt_replay:=(
SELECT
i_id_event[1:p_i_max_events]
FROM
sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
);
v_i_evt_queue:=(
SELECT
i_id_event[p_i_max_events+1:array_length(i_id_event,1)]
FROM
sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
);
v_ts_evt_source:=(
SELECT
to_timestamp(i_my_event_time)
FROM
sch_chameleon.t_log_replica
WHERE
i_id_event=v_i_evt_replay[array_length(v_i_evt_replay,1)]
AND i_id_batch=v_i_id_batch
);
IF v_i_id_batch IS NULL
THEN
RETURN v_ty_status;
END IF;
RAISE DEBUG 'Found id_batch %', v_i_id_batch;
FOR v_r_statements IN
WITH
t_tables AS
(
SELECT i_id_source,
v_table_name,
v_schema_name,
unnest(v_table_pkey) as v_table_pkey
FROM
sch_chameleon.t_replica_tables
WHERE
b_replica_enabled
AND i_id_source=p_i_id_source
),
t_events AS
(
SELECT
i_id_event
FROM
unnest(v_i_evt_replay) AS i_id_event
)
SELECT
CASE
WHEN enm_binlog_event = 'ddl'
THEN
t_query
WHEN enm_binlog_event = 'insert'
THEN
format(
'INSERT INTO %I.%I (%s) VALUES (%s);',
v_schema_name,
v_table_name,
array_to_string(t_colunm,','),
array_to_string(t_event_data,',')
)
WHEN enm_binlog_event = 'update'
THEN
format(
'UPDATE %I.%I SET %s WHERE %s;',
v_schema_name,
v_table_name,
t_update,
t_pk_update
)
WHEN enm_binlog_event = 'delete'
THEN
format(
'DELETE FROM %I.%I WHERE %s;',
v_schema_name,
v_table_name,
t_pk_data
)
END AS t_sql,
i_id_event,
i_id_batch,
enm_binlog_event,
v_schema_name,
v_table_name,
t_pk_data
FROM
(
SELECT
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
t_query,
ts_event_datetime,
t_pk_data,
t_pk_update,
array_agg(quote_ident(t_column)) AS t_colunm,
string_agg(distinct format('%I=%L',t_column,jsb_event_after->>t_column),',') as t_update,
array_agg(quote_nullable(jsb_event_after->>t_column)) as t_event_data
FROM
(
SELECT
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
jsb_event_after,
jsb_event_before,
t_query,
ts_event_datetime,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_after->>v_pkey),' AND ') as t_pk_data,
string_agg(distinct format('%I=%L',v_pkey,jsb_event_before->>v_pkey),' AND ') as t_pk_update,
(jsonb_each_text(coalesce(jsb_event_after,'{"foo":"bar"}'::jsonb))).key AS t_column
FROM
(
SELECT
log.i_id_event,
log.i_id_batch,
log.v_table_name,
log.v_schema_name,
log.enm_binlog_event,
log.jsb_event_after,
log.jsb_event_before,
log.t_query,
ts_event_datetime,
v_table_pkey as v_pkey
FROM
sch_chameleon.t_log_replica log
INNER JOIN t_tables tab
ON
tab.v_table_name=log.v_table_name
AND tab.v_schema_name=log.v_schema_name
INNER JOIN t_events evt
ON log.i_id_event=evt.i_id_event
) t_pkey
GROUP BY
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
jsb_event_after,
jsb_event_before,
t_query,
ts_event_datetime
) t_columns
GROUP BY
i_id_event,
i_id_batch,
v_table_name,
v_schema_name,
enm_binlog_event,
t_query,
ts_event_datetime,
t_pk_data,
t_pk_update
) t_sql
ORDER BY i_id_event
LOOP
BEGIN
EXECUTE v_r_statements.t_sql;
IF v_r_statements.enm_binlog_event='ddl'
THEN
v_i_ddl:=v_i_ddl+1;
ELSE
v_i_replayed:=v_i_replayed+1;
END IF;
EXCEPTION
WHEN OTHERS THEN
v_tab_enabled:=(
SELECT
b_replica_enabled
FROM
sch_chameleon.t_replica_tables
WHERE
v_schema_name=v_r_statements.v_schema_name
AND v_table_name=v_r_statements.v_table_name
)
;
IF v_tab_enabled
THEN
RAISE NOTICE 'An error occurred when replaying data for the table %.%',v_r_statements.v_schema_name,v_r_statements.v_table_name;
RAISE NOTICE 'SQLSTATE: % - ERROR MESSAGE %',SQLSTATE, SQLERRM;
RAISE NOTICE 'The table %.% has been removed from the replica',v_r_statements.v_schema_name,v_r_statements.v_table_name;
v_ty_status.v_table_error:=array_append(v_ty_status.v_table_error, format('%I.%I SQLSTATE: %s - ERROR MESSAGE: %s',v_r_statements.v_schema_name,v_r_statements.v_table_name,SQLSTATE, SQLERRM)::character varying) ;
RAISE NOTICE 'Adding error log entry for table %.% ',v_r_statements.v_schema_name,v_r_statements.v_table_name;
INSERT INTO sch_chameleon.t_error_log
(
i_id_batch,
i_id_source,
v_schema_name,
v_table_name,
t_table_pkey,
t_binlog_name,
i_binlog_position,
ts_error,
t_sql,
t_error_message
)
SELECT
i_id_batch,
p_i_id_source,
v_schema_name,
v_table_name,
v_r_statements.t_pk_data as t_table_pkey,
t_binlog_name,
i_binlog_position,
clock_timestamp(),
quote_literal(v_r_statements.t_sql) as t_sql,
format('%s - %s',SQLSTATE, SQLERRM) as t_error_message
FROM
sch_chameleon.t_log_replica log
WHERE
log.i_id_event=v_r_statements.i_id_event
;
IF p_b_exit_on_error
THEN
v_ty_status.b_continue:=FALSE;
v_ty_status.b_error:=TRUE;
RETURN v_ty_status;
ELSE
RAISE NOTICE 'Statement %', v_r_statements.t_sql;
UPDATE sch_chameleon.t_replica_tables
SET
b_replica_enabled=FALSE
WHERE
v_schema_name=v_r_statements.v_schema_name
AND v_table_name=v_r_statements.v_table_name
;
RAISE NOTICE 'Deleting the log entries for the table %.% ',v_r_statements.v_schema_name,v_r_statements.v_table_name;
DELETE FROM sch_chameleon.t_log_replica log
WHERE
v_table_name=v_r_statements.v_table_name
AND v_schema_name=v_r_statements.v_schema_name
AND i_id_batch=v_i_id_batch
;
END IF;
END IF;
END;
END LOOP;
IF v_ts_evt_source IS NOT NULL
THEN
UPDATE sch_chameleon.t_last_replayed
SET
ts_last_replayed=v_ts_evt_source
WHERE
i_id_source=p_i_id_source
;
END IF;
IF v_i_replayed=0 AND v_i_ddl=0
THEN
DELETE FROM sch_chameleon.t_log_replica
WHERE
i_id_batch=v_i_id_batch
;
GET DIAGNOSTICS v_i_skipped = ROW_COUNT;
UPDATE ONLY sch_chameleon.t_replica_batch
SET
b_replayed=True,
i_skipped=v_i_skipped,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_i_id_batch
;
DELETE FROM sch_chameleon.t_batch_events
WHERE
i_id_batch=v_i_id_batch
;
v_ty_status.b_continue:=FALSE;
ELSE
UPDATE ONLY sch_chameleon.t_replica_batch
SET
i_ddl=coalesce(i_ddl,0)+v_i_ddl,
i_replayed=coalesce(i_replayed,0)+v_i_replayed,
i_skipped=v_i_skipped,
ts_replayed=clock_timestamp()
WHERE
i_id_batch=v_i_id_batch
;
UPDATE sch_chameleon.t_batch_events
SET
i_id_event = v_i_evt_queue
WHERE
i_id_batch=v_i_id_batch
;
DELETE FROM sch_chameleon.t_log_replica
WHERE
i_id_batch=v_i_id_batch
AND i_id_event=ANY(v_i_evt_replay)
;
v_ty_status.b_continue:=TRUE;
RETURN v_ty_status;
END IF;
v_i_id_batch:= (
SELECT
bat.i_id_batch
FROM
sch_chameleon.t_replica_batch bat
INNER JOIN sch_chameleon.t_batch_events evt
ON
evt.i_id_batch=bat.i_id_batch
WHERE
bat.b_started
AND bat.b_processed
AND NOT bat.b_replayed
AND bat.i_id_source=p_i_id_source
ORDER BY
bat.ts_created
LIMIT 1
)
;
IF v_i_id_batch IS NOT NULL
THEN
v_ty_status.b_continue:=TRUE;
END IF;
RETURN v_ty_status;
END;
$BODY$
LANGUAGE plpgsql;