Linux编程–线程同步

1、什么是同步?

  • 同,指协同、协助、互相配合。主旨在协同步调,按预定的先后次序运行。
  • 线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。
  • 同步的目的,是为了避免数据混乱,解决与时间有关的错误。实际上,不仅线程间需要同步,进程间、信号间等都需要同步机制。
  • 总结:所有“多个控制流,共同操作一个共享资源”的情况,都需要同步。

2、数据混乱的原因

  • 资源共享(独享资源不会)
  • 调度随机(意味着数据访问会出现竞争)
  • 线程间缺乏必要的同步机制。
    以上三点中,前两点不能改变,欲提高效率,传递数据,资源必须共享。只要共享资源,就一定会出现竞争。只要存在竞争关系,数据就很容易出现混乱。
    所以只能从第三点着手解决。使多个线程在访问共享资源的时候,出现互斥。

3、互斥量(互斥锁)

  • Linux中提供一把互斥锁mutex,每个线程在对资源操作前都尝试先加锁,成功加锁才操作,操作结束解锁。
  • 资源还是共享的,线程间也还是竞争的,但通过“锁”将资源访问变成互斥操作,而后的时间有关错误也不会再产生了。
  • 同一时间,只能有一个线程持有该锁。
  • 主要函数:
    // restrict关键字:用于限制指针,告诉编译器,所有修改该指针指向的内存中内容的操作,只能通过本指针完成。
    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    
    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    
    int pthread_mutex_trylock(pthread_mutex_t *mutex);
    
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
// 1、定义一个全局变量互斥锁
pthread_mutex_t mutex;
// 线程回调函数
void *callback(void *agr){
    while(1){
        // 加锁、此时需要使用mutex的线程在该锁被释放前不能使用
        pthread_mutex_lock(&mutex);
        printf("123");
        sleep(1);
        printf("456\n”);
        // 解锁、释放出资源
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
    return NULL;
}

int main(){
    // 2、初始化互斥锁
    pthread_mutex_init(&mutex, NULL);

    pthread_t tid;
    pthread_create(&tid, NULL, callback, NULL);
    // 3、使用锁
    while(1){
        pthread_mutex_lock(&mutex);
        printf("abc");
        sleep(2);
        printf("def\n");
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
    // 4、释放/销毁互斥锁
    pthread_mutex_destroy(&mutex);
    return 0;
}

执行结果:

// 使用互斥锁前
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex.out
abc123456
123456
123def
abc456
123def
abc456
123456
^C
parallels@ubuntu:~/Linux/pthread_syn$ 
// 使用互斥锁后
parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex.out
abcdef
123456
abcdef
123456
abcdef
123456
abcdef
^C
parallels@ubuntu:~/Linux/pthread_syn$

PS:在访问共享资源前加锁,访问结束后立即解锁。锁的“粒度”应越小越好。

4、死锁

  • 同一线程重复加锁,会造成自己等自己,死锁;
  • 两个线束相互等待对方的锁,死锁
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
// 1、定义两个互斥锁
pthread_mutex_t mutex1, mutex2;
// 子线程回调函数
void *callback(void *arg){
    // 对mutex1加锁
    pthread_mutex_lock(&mutex1);
    printf("sub pthread: I get mutex1....\n");
    sleep(1);
    // 对mutex2加锁
    pthread_mutex_lock(&mutex2);
    printf("sub pthread: I get mutex2...\n");
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);
    return NULL;
}

int main(){
    // 2、初始化两把互斥锁
    pthread_mutex_init(&mutex1, NULL);
    pthread_mutex_init(&mutex2, NULL);    

    // 3、创建子线程
    pthread_t tid;
    pthread_create(&tid, NULL, callback, NULL);

    // 4、主线程与子线程相互等待对方的锁
    pthread_mutex_lock(&mutex2);
    printf("main pthread: I get mutex2...\n");
    sleep(1);
    pthread_mutex_lock(&mutex1);
    printf("main pthread: I get mutex1...\n");
    pthread_mutex_unlock(&mutex2);
    pthread_mutex_unlock(&mutex1);

    // 销毁释放
    pthread_mutex_destroy(&mutex1);
    pthread_mutex_destroy(&mutex2);
}

执行结果:

parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_mutex_deadlock.out
main pthread: I get mutex2...
sub pthread: I get mutex1....
^C
parallels@ubuntu:~/Linux/pthread_syn$

5、读写锁

  • 写加锁:不可读写,阻塞读写线程
  • 读加锁:可读不可写,阻塞写线程
  • 写优先级高,写独占、读共享。
    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
    
    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    
    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
    
    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
    
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
    
    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    

示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
// 1、定义读写锁
pthread_rwlock_t rwlock;
// “写”相关线程主函数
void *callback_read(void *arg){
    while(1){
        pthread_rwlock_rdlock(&rwlock);
        printf("I am a pthread -- read %d --, I get the rwlock now...\n", (int)(long)arg);
        sleep(1);
        pthread_rwlock_unlock(&rwlock);
        usleep(300);
    }
    return NULL;
}
// “读”相当线程主函数
void *callback_write(void *arg){
    while(1){
        pthread_rwlock_wrlock(&rwlock);
        printf("----------------------------------------------------\n");
        printf("I am a pthread -- writ %d --, I get the rwlock now...\n", (int)(long)arg);
        printf("----------------------------------------------------\n");
        sleep(1);
        pthread_rwlock_unlock(&rwlock);
        usleep(300);
    }
    return NULL;
}

int main(){
    // 2、初始化读写锁
    pthread_rwlock_init(&rwlock, NULL);
    // 3、创建10个线程,6个读4个写
    pthread_t tid[10];
    int i, ret;
    for (i = 0; i < 6; i++){
        ret = pthread_create(&tid[i], NULL, callback_read, (void*)(long)i);
        if (ret != 0){
            fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
            continue;
        }
    }
    for (i = 6; i < 10; i++){
        ret = pthread_create(&tid[i], NULL, callback_write, (void *)(long)i);
        if (ret != 0){
            fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
            continue;
        }
    }
    // 回收子线程
    for (i = 0; i < 10; i++){
        ret = pthread_join(tid[i], NULL);
        if (ret != 0){
            fprintf(stderr, "pthread_join error:%s\n", strerror(ret));
            continue;
        }
    }
    // 4、回收释放读写锁
    pthread_rwlock_destroy(&rwlock);
    pthread_exit(NULL);
}

执行结果:

parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_rwlock.out
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 6 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 7 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 9 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- read 1 --, I get the rwlock now...
I am a pthread -- read 2 --, I get the rwlock now...
I am a pthread -- read 4 --, I get the rwlock now...
I am a pthread -- read 0 --, I get the rwlock now...
I am a pthread -- read 3 --, I get the rwlock now...
I am a pthread -- read 5 --, I get the rwlock now...
----------------------------------------------------
I am a pthread -- writ 8 --, I get the rwlock now...
----------------------------------------------------
^C
parallels@ubuntu:~/Linux/pthread_syn$

6、条件变量

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

int pthread_cond_signal(pthread_cond_t *cond);

int pthread_cond_broadcast(pthread_cond_t *cond);

int pthread_cond_destroy(pthread_cond_t *cond);
  • int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    1. 阻塞等待条件变量cond
    2. 释放已掌握的互斥锁,相当于pthread_mutex_unlock(&mutex)
    3. 唤醒时解决阻塞,重新申请互斥锁pthread_mutex_lock(&mutex)
    4. PS:前两步为原子操作
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
// 1、定义互斥锁、条件变量
pthread_mutex_t mutex;
pthread_cond_t cond;
// 子线程主函数
void *callback(void *arg){
    pthread_mutex_lock(&mutex);
    int i = 10;
    while(i--){
        printf("I am sub pthread runloop at %d\n", i);
        // 中间设计
        if (i == 7){
            pthread_cond_wait(&cond, &mutex);
            /* 使用pthread_cond_timedwait函数
            struct timespec t;
            t.tv_sec = time(NULL) + 3;
            // 第三个参数const struct timespec *restrict abstime
            // timespec{time_t tv_sec;time_t tv_usec}
            pthread_cond_timedwait(&cond, &mutex, &t);
            */
        }
        //sleep(1);
    }
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(){
    // 2、初始化互斥锁、条件变量
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);
    // 3、开启子线程
    pthread_t tid;
    pthread_create(&tid, NULL, callback, NULL);

    // 4、发送条件变量信号
    sleep(3);
    pthread_cond_signal(&cond);

    // 5、注销条件变量、互斥锁
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    // 回收子线程,退出当前线程
    pthread_join(tid, NULL);
    pthread_exit(NULL);
}

7、生产者消费者模型

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

// 1.1、定义并创建互斥锁、条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

// 1.2、定义数据模型与数据结构
// 测试用的数据模型
struct msg{
    struct msg *next;
    int num;
};
// 全局链表
struct msg *head;
// 打印输出“未消费”的产品
void print_msg(){
    printf(" | ");
    struct msg *msg = head;
    while(msg){
        printf("%d ", msg->num);
        msg = msg->next;
    }
    putchar('\n');
}

// 3.1、生产者线程主函数
void *producer(void *arg){
    while(1){
        pthread_mutex_lock(&mutex);
        // 产生一条新数据
        struct msg *msg = malloc(sizeof(struct msg));
        msg->num = rand() % 99;

        // 在队头插入数组
        msg->next = head;
        head = msg;

        // 打印当前结果
        printf("producer msg with num = %d", msg->num);
        print_msg();
        pthread_mutex_unlock(&mutex);

        pthread_cond_signal(&cond);
        sleep(rand() % 3);
    }
}
// 3.2、消费者线程主函数
void *consumer(void *arg){
    while (1){
        pthread_mutex_lock(&mutex);
        if (head == NULL){
            pthread_cond_wait(&cond, &mutex);
        }
        // 从队头取数据
        struct msg *msg = head;
        head = msg->next;

        // 打印当前结果
        printf("consumer msg with num = %d", msg->num);
        print_msg();
        pthread_mutex_unlock(&mutex);

        free(msg);
        sleep(rand() % 3);
    }
}

int main(){
    // 生成一个随机数
    srand(time(NULL));
    // 2、创建2个线程并开启
    pthread_t pid, cid;
    pthread_create(&pid, NULL, producer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    // 4、回收线程并注销锁、条件变量
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
    pthread_exit(NULL);
}

执行结果:

parallels@ubuntu:~/Linux/pthread_syn$ ./pthread_cp.out
producer msg with num = 91 | 91
consumer msg with num = 91 |
producer msg with num = 67 | 67
consumer msg with num = 67 |
producer msg with num = 36 | 36
consumer msg with num = 36 |
producer msg with num = 54 | 54
consumer msg with num = 54 |
producer msg with num = 16 | 16

条件变量的优点:相对mutex而言,条件变量可以减少竞争,如果直接使用mutex,除了生产者,消费者之间也要竞争互斥量,事实上如果汇聚中没有数据,消费者之间的竞争是没有意义的,有了条件变量之后,只有生产者完成生产,消费者之间才会有竞争,提高了程序的效率。

Leave a Reply