Best Way to Convert Callback-Based Async Method to Awaitable Task

Best way to convert callback-based async method to awaitable task

Your code is short, readable and efficient, so I don't understand why are you looking for alternatives, but I can't think of anything. I think your approach is reasonable.

I'm also not sure why do you think that the synchronous part is okay in the original version, but you want to avoid it in the Task-based one. If you think the synchronous part might take too long, fix it for both versions of the method.

But if you want to run it asynchronously (i.e. on the ThreadPool) only in the Task version, you can use Task.Run():

public Task<string> GetStringFromUrl(string url)
{
return Task.Run(() =>
{
var t = new TaskCompletionSource<string>();

GetStringFromUrl(url, s => t.TrySetResult(s));

return t.Task;
});
}

Converting WCF Web service calls from callback-based async method to awaitable task

What happened to e.Cancelled, e.Error, and e.Result?

e.Cancelled

If you have an async method (in your case the cmc.getSlideImageAsync) then this can be cancelled through a CancellationToken. Inside this method if you repeatedly check where the cancellation has been requested or not (via the ThrowIfCancellationRequested) then it will throw a OperationCanceledException (or a derived class).

So, the equivalent of e.Cancelled is this:

getSlideImageResponse response;
try
{
response = await cmc.getSlideImageAsync(..., cancellationToken);
}
catch(OperationCanceledException ex)
{
//if(e.Cancelled) logic goes here
}

e.Error

If your async method fails for whatever reason then it will populate the underlying Task's Exception property.

Task<getSlideImageResponse> getTask = cmc.getSlideImageAsync(...);
getTask.Wait(); //BAD PRACTICE, JUST FOR DEMONSTRATION PURPOSES
if(getTask.Exception != null)
{
//if(e.Error != null) logic goes here
}

The above code is suboptimal since the .Wait is a blocking call and can cause deadlock. The recommended approach is to use await. This operator can retrieve the .Exception property from the Task and can throw it again:

getSlideImageResponse response;
try
{
response = await cmc.getSlideImageAsync(...);
}
catch(Exception ex)
{
//if(e.Error != null) logic goes here
}

e.Result

This property was populated only if the method was not cancelled or did not fail. The same is true here:

getSlideImageResponse response;
try
{
response = await cmc.getSlideImageAsync(..., cancellationToken);
}
catch(OperationCanceledException ocex)
{
//if(e.Cancelled) logic goes here
}
catch(Exception ex)
{
//if(e.Error != null) logic goes here
}

//if(e.Result != null) logic goes here

Converting a delegate callback-based API to async

There are a lot of answers that show how to convert events or Begin/End async operations into tasks. That code though doesn't follow the conventions of either model. It's similar to the Event-based Async model EAP without using an event. If you searched for event to task conversions, you'd find a lot of answers. Delegates arent' used for async operations though, as the convention before EAP was to sue the Asynchronous Programming Model (APM) or Begin/End.

The process process is still the same though. It's described in Interop with Other Asynchronous Patterns and Types.
In all cases, a TaskCompletionSource is used to create a Task that's signalled when an operation completes.

When the class follows the APM conventions, one can use the TaskFactory.FromAsync method to convert a Beging/End pair into a task. FromAsync uses a TaskCompletionSource under the covers to return a Task that's signaled when the callback is called. The Interop doc example for this is Stream.BeginRead :

public static Task<int> ReadAsync(this Stream stream, 
byte[] buffer, int offset,
int count)
{
if (stream == null)
throw new ArgumentNullException("stream");

return Task<int>.Factory.FromAsync(stream.BeginRead,
stream.EndRead, buffer,
offset, count, null);
}

Using delegates is similar to using events, which is also shown in the interop article. Adapted to the question, it would look something like this :

public Task<bool> ConnectAsync(ThatService service)
{
if (service==null)
throw new ArgumentNullException(nameof(service));

var tcs=new TaskCompletionSource<bool>();

service.ConnectResultHandler=(ok,msg)=>
{
if(ok)
{
tcs.TrySetResult(true);
}
else
{
tcs.TrySetException(new Exception(msg));
}
};

return tcs.Task;
}

This will allow you to use ConnectAsync in an async method, eg :

public async Task MyMethod()
{
...
var ok=await ConnectAsync(_service);
...

}

If msg contains data on success, you could change ConnectAsync to :

public Task<string> ConnectAsync(ThatService service)
{
if (service==null)
throw new ArgumentNullException(nameof(service));

var tcs=new TaskCompletionSource<string>();

service.ConnectResultHandler=(ok,msg)=>
{
if(ok)
{
tcs.TrySetResult(msg);
}
else
{
tcs.TrySetException(new Exception(msg));
}
};

return tcs.Task;
}

