Skip to content

Introduce ThreadPool and use it for quicksort #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions src/avx512-16bit-qsort.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,25 +548,35 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize)
using T = uint16_t;
using vtype = zmm_vector<float16>;

#ifdef XSS_COMPILE_OPENMP
#ifdef XSS_BUILD_WITH_STD_THREADS
bool use_parallel = arrsize > 100000;
#else
bool use_parallel = false;
#endif

if (use_parallel) {
// This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system
#ifdef XSS_BUILD_WITH_STD_THREADS

// This thread limit was determined experimentally
constexpr int thread_limit = 8;
int thread_count = std::min(thread_limit, omp_get_max_threads());
int thread_count = std::min(thread_limit,
(int)std::thread::hardware_concurrency());
Comment on lines 561 to +563
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is 8 threads in a system with a lot of cores still better? Maybe you need to partition the data better to avoid locality problems.

arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100);

// We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_
// The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems
// Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays
#pragma omp parallel num_threads(thread_count)
#pragma omp single
qsort_<vtype, comparator, T>(arr,
0,
arrsize - 1,
2 * (arrsize_t)log2(arrsize),
task_threshold);
// Create a thread pool
ThreadPool pool(thread_count);

// Initial sort task
qsort_threads<vtype, comparator, T>(arr,
0,
arrsize - 1,
2 * (arrsize_t)log2(arrsize),
task_threshold,
pool);

// Wait for all tasks to complete
pool.wait_all();
#endif
}
else {
qsort_<vtype, comparator, T>(arr,
Expand All @@ -575,11 +585,6 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize)
2 * (arrsize_t)log2(arrsize),
std::numeric_limits<arrsize_t>::max());
}
#pragma omp taskwait
#else
qsort_<vtype, comparator, T>(
arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0);
#endif
}

[[maybe_unused]] X86_SIMD_SORT_INLINE void
Expand Down
171 changes: 126 additions & 45 deletions src/xss-common-qsort.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#ifndef XSS_COMMON_QSORT
#define XSS_COMMON_QSORT

#ifdef XSS_BUILD_WITH_STD_THREADS
#include "xss-thread-pool.hpp"
#endif

/*
* Quicksort using AVX-512. The ideas and code are based on these two research
* papers [1] and [2]. On a high level, the idea is to vectorize quicksort
Expand Down Expand Up @@ -533,8 +537,61 @@ static void qsort_(type_t *arr,
arrsize_t max_iters,
arrsize_t task_threshold)
{
UNUSED(task_threshold);
/*
* Resort to std::sort if quicksort isnt making any progress
* Resort to std::sort if quicksort isn't making any progress
*/
if (max_iters <= 0) {
std::sort(arr + left, arr + right + 1, comparator::STDSortComparator);
return;
}
/*
* Base case: use bitonic networks to sort arrays <= vtype::network_sort_threshold
*/
if (right + 1 - left <= vtype::network_sort_threshold) {
sort_n<vtype, comparator, vtype::network_sort_threshold>(
arr + left, (int32_t)(right + 1 - left));
return;
}

auto pivot_result
= get_pivot_smart<vtype, comparator, type_t>(arr, left, right);
type_t pivot = pivot_result.pivot;

if (pivot_result.result == pivot_result_t::Sorted) { return; }

type_t smallest = vtype::type_max();
type_t biggest = vtype::type_min();

arrsize_t pivot_index = partition_unrolled<vtype,
comparator,
vtype::partition_unroll_factor>(
arr, left, right + 1, pivot, &smallest, &biggest);

if (pivot_result.result == pivot_result_t::Only2Values) { return; }

type_t leftmostValue = comparator::leftmost(smallest, biggest);
type_t rightmostValue = comparator::rightmost(smallest, biggest);

// Sequential recursion
if (pivot != leftmostValue)
qsort_<vtype, comparator>(arr, left, pivot_index - 1, max_iters - 1, 0);
if (pivot != rightmostValue)
qsort_<vtype, comparator>(arr, pivot_index, right, max_iters - 1, 0);
}

