C++线程编程完整面试指南
目录
1. 线程基础概念
1.1 线程 vs 进程对比
特性 | 线程 | 进程 |
---|---|---|
内存空间 | 共享进程地址空间 | 独立虚拟地址空间 |
创建开销 | 小(只需分配栈空间) | 大(需要复制页表等) |
切换开销 | 小(只需切换寄存器) | 大(需要切换页表) |
通信方式 | 直接访问共享内存 | IPC(管道、共享内存等) |
数据安全 | 需要同步机制保护 | 天然隔离,安全性好 |
崩溃影响 | 整个进程崩溃 | 只影响单个进程 |
资源消耗 | 内存消耗小 | 内存消耗大 |
1.2 线程的内存模型
进程地址空间
┌─────────────┐
│ 内核空间 │
├─────────────┤
│ 线程1栈 │ ← 每个线程独有
├─────────────┤
│ 线程2栈 │ ← 每个线程独有
├─────────────┤
│ 线程3栈 │ ← 每个线程独有
├─────────────┤
│ │
│ 堆区 │ ← 所有线程共享
│ │
├─────────────┤
│ 数据段 │ ← 所有线程共享
├─────────────┤
│ 代码段 │ ← 所有线程共享
└─────────────┘
1.3 线程状态转换
┌─────────┐ 创建 ┌─────────┐
│ 不存在 │ ─────→ │ 新建 │
└─────────┘ └─────────┘
│
│ 启动
↓
┌─────────┐
┌─→ │ 就绪 │ ←─┐
│ └─────────┘ │
│ │ │
│ 时间片 │ 调度 │ I/O完成
│ 用完 ↓ │ 或唤醒
│ ┌─────────┐ │
└── │ 运行 │ ──┘
└─────────┘
│
│ 等待I/O
│ 或阻塞
↓
┌─────────┐
│ 阻塞 │
└─────────┘
│
│ join/detach
↓
┌─────────┐
│ 终止 │
└─────────┘
2. C++11标准线程库
2.1 std::thread基础用法
cpp
#include <thread>
#include <iostream>
#include <chrono>
// 普通函数作为线程函数
void thread_function(int id, const std::string& name) {
for (int i = 0; i < 5; i++) {
std::cout << "Thread " << id << " (" << name << "): " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 类成员函数作为线程函数
class ThreadWorker {
public:
void work(int iterations) {
for (int i = 0; i < iterations; i++) {
std::cout << "Worker thread: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
void operator()(int iterations) {
work(iterations);
}
};
int main() {
// 1. 使用普通函数创建线程
std::thread t1(thread_function, 1, "Function Thread");
// 2. 使用Lambda表达式创建线程
std::thread t2([](int id) {
for (int i = 0; i < 3; i++) {
std::cout << "Lambda Thread " << id << ": " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
}, 2);
// 3. 使用类成员函数创建线程
ThreadWorker worker;
std::thread t3(&ThreadWorker::work, &worker, 3);
// 4. 使用函数对象创建线程
std::thread t4(worker, 4);
// 等待所有线程完成
t1.join();
t2.join();
t3.join();
t4.join();
std::cout << "All threads completed!" << std::endl;
return 0;
}
2.2 线程管理
cpp
#include <thread>
#include <iostream>
#include <vector>
void worker_task(int id) {
std::cout << "Worker " << id << " started, thread ID: "
<< std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Worker " << id << " finished" << std::endl;
}
int main() {
std::vector<std::thread> threads;
// 创建多个线程
for (int i = 0; i < 4; i++) {
threads.emplace_back(worker_task, i);
}
// 获取硬件并发数
unsigned int num_cores = std::thread::hardware_concurrency();
std::cout << "Hardware concurrency: " << num_cores << std::endl;
// 等待所有线程完成
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// 演示detach的使用
std::thread detached_thread([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Detached thread finished" << std::endl;
});
detached_thread.detach(); // 分离线程
// 主线程等待一段时间确保detached线程有机会执行
std::this_thread::sleep_for(std::chrono::seconds(2));
return 0;
}
2.3 线程异常处理
cpp
#include <thread>
#include <exception>
#include <iostream>
class ThreadRAII {
private:
std::thread& t;
public:
enum class DtorAction { join, detach };
ThreadRAII(std::thread& t_, DtorAction action_)
: t(t_), action(action_) {}
~ThreadRAII() {
if (t.joinable()) {
if (action == DtorAction::join) {
t.join();
} else {
t.detach();
}
}
}
ThreadRAII(const ThreadRAII&) = delete;
ThreadRAII& operator=(const ThreadRAII&) = delete;
private:
DtorAction action;
};
void risky_function() {
// 可能抛出异常的函数
throw std::runtime_error("Something went wrong!");
}
int main() {
try {
std::thread t([]() {
std::cout << "Background thread running..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "Background thread finished" << std::endl;
});
// 使用RAII确保线程被正确管理
ThreadRAII raii(t, ThreadRAII::DtorAction::join);
// 执行可能抛出异常的代码
risky_function();
} catch (const std::exception& e) {
std::cout << "Exception caught: " << e.what() << std::endl;
// ThreadRAII的析构函数会自动调用join()
}
return 0;
}
3. POSIX线程(pthread)
3.1 pthread基础操作
cpp
#include <pthread.h>
#include <iostream>
#include <unistd.h>
// 线程函数
void* thread_function(void* arg) {
int thread_id = *(static_cast<int*>(arg));
std::cout << "Thread " << thread_id << " started" << std::endl;
// 模拟工作
sleep(2);
std::cout << "Thread " << thread_id << " finished" << std::endl;
// 返回值
int* result = new int(thread_id * 100);
pthread_exit(result);
}
int main() {
const int NUM_THREADS = 3;
pthread_t threads[NUM_THREADS];
int thread_ids[NUM_THREADS];
// 创建线程
for (int i = 0; i < NUM_THREADS; i++) {
thread_ids[i] = i;
int result = pthread_create(&threads[i], NULL, thread_function, &thread_ids[i]);
if (result != 0) {
std::cerr << "Error creating thread " << i << std::endl;
return 1;
}
}
// 等待线程完成并获取返回值
for (int i = 0; i < NUM_THREADS; i++) {
void* thread_result;
int result = pthread_join(threads[i], &thread_result);
if (result == 0) {
int* value = static_cast<int*>(thread_result);
std::cout << "Thread " << i << " returned: " << *value << std::endl;
delete value; // 释放动态分配的内存
}
}
std::cout << "All threads completed" << std::endl;
return 0;
}
3.2 pthread属性设置
cpp
#include <pthread.h>
#include <iostream>
void* detached_thread_function(void* arg) {
std::cout << "Detached thread running..." << std::endl;
sleep(1);
std::cout << "Detached thread finished" << std::endl;
return NULL;
}
void* stacksize_thread_function(void* arg) {
char large_array[1024 * 1024]; // 1MB数组
large_array[0] = 'A';
std::cout << "Large stack thread completed" << std::endl;
return NULL;
}
int main() {
pthread_attr_t attr;
pthread_t thread;
// 初始化线程属性
pthread_attr_init(&attr);
// 1. 设置为分离线程
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread, &attr, detached_thread_function, NULL);
// 分离线程不需要join
// 2. 设置栈大小
pthread_attr_t stack_attr;
pthread_attr_init(&stack_attr);
size_t stack_size = 2 * 1024 * 1024; // 2MB栈空间
pthread_attr_setstacksize(&stack_attr, stack_size);
pthread_t stack_thread;
pthread_create(&stack_thread, &stack_attr, stacksize_thread_function, NULL);
pthread_join(stack_thread, NULL);
// 3. 设置调度策略和优先级
pthread_attr_t sched_attr;
pthread_attr_init(&sched_attr);
// 设置调度策略
pthread_attr_setschedpolicy(&sched_attr, SCHED_FIFO);
// 设置优先级
struct sched_param param;
param.sched_priority = 10;
pthread_attr_setschedparam(&sched_attr, ¶m);
// 清理属性对象
pthread_attr_destroy(&attr);
pthread_attr_destroy(&stack_attr);
pthread_attr_destroy(&sched_attr);
sleep(2); // 等待分离线程完成
return 0;
}
4. 线程同步与互斥
4.1 std::mutex使用
cpp
#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
class Counter {
private:
int count;
std::mutex mtx;
public:
Counter() : count(0) {}
void increment() {
std::lock_guard<std::mutex> lock(mtx);
++count;
std::cout << "Count: " << count << " (Thread: "
<< std::this_thread::get_id() << ")" << std::endl;
}
void increment_manual() {
mtx.lock();
++count;
std::cout << "Manual Count: " << count << std::endl;
mtx.unlock();
}
void increment_unique() {
std::unique_lock<std::mutex> lock(mtx);
++count;
// 可以提前释放锁
lock.unlock();
// 进行一些不需要锁保护的操作
std::this_thread::sleep_for(std::chrono::milliseconds(10));
// 可以重新获取锁
lock.lock();
std::cout << "Unique Count: " << count << std::endl;
}
int get_count() const {
std::lock_guard<std::mutex> lock(mtx);
return count;
}
};
int main() {
Counter counter;
std::vector<std::thread> threads;
// 创建多个线程同时操作计数器
for (int i = 0; i < 5; i++) {
threads.emplace_back([&counter]() {
for (int j = 0; j < 3; j++) {
counter.increment();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "Final count: " << counter.get_count() << std::endl;
return 0;
}
4.2 读写锁(shared_mutex)
cpp
#include <thread>
#include <shared_mutex>
#include <iostream>
#include <vector>
#include <string>
class ThreadSafeMap {
private:
std::map<int, std::string> data;
mutable std::shared_mutex rw_mutex;
public:
void write(int key, const std::string& value) {
std::unique_lock<std::shared_mutex> lock(rw_mutex);
data[key] = value;
std::cout << "Written: " << key << " -> " << value
<< " (Thread: " << std::this_thread::get_id() << ")" << std::endl;
}
std::string read(int key) const {
std::shared_lock<std::shared_mutex> lock(rw_mutex);
auto it = data.find(key);
if (it != data.end()) {
std::cout << "Read: " << key << " -> " << it->second
<< " (Thread: " << std::this_thread::get_id() << ")" << std::endl;
return it->second;
}
return "";
}
size_t size() const {
std::shared_lock<std::shared_mutex> lock(rw_mutex);
return data.size();
}
};
int main() {
ThreadSafeMap map;
std::vector<std::thread> threads;
// 写线程
for (int i = 0; i < 3; i++) {
threads.emplace_back([&map, i]() {
for (int j = 0; j < 3; j++) {
map.write(i * 10 + j, "value_" + std::to_string(i * 10 + j));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
}
// 读线程
for (int i = 0; i < 5; i++) {
threads.emplace_back([&map]() {
for (int j = 0; j < 10; j++) {
map.read(j);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final map size: " << map.size() << std::endl;
return 0;
}
4.3 条件变量(condition_variable)
cpp
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <iostream>
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
public:
void push(const T& item) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(item);
cv.notify_one(); // 通知等待的线程
}
bool pop(T& item) {
std::unique_lock<std::mutex> lock(mtx);
if (queue.empty()) {
return false;
}
item = queue.front();
queue.pop();
return true;
}
void wait_and_pop(T& item) {
std::unique_lock<std::mutex> lock(mtx);
// 等待直到队列不为空
cv.wait(lock, [this] { return !queue.empty(); });
item = queue.front();
queue.pop();
}
bool wait_for_pop(T& item, int timeout_ms) {
std::unique_lock<std::mutex> lock(mtx);
if (cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
[this] { return !queue.empty(); })) {
item = queue.front();
queue.pop();
return true;
}
return false;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.size();
}
};
int main() {
ThreadSafeQueue<int> queue;
// 生产者线程
std::thread producer([&queue]() {
for (int i = 0; i < 10; i++) {
queue.push(i);
std::cout << "Produced: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// 消费者线程1
std::thread consumer1([&queue]() {
int item;
for (int i = 0; i < 5; i++) {
queue.wait_and_pop(item);
std::cout << "Consumer1 got: " << item << std::endl;
}
});
// 消费者线程2
std::thread consumer2([&queue]() {
int item;
for (int i = 0; i < 5; i++) {
if (queue.wait_for_pop(item, 200)) {
std::cout << "Consumer2 got: " << item << std::endl;
} else {
std::cout << "Consumer2 timeout" << std::endl;
}
}
});
producer.join();
consumer1.join();
consumer2.join();
return 0;
}
4.4 pthread互斥锁和条件变量
cpp
#include <pthread.h>
#include <iostream>
#include <unistd.h>
class PthreadCounter {
private:
int count;
pthread_mutex_t mutex;
pthread_cond_t condition;
public:
PthreadCounter() : count(0) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condition, NULL);
}
~PthreadCounter() {
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&condition);
}
void increment() {
pthread_mutex_lock(&mutex);
count++;
std::cout << "Count incremented to: " << count << std::endl;
pthread_cond_signal(&condition); // 通知等待的线程
pthread_mutex_unlock(&mutex);
}
void wait_for_count(int target) {
pthread_mutex_lock(&mutex);
while (count < target) {
std::cout << "Waiting for count to reach " << target
<< " (current: " << count << ")" << std::endl;
pthread_cond_wait(&condition, &mutex);
}
std::cout << "Count reached " << target << "!" << std::endl;
pthread_mutex_unlock(&mutex);
}
int get_count() {
pthread_mutex_lock(&mutex);
int current = count;
pthread_mutex_unlock(&mutex);
return current;
}
};
// 包装器用于传递参数给pthread
struct ThreadData {
PthreadCounter* counter;
int target;
};
void* incrementer_thread(void* arg) {
PthreadCounter* counter = static_cast<PthreadCounter*>(arg);
for (int i = 0; i < 5; i++) {
sleep(1);
counter->increment();
}
return NULL;
}
void* waiter_thread(void* arg) {
ThreadData* data = static_cast<ThreadData*>(arg);
data->counter->wait_for_count(data->target);
return NULL;
}
int main() {
PthreadCounter counter;
pthread_t inc_thread, wait_thread;
ThreadData wait_data = { &counter, 3 };
// 创建等待线程
pthread_create(&wait_thread, NULL, waiter_thread, &wait_data);
// 创建递增线程
pthread_create(&inc_thread, NULL, incrementer_thread, &counter);
// 等待线程完成
pthread_join(wait_thread, NULL);
pthread_join(inc_thread, NULL);
std::cout << "Final count: " << counter.get_count() << std::endl;
return 0;
}
5. 线程通信机制
5.1 基于共享内存的通信
cpp
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <vector>
#include <atomic>
// 生产者-消费者模式
template<typename T>
class BoundedBuffer {
private:
std::vector<T> buffer;
size_t capacity;
size_t front;
size_t rear;
size_t count;
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
public:
BoundedBuffer(size_t size)
: capacity(size), front(0), rear(0), count(0) {
buffer.resize(capacity);
}
void produce(const T& item) {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区不满
not_full.wait(lock, [this] { return count != capacity; });
buffer[rear] = item;
rear = (rear + 1) % capacity;
++count;
std::cout << "Produced: " << item
<< " (buffer size: " << count << ")" << std::endl;
not_empty.notify_one(); // 通知消费者
}
T consume() {
std::unique_lock<std::mutex> lock(mtx);
// 等待缓冲区不空
not_empty.wait(lock, [this] { return count != 0; });
T item = buffer[front];
front = (front + 1) % capacity;
--count;
std::cout << "Consumed: " << item
<< " (buffer size: " << count << ")" << std::endl;
not_full.notify_one(); // 通知生产者
return item;
}
};
int main() {
BoundedBuffer<int> buffer(5);
// 生产者线程
std::thread producer([&buffer]() {
for (int i = 0; i < 10; i++) {
buffer.produce(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// 消费者线程
std::thread consumer([&buffer]() {
for (int i = 0; i < 10; i++) {
int item = buffer.consume();
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
});
producer.join();
consumer.join();
return 0;
}
5.2 基于消息传递的通信
cpp
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <iostream>
// 消息类型
struct Message {
enum Type { DATA, COMMAND, QUIT };
Type type;
int data;
std::string command;
Message(Type t, int d = 0, const std::string& cmd = "")
: type(t), data(d), command(cmd) {}
};
// 消息队列
class MessageQueue {
private:
std::queue<Message> messages;
std::mutex mtx;
std::condition_variable cv;
bool shutdown;
public:
MessageQueue() : shutdown(false) {}
void send(const Message& msg) {
std::lock_guard<std::mutex> lock(mtx);
if (!shutdown) {
messages.push(msg);
cv.notify_one();
}
}
bool receive(Message& msg, int timeout_ms = -1) {
std::unique_lock<std::mutex> lock(mtx);
if (timeout_ms < 0) {
// 无限等待
cv.wait(lock, [this] { return !messages.empty() || shutdown; });
} else {
// 超时等待
if (!cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
[this] { return !messages.empty() || shutdown; })) {
return false; // 超时
}
}
if (shutdown && messages.empty()) {
return false;
}
msg = messages.front();
messages.pop();
return true;
}
void close() {
std::lock_guard<std::mutex> lock(mtx);
shutdown = true;
cv.notify_all();
}
};
// 工作线程类
class Worker {
private:
MessageQueue& queue;
std::thread worker_thread;
bool running;
public:
Worker(MessageQueue& q) : queue(q), running(true) {
worker_thread = std::thread(&Worker::work_loop, this);
}
~Worker() {
if (worker_thread.joinable()) {
worker_thread.join();
}
}
private:
void work_loop() {
Message msg(Message::DATA);
while (running) {
if (queue.receive(msg, 1000)) { // 1秒超时
switch (msg.type) {
case Message::DATA:
process_data(msg.data);
break;
case Message::COMMAND:
execute_command(msg.command);
break;
case Message::QUIT:
running = false;
break;
}
} else {
std::cout << "Worker timeout, checking status..." << std::endl;
}
}
std::cout << "Worker thread terminated" << std::endl;
}
void process_data(int data) {
std::cout << "Processing data: " << data << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
void execute_command(const std::string& cmd) {
std::cout << "Executing command: " << cmd << std::endl;
}
};
int main() {
MessageQueue queue;
Worker worker(queue);
// 发送一些消息
for (int i = 0; i < 5; i++) {
queue.send(Message(Message::DATA, i * 10));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
queue.send(Message(Message::COMMAND, 0, "status"));
queue.send(Message(Message::COMMAND, 0, "restart"));
// 发送退出消息
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.send(Message(Message::QUIT));
// 等待工作线程结束
std::this_thread::sleep_for(std::chrono::seconds(2));
queue.close();
return 0;
}
6. 线程池设计
6.1 简单线程池实现
cpp
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <iostream>
class SimpleThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;
public:
SimpleThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
~SimpleThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker: workers) {
worker.join();
}
}
};
// 使用示例
int calculate(int x, int y) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return x + y;
}
void print_message(const std::string& msg) {
std::cout << "Message: " << msg << " (Thread: "
<< std::this_thread::get_id() << ")" << std::endl;
}
int main() {
SimpleThreadPool pool(4);
std::vector<std::future<int>> results;
// 提交计算任务
for (int i = 0; i < 8; i++) {
results.emplace_back(
pool.enqueue(calculate, i, i * 2)
);
}
// 提交打印任务
for (int i = 0; i < 5; i++) {
pool.enqueue(print_message, "Hello " + std::to_string(i));
}
// 获取计算结果
for (auto& result : results) {
std::cout << "Result: " << result.get() << std::endl;
}
return 0;
}
6.2 高级线程池实现
cpp
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <chrono>
#include <iostream>
class AdvancedThreadPool {
public:
enum class Priority { LOW, NORMAL, HIGH };
private:
struct Task {
std::function<void()> function;
Priority priority;
std::chrono::steady_clock::time_point submit_time;
Task(std::function<void()> f, Priority p)
: function(std::move(f)), priority(p),
submit_time(std::chrono::steady_clock::now()) {}
bool operator<(const Task& other) const {
if (priority != other.priority) {
return priority < other.priority; // 高优先级在前
}
return submit_time > other.submit_time; // 早提交的在前
}
};
std::vector<std::thread> workers;
std::priority_queue<Task> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;
// 统计信息
std::atomic<size_t> active_threads;
std::atomic<size_t> completed_tasks;
public:
AdvancedThreadPool(size_t threads)
: stop(false), active_threads(0), completed_tasks(0) {
for (size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while (true) {
Task task([](){}, Priority::LOW);
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(const_cast<Task&>(tasks.top()));
tasks.pop();
}
++active_threads;
task.function();
++completed_tasks;
--active_threads;
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args, Priority priority = Priority::NORMAL)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); }, priority);
}
condition.notify_one();
return res;
}
size_t get_queue_size() const {
std::unique_lock<std::mutex> lock(queue_mutex);
return tasks.size();
}
size_t get_active_threads() const {
return active_threads.load();
}
size_t get_completed_tasks() const {
return completed_tasks.load();
}
void wait_for_completion() {
while (get_queue_size() > 0 || get_active_threads() > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
~AdvancedThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker: workers) {
worker.join();
}
}
};
// 测试函数
void heavy_task(int id, int duration_ms) {
std::cout << "Task " << id << " started (duration: "
<< duration_ms << "ms)" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
std::cout << "Task " << id << " completed" << std::endl;
}
int main() {
AdvancedThreadPool pool(3);
// 提交不同优先级的任务
std::cout << "Submitting tasks with different priorities..." << std::endl;
// 低优先级任务
for (int i = 0; i < 3; i++) {
pool.enqueue(heavy_task, i, 200, AdvancedThreadPool::Priority::LOW);
}
// 正常优先级任务
for (int i = 10; i < 13; i++) {
pool.enqueue(heavy_task, i, 150, AdvancedThreadPool::Priority::NORMAL);
}
// 高优先级任务
for (int i = 20; i < 23; i++) {
pool.enqueue(heavy_task, i, 100, AdvancedThreadPool::Priority::HIGH);
}
// 监控状态
std::thread monitor([&pool]() {
while (pool.get_queue_size() > 0 || pool.get_active_threads() > 0) {
std::cout << "Queue size: " << pool.get_queue_size()
<< ", Active threads: " << pool.get_active_threads()
<< ", Completed: " << pool.get_completed_tasks() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
pool.wait_for_completion();
monitor.join();
std::cout << "All tasks completed. Total: "
<< pool.get_completed_tasks() << std::endl;
return 0;
}
7. 线程安全与原子操作
7.1 std::atomic基础用法
cpp
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
class AtomicCounter {
private:
std::atomic<int> count;
std::atomic<bool> ready;
public:
AtomicCounter() : count(0), ready(false) {}
void increment() {
count.fetch_add(1, std::memory_order_relaxed);
}
void decrement() {
count.fetch_sub(1, std::memory_order_relaxed);
}
int get_count() const {
return count.load(std::memory_order_relaxed);
}
void set_ready() {
ready.store(true, std::memory_order_release);
}
bool is_ready() const {
return ready.load(std::memory_order_acquire);
}
// 原子比较交换
bool compare_and_set(int expected, int new_value) {
return count.compare_exchange_strong(expected, new_value);
}
};
// 无锁栈实现
template<typename T>
class LockFreeStack {
private:
struct Node {
T data;
Node* next;
Node(const T& data_) : data(data_), next(nullptr) {}
};
std::atomic<Node*> head;
public:
LockFreeStack() : head(nullptr) {}
void push(const T& data) {
Node* new_node = new Node(data);
new_node->next = head.load();
// 原子地更新头指针
while (!head.compare_exchange_weak(new_node->next, new_node)) {
// 如果失败,new_node->next已经被更新为当前的head值
// 重试
}
}
bool pop(T& result) {
Node* old_head = head.load();
while (old_head &&
!head.compare_exchange_weak(old_head, old_head->next)) {
// 重试
}
if (old_head) {
result = old_head->data;
delete old_head;
return true;
}
return false;
}
bool empty() const {
return head.load() == nullptr;
}
~LockFreeStack() {
T dummy;
while (pop(dummy)) {
// 清空栈
}
}
};
int main() {
AtomicCounter counter;
LockFreeStack<int> stack;
std::vector<std::thread> threads;
// 测试原子计数器
for (int i = 0; i < 5; i++) {
threads.emplace_back([&counter, i]() {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
std::cout << "Thread " << i << " finished incrementing" << std::endl;
});
}
// 测试无锁栈
threads.emplace_back([&stack]() {
for (int i = 0; i < 100; i++) {
stack.push(i);
}
std::cout << "Producer finished" << std::endl;
});
threads.emplace_back([&stack]() {
int value;
int count = 0;
while (count < 100) {
if (stack.pop(value)) {
count++;
if (count % 20 == 0) {
std::cout << "Consumer popped: " << value
<< " (total: " << count << ")" << std::endl;
}
} else {
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
}
std::cout << "Consumer finished" << std::endl;
});
for (auto& t : threads) {
t.join();
}
std::cout << "Final counter value: " << counter.get_count() << std::endl;
std::cout << "Stack empty: " << stack.empty() << std::endl;
return 0;
}
7.2 内存序列和同步
cpp
#include <atomic>
#include <thread>
#include <iostream>
#include <cassert>
class MemoryOrderExample {
private:
std::atomic<int> data;
std::atomic<bool> ready;
public:
MemoryOrderExample() : data(0), ready(false) {}
// 顺序一致性(默认)
void sequential_consistency_writer() {
data.store(42);
ready.store(true);
}
void sequential_consistency_reader() {
while (!ready.load()) {
std::this_thread::yield();
}
assert(data.load() == 42); // 保证能读到42
}
// 获取-释放语义
void acquire_release_writer() {
data.store(42, std::memory_order_relaxed);
ready.store(true, std::memory_order_release); // 释放操作
}
void acquire_release_reader() {
while (!ready.load(std::memory_order_acquire)) { // 获取操作
std::this_thread::yield();
}
assert(data.load(std::memory_order_relaxed) == 42); // 保证能读到42
}
// 宽松序列
void relaxed_writer() {
data.store(42, std::memory_order_relaxed);
ready.store(true, std::memory_order_relaxed);
}
void relaxed_reader() {
while (!ready.load(std::memory_order_relaxed)) {
std::this_thread::yield();
}
// 注意:这里不保证能读到42,因为使用了relaxed语义
int value = data.load(std::memory_order_relaxed);
std::cout << "Read value: " << value << std::endl;
}
void reset() {
data.store(0);
ready.store(false);
}
};
// Dekker算法实现(使用原子操作)
class DekkerMutex {
private:
std::atomic<bool> flag[2];
std::atomic<int> turn;
public:
DekkerMutex() : turn(0) {
flag[0].store(false);
flag[1].store(false);
}
void lock(int id) {
int other = 1 - id;
flag[id].store(true, std::memory_order_relaxed);
while (flag[other].load(std::memory_order_relaxed)) {
if (turn.load(std::memory_order_relaxed) != id) {
flag[id].store(false, std::memory_order_relaxed);
while (turn.load(std::memory_order_relaxed) != id) {
std::this_thread::yield();
}
flag[id].store(true, std::memory_order_relaxed);
}
}
}
void unlock(int id) {
turn.store(1 - id, std::memory_order_relaxed);
flag[id].store(false, std::memory_order_relaxed);
}
};
int shared_counter = 0;
DekkerMutex dekker_mutex;
void dekker_test_thread(int id) {
for (int i = 0; i < 10000; i++) {
dekker_mutex.lock(id);
shared_counter++; // 临界区
dekker_mutex.unlock(id);
}
}
int main() {
MemoryOrderExample example;
// 测试顺序一致性
std::cout << "Testing sequential consistency..." << std::endl;
std::thread writer1(&MemoryOrderExample::sequential_consistency_writer, &example);
std::thread reader1(&MemoryOrderExample::sequential_consistency_reader, &example);
writer1.join();
reader1.join();
example.reset();
// 测试获取-释放语义
std::cout << "Testing acquire-release..." << std::endl;
std::thread writer2(&MemoryOrderExample::acquire_release_writer, &example);
std::thread reader2(&MemoryOrderExample::acquire_release_reader, &example);
writer2.join();
reader2.join();
example.reset();
// 测试宽松序列
std::cout << "Testing relaxed ordering..." << std::endl;
std::thread writer3(&MemoryOrderExample::relaxed_writer, &example);
std::thread reader3(&MemoryOrderExample::relaxed_reader, &example);
writer3.join();
reader3.join();
// 测试Dekker互斥算法
std::cout << "Testing Dekker mutex..." << std::endl;
std::thread t1(dekker_test_thread, 0);
std::thread t2(dekker_test_thread, 1);
t1.join();
t2.join();
std::cout << "Final counter value: " << shared_counter << std::endl;
std::cout << "Expected: " << 20000 << std::endl;
return 0;
}
8. 高级线程编程
8.1 线程局部存储(Thread Local Storage)
cpp
#include <thread>
#include <iostream>
#include <vector>
#include <random>
// 全局线程局部变量
thread_local int tls_counter = 0;
thread_local std::string tls_name;
class ThreadLocalExample {
private:
thread_local static int instance_count;
int id;
public:
ThreadLocalExample() : id(++instance_count) {
std::cout << "ThreadLocalExample created with ID: " << id
<< " in thread: " << std::this_thread::get_id() << std::endl;
}
int get_id() const { return id; }
static int get_instance_count() { return instance_count; }
};
// 必须在类外定义thread_local静态成员
thread_local int ThreadLocalExample::instance_count = 0;
void worker_function(int thread_id) {
// 设置线程局部变量
tls_name = "Thread_" + std::to_string(thread_id);
// 创建线程局部对象
ThreadLocalExample obj1;
ThreadLocalExample obj2;
// 执行一些工作
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 10);
for (int i = 0; i < 5; i++) {
tls_counter += dis(gen);
std::cout << tls_name << " - Counter: " << tls_counter
<< ", Objects: " << ThreadLocalExample::get_instance_count()
<< std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << tls_name << " finished. Final counter: " << tls_counter << std::endl;
}
int main() {
std::vector<std::thread> threads;
// 创建多个线程
for (int i = 0; i < 4; i++) {
threads.emplace_back(worker_function, i);
}
// 主线程也有自己的线程局部存储
tls_name = "Main_Thread";
tls_counter = 1000;
ThreadLocalExample main_obj;
std::cout << "Main thread - Counter: " << tls_counter
<< ", Objects: " << ThreadLocalExample::get_instance_count()
<< std::endl;
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "Main thread final counter: " << tls_counter << std::endl;
return 0;
}
8.2 协程式任务调度
cpp
#include <thread>
#include <future>
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <chrono>
// 简单的协作式调度器
class CooperativeScheduler {
public:
using Task = std::function<void()>;
private:
std::queue<Task> tasks;
std::mutex task_mutex;
std::condition_variable cv;
std::atomic<bool> running;
std::vector<std::thread> workers;
public:
CooperativeScheduler(size_t num_workers = 1) : running(true) {
for (size_t i = 0; i < num_workers; ++i) {
workers.emplace_back([this]() { worker_loop(); });
}
}
void schedule(Task task) {
{
std::lock_guard<std::mutex> lock(task_mutex);
tasks.push(std::move(task));
}
cv.notify_one();
}
template<typename Callable>
auto async_task(Callable&& callable) -> std::future<decltype(callable())> {
using return_type = decltype(callable());
auto promise = std::make_shared<std::promise<return_type>>();
auto future = promise->get_future();
schedule([promise, callable = std::forward<Callable>(callable)]() {
try {
if constexpr (std::is_void_v<return_type>) {
callable();
promise->set_value();
} else {
promise->set_value(callable());
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
return future;
}
void yield() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
void shutdown() {
{
std::lock_guard<std::mutex> lock(task_mutex);
running = false;
}
cv.notify_all();
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
~CooperativeScheduler() {
shutdown();
}
private:
void worker_loop() {
while (running) {
Task task;
{
std::unique_lock<std::mutex> lock(task_mutex);
cv.wait(lock, [this] { return !tasks.empty() || !running; });
if (!running && tasks.empty()) {
break;
}
if (!tasks.empty()) {
task = std::move(tasks.front());
tasks.pop();
}
}
if (task) {
task();
}
}
}
};
// 使用示例:模拟协程式的异步操作
class AsyncFileProcessor {
private:
CooperativeScheduler& scheduler;
public:
AsyncFileProcessor(CooperativeScheduler& sched) : scheduler(sched) {}
std::future<std::string> read_file_async(const std::string& filename) {
return scheduler.async_task([this, filename]() -> std::string {
std::cout << "Starting to read file: " << filename << std::endl;
// 模拟文件读取
for (int i = 0; i < 5; i++) {
std::cout << "Reading chunk " << i << " of " << filename << std::endl;
scheduler.yield(); // 让出执行权
}
std::cout << "Finished reading file: " << filename << std::endl;
return "Content of " + filename;
});
}
std::future<void> process_file_async(const std::string& content) {
return scheduler.async_task([this, content]() {
std::cout << "Starting to process: " << content << std::endl;
// 模拟处理
for (int i = 0; i < 3; i++) {
std::cout << "Processing step " << i << std::endl;
scheduler.yield(); // 让出执行权
}
std::cout << "Finished processing: " << content << std::endl;
});
}
};
int main() {
CooperativeScheduler scheduler(2); // 2个工作线程
AsyncFileProcessor processor(scheduler);
// 启动多个异步操作
std::vector<std::future<std::string>> read_futures;
std::vector<std::future<void>> process_futures;
// 读取多个文件
for (int i = 0; i < 3; i++) {
read_futures.push_back(
processor.read_file_async("file" + std::to_string(i) + ".txt")
);
}
// 等待读取完成,然后处理
for (auto& future : read_futures) {
std::string content = future.get();
process_futures.push_back(
processor.process_file_async(content)
);
}
// 等待所有处理完成
for (auto& future : process_futures) {
future.get();
}
std::cout << "All operations completed!" << std::endl;
return 0;
}
8.3 基于事件的异步编程
cpp
#include <thread>
#include <functional>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>
#include <chrono>
#include <typeindex>
#include <any>
// 事件系统基础类
class Event {
public:
virtual ~Event() = default;
virtual std::type_index get_type() const = 0;
};
template<typename T>
class TypedEvent : public Event {
public:
T data;
TypedEvent(const T& data_) : data(data_) {}
std::type_index get_type() const override {
return std::type_index(typeid(T));
}
};
// 事件总线
class EventBus {
public:
using EventHandler = std::function<void(const Event&)>;
private:
std::unordered_map<std::type_index, std::vector<EventHandler>> handlers;
std::queue<std::unique_ptr<Event>> event_queue;
std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> running;
std::thread dispatcher_thread;
public:
EventBus() : running(true) {
dispatcher_thread = std::thread([this]() { dispatch_loop(); });
}
~EventBus() {
shutdown();
}
template<typename T>
void subscribe(std::function<void(const T&)> handler) {
std::lock_guard<std::mutex> lock(mutex);
auto wrapper = [handler](const Event& event) {
const auto& typed_event = static_cast<const TypedEvent<T>&>(event);
handler(typed_event.data);
};
handlers[std::type_index(typeid(T))].push_back(wrapper);
}
template<typename T>
void publish(const T& event_data) {
{
std::lock_guard<std::mutex> lock(mutex);
event_queue.push(std::make_unique<TypedEvent<T>>(event_data));
}
cv.notify_one();
}
void shutdown() {
{
std::lock_guard<std::mutex> lock(mutex);
running = false;
}
cv.notify_one();
if (dispatcher_thread.joinable()) {
dispatcher_thread.join();
}
}
private:
void dispatch_loop() {
while (running) {
std::unique_ptr<Event> event;
{
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [this] { return !event_queue.empty() || !running; });
if (!running && event_queue.empty()) {
break;
}
if (!event_queue.empty()) {
event = std::move(event_queue.front());
event_queue.pop();
}
}
if (event) {
auto type = event->get_type();
std::lock_guard<std::mutex> lock(mutex);
auto it = handlers.find(type);
if (it != handlers.end()) {
for (const auto& handler : it->second) {
handler(*event);
}
}
}
}
}
};
// 事件类型定义
struct UserLoginEvent {
std::string username;
std::chrono::system_clock::time_point timestamp;
};
struct DataProcessedEvent {
int process_id;
std::string result;
bool success;
};
struct SystemShutdownEvent {
std::string reason;
};
// 事件处理服务
class UserService {
public:
void handle_login(const UserLoginEvent& event) {
std::cout << "UserService: User " << event.username
<< " logged in at timestamp" << std::endl;
// 模拟用户验证处理
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
};
class LoggingService {
public:
void handle_login(const UserLoginEvent& event) {
std::cout << "LoggingService: Logging user login - "
<< event.username << std::endl;
}
void handle_data_processed(const DataProcessedEvent& event) {
std::cout << "LoggingService: Process " << event.process_id
<< " completed with result: " << event.result
<< " (success: " << event.success << ")" << std::endl;
}
void handle_shutdown(const SystemShutdownEvent& event) {
std::cout << "LoggingService: System shutting down - "
<< event.reason << std::endl;
}
};
class DataProcessor {
private:
EventBus& event_bus;
public:
DataProcessor(EventBus& bus) : event_bus(bus) {}
void process_data(int id, const std::string& data) {
std::cout << "DataProcessor: Processing data for ID " << id << std::endl;
// 模拟数据处理
std::this_thread::sleep_for(std::chrono::milliseconds(200));
bool success = (id % 2 == 0); // 偶数ID成功
std::string result = success ? "processed_" + data : "failed";
// 发布处理完成事件
event_bus.publish(DataProcessedEvent{id, result, success});
}
};
int main() {
EventBus event_bus;
// 创建服务实例
UserService user_service;
LoggingService logging_service;
DataProcessor data_processor(event_bus);
// 订阅事件
event_bus.subscribe<UserLoginEvent>(
[&user_service](const UserLoginEvent& event) {
user_service.handle_login(event);
}
);
event_bus.subscribe<UserLoginEvent>(
[&logging_service](const UserLoginEvent& event) {
logging_service.handle_login(event);
}
);
event_bus.subscribe<DataProcessedEvent>(
[&logging_service](const DataProcessedEvent& event) {
logging_service.handle_data_processed(event);
}
);
event_bus.subscribe<SystemShutdownEvent>(
[&logging_service](const SystemShutdownEvent& event) {
logging_service.handle_shutdown(event);
}
);
// 模拟事件发布
std::vector<std::thread> workers;
// 用户登录事件
workers.emplace_back([&event_bus]() {
for (int i = 0; i < 3; i++) {
event_bus.publish(UserLoginEvent{
"user" + std::to_string(i),
std::chrono::system_clock::now()
});
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
});
// 数据处理任务
workers.emplace_back([&data_processor]() {
for (int i = 0; i < 5; i++) {
data_processor.process_data(i, "data_" + std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// 等待所有工作完成
for (auto& worker : workers) {
worker.join();
}
// 发布系统关闭事件
event_bus.publish(SystemShutdownEvent{"Normal shutdown"});
// 等待事件处理完成
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "All events processed!" << std::endl;
return 0;
}
9. 常见面试题汇总
9.1 基础概念题
Q1: 进程和线程的区别?
- 内存模型:进程有独立地址空间,线程共享进程地址空间
- 创建开销:线程创建开销小,进程开销大
- 通信方式:线程直接共享内存,进程需要IPC
- 安全性:进程更安全,线程一个崩溃影响整个进程
Q2: 什么是线程安全?
- 多个线程同时访问共享数据时,程序行为仍然正确
- 需要通过同步机制(互斥锁、原子操作等)保证
- 线程安全的代码可以被多个线程并发调用
Q3: C++11线程库 vs pthread的区别?
特性 | C++11 std::thread | pthread |
---|---|---|
跨平台性 | 跨平台 | 主要Unix/Linux |
类型安全 | 类型安全 | 基于void* |
异常安全 | 支持异常 | 错误码机制 |
RAII | 支持 | 需要手动管理 |
性能 | 略有开销 | 更接近系统调用 |
9.2 同步机制题
Q4: mutex、lock_guard、unique_lock的区别?
- mutex:基础互斥锁,需要手动lock/unlock
- lock_guard:RAII包装,自动管理锁的生命周期,不可转移
- unique_lock:功能更强大,支持延迟锁定、条件变量、锁转移
Q5: 什么是死锁?如何避免?
定义:两个或多个线程互相等待对方释放资源
必要条件:互斥、占有且等待、不可抢占、循环等待
避免方法
:
- 锁排序:统一加锁顺序
- 超时机制:try_lock_for
- 避免嵌套锁定
- 使用std::lock同时获取多个锁
Q6: 条件变量的虚假唤醒是什么?
- 线程在没有满足条件的情况下被唤醒
- 原因:系统优化、信号处理等
- 解决:总是在循环中检查条件
cpp
// 正确做法
while (!condition) {
cv.wait(lock);
}
// 错误做法
if (!condition) {
cv.wait(lock); // 可能虚假唤醒
}
9.3 原子操作题
Q7: 什么是原子操作?有什么作用?
- 不可分割的操作,要么全部完成,要么全部不执行
- 避免数据竞争,实现无锁编程
- 比互斥锁性能更好,但使用复杂
Q8: memory_order的几种类型?
- memory_order_relaxed:最宽松,只保证原子性
- memory_order_acquire:获取语义,读操作
- memory_order_release:释放语义,写操作
- memory_order_acq_rel:获取-释放语义
- memory_order_seq_cst:顺序一致性(默认)
Q9: compare_exchange_strong和compare_exchange_weak的区别?
- strong:比较失败时一定不会更新
- weak:可能会有"虚假失败",在某些架构上性能更好
- weak版本通常在循环中使用
9.4 高级应用题
Q10: 如何设计线程池?
- 组件:任务队列、工作线程、同步机制
- 考虑因素:线程数量、任务分发、负载均衡、异常处理
- 优化:任务窃取、优先级队列、动态调整
Q11: 什么是线程局部存储?使用场景?
- 每个线程独有的存储空间
- 使用thread_local关键字
- 场景:错误码、随机数生成器、缓存等
Q12: 如何实现无锁数据结构?
- 使用原子操作和CAS(Compare-And-Swap)
- 处理ABA问题
- 内存回收问题(使用hazard pointer或epoch-based回收)
10. 实战编程题
10.1 生产者-消费者问题(多种实现)
cpp
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <iostream>
#include <vector>
// 方案1:使用mutex和condition_variable
template<typename T>
class BlockingQueue {
private:
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable not_empty;
std::condition_variable not_full;
size_t capacity;
public:
BlockingQueue(size_t cap) : capacity(cap) {}
void push(const T& item) {
std::unique_lock<std::mutex> lock(mtx);
not_full.wait(lock, [this] { return queue.size() < capacity; });
queue.push(item);
not_empty.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
not_empty.wait(lock, [this] { return !queue.empty(); });
T item = queue.front();
queue.pop();
not_full.notify_one();
return item;
}
};
// 方案2:使用原子操作的无锁队列
template<typename T>
class LockFreeQueue {
private:
struct Node {
std::atomic<T*> data;
std::atomic<Node*> next;
Node() : data(nullptr), next(nullptr) {}
};
std::atomic<Node*> head;
std::atomic<Node*> tail;
public:
LockFreeQueue() {
Node* dummy = new Node;
head.store(dummy);
tail.store(dummy);
}
void enqueue(T item) {
Node* new_node = new Node;
T* data = new T(std::move(item));
new_node->data.store(data);
Node* prev_tail = tail.exchange(new_node);
prev_tail->next.store(new_node);
}
bool dequeue(T& result) {
Node* head_node = head.load();
Node* next = head_node->next.load();
if (next == nullptr) {
return false; // 队列为空
}
T* data = next->data.load();
if (data == nullptr) {
return false;
}
if (head.compare_exchange_weak(head_node, next)) {
result = *data;
delete data;
delete head_node;
return true;
}
return false;
}
};
// 测试函数
void test_producer_consumer() {
std::cout << "Testing Producer-Consumer with BlockingQueue..." << std::endl;
BlockingQueue<int> queue(10);
std::atomic<bool> finished(false);
// 生产者
std::thread producer([&queue, &finished]() {
for (int i = 0; i < 20; i++) {
queue.push(i);
std::cout << "Produced: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
finished = true;
});
// 消费者
std::thread consumer([&queue, &finished]() {
while (!finished || true) {
try {
int item = queue.pop();
std::cout << "Consumed: " << item << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} catch (...) {
if (finished) break;
}
}
});
producer.join();
std::this_thread::sleep_for(std::chrono::seconds(1));
consumer.join();
}
10.2 哲学家进餐问题
cpp
#include <thread>
#include <mutex>
#include <vector>
#include <iostream>
#include <chrono>
#include <random>
class DiningPhilosophers {
private:
static const int NUM_PHILOSOPHERS = 5;
std::vector<std::mutex> forks;
std::vector<std::thread> philosophers;
public:
DiningPhilosophers() : forks(NUM_PHILOSOPHERS) {}
void start() {
for (int i = 0; i < NUM_PHILOSOPHERS; i++) {
philosophers.emplace_back(&DiningPhilosophers::philosopher_life, this, i);
}
}
void join() {
for (auto& philosopher : philosophers) {
philosopher.join();
}
}
private:
void philosopher_life(int id) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> think_time(100, 500);
std::uniform_int_distribution<> eat_time(200, 400);
for (int i = 0; i < 5; i++) { // 每个哲学家吃5次
think(id, think_time(gen));
// 避免死锁:奇数号哲学家先拿左叉子,偶数号先拿右叉子
if (id % 2 == 0) {
pick_up_forks_even(id);
} else {
pick_up_forks_odd(id);
}
eat(id, eat_time(gen));
put_down_forks(id);
}
}
void think(int id, int duration) {
std::cout << "Philosopher " << id << " is thinking..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
}
void eat(int id, int duration) {
std::cout << "Philosopher " << id << " is eating..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
std::cout << "Philosopher " << id << " finished eating" << std::endl;
}
void pick_up_forks_even(int id) {
int left_fork = id;
int right_fork = (id + 1) % NUM_PHILOSOPHERS;
forks[left_fork].lock();
std::cout << "Philosopher " << id << " picked up left fork " << left_fork << std::endl;
forks[right_fork].lock();
std::cout << "Philosopher " << id << " picked up right fork " << right_fork << std::endl;
}
void pick_up_forks_odd(int id) {
int left_fork = id;
int right_fork = (id + 1) % NUM_PHILOSOPHERS;
forks[right_fork].lock();
std::cout << "Philosopher " << id << " picked up right fork " << right_fork << std::endl;
forks[left_fork].lock();
std::cout << "Philosopher " << id << " picked up left fork " << left_fork << std::endl;
}
void put_down_forks(int id) {
int left_fork = id;
int right_fork = (id + 1) % NUM_PHILOSOPHERS;
forks[left_fork].unlock();
forks[right_fork].unlock();
std::cout << "Philosopher " << id << " put down both forks" << std::endl;
}
};
// 使用std::lock的改进版本
class ImprovedDiningPhilosophers {
private:
static const int NUM_PHILOSOPHERS = 5;
std::vector<std::mutex> forks;
std::vector<std::thread> philosophers;
public:
ImprovedDiningPhilosophers() : forks(NUM_PHILOSOPHERS) {}
void start() {
for (int i = 0; i < NUM_PHILOSOPHERS; i++) {
philosophers.emplace_back(&ImprovedDiningPhilosophers::philosopher_life, this, i);
}
}
void join() {
for (auto& philosopher : philosophers) {
philosopher.join();
}
}
private:
void philosopher_life(int id) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> think_time(100, 500);
std::uniform_int_distribution<> eat_time(200, 400);
for (int i = 0; i < 3; i++) {
think(id, think_time(gen));
pick_up_forks(id);
eat(id, eat_time(gen));
put_down_forks(id);
}
}
void think(int id, int duration) {
std::cout << "Philosopher " << id << " is thinking..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
}
void eat(int id, int duration) {
std::cout << "Philosopher " << id << " is eating..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
std::cout << "Philosopher " << id << " finished eating" << std::endl;
}
void pick_up_forks(int id) {
int left_fork = id;
int right_fork = (id + 1) % NUM_PHILOSOPHERS;
// 使用std::lock同时获取两个锁,避免死锁
std::lock(forks[left_fork], forks[right_fork]);
std::cout << "Philosopher " << id << " picked up both forks" << std::endl;
}
void put_down_forks(int id) {
int left_fork = id;
int right_fork = (id + 1) % NUM_PHILOSOPHERS;
forks[left_fork].unlock();
forks[right_fork].unlock();
std::cout << "Philosopher " << id << " put down both forks" << std::endl;
}
};
10.3 读者-写者问题
cpp
#include <shared_mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
#include <atomic>
class ReadersWriters {
private:
std::shared_mutex rw_mutex;
std::atomic<int> data;
std::atomic<int> reader_count;
std::atomic<int> writer_count;
public:
ReadersWriters() : data(0), reader_count(0), writer_count(0) {}
void read(int reader_id) {
{
std::shared_lock<std::shared_mutex> lock(rw_mutex);
reader_count++;
// 读取数据
int value = data.load();
std::cout << "Reader " << reader_id << " read value: " << value
<< " (active readers: " << reader_count.load() << ")" << std::endl;
// 模拟读取时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
reader_count--;
}
}
void write(int writer_id, int new_value) {
{
std::unique_lock<std::shared_mutex> lock(rw_mutex);
writer_count++;
std::cout << "Writer " << writer_id << " writing value: " << new_value << std::endl;
// 写入数据
data.store(new_value);
// 模拟写入时间
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::cout << "Writer " << writer_id << " finished writing" << std::endl;
writer_count--;
}
}
void start_simulation() {
std::vector<std::thread> threads;
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 100);
// 创建读者线程
for (int i = 0; i < 5; i++) {
threads.emplace_back([this, i, &gen, &dis]() {
for (int j = 0; j < 3; j++) {
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
read(i);
}
});
}
// 创建写者线程
for (int i = 0; i < 2; i++) {
threads.emplace_back([this, i, &gen, &dis]() {
for (int j = 0; j < 3; j++) {
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen) * 2));
write(i, i * 100 + j);
}
});
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
std::cout << "Final data value: " << data.load() << std::endl;
}
};
// 优先写者的读写锁实现
class WriterPreferredRW {
private:
std::mutex read_mutex;
std::mutex write_mutex;
std::mutex reader_count_mutex;
std::condition_variable reader_cv;
std::condition_variable writer_cv;
int reader_count;
int writer_count;
int waiting_writers;
bool writer_active;
std::atomic<int> data;
public:
WriterPreferredRW()
: reader_count(0), writer_count(0), waiting_writers(0),
writer_active(false), data(0) {}
void read(int reader_id) {
std::unique_lock<std::mutex> lock(read_mutex);
// 等待直到没有写者在等待或活动
reader_cv.wait(lock, [this] {
return waiting_writers == 0 && !writer_active;
});
reader_count++;
lock.unlock();
// 读取数据
int value = data.load();
std::cout << "Reader " << reader_id << " read: " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lock.lock();
reader_count--;
if (reader_count == 0) {
writer_cv.notify_one(); // 通知等待的写者
}
}
void write(int writer_id, int new_value) {
std::unique_lock<std::mutex> lock(write_mutex);
waiting_writers++;
writer_cv.wait(lock, [this] {
return reader_count == 0 && !writer_active;
});
waiting_writers--;
writer_active = true;
lock.unlock();
// 写入数据
std::cout << "Writer " << writer_id << " writing: " << new_value << std::endl;
data.store(new_value);
std::this_thread::sleep_for(std::chrono::milliseconds(200));
lock.lock();
writer_active = false;
if (waiting_writers > 0) {
writer_cv.notify_one(); // 优先通知其他写者
} else {
reader_cv.notify_all(); // 通知所有读者
}
}
};
10.4 并行矩阵乘法
cpp
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
#include <random>
#include <future>
class ParallelMatrixMultiply {
public:
using Matrix = std::vector<std::vector<double>>;
// 串行矩阵乘法
static Matrix multiply_serial(const Matrix& A, const Matrix& B) {
size_t rows_A = A.size();
size_t cols_A = A[0].size();
size_t cols_B = B[0].size();
Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
for (size_t i = 0; i < rows_A; i++) {
for (size_t j = 0; j < cols_B; j++) {
for (size_t k = 0; k < cols_A; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
return C;
}
// 并行矩阵乘法(按行分割)
static Matrix multiply_parallel_rows(const Matrix& A, const Matrix& B, int num_threads = 4) {
size_t rows_A = A.size();
size_t cols_A = A[0].size();
size_t cols_B = B[0].size();
Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
std::vector<std::thread> threads;
auto worker = [&](size_t start_row, size_t end_row) {
for (size_t i = start_row; i < end_row; i++) {
for (size_t j = 0; j < cols_B; j++) {
for (size_t k = 0; k < cols_A; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
};
size_t rows_per_thread = rows_A / num_threads;
for (int t = 0; t < num_threads; t++) {
size_t start_row = t * rows_per_thread;
size_t end_row = (t == num_threads - 1) ? rows_A : (t + 1) * rows_per_thread;
threads.emplace_back(worker, start_row, end_row);
}
for (auto& thread : threads) {
thread.join();
}
return C;
}
// 并行矩阵乘法(块分割)
static Matrix multiply_parallel_blocks(const Matrix& A, const Matrix& B, int num_threads = 4) {
size_t rows_A = A.size();
size_t cols_A = A[0].size();
size_t cols_B = B[0].size();
Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
std::vector<std::future<void>> futures;
auto worker = [&](size_t start_i, size_t end_i, size_t start_j, size_t end_j) {
for (size_t i = start_i; i < end_i; i++) {
for (size_t j = start_j; j < end_j; j++) {
for (size_t k = 0; k < cols_A; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
};
int block_size = std::sqrt(num_threads);
size_t block_rows = rows_A / block_size;
size_t block_cols = cols_B / block_size;
for (int bi = 0; bi < block_size; bi++) {
for (int bj = 0; bj < block_size; bj++) {
size_t start_i = bi * block_rows;
size_t end_i = (bi == block_size - 1) ? rows_A : (bi + 1) * block_rows;
size_t start_j = bj * block_cols;
size_t end_j = (bj == block_size - 1) ? cols_B : (bj + 1) * block_cols;
futures.push_back(
std::async(std::launch::async, worker, start_i, end_i, start_j, end_j)
);
}
}
for (auto& future : futures) {
future.get();
}
return C;
}
// 生成随机矩阵
static Matrix generate_random_matrix(size_t rows, size_t cols) {
Matrix matrix(rows, std::vector<double>(cols));
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis(0.0, 1.0);
for (size_t i = 0; i < rows; i++) {
for (size_t j = 0; j < cols; j++) {
matrix[i][j] = dis(gen);
}
}
return matrix;
}
// 性能测试
static void benchmark() {
const size_t SIZE = 500;
std::cout << "Generating random matrices (" << SIZE << "x" << SIZE << ")..." << std::endl;
Matrix A = generate_random_matrix(SIZE, SIZE);
Matrix B = generate_random_matrix(SIZE, SIZE);
// 串行版本
auto start = std::chrono::high_resolution_clock::now();
Matrix C_serial = multiply_serial(A, B);
auto end = std::chrono::high_resolution_clock::now();
auto serial_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Serial multiplication time: " << serial_time.count() << " ms" << std::endl;
// 并行版本(按行)
start = std::chrono::high_resolution_clock::now();
Matrix C_parallel_rows = multiply_parallel_rows(A, B, 4);
end = std::chrono::high_resolution_clock::now();
auto parallel_rows_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Parallel rows multiplication time: " << parallel_rows_time.count() << " ms" << std::endl;
// 并行版本(块分割)
start = std::chrono::high_resolution_clock::now();
Matrix C_parallel_blocks = multiply_parallel_blocks(A, B, 4);
end = std::chrono::high_resolution_clock::now();
auto parallel_blocks_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "Parallel blocks multiplication time: " << parallel_blocks_time.count() << " ms" << std::endl;
// 计算加速比
double speedup_rows = (double)serial_time.count() / parallel_rows_time.count();
double speedup_blocks = (double)serial_time.count() / parallel_blocks_time.count();
std::cout << "Speedup (rows): " << speedup_rows << "x" << std::endl;
std::cout << "Speedup (blocks): " << speedup_blocks << "x" << std::endl;
}
};
int main() {
std::cout << "=== Producer-Consumer Test ===" << std::endl;
test_producer_consumer();
std::cout << "\n=== Dining Philosophers Test ===" << std::endl;
DiningPhilosophers philosophers;
philosophers.start();
philosophers.join();
std::cout << "\n=== Readers-Writers Test ===" << std::endl;
ReadersWriters rw;
rw.start_simulation();
std::cout << "\n=== Matrix Multiplication Benchmark ===" << std::endl;
ParallelMatrixMultiply::benchmark();
return 0;
}
总结
C++线程编程是现代软件开发的核心技能,涉及多个层面的知识:
关键要点
- 基础理解:线程模型、内存共享、同步需求
- 标准库掌握:std::thread、mutex、condition_variable、atomic
- 同步机制:互斥锁、读写锁、条件变量、信号量
- 高级技术:线程池、无锁编程、原子操作
- 实际应用:生产者消费者、读写者、哲学家进餐等经典问题
面试准备建议
- 理论基础:理解线程同步原理和各种锁机制
- 编程实践:能够实现基本的多线程程序和同步机制
- 性能优化:了解无锁编程和原子操作
- 问题解决:能够分析和解决死锁、竞态条件等问题
- 设计能力:能够设计线程池、异步任务系统等
线程编程的关键在于平衡性能和正确性,既要充分利用多核优势,又要确保程序的线程安全。在面试中,不仅要掌握语法和API,更要理解底层原理和最佳实践。