How do you implement a bounded FIFO buffer in C-Linda? Assume the buffer contains integers (to make life a little easier). There are a number of producers and a number of consumers. Assume that the functions consume and produce have already been defined. Now consider the functions consumer and producer.

/* Consumer process: endlessly fetches the next integer with get() and
 * passes it to consume(). Never returns. */
void consumer()
{
   for (;;)
   {
      int item = get();
      consume(item);
   }
}
 
/* Producer process: endlessly obtains a new integer from produce() and
 * stores it with put(). Never returns. */
void producer()
{
   for (;;)
   {
      int item = produce();
      put(item);
   }
}

A single consumer and producer can be created as follows.

/* Entry point: creates one consumer and one producer, each through its
 * own eval() call, consumer first. */
void real_main()
{
   /* One eval per worker; the string is the tuple's first field. */
   eval("consumer", consumer());

   eval("producer", producer());
}

How can the above be extended to create multiple consumers and producers?

It remains to define the functions get and put.

/* Exercise skeleton: should return the next integer taken from the
 * buffer. The body is intentionally left to be filled in; as written,
 * data is returned without ever having been assigned. */
int get()
{
   int data;
 
   /* TODO: retrieve the next buffered value into data */
 
   return data;
}
 
/* Exercise skeleton: should store data in the buffer. The body is
 * intentionally left to be filled in. */
void put(int data)
{
   /* TODO: store data in the buffer */
}

How should get and put be completed?


An attempted solution, posted for comment:

/* Acquire the mutex: withdraw the ("mutex", 1) tuple, blocking until it
 * is present, then deposit ("mutex", 0) in its place. */
void down_mutex()
{
  enum { MUTEX_FREE = 1, MUTEX_HELD = 0 };

  in("mutex", MUTEX_FREE);
  out("mutex", MUTEX_HELD);
}
 
/* Release the mutex: withdraw the ("mutex", 0) tuple and deposit
 * ("mutex", 1) in its place. */
void up_mutex()
{
  enum { MUTEX_FREE = 1, MUTEX_HELD = 0 };

  in("mutex", MUTEX_HELD);
  out("mutex", MUTEX_FREE);
}
 
/*
 * Remove and return the integer stored at the head of the buffer.
 *
 * Consumes one "full" token first, so the call blocks while no item is
 * stored. The head index is advanced modulo the value of the "size"
 * tuple while the mutex is held, and one "empty" token is deposited
 * afterwards to signal a freed slot.
 */
int get()
{
  int size, head, data;

  in("full");                      /* wait for a stored item */
  down_mutex();
  rd("size", ?size);
  in("head", ?head);
  in("buffer", head, ?data);       /* take the item at the head slot */
  out("head", (head + 1) % size);
  up_mutex();
  out("empty");                    /* one more free slot */
  return data;
}
 
/*
 * Store data in the slot at the tail of the buffer.
 *
 * Consumes one "empty" token first, so the call blocks while no slot is
 * free. The tail index is advanced modulo the value of the "size" tuple
 * while the mutex is held, and one "full" token is deposited afterwards
 * to signal a stored item.
 */
void put(int data)
{
  int size, tail;

  in("empty");                     /* wait for a free slot */
  down_mutex();
  rd("size", ?size);
  in("tail", ?tail);
  out("buffer", tail, data);       /* actual value, not a formal */
  out("tail", (tail + 1) % size);
  up_mutex();
  out("full");                     /* one more stored item */
}
 
/* Consumer process: forever passes the result of get() to consume(). */
void consumer()
{
  for (;;)
  {
    int item = get();
    consume(item);
  }
}
 
/* Producer process: forever passes the result of produce() to put(). */
void producer()
{
  for (;;)
  {
    int item = produce();
    put(item);
  }
}
#define Len 10   /* number of buffer slots */
#define Run 5    /* number of consumer/producer pairs to start */

/*
 * Entry point: seeds tuple space and starts the workers.
 *
 * Deposits Len "empty" tokens (one per free slot), the initial "head"
 * and "tail" indices, the "size" tuple used for the modulo wrap, and
 * the unlocked ("mutex", 1) tuple, then starts Run consumers and Run
 * producers with eval().
 */
void real_main()
{
  for (int i = 0; i < Len; i++)
    out("empty");
  out("head", 1);
  out("tail", 1);
  out("size", Len);
  out("mutex", 1);

  for (int i = 0; i < Run; i++) {
    eval("consumer", consumer());
    eval("producer", producer());
  }
}

A variation of the same concept:

/* Tuple name of the semaphore tokens deposited by semaphore_init(). */
#define SEMAPHORE        "semaphore"
/* Counter name passed to counter_next() by put(). */
#define PUT_COUNTER      "put"
/* Counter name passed to counter_next() by get(). */
#define GET_COUNTER      "get"
 
//---------------------- counter API ---------------------
/*
 * Create the counter tuple named conterType with initial value 0.
 *
 * The value is an int so that the tuple matches the int formal used by
 * counter_next(); a pointer-typed NULL would not match it.
 */
void counter_init(char * conterType) {
	out(conterType, 0);
}
 
 
/*
 * Increment the counter named conterType and return the new value.
 * The counter tuple is withdrawn, incremented, and deposited again.
 */
int counter_next(char * conterType) {
	int value;

	in(conterType, ?value);
	value += 1;
	out(conterType, value);
	return value;
}
 
//-------------------- semaphore API -----------------
 
/* Deposit `counter` SEMAPHORE tokens; deposits nothing when counter <= 0. */
void semaphore_init (int counter) {
	int remaining = counter;

	while (remaining > 0) {
		out(SEMAPHORE);
		remaining--;
	}
}
 
