Example program for the pipeline threading model

Multithreaded programming is widely used to make better use of the computing resources of modern devices, including desktop computers, servers, and embedded systems. There are several major threading models, including the thread pool model, the boss/worker model, the pipeline model, and the producer/consumer model. Definitions of these models can be found in many places on the internet, and some of those sources also provide sample code for practice. However, it is difficult to find a sample program for the pipeline threading model, so I prepared one for students to learn from.

In this example, we create two threads: the first thread does some computation and then inserts a message into a message queue to pass the data to the second thread; the second thread waits for a message in the queue and then continues the processing. The pthread functions used to build the pipeline are pthread_cond_wait, pthread_cond_signal, and pthread_mutex_lock/pthread_mutex_unlock.

pthread_cond_wait() and pthread_cond_signal() together keep the second thread from running ahead of the first. When the message queue is empty, the second thread blocks in pthread_cond_wait() until the first thread enqueues a message and calls pthread_cond_signal() to wake it. pthread_mutex_lock/pthread_mutex_unlock ensure that only one thread at a time accesses the message queue, whether to enqueue or to dequeue.
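The wait/signal handshake described above can be sketched in isolation as follows. This is only a minimal sketch, not the example program itself: the names `stage1`, `stage2`, `run_handshake`, `ready`, and `shared_value` are hypothetical and chosen for illustration. It shows the usual pattern of checking a predicate in a `while` loop around pthread_cond_wait(), so the waiting thread behaves correctly whether it starts before or after the signalling thread.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical names for illustration; not part of the example program. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cv = PTHREAD_COND_INITIALIZER;
static bool ready = false;      /* predicate, protected by lock */
static int shared_value = 0;    /* data handed from stage 1 to stage 2 */

/* Stage 1: publish a value under the lock, then wake the waiter. */
static void *stage1(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    shared_value = 42;
    ready = true;
    pthread_mutex_unlock(&lock);
    pthread_cond_signal(&ready_cv);
    return NULL;
}

/* Stage 2: sleep until the predicate is true, then read the value. */
static void *stage2(void *arg) {
    int *out = arg;
    pthread_mutex_lock(&lock);
    while (!ready)  /* re-check after every wakeup (spurious wakeups are allowed) */
        pthread_cond_wait(&ready_cv, &lock);  /* unlocks, sleeps, relocks */
    *out = shared_value;
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Starts the waiter first, then the signaller, and returns what stage 2 read. */
static int run_handshake(void) {
    pthread_t t1, t2;
    int result = -1;
    pthread_create(&t2, NULL, stage2, &result);
    pthread_create(&t1, NULL, stage1, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return result;
}
```

Calling `run_handshake()` from `main` and linking with the pthread library is enough to try it. Because the predicate `ready` is tested under the mutex, stage 2 does not miss the update even if stage 1 signals before stage 2 begins to wait.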

The example program terminates as soon as the first thread finishes inserting its messages into the queue. As a result, the second thread may leave some messages unprocessed in the queue, which is undesirable in many applications.
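If leaving messages behind is not acceptable, one possible approach is to let the first thread announce that it has finished and to join both threads before exiting. The following is a sketch under stated assumptions rather than part of the example program: the names `producer`, `consumer`, `run_pipeline`, `producer_done`, and `DEMO_MSGS` are hypothetical, and the message payload is reduced to an `int` to keep the sketch short.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define DEMO_MSGS 1000  /* hypothetical message count for this sketch */

typedef struct node {
    struct node *next;
    int value;
} node;

static node *queue_head = NULL;       /* protected by q_mutex */
static bool producer_done = false;    /* protected by q_mutex */
static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;

/* Stage 1: enqueue DEMO_MSGS nodes, then announce completion. */
static void *producer(void *arg) {
    (void)arg;
    for (int i = 0; i < DEMO_MSGS; i++) {
        node *n = malloc(sizeof *n);
        if (n == NULL)
            break;
        n->value = i;
        pthread_mutex_lock(&q_mutex);
        n->next = queue_head;
        queue_head = n;
        pthread_mutex_unlock(&q_mutex);
        pthread_cond_signal(&q_cond);
    }
    pthread_mutex_lock(&q_mutex);
    producer_done = true;
    pthread_mutex_unlock(&q_mutex);
    pthread_cond_broadcast(&q_cond);  /* wake the consumer so it can exit */
    return NULL;
}

/* Stage 2: process nodes until the queue is empty AND the producer is done. */
static void *consumer(void *arg) {
    long *processed = arg;
    for (;;) {
        pthread_mutex_lock(&q_mutex);
        while (queue_head == NULL && !producer_done)
            pthread_cond_wait(&q_cond, &q_mutex);
        if (queue_head == NULL) {     /* empty and producer finished */
            pthread_mutex_unlock(&q_mutex);
            break;
        }
        node *n = queue_head;
        queue_head = n->next;
        pthread_mutex_unlock(&q_mutex);
        (*processed)++;               /* stand-in for the second part of the work */
        free(n);
    }
    return NULL;
}

/* Runs both stages to completion and returns the number of nodes processed. */
static long run_pipeline(void) {
    pthread_t p, c;
    long processed = 0;
    pthread_create(&c, NULL, consumer, &processed);
    pthread_create(&p, NULL, producer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return processed;
}
```

The key difference from the example program is that the consumer's wait condition also checks `producer_done`, so it drains whatever is left in the queue and then returns, instead of being cut off when the process exits.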

Below is the example code.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_MSGS 65536

typedef struct msg {
    struct msg *m_next;
    char content[36];
} msg;

msg *workq;   /* head of the message list, protected by qlock */
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void
process_msg(void){
    msg *mp;

    for (;;) {
        pthread_mutex_lock(&qlock);
        // Sleep until the other thread signals that the queue is not empty
        while (workq == NULL)
            pthread_cond_wait(&qready, &qlock);
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);

        /* now process the second part of your work */
        printf("Message from another thread: %s\n", mp->content);
        free(mp);
    }
}

void
enqueue_msg(msg *mp){
    // Insert at the head of the list while holding the lock
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    // Wake the thread waiting in process_msg()
    pthread_cond_signal(&qready);
}

void *
thr_fn1(void *arg){
    msg *new_msg;

    printf("thread 1: ID is %lu\n", (unsigned long) pthread_self());
    for (int i=0 ; i < NUM_MSGS ; i++){
        // Do the first part of work here

        // Send a message to wake the other thread
        new_msg = (struct msg *) malloc(sizeof(msg));
        if (new_msg == NULL)
            break;
        snprintf(new_msg->content, sizeof(new_msg->content),
                 "Message %d from Thread 1\n", i);
        enqueue_msg(new_msg);
    }
    pthread_exit((void *)0);
}

void *
thr_fn2(void *arg){
    printf("thread 2: ID is %lu\n", (unsigned long) pthread_self());
    process_msg();   // loops forever, so the line below is never reached
    pthread_exit((void *)0);
}

void
err_exit(int err, const char *err_msg){
    printf("Error: %d, Message: %s\n", err, err_msg);
    exit(1);
}

int
main(void){
    int err;
    pthread_t tid1, tid2;

    err = pthread_create(&tid1, NULL, thr_fn1, NULL);
    if (err != 0)
        err_exit(err, "can't create thread 1");

    printf("parent starting second thread\n");
    err = pthread_create(&tid2, NULL, thr_fn2, NULL);
    if (err != 0)
        err_exit(err, "can't create thread 2");

    // the process terminates when thread 1 finishes.
    err = pthread_join(tid1, NULL);
    if (err != 0)
        err_exit(err, "can't join with thread 1");

    return 0;
}
Hope it helps. 🙂
