pthread

  • pthread
  • pthread_mutex_t pthread_cond_t pthread_attr_t

c++11 线程库 std::thread

  • thread
  • mutex
  • condition_variable
  • RAII
  • future future包装了未来某个计算结果的期诺。当你对所获得的future调用get时,程序会一直阻塞直到future的值被计算出来。(如果future的值已经计算出来了,get调用会立刻获得返回值)而这一切都是在后台执行的。
  • atomic atomic位于头文件atomic下,实现了类似于java.util.concurrent.atomic的功能。它提供了一组轻量级的、作用在单个变量上的原子操作,是volatile的替代品。有些时候你也可以用它来替换掉Lock(假如整个race condition中只有单个变量)

最后总结下std::thread对比于pthread的优缺点:

优点:

  • 简单,易用
  • 跨平台,pthread只能用在POSIX系统上(其他系统有其独立的thread实现)
  • 提供了更多高级功能,比如future
  • 更加C++(跟匿名函数,std::bind,RAII等C++特性更好的集成)

缺点:

  • 没有RWlock。有一个类似的shared_mutex,不过它属于C++14,你的编译器很有可能不支持。
  • 操作线程和Mutex等的API较少。毕竟为了跨平台,只能选取各原生实现的子集。如果你需要设置某些属性,需要通过API调用返回原生平台上的对应对象,再对返回的对象进行操作。

分别用std::thread和pthread实现的多生产者多消费者程序。注意行数上的差距。

pthread版本

#include <pthread.h>
#include <queue>
#include <stdio.h>
#include <unistd.h>

// 注意pthread_*函数返回的异常值,为了简单(偷懒),我没有去处理它们

pthread_mutex_t mutex;
pthread_cond_t condvar;

std::queue<int> msgQueue;
struct Produce_range {
    int start;
    int end;
};

void *producer(void *args)
{
    int start = static_cast<Produce_range *>(args)->start;
    int end = static_cast<Produce_range *>(args)->end;
    for (int x = start; x < end; x++) {
        usleep(200 * 1000);
        pthread_mutex_lock(&mutex);
        msgQueue.push(x);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&condvar);
        printf("Produce message %d\n", x);
    }
    pthread_exit((void *)0);
    return NULL;
}

void *consumer(void *args)
{
    int demand = *static_cast<int *>(args);
    while (true) {
        pthread_mutex_lock(&mutex);
        if (msgQueue.size() <= 0) {
            pthread_cond_wait(&condvar, &mutex);
        }
        if (msgQueue.size() > 0) {
            printf("Consume message %d\n", msgQueue.front());
            msgQueue.pop();
            --demand;
        }
        pthread_mutex_unlock(&mutex);
        if (!demand) break;
    }
    pthread_exit((void *)0);
    return NULL;
}


int main()
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&condvar, NULL);

    pthread_t producer1, producer2, producer3, consumer1, consumer2;

    Produce_range range1 = {0, 10};
    pthread_create(&producer1, &attr, producer, static_cast<void *>(&range1));
    Produce_range range2 = {range1.end, range1.end + 10};
    pthread_create(&producer2, &attr, producer, static_cast<void *>(&range2));
    Produce_range range3 = {range2.end, range2.end + 10};
    pthread_create(&producer3, &attr, producer, static_cast<void *>(&range3));

    int consume_demand1 = 20;
    int consume_demand2 = 10;
    pthread_create(&consumer1, &attr, consumer, 
            static_cast<void *>(&consume_demand1));
    pthread_create(&consumer2, &attr, consumer, 
            static_cast<void *>(&consume_demand2));

    pthread_join(producer1, NULL);
    pthread_join(producer2, NULL);
    pthread_join(producer3, NULL);
    pthread_join(consumer1, NULL);
    pthread_join(consumer2, NULL);
} 

std::thread版本

#include <chrono>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>

// 注意某些调用可能会抛出std::system_error, 为了简单(偷懒),我没有去捕获
std::mutex mutex;
std::condition_variable condvar;

std::queue<int> msgQueue;

void producer(int start, int end)
{
    for (int x = start; x < end; x++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        {        
            std::lock_guard<std::mutex> guard(mutex);
            msgQueue.push(x);
        }
        printf("Produce message %d\n", x);
        condvar.notify_all();
    }
}

void consumer(int demand)
{
    while (true) {
        std::unique_lock<std::mutex> ulock(mutex);
        condvar.wait(ulock, []{ return msgQueue.size() > 0;});
        // wait的第二个参数使得显式的double check不再必要
        printf("Consume message %d\n", msgQueue.front());
        msgQueue.pop();
        --demand;
        if (!demand) break;
    }
}


int main()
{
    std::thread producer1(producer, 0, 10);
    std::thread producer2(producer, 10, 20);
    std::thread producer3(producer, 20, 30);
    std::thread consumer1(consumer, 20);
    std::thread consumer2(consumer, 10);

    producer1.join();
    producer2.join();
    producer3.join();
    consumer1.join();
    consumer2.join();
}