
Commit 5497a41

Author: Darshan M N
WL#13543 InnoDB: Provide partitioning information to RAPID during load
To support partitioned tables in change propagation, the secondary engine requires InnoDB to provide partitioning information during table offload. As part of the offload process, the parallel read threads spawned by InnoDB convert each record from the InnoDB format to the MySQL format, stack the records up in a buffer, and send the buffer to RAPID via a callback. As part of this worklog: a) the callback provides the partition ID of the partition it is working with; b) all records present in the buffer sent via the callback always belong to the same partition.

RB: 23739
Reviewed-by: Mayank Prasad <mayank.prasad@oracle.com>
Reviewed-by: Sunny Bains <Sunny.Bains@oracle.com>
1 parent 6475320 commit 5497a41

File tree: 8 files changed, +86 −34 lines

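To make the callback contract from the commit message concrete, here is a minimal consumer-side sketch of the extended Load_cbk signature. `Engine_thread_ctx` and `load_rows` are hypothetical illustration names, not part of the RAPID plugin; the sentinel value and the one-partition-per-buffer guarantee are taken from the commit itself.

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <map>
#include <vector>

// Hypothetical stand-in for a secondary-engine per-thread context.
struct Engine_thread_ctx {
  std::vector<unsigned char> plain_rows;  // rows of a non-partitioned table
  std::map<uint64_t, std::vector<unsigned char>> part_rows;  // rows by partition
  size_t record_len;  // fixed row length, communicated via Load_init_cbk
};

// Sentinel InnoDB passes when the table is not partitioned.
constexpr uint64_t NOT_PARTITIONED = std::numeric_limits<uint64_t>::max();

// Matches the new Load_cbk shape: returns true on error, false otherwise.
bool load_rows(void *cookie, unsigned int nrows, void *rowdata,
               uint64_t partition_id) {
  auto *ctx = static_cast<Engine_thread_ctx *>(cookie);
  auto *bytes = static_cast<unsigned char *>(rowdata);
  const size_t len = static_cast<size_t>(nrows) * ctx->record_len;

  // Every record in this buffer belongs to the same partition, so a
  // single id describes the whole batch.
  auto &dst = (partition_id == NOT_PARTITIONED)
                  ? ctx->plain_rows
                  : ctx->part_rows[partition_id];
  dst.insert(dst.end(), bytes, bytes + len);
  return false;  // no error
}
```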

share/messages_to_error_log.txt

Lines changed: 3 additions & 0 deletions
@@ -10824,6 +10824,9 @@ ER_IB_ERR_PAGE_ARCH_INVALID_FORMAT
 ER_INVALID_XPLUGIN_SOCKET_SAME_AS_SERVER
   eng "X Plugins UNIX socket must use different file than MySQL server. X Plugin won't be accessible through UNIX socket"
 
+ER_INNODB_UNABLE_TO_ACQUIRE_DD_OBJECT
+  eng "%s"
+
 # DO NOT add server-to-client messages here;
 # they go in messages_to_clients.txt
 # in the same directory as this file.

sql/handler.h

Lines changed: 9 additions & 6 deletions
@@ -4420,14 +4420,17 @@ class handler {
   /**
     This callback is called by each parallel load thread when processing
     of rows is required for the adapter scan.
-    @param[in]  cookie    The cookie for this thread
-    @param[in]  nrows     The nrows that are available
-    @param[in]  rowdata   The mysql-in-memory row data buffer. This is a memory
-                          buffer for nrows records. The length of each record
-                          is fixed and communicated via Load_init_cbk
+    @param[in]  cookie        The cookie for this thread
+    @param[in]  nrows         The nrows that are available
+    @param[in]  rowdata       The mysql-in-memory row data buffer. This is a
+    memory buffer for nrows records. The length of each record is fixed and
+    communicated via Load_init_cbk
+    @param[in]  partition_id  Partition id if it's a partitioned table, else
+    std::numeric_limits<uint64_t>::max()
     @returns true if there is an error, false otherwise.
   */
-  using Load_cbk = std::function<bool(void *cookie, uint nrows, void *rowdata)>;
+  using Load_cbk = std::function<bool(void *cookie, uint nrows, void *rowdata,
+                                      uint64_t partition_id)>;
 
   /**
     This callback is called by each parallel load thread when processing

storage/innobase/handler/handler0alter.cc

Lines changed: 17 additions & 3 deletions
@@ -9766,8 +9766,21 @@ int ha_innopart::parallel_scan_init(void *&scan_ctx, size_t &num_threads) {
 
   trx_assign_read_view(trx);
 
-  const Parallel_reader::Scan_range FULL_SCAN{};
+  auto dd_client = ha_thd()->dd_client();
+  dd::cache::Dictionary_client::Auto_releaser releaser(dd_client);
+  const dd::Table *dd_table = nullptr;
+
+  if (dd_client->acquire(table_share->db.str, table_share->table_name.str,
+                         &dd_table)) {
+    ib::error(ER_INNODB_UNABLE_TO_ACQUIRE_DD_OBJECT)
+        << "InnoDB can't get table object for table "
+        << table_share->table_name.str;
+    return HA_ERR_INTERNAL_ERROR;
+  }
 
+  auto dd_partitions = dd_table->leaf_partitions();
+
+  const Parallel_reader::Scan_range FULL_SCAN{};
   const auto first_used_partition = m_part_info->get_first_used_partition();
 
   for (auto i = first_used_partition; i < m_tot_parts;
@@ -9779,12 +9792,13 @@ int ha_innopart::parallel_scan_init(void *&scan_ctx, size_t &num_threads) {
                 m_prebuilt->table->name.m_name);
 
       UT_DELETE(adapter);
-      return (HA_ERR_NO_SUCH_TABLE);
+      return HA_ERR_NO_SUCH_TABLE;
     }
 
     build_template(true);
 
-    Parallel_reader::Config config(FULL_SCAN, m_prebuilt->table->first_index());
+    Parallel_reader::Config config(FULL_SCAN, m_prebuilt->table->first_index(),
+                                   0, dd_partitions[i]->number());
 
     dberr_t err =
         adapter->add_scan(trx, config, [=](const Parallel_reader::Ctx *ctx) {
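For context on the block above: each partition's scan is tagged with its physical partition number from the data dictionary, and indexing dd_partitions with the loop counter relies on leaf_partitions() enumerating leaf partitions in the same order as the partitioning engine. A rough sketch of that acquire-and-map pattern, assuming MySQL server internals are in scope (the helper name and error handling here are illustrative, not from this commit):

```cpp
#include <cstddef>
#include <vector>

#include "sql/dd/cache/dictionary_client.h"
#include "sql/dd/types/partition.h"
#include "sql/dd/types/table.h"
#include "sql/sql_class.h"  // THD

// Hypothetical helper: collect the physical partition numbers of a table.
// Returns true on error, mirroring Dictionary_client::acquire().
bool physical_partition_numbers(THD *thd, const char *db, const char *table,
                                std::vector<size_t> *numbers) {
  auto dd_client = thd->dd_client();
  // Auto_releaser frees the acquired DD objects when it leaves scope.
  dd::cache::Dictionary_client::Auto_releaser releaser(dd_client);

  const dd::Table *dd_table = nullptr;
  if (dd_client->acquire(db, table, &dd_table)) {
    return true;  // DD lookup failed
  }

  for (const auto *part : dd_table->leaf_partitions()) {
    numbers->push_back(part->number());
  }
  return false;
}
```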

storage/innobase/include/row0pread-adapter.h

Lines changed: 9 additions & 6 deletions
@@ -1,6 +1,6 @@
 /*****************************************************************************
 
-Copyright (c) 2018, 2019, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2018, 2020, Oracle and/or its affiliates. All Rights Reserved.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License, version 2.0, as published by the
@@ -95,15 +95,18 @@ class Parallel_reader_adapter {
   dberr_t init(size_t thread_id) MY_ATTRIBUTE((warn_unused_result));
 
   /** For pushing any left over rows to the caller.
-  @param[in] thread_id  ID of the thread.
+  @param[in] ctx        Parallel read context.
+  @param[in] thread_id  ID of the thread.
   @return DB_SUCCESS or error code. */
-  dberr_t end(size_t thread_id) MY_ATTRIBUTE((warn_unused_result));
+  dberr_t end(Parallel_reader::Ctx *ctx, size_t thread_id)
+      MY_ATTRIBUTE((warn_unused_result));
 
   /** Send a batch of records.
-  @param[in] thread_id  ID of the thread.
-  @param[in] n_recs     Number of records to send.
+  @param[in] ctx        Parallel read context.
+  @param[in] thread_id  ID of the thread.
+  @param[in] n_recs     Number of records to send.
   @return DB_SUCCESS or error code. */
-  dberr_t send_batch(size_t thread_id, uint64_t n_recs)
+  dberr_t send_batch(const Parallel_reader::Ctx *ctx, uint64_t n_recs)
       MY_ATTRIBUTE((warn_unused_result));
 
   /** Get the number of rows buffered but not sent.

storage/innobase/include/row0pread.h

Lines changed: 24 additions & 6 deletions
@@ -116,7 +116,8 @@ class Parallel_reader {
   using Start = std::function<dberr_t(size_t thread_id)>;
 
   /** Callback to finalise callers state. */
-  using Finish = std::function<dberr_t(size_t thread_id)>;
+  using Finish =
+      std::function<dberr_t(Parallel_reader::Ctx *ctx, size_t thread_id)>;
 
   /** Callback to process the rows. */
   using F = std::function<dberr_t(const Ctx *)>;
@@ -150,18 +151,23 @@ class Parallel_reader {
   /** Scan (Scan_ctx) configuration. */
   struct Config {
     /** Constructor.
-    @param[in] scan_range    Range to scan.
-    @param[in] index         Cluster index to scan.
-    @param[in] read_level    Btree level from which records need to be read.
-    @param[in] read_ahead    If true then start read ahead threads.
+    @param[in] scan_range    Range to scan.
+    @param[in] index         Cluster index to scan.
+    @param[in] read_level    Btree level from which records need to be read.
+    @param[in] partition_id  Partition id if the index to be scanned belongs
+    to a partitioned table.
+    @param[in] read_ahead    If true then start read ahead threads.
     */
     Config(const Scan_range &scan_range, dict_index_t *index,
-           size_t read_level = 0, bool read_ahead = false)
+           size_t read_level = 0,
+           size_t partition_id = std::numeric_limits<uint64_t>::max(),
+           bool read_ahead = false)
         : m_scan_range(scan_range),
           m_index(index),
           m_is_compact(dict_table_is_comp(index->table)),
           m_page_size(dict_tf_to_fsp_flags(index->table->flags)),
           m_read_level(read_level),
+          m_partition_id(partition_id),
           m_read_ahead(read_ahead) {}
 
     /** Copy constructor.
@@ -172,6 +178,7 @@ class Parallel_reader {
           m_is_compact(config.m_is_compact),
           m_page_size(config.m_page_size),
           m_read_level(config.m_read_level),
+          m_partition_id(config.m_partition_id),
           m_read_ahead(config.m_read_ahead) {}
 
     /** Range to scan. */
@@ -189,6 +196,10 @@ class Parallel_reader {
     /** Btree level from which records need to be read. */
     size_t m_read_level{0};
 
+    /** Partition id if the index to be scanned belongs to a partitioned
+    table, else std::numeric_limits<uint64_t>::max(). */
+    size_t m_partition_id{std::numeric_limits<uint64_t>::max()};
+
     /** if true then enable separate read ahead threads. */
     bool m_read_ahead{false};
   };
@@ -697,6 +708,13 @@ class Parallel_reader::Ctx {
     return (m_scan_ctx->m_config.m_index);
   }
 
+  /** @return the partition id of the index.
+  @note this is std::numeric_limits<uint64_t>::max() if the index does not
+  belong to a partition. */
+  size_t partition_id() const MY_ATTRIBUTE((warn_unused_result)) {
+    return m_scan_ctx->m_config.m_partition_id;
+  }
+
  private:
   /** Traverse the pages by key order.
   @return DB_SUCCESS or error code. */
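A small usage sketch of the extended Config constructor (illustrative only; assumes InnoDB internals are in scope and `index` is a valid clustered-index handle). Because partition_id was inserted before read_ahead, any caller that previously passed read_ahead positionally would now have to spell out partition_id as well; callers that stop after read_level compile unchanged:

```cpp
// Sketch only, not code from this commit.
void configure_scans(dict_index_t *index) {
  const Parallel_reader::Scan_range FULL_SCAN{};

  // Non-partitioned table: partition_id defaults to
  // std::numeric_limits<uint64_t>::max(), the "no partition" sentinel.
  Parallel_reader::Config plain(FULL_SCAN, index);

  // Partitioned table, as in ha_innopart::parallel_scan_init(): scan the
  // leaf level (read_level = 0) of physical partition 3.
  Parallel_reader::Config part(FULL_SCAN, index, 0, 3);
}
```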

storage/innobase/row/row0pread-adapter.cc

Lines changed: 13 additions & 9 deletions
@@ -96,8 +96,8 @@ void Parallel_reader_adapter::set(row_prebuilt_t *prebuilt) {
     return (init(thread_id));
   });
 
-  m_parallel_reader.set_finish_callback([=](size_t thread_id) {
-    return (end(thread_id));
+  m_parallel_reader.set_finish_callback([=](Parallel_reader::Ctx *ctx, size_t thread_id) {
+    return end(ctx, thread_id);
   });
   // clang-format on
 
@@ -115,10 +115,13 @@ dberr_t Parallel_reader_adapter::run(void **thread_contexts, Init_fn init_fn,
   return (m_parallel_reader.run());
 }
 
-dberr_t Parallel_reader_adapter::send_batch(size_t thread_id, uint64_t n_recs) {
+dberr_t Parallel_reader_adapter::send_batch(const Parallel_reader::Ctx *ctx,
+                                            uint64_t n_recs) {
   ut_a(n_recs <= m_batch_size);
 
   dberr_t err{DB_SUCCESS};
+  auto thread_id = ctx->m_thread_id;
+  size_t id = ctx->partition_id();
 
   /* Push the row buffer to the caller if we have filled the buffer with
   ADAPTER_SEND_NUM_RECS number of records or it's a start of a new range. */
@@ -131,7 +134,7 @@ dberr_t Parallel_reader_adapter::send_batch(size_t thread_id, uint64_t n_recs) {
 
     const auto p = &buffer[start * m_mysql_row.m_max_len];
 
-    if (m_load_fn(m_thread_contexts[thread_id], static_cast<uint>(n_recs), p)) {
+    if (m_load_fn(m_thread_contexts[thread_id], n_recs, p, id)) {
       err = DB_INTERRUPTED;
       m_parallel_reader.set_error_state(DB_INTERRUPTED);
     }
@@ -175,7 +178,7 @@ dberr_t Parallel_reader_adapter::process_rows(const Parallel_reader::Ctx *ctx) {
 
   /* Start of a new range, send what we have buffered. */
   if (ctx->m_start && n_pending > 0) {
-    err = send_batch(thread_id, n_pending);
+    err = send_batch(ctx, n_pending);
 
     if (err != DB_SUCCESS) {
       if (heap != nullptr) {
@@ -203,7 +206,7 @@ dberr_t Parallel_reader_adapter::process_rows(const Parallel_reader::Ctx *ctx) {
       originated from RAPID threads. */
       err = DB_ERROR;
     } else if (is_buffer_full(thread_id)) {
-      err = send_batch(thread_id, pending(thread_id));
+      err = send_batch(ctx, pending(thread_id));
     }
   } else {
     err = DB_ERROR;
@@ -216,19 +219,20 @@ dberr_t Parallel_reader_adapter::process_rows(const Parallel_reader::Ctx *ctx) {
   return (err);
 }
 
-dberr_t Parallel_reader_adapter::end(size_t thread_id) {
+dberr_t Parallel_reader_adapter::end(Parallel_reader::Ctx *ctx,
+                                     size_t thread_id) {
   ut_a(Counter::get(m_n_sent, thread_id) <= Counter::get(m_n_read, thread_id));
 
   ut_a((Counter::get(m_n_read, thread_id) -
        Counter::get(m_n_sent, thread_id)) <= m_batch_size);
 
   dberr_t err{DB_SUCCESS};
 
-  if (!m_parallel_reader.is_error_set()) {
+  if (!m_parallel_reader.is_error_set() && ctx != nullptr) {
    /* It's possible that we might not have sent the records in the buffer
    when we have reached the end of records and the buffer is not full.
    Send them now. */
-    err = (pending(thread_id) != 0) ? send_batch(thread_id, pending(thread_id))
+    err = (pending(thread_id) != 0) ? send_batch(ctx, pending(thread_id))
                                     : DB_SUCCESS;
   }
 
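Why send_batch() can now take just the context: a Parallel_reader::Ctx belongs to a single scan, ha_innopart registers one scan per partition, and the buffer is flushed whenever a new range starts, so both the thread id and the partition id are recoverable from ctx and a flushed batch never mixes partitions. A toy model of that invariant (not InnoDB code; the flush-on-new-range rule mirrors the comment in send_batch()):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy model: rows are buffered and flushed on range boundaries, so every
// flushed batch carries exactly one partition id.
struct Batch {
  std::vector<uint64_t> row_partition_ids;

  void add_row(uint64_t partition_id, bool range_start) {
    if (range_start && !row_partition_ids.empty()) {
      flush();  // mirrors send_batch() on ctx->m_start
    }
    row_partition_ids.push_back(partition_id);
  }

  void flush() {
    // The guarantee WL#13543 gives to RAPID: one partition per batch.
    for (uint64_t id : row_partition_ids) {
      assert(id == row_partition_ids.front());
    }
    row_partition_ids.clear();
  }
};
```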

storage/innobase/row/row0pread-histogram.cc

Lines changed: 3 additions & 2 deletions
@@ -1,6 +1,6 @@
 /*****************************************************************************
 
-Copyright (c) 2020, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2019, 2020, Oracle and/or its affiliates. All Rights Reserved.
 
 This program is free software; you can redistribute it and/or modify it under
 the terms of the GNU General Public License, version 2.0, as published by the
@@ -62,7 +62,8 @@ Histogram_sampler::Histogram_sampler(size_t max_threads, int sampling_seed,
 
   m_n_sampled = 0;
 
-  m_parallel_reader.set_finish_callback([&](size_t thread_id) {
+  m_parallel_reader.set_finish_callback([&](Parallel_reader::Ctx *ctx,
+                                            size_t thread_id) {
     DBUG_PRINT("histogram_sampler_buffering_print", ("-> Buffering complete."));
 
     DBUG_LOG("histogram_sampler_buffering_print",

storage/innobase/row/row0pread.cc

Lines changed: 8 additions & 2 deletions
@@ -636,6 +636,8 @@ void Parallel_reader::worker(size_t thread_id) {
   constexpr auto FOREVER = OS_SYNC_INFINITE_TIME;
   os_event_wait_time_low(m_event, FOREVER, m_sig_count);
 
+  std::shared_ptr<Parallel_reader::Ctx> last_ctx{nullptr};
+
   for (;;) {
     size_t n_completed = 0;
     int64_t sig_count = os_event_reset(m_event);
@@ -664,6 +666,7 @@ void Parallel_reader::worker(size_t thread_id) {
     }
 
     ++n_completed;
+    last_ctx = std::move(ctx);
   }
 
   if (err != DB_SUCCESS || is_error_set()) {
@@ -696,9 +699,12 @@ void Parallel_reader::worker(size_t thread_id) {
   }
 
   if (m_finish_callback) {
-    dberr_t finish_err = m_finish_callback(thread_id);
+    dberr_t finish_err = m_finish_callback(last_ctx.get(), thread_id);
+
     /* Keep the err status from previous failed operations */
-    if (unlikely(finish_err != DB_SUCCESS)) err = finish_err;
+    if (finish_err != DB_SUCCESS) {
+      err = finish_err;
+    }
   }
 
   ut_a(err != DB_SUCCESS || is_error_set() ||
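Since a worker can reach its finish callback without ever having claimed a range, last_ctx may still be nullptr there; this is why Parallel_reader_adapter::end() gained the `ctx != nullptr` guard. A hedged sketch of a Finish callback under the new signature (`flush_pending_rows` is a hypothetical helper, not part of this commit):

```cpp
// Sketch only: assumes InnoDB internals (row0pread.h) are in scope.
dberr_t flush_pending_rows(size_t thread_id, size_t partition_id);  // hypothetical

void install_finish_callback(Parallel_reader &reader) {
  reader.set_finish_callback([](Parallel_reader::Ctx *ctx, size_t thread_id) {
    if (ctx == nullptr) {
      return DB_SUCCESS;  // this thread never processed a range
    }
    // Anything still buffered for this thread belongs to one partition.
    return flush_pending_rows(thread_id, ctx->partition_id());
  });
}
```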
