
A Complete C++ Thread Programming Interview Guide

Table of Contents

  1. Thread Fundamentals
  2. The C++11 Standard Thread Library
  3. POSIX Threads (pthread)
  4. Thread Synchronization and Mutual Exclusion
  5. Inter-Thread Communication
  6. Thread Pool Design
  7. Thread Safety and Atomic Operations
  8. Advanced Thread Programming
  9. Common Interview Questions
  10. Hands-On Coding Problems

1. Thread Fundamentals

1.1 Threads vs. Processes

| Property            | Thread                                      | Process                                   |
|---------------------|---------------------------------------------|-------------------------------------------|
| Memory space        | Shares the process address space            | Independent virtual address space         |
| Creation cost       | Small (only a stack needs to be allocated)  | Large (page tables etc. must be copied)   |
| Context-switch cost | Small (only registers are switched)         | Large (page tables must be switched)      |
| Communication       | Direct access to shared memory              | IPC (pipes, shared memory, etc.)          |
| Data safety         | Shared data needs synchronization           | Naturally isolated, safer                 |
| Crash impact        | A crash brings down the whole process       | Only the crashing process is affected     |
| Resource usage      | Low memory footprint                        | High memory footprint                     |
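
A minimal POSIX sketch of the first row (illustrative only): after fork() the child process works on its own copy of the variable, while a second thread in the same process writes the very object that main() later reads.

cpp
#include <unistd.h>     // fork(), _exit()
#include <sys/wait.h>   // waitpid()
#include <thread>
#include <iostream>

int counter = 0;  // lives in the process's data segment

int main() {
    // A thread shares the process address space: its write is visible here.
    std::thread t([] { counter = 1; });
    t.join();
    std::cout << "after thread: counter = " << counter << std::endl;  // 1

    // A child process gets its own (copy-on-write) address space:
    // its write does not affect the parent's counter.
    pid_t pid = fork();
    if (pid == 0) {
        counter = 100;
        _exit(0);
    }
    waitpid(pid, nullptr, 0);
    std::cout << "after fork:   counter = " << counter << std::endl;  // still 1
    return 0;
}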

1.2 The Thread Memory Model

Process address space
┌──────────────────┐
│   Kernel space   │
├──────────────────┤
│  Thread 1 stack  │ ← private to each thread
├──────────────────┤
│  Thread 2 stack  │ ← private to each thread
├──────────────────┤
│  Thread 3 stack  │ ← private to each thread
├──────────────────┤
│                  │
│       Heap       │ ← shared by all threads
│                  │
├──────────────────┤
│   Data segment   │ ← shared by all threads
├──────────────────┤
│   Code segment   │ ← shared by all threads
└──────────────────┘
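
To make the diagram concrete, here is a minimal sketch (the variable names are illustrative only): globals and heap allocations are visible to every thread, while each thread's local variables live on its own stack.

cpp
#include <thread>
#include <iostream>
#include <memory>

int global_value = 42;                            // data segment: one copy, shared by all threads

int main() {
    auto heap_value = std::make_shared<int>(100); // heap: one object, shared by all threads

    auto worker = [heap_value](int id) {
        int local_value = id * 10;                // stack: each thread has its own copy
        std::cout << "thread " << id
                  << " sees global=" << global_value   // same global in both threads
                  << " heap=" << *heap_value           // same heap object in both threads
                  << " local=" << local_value          // differs per thread
                  << std::endl;
    };

    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    t1.join();
    t2.join();
    return 0;
}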

1.3 Thread State Transitions

┌─────────────┐  create   ┌─────────────┐
│ Nonexistent │ ────────→ │     New     │
└─────────────┘           └─────────────┘

                                │ start

                          ┌─────────────┐
                     ┌──→ │    Ready    │ ←──┐
                     │    └─────────────┘    │
                     │           │           │
          time slice │           │ scheduled │ I/O completes
          expires    │           ↓           │ or woken up
                     │    ┌─────────────┐    │
                     └─── │   Running   │ ───┘
                          └─────────────┘

                                │ waits for I/O
                                │ or blocks

                          ┌─────────────┐
                          │   Blocked   │
                          └─────────────┘

                                │ thread function returns
                                │ (join/detach then releases its resources)

                          ┌─────────────┐
                          │ Terminated  │
                          └─────────────┘

2. The C++11 Standard Thread Library

2.1 std::thread Basics

cpp
#include <thread>
#include <iostream>
#include <chrono>

// A free function used as a thread entry point
void thread_function(int id, const std::string& name) {
    for (int i = 0; i < 5; i++) {
        std::cout << "Thread " << id << " (" << name << "): " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// A class whose member function or call operator can run as a thread
class ThreadWorker {
public:
    void work(int iterations) {
        for (int i = 0; i < iterations; i++) {
            std::cout << "Worker thread: " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    }
    
    void operator()(int iterations) {
        work(iterations);
    }
};

int main() {
    // 1. Create a thread from a free function
    std::thread t1(thread_function, 1, "Function Thread");
    
    // 2. Create a thread from a lambda expression
    std::thread t2([](int id) {
        for (int i = 0; i < 3; i++) {
            std::cout << "Lambda Thread " << id << ": " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
        }
    }, 2);
    
    // 3. Create a thread from a member function (note the object pointer)
    ThreadWorker worker;
    std::thread t3(&ThreadWorker::work, &worker, 3);
    
    // 4. Create a thread from a function object (worker is copied)
    std::thread t4(worker, 4);
    
    // Wait for all threads to finish
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    
    std::cout << "All threads completed!" << std::endl;
    return 0;
}

2.2 Thread Management

cpp
#include <thread>
#include <iostream>
#include <vector>

void worker_task(int id) {
    std::cout << "Worker " << id << " started, thread ID: " 
              << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Worker " << id << " finished" << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    
    // Create several threads
    for (int i = 0; i < 4; i++) {
        threads.emplace_back(worker_task, i);
    }
    
    // Query the number of hardware threads
    unsigned int num_cores = std::thread::hardware_concurrency();
    std::cout << "Hardware concurrency: " << num_cores << std::endl;
    
    // Wait for all threads to finish
    for (auto& t : threads) {
        if (t.joinable()) {
            t.join();
        }
    }
    
    // Demonstrate detach()
    std::thread detached_thread([]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "Detached thread finished" << std::endl;
    });
    
    detached_thread.detach();  // Detach: the thread now runs on its own
    
    // Give the detached thread a chance to run before main() returns
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    return 0;
}

2.3 Threads and Exception Safety

cpp
#include <thread>
#include <exception>
#include <iostream>

class ThreadRAII {
private:
    std::thread& t;
    
public:
    enum class DtorAction { join, detach };
    
    ThreadRAII(std::thread& t_, DtorAction action_) 
        : t(t_), action(action_) {}
    
    ~ThreadRAII() {
        if (t.joinable()) {
            if (action == DtorAction::join) {
                t.join();
            } else {
                t.detach();
            }
        }
    }
    
    ThreadRAII(const ThreadRAII&) = delete;
    ThreadRAII& operator=(const ThreadRAII&) = delete;
    
private:
    DtorAction action;
};

void risky_function() {
    // This function may throw
    throw std::runtime_error("Something went wrong!");
}

int main() {
    try {
        std::thread t([]() {
            std::cout << "Background thread running..." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
            std::cout << "Background thread finished" << std::endl;
        });
        
        // Use RAII so the thread is always joined or detached
        ThreadRAII raii(t, ThreadRAII::DtorAction::join);
        
        // Run code that may throw
        risky_function();
        
    } catch (const std::exception& e) {
        std::cout << "Exception caught: " << e.what() << std::endl;
        // ThreadRAII's destructor will call join() automatically
    }
    
    return 0;
}

3. POSIX Threads (pthread)

3.1 Basic pthread Operations

cpp
#include <pthread.h>
#include <iostream>
#include <unistd.h>

// Thread entry function
void* thread_function(void* arg) {
    int thread_id = *(static_cast<int*>(arg));
    
    std::cout << "Thread " << thread_id << " started" << std::endl;
    
    // Simulate some work
    sleep(2);
    
    std::cout << "Thread " << thread_id << " finished" << std::endl;
    
    // Return a heap-allocated result to the joining thread
    int* result = new int(thread_id * 100);
    pthread_exit(result);
}

int main() {
    const int NUM_THREADS = 3;
    pthread_t threads[NUM_THREADS];
    int thread_ids[NUM_THREADS];
    
    // Create the threads
    for (int i = 0; i < NUM_THREADS; i++) {
        thread_ids[i] = i;
        int result = pthread_create(&threads[i], NULL, thread_function, &thread_ids[i]);
        if (result != 0) {
            std::cerr << "Error creating thread " << i << std::endl;
            return 1;
        }
    }
    
    // Join each thread and collect its return value
    for (int i = 0; i < NUM_THREADS; i++) {
        void* thread_result;
        int result = pthread_join(threads[i], &thread_result);
        if (result == 0) {
            int* value = static_cast<int*>(thread_result);
            std::cout << "Thread " << i << " returned: " << *value << std::endl;
            delete value;  // Free the heap-allocated result
        }
    }
    
    std::cout << "All threads completed" << std::endl;
    return 0;
}

3.2 pthread Attributes

cpp
#include <pthread.h>
#include <iostream>
#include <unistd.h>

void* detached_thread_function(void* arg) {
    std::cout << "Detached thread running..." << std::endl;
    sleep(1);
    std::cout << "Detached thread finished" << std::endl;
    return NULL;
}

void* stacksize_thread_function(void* arg) {
    char large_array[1024 * 1024];  // 1 MB array on this thread's stack
    large_array[0] = 'A';
    std::cout << "Large stack thread completed" << std::endl;
    return NULL;
}

int main() {
    pthread_attr_t attr;
    pthread_t thread;
    
    // Initialize the attribute object
    pthread_attr_init(&attr);
    
    // 1. Create the thread in the detached state
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    pthread_create(&thread, &attr, detached_thread_function, NULL);
    // A detached thread must not be joined
    
    // 2. Set a custom stack size
    pthread_attr_t stack_attr;
    pthread_attr_init(&stack_attr);
    
    size_t stack_size = 2 * 1024 * 1024;  // 2 MB stack
    pthread_attr_setstacksize(&stack_attr, stack_size);
    
    pthread_t stack_thread;
    pthread_create(&stack_thread, &stack_attr, stacksize_thread_function, NULL);
    pthread_join(stack_thread, NULL);
    
    // 3. Set the scheduling policy and priority
    pthread_attr_t sched_attr;
    pthread_attr_init(&sched_attr);
    
    // Scheduling policy (SCHED_FIFO typically requires elevated privileges)
    pthread_attr_setschedpolicy(&sched_attr, SCHED_FIFO);
    
    // Priority
    struct sched_param param;
    param.sched_priority = 10;
    pthread_attr_setschedparam(&sched_attr, &param);
    
    // Destroy the attribute objects
    pthread_attr_destroy(&attr);
    pthread_attr_destroy(&stack_attr);
    pthread_attr_destroy(&sched_attr);
    
    sleep(2);  // Give the detached thread time to finish
    return 0;
}

4. Thread Synchronization and Mutual Exclusion

4.1 Using std::mutex

cpp
#include <thread>
#include <mutex>
#include <iostream>
#include <vector>

class Counter {
private:
    int count;
    mutable std::mutex mtx;  // mutable so const members such as get_count() can lock it
    
public:
    Counter() : count(0) {}
    
    void increment() {
        std::lock_guard<std::mutex> lock(mtx);
        ++count;
        std::cout << "Count: " << count << " (Thread: " 
                  << std::this_thread::get_id() << ")" << std::endl;
    }
    
    void increment_manual() {
        mtx.lock();
        ++count;
        std::cout << "Manual Count: " << count << std::endl;
        mtx.unlock();
    }
    
    void increment_unique() {
        std::unique_lock<std::mutex> lock(mtx);
        ++count;
        
        // The lock can be released early...
        lock.unlock();
        
        // ...to do work that does not need the lock...
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        
        // ...and re-acquired later
        lock.lock();
        std::cout << "Unique Count: " << count << std::endl;
    }
    
    int get_count() const {
        std::lock_guard<std::mutex> lock(mtx);
        return count;
    }
};

int main() {
    Counter counter;
    std::vector<std::thread> threads;
    
    // Several threads update the counter concurrently
    for (int i = 0; i < 5; i++) {
        threads.emplace_back([&counter]() {
            for (int j = 0; j < 3; j++) {
                counter.increment();
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
    }
    
    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final count: " << counter.get_count() << std::endl;
    return 0;
}

4.2 Read-Write Locks (std::shared_mutex, C++17)

cpp
#include <thread>
#include <shared_mutex>
#include <iostream>
#include <vector>
#include <string>
#include <map>

class ThreadSafeMap {
private:
    std::map<int, std::string> data;
    mutable std::shared_mutex rw_mutex;
    
public:
    void write(int key, const std::string& value) {
        std::unique_lock<std::shared_mutex> lock(rw_mutex);
        data[key] = value;
        std::cout << "Written: " << key << " -> " << value 
                  << " (Thread: " << std::this_thread::get_id() << ")" << std::endl;
    }
    
    std::string read(int key) const {
        std::shared_lock<std::shared_mutex> lock(rw_mutex);
        auto it = data.find(key);
        if (it != data.end()) {
            std::cout << "Read: " << key << " -> " << it->second 
                      << " (Thread: " << std::this_thread::get_id() << ")" << std::endl;
            return it->second;
        }
        return "";
    }
    
    size_t size() const {
        std::shared_lock<std::shared_mutex> lock(rw_mutex);
        return data.size();
    }
};

int main() {
    ThreadSafeMap map;
    std::vector<std::thread> threads;
    
    // Writer threads
    for (int i = 0; i < 3; i++) {
        threads.emplace_back([&map, i]() {
            for (int j = 0; j < 3; j++) {
                map.write(i * 10 + j, "value_" + std::to_string(i * 10 + j));
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        });
    }
    
    // Reader threads
    for (int i = 0; i < 5; i++) {
        threads.emplace_back([&map]() {
            for (int j = 0; j < 10; j++) {
                map.read(j);
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }
        });
    }
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final map size: " << map.size() << std::endl;
    return 0;
}

4.3 Condition Variables (std::condition_variable)

cpp
#include <thread>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <iostream>

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable cv;
    
public:
    void push(const T& item) {
        std::lock_guard<std::mutex> lock(mtx);
        queue.push(item);
        cv.notify_one();  // Wake one waiting thread
    }
    
    bool pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        if (queue.empty()) {
            return false;
        }
        item = queue.front();
        queue.pop();
        return true;
    }
    
    void wait_and_pop(T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        // Block until the queue is non-empty (the predicate guards against spurious wakeups)
        cv.wait(lock, [this] { return !queue.empty(); });
        item = queue.front();
        queue.pop();
    }
    
    bool wait_for_pop(T& item, int timeout_ms) {
        std::unique_lock<std::mutex> lock(mtx);
        if (cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                       [this] { return !queue.empty(); })) {
            item = queue.front();
            queue.pop();
            return true;
        }
        return false;
    }
    
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx);
        return queue.empty();
    }
    
    size_t size() const {
        std::lock_guard<std::mutex> lock(mtx);
        return queue.size();
    }
};

int main() {
    ThreadSafeQueue<int> queue;
    
    // Producer thread
    std::thread producer([&queue]() {
        for (int i = 0; i < 10; i++) {
            queue.push(i);
            std::cout << "Produced: " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    // Consumer thread 1: blocking pop
    std::thread consumer1([&queue]() {
        int item;
        for (int i = 0; i < 5; i++) {
            queue.wait_and_pop(item);
            std::cout << "Consumer1 got: " << item << std::endl;
        }
    });
    
    // Consumer thread 2: pop with a timeout
    std::thread consumer2([&queue]() {
        int item;
        for (int i = 0; i < 5; i++) {
            if (queue.wait_for_pop(item, 200)) {
                std::cout << "Consumer2 got: " << item << std::endl;
            } else {
                std::cout << "Consumer2 timeout" << std::endl;
            }
        }
    });
    
    producer.join();
    consumer1.join();
    consumer2.join();
    
    return 0;
}

4.4 pthread Mutexes and Condition Variables

cpp
#include <pthread.h>
#include <iostream>
#include <unistd.h>

class PthreadCounter {
private:
    int count;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
    
public:
    PthreadCounter() : count(0) {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&condition, NULL);
    }
    
    ~PthreadCounter() {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&condition);
    }
    
    void increment() {
        pthread_mutex_lock(&mutex);
        count++;
        std::cout << "Count incremented to: " << count << std::endl;
        pthread_cond_signal(&condition);  // Wake one waiting thread
        pthread_mutex_unlock(&mutex);
    }
    
    void wait_for_count(int target) {
        pthread_mutex_lock(&mutex);
        while (count < target) {
            std::cout << "Waiting for count to reach " << target 
                      << " (current: " << count << ")" << std::endl;
            pthread_cond_wait(&condition, &mutex);
        }
        std::cout << "Count reached " << target << "!" << std::endl;
        pthread_mutex_unlock(&mutex);
    }
    
    int get_count() {
        pthread_mutex_lock(&mutex);
        int current = count;
        pthread_mutex_unlock(&mutex);
        return current;
    }
};

// Wrapper for passing multiple arguments to a pthread function
struct ThreadData {
    PthreadCounter* counter;
    int target;
};

void* incrementer_thread(void* arg) {
    PthreadCounter* counter = static_cast<PthreadCounter*>(arg);
    
    for (int i = 0; i < 5; i++) {
        sleep(1);
        counter->increment();
    }
    
    return NULL;
}

void* waiter_thread(void* arg) {
    ThreadData* data = static_cast<ThreadData*>(arg);
    data->counter->wait_for_count(data->target);
    return NULL;
}

int main() {
    PthreadCounter counter;
    pthread_t inc_thread, wait_thread;
    
    ThreadData wait_data = { &counter, 3 };
    
    // Create the waiting thread
    pthread_create(&wait_thread, NULL, waiter_thread, &wait_data);
    
    // Create the incrementing thread
    pthread_create(&inc_thread, NULL, incrementer_thread, &counter);
    
    // Join both threads
    pthread_join(wait_thread, NULL);
    pthread_join(inc_thread, NULL);
    
    std::cout << "Final count: " << counter.get_count() << std::endl;
    return 0;
}

5. Inter-Thread Communication

5.1 Communication via Shared Memory

cpp
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <vector>
#include <atomic>

// Producer-consumer pattern over a bounded ring buffer
template<typename T>
class BoundedBuffer {
private:
    std::vector<T> buffer;
    size_t capacity;
    size_t front;
    size_t rear;
    size_t count;
    
    std::mutex mtx;
    std::condition_variable not_full;
    std::condition_variable not_empty;
    
public:
    BoundedBuffer(size_t size) 
        : capacity(size), front(0), rear(0), count(0) {
        buffer.resize(capacity);
    }
    
    void produce(const T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until the buffer has free space
        not_full.wait(lock, [this] { return count != capacity; });
        
        buffer[rear] = item;
        rear = (rear + 1) % capacity;
        ++count;
        
        std::cout << "Produced: " << item 
                  << " (buffer size: " << count << ")" << std::endl;
        
        not_empty.notify_one();  // Wake a consumer
    }
    
    T consume() {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Wait until the buffer is non-empty
        not_empty.wait(lock, [this] { return count != 0; });
        
        T item = buffer[front];
        front = (front + 1) % capacity;
        --count;
        
        std::cout << "Consumed: " << item 
                  << " (buffer size: " << count << ")" << std::endl;
        
        not_full.notify_one();  // Wake a producer
        return item;
    }
};

int main() {
    BoundedBuffer<int> buffer(5);
    
    // Producer thread
    std::thread producer([&buffer]() {
        for (int i = 0; i < 10; i++) {
            buffer.produce(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    // Consumer thread
    std::thread consumer([&buffer]() {
        for (int i = 0; i < 10; i++) {
            int item = buffer.consume();
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
        }
    });
    
    producer.join();
    consumer.join();
    
    return 0;
}

5.2 Communication via Message Passing

cpp
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <iostream>

// Message type
struct Message {
    enum Type { DATA, COMMAND, QUIT };
    
    Type type;
    int data;
    std::string command;
    
    Message(Type t, int d = 0, const std::string& cmd = "") 
        : type(t), data(d), command(cmd) {}
};

// Message queue
class MessageQueue {
private:
    std::queue<Message> messages;
    std::mutex mtx;
    std::condition_variable cv;
    bool shutdown;
    
public:
    MessageQueue() : shutdown(false) {}
    
    void send(const Message& msg) {
        std::lock_guard<std::mutex> lock(mtx);
        if (!shutdown) {
            messages.push(msg);
            cv.notify_one();
        }
    }
    
    bool receive(Message& msg, int timeout_ms = -1) {
        std::unique_lock<std::mutex> lock(mtx);
        
        if (timeout_ms < 0) {
            // Wait indefinitely
            cv.wait(lock, [this] { return !messages.empty() || shutdown; });
        } else {
            // Wait with a timeout
            if (!cv.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                           [this] { return !messages.empty() || shutdown; })) {
                return false;  // Timed out
            }
        }
        
        if (shutdown && messages.empty()) {
            return false;
        }
        
        msg = messages.front();
        messages.pop();
        return true;
    }
    
    void close() {
        std::lock_guard<std::mutex> lock(mtx);
        shutdown = true;
        cv.notify_all();
    }
};

// Worker thread class
class Worker {
private:
    MessageQueue& queue;
    std::thread worker_thread;
    bool running;
    
public:
    Worker(MessageQueue& q) : queue(q), running(true) {
        worker_thread = std::thread(&Worker::work_loop, this);
    }
    
    ~Worker() {
        if (worker_thread.joinable()) {
            worker_thread.join();
        }
    }
    
private:
    void work_loop() {
        Message msg(Message::DATA);
        
        while (running) {
            if (queue.receive(msg, 1000)) {  // 1-second timeout
                switch (msg.type) {
                    case Message::DATA:
                        process_data(msg.data);
                        break;
                    case Message::COMMAND:
                        execute_command(msg.command);
                        break;
                    case Message::QUIT:
                        running = false;
                        break;
                }
            } else {
                std::cout << "Worker timeout, checking status..." << std::endl;
            }
        }
        
        std::cout << "Worker thread terminated" << std::endl;
    }
    
    void process_data(int data) {
        std::cout << "Processing data: " << data << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    
    void execute_command(const std::string& cmd) {
        std::cout << "Executing command: " << cmd << std::endl;
    }
};

int main() {
    MessageQueue queue;
    Worker worker(queue);
    
    // Send some data messages
    for (int i = 0; i < 5; i++) {
        queue.send(Message(Message::DATA, i * 10));
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    
    queue.send(Message(Message::COMMAND, 0, "status"));
    queue.send(Message(Message::COMMAND, 0, "restart"));
    
    // Send the quit message
    std::this_thread::sleep_for(std::chrono::seconds(1));
    queue.send(Message(Message::QUIT));
    
    // Give the worker time to finish, then close the queue
    std::this_thread::sleep_for(std::chrono::seconds(2));
    queue.close();
    
    return 0;
}

6. Thread Pool Design

6.1 A Simple Thread Pool Implementation

cpp
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <iostream>

class SimpleThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
    
public:
    SimpleThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { 
                            return stop || !tasks.empty(); 
                        });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    
                    task();
                }
            });
        }
    }
    
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        
        using return_type = typename std::result_of<F(Args...)>::type;  // deprecated in C++17; std::invoke_result_t is the modern equivalent
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            
            tasks.emplace([task]() { (*task)(); });
        }
        
        condition.notify_one();
        return res;
    }
    
    ~SimpleThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        
        condition.notify_all();
        
        for (std::thread &worker: workers) {
            worker.join();
        }
    }
};

// Usage example
int calculate(int x, int y) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return x + y;
}

void print_message(const std::string& msg) {
    std::cout << "Message: " << msg << " (Thread: " 
              << std::this_thread::get_id() << ")" << std::endl;
}

int main() {
    SimpleThreadPool pool(4);
    std::vector<std::future<int>> results;
    
    // Submit computation tasks
    for (int i = 0; i < 8; i++) {
        results.emplace_back(
            pool.enqueue(calculate, i, i * 2)
        );
    }
    
    // Submit print tasks
    for (int i = 0; i < 5; i++) {
        pool.enqueue(print_message, "Hello " + std::to_string(i));
    }
    
    // Collect the results
    for (auto& result : results) {
        std::cout << "Result: " << result.get() << std::endl;
    }
    
    return 0;
}

6.2 An Advanced Thread Pool Implementation

cpp
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <chrono>
#include <iostream>

class AdvancedThreadPool {
public:
    enum class Priority { LOW, NORMAL, HIGH };
    
private:
    struct Task {
        std::function<void()> function;
        Priority priority;
        std::chrono::steady_clock::time_point submit_time;
        
        Task(std::function<void()> f, Priority p) 
            : function(std::move(f)), priority(p), 
              submit_time(std::chrono::steady_clock::now()) {}
        
        bool operator<(const Task& other) const {
            if (priority != other.priority) {
                return priority < other.priority;  // higher priority comes out of the max-heap first
            }
            return submit_time > other.submit_time;  // among equal priorities, earlier submissions come first
        }
    };
    
    std::vector<std::thread> workers;
    std::priority_queue<Task> tasks;
    
    mutable std::mutex queue_mutex;  // mutable so const accessors such as get_queue_size() can lock it
    std::condition_variable condition;
    std::atomic<bool> stop;
    
    // Statistics
    std::atomic<size_t> active_threads;
    std::atomic<size_t> completed_tasks;
    
public:
    AdvancedThreadPool(size_t threads) 
        : stop(false), active_threads(0), completed_tasks(0) {
        
        for (size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    Task task([](){}, Priority::LOW);
                    
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { 
                            return stop || !tasks.empty(); 
                        });
                        
                        if (stop && tasks.empty()) {
                            return;
                        }
                        
                        // top() returns a const reference, so const_cast is needed to move
                        // out of it; this is fine because the element is popped right away
                        task = std::move(const_cast<Task&>(tasks.top()));
                        tasks.pop();
                    }
                    
                    ++active_threads;
                    task.function();
                    ++completed_tasks;
                    --active_threads;
                }
            });
        }
    }
    
    template<class F, class... Args>
    auto enqueue(Priority priority, F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type> {
        // Note: priority comes first because a defaulted parameter placed after a
        // parameter pack can never be supplied explicitly by callers.
        
        using return_type = typename std::result_of<F(Args...)>::type;
        
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
        std::future<return_type> res = task->get_future();
        
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            
            tasks.emplace([task]() { (*task)(); }, priority);
        }
        
        condition.notify_one();
        return res;
    }
    
    size_t get_queue_size() const {
        std::unique_lock<std::mutex> lock(queue_mutex);
        return tasks.size();
    }
    
    size_t get_active_threads() const {
        return active_threads.load();
    }
    
    size_t get_completed_tasks() const {
        return completed_tasks.load();
    }
    
    void wait_for_completion() {
        while (get_queue_size() > 0 || get_active_threads() > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    
    ~AdvancedThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        
        condition.notify_all();
        
        for (std::thread &worker: workers) {
            worker.join();
        }
    }
};

// Test function
void heavy_task(int id, int duration_ms) {
    std::cout << "Task " << id << " started (duration: " 
              << duration_ms << "ms)" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
    std::cout << "Task " << id << " completed" << std::endl;
}

int main() {
    AdvancedThreadPool pool(3);
    
    // Submit tasks with different priorities
    std::cout << "Submitting tasks with different priorities..." << std::endl;
    
    // Low-priority tasks
    for (int i = 0; i < 3; i++) {
        pool.enqueue(AdvancedThreadPool::Priority::LOW, heavy_task, i, 200);
    }
    
    // Normal-priority tasks
    for (int i = 10; i < 13; i++) {
        pool.enqueue(AdvancedThreadPool::Priority::NORMAL, heavy_task, i, 150);
    }
    
    // High-priority tasks
    for (int i = 20; i < 23; i++) {
        pool.enqueue(AdvancedThreadPool::Priority::HIGH, heavy_task, i, 100);
    }
    
    // Monitor progress
    std::thread monitor([&pool]() {
        while (pool.get_queue_size() > 0 || pool.get_active_threads() > 0) {
            std::cout << "Queue size: " << pool.get_queue_size() 
                      << ", Active threads: " << pool.get_active_threads()
                      << ", Completed: " << pool.get_completed_tasks() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    pool.wait_for_completion();
    monitor.join();
    
    std::cout << "All tasks completed. Total: " 
              << pool.get_completed_tasks() << std::endl;
    
    return 0;
}

7. Thread Safety and Atomic Operations

7.1 std::atomic Basics

cpp
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>

class AtomicCounter {
private:
    std::atomic<int> count;
    std::atomic<bool> ready;
    
public:
    AtomicCounter() : count(0), ready(false) {}
    
    void increment() {
        count.fetch_add(1, std::memory_order_relaxed);
    }
    
    void decrement() {
        count.fetch_sub(1, std::memory_order_relaxed);
    }
    
    int get_count() const {
        return count.load(std::memory_order_relaxed);
    }
    
    void set_ready() {
        ready.store(true, std::memory_order_release);
    }
    
    bool is_ready() const {
        return ready.load(std::memory_order_acquire);
    }
    
    // Atomic compare-and-swap
    bool compare_and_set(int expected, int new_value) {
        return count.compare_exchange_strong(expected, new_value);
    }
};

// A simplified lock-free stack (memory reclamation is not handled; see Q12)
template<typename T>
class LockFreeStack {
private:
    struct Node {
        T data;
        Node* next;
        Node(const T& data_) : data(data_), next(nullptr) {}
    };
    
    std::atomic<Node*> head;
    
public:
    LockFreeStack() : head(nullptr) {}
    
    void push(const T& data) {
        Node* new_node = new Node(data);
        new_node->next = head.load();
        
        // Atomically swing the head pointer to the new node
        while (!head.compare_exchange_weak(new_node->next, new_node)) {
            // On failure, new_node->next has been reloaded with the current head; retry
        }
    }
    
    bool pop(T& result) {
        Node* old_head = head.load();
        
        while (old_head && 
               !head.compare_exchange_weak(old_head, old_head->next)) {
            // Retry. Note: this simplified pop() can dereference a node that a
            // concurrent pop() has already deleted; production code needs hazard
            // pointers or another safe memory-reclamation scheme (see Q12).
        }
        
        if (old_head) {
            result = old_head->data;
            delete old_head;
            return true;
        }
        
        return false;
    }
    
    bool empty() const {
        return head.load() == nullptr;
    }
    
    ~LockFreeStack() {
        T dummy;
        while (pop(dummy)) {
            // Drain the stack
        }
    }
};

int main() {
    AtomicCounter counter;
    LockFreeStack<int> stack;
    
    std::vector<std::thread> threads;
    
    // Exercise the atomic counter
    for (int i = 0; i < 5; i++) {
        threads.emplace_back([&counter, i]() {
            for (int j = 0; j < 1000; j++) {
                counter.increment();
            }
            std::cout << "Thread " << i << " finished incrementing" << std::endl;
        });
    }
    
    // Exercise the lock-free stack
    threads.emplace_back([&stack]() {
        for (int i = 0; i < 100; i++) {
            stack.push(i);
        }
        std::cout << "Producer finished" << std::endl;
    });
    
    threads.emplace_back([&stack]() {
        int value;
        int count = 0;
        while (count < 100) {
            if (stack.pop(value)) {
                count++;
                if (count % 20 == 0) {
                    std::cout << "Consumer popped: " << value 
                              << " (total: " << count << ")" << std::endl;
                }
            } else {
                std::this_thread::sleep_for(std::chrono::microseconds(1));
            }
        }
        std::cout << "Consumer finished" << std::endl;
    });
    
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Final counter value: " << counter.get_count() << std::endl;
    std::cout << "Stack empty: " << stack.empty() << std::endl;
    
    return 0;
}

7.2 Memory Ordering and Synchronization

cpp
#include <atomic>
#include <thread>
#include <iostream>
#include <cassert>

class MemoryOrderExample {
private:
    std::atomic<int> data;
    std::atomic<bool> ready;
    
public:
    MemoryOrderExample() : data(0), ready(false) {}
    
    // Sequential consistency (the default)
    void sequential_consistency_writer() {
        data.store(42);
        ready.store(true);
    }
    
    void sequential_consistency_reader() {
        while (!ready.load()) {
            std::this_thread::yield();
        }
        assert(data.load() == 42);  // Guaranteed to read 42
    }
    
    // Acquire-release semantics
    void acquire_release_writer() {
        data.store(42, std::memory_order_relaxed);
        ready.store(true, std::memory_order_release);  // Release store
    }
    
    void acquire_release_reader() {
        while (!ready.load(std::memory_order_acquire)) {  // Acquire load
            std::this_thread::yield();
        }
        assert(data.load(std::memory_order_relaxed) == 42);  // Guaranteed to read 42: the release/acquire pair orders the stores
    }
    
    // Relaxed ordering
    void relaxed_writer() {
        data.store(42, std::memory_order_relaxed);
        ready.store(true, std::memory_order_relaxed);
    }
    
    void relaxed_reader() {
        while (!ready.load(std::memory_order_relaxed)) {
            std::this_thread::yield();
        }
        // Note: with relaxed ordering there is no guarantee of reading 42 here
        int value = data.load(std::memory_order_relaxed);
        std::cout << "Read value: " << value << std::endl;
    }
    
    void reset() {
        data.store(0);
        ready.store(false);
    }
};

// Dekker's algorithm implemented with atomics (requires sequentially consistent ordering)
class DekkerMutex {
private:
    std::atomic<bool> flag[2];
    std::atomic<int> turn;
    
public:
    DekkerMutex() : turn(0) {
        flag[0].store(false);
        flag[1].store(false);
    }
    
    void lock(int id) {
        int other = 1 - id;
        // Dekker's algorithm relies on these stores and loads being globally ordered;
        // memory_order_relaxed would break mutual exclusion, so sequentially
        // consistent ordering is used throughout.
        flag[id].store(true, std::memory_order_seq_cst);
        
        while (flag[other].load(std::memory_order_seq_cst)) {
            if (turn.load(std::memory_order_seq_cst) != id) {
                flag[id].store(false, std::memory_order_seq_cst);
                while (turn.load(std::memory_order_seq_cst) != id) {
                    std::this_thread::yield();
                }
                flag[id].store(true, std::memory_order_seq_cst);
            }
        }
    }
    
    void unlock(int id) {
        turn.store(1 - id, std::memory_order_seq_cst);
        flag[id].store(false, std::memory_order_seq_cst);
    }
};

int shared_counter = 0;
DekkerMutex dekker_mutex;

void dekker_test_thread(int id) {
    for (int i = 0; i < 10000; i++) {
        dekker_mutex.lock(id);
        shared_counter++;  // Critical section
        dekker_mutex.unlock(id);
    }
}

int main() {
    MemoryOrderExample example;
    
    // Test sequential consistency
    std::cout << "Testing sequential consistency..." << std::endl;
    std::thread writer1(&MemoryOrderExample::sequential_consistency_writer, &example);
    std::thread reader1(&MemoryOrderExample::sequential_consistency_reader, &example);
    writer1.join();
    reader1.join();
    
    example.reset();
    
    // Test acquire-release
    std::cout << "Testing acquire-release..." << std::endl;
    std::thread writer2(&MemoryOrderExample::acquire_release_writer, &example);
    std::thread reader2(&MemoryOrderExample::acquire_release_reader, &example);
    writer2.join();
    reader2.join();
    
    example.reset();
    
    // Test relaxed ordering
    std::cout << "Testing relaxed ordering..." << std::endl;
    std::thread writer3(&MemoryOrderExample::relaxed_writer, &example);
    std::thread reader3(&MemoryOrderExample::relaxed_reader, &example);
    writer3.join();
    reader3.join();
    
    // Test the Dekker mutex
    std::cout << "Testing Dekker mutex..." << std::endl;
    std::thread t1(dekker_test_thread, 0);
    std::thread t2(dekker_test_thread, 1);
    t1.join();
    t2.join();
    
    std::cout << "Final counter value: " << shared_counter << std::endl;
    std::cout << "Expected: " << 20000 << std::endl;
    
    return 0;
}

8. Advanced Thread Programming

8.1 Thread-Local Storage (TLS)

cpp
#include <thread>
#include <iostream>
#include <vector>
#include <random>

// Global thread-local variables: each thread gets its own copy
thread_local int tls_counter = 0;
thread_local std::string tls_name;

class ThreadLocalExample {
private:
    thread_local static int instance_count;
    int id;
    
public:
    ThreadLocalExample() : id(++instance_count) {
        std::cout << "ThreadLocalExample created with ID: " << id 
                  << " in thread: " << std::this_thread::get_id() << std::endl;
    }
    
    int get_id() const { return id; }
    static int get_instance_count() { return instance_count; }
};

// thread_local static data members must be defined outside the class
thread_local int ThreadLocalExample::instance_count = 0;

void worker_function(int thread_id) {
    // Set this thread's copies of the thread-local variables
    tls_name = "Thread_" + std::to_string(thread_id);
    
    // Construct objects; instance_count is counted per thread
    ThreadLocalExample obj1;
    ThreadLocalExample obj2;
    
    // Do some work
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 10);
    
    for (int i = 0; i < 5; i++) {
        tls_counter += dis(gen);
        std::cout << tls_name << " - Counter: " << tls_counter 
                  << ", Objects: " << ThreadLocalExample::get_instance_count() 
                  << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    
    std::cout << tls_name << " finished. Final counter: " << tls_counter << std::endl;
}

int main() {
    std::vector<std::thread> threads;
    
    // Create several threads
    for (int i = 0; i < 4; i++) {
        threads.emplace_back(worker_function, i);
    }
    
    // The main thread has its own thread-local storage as well
    tls_name = "Main_Thread";
    tls_counter = 1000;
    ThreadLocalExample main_obj;
    
    std::cout << "Main thread - Counter: " << tls_counter 
              << ", Objects: " << ThreadLocalExample::get_instance_count() 
              << std::endl;
    
    // Wait for all threads to finish
    for (auto& t : threads) {
        t.join();
    }
    
    std::cout << "Main thread final counter: " << tls_counter << std::endl;
    
    return 0;
}

8.2 Cooperative (Coroutine-Style) Task Scheduling

cpp
#include <thread>
#include <future>
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <chrono>
#include <atomic>
#include <vector>
#include <string>
#include <type_traits>

// A simple cooperative scheduler
class CooperativeScheduler {
public:
    using Task = std::function<void()>;
    
private:
    std::queue<Task> tasks;
    std::mutex task_mutex;
    std::condition_variable cv;
    std::atomic<bool> running;
    std::vector<std::thread> workers;
    
public:
    CooperativeScheduler(size_t num_workers = 1) : running(true) {
        for (size_t i = 0; i < num_workers; ++i) {
            workers.emplace_back([this]() { worker_loop(); });
        }
    }
    
    void schedule(Task task) {
        {
            std::lock_guard<std::mutex> lock(task_mutex);
            tasks.push(std::move(task));
        }
        cv.notify_one();
    }
    
    template<typename Callable>
    auto async_task(Callable&& callable) -> std::future<decltype(callable())> {
        using return_type = decltype(callable());
        
        auto promise = std::make_shared<std::promise<return_type>>();
        auto future = promise->get_future();
        
        schedule([promise, callable = std::forward<Callable>(callable)]() {
            try {
                if constexpr (std::is_void_v<return_type>) {
                    callable();
                    promise->set_value();
                } else {
                    promise->set_value(callable());
                }
            } catch (...) {
                promise->set_exception(std::current_exception());
            }
        });
        
        return future;
    }
    
    void yield() {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(task_mutex);
            running = false;
        }
        cv.notify_all();
        
        for (auto& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    
    ~CooperativeScheduler() {
        shutdown();
    }
    
private:
    void worker_loop() {
        while (running) {
            Task task;
            
            {
                std::unique_lock<std::mutex> lock(task_mutex);
                cv.wait(lock, [this] { return !tasks.empty() || !running; });
                
                if (!running && tasks.empty()) {
                    break;
                }
                
                if (!tasks.empty()) {
                    task = std::move(tasks.front());
                    tasks.pop();
                }
            }
            
            if (task) {
                task();
            }
        }
    }
};

// Usage example: simulating coroutine-style asynchronous operations
class AsyncFileProcessor {
private:
    CooperativeScheduler& scheduler;
    
public:
    AsyncFileProcessor(CooperativeScheduler& sched) : scheduler(sched) {}
    
    std::future<std::string> read_file_async(const std::string& filename) {
        return scheduler.async_task([this, filename]() -> std::string {
            std::cout << "Starting to read file: " << filename << std::endl;
            
            // Simulate reading the file
            for (int i = 0; i < 5; i++) {
                std::cout << "Reading chunk " << i << " of " << filename << std::endl;
                scheduler.yield();  // Yield execution
            }
            
            std::cout << "Finished reading file: " << filename << std::endl;
            return "Content of " + filename;
        });
    }
    
    std::future<void> process_file_async(const std::string& content) {
        return scheduler.async_task([this, content]() {
            std::cout << "Starting to process: " << content << std::endl;
            
            // Simulate processing
            for (int i = 0; i < 3; i++) {
                std::cout << "Processing step " << i << std::endl;
                scheduler.yield();  // Yield execution
            }
            
            std::cout << "Finished processing: " << content << std::endl;
        });
    }
};

int main() {
    CooperativeScheduler scheduler(2);  // Two worker threads
    AsyncFileProcessor processor(scheduler);
    
    // Launch several asynchronous operations
    std::vector<std::future<std::string>> read_futures;
    std::vector<std::future<void>> process_futures;
    
    // Read several files
    for (int i = 0; i < 3; i++) {
        read_futures.push_back(
            processor.read_file_async("file" + std::to_string(i) + ".txt")
        );
    }
    
    // Wait for each read to finish, then process the content
    for (auto& future : read_futures) {
        std::string content = future.get();
        process_futures.push_back(
            processor.process_file_async(content)
        );
    }
    
    // Wait for all processing to finish
    for (auto& future : process_futures) {
        future.get();
    }
    
    std::cout << "All operations completed!" << std::endl;
    
    return 0;
}

8.3 Event-Based Asynchronous Programming

cpp
#include <thread>
#include <functional>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <iostream>
#include <chrono>
#include <typeindex>
#include <atomic>
#include <memory>
#include <string>

// Base class of the event system
class Event {
public:
    virtual ~Event() = default;
    virtual std::type_index get_type() const = 0;
};

template<typename T>
class TypedEvent : public Event {
public:
    T data;
    
    TypedEvent(const T& data_) : data(data_) {}
    
    std::type_index get_type() const override {
        return std::type_index(typeid(T));
    }
};

// Event bus
class EventBus {
public:
    using EventHandler = std::function<void(const Event&)>;
    
private:
    std::unordered_map<std::type_index, std::vector<EventHandler>> handlers;
    std::queue<std::unique_ptr<Event>> event_queue;
    std::mutex mutex;
    std::condition_variable cv;
    std::atomic<bool> running;
    std::thread dispatcher_thread;
    
public:
    EventBus() : running(true) {
        dispatcher_thread = std::thread([this]() { dispatch_loop(); });
    }
    
    ~EventBus() {
        shutdown();
    }
    
    template<typename T>
    void subscribe(std::function<void(const T&)> handler) {
        std::lock_guard<std::mutex> lock(mutex);
        
        auto wrapper = [handler](const Event& event) {
            const auto& typed_event = static_cast<const TypedEvent<T>&>(event);
            handler(typed_event.data);
        };
        
        handlers[std::type_index(typeid(T))].push_back(wrapper);
    }
    
    template<typename T>
    void publish(const T& event_data) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            event_queue.push(std::make_unique<TypedEvent<T>>(event_data));
        }
        cv.notify_one();
    }
    
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            running = false;
        }
        cv.notify_one();
        
        if (dispatcher_thread.joinable()) {
            dispatcher_thread.join();
        }
    }
    
private:
    void dispatch_loop() {
        while (running) {
            std::unique_ptr<Event> event;
            
            {
                std::unique_lock<std::mutex> lock(mutex);
                cv.wait(lock, [this] { return !event_queue.empty() || !running; });
                
                if (!running && event_queue.empty()) {
                    break;
                }
                
                if (!event_queue.empty()) {
                    event = std::move(event_queue.front());
                    event_queue.pop();
                }
            }
            
            if (event) {
                auto type = event->get_type();
                // Handlers run while the bus mutex is held, so a handler must not
                // call publish() on this bus or it will deadlock.
                std::lock_guard<std::mutex> lock(mutex);
                
                auto it = handlers.find(type);
                if (it != handlers.end()) {
                    for (const auto& handler : it->second) {
                        handler(*event);
                    }
                }
            }
        }
    }
};

// Event type definitions
struct UserLoginEvent {
    std::string username;
    std::chrono::system_clock::time_point timestamp;
};

struct DataProcessedEvent {
    int process_id;
    std::string result;
    bool success;
};

struct SystemShutdownEvent {
    std::string reason;
};

// Event-handling services
class UserService {
public:
    void handle_login(const UserLoginEvent& event) {
        std::cout << "UserService: User " << event.username 
                  << " logged in at timestamp" << std::endl;
        
        // Simulate user-verification work
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
};

class LoggingService {
public:
    void handle_login(const UserLoginEvent& event) {
        std::cout << "LoggingService: Logging user login - " 
                  << event.username << std::endl;
    }
    
    void handle_data_processed(const DataProcessedEvent& event) {
        std::cout << "LoggingService: Process " << event.process_id 
                  << " completed with result: " << event.result 
                  << " (success: " << event.success << ")" << std::endl;
    }
    
    void handle_shutdown(const SystemShutdownEvent& event) {
        std::cout << "LoggingService: System shutting down - " 
                  << event.reason << std::endl;
    }
};

class DataProcessor {
private:
    EventBus& event_bus;
    
public:
    DataProcessor(EventBus& bus) : event_bus(bus) {}
    
    void process_data(int id, const std::string& data) {
        std::cout << "DataProcessor: Processing data for ID " << id << std::endl;
        
        // Simulate data processing
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        
        bool success = (id % 2 == 0);  // Even IDs succeed
        std::string result = success ? "processed_" + data : "failed";
        
        // Publish the completion event
        event_bus.publish(DataProcessedEvent{id, result, success});
    }
};

int main() {
    EventBus event_bus;
    
    // Create the service instances
    UserService user_service;
    LoggingService logging_service;
    DataProcessor data_processor(event_bus);
    
    // Subscribe to events
    event_bus.subscribe<UserLoginEvent>(
        [&user_service](const UserLoginEvent& event) {
            user_service.handle_login(event);
        }
    );
    
    event_bus.subscribe<UserLoginEvent>(
        [&logging_service](const UserLoginEvent& event) {
            logging_service.handle_login(event);
        }
    );
    
    event_bus.subscribe<DataProcessedEvent>(
        [&logging_service](const DataProcessedEvent& event) {
            logging_service.handle_data_processed(event);
        }
    );
    
    event_bus.subscribe<SystemShutdownEvent>(
        [&logging_service](const SystemShutdownEvent& event) {
            logging_service.handle_shutdown(event);
        }
    );
    
    // Simulate event publication
    std::vector<std::thread> workers;
    
    // User-login events
    workers.emplace_back([&event_bus]() {
        for (int i = 0; i < 3; i++) {
            event_bus.publish(UserLoginEvent{
                "user" + std::to_string(i),
                std::chrono::system_clock::now()
            });
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
        }
    });
    
    // Data-processing tasks
    workers.emplace_back([&data_processor]() {
        for (int i = 0; i < 5; i++) {
            data_processor.process_data(i, "data_" + std::to_string(i));
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    // Wait for the publishing threads to finish
    for (auto& worker : workers) {
        worker.join();
    }
    
    // Publish the system-shutdown event
    event_bus.publish(SystemShutdownEvent{"Normal shutdown"});
    
    // Give the dispatcher time to process the remaining events
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    std::cout << "All events processed!" << std::endl;
    
    return 0;
}

9. Common Interview Questions

9.1 Basic Concepts

Q1: What are the differences between a process and a thread?

  • Memory model: a process has its own address space; threads share their process's address space
  • Creation cost: creating a thread is cheap, creating a process is expensive
  • Communication: threads share memory directly; processes need IPC
  • Safety: processes are better isolated; one crashing thread brings down the whole process

Q2: What does "thread safe" mean?

  • Code is thread safe if its behavior stays correct when multiple threads access shared data concurrently
  • This usually requires synchronization (mutexes, atomic operations, and so on)
  • Thread-safe code can be called concurrently from several threads without extra locking by the caller (see the sketch below)
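
A minimal sketch (the names and counts are illustrative only) contrasting an unsynchronized counter, which is a data race and therefore undefined behavior, with an atomic one:

cpp
#include <atomic>
#include <thread>
#include <iostream>

int unsafe_counter = 0;             // plain int: concurrent ++ is a data race
std::atomic<int> safe_counter{0};   // atomic int: concurrent ++ is well-defined

void work() {
    for (int i = 0; i < 100000; i++) {
        ++unsafe_counter;           // updates can be lost here
        ++safe_counter;             // always ends up at exactly 200000
    }
}

int main() {
    std::thread t1(work), t2(work);
    t1.join();
    t2.join();
    std::cout << "unsafe: " << unsafe_counter      // often less than 200000
              << ", safe: " << safe_counter.load() << std::endl;
    return 0;
}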

Q3: C++11 thread library vs. pthread?

| Aspect         | C++11 std::thread           | pthread                    |
|----------------|-----------------------------|----------------------------|
| Portability    | Cross-platform              | Mainly Unix/Linux          |
| Type safety    | Type-safe templates         | void*-based interfaces     |
| Error handling | Exceptions                  | Error codes                |
| RAII           | Supported                   | Manual management          |
| Performance    | Slight abstraction overhead | Closer to the system calls |

9.2 Synchronization Mechanisms

Q4: What are the differences between mutex, lock_guard, and unique_lock?

  • mutex: the basic mutual-exclusion primitive; lock()/unlock() must be called manually
  • lock_guard: an RAII wrapper that locks on construction and unlocks on destruction; it cannot be moved or unlocked early
  • unique_lock: more flexible; supports deferred locking, condition variables, early unlock/relock, and ownership transfer (see the sketch below)
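
A minimal sketch of the three in use (the shared counter and flag are illustrative only):

cpp
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>

std::mutex mtx;
std::condition_variable cv;
int value = 0;
bool ready = false;

void with_raw_mutex() {
    mtx.lock();                              // manual: must unlock on every path
    ++value;
    mtx.unlock();
}

void with_lock_guard() {
    std::lock_guard<std::mutex> lock(mtx);   // unlocked automatically at scope exit
    ++value;
}

void with_unique_lock() {
    std::unique_lock<std::mutex> lock(mtx);  // required by condition_variable::wait
    cv.wait(lock, [] { return ready; });     // wait() unlocks while blocked, relocks on wakeup
    ++value;
    lock.unlock();                           // can release early
}

int main() {
    std::thread waiter(with_unique_lock);
    with_raw_mutex();
    with_lock_guard();
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;
    }
    cv.notify_one();
    waiter.join();
    std::cout << "value = " << value << std::endl;  // 3
    return 0;
}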

Q5: What is a deadlock, and how do you avoid it?

  • Definition: two or more threads each wait forever for resources held by the others

  • Necessary conditions: mutual exclusion, hold-and-wait, no preemption, circular wait

  • Ways to avoid it (a sketch follows this list):

    • Lock ordering: always acquire locks in the same global order
    • Timeouts: try_lock_for / try_lock_until
    • Avoid nested locking where possible
    • Use std::lock (or C++17 std::scoped_lock) to acquire several locks together
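
A minimal sketch of the std::lock approach, transferring between two accounts that would deadlock with naive nested locking (the Account type is illustrative only):

cpp
#include <mutex>
#include <thread>
#include <iostream>

struct Account {
    std::mutex mtx;
    int balance = 100;
};

// Locks both accounts without deadlock, regardless of the order the caller names them.
void transfer(Account& from, Account& to, int amount) {
    std::lock(from.mtx, to.mtx);  // deadlock-avoiding acquisition of both locks
    std::lock_guard<std::mutex> from_lock(from.mtx, std::adopt_lock);
    std::lock_guard<std::mutex> to_lock(to.mtx, std::adopt_lock);
    from.balance -= amount;
    to.balance += amount;
}

int main() {
    Account a, b;
    // Opposite locking orders: safe here thanks to std::lock.
    std::thread t1(transfer, std::ref(a), std::ref(b), 30);
    std::thread t2(transfer, std::ref(b), std::ref(a), 10);
    t1.join();
    t2.join();
    std::cout << a.balance << " " << b.balance << std::endl;  // 80 120
    return 0;
}

In C++17 the first three lines of transfer() collapse into a single std::scoped_lock lock(from.mtx, to.mtx);.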

Q6: What is a spurious wakeup of a condition variable?

  • A waiting thread wakes up even though its condition has not been satisfied
  • Causes: scheduler and OS implementation details, signal handling, and so on
  • Fix: always re-check the condition in a loop (or pass a predicate to wait)
cpp
// Correct: re-check the condition after every wakeup
while (!condition) {
    cv.wait(lock);
}

// Wrong: a spurious wakeup slips past the single check
if (!condition) {
    cv.wait(lock);  // may return without the condition being true
}

9.3 Atomic Operations

Q7: What is an atomic operation and what is it used for?

  • An indivisible operation: it either completes entirely or has no visible effect
  • It prevents data races and enables lock-free programming
  • Often cheaper than a mutex, but harder to use correctly

Q8: What are the memory_order options?

  • memory_order_relaxed: the weakest; guarantees atomicity only
  • memory_order_acquire: acquire semantics, for loads
  • memory_order_release: release semantics, for stores
  • memory_order_acq_rel: combined acquire-release, for read-modify-write operations
  • memory_order_seq_cst: sequential consistency (the default)

Q9: compare_exchange_strong vs. compare_exchange_weak?

  • strong: fails only when the comparison actually fails
  • weak: may fail spuriously even when the values match, which can be cheaper on some architectures
  • weak is therefore normally used inside a retry loop (see the sketch below)
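
A minimal sketch of the usual compare_exchange_weak retry loop, here atomically doubling a value (the names are illustrative only):

cpp
#include <atomic>
#include <iostream>

std::atomic<int> value{10};

// Replace value with value * 2 atomically, retrying if another thread interferes
// or the CAS fails spuriously.
void atomic_double() {
    int expected = value.load();
    // On failure, expected is reloaded with the current value and we just try again.
    while (!value.compare_exchange_weak(expected, expected * 2)) {
    }
}

int main() {
    atomic_double();
    std::cout << value.load() << std::endl;  // 20
    return 0;
}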

9.4 Advanced Applications

Q10: How would you design a thread pool?

  • Components: a task queue, worker threads, and the synchronization between them
  • Design decisions: number of threads, task dispatching, load balancing, exception handling
  • Optimizations: work stealing, priority queues, dynamic resizing

Q11: What is thread-local storage, and when is it used?

  • Storage of which every thread has its own independent copy
  • Declared with the thread_local keyword
  • Typical uses: errno-style error codes, per-thread random number generators, per-thread caches

Q12: How do you implement a lock-free data structure?

  • Build on atomic operations and CAS (compare-and-swap)
  • Handle the ABA problem, for example with a version counter (see the sketch below)
  • Solve memory reclamation (hazard pointers or epoch-based reclamation)
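
A minimal sketch (illustrative only, not a production-ready structure) of the version-counter idea: a version number is packed next to the value so that an A→B→A change is still detected by the CAS.

cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Value plus a version that is bumped on every successful update.
struct Versioned {
    uint32_t value;
    uint32_t version;
};

std::atomic<Versioned> slot{Versioned{0, 0}};

// Fails if any other update happened in between, even one that restored the old
// value, because the version number would differ.
bool try_set(uint32_t expected_value, uint32_t new_value) {
    Versioned expected = slot.load();
    if (expected.value != expected_value) {
        return false;
    }
    Versioned desired{new_value, expected.version + 1};
    return slot.compare_exchange_strong(expected, desired);
}

int main() {
    std::cout << try_set(0, 7) << std::endl;  // 1: succeeded, version is now 1
    std::cout << try_set(0, 9) << std::endl;  // 0: the value is 7, not 0
    return 0;
}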

10. Hands-On Coding Problems

10.1 The Producer-Consumer Problem (multiple implementations)

cpp
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <iostream>
#include <vector>

// Approach 1: mutex + condition_variable
template<typename T>
class BlockingQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mtx;
    std::condition_variable not_empty;
    std::condition_variable not_full;
    size_t capacity;
    
public:
    BlockingQueue(size_t cap) : capacity(cap) {}
    
    void push(const T& item) {
        std::unique_lock<std::mutex> lock(mtx);
        not_full.wait(lock, [this] { return queue.size() < capacity; });
        
        queue.push(item);
        not_empty.notify_one();
    }
    
    T pop() {
        std::unique_lock<std::mutex> lock(mtx);
        not_empty.wait(lock, [this] { return !queue.empty(); });
        
        T item = queue.front();
        queue.pop();
        not_full.notify_one();
        return item;
    }
};

// Approach 2: a simplified lock-free queue built on atomics (memory reclamation is not handled; see Q12)
template<typename T>
class LockFreeQueue {
private:
    struct Node {
        std::atomic<T*> data;
        std::atomic<Node*> next;
        Node() : data(nullptr), next(nullptr) {}
    };
    
    std::atomic<Node*> head;
    std::atomic<Node*> tail;
    
public:
    LockFreeQueue() {
        Node* dummy = new Node;
        head.store(dummy);
        tail.store(dummy);
    }
    
    void enqueue(T item) {
        Node* new_node = new Node;
        T* data = new T(std::move(item));
        new_node->data.store(data);
        
        Node* prev_tail = tail.exchange(new_node);
        prev_tail->next.store(new_node);
    }
    
    bool dequeue(T& result) {
        Node* head_node = head.load();
        Node* next = head_node->next.load();
        
        if (next == nullptr) {
            return false;  // Queue is empty
        }
        
        T* data = next->data.load();
        if (data == nullptr) {
            return false;
        }
        
        if (head.compare_exchange_weak(head_node, next)) {
            result = *data;
            delete data;
            delete head_node;
            return true;
        }
        
        return false;
    }
};

// Test driver
void test_producer_consumer() {
    std::cout << "Testing Producer-Consumer with BlockingQueue..." << std::endl;
    
    BlockingQueue<int> queue(10);
    // Producer
    std::thread producer([&queue]() {
        for (int i = 0; i < 20; i++) {
            queue.push(i);
            std::cout << "Produced: " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    });
    
    // Consumer: pops exactly as many items as the producer pushes, so no
    // separate "finished" flag is needed (pop() blocks and never throws)
    std::thread consumer([&queue]() {
        for (int i = 0; i < 20; i++) {
            int item = queue.pop();
            std::cout << "Consumed: " << item << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });
    
    producer.join();
    consumer.join();
}

int main() {
    test_producer_consumer();
    return 0;
}

10.2 The Dining Philosophers Problem

cpp
#include <thread>
#include <mutex>
#include <vector>
#include <iostream>
#include <chrono>
#include <random>

class DiningPhilosophers {
private:
    static const int NUM_PHILOSOPHERS = 5;
    std::vector<std::mutex> forks;
    std::vector<std::thread> philosophers;
    
public:
    DiningPhilosophers() : forks(NUM_PHILOSOPHERS) {}
    
    void start() {
        for (int i = 0; i < NUM_PHILOSOPHERS; i++) {
            philosophers.emplace_back(&DiningPhilosophers::philosopher_life, this, i);
        }
    }
    
    void join() {
        for (auto& philosopher : philosophers) {
            philosopher.join();
        }
    }
    
private:
    void philosopher_life(int id) {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> think_time(100, 500);
        std::uniform_int_distribution<> eat_time(200, 400);
        
        for (int i = 0; i < 5; i++) {  // each philosopher eats 5 times
            think(id, think_time(gen));
            
            // Avoid deadlock by breaking symmetry: even-numbered philosophers pick up
            // the left fork first, odd-numbered ones pick up the right fork first
            if (id % 2 == 0) {
                pick_up_forks_even(id);
            } else {
                pick_up_forks_odd(id);
            }
            
            eat(id, eat_time(gen));
            put_down_forks(id);
        }
    }
    
    void think(int id, int duration) {
        std::cout << "Philosopher " << id << " is thinking..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
    }
    
    void eat(int id, int duration) {
        std::cout << "Philosopher " << id << " is eating..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
        std::cout << "Philosopher " << id << " finished eating" << std::endl;
    }
    
    void pick_up_forks_even(int id) {
        int left_fork = id;
        int right_fork = (id + 1) % NUM_PHILOSOPHERS;
        
        forks[left_fork].lock();
        std::cout << "Philosopher " << id << " picked up left fork " << left_fork << std::endl;
        
        forks[right_fork].lock();
        std::cout << "Philosopher " << id << " picked up right fork " << right_fork << std::endl;
    }
    
    void pick_up_forks_odd(int id) {
        int left_fork = id;
        int right_fork = (id + 1) % NUM_PHILOSOPHERS;
        
        forks[right_fork].lock();
        std::cout << "Philosopher " << id << " picked up right fork " << right_fork << std::endl;
        
        forks[left_fork].lock();
        std::cout << "Philosopher " << id << " picked up left fork " << left_fork << std::endl;
    }
    
    void put_down_forks(int id) {
        int left_fork = id;
        int right_fork = (id + 1) % NUM_PHILOSOPHERS;
        
        forks[left_fork].unlock();
        forks[right_fork].unlock();
        
        std::cout << "Philosopher " << id << " put down both forks" << std::endl;
    }
};

// Improved version: acquire both forks atomically with std::lock
class ImprovedDiningPhilosophers {
private:
    static const int NUM_PHILOSOPHERS = 5;
    std::vector<std::mutex> forks;
    std::vector<std::thread> philosophers;
    
public:
    ImprovedDiningPhilosophers() : forks(NUM_PHILOSOPHERS) {}
    
    void start() {
        for (int i = 0; i < NUM_PHILOSOPHERS; i++) {
            philosophers.emplace_back(&ImprovedDiningPhilosophers::philosopher_life, this, i);
        }
    }
    
    void join() {
        for (auto& philosopher : philosophers) {
            philosopher.join();
        }
    }
    
private:
    void philosopher_life(int id) {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> think_time(100, 500);
        std::uniform_int_distribution<> eat_time(200, 400);
        
        for (int i = 0; i < 3; i++) {
            think(id, think_time(gen));
            pick_up_forks(id);
            eat(id, eat_time(gen));
            put_down_forks(id);
        }
    }
    
    void think(int id, int duration) {
        std::cout << "Philosopher " << id << " is thinking..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
    }
    
    void eat(int id, int duration) {
        std::cout << "Philosopher " << id << " is eating..." << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
        std::cout << "Philosopher " << id << " finished eating" << std::endl;
    }
    
    void pick_up_forks(int id) {
        int left_fork = id;
        int right_fork = (id + 1) % NUM_PHILOSOPHERS;
        
        // std::lock acquires both mutexes with an all-or-nothing strategy, avoiding deadlock
        std::lock(forks[left_fork], forks[right_fork]);
        std::cout << "Philosopher " << id << " picked up both forks" << std::endl;
    }
    
    void put_down_forks(int id) {
        int left_fork = id;
        int right_fork = (id + 1) % NUM_PHILOSOPHERS;
        
        forks[left_fork].unlock();
        forks[right_fork].unlock();
        std::cout << "Philosopher " << id << " put down both forks" << std::endl;
    }
};
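
With C++17, std::scoped_lock wraps the std::lock idea in RAII, so both forks are released automatically even if eating throws. Below is a minimal hedged sketch of that style; the function name dine_once is illustrative and not part of the original code. Usage of the classes above is symmetric: construct ImprovedDiningPhilosophers, call start(), then join().

cpp
#include <mutex>
#include <iostream>

// Illustrative C++17 variant: both forks are acquired atomically and released by RAII.
void dine_once(int id, std::mutex& left_fork, std::mutex& right_fork) {
    std::scoped_lock lock(left_fork, right_fork);   // locks both mutexes deadlock-free
    std::cout << "Philosopher " << id << " is eating..." << std::endl;
}   // both forks released here, even if an exception is thrown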

10.3 Readers-Writers Problem

cpp
#include <shared_mutex>
#include <thread>
#include <iostream>
#include <vector>
#include <random>
#include <atomic>

class ReadersWriters {
private:
    std::shared_mutex rw_mutex;
    std::atomic<int> data;
    std::atomic<int> reader_count;
    std::atomic<int> writer_count;
    
public:
    ReadersWriters() : data(0), reader_count(0), writer_count(0) {}
    
    void read(int reader_id) {
        {
            std::shared_lock<std::shared_mutex> lock(rw_mutex);
            reader_count++;
            
            // Read the data
            int value = data.load();
            std::cout << "Reader " << reader_id << " read value: " << value 
                      << " (active readers: " << reader_count.load() << ")" << std::endl;
            
            // Simulate read latency
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            
            reader_count--;
        }
    }
    
    void write(int writer_id, int new_value) {
        {
            std::unique_lock<std::shared_mutex> lock(rw_mutex);
            writer_count++;
            
            std::cout << "Writer " << writer_id << " writing value: " << new_value << std::endl;
            
            // Write the data
            data.store(new_value);
            
            // Simulate write latency
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            
            std::cout << "Writer " << writer_id << " finished writing" << std::endl;
            writer_count--;
        }
    }
    
    void start_simulation() {
        std::vector<std::thread> threads;
        
        // Spawn reader threads; each thread owns its own RNG
        // (a shared std::mt19937 must not be used from several threads without locking)
        for (int i = 0; i < 5; i++) {
            threads.emplace_back([this, i]() {
                std::mt19937 gen(std::random_device{}());
                std::uniform_int_distribution<> dis(1, 100);
                for (int j = 0; j < 3; j++) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
                    read(i);
                }
            });
        }
        
        // Spawn writer threads
        for (int i = 0; i < 2; i++) {
            threads.emplace_back([this, i]() {
                std::mt19937 gen(std::random_device{}());
                std::uniform_int_distribution<> dis(1, 100);
                for (int j = 0; j < 3; j++) {
                    std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen) * 2));
                    write(i, i * 100 + j);
                }
            });
        }
        
        // Wait for all threads to finish
        for (auto& t : threads) {
            t.join();
        }
        
        std::cout << "Final data value: " << data.load() << std::endl;
    }
};

// Writer-preferred readers-writer lock, built by hand from one mutex and two
// condition variables. All shared state is protected by the same mutex, which is
// what makes waiting on the two condition variables well defined.
class WriterPreferredRW {
private:
    std::mutex mtx;
    std::condition_variable reader_cv;
    std::condition_variable writer_cv;
    
    int reader_count;
    int waiting_writers;
    bool writer_active;
    std::atomic<int> data;
    
public:
    WriterPreferredRW() 
        : reader_count(0), waiting_writers(0), writer_active(false), data(0) {}
    
    void read(int reader_id) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // Writer preference: readers wait while any writer is active or waiting
        reader_cv.wait(lock, [this] { 
            return waiting_writers == 0 && !writer_active; 
        });
        
        reader_count++;
        lock.unlock();
        
        // Read the data outside the mutex so multiple readers can overlap
        int value = data.load();
        std::cout << "Reader " << reader_id << " read: " << value << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        
        lock.lock();
        reader_count--;
        if (reader_count == 0) {
            writer_cv.notify_one();  // last reader out: wake a waiting writer
        }
    }
    
    void write(int writer_id, int new_value) {
        std::unique_lock<std::mutex> lock(mtx);
        
        waiting_writers++;
        writer_cv.wait(lock, [this] { 
            return reader_count == 0 && !writer_active; 
        });
        
        waiting_writers--;
        writer_active = true;
        lock.unlock();
        
        // Exclusive section: writer_active keeps readers and other writers out
        std::cout << "Writer " << writer_id << " writing: " << new_value << std::endl;
        data.store(new_value);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        
        lock.lock();
        writer_active = false;
        
        if (waiting_writers > 0) {
            writer_cv.notify_one();  // prefer the next writer
        } else {
            reader_cv.notify_all();  // no writers pending: wake all readers
        }
    }
};
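
The main() further below only exercises the std::shared_mutex version. A minimal hedged driver for WriterPreferredRW could look like the following; the thread counts and the name test_writer_preferred are arbitrary choices, not part of the original code.

cpp
#include <thread>
#include <vector>

void test_writer_preferred() {
    WriterPreferredRW rw;
    std::vector<std::thread> threads;
    
    for (int i = 0; i < 4; i++) {            // a few readers
        threads.emplace_back([&rw, i]() {
            for (int j = 0; j < 3; j++) rw.read(i);
        });
    }
    for (int i = 0; i < 2; i++) {            // a couple of writers (they get priority)
        threads.emplace_back([&rw, i]() {
            for (int j = 0; j < 3; j++) rw.write(i, i * 10 + j);
        });
    }
    
    for (auto& t : threads) t.join();
}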

10.4 Parallel Matrix Multiplication

cpp
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
#include <random>
#include <future>
#include <cmath>   // std::sqrt, std::abs

class ParallelMatrixMultiply {
public:
    using Matrix = std::vector<std::vector<double>>;
    
    // Serial matrix multiplication (baseline)
    static Matrix multiply_serial(const Matrix& A, const Matrix& B) {
        size_t rows_A = A.size();
        size_t cols_A = A[0].size();
        size_t cols_B = B[0].size();
        
        Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
        
        for (size_t i = 0; i < rows_A; i++) {
            for (size_t j = 0; j < cols_B; j++) {
                for (size_t k = 0; k < cols_A; k++) {
                    C[i][j] += A[i][k] * B[k][j];
                }
            }
        }
        
        return C;
    }
    
    // Parallel matrix multiplication: split the rows of the result across threads
    static Matrix multiply_parallel_rows(const Matrix& A, const Matrix& B, int num_threads = 4) {
        size_t rows_A = A.size();
        size_t cols_A = A[0].size();
        size_t cols_B = B[0].size();
        
        Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
        std::vector<std::thread> threads;
        
        auto worker = [&](size_t start_row, size_t end_row) {
            for (size_t i = start_row; i < end_row; i++) {
                for (size_t j = 0; j < cols_B; j++) {
                    for (size_t k = 0; k < cols_A; k++) {
                        C[i][j] += A[i][k] * B[k][j];
                    }
                }
            }
        };
        
        size_t rows_per_thread = rows_A / num_threads;
        
        for (int t = 0; t < num_threads; t++) {
            size_t start_row = t * rows_per_thread;
            size_t end_row = (t == num_threads - 1) ? rows_A : (t + 1) * rows_per_thread;
            threads.emplace_back(worker, start_row, end_row);
        }
        
        for (auto& thread : threads) {
            thread.join();
        }
        
        return C;
    }
    
    // Parallel matrix multiplication: split the result into rectangular blocks, one task per block
    static Matrix multiply_parallel_blocks(const Matrix& A, const Matrix& B, int num_threads = 4) {
        size_t rows_A = A.size();
        size_t cols_A = A[0].size();
        size_t cols_B = B[0].size();
        
        Matrix C(rows_A, std::vector<double>(cols_B, 0.0));
        std::vector<std::future<void>> futures;
        
        auto worker = [&](size_t start_i, size_t end_i, size_t start_j, size_t end_j) {
            for (size_t i = start_i; i < end_i; i++) {
                for (size_t j = start_j; j < end_j; j++) {
                    for (size_t k = 0; k < cols_A; k++) {
                        C[i][j] += A[i][k] * B[k][j];
                    }
                }
            }
        };
        
        // Assumes num_threads is a perfect square, e.g. 4 threads -> a 2x2 grid of blocks
        int block_size = static_cast<int>(std::sqrt(num_threads));
        size_t block_rows = rows_A / block_size;
        size_t block_cols = cols_B / block_size;
        
        for (int bi = 0; bi < block_size; bi++) {
            for (int bj = 0; bj < block_size; bj++) {
                size_t start_i = bi * block_rows;
                size_t end_i = (bi == block_size - 1) ? rows_A : (bi + 1) * block_rows;
                size_t start_j = bj * block_cols;
                size_t end_j = (bj == block_size - 1) ? cols_B : (bj + 1) * block_cols;
                
                futures.push_back(
                    std::async(std::launch::async, worker, start_i, end_i, start_j, end_j)
                );
            }
        }
        
        for (auto& future : futures) {
            future.get();
        }
        
        return C;
    }
    
    // Generate a matrix with uniformly random entries in [0, 1)
    static Matrix generate_random_matrix(size_t rows, size_t cols) {
        Matrix matrix(rows, std::vector<double>(cols));
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_real_distribution<> dis(0.0, 1.0);
        
        for (size_t i = 0; i < rows; i++) {
            for (size_t j = 0; j < cols; j++) {
                matrix[i][j] = dis(gen);
            }
        }
        
        return matrix;
    }
    
    // Benchmark: serial baseline vs. the two parallel versions
    static void benchmark() {
        const size_t SIZE = 500;
        
        std::cout << "Generating random matrices (" << SIZE << "x" << SIZE << ")..." << std::endl;
        Matrix A = generate_random_matrix(SIZE, SIZE);
        Matrix B = generate_random_matrix(SIZE, SIZE);
        
        // Serial baseline
        auto start = std::chrono::high_resolution_clock::now();
        Matrix C_serial = multiply_serial(A, B);
        auto end = std::chrono::high_resolution_clock::now();
        auto serial_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        
        std::cout << "Serial multiplication time: " << serial_time.count() << " ms" << std::endl;
        
        // Parallel version (row partitioning)
        start = std::chrono::high_resolution_clock::now();
        Matrix C_parallel_rows = multiply_parallel_rows(A, B, 4);
        end = std::chrono::high_resolution_clock::now();
        auto parallel_rows_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        
        std::cout << "Parallel rows multiplication time: " << parallel_rows_time.count() << " ms" << std::endl;
        
        // Parallel version (block partitioning)
        start = std::chrono::high_resolution_clock::now();
        Matrix C_parallel_blocks = multiply_parallel_blocks(A, B, 4);
        end = std::chrono::high_resolution_clock::now();
        auto parallel_blocks_time = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        
        std::cout << "Parallel blocks multiplication time: " << parallel_blocks_time.count() << " ms" << std::endl;
        
        // Speedups relative to the serial baseline
        double speedup_rows = (double)serial_time.count() / parallel_rows_time.count();
        double speedup_blocks = (double)serial_time.count() / parallel_blocks_time.count();
        
        std::cout << "Speedup (rows): " << speedup_rows << "x" << std::endl;
        std::cout << "Speedup (blocks): " << speedup_blocks << "x" << std::endl;
    }
};
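
// Hedged addition (not part of the original guide): sanity-check that the parallel
// versions produce the same result as the serial baseline. The per-element accumulation
// order is identical in all three versions, but a small tolerance is used to be safe.
static bool matrices_equal(const ParallelMatrixMultiply::Matrix& X,
                           const ParallelMatrixMultiply::Matrix& Y,
                           double eps = 1e-9) {
    if (X.size() != Y.size()) return false;
    for (size_t i = 0; i < X.size(); i++) {
        if (X[i].size() != Y[i].size()) return false;
        for (size_t j = 0; j < X[i].size(); j++) {
            if (std::abs(X[i][j] - Y[i][j]) > eps) return false;
        }
    }
    return true;
}
// Possible use inside benchmark():
//   std::cout << "Rows result matches serial: "
//             << matrices_equal(C_serial, C_parallel_rows) << std::endl;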

// Note: this main() assumes the classes from sections 10.1-10.4 are compiled in the same translation unit.
int main() {
    std::cout << "=== Producer-Consumer Test ===" << std::endl;
    test_producer_consumer();
    
    std::cout << "\n=== Dining Philosophers Test ===" << std::endl;
    DiningPhilosophers philosophers;
    philosophers.start();
    philosophers.join();
    
    std::cout << "\n=== Readers-Writers Test ===" << std::endl;
    ReadersWriters rw;
    rw.start_simulation();
    
    std::cout << "\n=== Matrix Multiplication Benchmark ===" << std::endl;
    ParallelMatrixMultiply::benchmark();
    
    return 0;
}

Summary

C++ thread programming is a core skill in modern software development and spans several layers of knowledge:

Key Takeaways

  1. Fundamentals: the thread model, shared memory, and why synchronization is needed
  2. Standard library: std::thread, mutex, condition_variable, atomic
  3. Synchronization: mutexes, read-write locks, condition variables, semaphores
  4. Advanced techniques: thread pools, lock-free programming, atomic operations
  5. Practical applications: classic problems such as producer-consumer, readers-writers, and dining philosophers

Interview Preparation Tips

  1. Theory: understand how thread synchronization works and how the various lock types differ
  2. Practice: be able to implement basic multithreaded programs and synchronization primitives
  3. Performance: know when lock-free programming and atomic operations pay off
  4. Problem solving: be able to analyze and fix deadlocks, race conditions, and similar issues
  5. Design: be able to design thread pools, asynchronous task systems, and related components

The essence of thread programming is balancing performance against correctness: exploit multiple cores while keeping the program thread-safe. In interviews, knowing the syntax and APIs is not enough; you also need to understand the underlying principles and best practices.