Linux Synchronization with Fifo Waiting Queue

Linux synchronization with FIFO waiting queue

Here is a way to create a simple queueing "ticket lock", built on pthreads primitives. It should give you some ideas:

#include <pthread.h>

/*
 * FIFO ("ticket") lock built from a pthread mutex and condition variable.
 *
 * A thread entering ticket_lock() takes the current queue_tail value as its
 * ticket; it owns the lock once queue_head equals that ticket.
 * The member order matters: TICKET_LOCK_INITIALIZER is positional.
 */
typedef struct ticket_lock {
    pthread_cond_t cond;       /* waiters sleep here until their ticket is served */
    pthread_mutex_t mutex;     /* protects queue_head and queue_tail */
    unsigned long queue_head;  /* ticket currently being served */
    unsigned long queue_tail;  /* next ticket to hand out */
} ticket_lock_t;

/* Static initializer: both counters start at zero, so the first caller gets the lock. */
#define TICKET_LOCK_INITIALIZER \
    { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER, 0UL, 0UL }

/*
 * Acquire the ticket lock.
 *
 * Takes the next ticket number and blocks until that number is the one being
 * served, so callers are admitted in the order in which they drew tickets.
 */
void ticket_lock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);

    /* Draw a ticket: our number is the current tail, and the tail moves on. */
    const unsigned long my_ticket = ticket->queue_tail;
    ticket->queue_tail = my_ticket + 1;

    /*
     * ticket_unlock() broadcasts, so every waiter wakes up; each one must
     * re-check whether it is its own turn before leaving the loop.
     */
    for (;;)
    {
        if (ticket->queue_head == my_ticket)
            break;
        pthread_cond_wait(&ticket->cond, &ticket->mutex);
    }

    pthread_mutex_unlock(&ticket->mutex);
}

/*
 * Release the ticket lock: advance the "now serving" counter and wake all
 * waiters so that the holder of the next ticket can proceed.
 */
void ticket_unlock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);

    ticket->queue_head += 1;                /* serve the next ticket */
    pthread_cond_broadcast(&ticket->cond);  /* waiters re-check their own number */

    pthread_mutex_unlock(&ticket->mutex);
}

FIFO queue synchronization

