联系我们 - 广告服务 - 联系电话:
您的当前位置: > 综合 > > 正文

天天新消息丨进程间的通信——消息队列的创建与使用

来源:CSDN 时间:2023-02-27 07:42:06

文章目录

消息队列


【资料图】

概述:

特点:

消息队列的创建与使用

函数接口:

使用:

ipcs和ipcrm用法:

信号量

概述:

相关概念的认识:

(一)进程关系

(二)临界资源和临界区

(三)原子操作

(四)PV操作

信号量接口的介绍:

信号量的使用:

封装系统调用:

案例使用:

思考:三个进程a.b,c分别输出"A","B","C",要求输出结果必须为"ABCABCABC....."

共享内存

概述:

特点:

共享内存的使用与创建

共享内存接口介绍 :

案例使用:

进程间的通信上节讲解:进程之间的通信(管道详解)

消息队列

概述:

消息队列是消息的链表,存放在内存中,由内核维护。消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法,这个数据块由消息类型和数据等信息组成。消息队列遵循先进先出的策略,但因为存在消息类型,有优先级,故消息队列类似于一个优先级队列。

整个过程就好比进程A要给进程B发送消息,A进程把消息放在对应的消息队列后就可以正常返回,B进行需要读取数据时进入相应的消息队列去读取

特点:

消息队列中的消息是由类型的。消息队列中的消息是有格式的。消息队列可以实现消息的随机查询。消息不一定要先进先出的次序读取,编程时可以按照消息的类型读取。消息队列允许一个或多个进程向他写入或者读取消息。与无名管道,命名管道一样,从消息队列中读取消息,消息队列中对应的数据会被删除。每个消息队列都有消息队列的标识符,消息队列的标识符在整个系统中是唯一的。只有内核重启或人工删除消息队列时,该消息队列才会被删除。若人工删除消息队列,消息队列会一直存在于系统中。

消息队列的创建与使用

函数接口:

int msgget(key_t key, int msqflg);msgget()创建或者获取一个消息队列 msgget()成功返回消息队列 ID,失败返回-1 key值可以是人为指定的数字,强转为key_t类型即可,也可以通过ftok 函数获得。人工指定key值:(key_t)1234;//1234就为key值,在创建获取消息队列函数的第一个参数可以这样写通过函数获取:# include# includekey_t ftok(const char* pathname,int proj_id);             成功返回key值,失败返回-1参数为:pathname :路径名;       proj_id:项目ID,非0整数,只有低8位有效。msqflg:标识函数的行为及消息队列的权限IPC_CREAT:创建消息队列IPC_EXCL:检测消息队列是否存在位或权限位:可以设置消息队列的访问权限,和其他两个参数可以或表示,如0600,仅拥有者具有文件的读取和写入权限

int msgsnd(int msqid, const void *msqp, size_t msqsz, int msqflg); msgsnd()发送一条消息; //参数意义: msqid:消息队列的id,因为系统中可能有多个消息队列,这id指明往哪个消息队列中添加数 据; msqp:往消息队列中添加的结构体,消息结构为: struct msgbuf {    long mtype; // 消息类型, 必须大于0(或者说>=1),长整型,比如图中的1,2     char mtext[1]; // 消息数据 ,用户自己定义,可以是任何类型;这里存放消息数据 };msqsz: 指定 mtext 中有效数据的长度//注意仅仅只接收的数据的大小,不包含消息类 型的大小; msqflg:一般设置为 0 可以设置 IPC_NOWAIT msgsnd()成功返回0,失败返回-1;

ssize_t msgrcv(int msqid, void *msgp, size_t msqsz, long msqtyp, int msqflg); msgrcv()接收一条消息 msgrcv()成功返回 mtext 中接收到的数据长度, 失败返回-1, msgp:接收消息的结构体,一般约定好,写入什么,接收什么结构体; msqsz:接收消息的大小,大于等于发送的消息大小,一般和发送的消息大小一样. msqtyp: 指定接收的消息类型(图中的1和2),类型可以为 0,为0表示不区分消息类型, 按顺位接收消息; msqflg: 一般设置为 0 可以设置 IPC_NOWAIT

int msgctl(int msqid, int cmd, struct msqid_ds *buf); msgctl()控制消息队列,也就是对消息队列做一个控制,可以设置消息队列,也可以移除消息队列; msgctl()成功返回 0,失败返回-1 msqid:消息标识符,即由msgget返回的消息队列表示码cmd(1)IPC_STAT:把msqid_ds结构体中个元素的当前值存入到由buf指向的结构体中。(2)IPC_SET:把msqid结构体中的元素设置为buf中的对应值。(3)IPC_RMID:从系统中删除该消息队列以及仍在该队列中的所有数据,这种删除立即生效。buf:设置或者获取消息队列需要的一个结构体,如果删除就直接给一个NULL;msgctl(msgid,IPC_RMID,NULL);消息队列在内存中创建;

