diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 73f0a0a..2686e5e 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -165,7 +165,7 @@ jobs: CXX: clang++-18 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir cd builddir ninja @@ -202,7 +202,7 @@ jobs: CXX: clang++-18 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Dasan_ci_dont_validate=true -Db_lundef=false --warnlevel 0 --buildtype release builddir cd builddir ninja @@ -235,7 +235,7 @@ jobs: CXX: g++-10 run: | make clean - meson setup -Dbuild_tests=true -Duse_openmp=true --warnlevel 2 --werror --buildtype release builddir + meson setup -Dbuild_tests=true -Duse_openmp=true -Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir cd builddir ninja diff --git a/Makefile b/Makefile index c51d658..65e6254 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,10 @@ test_openmp: meson setup -Dbuild_tests=true -Duse_openmp=true --warnlevel 2 --werror --buildtype release builddir cd builddir && ninja +test_stdthreads: + meson setup -Dbuild_tests=true -Duse_stdthreads=true --warnlevel 2 --werror --buildtype release builddir + cd builddir && ninja + test_asan: meson setup -Dbuild_tests=true -Duse_openmp=true -Db_sanitize=address,undefined -Dfatal_sanitizers=true -Db_lundef=false -Dasan_ci_dont_validate=true --warnlevel 0 --buildtype debugoptimized builddir cd builddir && ninja diff --git a/lib/meson.build b/lib/meson.build index 48046b3..ded5134 100644 --- a/lib/meson.build +++ b/lib/meson.build @@ -1,18 +1,13 @@ libtargets = [] -# Add compile flags for OpenMP if enabled -openmpflags = [] -if get_option('use_openmp') - openmpflags = ['-DXSS_USE_OPENMP=true', '-fopenmp'] -endif - if cpp.has_argument('-march=haswell') libtargets += static_library('libavx', files( 'x86simdsort-avx2.cpp', ), include_directories : [src], - cpp_args : ['-march=haswell', openmpflags], + cpp_args : ['-march=haswell', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -23,7 +18,8 @@ if cpp.has_argument('-march=skylake-avx512') 'x86simdsort-skx.cpp', ), include_directories : [src], - cpp_args : ['-march=skylake-avx512', openmpflags], + cpp_args : ['-march=skylake-avx512', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -34,7 +30,8 @@ if cpp.has_argument('-march=icelake-client') 'x86simdsort-icl.cpp', ), include_directories : [src], - cpp_args : ['-march=icelake-client', openmpflags], + cpp_args : ['-march=icelake-client', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif @@ -45,7 +42,8 @@ if cancompilefp16 'x86simdsort-spr.cpp', ), include_directories : [src], - cpp_args : ['-march=sapphirerapids', openmpflags], + cpp_args : ['-march=sapphirerapids', stdthreadsflag], + dependencies: [omp_dep], gnu_symbol_visibility : 'inlineshidden', ) endif diff --git a/meson.build b/meson.build index c796a0a..ee822ef 100644 --- a/meson.build +++ b/meson.build @@ -29,6 +29,20 @@ if get_option('build_vqsortbench') benchvq = true endif +# build with openmp +omp = [] +omp_dep = [] +if get_option('use_openmp') + omp = dependency('openmp', required : true) + omp_dep = declare_dependency(dependencies: omp, compile_args: ['-DXSS_USE_OPENMP']) +endif + +# build with std::threads +stdthreadsflag = [] +if get_option('use_stdthreads') + stdthreadsflag += ['-DXSS_BUILD_WITH_STD_THREADS'] +endif + fp16code = '''#include int main() { __m512h temp = _mm512_set1_ph(1.0f); @@ -43,8 +57,8 @@ if get_option('lib_type') == 'shared' libsimdsort = shared_library('x86simdsortcpp', 'lib/x86simdsort.cpp', include_directories : [src, utils, lib], - link_args : [openmpflags], link_with : [libtargets], + dependencies: [omp], gnu_symbol_visibility : 'inlineshidden', install : true, soversion : 1, @@ -53,7 +67,7 @@ else libsimdsort = static_library('x86simdsortcpp', 'lib/x86simdsort.cpp', include_directories : [src, utils, lib], - link_args : [openmpflags], + dependencies: [omp], link_with : [libtargets], gnu_symbol_visibility : 'inlineshidden', install : true, diff --git a/meson_options.txt b/meson_options.txt index 6edeb4e..43feadd 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -8,6 +8,8 @@ option('build_vqsortbench', type : 'boolean', value : true, description : 'Add google vqsort to benchmarks (default: "true").') option('use_openmp', type : 'boolean', value : false, description : 'Use OpenMP to accelerate key-value sort (default: "false").') +option('use_stdthreads', type : 'boolean', value : false, + description : 'Use std::threads to accelerate qsort (default: "false").') option('lib_type', type : 'string', value : 'shared', description : 'Library type: shared or static (default: "shared").') option('fatal_sanitizers', type : 'boolean', value : 'false', diff --git a/scripts/bench-compare.sh b/scripts/bench-compare.sh index a224acd..d25a350 100755 --- a/scripts/bench-compare.sh +++ b/scripts/bench-compare.sh @@ -11,7 +11,7 @@ if [ ! -d .bench/google-benchmark ]; then fi compare=$(realpath .bench/google-benchmark/tools/compare.py) -meson setup -Dbuild_benchmarks=true -Dbuild_ippbench=true --warnlevel 0 --buildtype release builddir-${branch} +meson setup -Dbuild_benchmarks=true -Duse_stdthreads=true -Duse_openmp=true --warnlevel 0 --buildtype release builddir-${branch} cd builddir-${branch} ninja $compare filters ./benchexe $1 $2 --benchmark_repetitions=$3 diff --git a/scripts/branch-compare.sh b/scripts/branch-compare.sh index 0d5057f..90fa99c 100755 --- a/scripts/branch-compare.sh +++ b/scripts/branch-compare.sh @@ -27,7 +27,7 @@ build_branch() { fi fi cd $dir_name - meson setup -Dbuild_benchmarks=true -Duse_openmp=true --warnlevel 0 --buildtype release builddir + meson setup -Dbuild_benchmarks=true -Duse_openmp=true -Duse_stdthreads=true --warnlevel 0 --buildtype release builddir cd builddir ninja cd ../../ diff --git a/src/avx512-16bit-qsort.hpp b/src/avx512-16bit-qsort.hpp index 6dbe24d..d428d0a 100644 --- a/src/avx512-16bit-qsort.hpp +++ b/src/avx512-16bit-qsort.hpp @@ -548,25 +548,35 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize) using T = uint16_t; using vtype = zmm_vector; -#ifdef XSS_COMPILE_OPENMP +#ifdef XSS_BUILD_WITH_STD_THREADS bool use_parallel = arrsize > 100000; +#else + bool use_parallel = false; +#endif if (use_parallel) { - // This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system +#ifdef XSS_BUILD_WITH_STD_THREADS + + // This thread limit was determined experimentally constexpr int thread_limit = 8; - int thread_count = std::min(thread_limit, omp_get_max_threads()); + int thread_count = std::min(thread_limit, + (int)std::thread::hardware_concurrency()); arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100); - // We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_ - // The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems - // Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays -#pragma omp parallel num_threads(thread_count) -#pragma omp single - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - task_threshold); + // Create a thread pool + xss::tp::ThreadPool pool(thread_count); + + // Initial sort task + qsort_threads(arr, + 0, + arrsize - 1, + 2 * (arrsize_t)log2(arrsize), + task_threshold, + pool); + + // Wait for all tasks to complete + pool.wait_all(); +#endif } else { qsort_(arr, @@ -575,11 +585,6 @@ avx512_qsort_fp16_helper(uint16_t *arr, arrsize_t arrsize) 2 * (arrsize_t)log2(arrsize), std::numeric_limits::max()); } -#pragma omp taskwait -#else - qsort_( - arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); -#endif } [[maybe_unused]] X86_SIMD_SORT_INLINE void diff --git a/src/xss-common-qsort.h b/src/xss-common-qsort.h index cf4a34a..7c1a889 100644 --- a/src/xss-common-qsort.h +++ b/src/xss-common-qsort.h @@ -11,6 +11,10 @@ #ifndef XSS_COMMON_QSORT #define XSS_COMMON_QSORT +#ifdef XSS_BUILD_WITH_STD_THREADS +#include "xss-thread-pool.hpp" +#endif + /* * Quicksort using AVX-512. The ideas and code are based on these two research * papers [1] and [2]. On a high level, the idea is to vectorize quicksort @@ -533,8 +537,9 @@ static void qsort_(type_t *arr, arrsize_t max_iters, arrsize_t task_threshold) { + UNUSED(task_threshold); /* - * Resort to std::sort if quicksort isnt making any progress + * Resort to std::sort if quicksort isn't making any progress */ if (max_iters <= 0) { std::sort(arr + left, arr + right + 1, comparator::STDSortComparator); @@ -568,41 +573,119 @@ static void qsort_(type_t *arr, type_t leftmostValue = comparator::leftmost(smallest, biggest); type_t rightmostValue = comparator::rightmost(smallest, biggest); -#ifdef XSS_COMPILE_OPENMP + // Sequential recursion + if (pivot != leftmostValue) + qsort_(arr, left, pivot_index - 1, max_iters - 1, 0); + if (pivot != rightmostValue) + qsort_(arr, pivot_index, right, max_iters - 1, 0); +} + +// Template function for std::thread-based parallel quicksort implementation +#ifdef XSS_BUILD_WITH_STD_THREADS +template +static void qsort_threads(type_t *arr, + arrsize_t left, + arrsize_t right, + arrsize_t max_iters, + arrsize_t task_threshold, + xss::tp::ThreadPool &thread_pool) +{ + /* + * Resort to std::sort if quicksort isn't making any progress + */ + if (max_iters <= 0) { + std::sort(arr + left, arr + right + 1, comparator::STDSortComparator); + return; + } + /* + * Base case: use bitonic networks to sort arrays <= vtype::network_sort_threshold + */ + if (right + 1 - left <= vtype::network_sort_threshold) { + sort_n( + arr + left, (int32_t)(right + 1 - left)); + return; + } + + auto pivot_result + = get_pivot_smart(arr, left, right); + type_t pivot = pivot_result.pivot; + + if (pivot_result.result == pivot_result_t::Sorted) { return; } + + type_t smallest = vtype::type_max(); + type_t biggest = vtype::type_min(); + + arrsize_t pivot_index = partition_unrolled( + arr, left, right + 1, pivot, &smallest, &biggest); + + if (pivot_result.result == pivot_result_t::Only2Values) { return; } + + type_t leftmostValue = comparator::leftmost(smallest, biggest); + type_t rightmostValue = comparator::rightmost(smallest, biggest); + + // Process left partition if (pivot != leftmostValue) { bool parallel_left = (pivot_index - left) > task_threshold; if (parallel_left) { -#pragma omp task - qsort_( - arr, left, pivot_index - 1, max_iters - 1, task_threshold); + xss::tp::submit_task(thread_pool, + [arr, + left, + pivot_index, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads( + arr, + left, + pivot_index - 1, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { - qsort_( - arr, left, pivot_index - 1, max_iters - 1, task_threshold); + qsort_threads(arr, + left, + pivot_index - 1, + max_iters - 1, + task_threshold, + thread_pool); } } + + // Process right partition if (pivot != rightmostValue) { bool parallel_right = (right - pivot_index) > task_threshold; - if (parallel_right) { -#pragma omp task - qsort_( - arr, pivot_index, right, max_iters - 1, task_threshold); + xss::tp::submit_task(thread_pool, + [arr, + pivot_index, + right, + max_iters, + task_threshold, + &thread_pool]() { + qsort_threads( + arr, + pivot_index, + right, + max_iters - 1, + task_threshold, + thread_pool); + }); } else { - qsort_( - arr, pivot_index, right, max_iters - 1, task_threshold); + qsort_threads(arr, + pivot_index, + right, + max_iters - 1, + task_threshold, + thread_pool); } } -#else - UNUSED(task_threshold); - - if (pivot != leftmostValue) - qsort_(arr, left, pivot_index - 1, max_iters - 1, 0); - if (pivot != rightmostValue) - qsort_(arr, pivot_index, right, max_iters - 1, 0); -#endif } +#endif // XSS_BUILD_WITH_STD_THREADS template X86_SIMD_SORT_INLINE void qselect_(type_t *arr, @@ -667,40 +750,38 @@ X86_SIMD_SORT_INLINE void xss_qsort(T *arr, arrsize_t arrsize, bool hasnan) UNUSED(hasnan); -#ifdef XSS_COMPILE_OPENMP - +#ifdef XSS_BUILD_WITH_STD_THREADS bool use_parallel = arrsize > 100000; - if (use_parallel) { - // This thread limit was determined experimentally; it may be better for it to be the number of physical cores on the system + // This thread limit was determined experimentally constexpr int thread_limit = 8; - int thread_count = std::min(thread_limit, omp_get_max_threads()); + int thread_count = std::min( + thread_limit, (int)std::thread::hardware_concurrency()); arrsize_t task_threshold = std::max((arrsize_t)100000, arrsize / 100); - // We use omp parallel and then omp single to setup the threads that will run the omp task calls in qsort_ - // The omp single prevents multiple threads from running the initial qsort_ simultaneously and causing problems - // Note that we do not use the if(...) clause built into OpenMP, because it causes a performance regression for small arrays -#pragma omp parallel num_threads(thread_count) -#pragma omp single - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - task_threshold); -#pragma omp taskwait + // Create a thread pool + xss::tp::ThreadPool pool(thread_count); + + // Initial sort task + qsort_threads(arr, + 0, + arrsize - 1, + 2 * (arrsize_t)log2(arrsize), + task_threshold, + pool); + // Wait for all tasks to complete + pool.wait_all(); } else { - qsort_(arr, - 0, - arrsize - 1, - 2 * (arrsize_t)log2(arrsize), - std::numeric_limits::max()); + // For small arrays, just use the sequential version + qsort_( + arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); } #else qsort_( arr, 0, arrsize - 1, 2 * (arrsize_t)log2(arrsize), 0); -#endif +#endif // XSS_BUILD_WITH_STD_THREADS replace_inf_with_nan(arr, arrsize, nan_count, descending); } diff --git a/src/xss-thread-pool.hpp b/src/xss-thread-pool.hpp new file mode 100644 index 0000000..67932ae --- /dev/null +++ b/src/xss-thread-pool.hpp @@ -0,0 +1,140 @@ +/******************************************************************* + * Copyright (C) 2025 Intel Corporation + * SPDX-License-Identifier: BSD-3-Clause + * Authors: Raghuveer Devulapalli + * ****************************************************************/ + +#ifndef XSS_THREAD_POOL +#define XSS_THREAD_POOL + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace xss { +namespace tp { + + /* + * ThreadPool class and doc: Generated by copilot + * This thread pool implementation is a simple and efficient way to manage a + * pool of threads for executing tasks concurrently. It uses a std::queue to store + * tasks and a set of worker threads which wait for tasks to be added to the + * queue. When a task is added, one of the worker threads will pick it up and + * execute it. The thread pool can be stopped gracefully, and it also provides + * a way to wait for all tasks to complete before stopping. + * */ + class ThreadPool { + private: + std::vector workers; + std::queue> tasks; + std::mutex queue_mutex; + std::condition_variable condition; // Condition variable for task queue + std::condition_variable + done_condition; // Condition variable for waiting + int active_tasks {0}; + bool stop {false}; + + public: + ThreadPool(size_t num_threads) + { + for (size_t i = 0; i < num_threads; ++i) { + // Create a worker thread and add it to the pool + // Each thread will run a lambda function that waits for tasks + workers.emplace_back([this] { + while (true) { + // Lock the queue mutex and wait for a task to be available + std::unique_lock lock(queue_mutex); + // Wait until there is a task or the pool is stopped + condition.wait(lock, [this] { + return stop || !tasks.empty(); + }); + + // Check if we need to terminate the thread + if (stop && tasks.empty()) { return; } + + // Extract the next task from the queue + auto task = std::move(tasks.front()); + tasks.pop(); + // Unlock the mutex before executing the task + lock.unlock(); + // Execute the task: + task(); + } + }); + } + } + + template + void enqueue(F &&func) + { + // Add a new task to the queue and notify one of the worker threads + std::unique_lock lock(queue_mutex); + tasks.emplace(std::forward(func)); + condition.notify_one(); + } + + ~ThreadPool() + { + // Stop the thread pool and join all threads + std::unique_lock lock(queue_mutex); + stop = true; + lock.unlock(); + condition.notify_all(); + for (std::thread &worker : workers) { + worker.join(); + } + } + + // Wait for all tasks to complete before stopping the pool + void wait_all() + { + std::unique_lock lock(queue_mutex); + done_condition.wait(lock, [this] { + return tasks.empty() && (active_tasks == 0); + }); + } + + // Track the number of active tasks + void task_start() + { + std::unique_lock lock(queue_mutex); + active_tasks++; + } + + // Decrement the active task count and notify if all tasks are done + void task_end() + { + std::unique_lock lock(queue_mutex); + active_tasks--; + if (tasks.empty() && active_tasks == 0) { + done_condition.notify_all(); + } + } + }; + + // Wrapper for submitting tasks to the thread pool with automatic tracking + template + void submit_task(ThreadPool &pool, F &&f) + { + pool.task_start(); + pool.enqueue([f = std::forward(f), &pool]() { + try { + f(); + } catch (...) { + // Ensure task_end is called even if the task throws an exception + pool.task_end(); + throw; // Re-throw the exception + } + pool.task_end(); + }); + } + +} // namespace tp +} // namespace xss + +#endif // XSS_THREAD_POOL diff --git a/tests/meson.build b/tests/meson.build index b070bcc..e1132b0 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -1,9 +1,5 @@ libtests = [] -if get_option('use_openmp') - openmpflags = ['-DXSS_USE_OPENMP=true'] -endif - # Add compile flags when needed for the ASAN CI run testargs = [] if get_option('asan_ci_dont_validate') @@ -16,21 +12,21 @@ endif libtests += static_library('tests_qsort', files('test-qsort.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) libtests += static_library('tests_kvsort', files('test-keyvalue.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) libtests += static_library('tests_objsort', files('test-objqsort.cpp', ), - dependencies: gtest_dep, + dependencies: [gtest_dep, omp], include_directories : [src, lib, utils], - cpp_args : [testargs, openmpflags], + cpp_args : [testargs, stdthreadsflag], ) diff --git a/tests/test-qsort.cpp b/tests/test-qsort.cpp index f2ce3a6..903f1df 100644 --- a/tests/test-qsort.cpp +++ b/tests/test-qsort.cpp @@ -12,8 +12,7 @@ class simdsort : public ::testing::Test { { std::iota(arrsize.begin(), arrsize.end(), 0); std::iota(arrsize_long.begin(), arrsize_long.end(), 0); -#ifdef XSS_USE_OPENMP - // These extended tests are only needed for the OpenMP logic +#if defined(XSS_BUILD_WITH_STD_THREADS) arrsize_long.push_back(10'000); arrsize_long.push_back(100'000); arrsize_long.push_back(1'000'000);