You can change ConnectAsync into an extension method which will allow you to use it as if it were a method of your service class :

public static class MyServiceExtensions 
{
public static Task<string> ConnectAsync(this ThatService service)
{
//Same as before
}
}

And use it :

public async Task MyMethod()
{
...
var msg=await _service.ConnectAsync();
...
}

How to wrap async callback with Observable or async/await?

I am not sure about RX. However, you can convert callback-based SendNetworkRequest to an awaitable task like this:

void SendNetworkRequest(string requestType, Action<Response> callback)
{
// This code by third party, this is here just for testing
if (callback != null)
{
var result = new Response();
callback(result);
}
}

Task<Response> SendNetworkRequestAsync(string requestType)
{
return Task.Run(() =>
{
var t = new TaskCompletionSource<Response>();

SendNetworkRequest(requestType, s => t.TrySetResult(s));

return t.Task;
});
}

Now you can consume SendNetworkRequestAsync with async/await more naturally

Converting a Python function with a callback to an asyncio awaitable

An equivalent of promisify wouldn't work for this use case for two reasons:

  • PyAudio's async API doesn't use the asyncio event loop - the documentation specifies that the callback is invoked from a background thread. This requires precautions to correctly communicate with asyncio.
  • The callback cannot be modeled by a single future because it is invoked multiple times, whereas a future can only have one result. Instead, it must be converted to an async iterator, just as shown in your sample code.

Here is one possible implementation:

def make_iter():
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
def put(*args):
loop.call_soon_threadsafe(queue.put_nowait, args)
async def get():
while True:
yield await queue.get()
return get(), put

make_iter returns a pair of <async iterator, put-callback>. The returned objects hold the property that invoking the callback causes the iterator to produce its next value (the arguments passed to the callback). The callback may be called to call from an arbitrary thread and is thus safe to pass to pyaudio.open, while the async iterator should be given to async for in an asyncio coroutine, which will be suspended while waiting for the next value:

async def main():
stream_get, stream_put = make_iter()
stream = pa.open(stream_callback=stream_put)
stream.start_stream()
async for in_data, frame_count, time_info, status in stream_get:
# ...

asyncio.get_event_loop().run_until_complete(main())

Note that, according to the documentation, the callback must also return a meaningful value, a tuple of frames and a Boolean flag. This can be incorporated in the design by changing the fill function to also receive the data from the asyncio side. The implementation is not included because it might not make much sense without an understanding of the domain.

Promisify / async-await callback in c#

You can make 'promisify' Foo as follows:

static Task<string> FooAsync(List<int> data)
{
var tcs = new TaskCompletionSource<string>();
Action action = () => Foo(data, result => tcs.SetResult(result));
// If Foo isn't blocking, we can execute it in the ThreadPool:
Task.Run(action);
// If it is blocking for some time, it is better to create a dedicated thread
// and avoid starving the ThreadPool. Instead of 'Task.Run' use:
// Task.Factory.StartNew(action, TaskCreationOptions.LongRunning);
return tcs.Task;
}

Now you can call it:

string result = await FooAsync(myList);

How to convert callback-based async function to async generator

As Vincent noticed, you shouldn't use async for to fetch connections since code inside it would block other connections from handling. Consider each connection as separate task that should be run regardless of others.

Here's example of how you can yield inside my_start_server using queue which also shows that we still will return to some sort of on_connection:

async def my_start_server(host, port):
queue = asyncio.Queue()

async def put(reader, writer):
await queue.put((reader, writer,))

await asyncio.start_server(put, host, port)

while True:
yield (await queue.get())

.

async for (reader, writer) in my_start_server(host, port):

# If we will just handle reader/writer here, others will be suspended from handling.
# We can avoid it starting some task,
# but in this case 'handle' would be nothing different from 'on_connection'

asyncio.Task(handle(reader, writer))

callback based async method with multiple parameters to awaitabletask

As mentioned in the comment

The SDK for Accountright API supports async/await i.e. GetRangeAsync

so you can do something like this if you wanted/needed to wrap it in a TaskCompletionSource

static Task<CompanyFile[]> DoWork()
{
var tcs = new TaskCompletionSource<CompanyFile[]>();
Task.Run(async () =>
{
var cfsCloud = new CompanyFileService(_configurationCloud, null, _oAuthKeyService);
var files = await cfsCloud.GetRangeAsync();
tcs.SetResult(files);
});
return tcs.Task;
}


Related Topics



Leave a reply



Submit