使用:

进程a发送一条消息,进程b读取消息;

利用消息队列进行进程间通讯的方式,我们可以类比为:A需要发送信息给B,但是因为距离原因当面给A,那么就需要A先把信息给XX地址的SS驿站,驿站将其存储,存储箱编号为YYYYY,B要取出这个信息,必须到XX地址的SS驿站取出编号为YYYYY的箱子,打开才可以获取。

在消息队列中,键(key)值相当于XX地址,消息队列标识符相当于SS驿站,消息类型相当于YYYYY编号的箱子。

同一个键(key)值可以保证是同一个消息队列,同一个消息队列标识符才能保证不同进程可以相互通信,同一个消息类型才能保证某个进程取出的是对方的信息。

a.c#include#include#include#include#include#includestruct mess{    long type;    char buff[128];};int main(){   int msgid= msgget((key_t)1234,IPC_CREAT|0600);//自己指定的key值,也可以通过ftok获取   assert(msgid!=-1);   struct mess dt;   dt.type=1;   strcpy(dt.buff,"hello1");   //发送消息   msgsnd(msgid,&dt,128,0);   exit(0);}

b.c#include#include#include#include#include#includestruct mess{    long type;    char buff[128];};int main(){   int msgid= msgget((key_t)1234,IPC_CREAT|0600);   assert(msgid!=-1);   struct mess dt;   //接收   msgrcv(msgid,&dt,128,1,0);   printf("%s",dt.buff);   exit(0);}

发送三次消息,读两次

将b.c接收处改为IPC_NOWAIT,没有消息不等待,即使没有消息队列,里面也不会阻塞

if( msgrcv(msgid,&dt,128,1,IPC_NOWAIT)==-1)  {      printf("no mess");  }  else   printf("%s",dt.buff);

ipcs和ipcrm用法:

ipcs用法

ipcs==ipcs -a 是默认的输出信息 打印出当前系统中所有的进程间通信方式的信息ipcs -m 打印出使用共享内存进行进程间通信的信息ipcs -q 打印出使用消息队列进行进程间通信的信息ipcs -s 打印出使用信号进行进程间通信的信息

输出格式的控制

ipcs -t 输出信息的详细变化时间ipcs -p 输出ipc方式的进程ID ipcs -c 输出ipc方式的创建者/拥有者ipcs -c 输出ipc各种方式的在该系统下的限制条件信息ipcs -u 输出当前系统下ipc各种方式的状态信息(共享内存,消息队列,信号)

ipcrm 命令

移除一个消息对象。或者共享内存段,或者一个信号集,同时会将与ipc对象相关链的数据也一起移除。当然,只有超级管理员,或者ipc对象的创建者才有这项权利啦

ipcrm用法

ipcrm -M shmkey 移除用shmkey创建的共享内存段ipcrm -m shmid 移除用shmid标识的共享内存段ipcrm -Q msgkey 移除用msqkey创建的消息队列ipcrm -q msqid 移除用msqid标识的消息队列ipcrm -S semkey 移除用semkey创建的信号ipcrm -s semid 移除用semid标识的信号ipcrm -a 删除所有进程间通信资源

信号量

概述:

信号量就是控制某个进程能够对某个资源进行访问;保证同一时刻只能由一个进程对某个资源进程访问; 信号量主要用于进程或线程的同步和互斥

信号量是一个特殊的变量,一般取正数值。它的值代表允许访问的资源数目, 获取资源时,需要对信号量的值进行原子减一,该操作被称为 P 操作。

当信号量值为 0时,代表没有资源可用, P 操作会阻塞。释放资源时,需要对信号量的值进行原子

加一,该操作被称为 V操作。

信号量的值如果只取0,1:将其称为二值信号量。

如果信号量的值大于 1:则称之为计数信号量。

相关概念的认识:

(一)进程关系

1.进程同步:进程之间相互合作,协同工作的关系称为进程的同步。简单来说就是多个相关进程在执行次序上的协调,谁先执行后执行都有顺序。是一种直接制约关系。2.进程互斥:多个进程因为争夺临界资源而互斥执行称为进程的互斥。是一种间接制约关系。

比如多个人打篮球,篮球是临界资源,形成互斥,对他们形成了一个间接制约关系。工厂上流水线工作,一道工序接着下一道,有明确的次序,形成同步,上一道的工序对下一道工序有直接影响,是一种直接制约关系。

