Skip to content

Commit bfdb4a2

Browse files
committed
初步完成基础线程池的实现 #12
1 parent bced685 commit bfdb4a2

File tree

1 file changed

+94
-16
lines changed

1 file changed

+94
-16
lines changed

md/详细分析/04线程池.md

Lines changed: 94 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -267,34 +267,112 @@ threadPool->start([=]{
267267
268268
```cpp
269269
class ThreadPool {
270-
std::mutex mutex;
271-
std::condition_variable cv;
272-
std::atomic<bool> stop;
273-
std::atomic<std::size_t> thread_num;
274-
std::queue<Task> tasks;
275-
std::vector<std::thread> pool;
270+
std::mutex mutex_;
271+
std::condition_variable cv_;
272+
std::atomic<bool> stop_;
273+
std::atomic<std::size_t> num_threads_;
274+
std::queue<Task> tasks_;
275+
std::vector<std::thread> pool_;
276276
};
277277
```
278278

279-
1. **`std::mutex mutex`**
280-
279+
1. **`std::mutex mutex_`**
281280
- 用于保护共享资源(如任务队列)在多线程环境中的访问,避免数据竞争。
282281

283-
2. **`std::condition_variable cv`**
284-
282+
2. **`std::condition_variable cv_`**
285283
- 用于线程间的同步,允许线程等待特定条件(如新任务加入队列)并在条件满足时唤醒线程。
286284

287-
3. **`std::atomic<bool> stop`**
285+
3. **`std::atomic<bool> stop_`**
288286
- 用于指示线程池是否停止接收新任务,并安全地通知所有工作线程退出。
289287

290-
4. **`std::atomic<std::size_t> thread_num`**
291-
288+
4. **`std::atomic<std::size_t> num_threads_`**
292289
- 表示线程池中的线程数量。
293290

294-
5. **`std::queue<Task> tasks`**
295-
291+
5. **`std::queue<Task> tasks_`**
296292
- 任务队列,存储等待执行的任务,任务按提交顺序执行。
297293

298-
6. **`std::vector<std::thread> pool`**
294+
6. **`std::vector<std::thread> pool_`**
299295

300296
- 线程容器,存储管理线程对象,每个线程从任务队列中获取任务并执行。
297+
298+
那么再直接提供构造函数以及添加任务到线程池的接口,然后内部再进行一点小小的操作,也就完成了:
299+
300+
```cpp
301+
inline std::size_t default_thread_pool_size()noexcept {
302+
std::size_t num_threads = std::thread::hardware_concurrency() * 2;
303+
num_threads = num_threads == 0 ? 2 : num_threads;
304+
return num_threads;
305+
}
306+
307+
class ThreadPool {
308+
public:
309+
using Task = std::packaged_task<void()>;
310+
311+
ThreadPool(const ThreadPool&) = delete;
312+
ThreadPool& operator=(const ThreadPool&) = delete;
313+
314+
ThreadPool(std::size_t num_thread = default_thread_pool_size())
315+
: stop_{ false }, num_threads_{ num_thread } {
316+
start();
317+
}
318+
~ThreadPool(){
319+
stop();
320+
}
321+
322+
template<typename F, typename... Args>
323+
std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> submit(F&& f, Args&&...args){
324+
using RetType = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
325+
if (stop_.load()){
326+
throw std::runtime_error("ThreadPool is stopped");
327+
}
328+
329+
auto task = std::make_shared<std::packaged_task<RetType()>>(
330+
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
331+
std::future<RetType> ret = task->get_future();
332+
333+
{
334+
std::lock_guard<std::mutex> lc{ mutex_ };
335+
tasks_.emplace([task] {(*task)(); });
336+
}
337+
cv_.notify_one();
338+
return ret;
339+
}
340+
341+
void stop(){
342+
stop_.store(true);
343+
cv_.notify_all();
344+
for (auto& thread : pool_){
345+
if (thread.joinable()) {
346+
thread.join();
347+
}
348+
}
349+
}
350+
351+
void start(){
352+
for (std::size_t i = 0; i < num_threads_; ++i){
353+
pool_.emplace_back([this] {
354+
while (!stop_) {
355+
Task task;
356+
{
357+
std::unique_lock<std::mutex> lc{ mutex_ };
358+
cv_.wait(lc, [this] {return stop_ || !tasks_.empty(); });
359+
if (tasks_.empty())
360+
return;
361+
task = std::move(tasks_.front());
362+
tasks_.pop();
363+
}
364+
task();
365+
}
366+
});
367+
}
368+
}
369+
370+
private:
371+
std::mutex mutex_;
372+
std::condition_variable cv_;
373+
std::atomic<bool> stop_ ;
374+
std::atomic<std::size_t> num_threads_;
375+
std::queue<Task> tasks_;
376+
std::vector<std::thread> pool_;
377+
};
378+
```

0 commit comments

Comments
 (0)