/* Withdraw one SEMAPHORE token, blocking until one is present. */
void semaphore_acquire()
{
	in(SEMAPHORE);
}
 
/* Deposit one SEMAPHORE token. */
void semaphore_release()
{
	out(SEMAPHORE);
}
 
//-------------------------------------------------
 
/*
 * Store data in the buffer: acquire one semaphore token, draw the next
 * PUT_COUNTER sequence number, and deposit data under that number.
 */
void put(int data) {
	int slot;

	semaphore_acquire();
	slot = counter_next(PUT_COUNTER);
	out(slot, data);
}
 
/*
 * Remove and return the next integer from the buffer.
 *
 * Draws the next GET_COUNTER sequence number, withdraws the tuple
 * deposited under that number (blocking until it is present), and then
 * releases one semaphore token.
 */
int get() {
	int data;
	int seq_id = counter_next(GET_COUNTER);

	in(seq_id, ?data);
	semaphore_release();
	return data;
}
 
/* Default semaphore token count, used only when BUFFER_SIZE is not
 * already defined; the value 10 is an arbitrary choice. */
#ifndef BUFFER_SIZE
#define BUFFER_SIZE 10
#endif

/*
 * Entry point: deposits BUFFER_SIZE semaphore tokens, creates the get
 * and put counters, then starts one consumer and one producer with
 * eval().
 */
void real_main() 
{
   semaphore_init(BUFFER_SIZE);
 
   counter_init(GET_COUNTER);
   counter_init(PUT_COUNTER);
 
   eval("consumer", consumer());
   eval("producer", producer());
}

Another variation.

/* Modulus used by get() and put() to wrap the head and tail indices. */
#define BUFFERSIZE 200
 
/*
 * Remove and return the oldest integer in the circular buffer.
 *
 * Follows the slot convention of the put() in this variation: put()
 * writes slot (tail+1) % BUFFERSIZE and then publishes that index as
 * "tail". So "head" names the most recently read slot and the next
 * item lives at (head+1) % BUFFERSIZE; head == tail means empty.
 *
 * "head" is never held across a blocking operation. "tail" is probed
 * with the non-blocking rdp(); if it is unavailable or the buffer is
 * empty, "head" is put back and the attempt is retried. This avoids
 * the deadlock in which this function holds "head" waiting for "tail"
 * while put() holds "tail" waiting for "head".
 */
int get()
{
  int head, tail, data;

  /* busy-wait until "tail" is readable and the buffer is non-empty */
  for (;;)
  {
    in("head", ?head);
    if (rdp("tail", ?tail) && head != tail)
      break;
    out("head", head);
  }

  head = (head + 1) % BUFFERSIZE;
  /* put() deposits the item before publishing "tail", so it is present */
  in("buffer", head, ?data);
  out("head", head);
  return data;
}
 
/*
 * Append data to the circular buffer.
 *
 * Withdraws "tail", busy-waits while the slot after tail equals head
 * (buffer full), then writes data into slot (tail+1) % BUFFERSIZE and
 * publishes that index as the new "tail". The item is deposited before
 * "tail" is published.
 *
 * NOTE(review): "tail" is held while rd("head") blocks; a get() that
 * holds "head" while blocking on "tail" would deadlock against this.
 */
void put(int data)
{
  int head, tail, next;

  in("tail", ?tail);
  next = (tail + 1) % BUFFERSIZE;
  rd("head", ?head);
  /* wait while the buffer is full */
  while (next == head)
    rd("head", ?head);
  out("buffer", next, data);
  out("tail", next);
}

Another Version

#define SIZE 100
 
/* Deposit the initial "head" and "tail" index tuples, both 0. */
void Initialize()
{
 out("head", 0);
 out("tail", 0);
}
 
/*
 * Remove and return the oldest integer in the circular buffer.
 *
 * "head" is the index of the next slot to read; head == tail means the
 * buffer is empty. "head" is never held across a blocking operation:
 * "tail" is probed with the non-blocking rdp(), and if it is
 * unavailable or the buffer is empty, "head" is put back and the
 * attempt is retried. This avoids the deadlock in which this function
 * holds "head" waiting for "tail" while put() holds "tail" waiting for
 * "head".
 */
int get()
{
  int head, tail, data;

  /* busy-wait until "tail" is readable and the buffer is non-empty */
  for (;;)
  {
    in("head", ?head);
    if (rdp("tail", ?tail) && tail != head)
      break;
    out("head", head);
  }

  /* put() deposits the item before publishing "tail", so it is present */
  in("data", ?data, head);
  out("head", (head + 1) % SIZE);
  return data;
}
 
/*
 * Append data to the circular buffer: withdraw "tail", busy-wait while
 * the slot after tail equals head (buffer full), deposit data at slot
 * tail, and publish (tail+1) % SIZE as the new "tail".
 *
 * NOTE(review): "tail" is held while rd("head") blocks; a get() that
 * holds "head" while blocking on "tail" would deadlock against this.
 */
void put(int data)
{
  int head, tail, next;

  in("tail", ?tail);
  next = (tail + 1) % SIZE;
  do {
    rd("head", ?head);
  } while (head == next);
  out("data", data, tail);
  out("tail", next);
}
// The "producer" and "consumer" tuples (read into i1 and i2) are initialized to 0
void put(int data)
{
      int i1, i2;
      read("consumer", ?i2);
      in("producer",?i1);
      if(i1==size)i1=0;      
      while(i1+1==i2 || (i1+1==size && i2==0))
         read("consumer",?i2);      
      out("data", data, i1); 
      out("producer", i1+1);        
}
 
 
int get()
{
      int i2, data;
      in("consumer",?i2);
      if(i2==size)i2=0;
      in("data",?data,i2);
      out("consumer", i2+1);       
      return data; 
}