(二)临界资源和临界区

1.临界资源:在操作系统中,把那些可以被进程共享的资源(文件,打印机等),但是在同一时刻只允许一个进程/线程访问的资源统称为临界资源或共享变量。2.临界区:访问临界资源的那段代码。

(三)原子操作

原子操作是指不会被进程/线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何进程切换。故原子操作要不然不做,要不然一直做到结束。

(四)PV操作

P(SV):如果SV的值大于1,进行减一操作,表示获得资源;如果SV的值为0,则挂起进程的执行,即阻塞,因为目前已经没有资源了。 V(SV):如果当前有挂起的进程在等待资源,那么执行V就会将它唤醒;如果没有,SV加一,即释放资源。P操作会出现阻塞,V操作永远不会阻塞。

信号量接口的介绍:

int semget(key_t key,int nsems,int semflg);创建或者获取一个已经存在的信号量;key:两个进程使用相同的key值,就可以使用同一个信号量;nsems:创建几个信号量;semflg:标志位;如果为创建:IPC_CREAT;             IPC_EXCL:检测信号量集合是否存在             位或权限位:可以设置信号量集合的访问权限,和其他两个参数可以或表示 //运行的时候:./a& ./b&如果为全新创建,也就是不知道是否有人创建过,则IPC_CREATE|IPC_EXCEL,就是如果没有则创建,如果有则创建失败 ;