// Template function for std::thread-based parallel quicksort implementation
#ifdef XSS_BUILD_WITH_STD_THREADS
template <typename vtype, typename comparator, typename type_t>
static void qsort_threads(type_t *arr,
arrsize_t left,
arrsize_t right,
arrsize_t max_iters,
arrsize_t task_threshold,
ThreadPool &thread_pool)
{
/*
* Resort to std::sort if quicksort isn't making any progress
*/
if (max_iters <= 0) {
std::sort(arr + left, arr + right + 1, comparator::STDSortComparator);
Expand Down Expand Up @@ -568,41 +625,65 @@ static void qsort_(type_t *arr,
type_t leftmostValue = comparator::leftmost(smallest, biggest);
type_t rightmostValue = comparator::rightmost(smallest, biggest);

#ifdef XSS_COMPILE_OPENMP
// Process left partition
if (pivot != leftmostValue) {
bool parallel_left = (pivot_index - left) > task_threshold;
if (parallel_left) {
#pragma omp task
qsort_<vtype, comparator>(
arr, left, pivot_index - 1, max_iters - 1, task_threshold);
submit_task(thread_pool,
[arr,
left,
pivot_index,
max_iters,
task_threshold,
&thread_pool]() {
qsort_threads<vtype, comparator>(arr,
left,
pivot_index - 1,
max_iters - 1,
task_threshold,
thread_pool);
});
}
else {
qsort_<vtype, comparator>(
arr, left, pivot_index - 1, max_iters - 1, task_threshold);
qsort_threads<vtype, comparator>(arr,
left,
pivot_index - 1,
max_iters - 1,
task_threshold,
thread_pool);
}
}

// Process right partition
if (pivot != rightmostValue) {
bool parallel_right = (right - pivot_index) > task_threshold;

if (parallel_right) {
#pragma omp task
qsort_<vtype, comparator>(
arr, pivot_index, right, max_iters - 1, task_threshold);
submit_task(thread_pool,
[arr,
pivot_index,
right,
max_iters,
task_threshold,
&thread_pool]() {
qsort_threads<vtype, comparator>(arr,
pivot_index,
right,
max_iters - 1,
task_threshold,
thread_pool);
});
}
else {
qsort_<vtype, comparator>(
arr, pivot_index, right, max_iters - 1, task_threshold);
qsort_threads<vtype, comparator>(arr,
pivot_index,
right,
max_iters - 1,
task_threshold,
thread_pool);
}
}
#else
UNUSED(task_threshold);

if (pivot != leftmostValue)
qsort_<vtype, comparator>(arr, left, pivot_index - 1, max_iters - 1, 0);
if (pivot != rightmostValue)
qsort_<vtype, comparator>(arr, pivot_index, right, max_iters - 1, 0);
#endif
}
#endif // XSS_BUILD_WITH_STD_THREADS

template <typename vtype, typename comparator, typename type_t>
X86_SIMD_SORT_INLINE void qselect_(type_t *arr,
Expand Down Expand Up @@ -667,40 +748,40 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan)

UNUSED(hasnan);

#ifdef XSS_COMPILE_OPENMP

#ifdef XSS_BUILD_WITH_STD_THREADS
bool use_parallel = arrsize > 100000;
#else
bool use_parallel = false;
#endif

if (use_parallel) {
// This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system
#ifdef XSS_BUILD_WITH_STD_THREADS
// This thread limit was determined experimentally
constexpr int thread_limit = 8;
int thread_count = std::min(thread_limit, omp_get_max_threads());
int thread_count = std::min(
thread_limit, (int)std::thread::hardware_concurrency());
arrsize_t task_threshold
= std::max((arrsize_t)100000, arrsize / 100);

// We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_
// The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems
// Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays
#pragma omp parallel num_threads(thread_count)
#pragma omp single
qsort_<vtype, comparator, T>(arr,
0,
arrsize - 1,
2 * (arrsize_t)log2(arrsize),
task_threshold);
#pragma omp taskwait
// Create a thread pool
ThreadPool pool(thread_count);

// Initial sort task
qsort_threads<vtype, comparator, T>(arr,
0,
arrsize - 1,
2 * (arrsize_t)log2(arrsize),
task_threshold,
pool);
// Wait for all tasks to complete
pool.wait_all();
#endif
}
else {
qsort_<vtype, comparator, T>(arr,
0,
arrsize - 1,
2 * (arrsize_t)log2(arrsize),
std::numeric_limits<arrsize_t>::max());
// For small arrays, just use the sequential version
qsort_<vtype, comparator, T>(
arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0);
}
#else
qsort_<vtype, comparator, T>(
arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0);
#endif

replace_inf_with_nan(arr, arrsize, nan_count, descending);
}
Expand Down
Loading