pg_stat_statements + pg_stat_activity + loq_query = pg_ash?

Als kurze Ergänzung zum Artikel Versuchen Sie, ein Analogon von ASH für PostgreSQL zu erstellen .

Herausforderung


Sie müssen den Ansichtsverlauf pg_stat_statemenets, pg_stat_activity verknüpfen. Mithilfe des Verlaufs der Ausführungspläne aus der Diensttabelle log_query erhalten Sie daher viele nützliche Informationen zur Lösung von Leistungsvorfällen und zur Optimierung von Abfragen.
Warnung
Aufgrund der Neuheit des Themas und des unvollständigen Testzeitraums kann der Artikel Fehler enthalten. Kritik und Kommentare werden nachdrücklich ermutigt und erwartet.

Daten eingeben


History_pg_stat_activity-Tabelle
--ACTIVITY_HIST.HISTORY_PG_STAT_ACTIVITY DROP TABLE IF EXISTS activity_hist.history_pg_stat_activity; CREATE TABLE activity_hist.history_pg_stat_activity ( timepoint timestamp without time zone , datid oid , datname name , pid integer, usesysid oid , usename name , application_name text , client_addr inet , client_hostname text , client_port integer, backend_start timestamp without time zone , xact_start timestamp without time zone , query_start timestamp without time zone , state_change timestamp without time zone , wait_event_type text , wait_event text , state text , backend_xid xid , backend_xmin xid , query text , backend_type text , queryid bigint ); 

