#!/bin/bash
#
# Publication/subscription initial data synchronisation test case.
#
# Arguments:
#   $1 - directory that contains env_utils.sh; also forwarded to that file
#   $2 - forwarded to env_utils.sh unchanged (its meaning is defined there)
#
# Bash is required: `source` with extra arguments is not available in plain
# POSIX sh, so the interpreter is named explicitly instead of /bin/sh.
source "$1/env_utils.sh" "$1" "$2"

# Database used by this case on both the publisher and the subscriber node.
case_db="sync_db"
#######################################
# Runs a query and compares its output with the expected value.
# Prints the success message on a match; otherwise prints the failure line
# (prefixed with $failed_keyword) and terminates the script with status 1.
# Globals:   failed_keyword (read)
# Arguments: $1 - database name
#            $2 - port of the node to query
#            $3 - SQL text
#            $4 - expected output of the query
#            $5 - message printed when the output matches
#            $6 - description appended to "$failed_keyword when "
#######################################
_sync_expect_result() {
  local actual
  actual=$(exec_sql "$1" "$2" "$3")
  if [ "$actual" = "$4" ]; then
    echo "$5"
  else
    echo "$failed_keyword when $6"
    exit 1
  fi
}

#######################################
# Initial table synchronisation scenario for a publication/subscription pair.
# Relies on exec_sql, wait_for_subscription_sync, wait_for_catchup and
# poll_query_until, none of which are defined in this file (presumably they
# come from env_utils.sh).
# Globals:   db, case_db, pub_node1_port, sub_node1_port, g_local_ip,
#            username, passwd, failed_keyword (all read)
# Outputs:   one progress line per step/check on stdout
# Returns:   exits the script with status 1 on the first failed check
#######################################
test_1() {
  local publisher_connstr create_tap_sub
  local start_sync_timeout="Timed out while waiting for subscriber to start sync"

  echo "create database and tables."
  exec_sql "$db" "$pub_node1_port" "CREATE DATABASE $case_db"
  exec_sql "$db" "$sub_node1_port" "CREATE DATABASE $case_db"
  exec_sql "$case_db" "$pub_node1_port" "CREATE TABLE tab_rep (a int primary key)"
  exec_sql "$case_db" "$pub_node1_port" "INSERT INTO tab_rep SELECT generate_series(1,10)"
  exec_sql "$case_db" "$sub_node1_port" "CREATE TABLE tab_rep (a int primary key)"

  echo "create publication and subscription."
  publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
  create_tap_sub="CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
  exec_sql "$case_db" "$pub_node1_port" "CREATE PUBLICATION tap_pub FOR ALL TABLES"
  exec_sql "$case_db" "$sub_node1_port" "$create_tap_sub"
  wait_for_subscription_sync "$case_db" "$sub_node1_port"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep" "10" \
    "check initial data synced for first sub success" \
    "check initial data synced for first sub"

  # Recreate the subscription while the subscriber still holds rows 1..10.
  exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION tap_sub"
  exec_sql "$case_db" "$pub_node1_port" "INSERT INTO tab_rep SELECT generate_series(11,20)"
  exec_sql "$case_db" "$sub_node1_port" "$create_tap_sub"
  # Wait for srsubstate 'd', then empty the subscriber table before waiting
  # for the sync to finish. NOTE(review): presumably the leftover rows clash
  # with the primary key during the copy, so the sync cannot complete until
  # they are removed - confirm.
  poll_query_until "$case_db" "$sub_node1_port" "SELECT srsubstate FROM pg_subscription_rel" "d" "$start_sync_timeout"
  exec_sql "$case_db" "$sub_node1_port" "DELETE FROM tab_rep"
  wait_for_subscription_sync "$case_db" "$sub_node1_port"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep" "20" \
    "check initial data synced for second sub success" \
    "initial data synced for second sub"

  # A second subscription created without the initial copy; wait until
  # pg_stat_subscription shows one row for it with a pid and no relid, then
  # drop both subscriptions.
  exec_sql "$case_db" "$sub_node1_port" "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
  poll_query_until "$case_db" "$sub_node1_port" "SELECT count(*) FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL AND pid IS NOT NULL" "1" "Timed out while waiting for subscriber to start"
  exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION tap_sub"
  exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION tap_sub2"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM pg_subscription" "0" \
    "check second and third sub are dropped success" \
    "second and third sub are dropped"

  # Start again from an empty subscriber table.
  exec_sql "$case_db" "$sub_node1_port" "DELETE FROM tab_rep"
  exec_sql "$case_db" "$sub_node1_port" "$create_tap_sub"
  wait_for_subscription_sync "$case_db" "$sub_node1_port"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep" "20" \
    "check initial data synced for fourth sub success" \
    "initial data synced for fourth sub"

  # A table added after the subscription was created must stay empty on the
  # subscriber until the subscription is refreshed.
  exec_sql "$case_db" "$sub_node1_port" "CREATE TABLE tab_rep_next (a int)"
  exec_sql "$case_db" "$pub_node1_port" "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"
  wait_for_catchup "$case_db" "$pub_node1_port" "tap_sub"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep_next" "0" \
    "check no data for table added after subscription initialized success" \
    "no data for table added after subscription initialized"

  exec_sql "$case_db" "$sub_node1_port" "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"
  wait_for_subscription_sync "$case_db" "$sub_node1_port"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep_next" "10" \
    "check data for table added after subscription initialized are now synced success" \
    "data for table added after subscription initialized are now synced"

  # Later changes to the new table must be replicated as well. The success
  # message now names this check instead of repeating the previous one.
  exec_sql "$case_db" "$pub_node1_port" "INSERT INTO tab_rep_next SELECT generate_series(1,10)"
  wait_for_catchup "$case_db" "$pub_node1_port" "tap_sub"
  _sync_expect_result "$case_db" "$sub_node1_port" "SELECT count(*) FROM tab_rep_next" "20" \
    "check changes for table added after subscription initialized replicated success" \
    "changes for table added after subscription initialized replicated"

  exec_sql "$case_db" "$pub_node1_port" "DROP TABLE tab_rep_next"
  exec_sql "$case_db" "$sub_node1_port" "DROP TABLE tab_rep_next"
  exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION tap_sub"

  # Recreate the subscription with tab_rep still populated on the subscriber,
  # wait for srsubstate 'd', then drop it and inspect the publisher's slots.
  exec_sql "$case_db" "$sub_node1_port" "$create_tap_sub"
  poll_query_until "$case_db" "$sub_node1_port" "SELECT srsubstate FROM pg_subscription_rel" "d" "$start_sync_timeout"
  exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION tap_sub"
  # NOTE(review): the expected count of 2 presumably reflects replication
  # slots that exist on the publisher independently of this case - confirm
  # against the cluster layout set up by the test environment.
  _sync_expect_result "$case_db" "$pub_node1_port" "SELECT count(*) FROM pg_replication_slots" "2" \
    "check DROP SUBSCRIPTION during error can clean up the slots on the publisher success" \
    "DROP SUBSCRIPTION during error can clean up the slots on the publisher"
}
#######################################
# Removes the objects this case creates: both subscriptions on the
# subscriber, the publication on the publisher, and the case database on
# each node. Every statement uses IF EXISTS so the cleanup can be repeated
# without raising errors for objects that are already gone.
# Globals:   db, case_db, pub_node1_port, sub_node1_port (all read)
# Outputs:   "tear down" on stdout once all statements have been issued
#######################################
tear_down() {
  local sub
  for sub in tap_sub tap_sub2; do
    exec_sql "$case_db" "$sub_node1_port" "DROP SUBSCRIPTION IF EXISTS $sub"
  done
  exec_sql "$case_db" "$pub_node1_port" "DROP PUBLICATION IF EXISTS tap_pub"
  # The databases are dropped through $db because a database cannot be
  # dropped from a session connected to it.
  exec_sql "$db" "$sub_node1_port" "DROP DATABASE IF EXISTS $case_db"
  exec_sql "$db" "$pub_node1_port" "DROP DATABASE IF EXISTS $case_db"
  echo "tear down"
}
# Register the cleanup before running the case: test_1 ends the script with
# `exit 1` on the first failed check, which would otherwise skip tear_down
# and leave the subscriptions, publication and databases behind for later
# runs. The EXIT trap runs tear_down on every exit path and does not change
# the script's exit status.
trap tear_down EXIT
test_1