Trying to Understand Asynchronous Operation Subclass

You said:

  1. What is the purpose of the stateQueue property? I see it being used by get and set of the state computed property, but I can't find any documentation that explains the sync:flags:execute and sync:execute methods that they use.

This code "synchronizes" access to a property to make it thread safe. Regarding why you need to do that, see the Operation documentation, which advises:

Multicore Considerations

... When you subclass NSOperation, you must make sure that any overridden methods remain safe to call from multiple threads. If you implement custom methods in your subclass, such as custom data accessors, you must also make sure those methods are thread-safe. Thus, access to any data variables in the operation must be synchronized to prevent potential data corruption. For more information about synchronization, see Threading Programming Guide.

Regarding the exact use of this concurrent queue for synchronization, this is known as the "reader-writer" pattern. The basic concept of the reader-writer pattern is that reads can happen concurrently with respect to each other (hence sync, with no barrier), but writes must never be performed concurrently with respect to any other access of that property (hence async with barrier).

For example, you might implement a reader-writer for thread-safety on an array like so:

class ThreadSafeArray<T> {
    private var values: [T]
    private let queue = DispatchQueue(label: "...", attributes: .concurrent)

    init(_ values: [T]) {
        self.values = values
    }

    func reader<U>(block: () throws -> U) rethrows -> U {
        return try queue.sync {
            try block()
        }
    }

    func writer(block: @escaping (inout [T]) -> Void) {
        queue.async(flags: .barrier) {
            block(&self.values)
        }
    }

    // e.g. you might use `reader` and `writer` like the following:

    subscript(_ index: Int) -> T {
        get { reader { values[index] } }
        set { writer { $0[index] = newValue } }
    }

    func append(_ value: T) {
        writer { $0.append(value) }
    }

    func remove(at index: Int) {
        writer { $0.remove(at: index) }
    }
}

Obviously, the use of reader-writer in this Operation subclass is even simpler, but the above illustrates the pattern.
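
For a single value, the same pattern reduces to a computed property over a private stored property. The following is a minimal sketch of that simpler case; `SynchronizedValue` and the queue label are hypothetical names chosen for illustration and do not come from the original code:

```swift
import Foundation

/// Hypothetical wrapper illustrating reader-writer synchronization of one value.
final class SynchronizedValue<Value> {
    private var storage: Value
    private let queue = DispatchQueue(label: "com.example.rw.value", attributes: .concurrent)

    init(_ value: Value) {
        storage = value
    }

    var value: Value {
        // Reads may overlap with other reads, so a plain `sync` suffices.
        get { queue.sync { storage } }
        // Writes use a barrier so they never overlap with any other access.
        set { queue.async(flags: .barrier) { self.storage = newValue } }
    }
}

let flag = SynchronizedValue(false)
flag.value = true

// The read is enqueued after the barrier write, so it observes the new value.
print(flag.value)   // prints "true"
```

The setter returns before the write has actually happened, but any read issued afterwards is queued behind the barrier, which is why the getter still sees the updated value.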

You also asked:


  2. What is the purpose of the three class methods in the NSObject section that return ["state"]? I don't see them being used anywhere. I found, in NSObject, class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String>, but that doesn't seem to help me understand why these methods are declared.

These are just methods that ensure that changes to the state property trigger KVO notifications for the isReady, isExecuting and isFinished properties. The KVO notifications for these three keys are critical for the correct functioning of asynchronous operations. This syntax is outlined in the Key-Value Observing Programming Guide: Registering Dependent Keys.

The keyPathsForValuesAffectingValue method you found is related. You can either register dependent keys using that method, or have the individual methods as shown in your original code snippet.
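
As an illustration of the per-key form outside of Operation, here is a minimal sketch, assuming an Apple platform (where the Objective-C runtime provides KVO). The `Person` class and its properties are hypothetical; only the `keyPathsForValuesAffecting<Key>` naming convention comes from the KVO guide:

```swift
import Foundation

/// Hypothetical model class; `fullName` depends on `firstName` and `lastName`.
final class Person: NSObject {
    @objc dynamic var firstName: String
    @objc dynamic var lastName: String

    init(firstName: String, lastName: String) {
        self.firstName = firstName
        self.lastName = lastName
        super.init()
    }

    @objc dynamic var fullName: String { "\(firstName) \(lastName)" }

    // Per-key form: KVO looks this method up by name (`keyPathsForValuesAffecting<Key>`).
    @objc class func keyPathsForValuesAffectingFullName() -> Set<String> {
        [#keyPath(firstName), #keyPath(lastName)]
    }
}

let person = Person(firstName: "Ada", lastName: "Lovelace")
var observedNames: [String] = []

// Observe only the dependent key; no manual notifications are posted anywhere.
let observation = person.observe(\.fullName, options: [.new]) { _, change in
    if let name = change.newValue { observedNames.append(name) }
}

person.firstName = "Augusta"   // triggers a `fullName` notification
print(observedNames)

observation.invalidate()
```

The three methods in your snippet play the same role as `keyPathsForValuesAffectingFullName` here, with state as the property that the dependent keys are derived from.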


BTW, here is a revised version of the AsynchronousOperation class you provided, with the following changes:

  1. You must not call super.start(). As the start documentation says (emphasis added):

    If you are implementing a concurrent operation, you must override this method and use it to initiate your operation. Your custom implementation must not call super at any time.

  2. Add the @objc attribute, which is required in Swift 4.

  3. Rename execute to main, which is the convention for Operation subclasses.

  4. It is inappropriate to declare isReady as a final property. Any subclass should have the right to further refine its isReady logic (though we admittedly rarely do so).

  5. Use #keyPath to make the code a little safer and more robust.

  6. You don't need to do manual KVO notifications when using a dynamic property. The manual calling of willChangeValue and didChangeValue is not needed in this example.

  7. Change finish so that it only moves to the .finished state if not already finished.

Thus:

public class AsynchronousOperation: Operation {

    /// State for this operation.
    @objc private enum OperationState: Int {
        case ready
        case executing
        case finished
    }

    /// Concurrent queue for synchronizing access to `state`.
    private let stateQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".rw.state", attributes: .concurrent)

    /// Private backing stored property for `state`.
    private var _state: OperationState = .ready

    /// The state of the operation.
    @objc private dynamic var state: OperationState {
        get { return stateQueue.sync { _state } }
        set { stateQueue.async(flags: .barrier) { self._state = newValue } }
    }

    // MARK: - Various `Operation` properties

    open override var isReady: Bool { return state == .ready && super.isReady }
    public final override var isExecuting: Bool { return state == .executing }
    public final override var isFinished: Bool { return state == .finished }
    public final override var isAsynchronous: Bool { return true }

    // KVN for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if ["isReady", "isFinished", "isExecuting"].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // Start

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function traps with `fatalError`.
    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing.
    public final func finish() {
        if !isFinished { state = .finished }
    }
}

Operation went isFinished=YES without being started by the queue it is in

The key problem is that your markAsCompleted is triggering isFinished when the operation is not isExecuting. I'd suggest you just fix that markAsCompleted to only do this if isExecuting is true. This reduces the burden on subclasses doing any complicated state tests to figure out whether they need to transition to isFinished or not.

That having been said, I see three basic patterns when writing cancelable asynchronous operations:

  1. The first is where canceling the task prevents it from ever transitioning the executing operation to the isFinished state.

    In that case, the cancel implementation must manually finish the executing operation. For example:

    class FiveSecondOperation: AsynchronousOperation {
        var block: DispatchWorkItem?

        override func main() {
            block = DispatchWorkItem { [weak self] in
                self?.finish()
                self?.block = nil
            }

            DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: block!)
        }

        override func cancel() {
            super.cancel()

            if isExecuting {
                block?.cancel()
                finish()
            }
        }
    }

    Focusing on the cancel implementation: because canceling the DispatchWorkItem means it will never finish the operation, cancel must explicitly finish the operation itself.

  2. Sometimes, when you cancel some asynchronous task, it will call its completion handler automatically for you, in which case cancel doesn't need to do anything other than cancel that task and call super. For example:

    class GetOperation: AsynchronousOperation {
        var url: URL
        weak var task: URLSessionTask?

        init(url: URL) {
            self.url = url
            super.init()
        }

        override func main() {
            let task = URLSession.shared.dataTask(with: url) { data, _, error in
                defer { self.finish() } // make sure to finish the operation

                // process `data` & `error` here
            }
            task.resume()
            self.task = task
        }

        override func cancel() {
            super.cancel()
            task?.cancel()
        }
    }

    Again, focusing on cancel, in this case we don't touch the "finished" state, but just cancel dataTask (which will call its completion handler even if you cancel the request) and call the super implementation.

  3. The third scenario is where you have some operation that is periodically checking isCancelled state. In that case, you don't have to implement cancel at all, as the default behavior is sufficient. For example:

    import QuartzCore // for `CADisplayLink` and `CACurrentMediaTime`

    class DisplayLinkOperation: AsynchronousOperation {
        private weak var displayLink: CADisplayLink?
        private var startTime: CFTimeInterval!
        private let duration: CFTimeInterval = 2

        override func main() {
            startTime = CACurrentMediaTime()
            let displayLink = CADisplayLink(target: self, selector: #selector(handleDisplayLink(_:)))
            displayLink.add(to: .main, forMode: .common) // `.commonModes` prior to Swift 4.2
            self.displayLink = displayLink
        }

        @objc func handleDisplayLink(_ displayLink: CADisplayLink) {
            let percentComplete = (CACurrentMediaTime() - startTime) / duration

            if percentComplete >= 1.0 || isCancelled {
                displayLink.invalidate()
                finish()
            }

            // now do some UI update based upon `percentComplete`
        }
    }

    In this case, where I've wrapped a display link in an operation so I can manage dependencies and/or encapsulate the display link in a convenient object, I don't have to implement cancel at all, because the default implementation will update isCancelled for me, and I can just check for that.

Those are three basic cancel patterns I generally see. That having been said, updating markAsCompleted to only trigger isFinished if isExecuting is a good safety check to make sure you can never get the problem you described.


By the way, the AsynchronousOperation that I used for the above examples is as follows, adapted from Trying to Understand Asynchronous Operation Subclass. Note that what you called markAsCompleted is called finish here, and it sounds like you're triggering the isFinished and isExecuting KVO via a different mechanism, but the idea is basically the same. Just check the current state before you trigger the isFinished KVO:

open class AsynchronousOperation: Operation {

    /// State for this operation.
    @objc private enum OperationState: Int {
        case ready
        case executing
        case finished
    }

    /// Concurrent queue for synchronizing access to `state`.
    private let stateQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".rw.state", attributes: .concurrent)

    /// Private backing stored property for `state`.
    private var rawState: OperationState = .ready

    /// The state of the operation.
    @objc private dynamic var state: OperationState {
        get { return stateQueue.sync { rawState } }
        set { stateQueue.sync(flags: .barrier) { rawState = newValue } }
    }

    // MARK: - Various `Operation` properties

    open override var isReady: Bool { return state == .ready && super.isReady }
    public final override var isExecuting: Bool { return state == .executing }
    public final override var isFinished: Bool { return state == .finished }
    public final override var isAsynchronous: Bool { return true }

    // MARK: - KVN for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if ["isReady", "isFinished", "isExecuting"].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // MARK: - Foundation.Operation

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function traps with `fatalError`.
    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing.
    public final func finish() {
        if isExecuting { state = .finished }
    }
}

OperationQueue with custom `maxConcurrentOperationCount` does not pick up / execute all operations in the queue, after finishing first operation

The issue is that the methods are not KVC/KVO compliant. As the Operation documentation says:

The NSOperation class is key-value coding (KVC) and key-value observing (KVO) compliant for several of its properties.

If you provide custom implementations for any of the preceding properties, your implementations must maintain KVC and KVO compliance.

Constraints on the degree of concurrency (e.g., both maxConcurrentOperationCount and addDependency(_:)) rely upon KVO to know when the prior operation is complete. If you fail to perform the required KVO notifications, the queue will not know when subsequent operations may proceed.

See the latter part of Trying to Understand Asynchronous Operation Subclass for an example implementation.
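
To illustrate what the queue relies upon, here is a minimal sketch using BlockOperation, which posts the required KVO notifications itself. The `EventLog` helper and the 0.1 second delay are hypothetical stand-ins; a custom asynchronous operation has to post the same isFinished notification before a dependent operation, or the next operation under maxConcurrentOperationCount, can start:

```swift
import Foundation

/// Hypothetical thread-safe log used only to record the order of events.
final class EventLog {
    private let lock = NSLock()
    private var storage: [String] = []

    func record(_ event: String) {
        lock.lock()
        defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

let log = EventLog()
let queue = OperationQueue()

let first = BlockOperation {
    Thread.sleep(forTimeInterval: 0.1)   // simulate slow work
    log.record("first")
}
let second = BlockOperation { log.record("second") }

// `second` cannot start until the queue learns that `first` has finished.
second.addDependency(first)

queue.addOperations([second, first], waitUntilFinished: true)
print(log.events)   // ["first", "second"]
```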


FWIW, here is an asynchronous operation implementation:

public class AsynchronousOperation: Operation {

    @Atomic @objc private dynamic var state: OperationState = .ready

    // MARK: - Various `Operation` properties

    open override var isReady: Bool { state == .ready && super.isReady }
    public final override var isExecuting: Bool { state == .executing }
    public final override var isFinished: Bool { state == .finished }
    public final override var isAsynchronous: Bool { true }

    // KVO for dependent properties

    open override class func keyPathsForValuesAffectingValue(forKey key: String) -> Set<String> {
        if [#keyPath(isReady), #keyPath(isFinished), #keyPath(isExecuting)].contains(key) {
            return [#keyPath(state)]
        }

        return super.keyPathsForValuesAffectingValue(forKey: key)
    }

    // Start

    public final override func start() {
        if isCancelled {
            state = .finished
            return
        }

        state = .executing

        main()
    }

    /// Subclasses must implement this to perform their work and they must not call `super`. The default implementation of this function traps with `fatalError`.
    open override func main() {
        fatalError("Subclasses must implement `main`.")
    }

    /// Call this function to finish an operation that is currently executing.
    public final func finish() {
        if !isFinished { state = .finished }
    }
}

private extension AsynchronousOperation {
    /// State for this operation.
    @objc enum OperationState: Int {
        case ready
        case executing
        case finished
    }
}

With the following:

@propertyWrapper
public class Atomic<T> {
    private var _wrappedValue: T
    private let lock = NSLock()

    public var wrappedValue: T {
        get { lock.synchronized { _wrappedValue } }
        set { lock.synchronized { _wrappedValue = newValue } }
    }

    public init(wrappedValue: T) {
        _wrappedValue = wrappedValue
    }
}

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

With the above, I abstract the asynchronous Operation code into something I can subclass and inherit the asynchronous behaviors. E.g., here is an operation that performs the same asyncAfter as your example (but with some extra OSLog signposts so I can visually see the operations in Instruments):

import os.log

private let log = OSLog(subsystem: "Op", category: .pointsOfInterest)

class MyOperation: AsynchronousOperation {
    var value: Int

    init(value: Int) {
        self.value = value
        super.init()
    }

    override func main() {
        let id = OSSignpostID(log: log)
        os_signpost(.begin, log: log, name: "Operation", signpostID: id, "%d", value)

        DispatchQueue.main.asyncAfter(deadline: .now() + 1) { [self] in
            finish()
            os_signpost(.end, log: log, name: "Operation", signpostID: id, "%d", value)
        }
    }
}

Then ...

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1

for i in 0..<5 {
    queue.addOperation(MyOperation(value: i))
}

... yields a timeline of the operations like so:

(Image: Instruments timeline of the operations)

sync and async status of Operation and OperationQueues

A synchronous operation is considered complete by the OperationQueue when the block you submit to BlockOperation returns (or when main returns, if you subclass Operation).

An asynchronous operation (that is, one that returns true from its isAsynchronous property) must be marked as finished manually in a subclass of Operation. Because isFinished and isExecuting are read-only, you override them so that isFinished reports true and isExecuting reports false once the work is done, and you post KVO notifications for both changes. This allows you to dispatch whatever work you need to do to a different queue by using DispatchQueue.async, but still keep the operation in the OperationQueue. This is useful for building dependent operations or for allowing only a certain number of operations to run in parallel.
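
A minimal sketch of marking an operation as finished by hand, assuming an Apple platform where Operation supports KVO. `DelayedOperation` and its 0.1 second delay are hypothetical; the key strings "isExecuting" and "isFinished" are the ones named in the Operation documentation:

```swift
import Foundation

/// Hypothetical asynchronous operation that finishes itself after a short delay.
final class DelayedOperation: Operation {
    private let lock = NSLock()
    private var executing = false
    private var finished = false

    override var isAsynchronous: Bool { true }

    override var isExecuting: Bool {
        lock.lock(); defer { lock.unlock() }
        return executing
    }

    override var isFinished: Bool {
        lock.lock(); defer { lock.unlock() }
        return finished
    }

    override func start() {
        // Per the documentation, a concurrent `start` must not call `super`.
        if isCancelled {
            finish()
            return
        }

        willChangeValue(forKey: "isExecuting")
        setFlags(executing: true, finished: false)
        didChangeValue(forKey: "isExecuting")

        // Stand-in for real asynchronous work on another queue.
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
            self.finish()
        }
    }

    /// Flips both flags, bracketed by the KVO notifications the queue observes.
    private func finish() {
        willChangeValue(forKey: "isExecuting")
        willChangeValue(forKey: "isFinished")
        setFlags(executing: false, finished: true)
        didChangeValue(forKey: "isFinished")
        didChangeValue(forKey: "isExecuting")
    }

    private func setFlags(executing: Bool, finished: Bool) {
        lock.lock(); defer { lock.unlock() }
        self.executing = executing
        self.finished = finished
    }
}

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 1

let operations = (0..<3).map { _ in DelayedOperation() }
queue.addOperations(operations, waitUntilFinished: true)
print(operations.allSatisfy { $0.isFinished })   // prints "true"
```

Without the notifications in `finish`, the queue would never learn that the first operation had completed, and with maxConcurrentOperationCount set to 1 the remaining operations would never start.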

Apple's Operation docs have good explanations for all of this.

Can asynchronous operations be used with `progress` on OperationQueue?

You are conflating two different but related concepts: asynchrony and concurrency.

An OperationQueue always dispatches Operations onto a separate thread, so you do not need to explicitly make them asynchronous and there is no need to override start(). You should ensure that your main() does not return until the operation is complete. This means blocking if you perform asynchronous tasks such as network operations.

It is possible to execute an Operation directly. In the case where you want concurrent execution of those operations, you need to make them asynchronous. It is in this situation that you would override start().

If you want to implement a concurrent operation—that is, one that runs asynchronously with respect to the calling thread—you must write additional code to start the operation asynchronously. For example, you might spawn a separate thread, call an asynchronous system function, or do anything else to ensure that the start method starts the task and returns immediately and, in all likelihood, before the task is finished.

Most developers should never need to implement concurrent operation objects. If you always add your operations to an operation queue, you do not need to implement concurrent operations. When you submit a nonconcurrent operation to an operation queue, the queue itself creates a thread on which to run your operation. Thus, adding a nonconcurrent operation to an operation queue still results in the asynchronous execution of your operation object code. The ability to define concurrent operations is only necessary in cases where you need to execute the operation asynchronously without adding it to an operation queue.

In summary, make sure your operations are synchronous and do not override start if you want to take advantage of progress.

Update

While the normal advice is not to try and make asynchronous tasks synchronous, in this case it is the only thing you can do if you want to take advantage of progress. The problem is that if you have an asynchronous operation, the queue cannot tell when it is actually complete. If the queue can't tell when an operation is complete then it can't update progress accurately for that operation.

You do need to consider the impact on the thread pool of doing this.
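
As a rough sketch of what blocking inside main can look like, the following uses a semaphore to hold the operation's thread until an asynchronous call completes. `BlockingFetchOperation`, its `result` property, and the delayed dispatch standing in for a network request are all hypothetical:

```swift
import Foundation

/// Hypothetical synchronous operation that wraps an asynchronous call by blocking.
final class BlockingFetchOperation: Operation {
    private(set) var result: String?

    override func main() {
        guard !isCancelled else { return }

        let semaphore = DispatchSemaphore(value: 0)

        // Stand-in for an asynchronous API such as a network request.
        DispatchQueue.global().asyncAfter(deadline: .now() + 0.1) {
            self.result = "done"
            semaphore.signal()
        }

        // Block this worker thread so that `main` does not return early.
        semaphore.wait()
    }
}

let queue = OperationQueue()
let operation = BlockingFetchOperation()
queue.addOperations([operation], waitUntilFinished: true)
print(operation.result ?? "nil")   // prints "done"
```

Each operation blocked like this ties up one worker thread for the duration of its asynchronous task, which is the thread pool cost mentioned above.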

The alternative is not to use the inbuilt progress feature and create your own property that you update from your tasks.
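
One possible shape for such a property is a standalone Progress object that each task increments when it completes. This is only a sketch under that assumption: the lock, the operation count, and the simulated work are hypothetical, and in an asynchronous operation the increment would go wherever the operation finishes:

```swift
import Foundation

let operationCount = 4

// A standalone `Progress` that the tasks update themselves.
let progress = Progress(totalUnitCount: Int64(operationCount))
let progressLock = NSLock()

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 2

for _ in 0..<operationCount {
    queue.addOperation {
        Thread.sleep(forTimeInterval: 0.05)   // simulate work

        // Serialize the read-modify-write of `completedUnitCount`.
        progressLock.lock()
        progress.completedUnitCount += 1
        progressLock.unlock()
    }
}

queue.waitUntilAllOperationsAreFinished()
print(progress.fractionCompleted)   // prints "1.0"
```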

Why does my BlockOperation continue after I add it to an OperationQueue?

You said:

Apple documentation says that Operations run synchronously. Why then does the code continue to run after an operation is added to a queue?

Your debug output is correct. The BlockOperation will run asynchronously with respect to the thread which added it to the operation queue. The confusing and inconsistent terminology employed by the Operation and OperationQueue documentation does not help the matter (see below).

But, in short, operations added to an operation queue will run asynchronously with respect to the thread which added them to the queue (unless you explicitly wanted to “wait until finished”, i.e., you supply true for the second parameter of addOperations(_:waitUntilFinished:); but that is an anti-pattern which we simply do not use very often).
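
A small sketch of that behavior, with a hypothetical `EventLog` helper and an artificial delay inside the block so that the ordering is easy to see:

```swift
import Foundation

/// Hypothetical thread-safe log used only to record the order of events.
final class EventLog {
    private let lock = NSLock()
    private var storage: [String] = []

    func record(_ event: String) {
        lock.lock()
        defer { lock.unlock() }
        storage.append(event)
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

let log = EventLog()
let queue = OperationQueue()

let operation = BlockOperation {
    Thread.sleep(forTimeInterval: 0.1)   // simulate work inside the block
    log.record("block ran")
}

queue.addOperation(operation)              // returns without waiting for the block
log.record("returned from addOperation")

queue.waitUntilAllOperationsAreFinished()
print(log.events)   // ["returned from addOperation", "block ran"]
```

The block itself is synchronous (the operation finishes when the block returns), yet the thread that added it carries on immediately.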


I do not know if this is the source of the confusion or not, but there is also a question as to whether the task wrapped by the operation is, itself, synchronous or asynchronous. The Apple documentation refers to these as “non-concurrent” or “concurrent” operations, respectively.

BlockOperation is for “non-concurrent” tasks only (i.e., when the block finishes, the BlockOperation is finished). So, if you stumble across any documentation that refers to the synchronous nature of BlockOperation, that is in reference to the synchronous, i.e., non-concurrent, nature of the block supplied to the operation, not the broader relationship between the BlockOperation and the thread that added it to the queue. Operations run asynchronously with respect to the thread that added them to the queue (whether the operation, itself, is concurrent or not).

FWIW, if you really were wrapping an inherently asynchronous task (such as a network request) within an operation, that is called a “concurrent” operation in Apple’s terminology. You would not use BlockOperation for a “concurrent” operation. You would subclass Operation and perform the necessary KVO for “concurrent” operations. (If you really want to go tumbling down that rabbit hole, see https://stackoverflow.com/a/40560463/1271826 or https://stackoverflow.com/a/48104095/1271826, but this is largely unrelated to the question at hand.)


There is a final notion of “synchronous” vs “asynchronous” within the Operation and OperationQueue documentation. Specifically, the documentation dwells on the terms “asynchronous” and “synchronous” operations in the context of the start method, i.e., operations that are not added to a queue, but are just started immediately. If you call start on a “synchronous operation” (rather than adding it to a queue), the calling thread will wait. If you call start on an “asynchronous operation”, the calling thread will not wait.

I hate to even bring up this idiosyncratic terminology, and I only mention it for those stumbling through Apple’s documentation. Apple’s use of “synchronous” vs “asynchronous” in this section only applies within the context of the start method. Obviously, we (and Apple, elsewhere in their own documentation) often use these terms more generally.


You raised another question:

The synchronous code runs immediately and the thread that calls it waits until the synchronous code finishes. Or is that only if the synchronous code runs on the same thread?

“Synchronous” simply means that the caller will not proceed until the synchronous code finishes.

Not to split hairs, but it makes no assurances that the synchronous code will execute immediately (though it generally will). It just means that the caller will not proceed until the synchronous call finishes. But if, for example, the synchronously dispatched code is added to a queue that is backlogged, it might not start immediately.

The term “synchronous” also makes no assurances that it will run on the same thread or a different one. Sometimes it will run on the current thread. Sometimes it will simply wait on the current thread while the synchronous task finishes on some other thread. It depends upon the particular situation.

“Synchronous” only means that the caller will wait, with no assurances about when or where the synchronous code will execute. That’s it.
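
The backlogged-queue case can be sketched with a serial dispatch queue. The queue label and the 0.1 second delay are hypothetical; `events` is only touched from blocks on the serial queue or after that queue has drained, so no additional locking is needed here:

```swift
import Foundation

let queue = DispatchQueue(label: "com.example.serial")
var events: [String] = []

// Backlog the serial queue with some slow work.
queue.async {
    Thread.sleep(forTimeInterval: 0.1)
    events.append("backlog finished")
}

// `sync` does not start immediately: it waits its turn, and the caller waits with it.
queue.sync {
    events.append("sync block ran")
}

events.append("caller resumed")
print(events)   // ["backlog finished", "sync block ran", "caller resumed"]
```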

Completion block is getting triggered even before my operation completes in main method

The problem is that your operation is initiating an asynchronous process, but the operation finishes when the asynchronous task is started, not when the asynchronous task finishes.

You need to do the KVO associated with a “concurrent” operation, as outlined in the documentation:

If you are creating a concurrent operation, you need to override the following methods and properties at a minimum:

  • start()
  • isAsynchronous
  • isExecuting
  • isFinished

In a concurrent operation, your start() method is responsible for starting the operation in an asynchronous manner. Whether you spawn a thread or call an asynchronous function, you do it from this method. Upon starting the operation, your start() method should also update the execution state of the operation as reported by the isExecuting property. You do this by sending out KVO notifications for the isExecuting key path, which lets interested clients know that the operation is now running. Your isExecuting property must also provide the status in a thread-safe manner.

Upon completion or cancellation of its task, your concurrent operation object must generate KVO notifications for both the isExecuting and isFinished key paths to mark the final change of state for your operation. (In the case of cancellation, it is still important to update the isFinished key path, even if the operation did not completely finish its task. Queued operations must report that they are finished before they can be removed from a queue.) In addition to generating KVO notifications, your overrides of the isExecuting and isFinished properties should also continue to report accurate values based on the state of your operation.

Now all of that sounds quite hairy, but it’s actually not that bad. One way is to write a base operation class that takes care of all of this KVO stuff, and this answer outlines one example implementation.

Then you can subclass AsynchronousOperation instead, and make sure to call finish (or whatever triggers the isFinished KVO) when the task is done:

class UserRegistrationOperation: AsynchronousOperation {
    var registrationRecord: RegistrationRecord

    init(registrationRecord: RegistrationRecord) {
        self.registrationRecord = registrationRecord
        super.init() // whenever you subclass, remember to call `super`
    }

    override func main() {
        self.registrationRecord.state = .pending

        // Firebase invocation to create a user in Firebase Auth

        Auth.auth().createUser(withEmail: registrationRecord.user.userEmail, password: registrationRecord.encryptedData) { [weak self] result, error in
            defer { self?.finish() } // make sure to call `finish` regardless of how we leave this closure

            guard let result = result, error == nil else {
                print("Error occurred during user registration process")
                self?.registrationRecord.state = .failed
                return
            }

            self?.registrationRecord.user.uid = result.user.uid
            self?.registrationRecord.state = .processed
        }
    }
}

There are lots of ways to implement that AsynchronousOperation class and this is just one example. But once you have a class that nicely encapsulates the concurrent operation KVO, you can subclass it and write your own concurrent operations with very few changes to your code.


