9.6 Message Queues
Message Queues sind erst einmal nichts anderes als Nachrichten, die in Form einer verketteten Liste verwaltet werden. Jede dieser Message Queues hat eine eindeutige Kennung, woran diese auch von anderen Prozessen identifiziert werden können. Sie können sich die Message Queues gerne als ein Art Mailbox für Prozesse vorstellen. Prozesse können dabei Daten in der Message Queue ablegen, die von anderen Prozessen wieder abgeholt werden. Diese Daten bleiben übrigens auch nach Beendigung des Erzeugers in der Message Queue.
Außer der normalen Warteschlange nach dem FIFO-Prinzip können Sie die Nachrichten auch mit einer Priorität versehen. Der Prozess, der dann diese Nachricht aus der Schlange (Queue) abholen will, kann dann die Nachrichten folgendermaßen abholen:
|
Nachricht mit der Priorität N – Dabei holt sich der Prozess die erste Nachricht in der Warteschlange mit der Priorität N, auch wenn diese als letzte eingefügt wurde. |
|
Nachrichten ab einer Priorität N – Hierbei ist der Prozess nicht nur an einer Nachricht der Priorität N interessiert, sondern an allen Nachrichten mit der gleichen und einer höheren Priorität. |
Das folgende Bild demonstriert Ihnen den simplen Vorgang:
Die Funktionen zur Verwendung der Message Queues sind denen der Semaphoren und dem anschließenden Shared Memory recht ähnlich.
9.6.1 Eine Message Queue öffnen oder erzeugen – msgget()
Wenn Sie eine existierende Message Queue erzeugen oder eine bereits existierende öffnen wollen, müssen Sie die Funktion msgget() dazu verwenden. Hier die Syntax zu dieser Funktion:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int flag);
Als ersten Parameter benötigt msgget() wieder die »magische« Nummer und als zweiten Parameter den oder die Flags. Für die Flags gilt auch hier dasselbe wie bei den Semaphoren:
|
IPC_PRIVATE – Öffnen eines privaten Schlüssels |
|
IPC_CREAT – Damit wird ein noch nicht existierender Schlüssel erzeugt. |
|
IPC_EXCL – Dieses Flag wird in der Regel mit dem bitweisen ODER-Operator hinter IPC_CREAT angefügt. Damit gehen Sie sicher, dass wirklich ein neues Objekt (eine Message Queue) angelegt wird und nicht ein bereits existierendes mit derselben Kennung. Existiert bereits eine Kennung, bricht die entsprechende Funktion – hier msgget() – mit einem Fehler ab (errno = EEXIST). |
Des Weiteren, nicht zu verachten, sollten Sie bei den Flags auch die entsprechenden Zugriffsrechte setzen, damit Sie überhaupt Zugriff auf die Message Queue haben. Der Rückgabewert der Funktion msgget() ist bei Erfolg die ID der Message Queue, die Sie bei den weiteren Funktionen benötigen, oder bei einem Fehler -1.
9.6.2 Nachrichten versenden – msgsnd()
Um eine Nachricht an eine existierende Message Queue zu versenden, steht Ihnen die Funktion msgsnd() zur Verfügung:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msg_id, const void *buf,
size_t m_len, int flag);
Damit schicken Sie eine Nachricht an die Message Queue mit der ID msg_id, die Sie von der Funktion msgget() erhalten haben. Das Argument buf beinhaltet die Adresse des Message-Typs, gefolgt von dem eigentlichen Message-Text. Häufig verwendet man für buf eine Struktur wie folgt:
struct my_nachricht {
long nachrichten_typ; /* Message-Typ */
char nachricht[MSG_LEN]; /* Message Text */
}
Das würde bedeuten, als Argument für buf wird die Adresse der selbst definierten Struktur my_nachricht übergeben. Die erste Variable der Struktur nachrichten_typ ist nur dann von Bedeutung, wenn Sie die Nachrichten mit der Funktion msgrcv() in einer anderen Reihenfolge empfangen wollen. Hier geben Sie also die Priorität der Nachricht an. Mit nachricht geben Sie in der Struktur my_nachricht den eigentlichen Inhalt der Nachricht an.
Mit dem dritten Parameter der Funktion msgsnd() geben Sie die Textlänge der Nachricht von buf an. In diesem Fall wäre es die Textlänge der Strukturvariablen nachricht – also MSG_LEN Bytes.
Mit dem letzten Parameter können Sie das Flag IPC_NOWAIT setzen, wenn Sie wollen, dass eine Message Queue nicht, wie es standardmäßig der Fall ist, blockiert, wenn diese voll ist. In dem Fall kehrt msgsnd() sofort mit einem Fehler (errno == EAGAIN) zurück.
Wenn die Nachricht mit der Funktion msgsnd() erfolgreich verschickt werden konnte, wird 0, ansonsten bei einem Fehler -1 zurückgegeben.
9.6.3 Eine Nachricht empfangen – msgrcv()
Um eine gesendete Nachricht aus einer Message Queue zu empfangen, steht Ihnen die Funktion msgrcv() zur Verfügung:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgrcv(int msg_id, void *buf,
size_t msg_len, long typ, int flag);
Die Funktion ist ähnlich aufgebaut wie die Funktion zum Senden einer Nachricht. Als erstes Argument bekommt msgrcv() die ID einer Message Queue. In der Adresse des zweiten Parameters befindet sich der Message-Typ und die Nachricht als Text. Mit msg_len geben Sie auch hier die Textlänge an, die aus dem Nachrichtentext gelesen werden soll. Mit typ legen Sie die Priorität fest, in welcher Reihenfolge die Nachrichten aus einer Message Queue gelesen werden sollen. Folgende Angaben können hierfür gemacht werden:
|
typ == 0 – Erste Nachricht aus einer Message Queue wird gelesen (nach dem FIFO-Prinzip). |
|
typ == N – Erste Nachricht wird aus einer Message Queue gelesen, die den Typ N hat. Das Gegenteil erreichen Sie mit dem Flag MSG_EXCEPT. |
|
typ < 0 – Erste Nachricht aus einer Message Queue wird empfangen, deren Typ der kleinste Wert ist, der kleiner oder gleich dem absoluten Wert von typ ist. |
Auch hier kann das Flag IPC_NOWAIT als letztes Argument angegeben werden. Damit können Sie verhindern, dass msgrcv() blockiert und auf das Eintreffen einer entsprechenden Nachricht wartet – was auch die Standardeinstellung ist. Allerdings bricht beim Setzen des Flags IPC_NOWAIT die Funktion msgrcv() mit einem Fehler ab (errno == ENOMSG). Bei erfolgreicher Ausführung gibt die Funktion msgrcv() die der empfangenen Nachricht zurück oder bei einem Fehler -1.
9.6.4 Abfragen, Ändern oder Löschen einer Message Queue – msgctl()
Will man den Zustand einer Message Queue abfragen, verändern oder gar löschen, steht hierfür die Funktion msgctl() zur Verfügung:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msg_id, int kommando, struct msqid_ds *buf);
Das Argument msg_id ist wie gehabt die ID einer Message Queue, auf die sich die Operation mit msgctl() beziehen soll. Mit kommando legen Sie die auszuführende Aktion auf einer Message Queue fest. Dabei können folgende Angaben gemacht werden (entsprechende Rechte vorausgesetzt):
Tabelle 9.2
Kommandoangaben für msgctl()
Kommando
|
Bedeutung
|
IPC_STAT
|
Abragen des Status in einer Message Queue. Dabei werden die Informationen in den Puffer buf, der vom Typ struct msqid_ds ist, geschrieben.
|
IPC_SET
|
Setzen der UID/GID-Zugriffsrechte und der maximalen Größe einer Message Queue. Dabei werden Variablen in der Struktur msqid_ds beschrieben.
|
IPC_RMD
|
Eine Message Queue löschen
|
IPC_INFO
|
(nur Linux) Zusätzliche Informationen zu einer Message Queue
|
Mehr zu den einzelnen Variablen der Struktur msqid_ds können Sie der Headerdatei <bits/msq.h> entnehmen. In dieser Struktur befindet sich der momentane Status der Message Queue. In diesem Buch wird diese Struktur nicht benötigt. Hier nur ein kurzer Überblick dazu:
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first; // Erste Nachricht in der Queue
struct msg *msg_last; // Letzte Nachricht in der Queue
time_t msg_stime; // Letzter msgsnd()-Aufruf (Zeit)
time_t msg_rtime; // Letzter msgrcv()-Aufruf (Zeit)
time_t msg_ctime; // Letzte Änderungen (Zeit)
struct wait_queue *wwait;
struct wait_queue *rwait;
ushort msg_cbytes;
ushort msg_qnum; // Anzahl der Nachrichten in Queue
ushort msg_qbytes; // Max. Anzahl Bytes der Queue
ushort msg_lspid; // PID vom letzten msgsnd()-Aufruf
ushort msg_lrpid; // PID vom letzten msgrcv()-Aufruf
};
Mit dem folgenden Beispiel möchte ich Ihnen ein einfaches Server-Client-Prinzip mithilfe einer Message Queue demonstrieren. Sie können sich das Prinzip wie bei einer Mailingliste vorstellen. Zuerst erzeugen Sie einen Server, der die eingehenden Nachrichten der Clients empfängt. Um es einfach zu halten, werden als Nachrichtentyp die Kennung der Message-Queue-ID und als Text lediglich »Login« verwendet. Alle anderen bereits in den Server eingeloggten User bekommen vom Server Bescheid, wenn sich ein neuer User (Client) angemeldet hat. Der neu gestartete Prozess bekommt lediglich seine Message-Queue-ID und das Login bestätigt.
Zuerst benötigen Sie hierzu die gemeinsame Headerdatei:
/* msq_header.h */
#include <stdlib.h>
#include <signal.h>
/* Magische Nummer */
#define KEY 1234L
/* Begrenzung der Nachricht */
#define MSG_LEN 1024
/* Zugriffsrechte */
#define PERM 0666
#undef signal
/* Datentypen zum Senden und Empfangen der Nachrichten */
typedef struct {
long prioritaet;
char message[MSG_LEN];
} client2server;
typedef struct {
long prioritaet;
char message[MSG_LEN];
} server2client;
struct id_verwaltung {
int id;
struct id_verwaltung *next;
};
struct id_verwaltung *id_first = NULL;
static void mq_verwaltung_add( int i ) {
struct id_verwaltung *ptr;
if( id_first == NULL) {
id_first = malloc(sizeof( struct id_verwaltung ));
if( id_first == NULL )
exit(EXIT_FAILURE);
id_first->id = i;
id_first->next = NULL;
}
else {
ptr = id_first;
while(ptr->next != NULL)
ptr=ptr->next;
ptr->next = malloc(sizeof( struct id_verwaltung ));
ptr = ptr->next;
ptr->id = i;
ptr->next = NULL;
}
}
static void mq_verwaltung_remove( int i ) {
struct id_verwaltung *ptr_tmp;
struct id_verwaltung *ptr;
if(id_first == NULL)
return;
if( id_first->id == i ) {
ptr = id_first->next;
free(id_first);
id_first = ptr;
printf("User %d hat sich ausgeloggt\n",i);
return;
}
ptr = id_first;
while(ptr->next != NULL) {
ptr_tmp = ptr->next;
if( ptr_tmp->id == i ) {
ptr->next = ptr_tmp->next;
free(ptr_tmp);
printf("User %d hat sich ausgeloggt\n",i);
break;
}
ptr=ptr_tmp;
}
}
typedef void (sigfunk) (int);
sigfunk * signal (int sig_nr, sigfunk signalhandler) {
struct sigaction neu_sig, alt_sig;
neu_sig.sa_handler = signalhandler;
sigemptyset (&neu_sig.sa_mask);
neu_sig.sa_flags = SA_RESTART;
if (sigaction (sig_nr, &neu_sig, &alt_sig) < 0)
return SIG_ERR;
return alt_sig.sa_handler;
}
Die Daten, die in der Nachrichtenwarteschlange miteinander ausgetauscht werden, werden damit festgelegt:
typedef struct {
long prioritaet;
char message[MSG_LEN];
} client2server;
typedef struct {
long prioritaet;
char message[MSG_LEN];
} server2client;
Außerdem wurde mit den Funktionen msq_verwaltung_add() und msq_verwaltung_remove() eine dynamisch verkettete Liste erstellt, worin alle vorhandenen Client-Message-Queue-IDs aufgezeichnet und wieder entfernt werden. Dies dient anschließend dazu, dass der Server keine Nachrichten an einen nicht mehr existierenden Client verschickt. Das neue Signalkonzept wurde auch implementiert, das anschließend von einem Clientprozess zur Beendigung mit der Tastenkombination (STRG)+(C) für Aufräumarbeiten verwendet wird.
Als Erstes das Serverprogramm:
/* server_msq.c */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <stdio.h>
#include <unistd. h.>
#include "msq_header.h"
static int setup_server( key_t key, int flag ) {
int res;
res = msgget( key, flag );
if( res < 0 ) {
printf("Serverfehler beim Einrichten Message "
"Queues\n");
return -1;
}
return res;
}
static void verteilen( const char *str, int p ) {
int res;
server2client s2c;
struct id_verwaltung *ptr = id_first;
s2c.prioritaet = p;
sprintf(s2c.message,"%s",str);
while(ptr != NULL) {
res = msgsnd(ptr->id, &s2c, MSG_LEN, 0);
if( res < 0 )
printf("Konnte Nachricht an Client MQ %d nicht"
" zustellen ...\n",ptr->id);
else
printf("Nachricht an Client MQ %d zugestellt\n",
ptr->id);
ptr=ptr->next;
}
}
int main(void) {
int server_id, res;
client2server c2s;
char puffer[MSG_LEN];
int client_id;
server_id = setup_server( KEY, PERM | IPC_CREAT );
if( server_id < 0 )
return EXIT_FAILURE;
printf("Server läuft ...\n");
while(1) {
/* Nachricht empfangen */
res = msgrcv( server_id, &c2s, MSG_LEN, 0, 0 );
if( res < 0 ) {
printf("Fehler beim Empfangen einer Nachricht?\n");
return EXIT_FAILURE;
}
/*Nachricht auswerten */
if( c2s.prioritaet == 1 ) {
/* Message Queues aus der Verwaltung entfernen */
sscanf(c2s.message,"%d",&client_id);
mq_verwaltung_remove( client_id );
}
else {
sscanf(c2s.message,"%d:%s",&client_id, puffer);
/* client_id eventuell zur Liste hinzufügen */
mq_verwaltung_add( client_id );
/* Allen anderen User einen Hinweis schicken */
verteilen( puffer, client_id );
}
}
printf(" --- Server-Ende ---\n");
return EXIT_SUCCESS;
}
Der Server erzeugt erst eine Message Queue mit msgget(). Anschließend werden in einer Endlosschleife Daten (falls vorhanden) von den Clients empfangen, die an die Message Queue des Servers etwas geschickt haben. Hat ein Client eine Nachricht gesendet, wird erst noch überprüft, ob dies mit der Priorität 1 geschehen ist. Dies würde bedeuten, dass sich der Client mit (STRG)+(C) beendet hat. Somit wird diese Message Queue (mit entsprechender ID) aus der Liste entfernt. Ansonsten wird die Nachricht mit sscanf() ausgelesen, falls es eine neue Message Queue-ID war, in die Verwaltung eingefügt, und alle anderen eingeloggten Prozesse werden mit der Funktion verteilen() benachrichtigt.
Hinweis Die Client Message Queues können auch untereinander ohne den Server kommunizieren.
|
Jetzt zum Client-Programm:
/* client_msq.c */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <stdio.h>
#include <unistd. h.>
#include "msq_header.h"
static int ende = 1;
static int setup_client (key_t key, int flag) {
int res;
res = msgget (key, flag);
if (res == -1) {
printf ("Client-Fehler beim Einrichten"
" der Message Queues...\n");
return -1;
}
return res;
}
/* Der Client will sich beenden */
static void shutdown_msq (int signr) {
ende = 0;
}
int main (void) {
int server_id, client_id;
int res;
client2server c2s;
server2client s2c;
/* Signalhandler für STRG+C einrichten */
signal (SIGINT, shutdown_msq);
/* Eine Message Queue zum Server */
server_id = setup_client (KEY, 0);
if (server_id < 0)
return EXIT_FAILURE;
/* Eine Message Queue für den Client */
client_id = setup_client (IPC_PRIVATE, PERM | IPC_CREAT);
if (client_id < 0)
return EXIT_FAILURE;
/* Eine Nachricht an den Server versenden */
c2s.prioritaet = 2;
sprintf (c2s.message, "%d:Login", client_id);
res = msgsnd (server_id, &c2s, MSG_LEN, 0);
if (res == -1) {
printf ("Konnte keine Nachricht versenden ...\n");
return EXIT_FAILURE;
}
/* Bestätigung des Servers oder Rundschreiben */
/* von anderen Clients empfangen */
res = msgrcv (client_id, &s2c, MSG_LEN, 0, 0);
if (res == -1) {
printf ("Fehler beim Erhalt der Nachricht ...\n");
return EXIT_FAILURE;
}
/*Bestätigung oder Rundschreiben auslesen und ausgeben*/
printf ("%ld: %s\n", s2c.prioritaet, s2c.message);
while (ende) {
/* Hier könnte der wichtige Code zur Kommunikation */
/* zwischen den Prozessen geschrieben werden. */
/* In diesem Beispiel werden nur die neu erstellten */
/* Message Queues als neue User ausgegeben. Die */
/* Schleife wartet auf das Signal SIGINT == STRG+C */
/* Bestätigung oder Rundschreiben empfangen */
res= msgrcv (client_id, &s2c, MSG_LEN, 0, IPC_NOWAIT);
if (res != -1) {
printf ("(%s) von User mit der Message-Queue-ID: "
" %ld\n", s2c.message, s2c.prioritaet);
}
/* Dauerndes Pollen belastet unnötig die CPU */
/* – Eine Bremse (siehe top mit und ohne usleep() */
usleep( 1000 );
}
/* STRG+C also das Signal SIGINT wurde ausgelöst ... */
c2s.prioritaet = 1;
sprintf (c2s.message, "%d", client_id);
res = msgsnd (server_id, &c2s, MSG_LEN, 0);
if (res == -1) {
printf ("Konnte keine Nachricht versenden ...\n");
return EXIT_FAILURE;
}
/* Message Queues entfernen */
msgctl (client_id, IPC_RMID, NULL);
return EXIT_SUCCESS;
}
Der Client erzeugt sich zuerst eine eigene Message Queue und richtet anschließend eine Verbindung zur Server Message Queue ein. Anschließend sendet (msgsnd()) der Client eine Nachricht an den Server (praktisch die Anmeldung) und wartet mit msgrcv() auf die Antwort. Anschließend geht es in die Endlosschleife, die mithilfe des Signals SIGINT ((STRG)+(C)) und der Funktion shutdown_msq beendet werden kann. In der Schleife selbst könnten Sie jetzt wichtige Arbeiten, die für eine Message Queue üblich sind, ausführen. Hier wurde der Einfachheit halber nur jeder neu eingeloggte Client als Nachricht empfangen und ausgegeben. Außerdem wird nach Beendigung mit (STRG)+(C) die Message Queue des Clients mit msgctl() entfernt.
Das Programm bei der Ausführung in einer Beispielsitzung:
---[tty1]---
$ gcc -o server_msq server_msq.c
$ gcc -o client_msq client_msq.c
$ ./server_msq
Server läuft ...
---[tty2 -> andere Konsole]---
$ ./client_msq
524291: Login
---[tty1 währenddessen]---
$ ./server_msq
Server läuft ...
Nachricht an Client MQ 524291 zugestellt
---[tty3 -> andere Konsole]---
$ ./client_msq
557060: Login
---[tty1 währenddessen]---
$./server_msq
Server läuft ...
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 557060 zugestellt
---[tty2 währenddessen]---
$ ./client_msq
524291: Login
(Login) von User mit der Message-Queue-ID: 557060
---[tty3]---
$ ./client_msq
557060: Login
STRG+C
$
---[tty1 währenddessen]---
$./server_msq
Server läuft ...
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 557060 zugestellt
User 557060 hat sich ausgeloggt
---[tty2]---
$./client_msq
524291: Login
(Login) von User mit der Message-Queue-ID: 557060
STRG+C
$
---[tty1]---
$./server_msq
Server läuft ...
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 524291 zugestellt
Nachricht an Client MQ 557060 zugestellt
User 557060 hat sich ausgeloggt
User 524291 hat sich ausgeloggt
Sie können das Beispiel gerne anhand mehrerer Clients testen.
|