What do you mean by "synchronized"? If your reader & writer are in separate threads, you want the FIFO to handle the concurrency "correctly", including such details as:

  • proper use of FIFO API should never cause data structures to be corrupted
  • proper use of FIFO API should not cause deadlock (although there should be a mechanism for a reader to wait until there is something to read)
  • the objects read from the FIFO should be the same objects, in the same order, written to the FIFO (there shouldn't be missing objects or rearranged order)
  • there should be a bounded time (one would hope!) between when the writer puts something into the FIFO, and when it is available to the reader.

In the Java world there's a good book on this, Java Concurrency In Practice. There are multiple ways to implement a FIFO that handles concurrency correctly. The simplest implementations are blocking, more complex ones use non-blocking algorithms based on compare-and-swap instructions found on most processors these days.

iTron data-queue in linux

On the few occasions I have needed to implement my own message queue, I have tended to use one semaphore and one mutex (or a second semaphore) for each queue. I have only dealt with thread-level queues, so this probably doesn't apply if you want a queue between two processes.

The semaphore is used to count the number of messages in the queue and provide the OS mechanism for thread to suspend/wait upon new messages.

The mutex is used to protect the overall queue structure.

So, it might look a bit like this (very much pseudo code):

// Receive one WORD from the queue, waiting (optionally with a timeout)
// until a message is available.
DataQueueRx( Queue*, WORD*, timeout? )
{
// The semaphore counts queued messages; this blocks while the count is zero.
WaitOnSemaphore( Queue->sema, timeout? ); //get token
// The mutex protects the queue structure while it is being modified.
LockMutex
{
//manipulate your queue, and transfer the data to WORD
}
ReleaseMutex
}

// Append one WORD to the queue and bump the message-count semaphore so that
// a receiver blocked in DataQueueRx can proceed.
DataQueueTx( Queue*, WORD )
{
    // The mutex protects the queue structure while it is being modified.
    LockMutex
    {
        //manipulate your queue, inserting new WORD msg

        ReleaseSemaphore(Queue->sema); //increment semaphore count
    }
    ReleaseMutex
}

However, perhaps this isn't very "light weight". This also assumes that queues are not destroyed while in use. Also, I suspect that with a "WORD" only queue, there could be some optimizations.

If you are seeking "Lock-free code", then I suggest spending a day or two reading through these articles by Sutter.

Good luck!

Controlling the order of dequeuing in the Semaphore Queue in Linux

I can think of two approaches:

  • Use a condition variable to "wake some or all waiters," who will sort out priority release themselves; or
  • Use (realtime) signals to "wake a single, specific waiter" in priority order

In each case, the semaphore has at least a mutex, a value, and some bookkeeping. If the value is below zero, its absolute value is the number of waiters (e.g., value == -3 means 3 threads are waiting).

Condition Variable Approach

The semaphore tracks the number of waiters at any given priority, as well as the number of waiters released at any given priority. In pseudo-C:

// Priority-aware counting semaphore (condition-variable variant).
// The per-priority arrays are named prio_waiting / prio_released because
// those are the member names that post() and wait() actually access.
typedef struct priority_sem_s {
    int value;                        // if negative, abs(sem->value) == no. of waiting threads
    pthread_mutex_t mutex;            // guards every other member
    pthread_cond_t cv;                // all waiters, of every priority, block on this
    int prio_waiting[N_PRIORITIES];   // no. waiting (blocked) at each priority
    int prio_released[N_PRIORITIES];  // no. waiters released (unblocked) at each priority
} priosem_t;

// Increment the semaphore. If threads are blocked, mark one waiter of the
// highest waiting priority as released and wake the waiters.
void post(priosem_t *sem):
lock(sem->mutex);
sem->value++;

// value <= 0 after the increment means at least one thread was still counted as waiting
if (sem->value <= 0 && prio_waiting_is_NOT_empty(sem)):
// someone was waiting; release one of the highest prio
int prio = fetch_highest_prio_waiting(sem);
// move one waiter of that priority from the "waiting" tally to the "released" tally
sem->prio_waiting[prio]--;
sem->prio_released[prio]++;
// broadcast wakes every waiter, not only the one being released
cond_broadcast(sem->cv, sem->mutex);

unlock(sem->mutex);

// Decrement the semaphore; if no unit is available, block until post()
// grants a release to this caller's priority level.
void wait(priosem_t *sem, int prio):
    lock(sem->mutex);
    sem->value--;

    if (sem->value < 0):
        // get in line
        sem->prio_waiting[prio]++;
        // post() increments prio_released[prio] when it hands a unit to this
        // priority, so keep sleeping while no release is pending for it.
        // (The counter never goes negative, so testing "< 0" would never block.)
        while (sem->prio_released[prio] <= 0):
            cond_wait(sem->cv, sem->mutex);
        // ok to leave: consume the release that was granted
        sem->prio_released[prio]--;

    unlock(sem->mutex);

Advantages: Can be shared across processes (implemented in shared memory).

Disadvantages: Awakens every waiter to release just one. Martin James suggests one condition variable per priority, which would reduce "unnecessary" wake-ups at the cost of more synchronization primitives.

Signal Approach

Use sigsuspend and a realtime signal with a noop handler to pause and resume waiters. In pseudo-C:

// Priority-aware counting semaphore (signal variant): waiters are kept in an
// ordered list and woken individually.
typedef struct priority_sem_s {
int value; // if negative, abs(value) == no. of waiting threads
pthread_mutex_t mutex;
// NOTE(review): opaque here; only touched through the list helpers used by post()/wait()
void *waiting; // ordered list of [priority, thread-id] pairs
} priosem_t;

// Increment the semaphore and, if a thread is waiting, wake exactly one:
// the highest-priority waiter popped from the list.
void post(priosem_t *sem):
lock(sem->mutex);
sem->value++;

// value <= 0 after the increment means at least one thread was still counted as waiting
if (sem->value <= 0 && waiting_queue_is_NOT_empty(sem)):
pthread_t tid = pop_highest_prio_waiter(sem);
// deliver the wake-up signal to that specific thread
pthread_kill(tid, SIGRTMIN+n);

unlock(sem->mutex);

// Decrement the semaphore; if no unit is available, register on the wait
// list and sleep in sigsuspend() until post() signals this thread.
void wait(priosem_t *sem, int prio):
// XXX --> PRECONDITION: SIGRTMIN+n is SIG_BLOCK'd <-- XXX
// XXX --> PRECONDITION: SIGRTMIN+n has a no-op handler installed <-- XXX
lock(sem->mutex);
sem->value--;

if (sem->value < 0):
// get in line
add_me_to_wait_list(sem, pthread_self(), prio);
// The mutex is dropped before sleeping. Because the signal is blocked on
// entry (see PRECONDITION), a post() arriving between unlock() and
// sigsuspend() presumably stays pending and ends the sigsuspend() at once.
unlock(sem->mutex);
sigsuspend(full_mask_except_sigrtmin_plus_n);
return; // OK!

// fast path: a unit was available, nothing to wait for
unlock(sem->mutex);

Advantages: Conceptually simpler; no unnecessary wake-ups.

Disadvantages: Cannot be shared across processes. An available realtime signal must be chosen or dynamically selected (look for an unmasked signal with SIG_DFL disposition?) and masked as early as possible.

Process-shared condition variable : how to recover after one process dies?

Ok, quoting posix pthread_mutex_lock() reference:

If mutex is a robust mutex and the process containing the owning thread terminated while holding the mutex lock, a call to pthread_mutex_lock() shall return the error value [EOWNERDEAD]. [...] In these cases, the mutex is locked by the thread but the state it protects is marked as inconsistent. The application should ensure that the state is made consistent for reuse and when that is complete call pthread_mutex_consistent(). If the application is unable to recover the state, it should unlock the mutex without a prior call to pthread_mutex_consistent(), after which the mutex is marked permanently unusable.

So, in addition to alk's comment, to robustly handle processes dying with the mutex locked we need to watch for EOWNERDEAD when calling pthread_mutex_lock() and pthread_cond_timedwait(), and call pthread_mutex_consistent() on the mutex when we see it.

Something like:

if ((errno = pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts)) == 0)
continue;
if (errno == EOWNERDEAD) /* Recover mutex owned by dead process */
pthread_mutex_consistent(&ticket->mutex);
else if (errno != ETIMEDOUT)
fail("pthread_cond_timedwait");

