GCD Pattern for Chaining Async Operations While Piping the Results

NSOperation or GCD threading with animations

Your situation is a little more complex than the one outlined in Apple's documentation, but it would help to read that (and if you're still saying, "Huh?" after reading it, go read this SO answer) to understand the intended pattern. In short, the general idea is that the producer "drives" the chain, and GCD's hooks into the OS help it dispatch work appropriately based on the state of things in the kernel.
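
To make "the producer drives the chain" concrete before getting to the bigger example below, here's a minimal sketch of that pattern in isolation. The queue names and the no-op "transform" are mine and purely illustrative: a dispatch_io channel produces chunks, and each stage simply dispatch_asyncs its result onto the next stage's queue; nothing downstream ever has to ask for work.

#import <Foundation/Foundation.h>
#include <fcntl.h> // O_RDONLY

static void RunProducerDrivenPipeline(NSURL* url)
{
    dispatch_queue_t readQueue    = dispatch_queue_create("read",    DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t processQueue = dispatch_queue_create("process", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t consumeQueue = dispatch_queue_create("consume", DISPATCH_QUEUE_SERIAL);

    dispatch_io_t channel = dispatch_io_create_with_path(DISPATCH_IO_STREAM,
        [[url path] fileSystemRepresentation], O_RDONLY, 0, readQueue,
        ^(int error) { /* nothing to clean up in this sketch */ });

    dispatch_io_read(channel, 0, SIZE_MAX, readQueue, ^(bool done, dispatch_data_t data, int error) {
        if (error)
            return;

        if (data && dispatch_data_get_size(data) > 0)
        {
            // The producer pushes each chunk downstream; each stage, in turn,
            // pushes its result onto the next stage's queue.
            dispatch_async(processQueue, ^{
                dispatch_data_t processed = data; // ...transform the chunk here...
                dispatch_async(consumeQueue, ^{
                    NSLog(@"Consumed %zu bytes", dispatch_data_get_size(processed));
                });
            });
        }

        if (done)
            dispatch_io_close(channel, 0);
    });
}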

The problem with applying that approach to your problem is that it's not straightforward to let the producer side drive things here, because your consumer is driven in real time by the vertical-blanking callbacks, and not purely by the availability of consumable resources. The case is further complicated by the inherently serial nature of the workflow -- for instance, even if you could theoretically parallelize the decoding of frame data into images, the images still have to be delivered serially to the next stage in the pipeline, and that's not something the GCD API handles well in a streaming context (it would be easy to do with dispatch_apply if you could have everything in memory at once, but that cuts to the heart of the problem: you need this to happen in a quasi-streaming fashion.)
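
For contrast, here's roughly what that non-streaming version might look like if you could afford to keep every raw frame in memory at once: dispatch_apply fans the decoding out in parallel, and ordering falls out for free because each iteration owns its own slot. DecodeFrame and DeliverFrame are hypothetical stand-ins, not anything from your code.

#import <Foundation/Foundation.h>
#include <stdlib.h> // calloc, free

static void DecodeAllFramesThenDeliver(NSArray* rawFrames,
                                       id (^DecodeFrame)(id rawFrame),
                                       void (^DeliverFrame)(id decodedFrame))
{
    size_t count = rawFrames.count;
    CFTypeRef* slots = calloc(count, sizeof(CFTypeRef));

    // Decode in parallel. Each iteration writes only to its own slot, so no
    // locking is needed, and the original ordering is preserved by construction.
    dispatch_apply(count, dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0), ^(size_t i) {
        slots[i] = CFBridgingRetain(DecodeFrame(rawFrames[i]));
    });

    // Deliver serially, in order, now that everything is decoded and in memory.
    for (size_t i = 0; i < count; ++i)
    {
        DeliverFrame(CFBridgingRelease(slots[i]));
    }

    free(slots);
}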

In trying to think of how one might handle this, I came up with the following example that attempts to simulate your situation by using text files where each line in the file is a "frame" in the video, and it "crossfades" the two clips by appending the strings. A full, working (for me, at least) version of this is available here. This code is meant to illustrate how you might architect a processing pipeline like this using only GCD primitives, and using a (largely) producer-driven pattern, while still linking up with the CVDisplayLink-based consumer.

It is NOT bulletproof (for instance, among many other things, it's not tolerant of a file with fewer frames than are needed to overlap) and may totally fail to address your real-time or memory-use bounding requirements (which are hard for me to replicate and test without doing more work than I'm willing to do. :) ) It also doesn't try to address the issue I mentioned above, where you might be able to parallelize workloads that would then need to be re-serialized before the next stage of the pipeline. (The code also assumes ARC.) With all those caveats, hopefully there are still some interesting/relevant ideas here for you. Here's the code:

#import <Cocoa/Cocoa.h>
#import <CoreVideo/CoreVideo.h>
#include <fcntl.h>   // O_RDONLY, O_NONBLOCK
#include <string.h>  // strerror

@interface SOAppDelegate : NSObject <NSApplicationDelegate>
@end

static void DieOnError(int error);
static NSString* NSStringFromDispatchData(dispatch_data_t data);
static dispatch_data_t FrameDataFromAccumulator(dispatch_data_t* accumulator);
static CVReturn MyDisplayLinkCallback(CVDisplayLinkRef displayLink, const CVTimeStamp* now, const CVTimeStamp* outputTime, CVOptionFlags flagsIn, CVOptionFlags* flagsOut, void* displayLinkContext);

static const NSUInteger kFramesToOverlap = 15;

@implementation SOAppDelegate
{
// Display link state
CVDisplayLinkRef mDisplayLink;

// State for our file reading process -- protected via mFrameReadQueue
dispatch_queue_t mFrameReadQueue;
NSUInteger mFileIndex; // keep track of what file we're reading
dispatch_io_t mReadingChannel; // channel for reading
dispatch_data_t mFrameReadAccumulator; // keep track of left-over data across read operations

// State for processing raw frame data delivered by the read process - protected via mFrameDataProcessingQueue
dispatch_queue_t mFrameDataProcessingQueue;
NSMutableArray* mFilesForOverlapping;
NSMutableArray* mFrameArraysForOverlapping;

// State for blending frames (or passing them through)
dispatch_queue_t mFrameBlendingQueue;

// Delivery state
dispatch_queue_t mFrameDeliveryQueue; // Is suspended/resumed to deliver one frame at a time
dispatch_queue_t mFrameDeliveryStateQueue; // Protects access to the iVars
dispatch_data_t mDeliveredFrame; // Data of the frame that has been delivered, but not yet picked up by the CVDisplayLink
NSInteger mLastFrameDelivered; // Counter of frames delivered
NSInteger mLastFrameDisplayed; // Counter of frames displayed
}

- (void)applicationDidFinishLaunching:(NSNotification *)aNotification
{
mFileIndex = 1;
mLastFrameDelivered = -1;
mLastFrameDisplayed = -1;

mFrameReadQueue = dispatch_queue_create("mFrameReadQueue", DISPATCH_QUEUE_SERIAL);
mFrameDataProcessingQueue = dispatch_queue_create("mFrameDataProcessingQueue", DISPATCH_QUEUE_SERIAL);
mFrameBlendingQueue = dispatch_queue_create("mFrameBlendingQueue", DISPATCH_QUEUE_SERIAL);
mFrameDeliveryQueue = dispatch_queue_create("mFrameDeliveryQueue", DISPATCH_QUEUE_SERIAL);
mFrameDeliveryStateQueue = dispatch_queue_create("mFrameDeliveryStateQueue", DISPATCH_QUEUE_SERIAL);

CVDisplayLinkCreateWithActiveCGDisplays(&mDisplayLink);
CVDisplayLinkSetOutputCallback(mDisplayLink, &MyDisplayLinkCallback, (__bridge void*)self);

[self readNextFile];
}

- (void)dealloc
{
if (mDisplayLink)
{
if (CVDisplayLinkIsRunning(mDisplayLink))
{
CVDisplayLinkStop(mDisplayLink);
}
CVDisplayLinkRelease(mDisplayLink);
}
}

- (void)readNextFile
{
dispatch_async (mFrameReadQueue, ^{
NSURL* url = [[NSBundle mainBundle] URLForResource: [NSString stringWithFormat: @"File%lu", mFileIndex++] withExtension: @"txt"];

if (!url)
return;

if (mReadingChannel)
{
dispatch_io_close(mReadingChannel, DISPATCH_IO_STOP);
mReadingChannel = nil;
}

// We don't care what queue the cleanup handler gets called on, because we know there's only ever one file being read at a time
mReadingChannel = dispatch_io_create_with_path(DISPATCH_IO_STREAM, [[url path] fileSystemRepresentation], O_RDONLY|O_NONBLOCK, 0, mFrameReadQueue, ^(int error) {
DieOnError(error);

mReadingChannel = nil;

// Start the next file
[self readNextFile];
});

// We don't care what queue the read handlers get called on, because we know they're inherently serial
dispatch_io_read(mReadingChannel, 0, SIZE_MAX, mFrameReadQueue, ^(bool done, dispatch_data_t data, int error) {
DieOnError(error);

// Grab frames
dispatch_data_t localAccumulator = mFrameReadAccumulator ? dispatch_data_create_concat(mFrameReadAccumulator, data) : data;
dispatch_data_t frameData = nil;
do
{
frameData = FrameDataFromAccumulator(&localAccumulator);
mFrameReadAccumulator = localAccumulator;
[self processFrameData: frameData fromFile: url];
} while (frameData);

if (done)
{
dispatch_io_close(mReadingChannel, DISPATCH_IO_STOP);
}
});
});
}

- (void)processFrameData: (dispatch_data_t)frameData fromFile: (NSURL*)file
{
if (!frameData || !file)
return;

// We want the data blobs constituting each frame to be processed serially
dispatch_async(mFrameDataProcessingQueue, ^{
mFilesForOverlapping = mFilesForOverlapping ?: [NSMutableArray array];
mFrameArraysForOverlapping = mFrameArraysForOverlapping ?: [NSMutableArray array];

NSMutableArray* arrayToAddTo = nil;
if ([file isEqual: mFilesForOverlapping.lastObject])
{
arrayToAddTo = mFrameArraysForOverlapping.lastObject;
}
else
{
arrayToAddTo = [NSMutableArray array];
[mFilesForOverlapping addObject: file];
[mFrameArraysForOverlapping addObject: arrayToAddTo];
}

[arrayToAddTo addObject: frameData];

// We've gotten to file two, and we have enough frames to process the overlap
if (mFrameArraysForOverlapping.count == 2 && [mFrameArraysForOverlapping[1] count] >= kFramesToOverlap)
{
NSMutableArray* fileOneFrames = mFrameArraysForOverlapping[0];
NSMutableArray* fileTwoFrames = mFrameArraysForOverlapping[1];

for (NSUInteger i = 0; i < kFramesToOverlap; ++i)
{
[self blendOneFrame:fileOneFrames[0] withOtherFrame: fileTwoFrames[0]];
[fileOneFrames removeObjectAtIndex:0];
[fileTwoFrames removeObjectAtIndex:0];
}

[mFilesForOverlapping removeObjectAtIndex: 0];
[mFrameArraysForOverlapping removeObjectAtIndex: 0];
}

// We're pulling in frames from file 1, haven't gotten to file 2 yet, have more than enough to overlap
while (mFrameArraysForOverlapping.count == 1 && [mFrameArraysForOverlapping[0] count] > kFramesToOverlap)
{
NSMutableArray* frameArray = mFrameArraysForOverlapping[0];
dispatch_data_t first = frameArray[0];
[mFrameArraysForOverlapping[0] removeObjectAtIndex: 0];
[self blendOneFrame: first withOtherFrame: nil];
}
});
}

- (void)blendOneFrame: (dispatch_data_t)frameA withOtherFrame: (dispatch_data_t)frameB
{
dispatch_async(mFrameBlendingQueue, ^{
NSString* blendedFrame = [NSString stringWithFormat: @"%@%@", [NSStringFromDispatchData(frameA) stringByReplacingOccurrencesOfString: @"\n" withString:@""], NSStringFromDispatchData(frameB)];
dispatch_data_t blendedFrameData = dispatch_data_create(blendedFrame.UTF8String, [blendedFrame lengthOfBytesUsingEncoding: NSUTF8StringEncoding], NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
[self deliverFrameForDisplay: blendedFrameData];
});
}

- (void)deliverFrameForDisplay: (dispatch_data_t)frame
{
// By suspending the queue from within the block, and by virtue of this being a serial queue, we guarantee that
// only one task will get called for each call to dispatch_resume on the queue...

dispatch_async(mFrameDeliveryQueue, ^{
dispatch_suspend(mFrameDeliveryQueue);
dispatch_sync(mFrameDeliveryStateQueue, ^{
mLastFrameDelivered++;
mDeliveredFrame = frame;
});

if (!CVDisplayLinkIsRunning(mDisplayLink))
{
CVDisplayLinkStart(mDisplayLink);
}
});
}

- (dispatch_data_t)getFrameForDisplay
{
__block dispatch_data_t frameData = nil;
dispatch_sync(mFrameDeliveryStateQueue, ^{
if (mLastFrameDelivered > mLastFrameDisplayed)
{
frameData = mDeliveredFrame;
mDeliveredFrame = nil;
mLastFrameDisplayed = mLastFrameDelivered;
}
});

// At this point, I've either got the next frame or I don't...
// resume the delivery queue so it will deliver the next frame
if (frameData)
{
dispatch_resume(mFrameDeliveryQueue);
}

return frameData;
}

@end

static void DieOnError(int error)
{
if (error)
{
NSLog(@"Error in %s: %s", __PRETTY_FUNCTION__, strerror(error));
exit(error);
}
}

static NSString* NSStringFromDispatchData(dispatch_data_t data)
{
if (!data || !dispatch_data_get_size(data))
return @"";

const char* buf = NULL;
size_t size = 0;
dispatch_data_t notUsed = dispatch_data_create_map(data, (const void**)&buf, &size);
#pragma unused(notUsed)
NSString* str = [[NSString alloc] initWithBytes: buf length: size encoding: NSUTF8StringEncoding];
return str;
}

// Peel off a frame if there is one, and put the left-overs back.
static dispatch_data_t FrameDataFromAccumulator(dispatch_data_t* accumulator)
{
__block dispatch_data_t frameData = dispatch_data_create(NULL, 0, NULL, NULL); // empty
__block dispatch_data_t leftOver = dispatch_data_create(NULL, 0, NULL, NULL); // empty

__block BOOL didFindFrame = NO;
dispatch_data_apply(*accumulator, ^bool(dispatch_data_t region, size_t offset, const void *buffer, size_t size) {
ssize_t newline = -1;
for (size_t i = 0; !didFindFrame && i < size; ++i)
{
if (((const char *)buffer)[i] == '\n')
{
newline = i;
break;
}
}

if (newline == -1)
{
if (!didFindFrame)
{
frameData = dispatch_data_create_concat(frameData, region);
}
else
{
leftOver = dispatch_data_create_concat(leftOver, region);
}
}
else if (newline >= 0)
{
didFindFrame = YES;
frameData = dispatch_data_create_concat(frameData, dispatch_data_create_subrange(region, 0, newline + 1));
leftOver = dispatch_data_create_concat(leftOver, dispatch_data_create_subrange(region, newline + 1, size - newline - 1));
}

return true;
});

*accumulator = leftOver;

return didFindFrame ? frameData : nil;
}

static CVReturn MyDisplayLinkCallback(CVDisplayLinkRef displayLink, const CVTimeStamp* now, const CVTimeStamp* outputTime, CVOptionFlags flagsIn, CVOptionFlags* flagsOut, void* displayLinkContext)
{
SOAppDelegate* self = (__bridge SOAppDelegate*)displayLinkContext;

dispatch_data_t frameData = [self getFrameForDisplay];

NSString* dataAsString = NSStringFromDispatchData(frameData);

if (dataAsString.length == 0)
{
NSLog(@"Dropped frame...");
}
else
{
NSLog(@"Drawing frame in CVDisplayLink. Contents: %@", dataAsString);
}

return kCVReturnSuccess;
}

In theory, GCD is supposed to balance these queues for you: if allowing the "producer" queue to proceed were causing memory usage to climb, GCD would start letting other queues go and hold up the producer queue. In practice, this mechanism is opaque to us, so who knows how well it would work for you under real-world circumstances, especially in the face of your real-time restrictions.
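
If you'd rather not trust that opaque balancing, one option (my addition, not something the code above does) is to impose the bound yourself with a counting semaphore, so the producer can never get more than a fixed number of frames ahead of the consumer. A minimal, self-contained sketch:

#import <Foundation/Foundation.h>

// At most kMaxFramesInFlight produced-but-unconsumed frames exist at any moment,
// because the producer blocks until the consumer signals that it's done with one.
static const long kMaxFramesInFlight = 4;

static void RunBoundedPipeline(void)
{
    dispatch_queue_t producerQueue = dispatch_queue_create("producer", DISPATCH_QUEUE_SERIAL);
    dispatch_queue_t consumerQueue = dispatch_queue_create("consumer", DISPATCH_QUEUE_SERIAL);
    dispatch_semaphore_t budget = dispatch_semaphore_create(kMaxFramesInFlight);

    dispatch_async(producerQueue, ^{
        for (NSUInteger frame = 0; frame < 100; frame++)
        {
            // Blocks the producer (only) once kMaxFramesInFlight frames are queued.
            dispatch_semaphore_wait(budget, DISPATCH_TIME_FOREVER);

            NSData* frameData = [[NSString stringWithFormat: @"frame %lu\n", (unsigned long)frame]
                                 dataUsingEncoding: NSUTF8StringEncoding];

            dispatch_async(consumerQueue, ^{
                NSLog(@"Consumed %lu bytes", (unsigned long)frameData.length);
                dispatch_semaphore_signal(budget); // free a slot for the producer
            });
        }
    });
}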

If any specific thing here is unclear, please post a comment, and I'll try to elaborate.

Why class prefixes are still followed in Apple's latest frameworks

Now that we have namespacing available, and it is implicit across modules, why are classes still prefixed?

Because the APIs are written in, and must remain usable by, Objective-C, which has no namespacing.

The NS prefix is now (as of Swift 3) gone from many of the Foundation classes, which is certainly something. This is accomplished by putting an "overlay" in front of the Foundation API. In the future, this could be done with other frameworks. (Questions about the future, however, are inappropriate for Stack Overflow, and you are as capable of reading the swift-evolution site as everyone else is.)


