A Reusable Pattern to Convert Event into Task

It is possible with a helper class and a fluent-like syntax:

public static class TaskExt
{
    public static EAPTask<TEventArgs, EventHandler<TEventArgs>> FromEvent<TEventArgs>()
    {
        var tcs = new TaskCompletionSource<TEventArgs>();
        var handler = new EventHandler<TEventArgs>((s, e) => tcs.TrySetResult(e));
        return new EAPTask<TEventArgs, EventHandler<TEventArgs>>(tcs, handler);
    }
}

public sealed class EAPTask<TEventArgs, TEventHandler>
    where TEventHandler : class
{
    private readonly TaskCompletionSource<TEventArgs> _completionSource;
    private readonly TEventHandler _eventHandler;

    public EAPTask(
        TaskCompletionSource<TEventArgs> completionSource,
        TEventHandler eventHandler)
    {
        _completionSource = completionSource;
        _eventHandler = eventHandler;
    }

    // Wraps the generic handler in the delegate type the event actually expects.
    public EAPTask<TEventArgs, TOtherEventHandler> WithHandlerConversion<TOtherEventHandler>(
        Converter<TEventHandler, TOtherEventHandler> converter)
        where TOtherEventHandler : class
    {
        return new EAPTask<TEventArgs, TOtherEventHandler>(
            _completionSource, converter(_eventHandler));
    }

    public async Task<TEventArgs> Start(
        Action<TEventHandler> subscribe,
        Action action,
        Action<TEventHandler> unsubscribe,
        CancellationToken cancellationToken)
    {
        subscribe(_eventHandler);
        try
        {
            // TrySetCanceled (not SetCanceled) so that a cancellation arriving
            // after the event has already completed the task does not throw.
            using (cancellationToken.Register(() => _completionSource.TrySetCanceled()))
            {
                action();
                return await _completionSource.Task;
            }
        }
        finally
        {
            // Always unsubscribe, whether the task completed, failed or was cancelled.
            unsubscribe(_eventHandler);
        }
    }
}

The WithHandlerConversion helper method infers its type parameter from the converter argument, so you only need to write WebBrowserDocumentCompletedEventHandler once.
Usage:

await TaskExt
    .FromEvent<WebBrowserDocumentCompletedEventArgs>()
    .WithHandlerConversion(handler => new WebBrowserDocumentCompletedEventHandler(handler))
    .Start(
        handler => this.webBrowser.DocumentCompleted += handler,
        () => this.webBrowser.Navigate(@"about:blank"),
        handler => this.webBrowser.DocumentCompleted -= handler,
        CancellationToken.None);
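
Note that the CancellationToken passed to Start only cancels the task being awaited; the operation that was started keeps running. Here is a minimal sketch of that behaviour. EventAwaiter.WaitAsync and TimerDemo are hypothetical names (a condensed form of FromEvent + WithHandlerConversion + Start, not part of the code above), and System.Timers.Timer is used only as a convenient stand-in event source:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

public static class EventAwaiter
{
    // Hypothetical condensed form of FromEvent + WithHandlerConversion + Start.
    public static async Task<TArgs> WaitAsync<THandler, TArgs>(
        Func<EventHandler<TArgs>, THandler> convert,
        Action<THandler> subscribe,
        Action start,
        Action<THandler> unsubscribe,
        CancellationToken token)
    {
        var tcs = new TaskCompletionSource<TArgs>();
        THandler handler = convert((s, e) => tcs.TrySetResult(e));

        subscribe(handler);
        try
        {
            // Cancels the returned task only; the started operation is not stopped.
            using (token.Register(() => tcs.TrySetCanceled()))
            {
                start();
                return await tcs.Task.ConfigureAwait(false);
            }
        }
        finally
        {
            unsubscribe(handler);
        }
    }
}

public static class TimerDemo
{
    // Completes on the next Elapsed event; the task is cancelled if the token fires first.
    public static Task<ElapsedEventArgs> NextTickAsync(
        System.Timers.Timer timer, CancellationToken token)
    {
        return EventAwaiter.WaitAsync<ElapsedEventHandler, ElapsedEventArgs>(
            h => new ElapsedEventHandler(h),
            h => timer.Elapsed += h,
            () => timer.Start(),
            h => timer.Elapsed -= h,
            token);
    }
}
```

If the underlying component offers its own way to abort (a Cancel method, a Cancel flag on its event args, and so on), that call would have to be made separately, for example from a second token.Register callback.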

How to wrap an EAP pattern method and IProgress with Task

I ended up using the following solution from this post:
A reusable pattern to convert event into task.
The function is now awaitable, but not cancelable. I couldn't find a way to achieve that so far.

I use the TaskExt class like this in my CompressDirectoryTaskAsync:

// The method must be marked async because it awaits the compression below.
public static async Task CompressDirectoryTaskAsync(SevenZipCompressor compressor,
    CompressItem actionItem, IProgress<CompressItem> progress,
    CancellationToken cancellationToken)
{
    // Do some stuff with MyClass to get sourcePath and archiveFileName
    //
    ...

    // Add Event Handler and Progress
    compressor.Compressing += new EventHandler<ProgressEventArgs>((sender, args) =>
        { CompressItem_ProgressChanged(sender, args, actionItem, progress); });

    compressor.CompressionFinished += new EventHandler<EventArgs>((sender, args) =>
        { CompressItem_FileCompleted(sender, args, actionItem, progress); });

    compressor.FileCompressionStarted += new EventHandler<FileNameEventArgs>((sender, args) =>
        { CompressItem_FileCompressionStarted(sender, args, actionItem, progress); });

    // Start Compression
    await TaskExt
        .FromEvent<EventArgs>()
        .WithHandlerConversion(handler => new EventHandler<EventArgs>(handler))
        .Start(
            handler => compressor.CompressionFinished += handler,
            () => compressor.BeginCompressDirectory(actionItem.SourcePath, archiveFileName),
            handler => compressor.CompressionFinished -= handler,
            cancellationToken).ConfigureAwait(false);
    ...
    ...
}

Interactivity with the d3 reusable pattern

What you're looking for is to create a separation between your core application code and code that can be reused. This is a good idea because it keeps your application's code small, making it easier for you to change it and move it forward. In a reusable chart you want to send very generic events, e.g. clicked, while in your application you need the events to be very specific to your domain, e.g. addSugar.

There are two ingredients you need: d3.dispatch and d3.rebind. The former helps you create a clear internal API, while the latter helps you expose the dispatched events to the outside world.

The linked example shows this. There are four things to look for:

  1. Inside the reusable chart you create a dispatcher with the events you want to publish to the outside world: var myDispatch = d3.dispatch('myclick', 'mydrag')
  2. Then in your reusable chart's event handlers you publish these events like so: myDispatch.myclick(argumentsyouwanttopass)
  3. Next you make these events available to the outside: return d3.rebind(_chart, myDispatch, "on");
  4. Finally you can bind to these events on the outside: myChart.on('myclick', function(){})

So your example, rewritten could look like this:

function chart() {
    var dispatch = d3.dispatch('click');

    function _chart(selection) {
        selection.each(function(d, i) {

            selection.selectAll('rect')
                .data(d)
                .enter()
                .append('rect')
                .attr('x', 0)
                .attr('y', function(d, i) {return i * 11;})
                .attr('width', function(d, i) {return d;})
                .attr('height', 10) // no semicolon here, the chain continues
                .on('click', dispatch.click);

        });
    }

    return d3.rebind(_chart, dispatch, 'on');
}

Additional note: to register multiple event handlers, you need to namespace them (like you have to for regular d3 events), e.g. chart.on('click.foo', handlerFoo), chart.on('click.bar', handlerBar) etc.

How to convert this code to async await?

You can use TaskCompletionSource<T> to wrap your EAP (event-based asynchronous pattern) API into tasks. It's not clear how your DataFeed class handles errors and cancellation, so treat the following as a sample that you will need to modify to add error handling:

private Task ConnectAsync(DataFeed feed)
{
    var tcs = new TaskCompletionSource<object>();
    feed.OnConnected += _ => tcs.TrySetResult(null);
    feed.BeginConnect();
    return tcs.Task;
}

private Task LoginAsync(DataFeed feed, string user, string password)
{
    var tcs = new TaskCompletionSource<object>();
    feed.OnReady += _ => tcs.TrySetResult(null);
    feed.BeginLogin(user, password);
    return tcs.Task;
}

Now you can use these methods:

public async void InitConnection()
{
    // host, port, user and pass are assumed to be defined elsewhere in the class
    var feed = new DataFeed(host, port);
    await ConnectAsync(feed);
    await LoginAsync(feed, user, pass);
    //Now I'm ready
}

Note: you can move these async methods into the DataFeed class. But if you can modify DataFeed, it is better to use TaskFactory.FromAsync to wrap the APM API into tasks.

Unfortunately, before .NET 5 there was no non-generic TaskCompletionSource that would return a non-generic Task, so the usual workaround is to use TaskCompletionSource<object> and return its Task<object> as a Task.
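
As for the error handling left open above, here is a minimal sketch of one way to do it. FakeFeed is a hypothetical stand-in for DataFeed: the OnError event and the delegate signatures are assumptions, since the real DataFeed API isn't shown in the question. The wrapper fails the task when the error event fires and unsubscribes both handlers once the task is done:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for DataFeed; event names and signatures are assumptions.
public sealed class FakeFeed
{
    public event Action<FakeFeed> OnConnected;
    public event Action<Exception> OnError;

    private readonly bool _fail;

    public FakeFeed(bool fail) { _fail = fail; }

    // Simulates an asynchronous connect that raises exactly one of the two events.
    public void BeginConnect()
    {
        Task.Run(() =>
        {
            if (_fail)
                OnError?.Invoke(new InvalidOperationException("connect failed"));
            else
                OnConnected?.Invoke(this);
        });
    }
}

public static class FakeFeedExtensions
{
    public static async Task ConnectAsync(this FakeFeed feed)
    {
        var tcs = new TaskCompletionSource<object>();
        Action<FakeFeed> connected = _ => tcs.TrySetResult(null);
        Action<Exception> failed = ex => tcs.TrySetException(ex);

        // Subscribe before starting so no event can be missed.
        feed.OnConnected += connected;
        feed.OnError += failed;
        try
        {
            feed.BeginConnect();
            await tcs.Task.ConfigureAwait(false); // rethrows the exception set by OnError
        }
        finally
        {
            // Keep the handlers in local variables so they can be removed again.
            feed.OnConnected -= connected;
            feed.OnError -= failed;
        }
    }
}
```

With this shape, await feed.ConnectAsync() either completes normally or throws the exception reported by the feed, so the caller can use an ordinary try/catch.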

How to make EAP methods work synchronously

The idea is to do this:

  1. Make the event handler signal an event
  2. Wait for the event

If you are using await then a TaskCompletionSource is a good "event". If you are going synchronous I'd probably still use that and simply Wait on the task that would be provided by the TCS.
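
The two steps could be sketched like this. Worker, its Completed event and BeginWork are hypothetical names invented for the illustration; only the TaskCompletionSource usage is the point:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical EAP-style component, defined here only for illustration.
public sealed class Worker
{
    public event EventHandler<int> Completed;

    // Does the "work" on a thread-pool thread and raises Completed when done.
    public void BeginWork(int input)
    {
        ThreadPool.QueueUserWorkItem(_ => Completed?.Invoke(this, input * 2));
    }
}

public static class WorkerExtensions
{
    // Blocks the calling thread until Completed fires, then returns its payload.
    public static int DoWork(this Worker worker, int input)
    {
        var tcs = new TaskCompletionSource<int>();
        EventHandler<int> handler = (s, e) => tcs.TrySetResult(e);

        worker.Completed += handler;   // 1. the event handler signals the "event"
        try
        {
            worker.BeginWork(input);
            tcs.Task.Wait();           // 2. wait for the event
            return tcs.Task.Result;
        }
        finally
        {
            worker.Completed -= handler;
        }
    }
}
```

Be careful when blocking like this on a UI thread: if the component raises its event on that same thread, the wait will never finish.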

How to make TaskCompletionSource.Task complete using specific TaskScheduler

It would be interesting to know your goals behind this. Anyway, if you like to avoid the overhead of ContinueWith (which I think is quite low), you'd probably have to come up with your own version of a pattern similar to TaskCompletionSource.

It's not that complex. E.g., something like Promise below can be used in the same way you use TaskCompletionSource, but lets you provide a custom TaskScheduler for completion (disclaimer: almost untested):

public class Promise
{
    readonly Task _task;
    readonly CancellationTokenSource _cts;
    readonly object _lock = new Object();
    Action _completionAction = null;

    // public API

    public Promise()
    {
        _cts = new CancellationTokenSource();
        _task = new Task(InvokeCompletionAction, _cts.Token);
    }

    public Task Task { get { return _task; } }

    public void SetCompleted(TaskScheduler scheduler = null)
    {
        lock (_lock)
        {
            Complete(scheduler);
        }
    }

    public void SetException(Exception ex, TaskScheduler scheduler = null)
    {
        lock (_lock)
        {
            _completionAction = () => { throw ex; };
            Complete(scheduler);
        }
    }

    public void SetException(System.Runtime.ExceptionServices.ExceptionDispatchInfo edi, TaskScheduler scheduler = null)
    {
        lock (_lock)
        {
            _completionAction = () => { edi.Throw(); };
            Complete(scheduler);
        }
    }

    public void SetCancelled(TaskScheduler scheduler = null)
    {
        lock (_lock)
        {
            // don't call _cts.Cancel() outside _completionAction
            // otherwise the cancellation won't be done on the scheduler
            _completionAction = () =>
            {
                _cts.Cancel();
                _cts.Token.ThrowIfCancellationRequested();
            };
            Complete(scheduler);
        }
    }

    // implementation

    void InvokeCompletionAction()
    {
        if (_completionAction != null)
            _completionAction();
    }

    void Complete(TaskScheduler scheduler)
    {
        // the task may be completed only once
        if (Task.Status != TaskStatus.Created)
            throw new InvalidOperationException("Invalid task state.");
        _task.RunSynchronously(scheduler ?? TaskScheduler.Current);
    }
}

On a side note, this version has an overload for SetException(ExceptionDispatchInfo edi), so you could propagate the active exception's state from inside catch:

catch (Exception ex)
{
    var edi = ExceptionDispatchInfo.Capture(ex);
    promise.SetException(edi);
}

It's easy to create a generic version of this, too.

There's a downside to this approach, though. A 3rd party can call promise.Task.Start or promise.Task.RunSynchronously, as the Task is exposed in the TaskStatus.Created state.

You could add a check for that into InvokeCompletionAction, or you could probably hide it using nested tasks / Task.Unwrap (although the latter would bring some overhead back).

General purpose FromEvent method

Here you go:

internal class TaskCompletionSourceHolder
{
    private readonly TaskCompletionSource<object[]> m_tcs;

    internal object Target { get; set; }
    internal EventInfo EventInfo { get; set; }
    internal Delegate Delegate { get; set; }

    internal TaskCompletionSourceHolder(TaskCompletionSource<object[]> tsc)
    {
        m_tcs = tsc;
    }

    private void SetResult(params object[] args)
    {
        // this method will be called from emitted IL
        // so we can set result here, unsubscribe from the event
        // or do whatever we want.

        // object[] args will contain arguments
        // passed to the event handler
        m_tcs.SetResult(args);
        EventInfo.RemoveEventHandler(Target, Delegate);
    }
}

public static class ExtensionMethods
{
    private static Dictionary<Type, DynamicMethod> s_emittedHandlers =
        new Dictionary<Type, DynamicMethod>();

    private static void GetDelegateParameterAndReturnTypes(Type delegateType,
        out List<Type> parameterTypes, out Type returnType)
    {
        if (delegateType.BaseType != typeof(MulticastDelegate))
            throw new ArgumentException("delegateType is not a delegate");

        MethodInfo invoke = delegateType.GetMethod("Invoke");
        if (invoke == null)
            throw new ArgumentException("delegateType is not a delegate.");

        ParameterInfo[] parameters = invoke.GetParameters();
        parameterTypes = new List<Type>(parameters.Length);
        for (int i = 0; i < parameters.Length; i++)
            parameterTypes.Add(parameters[i].ParameterType);

        returnType = invoke.ReturnType;
    }

    public static Task<object[]> FromEvent<T>(this T obj, string eventName)
    {
        var tcs = new TaskCompletionSource<object[]>();
        var tcsh = new TaskCompletionSourceHolder(tcs);

        EventInfo eventInfo = obj.GetType().GetEvent(eventName);
        Type eventDelegateType = eventInfo.EventHandlerType;

        DynamicMethod handler;
        if (!s_emittedHandlers.TryGetValue(eventDelegateType, out handler))
        {
            Type returnType;
            List<Type> parameterTypes;
            GetDelegateParameterAndReturnTypes(eventDelegateType,
                out parameterTypes, out returnType);

            if (returnType != typeof(void))
                throw new NotSupportedException();

            Type tcshType = tcsh.GetType();
            MethodInfo setResultMethodInfo = tcshType.GetMethod(
                "SetResult", BindingFlags.NonPublic | BindingFlags.Instance);

            // I'm going to create an instance-like method,
            // so the first argument must be the instance itself,
            // i.e. TaskCompletionSourceHolder *this*
            parameterTypes.Insert(0, tcshType);
            Type[] parameterTypesAr = parameterTypes.ToArray();

            handler = new DynamicMethod("unnamed",
                returnType, parameterTypesAr, tcshType);

            ILGenerator ilgen = handler.GetILGenerator();

            // declare local variable of type object[]
            LocalBuilder arr = ilgen.DeclareLocal(typeof(object[]));
            // push array's size onto the stack
            ilgen.Emit(OpCodes.Ldc_I4, parameterTypesAr.Length - 1);
            // create an object array of the given size
            ilgen.Emit(OpCodes.Newarr, typeof(object));
            // and store it in the local variable
            ilgen.Emit(OpCodes.Stloc, arr);

            // iterate through all arguments except the zeroth one (i.e. *this*)
            // and store them in the array
            for (int i = 1; i < parameterTypesAr.Length; i++)
            {
                // push the array onto the stack
                ilgen.Emit(OpCodes.Ldloc, arr);
                // push the argument's index onto the stack
                ilgen.Emit(OpCodes.Ldc_I4, i - 1);
                // push the argument onto the stack
                ilgen.Emit(OpCodes.Ldarg, i);

                // check if it is of a value type
                // and perform boxing if necessary
                if (parameterTypesAr[i].IsValueType)
                    ilgen.Emit(OpCodes.Box, parameterTypesAr[i]);

                // store the value in the arguments array
                ilgen.Emit(OpCodes.Stelem, typeof(object));
            }

            // load the zeroth argument (i.e. *this*) onto the stack
            ilgen.Emit(OpCodes.Ldarg_0);
            // load the array onto the stack
            ilgen.Emit(OpCodes.Ldloc, arr);
            // call this.SetResult(arr);
            ilgen.Emit(OpCodes.Call, setResultMethodInfo);
            // and return
            ilgen.Emit(OpCodes.Ret);

            s_emittedHandlers.Add(eventDelegateType, handler);
        }

        Delegate dEmitted = handler.CreateDelegate(eventDelegateType, tcsh);
        tcsh.Target = obj;
        tcsh.EventInfo = eventInfo;
        tcsh.Delegate = dEmitted;

        eventInfo.AddEventHandler(obj, dEmitted);
        return tcs.Task;
    }
}

This code will work for almost all events that return void (regardless of the parameter list).

It can be improved to support any return values if necessary.

You can see the difference between Dax's method and mine below:

static async void Run() {
    object[] result = await new MyClass().FromEvent("Fired");
    Console.WriteLine(string.Join(", ", result.Select(arg =>
        arg.ToString()).ToArray())); // 123, abcd
}

public class MyClass {
    public delegate void TwoThings(int x, string y);

    public MyClass() {
        new Thread(() => {
            Thread.Sleep(1000);
            Fired(123, "abcd");
        }).Start();
    }

    public event TwoThings Fired;
}

Briefly, my code supports really any kind of delegate type. You shouldn't (and don't need to) specify it explicitly like TaskFromEvent<int, string>.