Linux passive waiting for condition update

You will need to implement this in terms of some form of IPC. You mentioned using Linux, but I will assume POSIX-with-unnamed-semaphores (i.e. not OS X) since you aren't yet using anything Linux-specific. Others have mentioned this could be simpler if you used threads. But maybe you have some reason for using multiple processes, so I'll just assume that.

As specified, the code does not appear to allow adults to exit, which makes things a bit simpler. You already know how many children are allowed at any point in time, as that is directly proportional to the number of adults present at any given point in time.

To figure out how to solve the problem, let's consider how such a thing might be handled in real life. We can imagine that there is some kind of "gatekeeper" at the daycare. This "gatekeeper" is represented in the code by the sum of the state: the semaphore and the shared memory variables representing the number of adults and children present at any point in time. When no children are allowed to enter, the gatekeeper blocks entry and the children must form a line. I assume that the intent is that children are allowed to enter in a first-come-first-serve basis; this means you'll need to have some kind of FIFO to represent the queue. When a child leaves, the gatekeeper must be able to notify the first child in line that they are eligible to enter.

So this code is missing two things:

  1. Some kind of FIFO representing the ordering of children waiting to enter
  2. Some kind of notification mechanism that a child can wait for a notification on, and that the gatekeeper can trigger to "wake up" the child.

Now, the question is what data we store in this queue and how we do the notification. There are several options, but I'll discuss the two most obvious.

Making the child wait could be as simple as having the gatekeeper place the child PID at the tail of the FIFO and sending that PID SIGSTOP using kill(2). This may happen several times. Once a child leaves, the gatekeeper dequeues from the head of the FIFO and sends the pid SIGCONT.

As currently architected, the "gatekeeper" in your program is more of an abstract concept. A clearer implementation might implement a gatekeeper as a management process.

But since no such process exists, we need to imagine something like the child sees a "please wait" sign at the door and waits. The process representing the child can make itself wait by placing itself at the tail of the FIFO, and using the raise(3) library function, and sending itself SIGSTOP. Then, when any child leaves, it reads from the front of the FIFO and sends that pid SIGCONT using kill(2).

This implementation is relatively straightforward and the only additional resources required are to somehow represent the queue in shared memory.

An alternative approach would be to give each child its own file descriptor(s). This could be either a pipe(2), or a bidirectional file descriptor like a PF_LOCAL socket(2). Leaving the file descriptors in blocking mode, when a child is not allowed to enter, it puts (maybe the write-side of, if a pipe) its file descriptor at the tail of the FIFO, and blocks read(2)ing one byte from the read-side (which would be the same fd if not a pipe).

When a child exits, it pulls the entry from the front of the FIFO and write(2)s one byte to the file descriptor there. This will wake the child process that is blocked in read(2), and it will continue on its merry way into the daycare.

As previously stated, condition variables have also been suggested. I usually avoid them; they are easy to misuse, and you're already serializing the entry process.

In both the signal and the file descriptor case, a ring buffer of integers suffices -- so that's all the state you need to store in the FIFO.

The FIFO requires some careful consideration. Since multiple processes will be reading and manipulating it, it must also be in shared memory. Whether the FIFO is implemented as a ring buffer or some other way, you'll probably want to define some limits on the length of your queue. If there are too many children in line, maybe arriving children just "go home." Also, you'll need to make sure you handle the case of an empty FIFO gracefully on entry/exit, and make sure that transitioning from 1 waiter to 0 works as intended. Since you're serializing entry / exit with a semaphore, this should be straightforward.

