Multiple Producers, Single Consumer

Multiple producers, single consumer

This kind of thing is very easy to do using the BlockingCollection<T> defined in System.Collections.Concurrent.

Basically, you create your queue so that all threads can access it:

BlockingCollection<LogRecord> LogQueue = new BlockingCollection<LogRecord>();

Each producer adds items to the queue:

while (!Shutdown)
{
    LogRecord rec = CreateLogRecord(); // however that's done
    LogQueue.Add(rec);
}

And the consumer does something similar:

while (!Shutdown)
{
    LogRecord rec = LogQueue.Take();
    // process the record
}

By default, BlockingCollection uses a ConcurrentQueue<T> as the backing store. The ConcurrentQueue takes care of thread synchronization, and the BlockingCollection does a non-busy wait when trying to take an item. That is, if the consumer calls Take when there are no items in the queue, it blocks efficiently (no polling or spinning) until an item is available.
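
For comparison, a Go channel gives you the same non-busy blocking behavior out of the box: a receive blocks without spinning until a producer sends, just like Take. A minimal sketch of the equivalent pattern (shutdown, createLogRecord, and process are hypothetical helpers):

// A channel serves as the thread-safe queue; the buffer size is illustrative.
logQueue := make(chan LogRecord, 100)

// each producer:
go func() {
    for !shutdown() {
        logQueue <- createLogRecord()
    }
}()

// the single consumer; receiving blocks (no spinning) until an item arrives:
for rec := range logQueue {
    process(rec)
}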

Multiple producers, single consumer: all goroutines are asleep - deadlock

Given that you do have multiple writers on a single channel, you have a bit of a challenge, because the easy way to do this in Go in general is to have a single writer on a single channel, and then have that single writer close the channel upon sending the last datum:

func produce(... args including channel) {
    defer close(ch)
    for stuff_to_produce {
        ch <- item
    }
}

This pattern has the nice property that no matter how you get out of produce, the channel gets closed, signalling the end of production.

You're not using this pattern—you deliver one channel to many goroutines, each of which can send one message—so you need to move the close (or, of course, use yet some other pattern). The simplest way to express the pattern you need is this:

func overall_produce(... args including channel ...) {
    var pg sync.WaitGroup
    defer close(ch)
    for stuff_to_produce {
        pg.Add(1)
        go produceInParallel(ch, &pg) // add more args if appropriate
    }
    pg.Wait()
}

The pg counter accumulates active producers. Each must call pg.Done() to indicate that it is done using ch. The overall producer now waits for them all to be done, then it closes the channel on its way out.
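
A minimal sketch of one such producer, assuming a hypothetical makeItem supplies the data:

func produceInParallel(ch chan<- item, pg *sync.WaitGroup) {
    // The deferred Done fires however this goroutine exits, mirroring
    // the deferred close in the single-writer pattern above.
    defer pg.Done()
    ch <- makeItem() // makeItem stands in for the real work
}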

(If you write the inner produceInParallel function as a closure, you don't need to pass ch and pg to it explicitly. You may also write overallProducer as a closure.)
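
For instance, a sketch of the wrapper with the producer inlined as a closure (item, nProducers, and produceOne are illustrative names):

func overallProduce(ch chan<- item) {
    var pg sync.WaitGroup
    defer close(ch)
    for i := 0; i < nProducers; i++ {
        pg.Add(1)
        go func() {
            defer pg.Done() // ch and pg are captured; no arguments needed
            ch <- produceOne()
        }()
    }
    pg.Wait()
}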

Note that your single consumer's loop is probably best expressed using the for ... range construct:

func receive(msg <-chan message, wg *sync.WaitGroup) {
    for m := range msg {
        fmt.Println("Received:", m)
    }
    wg.Done()
}

(You mention an intent to add a select to the loop so that you can do some other computing if a message is not yet ready. If that code cannot be spun off into independent goroutines, you will in fact need the fancier m, ok := <-msg construct.)
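
A sketch of what that fancier loop might look like, assuming the extra computing lives in a hypothetical doOtherWork:

func receive(msg <-chan message, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case m, ok := <-msg:
            if !ok {
                return // channel closed: all producers are done
            }
            fmt.Println("Received:", m)
        default:
            doOtherWork() // runs only when no message is ready yet
        }
    }
}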

Note also that the wg for receive—which may turn out to be unnecessary, depending on how you structure other things—is quite independent from the wait-group pg for the producers. While it's true that, as written, the consumer cannot be done until all the producers are done, we'd like to wait independently for the producers to be done, so that we can close the channel in the overall-producer wrapper.
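
Putting it together, one possible wiring (a sketch; the message type and the producer bodies are assumed from the snippets above):

func main() {
    ch := make(chan message)
    var wg sync.WaitGroup

    wg.Add(1)
    go receive(ch, &wg)

    overall_produce(ch) // returns only after pg.Wait() and the deferred close
    wg.Wait()           // receive's range loop ends once it sees the close
}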

Is it possible to have multiple producers for the same topic on Pulsar?

Yes, you can have multiple producers on a topic. You just have to make sure each producer has a unique name. From the ProducerBuilder.producerName section of the Java client API docs:

When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be publishing on a topic.

The easiest way to ensure the producer name is unique is to let Pulsar set it automatically for you. From the same section:

If not assigned, the system will generate a globally unique name which
can be accessed with Producer.getProducerName().
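
A quick sketch of that with the Go client (github.com/apache/pulsar-client-go); the service URL and topic are placeholders:

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// No Name is set in ProducerOptions, so Pulsar generates a globally
// unique producer name for us.
producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

fmt.Println("assigned producer name:", producer.Name())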

Implementing multiple producer and multiple workers results in deadlock

Here is an example of a multiple-producers, multiple-consumers implementation. You can pass the daemon flag when instantiating the processes so that they are closed automatically when the program quits. You can then use JoinableQueue and join the queues (instead of joining the processes) so that the program quits when there are no items left to process.

You should guard the launch code with if __name__ == "__main__": so that the program does not recursively re-execute itself when the worker processes are spawned.

from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)  # simulate work
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()


def producer(in_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)  # simulate work
        n = int(item)
        print(n)
        in_queue.task_done()


if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()

    # daemon=True: the workers are killed automatically when the main
    # process exits, so their infinite while-True loops are not a problem.
    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()

    for i in range(100):
        number_queue.put(i)

    # Join the queues, not the processes: block until every item has been
    # marked done with task_done(), then let the daemons die with the program.
    number_queue.join()
    str_queue.join()