Tabelle Pg_stat_db_queries
 CREATE TABLE pg_stat_db_queries ( database_id integer , queryid bigint , query text , max_time double precision );( 

Materialisierte Ansicht von mvw_pg_stat_queries
 CREATE MATERIALIZED VIEW public.mvw_pg_stat_queries AS SELECT t.queryid, t.max_time, t.query FROM public.dblink('LINK1'::text, 'SELECT queryid , max_time , query FROM pg_stat_statements WHERE dbid=(SELECT oid FROM pg_database WHERE datname=current_database() ) AND max_time >= 0 '::text) t(queryid bigint, max_time double precision, query text) WITH NO DATA; 

Log_query-Tabelle
 CREATE TABLE log_query ( id integer , queryid bigint , query_md5hash text , database_id integer , timepoint timestamp without time zone , query text , explained_plan text[] , plan_md5hash text , explained_plan_wo_costs text[] , plan_hash_value text , ip text, port text , pid integer ); 

Allgemeiner Algorithmus


Aktualisieren Sie die Tabelle pg_stat_db_queries


Materialansicht aktualisieren mvw_pg_stat_queries
 CREATE OR REPLACE FUNCTION refresh_pg_stat_queries_list( database_id int) RETURNS BOOLEAN AS $$ DECLARE result BOOLEAN ; database_rec record ; BEGIN SELECT * INTO database_rec FROM endpoint e JOIN database d ON e.id = d.endpoint_id WHERE d.id = database_id ; IF NOT database_rec.is_need_monitoring THEN RAISE NOTICE 'NO NEED MONITORING FOR database_id=%',database_id; return TRUE ; END IF ; EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name|| ' user='||database_rec.s_name||' password='||database_rec.s_pass|| ' '')'; REFRESH MATERIALIZED VIEW mvw_pg_stat_queries ; PERFORM dblink_disconnect('LINK1'); RETURN result; END $$ LANGUAGE plpgsql; 

Füllen Sie die Tabelle pg_stat_db_queries aus
 CREATE OR REPLACE FUNCTION refresh_pg_stat_db_queries( ) RETURNS BOOLEAN AS $$ DECLARE result BOOLEAN ; database_rec record ; pg_stat_rec record ; BEGIN TRUNCATE pg_stat_db_queries; FOR database_rec IN SELECT * FROM database d LOOP IF NOT database_rec.is_need_monitoring THEN RAISE NOTICE 'NO NEED MONITORING FOR database_id=%',database_rec.id; CONTINUE ; END IF ; PERFORM refresh_pg_stat_queries_list( database_rec.id ) ; FOR pg_stat_rec IN SELECT * FROM mvw_pg_stat_queries LOOP INSERT INTO pg_stat_db_queries ( database_id , queryid , query , max_time ) VALUES ( database_rec.id , pg_stat_rec.queryid , pg_stat_rec.query , pg_stat_rec.max_time); END LOOP; END LOOP; RETURN TRUE; END $$ LANGUAGE plpgsql; 

Infolgedessen enthält die Tabelle normalisierte Abfragetexte, queryid, die maximale Ausführungszeit der Abfrage zum aktuellen Zeitpunkt (zur Überwachung verwendet).

Füllen Sie log_query und erstellen Sie einen Verlauf der Ausführungspläne.


Der eigentliche Anforderungstext wird aus der Protokolldatei entnommen. Protokolldatei vom Zielhost zum Überwachungshost in Teilen, Bash-Skript, Cron. Aus Platzgründen und aufgrund der Trivialität der Aufgabe, einen Teil einer Textdatei von Host zu Host zu kopieren, wird das Skript nicht bereitgestellt.

Analysieren einer Protokolldatei und Extrahieren von Abfragetext
 #!/bin/bash ######################################################### # upload_log_query.sh # Upload table table from dowloaded aws file # version 12.0 ########################################################### echo 'TIMESTAMP:'$(date +%c)' Upload log_query table ' source_file=$1 echo 'source_file='$source_file database_id=$2 echo 'database_id='$database_id database_name=$3 echo 'database_name='$database_name beginer=' ' first_line='1' let "line_count=0" sql_line=' ' sql_flag=' ' space=' ' cat $source_file | while read line do #first line will be passed if [[ $line_count == '0' ]]; then let "line_count++" continue fi line="$space$line" #echo 'line='$line if [[ $first_line == "1" ]]; then beginer=`echo $line | awk -F" " '{ print $1}' ` first_line='0' fi current_beginer=`echo $line | awk -F" " '{ print $1}' ` #echo 'current_beginer='$current_beginer #echo 'beginer='$beginer if [[ $current_beginer == $beginer ]]; then if [[ $sql_flag == '1' ]]; then sql_flag='0' #echo 'TIMESTAMP:'$(date +%c)' Upload log_query table : SQL STATEMENT ='"$sql_line" log_date=`echo $sql_line | awk -F" " '{ print $1}' ` #echo 'log_date='$log_date log_time=`echo $sql_line | awk -F" " '{ print $2}' ` #echo 'log_time='$log_time duration=`echo $sql_line | awk -F" " '{ print $5}' ` #echo 'duration='$duration connect=`echo $sql_line | awk -F" " '{ print $3}' ` userdb=`echo $connect | awk -F":" '{ print $3}' ` userdb2=$userdb'@' db_port_log=`echo $connect | awk -F"@" '{ print $2}' ` log_database_name=`echo $db_port_log | awk -F":" '{ print $1}' ` #echo 'connect='$connect #echo 'userdb='$userdb #echo 'userdb2='$userdb2 #echo 'db_port_log='$db_port_log #echo 'log_database_name='$log_database_name if [[ "$log_database_name" != "$database_name" ]]; then echo '*** database_name '$log_database_name' from log is not equal '$database_name' CONTINUE ' continue; fi #replace ' to '' sql_modline=`echo "$sql_line" | sed 's/'\''/'\'''\''/g'` sql_line=' ' #echo '*********************************log_query start' #echo 'pid_str='$pid_str #echo 'ip_port='$ip_port #echo 'database_id='$database_id #echo 'log_date='$log_date #echo 'log_time='$log_time #echo 'duration='$duration #echo 'sql_modline='$sql_modline if ! psql -U monitor -d monitor -v ON_ERROR_STOP=1 -A -t -q -c "select log_query( '$pid_str' , '$ip_port' , $database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" then echo 'FATAL_ERROR - log_query ' exit 1 fi #echo '**********************************log_query finish' fi #if [[ $sql_flag == '1' ]]; then let "line_count=line_count+1" #echo 'line_count= '$line_count #echo $line #check=`echo $line | awk -F" " '{ print $8}' ` #check_sql=${check^^} #echo 'check_sql='$check_sql if [[ ${line^^} =~ "SELECT" ]]; then if [[ $line =~ "duration:" ]]; then test_statement=`echo $line | awk -F" " '{ print $8}'` is_select=${test_statement^^} #echo 'test_statement='$test_statement #echo 'is_select='$is_select if [[ $is_select == 'SELECT' ]]; then sql_flag='1' sql_line="$sql_line$line" ip_port=`echo $sql_line | awk -F":" '{ print $4}' ` pid_str=`echo $sql_line | awk -F":" '{ print $6}' ` fi fi fi else #echo $line #echo 'sql_flag ='$sql_flag if [[ $sql_flag == '1' ]]; then sql_line="$sql_line$line" fi fi #if [[ $current_beginer == $beginer ]]; then done 

Füllen der Log_query-Tabelle
 --log_query.sql --insert new query into log_query table CREATE OR REPLACE FUNCTION log_query( pid_str text , ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$ DECLARE result boolean ; log_timepoint timestamp without time zone ; log_duration double precision ; pos integer ; log_query text ; activity_string text ; log_md5hash text ; log_explain_plan text[] ; log_planhash text ; log_plan_wo_costs text[] ; database_rec record ; pg_stat_query text ; test_log_query text ; metric_rec record; log_query_rec record; found_flag boolean; pg_stat_history_rec record ; port_start integer ; port_end integer ; client_ip text ; client_port text ; log_queryid bigint ; log_query_text text ; pg_stat_query_text text ; current_pid_str text ; current_pid integer; pid_start_pos integer ; pid_finish_pos integer ; BEGIN result = TRUE ; IF ip_port != '[local]' THEN port_start = position('(' in ip_port); port_end = position(')' in ip_port); client_ip = substring( ip_port from 1 for port_start-1 ); client_port = substring( ip_port from port_start+1 for port_end-port_start-1 ); ELSE client_ip = 'local'; client_port = 'local'; END IF; pid_start_pos = position('[' in pid_str); pid_finish_pos = position(']' in pid_str); current_pid_str=substring( pid_str from 2 for pid_finish_pos - pid_start_pos -1 ); current_pid = to_number(current_pid_str , '999999999999'); SELECT e.host , d.name , d.owner_pwd , d.owner_user INTO database_rec FROM database d JOIN endpoint e ON e.id = d.endpoint_id WHERE d.id = log_database_id ; log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS'); log_duration = duration::double precision ; pos = position ('SELECT' in UPPER(sql_line) ); log_query = substring( sql_line from pos for LENGTH(sql_line)); log_query = regexp_replace(log_query,' +',' ','g'); log_query = regexp_replace(log_query,';+','','g'); log_query = trim(trailing ' ' from log_query); log_md5hash = md5( log_query::text ); --Explain execution plan-- EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name||' user='||database_rec.owner_user||' password='||database_rec.owner_pwd||' '')'; log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) ); log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) ); PERFORM dblink_disconnect('LINK1'); -------------------------- BEGIN INSERT INTO log_query ( query_md5hash , database_id , timepoint , duration , query , explained_plan , plan_md5hash , explained_plan_wo_costs , plan_hash_value , ip , port , pid ) VALUES ( log_md5hash , log_database_id , log_timepoint , log_duration , log_query , log_explain_plan , md5(log_explain_plan::text) , log_plan_wo_costs , md5(log_plan_wo_costs::text), client_ip , client_port , current_pid ); activity_string = 'New query has logged '|| ' database_id = '|| log_database_id || ' query_md5hash='||log_md5hash|| ' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS'); PERFORM pg_log( log_database_id , 'log_query' , activity_string); EXCEPTION WHEN unique_violation THEN activity_string = 'EXCEPTION *** query already has logged '|| ' database_id = '|| log_database_id || ' query_md5hash='||log_md5hash|| ' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS'); PERFORM pg_log( log_database_id , 'log_query' , activity_string); END; SELECT queryid INTO log_queryid FROM log_query WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint; IF log_queryid IS NOT NULL THEN RETURN result; END IF; ------------------------------------------------ SELECT * INTO log_query_rec FROM log_query WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g'); FOR pg_stat_history_rec IN SELECT queryid , query FROM pg_stat_db_queries WHERE database_id = log_database_id AND queryid is not null LOOP pg_stat_query = pg_stat_history_rec.query ; pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g'); log_query_text = trim(trailing ' ' from log_query_rec.query); pg_stat_query_text = pg_stat_query; IF (log_query_text LIKE pg_stat_query_text) THEN found_flag = TRUE ; ELSE found_flag = FALSE ; END IF; IF found_flag THEN UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; activity_string = ' updated queryid = '||pg_stat_history_rec.queryid|| ' for log_query with id = '||log_query_rec.id; EXIT ; END IF ; END LOOP ; RETURN result ; END $$ LANGUAGE plpgsql; 