Synchronizing processes with semaphores and signals in C

Taking your code verbatim onto a Mac running Mac OS X 10.10.3, using GCC 5.1.0, using the standard compilation options I use for compiling code from Stack Overflow, I get compilation warnings as shown (I called your code semshm.c):

$ gcc -O3 -g -std=c11 -Wall -Wextra -Wmissing-prototypes -Wstrict-prototypes \
> -Wold-style-definition -Werror semshm.c -o semshm
semshm.c:29:7: error: redefinition of ‘union semun’
union semun {
^
In file included from semshm.c:8:0:
/usr/include/sys/sem.h:176:7: note: originally defined here
union semun {
^
semshm.c:48:6: error: no previous prototype for ‘main_quit’ [-Werror=missing-prototypes]
void main_quit(int n) {
^
semshm.c: In function ‘main_quit’:
semshm.c:48:20: error: unused parameter ‘n’ [-Werror=unused-parameter]
void main_quit(int n) {
^
semshm.c: In function ‘main’:
semshm.c:54:17: error: unused variable ‘arg’ [-Werror=unused-variable]
union semun arg;
^
semshm.c: In function ‘process1’:
semshm.c:161:43: error: ordered comparison of pointer with integer zero [-Werror=extra]
while (fgets(tab, sizeof(tab), stdin) > 0) {
^
semshm.c: In function ‘process2’:
semshm.c:197:20: error: format ‘%d’ expects argument of type ‘int’, but argument 3 has type ‘size_t {aka long unsigned int}’ [-Werror=format=]
printf("[P2] Recieved \"%s\" with length %d.\n", tab, strlen(tab));
^
semshm.c:197:20: error: format ‘%d’ expects argument of type ‘int’, but argument 3 has type ‘size_t {aka long unsigned int}’ [-Werror=format=]
semshm.c: In function ‘signal_handling’:
semshm.c:239:26: error: unused parameter ‘snd’ [-Werror=unused-parameter]
void signal_handling(int snd, int rvc) {
^
semshm.c:239:35: error: unused parameter ‘rvc’ [-Werror=unused-parameter]
void signal_handling(int snd, int rvc) {
^
cc1: all warnings being treated as errors

The first problem is peculiar to Mac OS X; it seems to define union semun in <sys/sem.h> despite the POSIX specification for semctl()
saying explicitly:

The semctl() function provides a variety of semaphore control operations as specified by cmd. The fourth argument is optional and depends upon the operation requested. If required, it is of type union semun, which the application shall explicitly declare:

union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
} arg;

The only excuse for the POSIX requirement is bad historical precedent, which Mac OS X has over-ridden. So, the double declaration of union semun is not held against you. It is curious that POSIX requires an extra member, though, compared with what you specify. Anyway, I commented out that declaration to get past that.

There are other warnings, though many of them are not all that serious. Unused variables are not good, but not hugely harmful. The format mismatch between %d and size_t matters if, as on my machine, %d looks for a 32-bit value and size_t is a 64-bit quantity. The test for fgets() should be != 0; it returns NULL (or 0 — I use 0 too) on failure.

The most curious problem is the unused arguments to signal_handling(). The calls pass the global variables inter_snd and inter_rcv to the function, but the function ignores those values and simply manipulates the global variables. There is at least a serious disconnect between the calling and called code here.

For the purposes of getting on with life, I converted signal_handling() to a parameterless function void signal_handling(void); and left its body unchanged (but fixed all the calls to it). I added:

printf("%s(): signal %d\n", __func__, n);

to main_quit() in defiance of the advice on how to avoid using printf() in a signal handler. I also fixed the 'i' before 'e' except after 'c' spelling mistakes. Also 'quitting' has two t's and 'stopping' has two p's — English is such a weird language.

Finally, I extracted the two FIFO names (1fifo2 and 2fifo3) into an array:

static const char *fifoname[] = { "1fifo2", "2fifo3" };

and used either fifoname[0] or fifoname[1] in place of the literals. Repeated file name literals are a bad idea. I also used the new names to clean up (unlink(fifoname[0]); unlink(fifoname[1]);) the FIFOs if either existed after failing to create them.

  • Despite the collection of warnings, the code is not bad; few programs pass that set of options unscathed without some editing to make it pass cleanly.

With the code compiled, I ran it a few times. One of the last runs was:

$ ./semshm
[G] Launching
[G] Creating FIFO... [ OK ]
[G] Creating semaphores... [ OK ]
[G] Initializing semaphores values... [ OK ]
[G] Reserving shared memory... [ OK ]
[G] Building process tree... [ OK ]
[G] P1[pid]: 36030, P2[pid]: 36031, P3[pid]: 36032
[P2] Ready.
[P1] Ready.
[P3] Ready.
^C>> SIGNAL <<
main_quit(): signal 2
>> SIGNAL <<
>> SIGNAL <<
>> SIGNAL <<
absolute baloney
[P1] Sending: absolute baloney
Signal received...
[P2] Received "absolute" with length 8.
Signal received...
commestible goods for everyone
^C>> SIGNAL <<
>> SIGNAL <<
main_quit(): signal 2
>> SIGNAL <<
Sending to other processes
>> SIGNAL <<
Sending to other processes
>> SIGNAL <<
>> SIGNAL <<
>> SIGNAL <<
Quitting...
>> SIGNAL <<
Quitting...
[P2] Ending...
[P3] Received: 8 characters.
Signal received...
Sending to other processes
>> SIGNAL <<
>> SIGNAL <<
Quitting...
>> SIGNAL <<
[P3] Ending...
^\Quit: 3
$

I typed an interrupt, which was handled. I typed 'absolute baloney', of which the 'absolute' part was apparently received in one place, but the 'baloney' part never made it. That's puzzling. I typed 'commestible goods for everyone' and to all appearances, it was completely ignored. I tried another interrupt, which apparently interrupted things. I tried control-D (EOF), which apparently did nothing. I used control-backslash (control-\) to generate a quit signal, which did indeed stop the program. I'd done that before; reruns showed the wisdom of cleaning up after spotting that the FIFOs were already created as I got the failure message, then reran the code and it worked OK. The output shows green OK messages, and flashing yellow-ish >>> SIGNAL <<< messages.

So, you didn't tell us how to use the program, or the output to expect from it.

Your 'no compiler warnings' assertion was a little optimistic, but not too far off the truth.

Now you need to upgrade the question to specify/illustrate the inputs you give it and the expected behaviour from that input and to show the unexpected behaviour you actually get from it.

One semi-working version of semshm.c

This is the 'as compiled', partially annotated code that I compiled and ran. To compile it on Linux or other more accurately POSIX-compliant platform (more accurate than Mac OS X, that is), you'll need to uncomment one of the union semun blocks. I'd normally remove <sys/types.h> (modern POSIX almost never requires you to include it; older versions from the last millennium did require it — see POSIX 1997 which did, but compare with POSIX 2004 which did not).

#include <stdio.h>

…code co-opted into question…see revision 3 at https://stackoverflow.com/posts/30813144/revisions…

…I ran over the 30,000 character limit with this code on board…

}

Another mostly working version of semshm.c

Taking the extra information about how the code is supposed to be working, and putting in a lot of extra diagnostic printing, and observing that process 2 and 3 get stuck waiting on a semaphore which is not signalled, I came up with this variant of the code:

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <unistd.h>
#include <sys/sem.h>
#include <signal.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>

/* Semaphore numbers passed to sem_down()/sem_up(); main() also passes them as semget() keys. */
#define WRITE 1
#define READ 0
/* Key and size (in bytes) passed to shmget() for the shared command byte. */
#define MEM_ID 1
#define MEM_SIZE 1

/* Status strings wrapped in ANSI colour escape sequences; OK is used as a printf format, ERR as a perror prefix. */
#define OK "[ \033[1;32mOK\033[0m ]\n"
#define ERR "[\033[1;31mFAIL\033[0m] "
#define SIGNAL "\033[1;33m\033[5m>> SIGNAL <<\033[0m\n"

/* Signals handled by signal_callback(): S1..S3 record command 1..3 in inter_snd, S4 sets inter_rcv. */
#define S1 SIGINT
#define S2 SIGUSR1
#define S3 SIGUSR2
#define S4 SIGCONT

/* Bodies of the three child processes forked by main(). */
int process1(void);
int process2(void);
int process3(void);
/* Installed with signal() for S1..S4; only records which signal arrived. */
void signal_callback(int signo);
/* Called from the worker loops to act on the flags set by signal_callback(). */
void signal_handling(void);
/* Single-step operations on semaphore number semnum of set semid. */
void sem_down(int semid, int semnum);
void sem_up(int semid, int semnum);

/* Worker state changed by signal_handling(): work != 0 means "process data", quit != 0 ends the loops. */
char work = 1, quit = 0;
/*
 * Flags written by the asynchronous handler signal_callback() and consumed by
 * signal_handling(). volatile sig_atomic_t is the object type the C standard
 * guarantees can be written from a signal handler and read reliably afterwards.
 * inter_snd holds the pending command (1..3, 0 = none); inter_rcv is a boolean.
 */
volatile sig_atomic_t inter_snd = 0, inter_rcv = 0;
/* Scratch operation descriptor shared by sem_down(), sem_up() and signal_handling(). */
struct sembuf semstruct;
/* IDs of the two semaphore sets and of the shared-memory segment. */
int sem23, sem12, mem;
/* Attached shared memory: one byte carrying the command between processes. */
char *message;
/* Child PIDs after the forks; main() also reuses it for call results. */
int res[3];

/* Names of the FIFOs linking process 1 -> 2 and process 2 -> 3. */
static const char *fifoname[] = { "1fifo2", "2fifo3" };

/*
 * SIGINT/SIGTERM handler for the parent process: reports the signal and
 * forwards S1 to the process whose PID is stored in res[0].
 * The diagnostic print is intentional even though stdio is not
 * async-signal-safe.
 */
static void main_quit(int n)
{
    const pid_t target = res[0];

    fprintf(stdout, "%s(): signal %d\n", __func__, n);
    kill(target, S1);
}

/*
 * Parent ("[G]") process.
 *
 * Creates the two FIFOs, the two semaphore sets and the shared-memory byte,
 * forks the three worker processes, waits for all of them, and finally
 * removes the FIFOs, the shared memory and the semaphores again.
 *
 * Returns 0 after a complete run, 1 if any resource could not be set up.
 */
int main(void)
{
    printf("[G] Launching\n");

    signal(SIGINT, main_quit);
    signal(SIGTERM, main_quit);

    // creating FIFO
    printf("[G] Creating FIFO... ");
    res[0] = mkfifo(fifoname[0], 0644);
    res[1] = mkfifo(fifoname[1], 0644);
    if ((res[0] == -1) || (res[1] == -1))
    {
        perror(ERR);
        // remove whatever exists so that the next run can create both afresh
        unlink(fifoname[0]);
        unlink(fifoname[1]);
        return 1;
    }
    printf(OK);

    // create two semaphore sets and set values
    // Each set must hold TWO semaphores: both number READ (0) and number
    // WRITE (1) are initialised below and used by the workers. Asking for
    // only one makes every operation on semaphore number 1 fail.
    // If a one-semaphore set with the same key is left over from an earlier
    // run, semget() fails here; remove it (e.g. with ipcrm) first.
    // NOTE(review): key READ is 0, which equals IPC_PRIVATE on common
    // systems, so sem12 is presumably always a fresh private set.
    printf("[G] Creating semaphores... ");
    sem12 = semget(READ, 2, IPC_CREAT | 0644);
    sem23 = semget(WRITE, 2, IPC_CREAT | 0644);
    if ((sem23 == -1) || (sem12 == -1))
    {
        perror(ERR);
        return 1;
    }
    printf(OK);

    // READ starts at 0 (nothing to read yet), WRITE at 1 (writer may go first)
    // NOTE(review): the value is passed as a plain int, as in the original;
    // strict POSIX expects a union semun argument for SETVAL.
    printf("[G] Initializing semaphores values... ");
    if ((semctl(sem12, READ, SETVAL, 0) == -1) ||
        (semctl(sem12, WRITE, SETVAL, 1) == -1) ||
        (semctl(sem23, READ, SETVAL, 0) == -1) ||
        (semctl(sem23, WRITE, SETVAL, 1) == -1))
    {
        perror(ERR);
        return 1;
    }
    printf(OK);

    // creating shared memory
    printf("[G] Reserving shared memory... ");
    mem = shmget(MEM_ID, MEM_SIZE, IPC_CREAT | 0644);
    if (mem == -1)
    {
        perror(ERR);
        return 1;
    }
    // shmat() signals failure with (void *)-1, never with a null pointer
    message = (char *)shmat(mem, 0, 0);
    if (message == (char *)-1)
    {
        perror(ERR);
        return 1;
    }
    printf(OK);

    if ((res[0] = fork()) == 0)
    {
        process1();
        printf("Returned from process1() - exiting\n");
        exit(0);
    }

    if ((res[1] = fork()) == 0)
    {
        process2();
        printf("Returned from process2() - exiting\n");
        exit(0);
    }

    if ((res[2] = fork()) == 0)
    {
        process3();
        printf("Returned from process3() - exiting\n");
        exit(0);
    }

    printf("[G] Building process tree... ");
    if ((res[0] == -1) || (res[1] == -1) || (res[2] == -1))
    {
        perror(ERR);
        return 1;
    }
    else
    {
        printf(OK);
        printf("[G] P1[pid]: %d, P2[pid]: %d, P3[pid]: %d\n", res[0], res[1], res[2]);
    }

    // reap every child before tearing the IPC objects down
    int corpse;
    int status;
    while ((corpse = wait(&status)) > 0)
        printf("[G] PID %d exited with status 0x%.4X\n", corpse, status);

    printf("[G] Deleting FIFO... ");
    res[0] = unlink(fifoname[0]);
    res[1] = unlink(fifoname[1]);
    if ((res[0] == -1) || (res[1] == -1))
        perror(ERR);
    else
        printf(OK);

    printf("[G] Freeing shared memory... ");
    res[0] = shmdt((char *)message);
    res[1] = shmctl(mem, IPC_RMID, 0);
    if ((res[0] == -1) || (res[1] == -1))
        perror(ERR);
    else
        printf(OK);

    printf("[G] Deleting semaphores... ");
    res[0] = semctl(sem23, 0, IPC_RMID, 0);
    res[1] = semctl(sem12, 0, IPC_RMID, 0);
    if ((res[0] == -1) || (res[1] == -1))
        perror(ERR);
    else
        printf(OK);

    printf("[G] Ending...\n");
    return 0;
}

/*
 * Worker 1: reads lines from standard input and forwards each one through
 * the first FIFO, using semaphore set sem12 (WRITE before sending, READ
 * after) to hand over to process 2.
 *
 * Returns 0 after end of input, 1 if the FIFO could not be opened.
 */
int process1(void)
{
    char tab[100];
    FILE *fifoh;

    signal(S1, signal_callback);
    signal(S2, signal_callback);
    signal(S3, signal_callback);
    signal(S4, signal_callback);

    fifoh = fopen(fifoname[0], "w");
    if (fifoh == NULL)
    {
        // without this check a failed open would be passed straight to setbuf()
        perror("[P1] fopen");
        return 1;
    }
    setbuf(fifoh, NULL);    // unbuffered: each line reaches the FIFO immediately

    printf("[P1] Ready.\n");
    while (fgets(tab, sizeof(tab), stdin) != 0)
    {
        if (work)
        {
            sem_down(sem12, WRITE);
            printf("[P1] Sending: %s", tab);
            fprintf(fifoh, "%s", tab);
            sem_up(sem12, READ);
        }
        else
            printf("[P1] Ignoring line because work is zero: %s", tab);
        signal_handling();
    }
    fclose(fifoh);
    printf("[P1] Ending...\n");
    // release process 2 one last time so that it can observe EOF on the FIFO
    sem_up(sem12, READ); // Crucial
    return 0;
}

/*
 * Worker 2: reads one token at a time from the first FIFO, reports it, and
 * writes its length to the second FIFO. Set sem12 synchronises with
 * process 1, set sem23 with process 3.
 *
 * Returns 0 after EOF or a quit command, 1 if a FIFO could not be opened.
 */
int process2(void)
{
    char tab[100];
    FILE *fifo_in, *fifo_out;

    printf("[P2] Ready.\n");

    fifo_in = fopen(fifoname[0], "r");
    if (fifo_in == NULL)
    {
        perror("[P2] fopen");
        return 1;
    }
    fifo_out = fopen(fifoname[1], "w");
    if (fifo_out == NULL)
    {
        perror("[P2] fopen");
        fclose(fifo_in);
        return 1;
    }

    setbuf(fifo_out, NULL);
    setbuf(fifo_in, NULL);

    signal(S1, signal_callback);
    signal(S2, signal_callback);
    signal(S3, signal_callback);
    signal(S4, signal_callback);

    do
    {
        if (work)
        {
            printf("[P2]: Waiting on semaphore sem12/R\n");
            sem_down(sem12, READ);
            printf("[P2]: Proceed on semaphore sem12/R\n");
            // The field width keeps the token inside tab[100] (99 chars + NUL).
            // NOTE(review): %s stops at whitespace, so a multi-word line from
            // process 1 yields only one word per handshake.
            if (fscanf(fifo_in, "%99s", (char *)tab) != 1)
            {
                printf("[P2]: EOF\n");
                break;
            }
            printf("[P2]: Signalling semaphore sem12/W\n");
            sem_up(sem12, WRITE);
            printf("[P2]: Proceeding semaphore sem12/W\n");
            printf("[P2] Received \"%s\" with length %zu.\n", tab, strlen(tab));
            printf("[P2]: Waiting on semaphore sem23/R\n");
            sem_down(sem23, WRITE);
            printf("[P2]: Proceed on semaphore sem23/R\n");
            fprintf(fifo_out, "%zu\n", strlen(tab));
            printf("[P2]: Signalling semaphore sem23/W\n");
            sem_up(sem23, READ);
            printf("[P2]: Proceeding semaphore sem23/W\n");
        }
        else
            printf("[P2] Looping: work is zero\n");
        printf("[P2]: signal handling\n");
        signal_handling();
        printf("[P2]: signal handling done\n");
    } while (!quit);

    fclose(fifo_in);
    fclose(fifo_out);
    printf("[P2] Ending...\n");
    // release process 3 one last time so that it can observe EOF on the FIFO
    sem_up(sem23, READ); // Crucial
    return 0;
}

/*
 * Worker 3: reads the lengths written by process 2 from the second FIFO and
 * prints them, synchronising through semaphore set sem23.
 *
 * Returns 0 after EOF/bad input or a quit command, 1 if the FIFO could not
 * be opened.
 */
int process3(void)
{
    FILE *fifo_in;
    int count;

    printf("[P3] Ready.\n");

    signal(S1, signal_callback);
    signal(S2, signal_callback);
    signal(S3, signal_callback);
    signal(S4, signal_callback);

    fifo_in = fopen(fifoname[1], "r");
    if (fifo_in == NULL)
    {
        // without this check a failed open would be passed straight to setbuf()
        perror("[P3] fopen");
        return 1;
    }
    setbuf(fifo_in, NULL);

    do
    {
        if (work)
        {
            printf("[P3]: Waiting on semaphore sem23\n");
            sem_down(sem23, READ);
            if (fscanf(fifo_in, "%d", &count) == 1)
                printf("[P3] Received: %d as the length.\n", count);
            else
            {
                printf("[P3] Failed to read an integer\n");
                break;
            }
            sem_up(sem23, WRITE);
        }
        else
            printf("[P3] Looping: work is zero\n");
        printf("[P3]: signal handling\n");
        signal_handling();
        printf("[P3]: signal handling done\n");
    } while (!quit);
    fclose(fifo_in);
    printf("[P3] Ending...\n");
    return 0;
}

/*
 * Acts on the flags recorded by signal_callback().
 *
 * Sender side (inter_snd > 0): publishes the command number in the shared
 * byte *message and sends S4 to the process group.
 * Receiver side (inter_rcv set): reads *message and updates quit/work.
 *
 * NOTE(review): only semstruct.sem_op is assigned here; sem_num and sem_flg
 * keep whatever the most recent sem_down()/sem_up() call left in the shared
 * global. The semop() results are not checked.
 */
void signal_handling(void)
{
if (inter_snd > 0)
{
printf("PID %d: Signal received...\n", (int)getpid());

/* take 3 units from sem23 before touching the shared byte */
semstruct.sem_op = -3;
semop(sem23, &semstruct, 1);

/* publish the command and clear the local pending flag */
*message = inter_snd;
inter_snd = 0;

/* add 3 units to sem12; each receiver below takes one */
semstruct.sem_op = 3;
semop(sem12, &semstruct, 1);

printf("Sending to other processes\n");
/* pid 0 addresses every process in the caller's process group */
kill(0, S4);
}
else
printf("PID %d: inter_snd = %d\n", (int)getpid(), inter_snd);

if (inter_rcv)
{
inter_rcv = 0;

/* take one unit from sem12 before reading the command */
semstruct.sem_op = -1;
semop(sem12, &semstruct, 1);

/* command byte: 1 = quit, 2 = stop working, 3 = resume working */
switch (*message)
{
case 1:
printf("Quitting...\n");
quit = 1;
break;
case 2:
printf("Stopping...\n");
work = 0;
break;
case 3:
printf("Starting...\n");
work = 1;
break;
default:
printf("There's garbage in memory :/..\n");
}
/* give one unit back to sem23 once the command has been consumed */
semstruct.sem_op = 1;
semop(sem23, &semstruct, 1);
}
else
printf("PID %d: inter_rcv = %d\n", (int)getpid(), inter_rcv);
}

/*
 * Handler installed for S1..S4. It logs the signal and records it for
 * signal_handling(): S1, S2 and S3 store command 1, 2 and 3 in inter_snd,
 * S4 raises inter_rcv. Any other signal number is only logged.
 */
void signal_callback(int signo)
{
    printf("%sSignal %d in PID %d\n", SIGNAL, signo, (int)getpid());

    if (signo == S4)
        inter_rcv = 1;          /* another process published a command */
    else if (signo == S1)
        inter_snd = 1;
    else if (signo == S2)
        inter_snd = 2;
    else if (signo == S3)
        inter_snd = 3;
}

/*
 * Decrement semaphore number semnum of set semid by one, blocking until the
 * semaphore is positive.
 *
 * The operation is described in the shared global semstruct and then actually
 * submitted with semop(); without that call the function would never wait.
 * A wait cut short by a caught signal (EINTR) is restarted so that callers
 * only proceed once the semaphore has really been taken. Other semop()
 * failures end the wait without being reported.
 */
void sem_down(int semid, int semnum)
{
    semstruct.sem_flg = 0;
    semstruct.sem_num = semnum;
    semstruct.sem_op = -1;

    while (semop(semid, &semstruct, 1) == -1 && errno == EINTR)
        ;   /* interrupted by a signal handler: try again */
}


Related Topics



Leave a reply



Submit