读者写者问题


Linux下实现读者写者问题

定义

读者写者问题是一个经典的进程同步问题
一个数据文件或记录可以被多个进程共享,我们把只读该文件的进程称为“读者进程”,其他进程为“写者进程”。允许多个进程同时读一个共享对象,但不允许一个写者进程和其他写者进程或读者进程同时访问共享对象。即:保证一个写者进程必须与其他进程互斥的访问共享对象的同步问题;
目前主要有两种解决读者写者问题的方案

Linux的信号量操作

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,用于进程同步;通俗地说信号量是一种资源计数器,对信号量的操作都是原子操作;信号量分为整型和记录型;其中记录型还具有对等待资源的进程排队的功能
信号量的使用可以非常复杂,但是一般的场景下只有两种使用方式:

  1. 初始化为1,作为互斥锁使用(下文称 “锁” )(他和std::mutex还有区别这个我们后面介绍)
  2. 初始化为0,作为同步工具

Linux信号量都是记录型;Linux信号量有两种一种是POSIX型,他比较简单一般用于线程间同步,还有一种是systemV型信号量,它的本质是共享内存区,可以使用ipcs -sa命令查看现有信号量

这里我们介绍后者,相关系统调用点击这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int getMutexsemaphore(key_t key)
{
int semfd=semget(key,1,0666);
if(semfd==-1)//获取失败就创建
{
if(errno==ENOENT)//信号量不存在的标记
{
semfd=semget(key,1,0666|IPC_CREAT|IPC_EXCL);
if(semfd==-1)
{
if(errno==EEXIST)//信号量已经存在
{
//再次获取
semfd=semget(key,1,0666);
if(semfd==-1)
{
perror("init 1 semget()");
return -1;
}
}
else
{
perror("init 2 semget()"); return false;
}
}


union semun {
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
}semVal;
semVal.val=1;
//创建的信号量需要初始化

if(semctl(semfd,0,SETVAL,semVal)<0)
{
perror("init semctl()"); return -1;
}
else
{
return semfd;
}
}
}
return semfd;
}

我们要求用户使用不同的key_t获取mutex即可,这篇文章需要注意的地方是在检测到信号量不存在时,去创建信号量。还要再考虑是否有线程在同时创建,其中有一个会收到EEXIST的错误。创建共享内存也是一个逻辑,下面是代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int* const getCount()
{
void* ptr;
int shmfd=shmget(0X5005,sizeof(int),0666);
if(shmfd==-1)
{
shmfd=shmget(0X5005,sizeof(int),0666|IPC_CREAT);
if(shmfd==-1)
{
perror("shmgetfailed");
return (int*)(void*)-1;
}
else
{
ptr=shmat(shmfd,NULL,0);
if(ptr==(void*)-1)
{
perror("shmatfailed");
return (int*)(void*)-1;
}
else
{
memset(ptr,0,sizeof(int));
}
return (int*)ptr;
}
}
else
{
ptr=shmat(shmfd,NULL,0);
if(ptr==(void*)-1)
{
perror("shmatfailed");
return (int*)(void*)-1;
}
return (int*)ptr;
}
}

本次实验只使用一个共享内存且固定为一个int
下面是P,V操作,就是两个系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int P(int semfd)
{
sembuf buf;
buf.sem_flg=SEM_UNDO;
buf.sem_num=0;
buf.sem_op=-1;
if(semop(semfd,&buf,1)==-1)
{
printf("P Function abnormal");
return -1;
}
else
return 0;
}
int V(int semfd)
{
sembuf buf;
buf.sem_flg=SEM_UNDO;
buf.sem_num=0;
buf.sem_op=1;
if(semop(semfd,&buf,1)==-1)
{
printf("P Function abnormal");
return -1;
}
else
return 0;
}

读者优先的解决方案

最朴素解决方案是读者优先,我们考虑对共享文件按加锁(使用rw锁)
由于可以多个读者同时读,因此使用count计数,只有第一个访问的读进程需要加锁,最后一个退出的读进程解锁;其余读进程直接运行不需要对锁操作,访问共享内存count是使用的锁mutex除外

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int main(int argc,char* argv[])
{
if(argc<=1)
{
printf("输入需要写入的字符串\n");
exit(-1);
}
int rw=getMutexsemaphore((key_t)0X1005); //用于实现对共享文件的互斥访问
int mutex=getMutexsemaphore((key_t)0X1050); //用于实现对变量Count的互斥访问

P(mutex); //加锁访问Count
int* const countptr=getCount();
if(*countptr==0) //当前是第一个读进程
{
P(rw); //负责加读锁,不是第一个就不用加
}
++(*countptr); //访问文件进程数加一

V(mutex); //释放访问Count的锁

//现在开始读文件内容
ReadOut(atoi(argv[1]));

//读完减少count
P(mutex);
--(*countptr);
if(*countptr==0) //当前是最后一个读进程
{
V(rw); //放开读锁
}
V(mutex);
printf("Reader已正常退出\n");
}

ReadOut函数可自己定义
而对于写线程,在访问文章时加锁访问后解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int main(int argc,char* argv[])
{
if(argc<=1)
{
printf("输入执行次数\n");
exit(-1);
}
const char* source=argv[1];
int rw=getMutexsemaphore((key_t)0X1005);

P(rw); //读写文件时上锁
writeFile(source);
V(rw);
printf("写进程完毕程序正常退出\n");
}

