Workaround on the Threads Limit in Grand Central Dispatch

Well, if you're bound and determined, you can free yourself of the shackles of GCD, and go forth and slam right up against the OS per-process thread limit using pthreads, but the bottom line is this: if you're hitting the queue-width limit in GCD, you might want to consider reevaluating your concurrency approach.

At the extremes, there are two ways you can hit the limit:

  1. You can have 64 threads blocked on some OS primitive via a blocking syscall. (I/O bound)
  2. You can legitimately have 64 runnable tasks all ready to rock at the same time. (CPU bound)

If you're in situation #1, then the recommended approach is to use non-blocking I/O. In fact, GCD has a whole bunch of calls, introduced in 10.7/Lion IIRC, that facilitate asynchronous scheduling of I/O and improve thread re-use. If you use the GCD I/O mechanism, those threads won't be tied up waiting on I/O; GCD will just queue up your blocks (or functions) when data becomes available on your file descriptor (or mach port). See the documentation for dispatch_io_create and friends.

In case it helps, here's a little example (presented without warranty) of a TCP echo server implemented using the GCD I/O mechanism:

#include <dispatch/dispatch.h>
#include <Block.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

in_port_t port = 10000;
void DieWithError(char *errorMessage);

// Returns a block you can call later to shut down the server -- caller owns block.
dispatch_block_t CreateCleanupBlockForLaunchedServer()
{
    // Create the socket
    int servSock = -1;
    if ((servSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        DieWithError("socket() failed");
    }

    // Bind the socket - if the port we want is in use, increment until we find one that isn't
    struct sockaddr_in echoServAddr;
    memset(&echoServAddr, 0, sizeof(echoServAddr));
    echoServAddr.sin_family = AF_INET;
    echoServAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    do {
        printf("server attempting to bind to port %d\n", (int)port);
        echoServAddr.sin_port = htons(port);
    } while (bind(servSock, (struct sockaddr *) &echoServAddr, sizeof(echoServAddr)) < 0 && ++port);

    // Make the socket non-blocking
    if (fcntl(servSock, F_SETFL, O_NONBLOCK) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("fcntl() failed");
    }

    // Set up the dispatch source that will alert us to new incoming connections
    dispatch_queue_t q = dispatch_queue_create("server_queue", DISPATCH_QUEUE_CONCURRENT);
    dispatch_source_t acceptSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, servSock, 0, q);
    dispatch_source_set_event_handler(acceptSource, ^{
        const unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
        for (unsigned long i = 0; i < numPendingConnections; i++) {
            int clntSock = -1;
            struct sockaddr_in echoClntAddr;
            socklen_t clntLen = sizeof(echoClntAddr);

            // Accept a pending connection
            if ((clntSock = accept(servSock, (struct sockaddr *) &echoClntAddr, &clntLen)) >= 0) {
                printf("server sock: %d accepted\n", clntSock);

                // The cleanup handler runs when the channel is closed; it owns closing the fd
                dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, clntSock, q, ^(int error) {
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                    }
                    printf("server sock: %d closing\n", clntSock);
                    close(clntSock);
                });

                // Configure the channel...
                dispatch_io_set_low_water(channel, 1);
                dispatch_io_set_high_water(channel, SIZE_MAX);

                // Set up the read handler
                dispatch_io_read(channel, 0, SIZE_MAX, q, ^(bool done, dispatch_data_t data, int error) {
                    bool shouldClose = false;
                    if (error) {
                        fprintf(stderr, "Error: %s", strerror(error));
                        shouldClose = true;
                    }

                    const size_t rxd = data ? dispatch_data_get_size(data) : 0;
                    if (rxd) {
                        printf("server sock: %d received: %ld bytes\n", clntSock, (long)rxd);
                        // Write it back out; echo!
                        dispatch_io_write(channel, 0, data, q, ^(bool done, dispatch_data_t data, int error) {});
                    } else {
                        shouldClose = true;
                    }

                    if (shouldClose) {
                        dispatch_io_close(channel, DISPATCH_IO_STOP);
                        dispatch_release(channel);
                    }
                });
            } else {
                printf("accept() failed;\n");
            }
        }
    });

    // Resume the source so we're ready to accept once we listen()
    dispatch_resume(acceptSource);

    // listen() on the socket
    if (listen(servSock, SOMAXCONN) < 0) {
        shutdown(servSock, SHUT_RDWR);
        close(servSock);
        DieWithError("listen() failed");
    }

    // Make the cleanup block for the server queue
    dispatch_block_t cleanupBlock = ^{
        dispatch_async(q, ^{
            shutdown(servSock, SHUT_RDWR);
            close(servSock);
            dispatch_release(acceptSource);
            dispatch_release(q);
        });
    };

    return Block_copy(cleanupBlock);
}

Anyway... back to the topic at hand:

If you're in situation #2, you should ask yourself, "Am I really gaining anything through this approach?" Let's say you have the most studly Mac Pro out there -- 12 cores, 24 hyperthreaded/virtual cores. With 64 threads, you've got a roughly 3:1 thread-to-virtual-core ratio. Context switches and cache misses aren't free. Remember, we presumed that you weren't I/O bound for this scenario, so all you're doing by having more tasks than cores is wasting CPU time on context switches and cache thrash.

In reality, if your application is hanging because you've hit the queue width limit, then the most likely scenario is that you've starved your queue. You've likely created a dependency that reduces to a deadlock. The case I've seen most often is when multiple, interlocked threads are trying to dispatch_sync on the same queue, when there are no threads left. This always fails.
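To make that concrete, here's a contrived sketch of that kind of starvation deadlock. It uses semaphores to stand in for whatever dependency links your blocks together; the function name is illustrative:

#include <dispatch/dispatch.h>

// Contrived sketch: the first wave of blocks occupies every worker
// thread in the pool, each waiting on work that hasn't started yet.
// The blocks that would signal them queue up behind the waiters and
// can never get a thread, so the queue starves and deadlocks.
void DemonstrateQueueStarvation(void)
{
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_semaphore_t sema = dispatch_semaphore_create(0);

    for (int i = 0; i < 128; i++) {
        dispatch_async(q, ^{
            // Parks a worker thread until a later block signals it...
            dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
        });
    }
    for (int i = 0; i < 128; i++) {
        dispatch_async(q, ^{
            // ...but these never run once all 64 threads are parked.
            dispatch_semaphore_signal(sema);
        });
    }
}

Note that this program deadlocks whether the pool is 2 threads wide or 64 threads wide; only the number of blocks needed to trigger it changes.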

Here's why: queue width is an implementation detail. The 64-thread width limit of GCD is undocumented because a well-designed concurrency architecture shouldn't depend on the queue width. You should always design your concurrency architecture such that a 2-thread-wide queue would eventually finish the job with the same result (if slower) as a 1000-thread-wide queue. If you don't, there will always be a chance that your queue will get starved. Dividing your workload into parallelizable units should open the door to optimization, not be a requirement for basic functioning. One way to enforce this discipline during development is to substitute a serial queue in places where you use concurrent queues but expect non-interlocked behavior. Performing checks like this will help you catch some (but not all) of these bugs earlier.
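For example, here's one way to wire in that check; the DEBUG_FORCE_SERIAL flag is hypothetical:

#include <dispatch/dispatch.h>

// Debugging trick: a compile-time switch that narrows a concurrent
// queue down to a serial one. If the program deadlocks or misbehaves
// with this enabled, some of your blocks are interlocked.
#if DEBUG_FORCE_SERIAL // hypothetical build flag
#define WORK_QUEUE_ATTR DISPATCH_QUEUE_SERIAL
#else
#define WORK_QUEUE_ATTR DISPATCH_QUEUE_CONCURRENT
#endif

dispatch_queue_t CreateWorkQueue(void)
{
    return dispatch_queue_create("work_queue", WORK_QUEUE_ATTR);
}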

Also, to the precise point of your original question: IIUC, the 64 thread limit is 64 threads per top-level concurrent queue, so if you really feel the need, you can use all three top level concurrent queues (Default, High and Low priority) to achieve more than 64 threads total. Please don't do this though. Fix your design such that it doesn't starve itself instead. You'll be happier. And anyway, as I hinted above, if you're starving out a 64 thread wide queue, you'll probably eventually just fill all three top level queues and/or run into the per-process thread limit and starve yourself that way too.

Grand Central Dispatch - Passing data between threads

I made the mistake of simplifying my code to keep my question clear.

On the main queue I actually message self with another call and this causes the bad access.

The solution was to call the method on my async queue, and inside the method wrap the code in the dispatch_async(dispatch_get_main_queue(), ^{}); block.
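In sketch form (the method names here are illustrative, not from my actual code):

// Do the work on a background queue, then hop to the main queue for
// anything that must run there (UI updates, messaging self, etc.).
- (void)processData
{
    dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
        // ... long-running background work ...

        dispatch_async(dispatch_get_main_queue(), ^{
            // Anything that must happen on the main thread -- including
            // messaging self -- goes here.
            [self updateUI]; // hypothetical main-thread-only method
        });
    });
}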

Hopefully this will help someone else.

Why Are We Moved from the Threads Closures Are Called On in Grand Central Dispatch?

"Why" questions are always a little funny. In the most direct sense, because that's just how URLSession works:

All delegate method calls and completion handlers related to the session are performed on this queue.
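That is, the callbacks land on whatever delegateQueue you handed the session at creation time; pass nil and the session creates its own serial queue for you. A minimal sketch:

#import <Foundation/Foundation.h>

// Sketch: NSURLSession delivers delegate calls and completion handlers
// on the delegateQueue supplied at creation time.
NSOperationQueue *callbackQueue = [[NSOperationQueue alloc] init];
NSURLSession *session =
    [NSURLSession sessionWithConfiguration:[NSURLSessionConfiguration defaultSessionConfiguration]
                                  delegate:nil
                             delegateQueue:callbackQueue];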

How to get an array out of Grand Central Dispatch?

A couple of reactions:

  1. In the last line of your code, you're setting student_Array to student_temp. Clearly that line makes no sense because you're populating student_temp asynchronously. And you're opening yourself up to synchronization issues if you're trying to simultaneously access the same variable in two queues. Don't bother to assign student_Array to student_temp at the end of viewDidLoad, but rather just do it inside the nested dispatch_async calls.

  2. Inside the block, you're populating and setting student_temp. It probably makes more sense to make that variable scoped within that block, avoiding temptation to access it from outside that block as well as simplifying your code because the __block qualifier is no longer needed.

  3. This block is running asynchronously, so when you update student_Array in the main queue, you might want to update your UI at the same time (e.g. reload the tableview or whatever). Perhaps you're doing that already and just removed it for the sake of brevity, but I just wanted to make sure.

Thus:

- (void)viewDidLoad
{
    [super viewDidLoad];

    dispatch_queue_t queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0ul);
    dispatch_async(queue, ^{

        R2LFetcher *studentFetch = [[R2LFetcher alloc] init];

        // long-running code goes here, for example ...
        NSMutableArray *student_temp = [studentFetch fetchToStudentArray];

        dispatch_async(dispatch_get_main_queue(), ^{

            student_Array = student_temp;

            // code that updates the main thread (UI) here, for example ...
            [self.tableView reloadData];
        });
    });
}

Is it safe to schedule blocks on the main DispatchQueue at the same time from concurrent threads?

No need for locks. The main queue is a lock! That's the whole point (well, a lot of the point). It is a serial queue; nothing can start executing on the main queue as long as something else is already executing. There is no concurrency within the main thread. What you're doing is exactly right; just be aware that you may be lining up a whole bunch of blocks to be executed on the main thread, one at a time. But that's not an issue unless there are a heck of a lot of them.
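In sketch form, assuming the work is dispatched from a global concurrent queue (the function name is illustrative):

#import <Foundation/Foundation.h>

// Sketch: blocks dispatched to the main queue from many threads run
// one at a time, in FIFO order -- no extra locking required.
void FunnelResultsToMain(void)
{
    dispatch_queue_t bg = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    for (int i = 0; i < 10; i++) {
        dispatch_async(bg, ^{
            // ... concurrent work on a background thread ...
            dispatch_async(dispatch_get_main_queue(), ^{
                // Serialized: no other main-queue block runs at the
                // same time as this one.
                NSLog(@"result %d", i);
            });
        });
    }
}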

Where and why deadlock?

This pattern (async writes with barrier, concurrent reads) is known as the “reader-writer” pattern. This particular multithreaded synchronization mechanism can deadlock in thread explosion scenarios.
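For reference, here's a sketch of the pattern in question; the class and its storage are illustrative:

#import <Foundation/Foundation.h>

// Sketch of the reader-writer pattern: concurrent reads via
// dispatch_sync, serialized writes via dispatch_barrier_async,
// both on the same concurrent queue.
@interface RWCache : NSObject
- (id)objectForKey:(id)key;
- (void)setObject:(id)object forKey:(id<NSCopying>)key;
@end

@implementation RWCache {
    dispatch_queue_t _queue;
    NSMutableDictionary *_storage;
}

- (instancetype)init {
    if ((self = [super init])) {
        _queue = dispatch_queue_create("rw.cache", DISPATCH_QUEUE_CONCURRENT);
        _storage = [NSMutableDictionary dictionary];
    }
    return self;
}

- (id)objectForKey:(id)key {
    __block id result;
    dispatch_sync(_queue, ^{ result = self->_storage[key]; }); // concurrent read
    return result;
}

- (void)setObject:(id)object forKey:(id<NSCopying>)key {
    dispatch_barrier_async(_queue, ^{ self->_storage[key] = object; }); // exclusive write
}
@end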

In short, it deadlocks because:

  • You have “thread explosion”;

  • You have exhausted the worker thread pool, which only has 64 threads;

  • Your dispatched item has two potentially blocking calls, not only the sync, which obviously can block, but also the concurrent async (see next point); and

  • When you hit a dispatch, if there is not an available worker thread in the pool, it will wait until one is made available (even if dispatching asynchronously).

The key observation is that one should simply avoid unbridled thread explosion. Generally we reach for tools such as GCD's concurrentPerform (a parallel for loop which is constrained to the maximum number of CPU cores), operation queues (which can be controlled through judicious maxConcurrentOperationCount setting) or Swift concurrency (use its cooperative thread pool to control degree of concurrency, actors for synchronization, etc.).
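In the GCD C API, the counterpart to concurrentPerform is dispatch_apply; a minimal sketch (the function and its work are placeholders):

#include <dispatch/dispatch.h>
#include <stdio.h>

// Sketch: dispatch_apply parallelizes a loop without unbounded thread
// growth. It blocks the caller until all iterations finish, and GCD
// sizes the concurrency to the available cores.
void ProcessItemsInParallel(size_t count)
{
    dispatch_queue_t q = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    dispatch_apply(count, q, ^(size_t i) {
        // CPU-bound work for item i goes here.
        printf("processed item %zu\n", i);
    });
}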


While the reader-writer pattern has intuitive appeal, in practice it simply introduces complexity (synchronizing a multithreaded environment with yet another multithreaded mechanism, both of which are constrained by surprisingly small GCD worker thread pools) without much practical benefit. Benchmark it and you will see that it is negligibly faster than a simple serial GCD queue, and much slower than lock-based approaches.

How to parallelize many (100+) tasks without hitting global GCD limit?

Instead of directly using GCD with a concurrent queue, use an NSOperationQueue. Set its maxConcurrentOperationCount to something reasonable, like 4 or 8.
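A minimal sketch of that setup (the input array and the work are placeholders):

#import <Foundation/Foundation.h>

// Sketch: a width-restricted NSOperationQueue runs at most 4
// operations at a time, no matter how many you enqueue.
void EnqueueManyTasks(NSArray<NSURL *> *fileURLs) // hypothetical input
{
    NSOperationQueue *ioQueue = [[NSOperationQueue alloc] init];
    ioQueue.maxConcurrentOperationCount = 4;

    for (NSURL *fileURL in fileURLs) {
        [ioQueue addOperationWithBlock:^{
            // blocking I/O for fileURL goes here
        }];
    }
}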

If you can, you should also separate I/O from pure computation. Use the width-restricted operation queue for the I/O; for pure computation you can use an unrestricted operation queue or plain GCD.

The reason is that I/O blocks. GCD detects that the system is idle and spins up another worker thread and starts another task from the queue. That blocks in I/O, too, so it does that some more until it hits its limit. Then, the I/O starts completing and the tasks unblock. Now you have oversubscribed the system resources (i.e. CPU) because there are more tasks in flight than cores and suddenly they are actually using CPU instead of being blocked by I/O.

Pure computation tasks don't provoke this problem because GCD sees that the system is actually busy and doesn't dequeue more tasks until earlier ones have completed.