int semop(int semid,struct sembuf *sops,unsigned nsops); 对信号量进行改变,做P操作或者V操作;semid:信号量的id号,也就是刚才semget的返回值;说明对哪个信号量进行操作;sops:结构体指针,指向sembuf的结构体指针,sembuf结构体有三个成员变量:sem_num表示信号量的编号(即指定信号量集中的信号量下标);sem_op表示是p还是v操作;1为v操作(加1),-1为p操作(减1);若此时已经没有资源了进行-1操作:如果指定了IPC_NOWIT,会出错返回。未指定,进程会被挂起,知道有资源或捕捉到信号结束挂起。sem_flg为标志位; struct sembuf{     unsigned short sem_num;//信号量ID     short          sem_op; //信号量操作     short          sem_flg;//操作标志};

int semctl(int semid,int semnum,int cmd,...); 对信号量进行控制:初始化/删除信号量semid:信号量id;semnum:信号量编号;cmd:命令:SETVAL:初始化信号量; IPC_RMID:删除信号量;**注意**:联合体semun,这个联合体需要自己定义;union semun{   int              val;   //信号量的值   struct semid_ds* buf;   // ipc_stat,ipc_set的缓冲区   unsigned short* array;  //SETALL,GETALL数组   struct seminfo* _buf;   //ipc_info缓冲区(Linux专用) };

信号量的使用:

封装系统调用:

在进行进程同步控制时,经常说到P、V操作,那么如何用上述的系统调用封装一系列的函数,让我们使用起来方便。根据需求我们将其分为4类函数:创建/获取信号量;P操作;V操作;删除信号集合。我们将表示信号量信息的union semun联合体(因为业务简单,所以联合体中只包含信号量的值val即可)和函数声明写在自己创建的.h文件中。

# include# include# include# include# include//保存信号量值的联合体union semval{    int val;};//创建/获得信号量集合int CreateSem(int key,int init_val[],int len);//减1操作void SemP(int semid,int index);//加1操作void SemV(int semid,int index);//删除信号量集合void DeleteSem(int semid);

具体操作实现:

# include# include# include# include# include# include# include "sem.h"//创建/获得信号量集合int CreateSem(int key,int init_val[],int len){    //    int semid=semget((key_t)key,0,0664);    if(semid!=-1)    {        return semid;    }    //    semid=semget((key_t)key,len,IPC_CREAT|0664);    if(semid==-1)    {        perror("Create Sem Error\n");        return -1;    }    //    int i=0;    for(;i<LEN;I++) p="" union="" semval="" data;="" data.val="init_val[i];" if(semctl(semid,i,setval,data)="=-1)" perror(?init="" sem="" value="" fail\n?);="" -1;="" }="" return="" semid;}="" 减1操作void="" semp(int="" 加1操作void="" semv(int="" semid,int="" index){="" struct="" sembuf="" buf;="" buf.sem_num="index;" buf.sem_op="-1;//" buf.sem_flg="SEM_UNDO;" if(semop(semid,&buf,1)="=-1)" perror(?sem="" v="" }}="" 删除信号量集合void="" deletesem(int="" semid){="" if(semctl(semid,0,ipc_rmid)="=-1)" {="" perror(?delete="" error\n?);="" }}

案例使用:

例题: 进程a和进程b模拟访问打印机,进程a输出第一个字符’a’表示开始使用打印机,输出第二个字符‘a’表示结束打印机;b进程的操作和a进程相同,每个进程循环使用打印机5次。现在打印机为临界资源,所以在某一时刻它只能被一个进程使用,所以输出的结果不应该出现:abab,abba等情况。采取信号量进行同步控制:打印机输入临界资源,那么设定一个信号量来管理打印机:

它只有一台,所以初始值为1,在a进程使用时,我们对其进行一个P操作,即让a获取打印机,此时信号量为0,如果b来访问就会被挂起。在a使用完打印机后,我们释放打印机资源,进行V操作,这是信号量为1,b进程这时就可以访问打印机,b也是一样的,访问前进行一次P操作获取,打印完毕进行一次V操作释放。

a.c# include# include# include# include# include# include# include "sem.h"int main(){    int val=1;//信号量初始值    int semid=CreateSem(1234,&val,1);//a先运行,创建信号集合IDkey为1234,值为1,1个信号量的信号集合    assert(semid!=-1);    int count=0;//使用打印机次数    while(1)    {        SemP(semid,0);//P,占有打印机        printf("a\n");        sleep(5);        printf("a\n");        SemV(semid,0);//V,释放打印机        sleep(2);        count++;        if(count==5)        {            break;        }    }    exit(0);}

b.c# include# include# include# include# include# include# include "sem.h"int main(){    int val=1;//信号量初始值    int semid=CreateSem(1234,&val,1);//b后运行,直接获取到信号量    assert(semid!=-1);    int count=0;//使用打印机次数    while(1)    {        SemP(semid,0);//P,占有打印机        printf("b\n");        sleep(5);        printf("b\n");        SemV(semid,0);//V,释放打印机        sleep(2);        count++;        if(count==5)        {            break;        }    }    exit(0);}

编译运行gcc -o a a.c sem.c gcc -o b b.c sem.c

我们观察结果,发现没有出现abab这种a,b进程混乱使用打印机的现象,都是a/b进程打印完了,释放资源,a/b进程再进行获取打印。

思考:三个进程a.b,c分别输出"A","B","C",要求输出结果必须为"ABCABCABC....."

信号量用于同步:有多少个任务就需要多少个信号量。最先 执行的任务对应的信号量为1,其他信号量全为0,每个任务先p自己,v下一个任务的信号量

共享内存

概述:

共享内存为多个进程之间共享和传递数据提供了一种有效的方式。共享内存是先在物理内存申请一块空间,多个进程可以将其映射到自己的虚拟地址空间中。所有进程都可以访问共享内存中的地址,就好像他们都是由malloc分配一样。如果某个进程向共享内存写入了数据,所做的改动将立即可被访问同一段共享内存的任何其他进程看到。

由于他并未提供同步机制,也就是说,在第一个进程结束对共享内存的写操作之前,并无自动机制可以阻止第二个进程开始对它进行读取。所以我们通常需要用其他的机制来同步对共享内存的访问。例如前面说到的信号量

特点:

注意:共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。

管道和消息队列需要四次数据拷贝:因为管道和消息队列进行共享的空间都是由内核对象提供管理,所执行的操作也都是系统调用,而这些数据最终还是要在内存中存储的。管道使用数组来保存数据,利用write,read系统调用将数据写入/读取;消息队列使用自定义消息结构体保存数据,利用msgsnd,msgrcv将数据保存/读取到内核中。我们画出管道的拷贝过程:

过程分别是:

从内存空间缓冲区将数据拷贝到内核空间缓冲区。从内核缓冲区将数据拷贝到内存从内存将数据拷贝到内核空间缓冲区从内核空间缓冲区将数据拷贝到用户空间缓冲区。

而共享内存使用相关函数,在内存中开辟一块空间,映射到不同进程的虚拟地址空间,并且向用户返回指向该块内存的指针,因此对该内存可通过指针直接访问,只需要两次:

过程为:

从用户空间缓冲区拷贝数据到内存。从内存拷贝数据到用户空间缓冲区。

共享内存的使用与创建

下面我会详细介绍shm,还有一种内存共享的机制mmap内存映射,下面附相关链接

共享内存机制——mmap和shm_广州市民林先生的博客-CSDN博客_mmap和shm

共享内存与存储映射(mmap) - 秋雨声 - 博客园 (cnblogs.com)

关于共享内存shm和内存映射mmap的区别是什么? - 知乎 (zhihu.com)

mmap()共享内存详解_KuoGavin的博客-CSDN博客_共享内存mmap

共享内存接口介绍:

buf:内核为每个共享内存创建的结构体shmid_dsstruct shmid_ds {     struct ipc_perm shm_perm;   //用户权限     size_t          shm_segsz;  //共享内存大小     time_t          shm_atime;  //最后一次连接时间     time_t          shm_dtime;  //最后断开连接时间     time_t          shm_ctime;  //最后改变事件     pid_t           shm_cpid;   //PID     pid_t           shm_lpid;   //最后一个PID     shmatt_t        shm_nattch; //多少个进程正在使用这个共享存储};

案例使用:

题目: 两个进程,A进程将获取到的用户数据写入内存,B进程打印共享内存中的数据。

如果我们不对这块共享内存进行同步控制,就会出现读取混乱的问题,会出现A写入的数据还没有被B读,A又重新写入了等问题,所以我们必须要对这块内存进行一个同步控制,就采用信号量来控制,我们分析一下,如何控制A,B才可以实现:A写数据时,B不能读数据;B读数据时,A不能写数据,B进程只有在A写入数据后才可读取数据,A只有在B取出数据后才可以继续发数据。

可以看到A对B有影响,B对A有影响。故需要两个信号量控制,sem1控制A,sem2控制B。信号量初始值的考虑:运行程序,A进程先写入数据,B进程处于阻塞状态,故sem1=1,sem2=0

图示:

完整过程:

先进行shmget函数对内存空间的创建和初始化,再连接共享内存,用ptr保存shmat连接到的内存地址。信号量的创建,初始化。进行循环读取数据,对sem1信号量P操作,进行数据的写入,写完之后对sem2信号量V操作。B进程这一块相反,shmdt断开共享内存连接。shcmtl删除共享内存

//sem.h//两个信号量#include#include#include#define SEM1 0#define SEM2 1#define SEM_MAX 2union  semun{int val;};void sem_init();void sem_p(int index); //对哪个信号量进行操作void sem_v(int index);void sem_destroy();

//sem.c#include"sem.h"static int semid = -1;void sem_init(){semid = semget((key_t)1234, SEM_MAX, IPC_CREAT | IPC_EXCL | 0600);//全新创建if (semid == -1){semid = semget((key_t)1234, SEM_MAX, 0600);//获取if (semid == -1){printf("create sem failed!\n");}else{//初始化union semun a;int arr[SEM_MAX] = { 1,0 };for (int i = 0; i < SEM_MAX; ++i)//初始化两个信号{a.val = arr[i];if (semctl(semid, i, SETVAL, a) == -1){printf("semctl setval failed!\n");}}}}}void sem_p(int index){if (index < 0 || index >= SEM_MAX){return;}struct sembuf a;a.sem_num = index;a.sem_op = -1;//p操作a.sem_flg = SEM_UNDO;if (semop(semid, &a, 1) == -1){perror("p error\n");}}void sem_v(int index){if (index < 0 || index >= SEM_MAX){return;}struct sembuf a;a.sem_num = index;a.sem_op = 1;//v操作a.sem_flg = SEM_UNDO;if (semop(semid, &a, 1) == -1){perror("v error\n");}}void sem_destroy(){if (semctl(semid, 0, IPC_RMID) == -1){perror("destroy sem error\n");}}

//w.c写数据#include#include#include#include#include#include#include"sem.h"int main(){int shmid=shmget((key_t)1234, 256, IPC_CREAT | 0600);//获取assert(shmid != -1);char* s = (char*)shmat(shmid, NULL, 0);//映射if (s == (char*)-1){exit(1);}sem_init();while (1){printf("input:\n");char buff[128] = {0};fgets(buff, 128, stdin);sem_p(SEM1);strcpy(s, buff);sem_v(SEM2);if (strncmp("buff", "end", 3) == 0){break;}}shmdt(s);//断开exit(0);}

//r.c读数据#include#include#include#include#include#include#include"sem.h"int main(){int shmid = shmget((key_t)1234, 256, IPC_CREAT | 0600);//获取assert(shmid != -1);char* s = (char*)shmat(shmid, NULL, 0);//映射if (s == (char*)-1){exit(1);}sem_init();while (1){sem_p(SEM2);if (strncmp(s, "end", 3) == 0){break;}printf("read:%s\n", s);sem_v(SEM1);}shmdt(s);//删除共享内存sem_destroy();//销毁信号量shmctl(shmid, IPC_RMID, NULL);exit(0);}

进程的更新就到这了,感谢观看!

责任编辑:

标签:

相关推荐:

精彩放送:

新闻聚焦
Top