Skip to content

Commit 039549d

Browse files
committed
Flush the IO statistics of active WAL senders more frequently
WAL senders do not flush their statistics until they exit, limiting the monitoring possible for live processes. This is penalizing when WAL senders are running for a long time, like in streaming or logical replication setups, because it is not possible to know the amount of IO they generate while running. This commit makes WAL senders more aggressive with their statistics flush, using an internal of 1 second, with the flush timing calculated based on the existing GetCurrentTimestamp() done before the sleeps done to wait for some activity. Note that the sleep done for logical and physical WAL senders happens in two different code paths, so the stats flushes need to happen in these two places. One test is added for the physical WAL sender case, and one for the logical WAL sender case. This can be done in a stable fashion by relying on the WAL generated by the TAP tests in combination with a stats reset while a server is running, but only on HEAD as WAL data has been added to pg_stat_io in a051e71. This issue exists since a9c70b4 and the introduction of pg_stat_io, so backpatch down to v16. Author: Bertrand Drouvot <bertranddrouvot.pg@gmail.com> Reviewed-by: vignesh C <vignesh21@gmail.com> Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com> Discussion: https://postgr.es/m/Z73IsKBceoVd4t55@ip-10-97-1-34.eu-west-3.compute.internal Backpatch-through: 16
1 parent ba2a3c2 commit 039549d

File tree

3 files changed

+66
-2
lines changed

3 files changed

+66
-2
lines changed

src/backend/replication/walsender.c

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,14 @@
9191
#include "utils/guc.h"
9292
#include "utils/memutils.h"
9393
#include "utils/pg_lsn.h"
94+
#include "utils/pgstat_internal.h"
9495
#include "utils/ps_status.h"
9596
#include "utils/timeout.h"
9697
#include "utils/timestamp.h"
9798

99+
/* Minimum interval used by walsender for stats flushes, in ms */
100+
#define WALSENDER_STATS_FLUSH_INTERVAL 1000
101+
98102
/*
99103
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100104
*
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
17971801
int wakeEvents;
17981802
uint32 wait_event = 0;
17991803
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1804+
TimestampTz last_flush = 0;
18001805

18011806
/*
18021807
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
18171822
{
18181823
bool wait_for_standby_at_stop = false;
18191824
long sleeptime;
1825+
TimestampTz now;
18201826

18211827
/* Clear any already-pending wakeups */
18221828
ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
19271933
* new WAL to be generated. (But if we have nothing to send, we don't
19281934
* want to wake on socket-writable.)
19291935
*/
1930-
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1936+
now = GetCurrentTimestamp();
1937+
sleeptime = WalSndComputeSleeptime(now);
19311938

19321939
wakeEvents = WL_SOCKET_READABLE;
19331940

@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
19361943

19371944
Assert(wait_event != 0);
19381945

1946+
/* Report IO statistics, if needed */
1947+
if (TimestampDifferenceExceeds(last_flush, now,
1948+
WALSENDER_STATS_FLUSH_INTERVAL))
1949+
{
1950+
pgstat_flush_io(false);
1951+
(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
1952+
last_flush = now;
1953+
}
1954+
19391955
WalSndWait(wakeEvents, sleeptime, wait_event);
19401956
}
19411957

@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
27422758
static void
27432759
WalSndLoop(WalSndSendDataCallback send_data)
27442760
{
2761+
TimestampTz last_flush = 0;
2762+
27452763
/*
27462764
* Initialize the last reply timestamp. That enables timeout processing
27472765
* from hereon.
@@ -2836,13 +2854,17 @@ WalSndLoop(WalSndSendDataCallback send_data)
28362854
* WalSndWaitForWal() handle any other blocking; idle receivers need
28372855
* its additional actions. For physical replication, also block if
28382856
* caught up; its send_data does not block.
2857+
*
2858+
* The IO statistics are reported in WalSndWaitForWal() for the
2859+
* logical WAL senders.
28392860
*/
28402861
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
28412862
!streamingDoneSending) ||
28422863
pq_is_send_pending())
28432864
{
28442865
long sleeptime;
28452866
int wakeEvents;
2867+
TimestampTz now;
28462868

28472869
if (!streamingDoneReceiving)
28482870
wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
28532875
* Use fresh timestamp, not last_processing, to reduce the chance
28542876
* of reaching wal_sender_timeout before sending a keepalive.
28552877
*/
2856-
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2878+
now = GetCurrentTimestamp();
2879+
sleeptime = WalSndComputeSleeptime(now);
28572880

28582881
if (pq_is_send_pending())
28592882
wakeEvents |= WL_SOCKET_WRITEABLE;
28602883

2884+
/* Report IO statistics, if needed */
2885+
if (TimestampDifferenceExceeds(last_flush, now,
2886+
WALSENDER_STATS_FLUSH_INTERVAL))
2887+
{
2888+
pgstat_flush_io(false);
2889+
(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
2890+
last_flush = now;
2891+
}
2892+
28612893
/* Sleep until something happens or we time out */
28622894
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
28632895
}

src/test/recovery/t/001_stream_rep.pl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
has_streaming => 1);
4343
$node_standby_2->start;
4444

45+
# Reset IO statistics, for the WAL sender check with pg_stat_io.
46+
$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
47+
4548
# Create some content on primary and check its presence in standby nodes
4649
$node_primary->safe_psql('postgres',
4750
"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,19 @@ sub test_target_session_attrs
333336

334337
note "switching to physical replication slot";
335338

339+
# Wait for the physical WAL sender to update its IO statistics. This is
340+
# done before the next restart, which would force a flush of its stats, and
341+
# far enough from the reset done above to not impact the run time.
342+
$node_primary->poll_query_until(
343+
'postgres',
344+
qq[SELECT sum(reads) > 0
345+
FROM pg_catalog.pg_stat_io
346+
WHERE backend_type = 'walsender'
347+
AND object = 'wal']
348+
)
349+
or die
350+
"Timed out while waiting for the walsender to update its IO statistics";
351+
336352
# Switch to using a physical replication slot. We can do this without a new
337353
# backup since physical slots can go backwards if needed. Do so on both
338354
# standbys. Since we're going to be testing things that affect the slot state,

src/test/subscription/t/001_rep_changes.pl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@
113113
# Wait for initial table sync to finish
114114
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
115115

116+
# Reset IO statistics, for the WAL sender check with pg_stat_io.
117+
$node_publisher->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
118+
116119
my $result =
117120
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
118121
is($result, qq(0), 'check non-replicated table is empty on subscriber');
@@ -184,6 +187,19 @@
184187
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
185188
is($result, qq(2), 'check replicated changes for table having no columns');
186189

190+
# Wait for the logical WAL sender to update its IO statistics. This is
191+
# done before the next restart, which would force a flush of its stats, and
192+
# far enough from the reset done above to not impact the run time.
193+
$node_publisher->poll_query_until(
194+
'postgres',
195+
qq[SELECT sum(reads) > 0
196+
FROM pg_catalog.pg_stat_io
197+
WHERE backend_type = 'walsender'
198+
AND object = 'wal']
199+
)
200+
or die
201+
"Timed out while waiting for the walsender to update its IO statistics";
202+
187203
# insert some duplicate rows
188204
$node_publisher->safe_psql('postgres',
189205
"INSERT INTO tab_full SELECT generate_series(1,10)");

0 commit comments

Comments
 (0)