writeFile函数可自己定义
显然这样做是读者优先的,如果有源源不断的读者进程到来,写者进程就会一直等,最后被饿死

写者优先的解决方案

我们考虑有读者进程在读或者在等待时,让后面的进程等待。这里考虑使用一个锁来实现,它表示当前有没有写者要操作(w锁),写者进程在操作期间会持有这个锁,读者进程操作之前检查这个锁
下面是读者程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int main(int argc,char* argv[])
{
if(argc<=1)
{
printf("输入需要写入的字符串\n");
exit(-1);
}
int rw=getMutexsemaphore((key_t)0X1005); //用于实现对共享文件的互斥访问
int mutex=getMutexsemaphore((key_t)0X1050); //用于实现对变量Count的互斥访问
int w=getMutexsemaphore((key_t)0X5050); //用于实现写优先

P(w); //先确保没有写进程在等待或者使用
P(mutex); //加锁访问Count
int* const countptr=getCount();
if(*countptr==0) //当前是第一个读进程
{
P(rw); //负责加读锁,不是第一个就不用加
}
++(*countptr); //访问文件进程数加一
V(mutex); //释放访问Count的锁
V(w); //释放写锁

//现在开始读文件内容
ReadOut(atoi(argv[1]));

//读完减少count
P(mutex);
--(*countptr);
if(*countptr==0) //当前是最后一个读进程
{
V(rw); //放开读锁
}
V(mutex);
printf("Reader已正常退出\n");
}

下面是写者程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

int main(int argc,char* argv[])
{
if(argc<=1)
{
printf("输入执行次数\n");
exit(-1);
}
const char* source=argv[1];
int rw=getMutexsemaphore((key_t)0X1005); //对文件操作加锁
int w=getMutexsemaphore((key_t)0X5050); //用于实现写优先,思路是额外加一个w锁,如果w被锁住,不然后面的reader直接访问count
//一旦有一个writer申请,后面的reader就必须等了

P(w);
P(rw); //操作文件时上锁
writeFile(source);
V(rw);
V(w);
printf("写进程完毕程序正常退出\n");
}

可以看到这实际上是一种读写平衡的策略

使用C++11能实现多线程读者写者问题吗

C++11并不提供进程间通信/同步机制,因此多进程版本的肯定是不行的,但是能不能使用C++实现多线程读者写者问题呢?
答案是勉强可以
实现的思路是什么呢,单纯使用std::mutex是不可以的(起码上面的算法不可以),这是因为: std::mutex要求加锁和解锁的线程必须是同一个这就是std::mutex和初值为1的信号量的区别
但是我们仍有办法实现,那就是使用大名鼎鼎的std::atomic,和信号量一样,std::atomic也是一个非常强大的工具,但是它的使用非常复杂(5helter也没能完成掌握),但是对它的使用里有一个特别的应用:

自旋锁

自旋锁是什么:

  • 字面意思就很明白,自己旋转,翻译成人话就是循环,一般是用一个无限循环实现。这样一来,一个无限循环中,执行一个 CAS 操作,当操作成功,返回 true 时,循环结束;当返回 false 时,接着执行循环,继续尝试 CAS 操作,直到返回 true
    CAS实现自旋锁的代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    #include <atomic>
    class SpinLock {

    public:
    SpinLock() : flag_(false)
    {}

    void lock()
    {
    bool expect = false;
    while (!flag_.compare_exchange_weak(expect, true))
    {
    //这里一定要将expect复原,执行失败时expect结果是未定的
    expect = false;
    }
    }

    void unlock()
    {
    flag_.store(false);
    }
    private:
    std::atomic<bool> flag_;
    };
    关于compare_exchange_weak网上有很多资料shelterz在这里不再赘述
    这个锁的本质决定了它支持加锁和解锁的线程不是同一个

实现多线程读者写者问题

下面是代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
SpinLock mutex_;
SpinLock rw;
int countR = 0;

void reader()
{
mutex_.lock();
cout << "count锁" << endl;
if (countR == 0)
{
rw.lock();
cout << "file锁" << endl;
}
++countR;
mutex_.unlock();

for (int i = 0; i < 5; i++)
{
cout << "r" << endl;
this_thread::sleep_for(chrono::seconds(1));
}


mutex_.lock();
--countR;
if (countR == 0)
{
rw.unlock();
cout << "file开锁" << endl;
}
mutex_.unlock();

}
void writer()
{
rw.lock();
for (int i = 0; i < 5; i++)
{
cout << "w" << endl;
this_thread::sleep_for(chrono::seconds(1));
}
rw.unlock();
}
int main()
{
vector<thread> ts;
while (true)
{
int a = 0;
cout << "请输入要执行的操作" << endl;
cin >> a;
if (a == 0)
{
ts.push_back(thread(reader));
}
else if (a == 1)
{
ts.push_back(thread(writer));
}
else
{
break;
}
}
for (auto& ele : ts)
{
ele.join();
}
}

这里只是使用输出简单表示了一下读写:)
那么为什么说只是勉强可以呢,不要忘了记录型信号量在有多个等待进程等待是会形成等待队列,而这里的atomic只是相当于整型信号量,有资源时等待的线程会争抢资源,而于等待的次序无关,这有可能导致线程饿死


Author: 5helter
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source 5helter !
  TOC