How to Implement a Never Ending Task. (Timers VS Task)

Proper way to implement a never ending task. (Timers vs Task)

I'd use TPL Dataflow for this (since you're using .NET 4.5 and it uses Task internally). You can easily create an ActionBlock<TInput> which posts items to itself after it has processed its action and waited an appropriate amount of time.

First, create a factory that will create your never-ending task:

ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Action<DateTimeOffset> action, CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action.
        action(now);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here. As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions {
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

I've chosen the ActionBlock<TInput> to take a DateTimeOffset structure; you have to pass a type parameter, and it might as well pass some useful state (you can change the nature of the state, if you want).

Also, note that the ActionBlock<TInput> by default processes only one item at a time, so you're guaranteed that only one action will be processed (meaning, you won't have to deal with reentrancy when it calls the Post extension method back on itself).

I've also passed the CancellationToken structure to both the constructor of the ActionBlock<TInput> and to the Task.Delay method call; if the process is cancelled, the cancellation will take place at the first possible opportunity.

From there, it's an easy refactoring of your code to store the ITargetBlock<DateTimeOffset> interface implemented by ActionBlock<TInput> (this is the higher-level abstraction representing blocks that are consumers, and you want to be able to trigger the consumption through a call to the Post extension method):

CancellationTokenSource wtoken;
ITargetBlock<DateTimeOffset> task;

Your StartWork method:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask(now => DoWork(), wtoken.Token);

    // Start the task. Post the time.
    task.Post(DateTimeOffset.Now);
}

And then your StopWork method:

void StopWork()
{
    // CancellationTokenSource implements IDisposable.
    using (wtoken)
    {
        // Cancel. This will cancel the task.
        wtoken.Cancel();
    }

    // Set everything to null, since the references
    // are on the class level and keeping them around
    // is holding onto invalid state.
    wtoken = null;
    task = null;
}
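StopWork returns as soon as cancellation has been requested. If a caller also wants to know when the block has actually wound down, one option is to await the block's Completion task, which ITargetBlock<TInput> exposes through IDataflowBlock. The following is only a sketch under stated assumptions: CreateTicker, RunBrieflyAsync and the 50 ms delay are hypothetical names and values chosen so the demo finishes quickly, and the System.Threading.Tasks.Dataflow package is assumed to be referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumes the System.Threading.Tasks.Dataflow package

static class StopAndWaitSketch
{
    // Hypothetical helper: the same self-posting pattern as CreateNeverEndingTask,
    // but with a configurable delay.
    public static ITargetBlock<DateTimeOffset> CreateTicker(
        Action<DateTimeOffset> action, TimeSpan delay, CancellationToken token)
    {
        ActionBlock<DateTimeOffset> block = null;
        block = new ActionBlock<DateTimeOffset>(async now =>
        {
            action(now);
            await Task.Delay(delay, token).ConfigureAwait(false);
            block.Post(DateTimeOffset.Now);
        }, new ExecutionDataflowBlockOptions { CancellationToken = token });
        return block;
    }

    // Starts the block, lets it run briefly, cancels it, then waits for it to stop.
    public static async Task<int> RunBrieflyAsync()
    {
        int ticks = 0;
        using (var cts = new CancellationTokenSource())
        {
            var block = CreateTicker(
                _ => Interlocked.Increment(ref ticks),
                TimeSpan.FromMilliseconds(50),
                cts.Token);

            block.Post(DateTimeOffset.Now);
            await Task.Delay(300);

            cts.Cancel();
            try
            {
                // Completion finishes once the block has stopped processing.
                await block.Completion;
            }
            catch (OperationCanceledException)
            {
                // A cancelled block surfaces cancellation here; that is expected.
            }
        }
        return ticks;
    }

    static async Task Main()
    {
        Console.WriteLine("ticks before stop: " + await RunBrieflyAsync());
    }
}
```

Awaiting Completion before disposing the token source keeps the shutdown deterministic; whether that matters depends on what DoWork touches.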

Why would you want to use TPL Dataflow here? A few reasons:

Separation of concerns

The CreateNeverEndingTask method is now a factory that creates your "service" so to speak. You control when it starts and stops, and it's completely self-contained. You don't have to interweave state control of the timer with other aspects of your code. You simply create the block, start it, and stop it when you're done.

More efficient use of threads/tasks/resources

The default scheduler for the blocks in TPL Dataflow is the same as for a Task, which is the thread pool. By using the ActionBlock<TInput> to process your action, as well as a call to Task.Delay, you're yielding control of the thread that you were using when you're not actually doing anything. Granted, this leads to some overhead when a new Task is spawned to process the continuation, but that should be small, considering you aren't processing this in a tight loop (you're waiting ten seconds between invocations).

If the DoWork function actually can be made awaitable (namely, in that it returns a Task), then you can (possibly) optimize this even more by tweaking the factory method above to take a Func<DateTimeOffset, CancellationToken, Task> instead of an Action<DateTimeOffset>, like so:

ITargetBlock<DateTimeOffset> CreateNeverEndingTask(
    Func<DateTimeOffset, CancellationToken, Task> action,
    CancellationToken cancellationToken)
{
    // Validate parameters.
    if (action == null) throw new ArgumentNullException("action");

    // Declare the block variable, it needs to be captured.
    ActionBlock<DateTimeOffset> block = null;

    // Create the block, it will call itself, so
    // you need to separate the declaration and
    // the assignment.
    // Async so you can wait easily when the
    // delay comes.
    block = new ActionBlock<DateTimeOffset>(async now => {
        // Perform the action. Wait on the result.
        await action(now, cancellationToken).
            // Doing this here because synchronization context more than
            // likely *doesn't* need to be captured for the continuation
            // here. As a matter of fact, that would be downright
            // dangerous.
            ConfigureAwait(false);

        // Wait.
        await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).
            // Same as above.
            ConfigureAwait(false);

        // Post the action back to the block.
        block.Post(DateTimeOffset.Now);
    }, new ExecutionDataflowBlockOptions {
        CancellationToken = cancellationToken
    });

    // Return the block.
    return block;
}

Of course, it would be good practice to weave the CancellationToken through to your method (if it accepts one), which is done here.

That means you would then have a DoWorkAsync method with the following signature:

Task DoWorkAsync(CancellationToken cancellationToken);

You'd have to change (only slightly, and you're not bleeding out separation of concerns here) the StartWork method to account for the new signature passed to the CreateNeverEndingTask method, like so:

void StartWork()
{
    // Create the token source.
    wtoken = new CancellationTokenSource();

    // Set the task.
    task = CreateNeverEndingTask((now, ct) => DoWorkAsync(ct), wtoken.Token);

    // Start the task. Post the time.
    // (Post takes only the item; the token was already given to the block.)
    task.Post(DateTimeOffset.Now);
}

Best-way for endless-task

Since you're running your continuous task asynchronously, why not just make it an endless loop?

Task.Run(async () =>
{
    while (true)
    {
        var output = await READ_LINE_ASYNCHRONOUSLY();
        _outputLines.Add(output);
    }
});

And if you need some way to break out of your loop (I'd assume you might), use CancellationToken, as described e.g. here:

How to cancel a Task in await?

EDIT: Here's the full code doing what you probably want to do:

Task.Run(() => ReadUntilEnd(streamReader, cancellationTokenSource.Token),
    cancellationTokenSource.Token);

//...

private static async Task ReadUntilEnd(StreamReader streamReader,
    CancellationToken cancellationToken)
{
    char[] buffer = new char[1];
    while (!cancellationToken.IsCancellationRequested)
    {
        // ReadAsync returns the number of characters read; 0 means end of stream.
        int read = await streamReader.ReadAsync(buffer, 0, 1);
        if (read == 0) break;
        readAChar(buffer[0]);
    }
}

Execute multiple never-ending Tasks in parallel

There is no need to use Parallel.ForEach, as you already have 3 tasks. This should do it:

var actions = new Action[] { EventCallBack, LogCallBack, DataCallBack };

await Task.WhenAll(actions.Select(async action =>
{
    while (!_cts.Token.IsCancellationRequested)
    {
        action();
        ExecutionCore(_cts.Token);
        await Task.Delay(ExecutionLoopDelayMs, _cts.Token);
    }
}));
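When the token fires while a loop is inside Task.Delay, the delay throws an OperationCanceledException, so the awaited Task.WhenAll will usually end with that exception rather than returning quietly. A minimal sketch of handling that at the call site follows. The three callbacks are hypothetical stand-ins that only count invocations, the 50 ms delay is an assumed value, and wrapping each loop in Task.Run is my own choice here (so that the first call to action() does not run on the caller's thread), not something the snippet above does.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ParallelLoopsSketch
{
    // Hypothetical stand-ins for EventCallBack, LogCallBack and DataCallBack.
    static int _events, _logs, _data;
    static void EventCallBack() { Interlocked.Increment(ref _events); }
    static void LogCallBack() { Interlocked.Increment(ref _logs); }
    static void DataCallBack() { Interlocked.Increment(ref _data); }

    // Runs the three loops until the given duration elapses, then returns the counters.
    public static async Task<int[]> RunForAsync(TimeSpan duration)
    {
        // The token source cancels itself after the given duration.
        using (var cts = new CancellationTokenSource(duration))
        {
            var actions = new Action[] { EventCallBack, LogCallBack, DataCallBack };
            try
            {
                await Task.WhenAll(actions.Select(action => Task.Run(async () =>
                {
                    while (!cts.Token.IsCancellationRequested)
                    {
                        action();
                        await Task.Delay(50, cts.Token);
                    }
                })));
            }
            catch (OperationCanceledException)
            {
                // Expected: Task.Delay was cancelled mid-wait.
            }
            return new[] { _events, _logs, _data };
        }
    }

    static async Task Main()
    {
        int[] counts = await RunForAsync(TimeSpan.FromMilliseconds(500));
        Console.WriteLine(string.Join(", ", counts));
    }
}
```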

Task.Delay vs Thread.Sleep for suspending System.Timers.Timer's job

It depends on what your goal is.

Thread.Sleep will block the thread that is in use. By default, System.Timers.Timer raises its Elapsed event on a thread-pool thread, meaning you wouldn't block your main thread, and a handler that calls Thread.Sleep will probably execute concurrently with the rest of your program. If your desire is to use (and therefore block) a specific thread, you will need to use the SynchronizingObject property, in coordination with Thread.Sleep.

Task.Delay will provide a logical delay without blocking the current thread. This is the best approach, unless you have a reason to block a specific thread. Keep in mind you'll need to make your event handler asynchronous to use Task.Delay:

timer.Elapsed += async (sender, args) =>
{
    for (int i = 0; i < 15; i++)
    {
        await Task.Delay(1000);
        ...
    }
};

A task that will never end until cancellation is requested

As an alternative to TaskCompletionSource with token.Register, here are some one-liners:

var task = new Task(() => {}, token); // don't call task.Start()!

Or, simply this:

var task = Task.Delay(Timeout.Infinite, token);

There's even a nice optimization for Timeout.Infinite in the current Task.Delay implementation.
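To make the Task.Delay variant concrete, here is a small sketch showing that the task stays incomplete until the token is cancelled and then ends up in the canceled state. UntilCancelled and NeverEndingSketch are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class NeverEndingSketch
{
    // Hypothetical helper: a task that only ever finishes by being cancelled.
    public static Task UntilCancelled(CancellationToken token)
    {
        return Task.Delay(Timeout.Infinite, token);
    }

    static async Task Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task pending = UntilCancelled(cts.Token);
            Console.WriteLine("completed before cancel: " + pending.IsCompleted);

            cts.Cancel();
            try
            {
                await pending;
            }
            catch (OperationCanceledException)
            {
                // Awaiting a cancelled task rethrows the cancellation.
                Console.WriteLine("cancellation observed");
            }

            Console.WriteLine("canceled after cancel: " + pending.IsCanceled);
        }
    }
}
```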


