Skip to content

Commit 6f29f9d

Browse files
committed
iproto: introduce graceful shutdown protocol
This commit adds the graceful shutdown feature to the IPROTO protocol. Net.box is patched by the next commit, which also adds tests. Part of #5924 @TarantoolBot document Title: Document IPROTO graceful shutdown A new IPROTO request type was introduced - `IPROTO_SHUTDOWN`, code 77. When asked to shut down (`os.exit()` is called or `SIGTERM` signal is received), a server stops accepting new connections and sends a packet of this type to each of its clients that support the graceful shutdown feature (see below how a server figures out if a client supports the feature). The server won't exit until all the clients that were sent the packets close connections. If all the clients don't close connections within the shutdown timeout, the server will exit anyway. The default shutdown timeout is 3 seconds, and it can be configured with `box.ctl.set_on_shutdown_timeout()`, which also determines the timeout of `box.ctl.on_shutdown()` triggers. An `IPROTO_SHUTDOWN` packet doesn't have any keys in its headers (not even sync number or schema version) nor a body. A client isn't supposed to reply to an `IPROTO_SHUTDOWN` packet. Instead it's supposed to close its connection as soon as possible. A client may wait for pending requests to complete and even send new requests after receiving an `IPROTO_SHUTDOWN` packet. The server will serve them as usual until it exits on timeout. Clients that support the graceful shutdown feature are supposed to set the `IPROTO_FEATURE_GRACEFUL_SHUTDOWN` feature (bit 4) when sending an `IPROTO_ID` request to a server. Servers that support the feature set the same bit in reply to an `IPROTO_ID` request. Introduction of this feature bumped the IPROTO protocol version up to 4.
1 parent fbc25aa commit 6f29f9d

File tree

10 files changed

+165
-9
lines changed

10 files changed

+165
-9
lines changed

src/box/iproto.cc

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
#include "version.h"
4343
#include "fiber.h"
44+
#include "fiber_cond.h"
4445
#include "cbus.h"
4546
#include "say.h"
4647
#include "sio.h"
@@ -68,6 +69,7 @@
6869
#include "salad/stailq.h"
6970
#include "assoc.h"
7071
#include "txn.h"
72+
#include "on_shutdown.h"
7173

