A Pattern to Pause/Resume an Async Task

A pattern to pause/resume an async task?

Updated for 2019: I recently had a chance to revisit this code. Below is a complete example as a console app (warning: PauseTokenSource needs good unit testing).

Note that in my case the requirement was that by the time the consumer-side code (which requested the pause) continues, the producer-side code has already reached the paused state. Thus, by the time the UI is ready to reflect the paused state, all background activity is expected to have already paused.

using System;
using System.Threading.Tasks;
using System.Threading;

namespace Console_19613444
{
    class Program
    {
        // PauseTokenSource
        public class PauseTokenSource
        {
            bool _paused = false;
            bool _pauseRequested = false;

            TaskCompletionSource<bool> _resumeRequestTcs;
            TaskCompletionSource<bool> _pauseConfirmationTcs;

            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);

            public PauseToken Token { get { return new PauseToken(this); } }

            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        var resumeRequestTcs = _resumeRequestTcs;
                        _paused = false;
                        _pauseRequested = false;
                        _resumeRequestTcs = null;
                        _pauseConfirmationTcs = null;
                        resumeRequestTcs.TrySetResult(true);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }

                    Task pauseConfirmationTask = null;

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }

                    await pauseConfirmationTask;

                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            private async Task WaitForResumeRequestAsync(CancellationToken token)
            {
                using (token.Register(() => _resumeRequestTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _resumeRequestTcs.Task;
                }
            }

            private async Task WaitForPauseConfirmationAsync(CancellationToken token)
            {
                using (token.Register(() => _pauseConfirmationTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _pauseConfirmationTcs.Task;
                }
            }

            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;

                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    resumeRequestTask = WaitForResumeRequestAsync(token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }

                await resumeRequestTask;
            }
        }

        // PauseToken - consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public Task<bool> IsPaused() { return _source.IsPaused(); }

            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source.PauseIfRequestedAsync(token);
            }
        }

        // Basic usage

        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();

                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    // Pass the token so a paused worker can still be cancelled.
                    await pause.PauseIfRequestedAsync(token);
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");

                    await Task.Delay(1000, token);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                await pts.ResumeAsync();
                Console.WriteLine("After resume");
            }
        }

        static async Task Main()
        {
            await Test(CancellationToken.None);
        }
    }
}

How to Pause and Resume the Task which runs Asynchronously

You have to be aware of the differences between a Task and a Thread. A Task is something you want done. A Thread is one of the many possible workers that can perform that task. Separate tasks don't have to be performed by separate threads: the same thread can perform several tasks, and one task might be performed by several threads.

If you need to be able to halt a running task at any moment, regardless of what it is doing right then, consider using a thread. Be aware that a thread halted this way keeps holding whatever scarce resources it was using at that moment.

Therefore it might be wiser to halt your process only at certain predefined points of execution. In that case a Task might be the better choice.

By using a Task you express that you don't want to be bothered with who does the execution, and therefore you can't control the executor. In fact, by using a Task you express that you don't even care whether your complete process is handled by one thread or by multiple threads.

If you want to halt execution of your process at certain defined points, consider dividing the task into sub-tasks, each sub-task ending at a point where you would want the process to halt. By using Task.ContinueWith you can decide when the task should continue processing the next step of the process.

The neat solution would be to wrap your process (or task) into a class where users of the class could say: run until step X is performed, or: run until state Y is reached. This way the users of the class wouldn't even have to know whether your process is run using a Task or a Thread.
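To make the "run until step X" idea concrete, here is a minimal sketch in Python. The class name StepwiseProcess, its run_until method and the step names are all made up for illustration; they are not part of any library. The steps run on the caller here, but the class could hand them to a thread or a task without changing its interface, which is the point of the wrapper.

```python
from typing import Callable, List, Optional, Tuple


class StepwiseProcess:
    """Hypothetical wrapper: runs named sub-steps in order and can halt between them."""

    def __init__(self, steps: List[Tuple[str, Callable[[], None]]]):
        self._steps = list(steps)  # (name, action) pairs, executed in order
        self._next = 0             # index of the next step to execute

    @property
    def finished(self) -> bool:
        return self._next >= len(self._steps)

    def run_until(self, step_name: Optional[str] = None) -> List[str]:
        """Run remaining steps, stopping right after `step_name` completes.

        With no name given, run to the end. Returns the names that were executed.
        """
        executed = []
        while not self.finished:
            name, action = self._steps[self._next]
            action()
            self._next += 1
            executed.append(name)
            if name == step_name:
                break  # halt at the requested point; a later call continues from here
        return executed


# Usage: halt after "parse", then continue to the end.
results = []
process = StepwiseProcess([
    ("load", lambda: results.append("loaded")),
    ("parse", lambda: results.append("parsed")),
    ("save", lambda: results.append("saved")),
])
first = process.run_until("parse")
rest = process.run_until()
```

The caller only ever names the step at which to stop, so nothing outside the class depends on how the steps are scheduled.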

Pause and resume a looping Task without using TaskCompletionSource

Here's a version using async:

private int _target = 0;
private int _counter = 0;
private CancellationTokenSource cts = null;

private async Task Execute(CancellationToken ct)
{
    while (true)
    {
        txtCount.Text = $"{++_counter}";
        if (_counter == _target)
        {
            btnStart.Text = "START";
            break;
        }
        await Task.Delay(TimeSpan.FromMilliseconds(500.0));
        if (ct.IsCancellationRequested)
        {
            break;
        }
    }
}

private async void btnStart_Click(object sender, EventArgs e)
{
    if (_target == _counter)
    {
        if (int.TryParse(txtInput.Text, out int target) && target > 0)
        {
            _target = target;
            _counter = 0;
            txtCount.Text = $"{_counter}";
            btnStart.Text = "PAUSE";
            cts = new CancellationTokenSource();
            await Execute(cts.Token);
        }
        else
        {
            txtCount.Text = "INVALID";
        }
    }
    else
    {
        if (cts != null)
        {
            cts.Cancel();
            cts = null;
            btnStart.Text = "RESUME";
        }
        else
        {
            btnStart.Text = "PAUSE";
            cts = new CancellationTokenSource();
            await Execute(cts.Token);
        }
    }
}


A simpler variant that keeps the loop running and uses the button text as the pause state, instead of a CancellationTokenSource:

private int _target = 0;
private int _counter = 0;

private async Task Execute()
{
    while (true)
    {
        // Only count while the button shows "PAUSE", i.e. while running.
        if (btnStart.Text == "PAUSE")
        {
            txtCount.Text = $"{++_counter}";
            if (_counter == _target)
            {
                btnStart.Text = "START";
                break;
            }
        }
        await Task.Delay(TimeSpan.FromMilliseconds(500.0));
    }
}

private async void btnStart_Click(object sender, EventArgs e)
{
    if (btnStart.Text == "START")
    {
        if (int.TryParse(txtInput.Text, out int target) && target > 0)
        {
            _target = target;
            _counter = 0;
            txtCount.Text = $"{_counter}";
            btnStart.Text = "PAUSE";
            await Execute();
        }
        else
        {
            txtCount.Text = "INVALID";
        }
    }
    else
    {
        // Toggle between paused and running; Execute() polls this text.
        btnStart.Text = btnStart.Text == "RESUME" ? "PAUSE" : "RESUME";
    }
}

How can I pause & resume a task in asyncio?

You cannot suspend and resume an asyncio task.

You could cancel the task and later create a new one, but this leads to more problems than it solves. Data consistency may be compromised, and the new task will not resume exactly where the previous one was interrupted.

You could easily make the task wait at some specific point (or points):

async def loop_orders():
    while True:
        ... wait here while paused ...
        do_this()
        do_that()

However, when create_order pauses the loop_orders task, the former must wait until the latter reaches the point where it pauses: the create_order task requests a pause and loop_orders acknowledges it.

I made a little demo with two Events that I named enable and idle in an attempt to make the method names .clear, .set and .wait match the logic.

import asyncio

enable = None
idle = None

async def loop_orders():
    while True:
        idle.set()            # acknowledge: we are at the pause point
        await enable.wait()   # blocks here while paused
        idle.clear()
        print("processing order ... ", end="", flush=True)
        await asyncio.sleep(0.7)
        print("done")

async def create_order():
    enable.clear()            # request a pause
    await idle.wait()         # wait until loop_orders acknowledges it
    print("-- pause start ... ", end="", flush=True)
    await asyncio.sleep(2)
    print("stop")
    enable.set()              # resume

async def test():
    global enable, idle

    enable = asyncio.Event()
    enable.set()  # initial state = enabled
    idle = asyncio.Event()

    worker = asyncio.create_task(loop_orders())  # keep a reference to the task
    await asyncio.sleep(2)
    await create_order()
    await asyncio.sleep(2)
    await create_order()
    await asyncio.sleep(1)
    print("EXIT")

asyncio.run(test())
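If you would rather not use globals, the same enable/idle handshake can be wrapped in a small helper. This is only a sketch: PauseController, checkpoint, pause and resume are names I made up, not asyncio API. It assumes a single worker and a single controller, and that the worker is still running when pause() is called (otherwise pause() would wait forever).

```python
import asyncio


class PauseController:
    """Hypothetical helper wrapping the enable/idle Event pair from the demo."""

    def __init__(self):
        self._enable = asyncio.Event()
        self._enable.set()            # start in the running state
        self._idle = asyncio.Event()

    async def checkpoint(self):
        """Called by the worker at every point where pausing is safe."""
        self._idle.set()              # acknowledge a pending pause request
        try:
            await self._enable.wait() # returns immediately unless paused
        finally:
            self._idle.clear()

    async def pause(self):
        """Request a pause and return once the worker has reached a checkpoint."""
        self._enable.clear()
        await self._idle.wait()

    def resume(self):
        self._enable.set()


async def worker(ctl, log):
    for i in range(5):
        await ctl.checkpoint()
        log.append(i)
        await asyncio.sleep(0)        # stand-in for real asynchronous work


async def demo():
    ctl, log = PauseController(), []
    task = asyncio.create_task(worker(ctl, log))
    await asyncio.sleep(0)            # let the worker make some progress
    await ctl.pause()
    at_pause = len(log)
    for _ in range(10):               # give the worker chances to run; it must not
        await asyncio.sleep(0)
    during_pause = len(log)
    ctl.resume()
    await task
    return at_pause, during_pause, log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The worker decides where pausing is safe by choosing where to call checkpoint(), and the controller gets the guarantee that nothing is processed between the return of pause() and the call to resume().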

How to Pause/Resume an asynchronous worker, without incurring memory allocation overhead?

I believe you're misunderstanding Reset. ManualResetValueTaskSourceCore<T>.Reset is nothing at all like ManualResetEvent.Reset. For ManualResetValueTaskSourceCore<T>, Reset means "the previous operation has completed, and now I want to reuse the value task, so change the version". So, it should be called after GetResult (i.e., after the paused code is done awaiting), not within Pause. The easiest way to encapsulate this IMO is to Reset the state immediately before returning the ValueTask.

Similarly, your code shouldn't call SetResult unless there's a value task already returned. ManualResetValueTaskSourceCore<T> is really designed around sequential operations pretty strictly, and having a separate "controller" complicates things. You could probably get it working by keeping track of whether or not the consumer is waiting, and only attempting to complete if there is one:

public class Pausation : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _source;
    private readonly object _mutex = new();
    private bool _paused;
    private bool _waiting;

    public Pausation() => _source.RunContinuationsAsynchronously = true;

    public void Pause()
    {
        lock (_mutex)
            _paused = true;
    }

    public void Resume()
    {
        var wasWaiting = false;
        lock (_mutex)
        {
            wasWaiting = _waiting;
            _paused = _waiting = false;
        }

        if (wasWaiting)
            _source.SetResult(default);
    }

    public ValueTask WaitWhilePausedAsync()
    {
        lock (_mutex)
        {
            if (!_paused) return ValueTask.CompletedTask;
            _waiting = true;
            _source.Reset();
            return new ValueTask(this, _source.Version);
        }
    }

    void IValueTaskSource.GetResult(short token)
    {
        _source.GetResult(token);
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
    {
        return _source.GetStatus(token);
    }

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
    {
        _source.OnCompleted(continuation, state, token, flags);
    }
}

I fixed some of the obvious race conditions using a mutex, but I can't guarantee there aren't more subtle ones remaining. It will certainly be limited to a single controller and single consumer, at least.

Pause and resume AsyncTask Android

Pausing an AsyncTask is not the best choice. Still, you can find an example of pausing an AsyncTask in the Google sample app DisplayingBitmaps, in the ImageWorker class.

private class BitmapWorkerTask extends AsyncTask<Void, Void, BitmapDrawable> {
    @Override
    protected BitmapDrawable doInBackground(Void... params) {
        // Wait here while work is paused and the task is not cancelled.
        synchronized (mPauseWorkLock) {
            while (mPauseWork && !isCancelled()) {
                try {
                    mPauseWorkLock.wait();
                } catch (InterruptedException e) {}
            }
        }
    }
    ...
}

public void setPauseWork(boolean pauseWork) {
    synchronized (mPauseWorkLock) {
        mPauseWork = pauseWork;
        if (!mPauseWork) {
            // Wake up any worker blocked in doInBackground.
            mPauseWorkLock.notifyAll();
        }
    }
}
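The wait/notifyAll pairing is not specific to Android. As a language-neutral illustration, here is the same guarded-wait idea sketched in Python with threading.Condition. This is not Android code, and PausableWorker and its members are made-up names; the condition plays the role of mPauseWorkLock and the _paused flag the role of mPauseWork.

```python
import threading


class PausableWorker:
    """Hypothetical worker thread that can be paused between work items."""

    def __init__(self, items):
        self._items = list(items)
        self._cond = threading.Condition()  # role of mPauseWorkLock
        self._paused = False                # role of mPauseWork
        self._cancelled = False
        self.done = []
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def is_alive(self):
        return self._thread.is_alive()

    def set_paused(self, paused):
        with self._cond:
            self._paused = paused
            if not paused:
                self._cond.notify_all()     # wake the worker if it is waiting

    def cancel(self):
        with self._cond:
            self._cancelled = True
            self._cond.notify_all()

    def _run(self):
        for item in self._items:
            with self._cond:
                # Guarded wait: re-check the flags after every wake-up.
                while self._paused and not self._cancelled:
                    self._cond.wait()
                if self._cancelled:
                    return
            self.done.append(item)          # stand-in for the real work


def demo():
    worker = PausableWorker(range(3))
    worker.set_paused(True)                 # pause before any work starts
    worker.start()
    worker.join(timeout=0.2)                # worker is blocked, so this times out
    while_paused = (worker.is_alive(), list(worker.done))
    worker.set_paused(False)
    worker.join()
    return while_paused, worker.done
```

Note that set_paused returns immediately; unlike the request/acknowledge patterns in the other answers, the caller gets no confirmation that the worker has actually stopped.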

