From 524dade18ea7d0e12be40f6ceb99665eb917b3ba Mon Sep 17 00:00:00 2001 From: Jeremy Finzel Date: Fri, 9 Aug 2019 10:48:35 -0500 Subject: [PATCH 1/3] Bump version --- Makefile | 3 +- pglogical_ticker--1.3--1.4.sql | 0 pglogical_ticker--1.4.sql | 647 +++++++++++++++++++++++++++++++++ pglogical_ticker-sql-maker.sh | 4 +- pglogical_ticker.control | 2 +- 5 files changed, 652 insertions(+), 4 deletions(-) create mode 100644 pglogical_ticker--1.3--1.4.sql create mode 100644 pglogical_ticker--1.4.sql diff --git a/Makefile b/Makefile index d744539..02750b6 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,8 @@ EXTENSION = pglogical_ticker DATA = pglogical_ticker--1.0.sql pglogical_ticker--1.0--1.1.sql \ pglogical_ticker--1.1.sql pglogical_ticker--1.1--1.2.sql \ pglogical_ticker--1.2.sql pglogical_ticker--1.2--1.3.sql \ - pglogical_ticker--1.3.sql + pglogical_ticker--1.3.sql pglogical_ticker--1.3--1.4.sql \ + pglogical_ticker--1.4.sql PGFILEDESC = "pglogical_ticker - Have an accurate view of pglogical replication delay" PG_CONFIG = pg_config diff --git a/pglogical_ticker--1.3--1.4.sql b/pglogical_ticker--1.3--1.4.sql new file mode 100644 index 0000000..e69de29 diff --git a/pglogical_ticker--1.4.sql b/pglogical_ticker--1.4.sql new file mode 100644 index 0000000..2225d09 --- /dev/null +++ b/pglogical_ticker--1.4.sql @@ -0,0 +1,647 @@ +/* pglogical_ticker--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pglogical_ticker" to load this file. \quit + +CREATE FUNCTION pglogical_ticker._launch(oid) + RETURNS pg_catalog.INT4 STRICT +AS 'MODULE_PATHNAME', 'pglogical_ticker_launch' +LANGUAGE C; + +CREATE FUNCTION pglogical_ticker.launch() + RETURNS pg_catalog.INT4 STRICT +AS $BODY$ +SELECT pglogical_ticker._launch(oid) +FROM pg_database +WHERE datname = current_database() +--This should be improved in the future but should do +--the job for now. +AND NOT EXISTS + (SELECT 1 + FROM pg_stat_activity psa + WHERE NOT pid = pg_backend_pid() + AND query = 'SELECT pglogical_ticker.tick();'); +$BODY$ +LANGUAGE SQL; + +CREATE FUNCTION pglogical_ticker.dependency_update() +RETURNS VOID AS +$DEPS$ +/***** +This handles the rename of pglogical.replication_set_relation to pglogical_ticker.rep_set_table_wrapper from version 1 to 2 + */ +BEGIN + +IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'rep_set_table_wrapper' AND table_schema = 'pglogical_ticker') THEN + PERFORM pglogical_ticker.drop_ext_object('VIEW','pglogical_ticker.rep_set_table_wrapper'); + DROP VIEW pglogical_ticker.rep_set_table_wrapper; +END IF; +IF (SELECT extversion FROM pg_extension WHERE extname = 'pglogical') ~* '^1.*' THEN + + CREATE VIEW pglogical_ticker.rep_set_table_wrapper AS + SELECT * + FROM pglogical.replication_set_relation; + +ELSE + + CREATE VIEW pglogical_ticker.rep_set_table_wrapper AS + SELECT * + FROM pglogical.replication_set_table; + +END IF; + +END; +$DEPS$ +LANGUAGE plpgsql; + +SELECT pglogical_ticker.dependency_update(); + +CREATE OR REPLACE FUNCTION pglogical_ticker.add_ext_object + (p_type text + , p_full_obj_name text) +RETURNS VOID AS +$BODY$ +BEGIN +PERFORM pglogical_ticker.toggle_ext_object(p_type, p_full_obj_name, 'ADD'); +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION pglogical_ticker.drop_ext_object + (p_type text + , p_full_obj_name text) +RETURNS VOID AS +$BODY$ +BEGIN +PERFORM pglogical_ticker.toggle_ext_object(p_type, p_full_obj_name, 'DROP'); +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION pglogical_ticker.toggle_ext_object + (p_type text + , p_full_obj_name text + , p_toggle text) +RETURNS VOID AS +$BODY$ +DECLARE + c_valid_types TEXT[] = ARRAY['EVENT TRIGGER','FUNCTION','VIEW','TABLE']; + c_valid_toggles TEXT[] = ARRAY['ADD','DROP']; +BEGIN + +IF NOT (SELECT ARRAY[upper(p_type)] && c_valid_types) THEN + RAISE EXCEPTION 'Must pass one of % as 1st arg.', array_to_string(c_valid_types,','); +END IF; + +IF NOT (SELECT ARRAY[upper(p_toggle)] && c_valid_toggles) THEN + RAISE EXCEPTION 'Must pass one of % as 3rd arg.', array_to_string(c_valid_toggles,','); +END IF; + +EXECUTE 'ALTER EXTENSION pglogical_ticker '||p_toggle||' '||p_type||' '||p_full_obj_name; + +/*EXCEPTION + WHEN undefined_function THEN + RETURN; + WHEN undefined_object THEN + RETURN; + WHEN object_not_in_prerequisite_state THEN + RETURN; +*/ +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.deploy_ticker_tables() +RETURNS INT AS +$BODY$ +/**** +This will create the main table on both provider and +all subscriber(s) for in use replication sets. + +It assumes this extension is installed both places. + */ +DECLARE + v_row_count INT; +BEGIN + +PERFORM pglogical.replicate_ddl_command($$ +CREATE TABLE IF NOT EXISTS pglogical_ticker.$$||quote_ident(set_name)||$$ ( + provider_name NAME PRIMARY KEY, + source_time TIMESTAMPTZ +);$$, ARRAY[set_name]) +FROM pglogical.replication_set; + +PERFORM pglogical_ticker.add_ext_object('TABLE', 'pglogical_ticker.'||quote_ident(set_name)) +FROM pglogical.replication_set; + +GET DIAGNOSTICS v_row_count = ROW_COUNT; +RETURN v_row_count; + +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.all_repset_tickers() +RETURNS TABLE (provider_name NAME, set_name NAME, source_time TIMESTAMPTZ) +AS +$BODY$ +DECLARE v_sql TEXT; +BEGIN + +SELECT COALESCE( + string_agg( + format( + 'SELECT provider_name, %s::NAME AS set_name, source_time FROM %s', + quote_literal(rs.set_name), + relid::REGCLASS::TEXT + ), + E'\nUNION ALL\n' + ), + 'SELECT NULL::NAME, NULL::NAME, NULL::TIMESTAMPTZ') INTO v_sql +FROM pg_stat_user_tables st +INNER JOIN pglogical.replication_set rs ON rs.set_name = st.relname +WHERE schemaname = 'pglogical_ticker'; + +RETURN QUERY EXECUTE v_sql; + +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.all_subscription_tickers() +RETURNS TABLE (provider_name NAME, set_name NAME, source_time TIMESTAMPTZ) +AS +$BODY$ +DECLARE v_sql TEXT; +BEGIN + +WITH sub_rep_sets AS ( +SELECT DISTINCT unnest(sub_replication_sets) AS set_name +FROM pglogical.subscription +) + +SELECT COALESCE( + string_agg( + format( + 'SELECT provider_name, %s::NAME AS set_name, source_time FROM %s', + quote_literal(srs.set_name), + relid::REGCLASS::TEXT + ), + E'\nUNION ALL\n' + ), + 'SELECT NULL::NAME, NULL::NAME, NULL::TIMESTAMPTZ') INTO v_sql +FROM pg_stat_user_tables st +INNER JOIN sub_rep_sets srs ON srs.set_name = st.relname +WHERE schemaname = 'pglogical_ticker'; + +RETURN QUERY EXECUTE v_sql; + +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.add_ticker_tables_to_replication() +RETURNS INT AS +$BODY$ +DECLARE v_row_count INT; +BEGIN +/**** +This will add all ticker tables +to replication if not done already. + +It assumes of course pglogical_ticker.deploy_ticker_tables() +has been run. + */ +PERFORM rs.set_name, pglogical.replication_set_add_table( + set_name:=rs.set_name + ,relation:=('pglogical_ticker.'||quote_ident(set_name))::REGCLASS + --default synchronize_data is false + ,synchronize_data:=false +) +FROM pglogical.replication_set rs +WHERE NOT EXISTS + (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper rsr + WHERE rsr.set_reloid = ('pglogical_ticker.'||quote_ident(set_name))::REGCLASS + AND rsr.set_id = rs.set_id); + +GET DIAGNOSTICS v_row_count = ROW_COUNT; +RETURN v_row_count; + +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.tick_rep_set(p_set_name name) +RETURNS INT AS +$BODY$ +DECLARE + v_sql TEXT; +BEGIN + +v_sql:=$$ +INSERT INTO pglogical_ticker.$$||quote_ident(p_set_name)||$$ (provider_name, source_time) +SELECT ni.if_name, now() AS source_time +FROM pglogical.replication_set rs +INNER JOIN pglogical.node n ON n.node_id = rs.set_nodeid +INNER JOIN pglogical.node_interface ni ON ni.if_nodeid = n.node_id +WHERE EXISTS (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper rsr + WHERE rsr.set_id = rs.set_id) +ON CONFLICT (provider_name, replication_set_name) +DO UPDATE +SET source_time = now(); +$$; + +EXECUTE v_sql; + +END; +$BODY$ +LANGUAGE plpgsql; + +CREATE FUNCTION pglogical_ticker.tick() +RETURNS VOID AS +$BODY$ +DECLARE + v_record RECORD; + v_sql TEXT; + v_row_count INT; +BEGIN + +FOR v_record IN SELECT set_name FROM pglogical.replication_set ORDER BY set_name +LOOP + + v_sql:=$$ + INSERT INTO pglogical_ticker.$$||quote_ident(v_record.set_name)||$$ (provider_name, source_time) + SELECT ni.if_name, now() AS source_time + FROM pglogical.replication_set rs + INNER JOIN pglogical.node n ON n.node_id = rs.set_nodeid + INNER JOIN pglogical.node_interface ni ON ni.if_nodeid = n.node_id + WHERE rs.set_name = '$$||quote_ident(v_record.set_name)||$$' + ON CONFLICT (provider_name) + DO UPDATE + SET source_time = now(); + $$; + + EXECUTE v_sql; + +END LOOP; + +END; +$BODY$ +LANGUAGE plpgsql; + +REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA pglogical_ticker FROM PUBLIC; +/* pglogical_ticker--1.0--1.1.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pglogical_ticker" to load this file. \quit + +--This must be done AFTER we update the function def +SELECT pglogical_ticker.drop_ext_object('FUNCTION','pglogical_ticker.dependency_update()'); +DROP FUNCTION pglogical_ticker.dependency_update(); +SELECT pglogical_ticker.drop_ext_object('VIEW','pglogical_ticker.rep_set_table_wrapper'); +DROP VIEW IF EXISTS pglogical_ticker.rep_set_table_wrapper; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.toggle_ext_object(p_type text, p_full_obj_name text, p_toggle text) + RETURNS void + LANGUAGE plpgsql +AS $function$ +DECLARE + c_valid_types TEXT[] = ARRAY['EVENT TRIGGER','FUNCTION','VIEW','TABLE']; + c_valid_toggles TEXT[] = ARRAY['ADD','DROP']; +BEGIN + +IF NOT (SELECT ARRAY[upper(p_type)] && c_valid_types) THEN + RAISE EXCEPTION 'Must pass one of % as 1st arg.', array_to_string(c_valid_types,','); +END IF; + +IF NOT (SELECT ARRAY[upper(p_toggle)] && c_valid_toggles) THEN + RAISE EXCEPTION 'Must pass one of % as 3rd arg.', array_to_string(c_valid_toggles,','); +END IF; + +EXECUTE 'ALTER EXTENSION pglogical_ticker '||p_toggle||' '||p_type||' '||p_full_obj_name; + +EXCEPTION + WHEN undefined_function THEN + RETURN; + WHEN undefined_object THEN + RETURN; + WHEN object_not_in_prerequisite_state THEN + RETURN; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.rep_set_table_wrapper() + RETURNS TABLE (set_id OID, set_reloid REGCLASS) + LANGUAGE plpgsql +AS $function$ +/***** +This handles the rename of pglogical.replication_set_relation to pglogical_ticker.rep_set_table_wrapper from version 1 to 2 + */ +BEGIN + +IF EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'pglogical' AND tablename = 'replication_set_table') THEN + RETURN QUERY + SELECT r.set_id, r.set_reloid + FROM pglogical.replication_set_table r; + +ELSEIF EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'pglogical' AND tablename = 'replication_set_relation') THEN + RETURN QUERY + SELECT r.set_id, r.set_reloid + FROM pglogical.replication_set_relation r; + +ELSE + RAISE EXCEPTION 'No table pglogical.replication_set_relation or pglogical.replication_set_table found'; +END IF; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.add_ticker_tables_to_replication() + RETURNS integer + LANGUAGE plpgsql +AS $function$ +DECLARE v_row_count INT; +BEGIN +/**** +This will add all ticker tables +to replication if not done already. + +It assumes of course pglogical_ticker.deploy_ticker_tables() +has been run. + */ +PERFORM rs.set_name, pglogical.replication_set_add_table( + set_name:=rs.set_name + ,relation:=('pglogical_ticker.'||quote_ident(set_name))::REGCLASS + --default synchronize_data is false + ,synchronize_data:=false +) +FROM pglogical.replication_set rs +WHERE NOT EXISTS + (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper() rsr + WHERE rsr.set_reloid = ('pglogical_ticker.'||quote_ident(set_name))::REGCLASS + AND rsr.set_id = rs.set_id); + +GET DIAGNOSTICS v_row_count = ROW_COUNT; +RETURN v_row_count; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.tick_rep_set(p_set_name name) + RETURNS integer + LANGUAGE plpgsql +AS $function$ +DECLARE + v_sql TEXT; +BEGIN + +v_sql:=$$ +INSERT INTO pglogical_ticker.$$||quote_ident(p_set_name)||$$ (provider_name, source_time) +SELECT ni.if_name, now() AS source_time +FROM pglogical.replication_set rs +INNER JOIN pglogical.node n ON n.node_id = rs.set_nodeid +INNER JOIN pglogical.node_interface ni ON ni.if_nodeid = n.node_id +WHERE EXISTS (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper() rsr + WHERE rsr.set_id = rs.set_id) +ON CONFLICT (provider_name, replication_set_name) +DO UPDATE +SET source_time = now(); +$$; + +EXECUTE v_sql; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.launch() + RETURNS integer + LANGUAGE sql + STRICT +AS $function$ +SELECT pglogical_ticker._launch(oid) +FROM pg_database +WHERE datname = current_database() +--This should be improved in the future but should do +--the job for now. +AND NOT EXISTS + (SELECT 1 + FROM pg_stat_activity psa + WHERE NOT pid = pg_backend_pid() + AND query = 'SELECT pglogical_ticker.tick();') +AND NOT pg_is_in_recovery(); +$function$ +; + +CREATE OR REPLACE FUNCTION pglogical_ticker.launch_if_repset_tables() + RETURNS integer + LANGUAGE sql +AS $function$ +SELECT pglogical_ticker.launch() +WHERE EXISTS (SELECT 1 FROM pglogical_ticker.rep_set_table_wrapper()); +$function$ +; + +/* pglogical_ticker--1.1--1.2.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pglogical_ticker" to load this file. \quit + +DROP FUNCTION pglogical_ticker.deploy_ticker_tables(); +DROP FUNCTION pglogical_ticker.add_ticker_tables_to_replication(); +CREATE OR REPLACE FUNCTION pglogical_ticker.eligible_tickers +( +/*** +"Eligible tickers" are defined as replication sets and tables +that are eligible to be created or added to replication, either +because the replication sets exist, or with cascading replication, +the tables already exist to add to a specified replication set +p_cascade_to_set_name as cascaded tickers. +***/ +p_cascade_to_set_name NAME = NULL +) + RETURNS TABLE (set_name name, tablename name) + LANGUAGE plpgsql +AS $function$ +/**** +It assumes this extension is installed both places! + */ +BEGIN + +RETURN QUERY +--In the generic case, always tablename = set_name +SELECT rs.set_name, rs.set_name AS tablename +FROM pglogical.replication_set rs +WHERE p_cascade_to_set_name IS NULL +UNION +--For cascading replication, we override set_name +SELECT p_cascade_to_set_name AS set_name_out, relname AS tablename +FROM pg_class c +INNER JOIN pg_namespace n ON n.oid = c.relnamespace +WHERE p_cascade_to_set_name IS NOT NULL +AND n.nspname = 'pglogical_ticker' +AND c.relkind = 'r' +AND EXISTS ( + SELECT 1 + FROM pglogical.replication_set rsi + WHERE rsi.set_name = p_cascade_to_set_name +); + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.deploy_ticker_tables( +--For use with cascading replication, you can pass +--a set_name in order to add all current subscription tickers +--to this replication set +p_cascade_to_set_name NAME = NULL +) + RETURNS integer + LANGUAGE plpgsql +AS $function$ +/**** +This will create the main table on both provider and +all subscriber(s) for in use replication sets. + +It assumes this extension is installed both places. + */ +DECLARE + v_row_count INT; +BEGIN + +PERFORM pglogical.replicate_ddl_command($$ +CREATE TABLE IF NOT EXISTS pglogical_ticker.$$||quote_ident(tablename)||$$ ( + provider_name NAME PRIMARY KEY, + source_time TIMESTAMPTZ +); + +SELECT pglogical_ticker.add_ext_object( +'TABLE', +format('%s.%s', + 'pglogical_ticker', + quote_ident($$||quote_literal(tablename)||$$) + ) +); +$$, ARRAY[set_name]) +FROM pglogical_ticker.eligible_tickers(p_cascade_to_set_name); + +GET DIAGNOSTICS v_row_count = ROW_COUNT; +RETURN v_row_count; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.add_ticker_tables_to_replication( +--For use with cascading replication, you can pass +--a set_name in order to add all current subscription tickers +--to this replication set +p_cascade_to_set_name NAME = NULL +) + RETURNS integer + LANGUAGE plpgsql +AS $function$ +DECLARE v_row_count INT; +BEGIN +/**** +This will add all ticker tables +to replication if not done already. + +It assumes of course pglogical_ticker.deploy_ticker_tables() +has been run. + */ +PERFORM et.set_name, pglogical.replication_set_add_table( + set_name:=et.set_name + ,relation:=('pglogical_ticker.'||quote_ident(et.tablename))::REGCLASS + --default synchronize_data is false + ,synchronize_data:=false +) +FROM pglogical_ticker.eligible_tickers(p_cascade_to_set_name) et +WHERE NOT EXISTS + (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper() rsr + INNER JOIN pglogical.replication_set rs ON rs.set_id = rsr.set_id + WHERE rsr.set_reloid = ('pglogical_ticker.'||quote_ident(et.tablename))::REGCLASS + AND et.set_name = rs.set_name); + +GET DIAGNOSTICS v_row_count = ROW_COUNT; +RETURN v_row_count; + +END; +$function$ +; + + +CREATE OR REPLACE FUNCTION pglogical_ticker.tick() + RETURNS void + LANGUAGE plpgsql +AS $function$ +DECLARE + v_record RECORD; + v_sql TEXT; + v_row_count INT; +BEGIN + +FOR v_record IN + SELECT rs.set_name + FROM pglogical.replication_set rs + /*** + Don't try to tick tables that don't yet exist. This will allow + us to create replication sets without worrying about adding a ticker table + immediately. + ***/ + WHERE EXISTS + (SELECT 1 + FROM pg_class c + INNER JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'pglogical_ticker' + AND c.relname = rs.set_name + /*** + Also avoid uselessly ticking tables that are not in any replication set + (regardless of which one) + ***/ + AND EXISTS + (SELECT 1 + FROM pglogical_ticker.rep_set_table_wrapper() rst + WHERE c.oid = rst.set_reloid) + ) + ORDER BY rs.set_name +LOOP + + v_sql:=$$ + INSERT INTO pglogical_ticker.$$||quote_ident(v_record.set_name)||$$ (provider_name, source_time) + SELECT ni.if_name, now() AS source_time + FROM pglogical.replication_set rs + INNER JOIN pglogical.node n ON n.node_id = rs.set_nodeid + INNER JOIN pglogical.node_interface ni ON ni.if_nodeid = n.node_id + WHERE rs.set_name = '$$||quote_ident(v_record.set_name)||$$' + ON CONFLICT (provider_name) + DO UPDATE + SET source_time = now(); + $$; + + EXECUTE v_sql; + +END LOOP; + +END; +$function$ +; + + diff --git a/pglogical_ticker-sql-maker.sh b/pglogical_ticker-sql-maker.sh index fca7117..6eb77b7 100755 --- a/pglogical_ticker-sql-maker.sh +++ b/pglogical_ticker-sql-maker.sh @@ -2,8 +2,8 @@ set -eu -last_version=1.2 -new_version=1.3 +last_version=1.3 +new_version=1.4 last_version_file=pglogical_ticker--${last_version}.sql new_version_file=pglogical_ticker--${new_version}.sql update_file=pglogical_ticker--${last_version}--${new_version}.sql diff --git a/pglogical_ticker.control b/pglogical_ticker.control index 74369e3..f3b7fad 100644 --- a/pglogical_ticker.control +++ b/pglogical_ticker.control @@ -1,6 +1,6 @@ # pglogical_ticker extension comment = 'Have an accurate view on pglogical replication delay' -default_version = '1.3' +default_version = '1.4' schema = 'pglogical_ticker' module_pathname = '$libdir/pglogical_ticker' requires = 'pglogical' From ef9b68fd6b5b99787034520009577f8cfec0049c Mon Sep 17 00:00:00 2001 From: Jeremy Finzel Date: Thu, 12 Sep 2019 10:06:44 -0500 Subject: [PATCH 2/3] Support auto-launching ticker and add application_name --- README.md | 49 +++++++--- expected/01_create_ext.out | 2 +- functions/pglogical_ticker.launch.sql | 4 +- pglogical_ticker--1.3--1.4.sql | 25 +++++ pglogical_ticker-sql-maker.sh | 3 +- pglogical_ticker.c | 130 +++++++++++++++++++++++--- sql/01_create_ext.sql | 2 +- test_all_versions.sh | 91 +++++++++++++++++- 8 files changed, 273 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 5a96a3e..80159e3 100644 --- a/README.md +++ b/README.md @@ -20,12 +20,19 @@ make install make installcheck # run regression suite ``` +Although not strictly required, to get access to the configuration settings of +`pglogical_ticker` and to auto-launch the ticker on server restart or a soft crash, +add `pglogical_ticker` to `shared_preload_libraries` in your postgresql.conf file: +``` +shared_preload_libraries = 'pglogical,pglogical_ticker' #... and whatever others you already may have +``` + Once installed, simply run this on the provider and all subscribers: ```sql CREATE EXTENSION pglogical_ticker; ``` -### Deploy tables and launch ticker +### Deploy ticker tables Deploy the ticker tables. Run this command on the provider only, which will use `pglogical.replicate_ddl_command` to send to subscriber. ```sql @@ -58,22 +65,39 @@ You can manually try the tick() function if you so choose: SELECT pglogical_ticker.tick(); ``` -TO LAUNCH THE BACKGROUND WORKER: +### Configuration +This is only supported if you have added `pglogical_ticker` in `shared_preload_libraries` +as noted above. + +- `pglogical_ticker.database`: The database in which to launch the ticker (we currently + have no need to support multiple databases, but may add that feature at a later time). + The ticker will only auto-launch on restart if this setting is configured. +- `pglogical_ticker.naptime`: How frequently the ticker ticks - default 10 seconds +- `pglogical_ticker.restart_time`: How many seconds before the ticker auto-restarts, default 10. This + is also how long it will take to re-launch after a soft crash, for instance. Set this to + -1 to disable. **Be aware** that you cannot use this setting to prevent an already-launched + ticker from restarting. Only a server restart will take this new value into account for + the ticker backend and prevent it from ever restarting, if that is your desired behavior. + +### Launching the ticker +As of version 1.4, the ticker will automatically launch upon server load +if you have `pglogical_ticker` in `shared_preload_libraries`. + +Otherwise, this function will launch the ticker, only if there is not already +one running: ```sql SELECT pglogical_ticker.launch(); /** -It is better to use this function always, which automatically checks if +It is better to use the following function instead, which automatically checks if the system should have a ticker based on tables existing in replication. (this assumes you don't want a replication stream open with no tables). - -This function is very useful if you want to just blindly run the function -to launch the ticker if it needs to be launched, i.e. after a restart. **/ SELECT pglogical_ticker.launch_if_repset_tables(); ``` -This will run the function `pglogical_ticker.tick()` every 10 seconds. +The background worker launched either by this function or upon server load will +run the function `pglogical_ticker.tick()` every n seconds according to `pglogical_ticker.naptime`. Be sure to use caution in monitoring deployment and running of these background worker processes. @@ -95,10 +119,13 @@ SELECT * FROM pglogical_ticker.all_subscription_tickers(); Help is always wanted to review and improve the BackgroundWorker module. It is directly based on `worker_spi` from Postgres' test suite. -It could be improved to take arguments for naptime and also a different -username. I would also like it to be written so as to prevent any -possibility of launching more than one worker, which currently is only -done through the exposed function in the docs `launch()`. +It could be improved to use a different username. I would also like it to +be written so as to prevent any possibility of launching more than one worker, +which currently is only done through the exposed function in the docs `launch()`. + +As of 1.4, I'm also interested in allowing a clean shutdown with exit code 0, +as well as (if safe enough) searching databases for the ticker as opposed to having +to configure `pglogical_ticker.database`. The SQL files are maintained separately to make version control much easier to see. Make changes in these folders and then run diff --git a/expected/01_create_ext.out b/expected/01_create_ext.out index 3af4c4b..b689baf 100644 --- a/expected/01_create_ext.out +++ b/expected/01_create_ext.out @@ -1,5 +1,5 @@ -- Allow running regression suite with upgrade paths -\set v `echo ${FROMVERSION:-1.3}` +\set v `echo ${FROMVERSION:-1.4}` SET client_min_messages = warning; CREATE EXTENSION pglogical; CREATE EXTENSION pglogical_ticker VERSION :'v'; diff --git a/functions/pglogical_ticker.launch.sql b/functions/pglogical_ticker.launch.sql index 4c6ab09..cb4f6df 100644 --- a/functions/pglogical_ticker.launch.sql +++ b/functions/pglogical_ticker.launch.sql @@ -12,7 +12,7 @@ AND NOT EXISTS (SELECT 1 FROM pg_stat_activity psa WHERE NOT pid = pg_backend_pid() - AND query = 'SELECT pglogical_ticker.tick();') + AND application_name LIKE 'pglogical_ticker%') AND NOT pg_is_in_recovery(); $function$ -; \ No newline at end of file +; diff --git a/pglogical_ticker--1.3--1.4.sql b/pglogical_ticker--1.3--1.4.sql index e69de29..1f9a6ff 100644 --- a/pglogical_ticker--1.3--1.4.sql +++ b/pglogical_ticker--1.3--1.4.sql @@ -0,0 +1,25 @@ +/* pglogical_ticker--1.3--1.4.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pglogical_ticker" to load this file. \quit + +CREATE OR REPLACE FUNCTION pglogical_ticker.launch() + RETURNS integer + LANGUAGE sql + STRICT +AS $function$ +SELECT pglogical_ticker._launch(oid) +FROM pg_database +WHERE datname = current_database() +--This should be improved in the future but should do +--the job for now. +AND NOT EXISTS + (SELECT 1 + FROM pg_stat_activity psa + WHERE NOT pid = pg_backend_pid() + AND application_name LIKE 'pglogical_ticker%') +AND NOT pg_is_in_recovery(); +$function$ +; + + diff --git a/pglogical_ticker-sql-maker.sh b/pglogical_ticker-sql-maker.sh index 6eb77b7..e4e9a6f 100755 --- a/pglogical_ticker-sql-maker.sh +++ b/pglogical_ticker-sql-maker.sh @@ -33,9 +33,10 @@ d=$2 (cat "${s}"; echo; echo) >> "$d" } -#create_update_file_with_header +create_update_file_with_header # Add view and function changes +add_file functions/pglogical_ticker.launch.sql $update_file # Only copy diff and new files after last version, and add the update script touch $update_file diff --git a/pglogical_ticker.c b/pglogical_ticker.c index b9b4097..be1588e 100644 --- a/pglogical_ticker.c +++ b/pglogical_ticker.c @@ -34,6 +34,7 @@ PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(pglogical_ticker_launch); +void _PG_init(void); void pglogical_ticker_main(Datum) pg_attribute_noreturn(); /* flags set by signal handlers */ @@ -41,7 +42,12 @@ static volatile sig_atomic_t got_sighup = false; static volatile sig_atomic_t got_sigterm = false; /* GUC variables */ -static int ticker_naptime = 10; +static int pglogical_ticker_naptime = 10; +static char *pglogical_ticker_database; +static int pglogical_ticker_restart_time = 10; + +/* Constants */ +static int pglogical_ticker_total_workers = 1; /* * Signal handler for SIGTERM @@ -90,18 +96,31 @@ pglogical_ticker_main(Datum main_arg) BackgroundWorkerUnblockSignals(); /* Connect to our database */ + if (pglogical_ticker_database != NULL) + { +#if PG_VERSION_NUM >= 110000 + BackgroundWorkerInitializeConnection(pglogical_ticker_database, NULL, 0); +#else + BackgroundWorkerInitializeConnection(pglogical_ticker_database, NULL); +#endif + } + else + { #if PG_VERSION_NUM >= 110000 - BackgroundWorkerInitializeConnectionByOid(db_oid_main, InvalidOid, 0); + BackgroundWorkerInitializeConnectionByOid(db_oid_main, InvalidOid, 0); #else - BackgroundWorkerInitializeConnectionByOid(db_oid_main, InvalidOid); + BackgroundWorkerInitializeConnectionByOid(db_oid_main, InvalidOid); #endif + } + SetConfigOption("application_name", MyBgworkerEntry->bgw_name, + PGC_USERSET, PGC_S_SESSION); elog(LOG, "%s initialized", - MyBgworkerEntry->bgw_name); + MyBgworkerEntry->bgw_name); initStringInfo(&buf); appendStringInfo(&buf, - "SELECT pglogical_ticker.tick();"); + "SELECT pglogical_ticker.tick();"); /* * Main loop: do this until the SIGTERM handler tells us to terminate @@ -118,13 +137,13 @@ pglogical_ticker_main(Datum main_arg) */ #if PG_VERSION_NUM >= 100000 rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - ticker_naptime * 1000L, - PG_WAIT_EXTENSION); + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + pglogical_ticker_naptime * 1000L, + PG_WAIT_EXTENSION); #else rc = WaitLatch(MyLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - ticker_naptime * 1000L); + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + pglogical_ticker_naptime * 1000L); #endif ResetLatch(MyLatch); @@ -177,17 +196,102 @@ pglogical_ticker_main(Datum main_arg) pgstat_report_stat(false); pgstat_report_activity(STATE_IDLE, NULL); } - + proc_exit(1); } +/* + * Entrypoint of this module. + * + * We register more than one worker process here, to demonstrate how that can + * be done. + */ +void +_PG_init(void) +{ + BackgroundWorker worker; + unsigned int i; + + /* get the configuration */ + DefineCustomIntVariable("pglogical_ticker.naptime", + "Duration between each tick (in seconds).", + NULL, + &pglogical_ticker_naptime, + pglogical_ticker_naptime, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + + DefineCustomStringVariable("pglogical_ticker.database", + "Database to connect to.", + NULL, + &pglogical_ticker_database, + pglogical_ticker_database, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomIntVariable("pglogical_ticker.restart_time", + "Seconds after which to restart ticker if it dies. -1 to disable", + NULL, + &pglogical_ticker_restart_time, + pglogical_ticker_restart_time, + -1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + if (!process_shared_preload_libraries_in_progress) + return; + + + /* Only auto-start worker if pglogical_ticker_database is set */ + if (pglogical_ticker_database) + { + /* set up common data for all our workers */ + memset(&worker, 0, sizeof(worker)); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = pglogical_ticker_restart_time; + sprintf(worker.bgw_library_name, "pglogical_ticker"); + sprintf(worker.bgw_function_name, "pglogical_ticker_main"); + worker.bgw_notify_pid = 0; + + /* + * Now fill in worker-specific data, and do the actual registrations. + */ + for (i = 1; i <= pglogical_ticker_total_workers; i++) + { + snprintf(worker.bgw_name, BGW_MAXLEN, "pglogical_ticker worker %d", i); +#if PG_VERSION_NUM >= 110000 + snprintf(worker.bgw_type, BGW_MAXLEN, "pglogical_ticker"); +#endif + /* Hack to use postgres db oid until we do something smarter */ + worker.bgw_main_arg = Int32GetDatum(0); + + RegisterBackgroundWorker(&worker); + } + } +} + /* * Dynamically launch an SPI worker. */ Datum pglogical_ticker_launch(PG_FUNCTION_ARGS) { - Oid db_oid = PG_GETARG_OID(0); + Oid db_oid = PG_GETARG_OID(0); BackgroundWorker worker; BackgroundWorkerHandle *handle; BgwHandleStatus status; @@ -197,7 +301,7 @@ pglogical_ticker_launch(PG_FUNCTION_ARGS) worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_RecoveryFinished; - worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_restart_time = pglogical_ticker_restart_time; sprintf(worker.bgw_library_name, "pglogical_ticker"); sprintf(worker.bgw_function_name, "pglogical_ticker_main"); snprintf(worker.bgw_name, BGW_MAXLEN, "pglogical_ticker worker"); diff --git a/sql/01_create_ext.sql b/sql/01_create_ext.sql index 3af4c4b..b689baf 100644 --- a/sql/01_create_ext.sql +++ b/sql/01_create_ext.sql @@ -1,5 +1,5 @@ -- Allow running regression suite with upgrade paths -\set v `echo ${FROMVERSION:-1.3}` +\set v `echo ${FROMVERSION:-1.4}` SET client_min_messages = warning; CREATE EXTENSION pglogical; CREATE EXTENSION pglogical_ticker VERSION :'v'; diff --git a/test_all_versions.sh b/test_all_versions.sh index 2337674..3918a81 100755 --- a/test_all_versions.sh +++ b/test_all_versions.sh @@ -3,7 +3,7 @@ set -eu orig_path=$PATH -newest_version=1.3 +newest_version=1.4 unset PGSERVICE @@ -27,6 +27,91 @@ sudo "PATH=$PATH" make install port=$(get_port $version) PGPORT=$port psql postgres -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = 'contrib_regression' AND pid <> pg_backend_pid()" FROMVERSION=$from_version PGPORT=$port make installcheck + +sigpg() { +sig=$1 +echo "performing $sig" +sudo systemctl $sig postgresql@${version}-main +} + +# Start without shared_preload_libraries +echo "Testing no shared_preload_libraries launch and restart" +sudo -u postgres sed -i "s/'pglogical,pglogical_ticker'/'pglogical'/g" /etc/postgresql/$version/main/postgresql.conf +sigpg restart + +## Run the first 4 regression files which sets things up +echo "Seeding with first 4 regression scripts" +for f in sql/0[1-4]*; do + PGPORT=$port psql contrib_regression -f $f > /dev/null +done + +assert_ticker_running() { +PGPORT=$port psql contrib_regression -v "ON_ERROR_STOP" << 'EOM' +DO $$ +BEGIN + +IF NOT EXISTS (SELECT 1 FROM pg_stat_activity WHERE application_name LIKE 'pglogical_ticker%') THEN + RAISE EXCEPTION 'No ticker running'; +END IF; + +END$$; +EOM +echo "PASS" +} + +assert_ticker_not_running() { +PGPORT=$port psql contrib_regression -v "ON_ERROR_STOP" << 'EOM' +DO $$ +BEGIN + +IF EXISTS (SELECT 1 FROM pg_stat_activity WHERE application_name LIKE 'pglogical_ticker%') THEN + RAISE EXCEPTION 'Ticker running'; +END IF; + +END$$; +EOM +echo "PASS" +} + +ticker_check() { +echo "Launching ticker if not launched" +PGPORT=$port psql contrib_regression -v "ON_ERROR_STOP" << 'EOM' > /dev/null +SELECT pglogical_ticker.launch(); +SELECT pg_sleep(1); +EOM +assert_ticker_running +echo "Terminating and expecting auto-restart" +PGPORT=$port psql contrib_regression -v "ON_ERROR_STOP" << 'EOM' > /dev/null +SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'pglogical_ticker%'; +SELECT pg_sleep(11); +EOM +assert_ticker_running +} +ticker_check + +# Perform first load using shared_preload_libraries to set GUCs +sudo -u postgres sed -i "s/'pglogical'/'pglogical,pglogical_ticker'/g" /etc/postgresql/$version/main/postgresql.conf +sudo -u postgres sed -i "\$apglogical_ticker.database = 'contrib_regression'" /etc/postgresql/$version/main/postgresql.conf +sigpg reload +ticker_check + +# Restart, now it should auto-launch +sigpg restart +sleep 11 +ticker_check + +echo "Testing soft crash restart" +PGPORT=$port psql contrib_regression -c "SELECT 'i filo postgres'::text, pg_sleep(10);" & +pid=`PGPORT=$port psql contrib_regression -Atq -c "SELECT pid FROM pg_stat_activity WHERE NOT pid = pg_backend_pid() AND query ~* 'i filo postgres'"` +echo "found pid $pid to kill" +sudo kill -9 $pid +sleep 12 +ticker_check + +sudo -u postgres sed -i "/pglogical_ticker.database/d" /etc/postgresql/$version/main/postgresql.conf +sigpg restart +sleep 11 +assert_ticker_not_running } test_all_versions() { @@ -42,7 +127,5 @@ make_and_test "10" make_and_test "11" } +test_all_versions "1.4" test_all_versions "1.3" -test_all_versions "1.2" -test_all_versions "1.1" -test_all_versions "1.0" From 4fdbb4d34583b290245d426686098e5313e6ff5b Mon Sep 17 00:00:00 2001 From: Jeremy Finzel Date: Thu, 12 Sep 2019 10:21:25 -0500 Subject: [PATCH 3/3] Update debian changelog --- debian/changelog | 6 ++++++ debian/control.in | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/debian/changelog b/debian/changelog index 5d8523b..c280207 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +pglogical-ticker (1.4.0-1) unstable; urgency=medium + + * Support ticker auto-restart + + -- Jeremy Finzel Thu, 12 Sep 2019 10:21:00 -0500 + pglogical-ticker (1.3.1-1) unstable; urgency=medium * Fix race conditions in tests diff --git a/debian/control.in b/debian/control.in index 3cab7b7..6c84037 100644 --- a/debian/control.in +++ b/debian/control.in @@ -11,4 +11,4 @@ Package: postgresql-PGVERSION-pglogical-ticker Architecture: any Depends: postgresql-PGVERSION, postgresql-PGVERSION-pglogical, ${shlibs:Depends}, ${misc:Depends} Description: Have time-based replication delay for pglogical - A pglogical extension to obtain time-based replication delay for PostgreSQL PGVERSION. + A pglogical extension to get time-based replication delay for PostgreSQL PGVERSION.