7274
enum {
7375
IPROTO_SALT_SIZE = 32,
@@ -255,6 +257,23 @@ unsigned iproto_readahead = 16320;
255257
/* The maximal number of iproto messages in fly. */
256258
static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
257259

260+
/**
261+
* List of connections blocking shutdown in tx, linked by
262+
* iproto_connection::in_shutdown_list. Accessed and modified
263+
* only from the tx thread.
264+
*
265+
* The shutdown trigger callback won't return until it's empty.
266+
* We add a connection to the list if it supports the graceful
267+
* shutdown feature, and remove it when it's closed.
268+
*/
269+
static RLIST_HEAD(shutdown_list);
270+
271+
/** Signalled when shutdown_list becomes empty. */
272+
static struct fiber_cond shutdown_list_empty_cond;
273+
274+
/** This flag is set when the shutdown trigger is called. */
275+
static bool shutdown_is_inprogress;
276+
258277
int
259278
iproto_addr_count(void)
260279
{
@@ -645,6 +664,8 @@ struct iproto_connection
645664
* return.
646665
*/
647666
bool is_push_pending;
667+
/** Link in shutdown_list. */
668+
struct rlist in_shutdown_list;
648669
} tx;
649670
/** Authentication salt. */
650671
char salt[IPROTO_SALT_SIZE];
@@ -1433,6 +1454,7 @@ iproto_connection_new(struct iproto_thread *iproto_thread)
14331454
con->state = IPROTO_CONNECTION_ALIVE;
14341455
con->tx.is_push_pending = false;
14351456
con->tx.is_push_sent = false;
1457+
rlist_create(&con->tx.in_shutdown_list);
14361458
rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
14371459
return con;
14381460
}
@@ -1445,6 +1467,7 @@ iproto_connection_delete(struct iproto_connection *con)
14451467
assert(!iostream_is_initialized(&con->io));
14461468
assert(con->session == NULL);
14471469
assert(con->state == IPROTO_CONNECTION_DESTROYED);
1470+
assert(rlist_empty(&con->tx.in_shutdown_list));
14481471
/*
14491472
* The output buffers must have been deleted
14501473
* in tx thread.
@@ -1711,6 +1734,9 @@ tx_process_disconnect(struct cmsg *m)
17111734
session_run_on_disconnect_triggers(con->session);
17121735
}
17131736
}
1737+
rlist_del_entry(con, tx.in_shutdown_list);
1738+
if (rlist_empty(&shutdown_list))
1739+
fiber_cond_broadcast(&shutdown_list_empty_cond);
17141740
}
17151741

17161742
static void
@@ -2154,6 +2180,28 @@ tx_process_call(struct cmsg *m)
21542180
tx_end_msg(msg);
21552181
}
21562182

2183+
static void
2184+
tx_process_id(struct iproto_connection *con, const struct id_request *id)
2185+
{
2186+
con->session->meta.features = id->features;
2187+
if (iproto_features_test(&id->features,
2188+
IPROTO_FEATURE_GRACEFUL_SHUTDOWN)) {
2189+
rlist_add_entry(&shutdown_list, con, tx.in_shutdown_list);
2190+
if (shutdown_is_inprogress) {
2191+
/*
2192+
* IPROTO_ID enabling graceful shutdown was received
2193+
* after we've sent shutdown packets. Sent a packet
2194+
* to this one as well so that it doesn't block
2195+
* shutdown.
2196+
*/
2197+
if (iproto_send_shutdown(con->tx.p_obuf) != 0) {
2198+
diag_log();
2199+
rlist_del_entry(con, tx.in_shutdown_list);
2200+
}
2201+
}
2202+
}
2203+
}
2204+
21572205
static void
21582206
iproto_session_notify(struct session *session,
21592207
const char *key, size_t key_len,
@@ -2182,7 +2230,7 @@ tx_process_misc(struct cmsg *m)
21822230
::schema_version);
21832231
break;
21842232
case IPROTO_ID:
2185-
con->session->meta.features = msg->id.features;
2233+
tx_process_id(con, &msg->id);
21862234
iproto_reply_id_xc(out, msg->header.sync,
21872235
::schema_version);
21882236
break;
@@ -2758,6 +2806,40 @@ iproto_session_notify(struct session *session,
27582806

27592807
/** }}} */
27602808

2809+
static void
2810+
iproto_send_stop_msg(void);
2811+
2812+
/**
2813+
* Shutdown trigger callback.
2814+
* 1. Stops accepting new connections.
2815+
* 2. Sends an IPROTO_SHUTDOWN packet to all clients that support
2816+
* the graceful shutdown feature.
2817+
* 3. Waits for the clients to close connections.
2818+
*/
2819+
static int
2820+
iproto_on_shutdown_f(void *arg)
2821+
{
2822+
(void)arg;
2823+
assert(!shutdown_is_inprogress);
2824+
shutdown_is_inprogress = true;
2825+
fiber_set_name(fiber_self(), "iproto.shutdown");
2826+
iproto_send_stop_msg();
2827+
evio_service_stop(&tx_binary);
2828+
struct iproto_connection *con, *next_con;
2829+
rlist_foreach_entry_safe(con, &shutdown_list, tx.in_shutdown_list,
2830+
next_con) {
2831+
if (iproto_send_shutdown(con->tx.p_obuf) != 0) {
2832+
diag_log();
2833+
rlist_del_entry(con, tx.in_shutdown_list);
2834+
continue;
2835+
}
2836+
tx_push(con);
2837+
}
2838+
while (!rlist_empty(&shutdown_list))
2839+
fiber_cond_wait(&shutdown_list_empty_cond);
2840+
return 0;
2841+
}
2842+
27612843
static inline void
27622844
iproto_thread_init_routes(struct iproto_thread *iproto_thread)
27632845
{
@@ -2914,6 +2996,10 @@ iproto_init(int threads_count)
29142996
}
29152997

29162998
session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
2999+
3000+
fiber_cond_create(&shutdown_list_empty_cond);
3001+
if (box_on_shutdown(NULL, iproto_on_shutdown_f, NULL) != 0)
3002+
panic("failed to set iproto shutdown trigger");
29173003
return;
29183004

29193005
fail:
@@ -3063,7 +3149,7 @@ iproto_do_cfg_crit(struct iproto_thread *iproto_thread,
30633149
assert(rc == 0);
30643150
}
30653151

3066-
static inline void
3152+
static void
30673153
iproto_send_stop_msg(void)
30683154
{
30693155
struct iproto_cfg_msg cfg_msg;
@@ -3072,7 +3158,7 @@ iproto_send_stop_msg(void)
30723158
iproto_do_cfg_crit(&iproto_threads[i], &cfg_msg);
30733159
}
30743160

3075-
static inline int
3161+
static int
30763162
iproto_send_listen_msg(struct evio_service *binary)
30773163
{
30783164
struct iproto_cfg_msg cfg_msg;

src/box/iproto_constants.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,17 @@ enum iproto_type {
282282
IPROTO_WATCH = 74,
283283
IPROTO_UNWATCH = 75,
284284
IPROTO_EVENT = 76,
285+
/**
286+
* Special request type sent by a server to all clients that support
287+
* the graceful shutdown feature to notify them that the server is
288+
* about to be shut down. The server won't exit until all the clients
289+
* close connections or a timeout occurs.
290+
*
291+
* The request type is asynchronous (doesn't have a sync number):
292+
* a client isn't supposed to reply to it; instead it's expected to
293+
* close the connection as soon as possible.
294+
*/
295+
IPROTO_SHUTDOWN = 77,
285296

286297
/** Vinyl run info stored in .index file */
287298
VY_INDEX_RUN_INFO = 100,

src/box/iproto_features.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,6 @@ iproto_features_init(void)
6666
IPROTO_FEATURE_ERROR_EXTENSION);
6767
iproto_features_set(&IPROTO_CURRENT_FEATURES,
6868
IPROTO_FEATURE_WATCHERS);
69+
iproto_features_set(&IPROTO_CURRENT_FEATURES,
70+
IPROTO_FEATURE_GRACEFUL_SHUTDOWN);
6971
}

src/box/iproto_features.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ enum iproto_feature_id {
4545
* IPROTO_WATCH, IPROTO_UNWATCH, IPROTO_EVENT commands.
4646
*/
4747
IPROTO_FEATURE_WATCHERS = 3,
48+
/**
49+
* Graceful shutdown support: IPROTO_SHUTDOWN command.
50+
*
51+
* Upon receiving a shutdown request (os.exit or SIGTERM), a server
52+
* will send an IPROTO_SHUTDOWN packet to all clients that support
53+
* the graceful shutdown feature and won't exit until all of them
54+
* close connections or a timeout occurs.
55+
*/
56+
IPROTO_FEATURE_GRACEFUL_SHUTDOWN = 4,
4857
iproto_feature_id_MAX,
4958
};
5059

@@ -60,7 +69,7 @@ struct iproto_features {
6069
* It should be incremented every time a new feature is added or removed.
6170
*/
6271
enum {
63-
IPROTO_CURRENT_VERSION = 3,
72+
IPROTO_CURRENT_VERSION = 4,
6473
};
6574

6675
/**

src/box/lua/net_box.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ local IPROTO_FEATURE_NAMES = {
5353
[1] = 'transactions',
5454
[2] = 'error_extension',
5555
[3] = 'watchers',
56+
[4] = 'graceful_shutdown',
5657
}
5758

5859
-- Given an array of IPROTO feature ids, returns a map {feature_name: bool}.

src/box/xrow.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,37 @@ iproto_send_event(struct obuf *out, const char *key, size_t key_len,
798798
return 0;
799799
}
800800

801+
int
802+
iproto_send_shutdown(struct obuf *out)
803+
{
804+
/*
805+
* Calculate the packet size.
806+
* Note: no body; no sync or schema version.
807+
*/
808+
size_t size = 5;
809+
size += mp_sizeof_map(1);
810+
size += mp_sizeof_uint(IPROTO_REQUEST_TYPE);
811+
size += mp_sizeof_uint(IPROTO_SHUTDOWN);
812+
/* Encode the packet. */
813+
char *buf = obuf_alloc(out, size);
814+
if (buf == NULL) {
815+
diag_set(OutOfMemory, size, "obuf_alloc", "buf");
816+
return -1;
817+
}
818+
char *p = buf;
819+
/* Fix header. */
820+
*(p++) = 0xce;
821+
mp_store_u32(p, size - 5);
822+
p += 4;
823+
/* Packet header. */
824+
p = mp_encode_map(p, 1);
825+
p = mp_encode_uint(p, IPROTO_REQUEST_TYPE);
826+
p = mp_encode_uint(p, IPROTO_SHUTDOWN);
827+
assert(size == (size_t)(p - buf));
828+
(void)p;
829+
return 0;
830+
}
831+
801832
int
802833
xrow_decode_dml(struct xrow_header *row, struct request *request,
803834
uint64_t key_map)

src/box/xrow.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,13 @@ int
803803
iproto_send_event(struct obuf *out, const char *key, size_t key_len,
804804
const char *data, const char *data_end);
805805

806+
/**
807+
* Encode IPROTO_SHUTDOWN packet.
808+
* @param out Encode to.
809+
*/
810+
int
811+
iproto_send_shutdown(struct obuf *out);
812+
806813
/** Write error directly to a socket. */
807814
void
808815
iproto_do_write_error(struct iostream *io, const struct error *e,

src/lib/core/evio.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ evio_service_attach(struct evio_service *dst, const struct evio_service *src)
485485
void
486486
evio_service_detach(struct evio_service *service)
487487
{
488+
if (service->entries == NULL)
489+
return;
488490
for (int i = 0; i < service->entry_count; i++)
489491
evio_service_entry_detach(&service->entries[i]);
490492
free(service->entries);
@@ -515,6 +517,8 @@ evio_service_listen(struct evio_service *service)
515517
void
516518
evio_service_stop(struct evio_service *service)
517519
{
520+
if (service->entries == NULL)
521+
return;
518522
say_info("%s: stopped", evio_service_name(service));
519523
for (int i = 0; i < service->entry_count; i++)
520524
evio_service_entry_stop(&service->entries[i]);

test/box-py/iproto.result

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,9 @@ Invalid MsgPack - request body
208208
# Invalid features
209209
Invalid MsgPack - request body
210210
# Empty request body
211-
version=3, features=[0, 1, 2, 3]
211+
version=4, features=[0, 1, 2, 3, 4]
212212
# Unknown version and features
213-
version=3, features=[0, 1, 2, 3]
213+
version=4, features=[0, 1, 2, 3, 4]
214214

215215
#
216216
# gh-6257 Watchers

test/box/net.box_iproto_id.result

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ c = net.connect(box.cfg.listen)
1515
| ...
1616
c.peer_protocol_version
1717
| ---
18-
| - 3
18+
| - 4
1919
| ...
2020
c.peer_protocol_features
2121
| ---
2222
| - transactions: true
2323
| watchers: true
2424
| error_extension: true
2525
| streams: true
26+
| graceful_shutdown: true
2627
| ...
2728
c:close()
2829
| ---
@@ -50,6 +51,7 @@ c.peer_protocol_features
5051
| watchers: false
5152
| error_extension: false
5253
| streams: false
54+
| graceful_shutdown: false
5355
| ...
5456
errinj.set('ERRINJ_IPROTO_DISABLE_ID', false)
5557
| ---
@@ -98,6 +100,7 @@ c.peer_protocol_features
98100
| watchers: true
99101
| error_extension: true
100102
| streams: true
103+
| graceful_shutdown: true
101104
| ...
102105
c:close()
103106
| ---
@@ -143,14 +146,15 @@ c.error -- error
143146
| ...
144147
c.peer_protocol_version
145148
| ---
146-
| - 3
149+
| - 4
147150
| ...
148151
c.peer_protocol_features
149152
| ---
150153
| - transactions: false
151154
| watchers: true
152155
| error_extension: true
153156
| streams: true
157+
| graceful_shutdown: true
154158
| ...
155159
c:close()
156160
| ---
@@ -169,14 +173,15 @@ c.error -- error
169173
| ...
170174
c.peer_protocol_version
171175
| ---
172-
| - 3
176+
| - 4
173177
| ...
174178
c.peer_protocol_features
175179
| ---
176180
| - transactions: true
177181
| watchers: true
178182
| error_extension: true
179183
| streams: true
184+
| graceful_shutdown: true
180185
| ...
181186
c:close()
182187
| ---

0 commit comments

Comments
 (0)