Infolgedessen enthält die Tabelle den tatsächlichen Anforderungstext, Ausführungspläne, den Hashwert des Ausführungsplans und den Hashwert des Anforderungstextes.

Geben Sie den Abfrage-ID-Wert in die Tabelle history_pg_stat_activity ein


update_history_pg_stat_activity_by_queryid.sql
 --update_history_pg_stat_activity_by_queryid.sql CREATE OR REPLACE FUNCTION update_history_pg_stat_activity_by_queryid() RETURNS boolean AS $$ DECLARE result boolean ; history_pg_stat_activity_rec record ; pg_stat_query text ; pg_stat_query_text text ; pg_stat_history_rec record; found_flag boolean; history_pg_stat_activity_query text ; query_text text ; activity_string text ; BEGIN RAISE NOTICE '***update_history_pg_stat_activity_by_queryid'; result = TRUE ; FOR history_pg_stat_activity_rec IN SELECT DISTINCT(query) AS query FROM activity_hist.history_pg_stat_activity WHERE queryid IS NULL LOOP history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_rec.query,'\n+',' ','g'); history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,'\t+',' ','g'); history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,' +',' ','g'); history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,';','','g'); query_text = trim(trailing ' ' from history_pg_stat_activity_query); FOR pg_stat_history_rec IN SELECT queryid , query FROM --pg_stat_history pg_stat_db_queries WHERE queryid is not null GROUP BY queryid , query LOOP pg_stat_query = pg_stat_history_rec.query ; pg_stat_query=regexp_replace(pg_stat_query,'\n+',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,'\t+',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g'); pg_stat_query=regexp_replace(pg_stat_query,'\$.','%','g'); pg_stat_query_text = pg_stat_query; IF (query_text LIKE pg_stat_query_text) THEN found_flag = TRUE ; ELSE found_flag = FALSE ; END IF; IF found_flag THEN UPDATE activity_hist.history_pg_stat_activity SET queryid = pg_stat_history_rec.queryid WHERE regexp_replace(regexp_replace(regexp_replace(regexp_replace(query,'\n+',' ','g'),'\t+',' ','g'),' +',' ','g'),';','','g') LIKE query_text||'%' ; activity_string = 'history_pg_stat_activity has updated by queryid = '||pg_stat_history_rec.queryid; RAISE NOTICE '%',activity_string; PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string); EXIT ; END IF ; END LOOP ; IF NOT found_flag THEN activity_string = 'WARNING : Not FOUND queryid for the query : '||query_text ; RAISE NOTICE '%',activity_string; PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string); END IF ; RAISE NOTICE 'UPDATE log_query if query has not logged in log-file'; END LOOP; RETURN result ; END $$ LANGUAGE plpgsql; 

Infolgedessen enthält die Tabelle den Abfrage-ID-Wert, der dem Abfrage-ID-Wert der Abfrage entspricht.

Zusammenfassung


Durch Verknüpfen von pg_stat_activity, pg_stat_statements und log_query erhalten Sie viele nützliche Informationen zu der Anforderung, insbesondere:

  • Geschichte der Umsetzungspläne.
  • Verlauf der CPU-Zeitanforderung.
  • Warteverlauf anfordern.

Daten und viele zusätzliche Berichte werden im nächsten Artikel beschrieben.

Entwicklung


Durch Verknüpfen der verfügbaren Informationen mit dem Verlauf der Ansicht pg_locks können Sie Informationen darüber abrufen, auf welche spezifischen Sperren die Anforderung gewartet hat und vor allem, welcher Prozess (Anforderung) diese Sperre gehalten hat.

Die Lösung für dieses Problem wird im nächsten Artikel beschrieben. Jetzt werden Tests und Verbesserungen durchgeführt.

Source: https://habr.com/ru/post/de467277/


All Articles