pg_stat_statements + pg_stat_activity + log_query = pg_ash?

A short addendum to the article "An attempt to create an ASH analogue for PostgreSQL".

The task


It is required to link the history of the pg_stat_statements and pg_stat_activity views. Together with the history of execution plans stored in the log_query service table, this yields a lot of useful information for investigating performance incidents and optimizing queries.
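For illustration, here is a minimal sketch of the kind of query this linkage makes possible: activity samples joined to the logged execution plans via queryid. The queryid literal is hypothetical and the query itself is only an example, not code from the toolkit.

SELECT a.timepoint,
       a.wait_event_type,
       a.wait_event,
       l.plan_hash_value,
       l.explained_plan
FROM activity_hist.history_pg_stat_activity a
JOIN log_query l ON l.queryid = a.queryid
WHERE a.queryid = 389015618226997618          -- hypothetical queryid
ORDER BY a.timepoint;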
Warning
Because the topic is new and the testing period has been short, the article may contain errors. Criticism and comments are strongly encouraged and expected.

Input data


The history_pg_stat_activity table
--ACTIVITY_HIST.HISTORY_PG_STAT_ACTIVITY
DROP TABLE IF EXISTS activity_hist.history_pg_stat_activity;
CREATE TABLE activity_hist.history_pg_stat_activity
(
  timepoint timestamp without time zone ,
  datid oid ,
  datname name ,
  pid integer,
  usesysid oid ,
  usename name ,
  application_name text ,
  client_addr inet ,
  client_hostname text ,
  client_port integer,
  backend_start timestamp without time zone ,
  xact_start timestamp without time zone ,
  query_start timestamp without time zone ,
  state_change timestamp without time zone ,
  wait_event_type text ,
  wait_event text ,
  state text ,
  backend_xid xid ,
  backend_xmin xid ,
  query text ,
  backend_type text ,
  queryid bigint
);
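For context, a minimal sketch of how such a history table can be filled by periodically sampling pg_stat_activity (this is only an assumption for illustration; the actual collector was described in the earlier article):

-- Illustrative sampling insert (an assumption, not from this article).
-- queryid is left NULL and filled in later by update_history_pg_stat_activity_by_queryid().
INSERT INTO activity_hist.history_pg_stat_activity
SELECT now(),
       datid, datname, pid, usesysid, usename, application_name,
       client_addr, client_hostname, client_port,
       backend_start, xact_start, query_start, state_change,
       wait_event_type, wait_event, state,
       backend_xid, backend_xmin, query, backend_type,
       NULL::bigint
FROM pg_stat_activity;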

The pg_stat_db_queries table
CREATE TABLE pg_stat_db_queries
(
  database_id integer ,
  queryid bigint ,
  query text ,
  max_time double precision
);

The mvw_pg_stat_queries materialized view
CREATE MATERIALIZED VIEW public.mvw_pg_stat_queries AS
SELECT t.queryid,
       t.max_time,
       t.query
FROM public.dblink('LINK1'::text,
                   'SELECT queryid , max_time , query
                    FROM pg_stat_statements
                    WHERE dbid=(SELECT oid FROM pg_database WHERE datname=current_database() )
                      AND max_time >= 0 '::text)
     t(queryid bigint, max_time double precision, query text)
WITH NO DATA;

The log_query table
CREATE TABLE log_query
(
  id integer ,
  queryid bigint ,
  query_md5hash text ,
  database_id integer ,
  timepoint timestamp without time zone ,
  duration double precision ,  -- apparently missing from the original listing; the log_query() function below inserts into it
  query text ,
  explained_plan text[] ,
  plan_md5hash text ,
  explained_plan_wo_costs text[] ,
  plan_hash_value text ,
  ip text,
  port text ,
  pid integer
);

General algorithm


Refreshing the pg_stat_db_queries table


Refreshing the mvw_pg_stat_queries materialized view
CREATE OR REPLACE FUNCTION refresh_pg_stat_queries_list( database_id int ) RETURNS BOOLEAN AS $$
DECLARE
  result BOOLEAN ;
  database_rec record ;
BEGIN
  SELECT *
  INTO database_rec
  FROM endpoint e JOIN database d ON e.id = d.endpoint_id
  WHERE d.id = database_id ;

  IF NOT database_rec.is_need_monitoring
  THEN
    RAISE NOTICE 'NO NEED MONITORING FOR database_id=%', database_id;
    RETURN TRUE ;
  END IF ;

  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name||
          ' user='||database_rec.s_name||' password='||database_rec.s_pass||' '')';

  REFRESH MATERIALIZED VIEW mvw_pg_stat_queries ;

  PERFORM dblink_disconnect('LINK1');

  RETURN result;
END
$$ LANGUAGE plpgsql;

Populating the pg_stat_db_queries table
CREATE OR REPLACE FUNCTION refresh_pg_stat_db_queries( ) RETURNS BOOLEAN AS $$
DECLARE
  result BOOLEAN ;
  database_rec record ;
  pg_stat_rec record ;
BEGIN
  TRUNCATE pg_stat_db_queries;

  FOR database_rec IN SELECT * FROM database d
  LOOP
    IF NOT database_rec.is_need_monitoring
    THEN
      RAISE NOTICE 'NO NEED MONITORING FOR database_id=%', database_rec.id;
      CONTINUE ;
    END IF ;

    PERFORM refresh_pg_stat_queries_list( database_rec.id ) ;

    FOR pg_stat_rec IN SELECT * FROM mvw_pg_stat_queries
    LOOP
      INSERT INTO pg_stat_db_queries ( database_id , queryid , query , max_time )
      VALUES ( database_rec.id , pg_stat_rec.queryid , pg_stat_rec.query , pg_stat_rec.max_time );
    END LOOP;
  END LOOP;

  RETURN TRUE;
END
$$ LANGUAGE plpgsql;
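Presumably the function is run periodically on the monitoring database; how exactly it is scheduled is not specified here, so the following call is only an illustration:

-- Hypothetical periodic invocation on the monitoring database:
SELECT refresh_pg_stat_db_queries();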

As a result, the table contains the normalized query texts with their queryid values and the maximum query execution time as of the current monitoring moment.
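For example, a quick look at the slowest normalized statements of one monitored database could be obtained with a query along these lines (the database_id value is hypothetical):

-- Illustrative query; database_id = 1 is a made-up value.
SELECT queryid, max_time, query
FROM pg_stat_db_queries
WHERE database_id = 1
ORDER BY max_time DESC
LIMIT 10;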

Populating log_query and building the history of execution plans.


The actual query texts are taken from the log files. The log files are shipped from the target host to the monitoring host by a bash script run from cron. To save space, and because copying a text file between hosts is a trivial task, that script is not shown here.

Parsing the log file and extracting the query texts
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload the log_query table from the downloaded aws log file
# version 12.0
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '

source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
database_name=$3
echo 'database_name='$database_name

beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '

cat $source_file | while read line
do
  #first line will be passed
  if [[ $line_count == '0' ]];
  then
    let "line_count++"
    continue
  fi

  line="$space$line"
  #echo 'line='$line

  if [[ $first_line == "1" ]];
  then
    beginer=`echo $line | awk -F" " '{ print $1}' `
    first_line='0'
  fi

  current_beginer=`echo $line | awk -F" " '{ print $1}' `
  #echo 'current_beginer='$current_beginer
  #echo 'beginer='$beginer

  if [[ $current_beginer == $beginer ]];
  then
    if [[ $sql_flag == '1' ]];
    then
      sql_flag='0'
      #echo 'TIMESTAMP:'$(date +%c)' Upload log_query table : SQL STATEMENT ='"$sql_line"
      log_date=`echo $sql_line | awk -F" " '{ print $1}' `
      #echo 'log_date='$log_date
      log_time=`echo $sql_line | awk -F" " '{ print $2}' `
      #echo 'log_time='$log_time
      duration=`echo $sql_line | awk -F" " '{ print $5}' `
      #echo 'duration='$duration

      connect=`echo $sql_line | awk -F" " '{ print $3}' `
      userdb=`echo $connect | awk -F":" '{ print $3}' `
      userdb2=$userdb'@'
      db_port_log=`echo $connect | awk -F"@" '{ print $2}' `
      log_database_name=`echo $db_port_log | awk -F":" '{ print $1}' `
      #echo 'connect='$connect
      #echo 'userdb='$userdb
      #echo 'userdb2='$userdb2
      #echo 'db_port_log='$db_port_log
      #echo 'log_database_name='$log_database_name

      if [[ "$log_database_name" != "$database_name" ]];
      then
        echo '*** database_name '$log_database_name' from log is not equal '$database_name' CONTINUE '
        continue;
      fi

      #replace ' to ''
      sql_modline=`echo "$sql_line" | sed 's/'\''/'\'''\''/g'`
      sql_line=' '

      #echo '*********************************log_query start'
      #echo 'pid_str='$pid_str
      #echo 'ip_port='$ip_port
      #echo 'database_id='$database_id
      #echo 'log_date='$log_date
      #echo 'log_time='$log_time
      #echo 'duration='$duration
      #echo 'sql_modline='$sql_modline

      if ! psql -U monitor -d monitor -v ON_ERROR_STOP=1 -A -t -q -c "select log_query( '$pid_str' , '$ip_port' , $database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
      then
        echo 'FATAL_ERROR - log_query '
        exit 1
      fi
      #echo '**********************************log_query finish'
    fi #if [[ $sql_flag == '1' ]]; then

    let "line_count=line_count+1"
    #echo 'line_count= '$line_count
    #echo $line
    #check=`echo $line | awk -F" " '{ print $8}' `
    #check_sql=${check^^}
    #echo 'check_sql='$check_sql

    if [[ ${line^^} =~ "SELECT" ]];
    then
      if [[ $line =~ "duration:" ]];
      then
        test_statement=`echo $line | awk -F" " '{ print $8}'`
        is_select=${test_statement^^}
        #echo 'test_statement='$test_statement
        #echo 'is_select='$is_select
        if [[ $is_select == 'SELECT' ]];
        then
          sql_flag='1'
          sql_line="$sql_line$line"
          ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
          pid_str=`echo $sql_line | awk -F":" '{ print $6}' `
        fi
      fi
    fi
  else
    #echo $line
    #echo 'sql_flag ='$sql_flag
    if [[ $sql_flag == '1' ]];
    then
      sql_line="$sql_line$line"
    fi
  fi #if [[ $current_beginer == $beginer ]]; then
done

Populating the log_query table
--log_query.sql
--insert new query into log_query table
CREATE OR REPLACE FUNCTION log_query( pid_str text , ip_port text , log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
  result boolean ;
  log_timepoint timestamp without time zone ;
  log_duration double precision ;
  pos integer ;
  log_query text ;
  activity_string text ;
  log_md5hash text ;
  log_explain_plan text[] ;
  log_planhash text ;
  log_plan_wo_costs text[] ;
  database_rec record ;
  pg_stat_query text ;
  test_log_query text ;
  metric_rec record;
  log_query_rec record;
  found_flag boolean;
  pg_stat_history_rec record ;
  port_start integer ;
  port_end integer ;
  client_ip text ;
  client_port text ;
  log_queryid bigint ;
  log_query_text text ;
  pg_stat_query_text text ;
  current_pid_str text ;
  current_pid integer;
  pid_start_pos integer ;
  pid_finish_pos integer ;
BEGIN
  result = TRUE ;

  IF ip_port != '[local]'
  THEN
    port_start = position('(' in ip_port);
    port_end = position(')' in ip_port);
    client_ip = substring( ip_port from 1 for port_start-1 );
    client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
  ELSE
    client_ip = 'local';
    client_port = 'local';
  END IF;

  pid_start_pos = position('[' in pid_str);
  pid_finish_pos = position(']' in pid_str);
  current_pid_str = substring( pid_str from 2 for pid_finish_pos - pid_start_pos - 1 );
  current_pid = to_number(current_pid_str , '999999999999');

  SELECT e.host , d.name , d.owner_pwd , d.owner_user
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;

  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = duration::double precision ;

  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);

  log_md5hash = md5( log_query::text );

  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' port=5432 dbname='||database_rec.name||
          ' user='||database_rec.owner_user||' password='||database_rec.owner_pwd||' '')';

  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );

  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
    INSERT INTO log_query
    (
      query_md5hash ,
      database_id ,
      timepoint ,
      duration ,
      query ,
      explained_plan ,
      plan_md5hash ,
      explained_plan_wo_costs ,
      plan_hash_value ,
      ip ,
      port ,
      pid
    )
    VALUES
    (
      log_md5hash ,
      log_database_id ,
      log_timepoint ,
      log_duration ,
      log_query ,
      log_explain_plan ,
      md5(log_explain_plan::text) ,
      log_plan_wo_costs ,
      md5(log_plan_wo_costs::text),
      client_ip ,
      client_port ,
      current_pid
    );

    activity_string = 'New query has logged '||
                      ' database_id = '|| log_database_id ||
                      ' query_md5hash='||log_md5hash||
                      ' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
    PERFORM pg_log( log_database_id , 'log_query' , activity_string);

  EXCEPTION
    WHEN unique_violation THEN
      activity_string = 'EXCEPTION *** query already has logged '||
                        ' database_id = '|| log_database_id ||
                        ' query_md5hash='||log_md5hash||
                        ' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
      PERFORM pg_log( log_database_id , 'log_query' , activity_string);
  END;

  SELECT queryid
  INTO log_queryid
  FROM log_query
  WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint;

  IF log_queryid IS NOT NULL
  THEN
    RETURN result;
  END IF;
  ------------------------------------------------
  SELECT *
  INTO log_query_rec
  FROM log_query
  WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;

  log_query_rec.query = regexp_replace(log_query_rec.query,';+','','g');

  FOR pg_stat_history_rec IN
    SELECT queryid , query
    FROM pg_stat_db_queries
    WHERE database_id = log_database_id AND queryid IS NOT NULL
  LOOP
    pg_stat_query = pg_stat_history_rec.query ;
    pg_stat_query = regexp_replace(pg_stat_query,'\n+',' ','g');
    pg_stat_query = regexp_replace(pg_stat_query,'\t+',' ','g');
    pg_stat_query = regexp_replace(pg_stat_query,' +',' ','g');
    pg_stat_query = regexp_replace(pg_stat_query,'\$.','%','g');

    log_query_text = trim(trailing ' ' from log_query_rec.query);
    pg_stat_query_text = pg_stat_query;

    IF (log_query_text LIKE pg_stat_query_text)
    THEN
      found_flag = TRUE ;
    ELSE
      found_flag = FALSE ;
    END IF;

    IF found_flag
    THEN
      UPDATE log_query SET queryid = pg_stat_history_rec.queryid
      WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;

      activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
                        ' for log_query with id = '||log_query_rec.id;
      EXIT ;
    END IF ;
  END LOOP ;

  RETURN result ;
END
$$ LANGUAGE plpgsql;

As a result, the table contains the actual query texts, the execution plans, and the hashes of both the execution plans and the query texts.
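The plan history for a particular statement can then be pulled with a query along these lines (only an illustration; the queryid literal is hypothetical):

-- Illustrative query; the queryid value is made up.
SELECT timepoint, plan_hash_value, explained_plan
FROM log_query
WHERE queryid = 389015618226997618
ORDER BY timepoint;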

Filling in the queryid values in the history_pg_stat_activity table


update_history_pg_stat_activity_by_queryid.sql
--update_history_pg_stat_activity_by_queryid.sql
CREATE OR REPLACE FUNCTION update_history_pg_stat_activity_by_queryid() RETURNS boolean AS $$
DECLARE
  result boolean ;
  history_pg_stat_activity_rec record ;
  pg_stat_query text ;
  pg_stat_query_text text ;
  pg_stat_history_rec record;
  found_flag boolean;
  history_pg_stat_activity_query text ;
  query_text text ;
  activity_string text ;
BEGIN
  RAISE NOTICE '***update_history_pg_stat_activity_by_queryid';
  result = TRUE ;

  FOR history_pg_stat_activity_rec IN
    SELECT DISTINCT(query) AS query
    FROM activity_hist.history_pg_stat_activity
    WHERE queryid IS NULL
  LOOP
    history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_rec.query,'\n+',' ','g');
    history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,'\t+',' ','g');
    history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,' +',' ','g');
    history_pg_stat_activity_query = regexp_replace(history_pg_stat_activity_query,';','','g');
    query_text = trim(trailing ' ' from history_pg_stat_activity_query);

    FOR pg_stat_history_rec IN
      SELECT queryid , query
      FROM --pg_stat_history
           pg_stat_db_queries
      WHERE queryid is not null
      GROUP BY queryid , query
    LOOP
      pg_stat_query = pg_stat_history_rec.query ;
      pg_stat_query = regexp_replace(pg_stat_query,'\n+',' ','g');
      pg_stat_query = regexp_replace(pg_stat_query,'\t+',' ','g');
      pg_stat_query = regexp_replace(pg_stat_query,' +',' ','g');
      pg_stat_query = regexp_replace(pg_stat_query,'\$.','%','g');
      pg_stat_query_text = pg_stat_query;

      IF (query_text LIKE pg_stat_query_text)
      THEN
        found_flag = TRUE ;
      ELSE
        found_flag = FALSE ;
      END IF;

      IF found_flag
      THEN
        UPDATE activity_hist.history_pg_stat_activity
        SET queryid = pg_stat_history_rec.queryid
        WHERE regexp_replace(regexp_replace(regexp_replace(regexp_replace(query,'\n+',' ','g'),'\t+',' ','g'),' +',' ','g'),';','','g') LIKE query_text||'%' ;

        activity_string = 'history_pg_stat_activity has updated by queryid = '||pg_stat_history_rec.queryid;
        RAISE NOTICE '%',activity_string;
        PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string);
        EXIT ;
      END IF ;
    END LOOP ;

    IF NOT found_flag
    THEN
      activity_string = 'WARNING : Not FOUND queryid for the query : '||query_text ;
      RAISE NOTICE '%',activity_string;
      PERFORM pg_log( 999 , 'update_history_pg_stat_activity_by_queryid' , activity_string);
    END IF ;

    RAISE NOTICE 'UPDATE log_query if query has not logged in log-file';
  END LOOP;

  RETURN result ;
END
$$ LANGUAGE plpgsql;
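As with the other maintenance functions, this one is presumably run periodically; the exact schedule is not specified, so the call below is only an illustration:

-- Hypothetical periodic invocation on the monitoring database:
SELECT update_history_pg_stat_activity_by_queryid();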

As a result, the table contains the queryid values corresponding to the sampled queries.
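How completely the matching has worked can be checked with a simple query of this kind (added for illustration):

-- Illustrative check of queryid coverage in the activity history.
SELECT count(*) FILTER (WHERE queryid IS NOT NULL) AS with_queryid,
       count(*) FILTER (WHERE queryid IS NULL)     AS without_queryid
FROM activity_hist.history_pg_stat_activity;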

Summary


By linking pg_stat_activity, pg_stat_statements and log_query, you can obtain a lot of useful information about queries, in particular (an illustrative wait-history query is sketched after the list):

  • the history of execution plans;
  • the history of query CPU time;
  • the history of query waits.
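For example, a wait-history report for a single statement could look roughly like this (an illustration only; the queryid literal is hypothetical):

-- Illustrative wait-event breakdown for one queryid over the sampled activity history.
SELECT wait_event_type,
       wait_event,
       count(*) AS samples
FROM activity_hist.history_pg_stat_activity
WHERE queryid = 389015618226997618   -- hypothetical queryid
  AND wait_event IS NOT NULL
GROUP BY wait_event_type, wait_event
ORDER BY samples DESC;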

These data and many other reports will be presented in the next article.

Further development


By linking the available information with the history of the pg_locks view, it is possible to find out exactly which lock a query is waiting for and, most importantly, which process (query) is holding that lock.

The solution to that problem will be described in the next article. Testing and refinement are currently under way.

Source: https://habr.com/ru/post/467277/

