Multiple producers, single consumer
This kind of thing is easy to do using the BlockingCollection<T> defined in System.Collections.Concurrent.
Basically, you create your queue so that all threads can access it:
BlockingCollection<LogRecord> LogQueue = new BlockingCollection<LogRecord>();
Each producer adds items to the queue:
while (!Shutdown)
{
    LogRecord rec = CreateLogRecord(); // however that's done
    LogQueue.Add(rec);
}
And the consumer does something similar:
while (!Shutdown)
{
    LogRecord rec = LogQueue.Take();
    // process the record
}
By default, BlockingCollection uses a ConcurrentQueue<T> as the backing store. The ConcurrentQueue takes care of thread synchronization, and the BlockingCollection does a non-busy wait when trying to take an item. That is, if the consumer calls Take when there are no items in the queue, it waits (without sleeping or spinning) until an item is available.
Multiple producers, single consumer: all goroutines are asleep - deadlock
Given that you have multiple writers on a single channel, you have a bit of a challenge: the easy way to do this in Go is to have a single writer on a single channel, and have that single writer close the channel upon sending the last datum:
func produce(... args including channel) {
    defer close(ch)
    for stuff_to_produce {
        ch <- item
    }
}
This pattern has the nice property that no matter how you get out of produce
, the channel gets closed, signalling the end of production.
You're not using this pattern—you deliver one channel to many goroutines, each of which can send one message—so you need to move the close
(or, of course, use yet some other pattern). The simplest way to express the pattern you need is this:
func overall_produce(... args including channel ...) {
    var pg sync.WaitGroup
    defer close(ch)
    for stuff_to_produce {
        pg.Add(1)
        go produceInParallel(ch, &pg) // add more args if appropriate
    }
    pg.Wait()
}
The pg
counter accumulates active producers. Each must call pg.Done()
to indicate that it is done using ch
. The overall producer now waits for them all to be done, then it closes the channel on its way out.
(If you write the inner produceInParallel
function as a closure, you don't need to pass ch
and pg
to it explicitly. You may also write overallProducer
as a closure.)
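To make the wiring concrete, here is a minimal runnable sketch of the pattern above, with the producer written as a closure that defers pg.Done(). The producer count and integer payloads are made up for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// overallProduce starts several producer goroutines on ch, waits for
// all of them with a WaitGroup, and only then closes the channel.
func overallProduce(ch chan<- int) {
	var pg sync.WaitGroup
	defer close(ch)
	for i := 0; i < 3; i++ {
		pg.Add(1)
		i := i
		go func() {
			defer pg.Done() // this producer is finished with ch
			ch <- i * 10    // produce one item (made-up payload)
		}()
	}
	pg.Wait() // all producers done; the deferred close is now safe
}

func main() {
	ch := make(chan int)
	done := make(chan struct{})
	sum := 0
	go func() {
		for m := range ch { // loop ends when overallProduce closes ch
			sum += m
		}
		close(done)
	}()
	overallProduce(ch)
	<-done
	fmt.Println("sum:", sum) // 0 + 10 + 20 = 30
}
```

Because close runs in a defer, it executes after pg.Wait() returns, so no producer can ever send on a closed channel.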
Note that your single consumer's loop is probably best expressed using the for ... range
construct:
func receive(msg <-chan message, wg *sync.WaitGroup) {
    for m := range msg {
        fmt.Println("Received:", m)
    }
    wg.Done()
}
(You mention an intent to add a select
to the loop so that you can do some other computing if a message is not yet ready. If that code cannot be spun off into independent goroutines, you will in fact need the fancier m, ok := <-msg
construct.)
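For reference, that fancier variant might look like the sketch below. The drain helper is a hypothetical name, and the default branch stands in for whatever other computing you'd do while no message is ready:

```go
package main

import "fmt"

// drain consumes msg via select so the caller can interleave other work
// in the default branch; it returns everything received once the channel
// is closed and empty.
func drain(msg <-chan string) []string {
	var received []string
	for {
		select {
		case m, ok := <-msg:
			if !ok { // channel closed and drained: production is over
				return received
			}
			received = append(received, m)
		default:
			// nothing ready yet: do some other computing here
			// instead of blocking on the channel
		}
	}
}

func main() {
	msg := make(chan string, 2)
	msg <- "a"
	msg <- "b"
	close(msg)
	fmt.Println(drain(msg)) // [a b]
}
```

The ok result distinguishes "no message yet" (the default branch fires) from "the channel is closed" (ok is false), which a bare receive cannot do.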
Note also that the wg
for receive
—which may turn out to be unnecessary, depending on how you structure other things—is quite independent from the wait-group pg
for the producers. While it's true that, as written, the consumer cannot be done until all the producers are done, we'd like to wait independently for the producers to be done, so that we can close the channel in the overall-producer wrapper.
Is it possible to have multiple producers for the same topic on Pulsar?
Yes, you can have multiple producers on a topic. You just have to make sure each producer has a unique name. From the ProducerBuilder.producerName
section of the Java client API docs:
When specifying a name, it is up to the user to ensure that, for a
given topic, the producer name is unique across all Pulsar's clusters.
Brokers will enforce that only a single producer with a given name can be
publishing on a topic.
The easiest way to ensure the producer name is unique is to let Pulsar set it automatically for you. From the same section:
If not assigned, the system will generate a globally unique name which
can be accessed with Producer.getProducerName().
Implementing multiple producer and multiple workers results in deadlock
Here is an example of a multiple-consumers, multiple-producers implementation. You can use the daemon
flag when instantiating the processes so that they are automatically terminated when the program quits. You can then use the JoinableQueue
and join the queues (instead of joining the processes) so that the program quits when there is no item left to process.
You should use if __name__ == "__main__":
to launch the program without causing a recursive execution of that program.
from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    # Takes numbers from in_queue, converts them to strings,
    # and forwards them to out_queue.
    while True:
        item = in_queue.get()
        sleep(0.5)
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()


def producer(in_queue: JoinableQueue):
    # Takes the string items and prints them.
    while True:
        item = in_queue.get()
        sleep(0.5)
        n = int(item)
        print(n)
        in_queue.task_done()


if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()
    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()
    for i in range(100):
        number_queue.put(i)
    number_queue.join()  # wait until every number has been processed
    str_queue.join()     # wait until every string has been printed