How to Wait to Receive a Response from a DispatchWorkItem Before Moving on to the Next Request or Next DispatchWorkItem in a DispatchQueue

How can I wait to receive a response from a DispatchWorkItem before moving on to the next request or next DispatchWorkItem in a DispatchQueue?

I might advise against using semaphores or the like to block threads so that you can make asynchronous tasks behave synchronously, solely for the sake of DispatchWorkItem.

When I want to establish dependencies between asynchronous tasks, I have historically used Operation rather than DispatchWorkItem. (Admittedly, in iOS 13 and later, we might contemplate Combine’s Future/Promise, but for now operations are the way to go.) Operations have been designed to support wrapping of asynchronous processes much more elegantly than DispatchWorkItem. So you can use a queue whose maxConcurrentOperationCount is 1, like so:

let networkQueue = OperationQueue()
networkQueue.maxConcurrentOperationCount = 1

let completionOperation = BlockOperation {
    print("all done")
}

for url in urls {
    let operation = NetworkOperation(url: url) { result in
        switch result {
        case .failure(let error):
            ...

        case .success(let data):
            ...
        }
    }
    completionOperation.addDependency(operation)
    networkQueue.addOperation(operation)
}

OperationQueue.main.addOperation(completionOperation)

Or you can use a more reasonable maxConcurrentOperationCount and use dependencies only between those operations where you need this sequential behavior:

let networkQueue = OperationQueue()
networkQueue.maxConcurrentOperationCount = 4

let completionOperation = BlockOperation {
    print("all done")
}

var previousOperation: Operation?

for url in urls {
    let operation = NetworkOperation(url: url) { result in
        switch result {
        case .failure(let error):
            ...

        case .success(let data):
            ...
        }
    }
    if let previousOperation = previousOperation {
        operation.addDependency(previousOperation)
    }
    completionOperation.addDependency(operation)
    networkQueue.addOperation(operation)
    previousOperation = operation
}

OperationQueue.main.addOperation(completionOperation)

This is what that NetworkOperation might look like:

class NetworkOperation: AsynchronousOperation {
    typealias NetworkCompletion = (Result<Data, Error>) -> Void

    enum NetworkError: Error {
        case invalidResponse(Data, URLResponse?)
    }

    private var networkCompletion: NetworkCompletion?
    private var task: URLSessionTask!

    init(request: URLRequest, completion: @escaping NetworkCompletion) {
        super.init()

        task = URLSession.shared.dataTask(with: request) { data, response, error in
            defer {
                self.networkCompletion = nil
                self.finish()
            }

            guard let data = data, error == nil else {
                self.networkCompletion?(.failure(error!))
                return
            }

            guard
                let httpResponse = response as? HTTPURLResponse,
                200..<300 ~= httpResponse.statusCode
            else {
                self.networkCompletion?(.failure(NetworkError.invalidResponse(data, response)))
                return
            }

            self.networkCompletion?(.success(data))
        }
        networkCompletion = completion
    }

    convenience init(url: URL, completion: @escaping NetworkCompletion) {
        self.init(request: URLRequest(url: url), completion: completion)
    }

    override func main() {
        task.resume()
    }

    override func cancel() {
        task.cancel()
        super.cancel()
    }
}

This passes back Data, but you can write permutations/subclasses that further parse it into whatever your web service returns, using JSONDecoder or the like. Hopefully this illustrates the basic idea.
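For instance, here is a minimal sketch of such a subclass, assuming your endpoint returns JSON for some Decodable model; the DecodableOperation name is mine, purely for illustration, not part of the original:

class DecodableOperation<Model: Decodable>: NetworkOperation {
    init(url: URL, completion: @escaping (Result<Model, Error>) -> Void) {
        // Reuse NetworkOperation's networking; on success, decode its Data
        // into the model before calling the caller's completion handler.
        super.init(request: URLRequest(url: url)) { result in
            completion(result.flatMap { data in
                Result { try JSONDecoder().decode(Model.self, from: data) }
            })
        }
    }
}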

The above uses this AsynchronousOperation class:

/// Asynchronous operation base class
///
/// This abstract class performs all of the necessary KVO notifications of `isFinished`
/// and `isExecuting` for a concurrent `Operation` subclass. You can subclass this and
/// implement asynchronous operations. All you must do is:
///
/// - override `main()` with the tasks that initiate the asynchronous task;
///
/// - call `finish()` when the asynchronous task is done;
///
/// - optionally, periodically check `self.isCancelled` status, performing any clean-up
///   necessary, and then ensuring that `finish()` is called; or
///   override `cancel()`, calling `super.cancel()` and then cleaning up
///   and ensuring `finish()` is called.

open class AsynchronousOperation: Operation {

    /// State for this operation.

    @objc private enum OperationState: Int {
        case ready
        case executing
        case finished
    }

    /// Concurrent queue for synchronizing access to `state`.

    private let stateQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".rw.state", attributes: .concurrent)

    /// Private backing stored property for `state`.

    private var _state: OperationState = .ready

    /// The state of the operation

    @objc private dynamic var state: OperationState {
        get { stateQueue.sync { _state } }
        set { stateQueue.sync(flags: .barrier) { _state = newValue } }
    }

    // MARK: - Various `Operation` properties

    open override var isReady: Bool { return state == .ready && super.isReady }
    public final override var isAsynchronous: Bool { return true }
    public final override var isExecuting: Bool { return state == .executing }
    public final override var isFinished: Bool { return state == .finished }

    // KVO for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if ["isReady", "isFinished", "isExecuting"].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // Start

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function throws an exception.

    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing

    public final func finish() {
        if isExecuting { state = .finished }
    }
}

There are lots of ways to write a base AsynchronousOperation, and I don’t want to get lost in the details, but the idea is that we now have an Operation that we can use for any asynchronous process.
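For instance, a minimal sketch wrapping a different asynchronous API in the same base class; DelayOperation is a made-up name, just for illustration:

class DelayOperation: AsynchronousOperation {
    let interval: TimeInterval

    init(interval: TimeInterval) {
        self.interval = interval
        super.init()
    }

    override func main() {
        // Start the asynchronous work and return immediately; the operation
        // stays "executing" until finish() flips the KVO state.
        DispatchQueue.global().asyncAfter(deadline: .now() + interval) { [weak self] in
            self?.finish()
        }
    }
}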

Is there a specific way to append DispatchWorkItems to a DispatchQueue instead of re-declaring them in code?

Yes, you can have an array of DispatchWorkItem objects, but to dispatch them all, you’d just have to iterate through them, e.g., with either for-in or forEach:

let queue = DispatchQueue(label: "com.domain.app.requests")
let group = DispatchGroup()

let itemsToExecute: [DispatchWorkItem] = [item1, item2]

itemsToExecute.forEach { queue.async(group: group, execute: $0) }

group.notify(queue: .main) {
    print("all done") // this is called when the requests are done
}

Note that I used async rather than sync, because the whole point of using GCD is to avoid blocking the main queue: sync blocks the calling thread until the work is done, while async doesn't.

This raises the question of why you'd bother using an array of DispatchWorkItem at all, though. Just add the tasks to the queue directly, and the queue takes care of keeping track of all of them for you.
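For example, a minimal sketch of that, reusing the queue label from above and enqueuing the requests as plain closures:

let queue = DispatchQueue(label: "com.domain.app.requests")
let group = DispatchGroup()

// Enqueue the work directly; the queue tracks it, no work-item array needed.
queue.async(group: group) {
    // ... first request ...
}
queue.async(group: group) {
    // ... second request ...
}

group.notify(queue: .main) {
    print("all done")
}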


Frankly, we’d probably just want to use URLSession. For example:

@discardableResult
func request(from urlString: String, completion: @escaping (Result<String, Error>) -> Void) -> URLSessionTask {
    let task = URLSession.shared.dataTask(with: URL(string: urlString)!) { data, response, error in
        guard let data = data, error == nil else {
            completion(.failure(error!))
            return
        }

        guard
            let httpResponse = response as? HTTPURLResponse,
            200..<300 ~= httpResponse.statusCode
        else {
            completion(.failure(NetworkError.invalidResponse(data, response)))
            return
        }

        guard let string = String(data: data, encoding: .utf8) else {
            completion(.failure(NetworkError.nonStringBody))
            return
        }

        completion(.success(string))
    }
    task.resume()
    return task
}

Where perhaps:

enum NetworkError: Error {
    case invalidResponse(Data, URLResponse?)
    case nonStringBody
}

Then, you can do something like:

let group = DispatchGroup()

for urlString in urlStrings {
    group.enter()
    request(from: urlString) { result in
        defer { group.leave() }

        switch result {
        case .failure(let error):
            print(urlString, error)

        case .success(let string):
            print(urlString, string.count)
        }
    }
}

group.notify(queue: .main) {
    print("all done")
}

Waiting until the task finishes

Use a DispatchGroup to achieve this. You can either get notified when the group's enter() and leave() calls are balanced:

func myFunction() {
    var a: Int?

    let group = DispatchGroup()
    group.enter()

    DispatchQueue.main.async {
        a = 1
        group.leave()
    }

    // does not wait. But the code in notify() gets run
    // after enter() and leave() calls are balanced

    group.notify(queue: .main) {
        print(a)
    }
}

or you can wait:

func myFunction() {
    var a: Int?

    let group = DispatchGroup()
    group.enter()

    // avoid deadlocks by not using .main queue here
    DispatchQueue.global(qos: .default).async {
        a = 1
        group.leave()
    }

    // wait ...
    group.wait()

    print(a) // you could also `return a` here
}

Note: group.wait() blocks the current thread (probably the main thread in your case), so you have to dispatch the work async onto another queue (as in the above sample code) to avoid a deadlock.

Waiting until two async blocks are executed before starting another block

Use dispatch groups: see "Waiting on Groups of Queued Tasks" in the "Dispatch Queues" chapter of Apple's Concurrency Programming Guide for an example.

Your example could look something like this:

dispatch_group_t group = dispatch_group_create();

dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
    // block1
    NSLog(@"Block1");
    [NSThread sleepForTimeInterval:5.0];
    NSLog(@"Block1 End");
});

dispatch_group_async(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
    // block2
    NSLog(@"Block2");
    [NSThread sleepForTimeInterval:8.0];
    NSLog(@"Block2 End");
});

dispatch_group_notify(group, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0), ^{
    // block3
    NSLog(@"Block3");
});

// only needed for non-ARC projects; handled automatically when ARC is enabled.
dispatch_release(group);

and could produce output like this:

2012-08-11 16:10:18.049 Dispatch[11858:1e03] Block1
2012-08-11 16:10:18.052 Dispatch[11858:1d03] Block2
2012-08-11 16:10:23.051 Dispatch[11858:1e03] Block1 End
2012-08-11 16:10:26.053 Dispatch[11858:1d03] Block2 End
2012-08-11 16:10:26.054 Dispatch[11858:1d03] Block3

Swift Cancel DispatchQueue Process

There is no way to stop or "kill" a DispatchWorkItem or NSOperation from outside. There is a cancel() method, but that merely sets the isCancelled property of the item or operation to true; it does not stop the execution of the item itself. And since recv is blocking, there is no way to check the isCancelled flag during execution. This means the answer posted by Vadian unfortunately wouldn't do anything.

According to the Apple docs on NSOperation.cancel:

This method does not force your operation code to stop.

The same goes for NSOperationQueue.cancelAllOperations:

Canceling the operations does not automatically remove them from the queue or stop those that are currently executing.

You might think it is possible to drop down to using a raw NSThread. However, the same principle applies here: you cannot deterministically kill a thread from the outside.

Possible solution: timeout

The best solution I can think of is to use the timeout feature of the socket. I don't know where UDPServer comes from, but perhaps it has a built-in timeout.
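If the underlying BSD socket descriptor is accessible, a receive timeout can be set directly. A minimal sketch, where fd is assumed to be that descriptor and the five-second value is an arbitrary choice:

import Foundation

// Assumed: `fd` is the UDP socket's file descriptor. After this call, a
// blocking recv(fd, ...) returns with an error (EAGAIN/EWOULDBLOCK) once
// five seconds pass with no data, giving the loop a chance to check
// isCancelled.
var timeout = timeval(tv_sec: 5, tv_usec: 0)
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, socklen_t(MemoryLayout<timeval>.size))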

Possible solution: Poor man's timeout (send packet to localhost)

Another option you can try is to send some UDP packets to yourself after a certain time has elapsed. This way, recv will receive some data, and execution will continue. This could possibly be used as a "poor man's timeout".
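A minimal sketch of that idea, assuming the server is recv-ing on a known local port; the function name, port parameter, and delay are all made up for illustration:

import Foundation

// Hypothetical helper: after `delay` seconds, send one dummy UDP datagram
// to our own server on 127.0.0.1 so its blocking recv returns.
func unblockRecv(onPort port: UInt16, after delay: TimeInterval) {
    DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
        let fd = socket(AF_INET, SOCK_DGRAM, 0)
        guard fd >= 0 else { return }
        defer { close(fd) }

        var addr = sockaddr_in()
        addr.sin_family = sa_family_t(AF_INET)
        addr.sin_port = port.bigEndian              // network byte order
        addr.sin_addr.s_addr = inet_addr("127.0.0.1")

        // Send a single dummy byte so the blocking recv wakes up.
        var byte: UInt8 = 0
        withUnsafePointer(to: &addr) {
            $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { sa in
                _ = sendto(fd, &byte, 1, 0, sa, socklen_t(MemoryLayout<sockaddr_in>.size))
            }
        }
    }
}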

When to use Semaphore instead of Dispatch Group?

Conceptually, both DispatchGroup and Semaphore serve the same purpose (unless I misunderstand something).

The above is not exactly true. You can use a semaphore to do the same thing as a dispatch group but it is much more general.

Dispatch groups are used when you have a load of things you want to do that can all happen at once, but you need to wait for them all to finish before doing something else.

Semaphores can be used for the above but they are general purpose synchronisation objects and can be used for many other purposes too. The concept of a semaphore is not limited to Apple and can be found in many operating systems.

In general, a semaphore has a value, which is a non-negative integer, and two operations:

  • wait: if the value is not zero, decrement it; otherwise block until something signals the semaphore.

  • signal: if there are threads waiting, unblock one of them; otherwise increment the value.

Needless to say, both operations have to be thread safe. In olden days, when you only had one CPU, you'd simply disable interrupts whilst manipulating the value and the queue of waiting threads. Nowadays, it is more complicated because of multiple CPU cores, on-chip caches, etc.

A semaphore can be used in any case where you have a resource that can be accessed by at most N threads at the same time. You set the semaphore's initial value to N and then the first N threads that wait on it are not blocked but the next thread has to wait until one of the first N threads has signaled the semaphore. The simplest case is N = 1. In that case, the semaphore behaves like a mutex lock.
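For example, a minimal sketch of that pattern with N = 4; the queue label and loop count are arbitrary:

let semaphore = DispatchSemaphore(value: 4) // at most four tasks at once
let queue = DispatchQueue(label: "com.example.worker", attributes: .concurrent)

for item in 0..<20 {
    queue.async {
        semaphore.wait()             // blocks once four tasks are in flight
        defer { semaphore.signal() } // frees a slot when this task finishes
        // ... perform the work for `item` ...
    }
}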

A semaphore can be used to emulate a dispatch group. You start the semaphore at 0, start all the tasks (tracking how many you have started), and wait on the semaphore that number of times. Each task must signal the semaphore when it completes.
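A minimal sketch of that emulation; the task count of five is an arbitrary example:

let done = DispatchSemaphore(value: 0) // starts at 0
let taskCount = 5                      // you must track this yourself

for i in 0..<taskCount {
    DispatchQueue.global().async {
        // ... do the work for task `i` ...
        done.signal()                  // each task signals on completion
    }
}

for _ in 0..<taskCount { done.wait() } // wait once per task started
print("all done")

Unlike group.notify, that final loop blocks the waiting thread.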

However, there are some gotchas. For example, you need a separate count to know how many times to wait. If you want to be able to add more tasks to the group after you have started waiting, the count can only be updated in a mutex-protected block, and that may lead to problems with deadlocking. Also, I think the Dispatch implementation of semaphores might be vulnerable to priority inversion. Priority inversion occurs when a high priority thread waits for a resource that a low priority thread has grabbed. The high priority thread is blocked until the low priority thread releases the resource. If there is a medium priority thread running, this may never happen.

You can pretty much do anything with a semaphore that other higher level synchronisation abstractions can do, but doing it right is often a tricky business. The higher level abstractions are (hopefully) carefully written, and you should use them in preference to a "roll your own" implementation with semaphores, if possible.

Concurrent vs serial queues in GCD

A simple example: you have a block that takes a minute to execute. You add it to a queue from the main thread. Let's look at the four cases.

  • async - concurrent: the code runs on a background thread. Control returns immediately to the main thread (and UI). The block can't assume that it's the only block running on that queue.
  • async - serial: the code runs on a background thread. Control returns immediately to the main thread. The block can assume that it's the only block running on that queue.
  • sync - concurrent: the code runs on a background thread but the main thread waits for it to finish, blocking any updates to the UI. The block can't assume that it's the only block running on that queue (I could have added another block using async a few seconds previously).
  • sync - serial: the code runs on a background thread but the main thread waits for it to finish, blocking any updates to the UI. The block can assume that it's the only block running on that queue.

Obviously you wouldn't use either of the last two for long-running processes. You normally see sync used when you're trying to update the UI (always on the main thread) from something that may be running on another thread.
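To make the four combinations concrete, a minimal sketch; the queue labels are made up:

let serial = DispatchQueue(label: "com.example.serial") // serial by default
let concurrent = DispatchQueue(label: "com.example.concurrent", attributes: .concurrent)

concurrent.async { /* runs in background; may overlap other blocks on this queue */ }
serial.async     { /* runs in background; only block running on this queue */ }

concurrent.sync  { /* caller blocks until done; other blocks may still run here */ }
serial.sync      { /* caller blocks until done; only block running on this queue */ }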


