How to protect resources that may be used in a multi-threaded or async environment?
You can use SemaphoreSlim with 1 as the number of requests that can be granted concurrently. SemaphoreSlim lets you lock both asynchronously, using WaitAsync, and in the old synchronous way, using Wait:
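// Field: at most one holder at a time
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

await _semaphore.WaitAsync();
try
{
    // ... use shared resource
}
finally
{
    _semaphore.Release();
}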
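For comparison, Python's asyncio.Semaphore plays the same role for coroutines. The following is a minimal Python analogue, not the C# API: `increment` and `main` are hypothetical names, and `await asyncio.sleep(0)` stands in for awaiting real I/O while the shared resource is in use.

```python
import asyncio


async def increment(state, lock):
    # Read-modify-write with an await in the middle: without the lock, another
    # coroutine could run between the read and the write, and an update would be lost.
    async with lock:
        current = state["count"]
        await asyncio.sleep(0)  # yield to the event loop, like awaiting real I/O
        state["count"] = current + 1


async def main():
    state = {"count": 0}
    lock = asyncio.Semaphore(1)  # one holder at a time, like SemaphoreSlim(1, 1)
    await asyncio.gather(*(increment(state, lock) for _ in range(100)))
    return state["count"]


print(asyncio.run(main()))  # 100
```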
You can also write your own AsyncLock based on Stephen Toub's great post Building Async Coordination Primitives, Part 6: AsyncLock. I did that in my application and added support for both synchronous and asynchronous locking on the same construct.
Usage:
// Async
using (await _asyncLock.LockAsync())
{
    // ... use shared resource
}

// Synchronous
using (_asyncLock.Lock())
{
    // ... use shared resource
}
Implementation:
using System;
using System.Threading;
using System.Threading.Tasks;

class AsyncLock
{
    private readonly Task<IDisposable> _releaserTask;
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
    private readonly IDisposable _releaser;

    public AsyncLock()
    {
        _releaser = new Releaser(_semaphore);
        // Cached completed task, returned when the lock is acquired synchronously,
        // so no new task has to be allocated in that case
        _releaserTask = Task.FromResult(_releaser);
    }

    public IDisposable Lock()
    {
        _semaphore.Wait();
        return _releaser;
    }

    public Task<IDisposable> LockAsync()
    {
        var waitTask = _semaphore.WaitAsync();
        return waitTask.IsCompleted
            ? _releaserTask
            // Otherwise complete with the releaser once the semaphore is acquired
            : waitTask.ContinueWith(
                (_, releaser) => (IDisposable) releaser,
                _releaser,
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
    }

    // Disposing the releaser (at the end of the using block) releases the semaphore
    private class Releaser : IDisposable
    {
        private readonly SemaphoreSlim _semaphore;

        public Releaser(SemaphoreSlim semaphore)
        {
            _semaphore = semaphore;
        }

        public void Dispose()
        {
            _semaphore.Release();
        }
    }
}
How to synchronize shared resources with async/await pattern
Use SemaphoreSlim.WaitAsync as a replacement for lock, since you cannot await inside a lock block.
Reentrancy in async/await?
Since both calls run on the UI thread, the code is "thread safe" in the traditional sense: there won't be any exceptions or corrupted data.
However, can there be logical race conditions? Sure. You could easily have this flow (or any other):
UpdateCurrentIp() - button
UpdateCurrentIp() - Timer
UpdateUserIps() - Timer
UpdateUserIps() - button
Judging by the method names, this may not really be an issue, but that depends on the actual implementation of these methods.
Generally, you can avoid these problems by synchronizing the calls with a SemaphoreSlim or an AsyncLock (How to protect resources that may be used in a multi-threaded or async environment?):
using (await _asyncLock.LockAsync())
{
    await IpChangedReactor.UpdateIps();
}
In this case, though, it seems that simply not starting a new update while one is already running is good enough:
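// No lock needed: the check and the assignment both run on the UI thread,
// with no await between them
if (_isUpdating) return;
_isUpdating = true;
try
{
    await IpChangedReactor.UpdateIps();
}
finally
{
    _isUpdating = false;
}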
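The same guard translates directly to Python's asyncio, whose single event-loop thread plays the role of the UI thread here. This is a hedged Python analogue, not the original C#: `IpUpdater`, `update_ips` and `on_trigger` are hypothetical names standing in for the button/timer handlers and `IpChangedReactor.UpdateIps()`.

```python
import asyncio


class IpUpdater:
    """Hypothetical stand-in for the button/timer handlers in the question."""

    def __init__(self):
        self._is_updating = False
        self.runs = 0

    async def update_ips(self):
        # Stand-in for IpChangedReactor.UpdateIps(); the name is illustrative.
        self.runs += 1
        await asyncio.sleep(0.05)

    async def on_trigger(self):
        # Safe without a lock: all handlers run on the single event-loop thread,
        # and there is no await between the check and the assignment.
        if self._is_updating:
            return
        self._is_updating = True
        try:
            await self.update_ips()
        finally:
            self._is_updating = False


async def main():
    updater = IpUpdater()
    # A "button" and a "timer" fire at almost the same time.
    await asyncio.gather(updater.on_trigger(), updater.on_trigger())
    return updater.runs


print(asyncio.run(main()))  # 1: the second trigger is skipped
```

A trigger that arrives after the first update has finished runs normally, because the `finally` block resets the flag.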
async await async all the way
No, this will not deadlock, because you're blocking on a task that's being executed by a ThreadPool thread with no SynchronizationContext. Since it isn't running on the UI thread, nothing stops that task from completing, so there's no deadlock.
If this were your code, running on the UI thread, it would have deadlocked:
public void Initialize()
{
    DoSomeWorkAsync().Wait();
}
That still isn't a good reason to block, though; you should use async-await all the way up.
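The same rule carries over to Python's asyncio, shown here as a hedged analogue rather than C#. Calling `asyncio.run_coroutine_threadsafe(coro, loop).result()` from the loop's own thread would hang, much like `.Wait()` on the UI thread, because the coroutine can never be scheduled. Running the coroutine on a separate event loop in a worker thread, as below, does not deadlock (compare the ThreadPool case above), but it still blocks the caller, so awaiting is preferable. `do_some_work_async`, `initialize_blocking` and `initialize` are hypothetical names mirroring the C# snippet.

```python
import asyncio
import concurrent.futures


async def do_some_work_async():
    await asyncio.sleep(0.01)  # stand-in for real asynchronous work
    return 42


def initialize_blocking():
    # Blocking completes here only because the coroutine runs on its own event
    # loop in a worker thread (no shared loop to starve); the calling thread is
    # still stuck until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, do_some_work_async()).result()


async def initialize():
    # Preferred: async all the way up, so nothing blocks the caller's loop.
    return await do_some_work_async()


async def caller():
    blocked = initialize_blocking()  # works, but freezes this loop meanwhile
    awaited = await initialize()
    return blocked, awaited


print(asyncio.run(caller()))  # (42, 42)
```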
Guarantee TransformBlock output sequence
TransformBlock already maintains FIFO order. The order in which you post items to the block is exactly the order in which the items will be returned from the block.
When you specify a maximum degree of parallelism that is larger than 1, multiple messages are processed simultaneously, and therefore, messages might not be processed in the order in which they are received. The order in which the messages are output from the block will, however, be correctly ordered.
From Dataflow (Task Parallel Library)
You can see that with this example:
private static async Task MainAsync()
{
    var transformBlock = new TransformBlock<int, int>(async input =>
    {
        // Random delay so items finish out of order (Random.Shared requires .NET 6+)
        await Task.Delay(Random.Shared.Next(5, 100));
        return input;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

    foreach (var number in Enumerable.Range(0, 100))
    {
        await transformBlock.SendAsync(number);
    }

    for (int i = 0; i < 100; i++)
    {
        var result = await transformBlock.ReceiveAsync();
        Console.WriteLine(result);
    }
}
The output will be in order, 0 through 99.
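A rough Python analogue (asyncio, not TPL Dataflow) of the same guarantee: items are processed concurrently with at most 10 in flight, mirroring MaxDegreeOfParallelism = 10, and they finish in random order, yet `asyncio.gather` returns results in submission order. `transform` and `main` are hypothetical names.

```python
import asyncio
import random


async def transform(value, limit):
    async with limit:  # at most 10 items in flight, like MaxDegreeOfParallelism = 10
        await asyncio.sleep(random.uniform(0.005, 0.1))  # random delay, as in the C# example
        return value


async def main():
    limit = asyncio.Semaphore(10)
    # gather() returns results in the order the awaitables were passed in,
    # regardless of the order in which they finished.
    return await asyncio.gather(*(transform(n, limit) for n in range(100)))


results = asyncio.run(main())
print(results == list(range(100)))  # True
```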
However, what you seem to want is some correlation with threads, so that a thread posts an item to the block and then receives its own result. That doesn't really fit TPL Dataflow, which is meant to be a pipeline of blocks. You could hack it with BoundedCapacity = 1, but you probably shouldn't.
SynchronizationLockException on Monitor.Exit when using await
You can't await a task inside a lock scope (which is syntactic sugar for Monitor.Enter and Monitor.Exit). Using a Monitor directly will fool the compiler, but not the framework.
async-await has no thread affinity the way a Monitor does. The code after the await may well run on a different thread than the code before it, which means that the thread that releases the Monitor isn't necessarily the one that acquired it.
Either don't use async-await in this case, or use a different synchronization construct, such as SemaphoreSlim or an AsyncLock you can build yourself. Here's mine: https://stackoverflow.com/a/21011273/885318
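The same distinction exists in Python's threading module, used here as an analogue rather than .NET code: `threading.RLock`, like Monitor, is owned by the thread that acquired it, and releasing it from another thread raises a RuntimeError (comparable to SynchronizationLockException), while `threading.Semaphore`, like SemaphoreSlim, has no owner. `release_from_other_thread` is a hypothetical helper for the demonstration.

```python
import threading


def release_from_other_thread(lock):
    """Release `lock` on a new thread; return the RuntimeError raised, or None."""
    errors = []

    def worker():
        try:
            lock.release()
        except RuntimeError as exc:
            errors.append(exc)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return errors[0] if errors else None


# RLock is owned by the thread that acquired it (thread affinity, like Monitor).
rlock = threading.RLock()
rlock.acquire()
print(repr(release_from_other_thread(rlock)))  # a RuntimeError
rlock.release()  # only the owning thread may release it

# Semaphore has no owner (like SemaphoreSlim), so any thread may release it.
sem = threading.Semaphore(1)
sem.acquire()
print(release_from_other_thread(sem))  # None
```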
How can I fire and forget a task without blocking main thread?
Your questions are quite abstract, so I'll try to give general answers to all of them.
How can I "fire and forget" a task without blocking main thread?
It depends on what you mean by "forget".
- If you are not planning to access the task after starting it, you can run it in a parallel process.
- If the main application should be able to access a background task, then you should have an event-driven architecture. In that case, what were previously called tasks become services or microservices.
I don't want to use any task queues (celery, rabbitmq, etc.) here because the tasks I'm thinking of are too small and fast to run. Just want to get them done as out of the way as possible. Would that be an async approach? Throwing them onto another process?
If it contains loops or other CPU-bound operations, then it is right to use a subprocess. If the task makes (async) requests, reads files, logs to stdout, or performs other I/O-bound operations, then it is right to use coroutines or threads.
Does it make sense to have a separate thread that handles background jobs? Like a simple job queue but very lightweight and does not require additional infrastructure?
We can't just use a thread, as it can be blocked by another task that performs CPU-bound operations (in CPython, the GIL lets only one thread execute Python bytecode at a time). Instead, we can run a background process and use pipes, queues, and events to communicate between processes. Unfortunately, we cannot pass arbitrary complex objects between processes, but we can pass basic data structures to track status changes of the tasks running in the background.
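A minimal sketch of such a lightweight job queue, assuming only the standard multiprocessing module and no external infrastructure: one background process consumes jobs from a queue until it receives a sentinel and reports plain (job, result) tuples back. `square`, `worker` and `STOP` are hypothetical names, and `square` stands in for real work.

```python
import multiprocessing

STOP = None  # sentinel telling the worker to exit


def square(n):
    return n * n  # hypothetical job; replace with the real work


def worker(jobs, results):
    """Consume jobs until the sentinel arrives; report (job, result) pairs."""
    while True:
        job = jobs.get()
        if job is STOP:
            break
        results.put((job, square(job)))


def main():
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    background = multiprocessing.Process(target=worker, args=(jobs, results), daemon=True)
    background.start()
    for n in range(5):
        jobs.put(n)  # fire and forget: put() does not wait for the job to run
    jobs.put(STOP)
    # Only plain, picklable data (ints, tuples, ...) crosses the process boundary.
    collected = dict(results.get(timeout=10) for _ in range(5))
    background.join()  # drain the queue before joining, as multiprocessing recommends
    return collected


if __name__ == "__main__":
    print(main())
```

Because `worker` only calls `get()` and `put()`, the same function can be exercised in-process with a `queue.Queue`, which keeps it easy to test.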
Regarding Starlette and BackgroundTask
Starlette is a lightweight ASGI framework/toolkit, which is ideal for building async web services in Python. (README description)
It relies on concurrency, so it is not a generic solution for all kinds of tasks either.
NOTE: Concurrency differs from parallelism.
I'm wondering if we can build something more generic where you can run background tasks in scripts or webservers alike, without sacrificing performance.
The solution above suggests using a background process. Still, it depends on the application design, since you must do whatever is needed to communicate between and synchronize the running processes (tasks): emit an event, put an indicator on a queue, etc. There is no generic tool for that, but there are situation-dependent solutions.
Situation 1 - The tasks are asynchronous functions
Suppose we have a request function that should call an API without blocking other tasks. We also have a sleep function that should not block anything.
import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            try:
                return await response.json()
            except aiohttp.ContentTypeError:
                return await response.read()


async def sleep(t):
    await asyncio.sleep(t)


async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))
    ...  # other work; CPU-bound code here blocks the event loop, so the tasks above only progress once we await
    result1 = await background_task_1
    ...  # use the 'result1', etc.
    await background_task_2


if __name__ == "__main__":
    asyncio.run(main())
In this situation, we use asyncio.create_task to run a coroutine concurrently (as if in the background). We could run it in a subprocess, but there is no reason to: it would use more resources without improving performance.
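For a true "fire and forget" with asyncio.create_task, the asyncio documentation warns that the event loop keeps only weak references to tasks, so a task nobody references may be garbage-collected before it finishes. A minimal sketch of the pattern the docs recommend, keeping strong references in a set until each task completes; `fire_and_forget` and `log_later` are hypothetical names.

```python
import asyncio

background_tasks = set()


def fire_and_forget(coro):
    """Schedule `coro` on the running loop without awaiting it."""
    task = asyncio.create_task(coro)
    # The event loop keeps only weak references to tasks, so hold a strong one
    # until the task finishes; otherwise it may be garbage-collected mid-run.
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def log_later(messages, text):
    await asyncio.sleep(0.01)
    messages.append(text)


async def main():
    messages = []
    fire_and_forget(log_later(messages, "done in the background"))
    messages.append("main keeps going")
    await asyncio.sleep(0.05)  # give the background task time to finish
    return messages


print(asyncio.run(main()))  # ['main keeps going', 'done in the background']
```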
Situation 2 - The tasks are synchronous functions (I/O bound)
Unlike the first situation, where the functions were already asynchronous, here they are synchronous but I/O-bound rather than CPU-bound. That means we can run them in threads, or make them asynchronous with asyncio.to_thread (Python 3.9+) and run them concurrently.
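import time
import asyncio
import functools
import requests


def asynchronous(func):
    """
    This decorator converts a synchronous function into an asynchronous one
    by running it in a worker thread with asyncio.to_thread (Python 3.9+).

    Usage:
        @asynchronous
        def sleep(t):
            time.sleep(t)

        async def main():
            await sleep(5)
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Return the function's result; without `return`, callers would always get None
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper


@asynchronous
def request(url):
    with requests.Session() as session:
        response = session.get(url)
        try:
            return response.json()
        except requests.JSONDecodeError:
            return response.text


@asynchronous
def sleep(t):
    time.sleep(t)


async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))
    ...  # other work while both functions run in worker threads
    result1 = await background_task_1
    await background_task_2


if __name__ == "__main__":
    asyncio.run(main())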
Here we used a decorator to convert synchronous (I/O-bound) functions into asynchronous ones, and then used them as in the first situation.
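If the rest of the program is not async at all, plain threads are the other option mentioned earlier. A minimal sketch using the standard concurrent.futures.ThreadPoolExecutor instead of asyncio: the blocking calls overlap, and the caller only waits when it actually needs the results. `slow_io` and `run_in_background` are hypothetical names, and `time.sleep` stands in for a blocking request or file read.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_io(name, delay):
    time.sleep(delay)  # stand-in for a blocking request or file read
    return f"{name} finished"


def run_in_background(jobs):
    with ThreadPoolExecutor(max_workers=4) as executor:
        # submit() returns immediately with a Future; the work runs in pool threads.
        futures = [executor.submit(slow_io, name, delay) for name, delay in jobs]
        ...  # the calling thread is free to do other work here
        return [f.result() for f in futures]  # block only when the results are needed


print(run_in_background([("job1", 0.2), ("job2", 0.2)]))  # ['job1 finished', 'job2 finished']
```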
Situation 3 - The tasks are synchronous functions (CPU-bound)
To run CPU-bound tasks in parallel in the background, we have to use multiprocessing, and to make sure a task is done, we use the join method.
import time
import multiprocessing


def task():
    for i in range(10):
        time.sleep(0.3)  # placeholder; replace with the real CPU-bound work


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()
    ...  # do the rest of the work that does not depend on the background task
    background_task.join()  # wait until the background task is done
    ...  # do stuff that depends on the background task


if __name__ == "__main__":
    main()
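Note that Process.join() only waits; it does not hand back the target's return value. When the main program needs the result of the CPU-bound work, the standard concurrent.futures.ProcessPoolExecutor is a convenient alternative: `submit()` returns a Future whose `result()` both waits and returns the value. A minimal sketch, with `count_primes` as a hypothetical, deliberately CPU-bound job.

```python
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit):
    """Deliberately CPU-bound: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    with ProcessPoolExecutor() as pool:
        future = pool.submit(count_primes, 50_000)  # runs in another process
        ...  # do the rest of the work that does not depend on the result
        return future.result()  # like join(), but also returns the value


if __name__ == "__main__":
    print(main())
```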
Suppose the main application depends on individual parts of the background task. In this case, we need an event-driven design, because join only returns once the whole process has finished, so it cannot signal that one part is done.
import multiprocessing


def task(part1_done, part2_done):
    ...  # synchronous operations
    part1_done.set()  # notify the main process that the first part of the task is done
    ...  # synchronous operations
    part2_done.set()  # notify the main process that the second part of the task is also done
    ...  # synchronous operations


def main():
    # One event per milestone: an Event stays set once set, so reusing a single
    # event would make the second wait() return immediately.
    part1_done = multiprocessing.Event()
    part2_done = multiprocessing.Event()
    # Pass the events as arguments so this also works with the "spawn" start method
    background_task = multiprocessing.Process(target=task, args=(part1_done, part2_done))
    background_task.start()
    ...  # do the rest of the work that does not depend on the background task
    part1_done.wait()  # wait until the first part of the background task is done
    ...  # do stuff that depends on the first part of the background task
    part2_done.wait()  # wait until the second part of the background task is done
    ...  # do stuff that depends on the second part of the background task
    background_task.join()  # wait until the background task is finally done
    ...  # do stuff that depends on the whole background task


if __name__ == "__main__":
    main()
As you may have noticed, events carry only binary information, and they become impractical with more than two processes (it is impossible to know which process set the event). So we use pipes, queues, and managers to pass non-binary information between processes.
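A minimal sketch of the manager approach, assuming the standard multiprocessing.Manager: each worker writes its status under its own key in a shared dict, so it is always clear which process reported what, something a bare Event cannot express. `worker` and `status` are hypothetical names, and the sum stands in for real work. A Queue of `(worker_id, message)` tuples would serve the same purpose.

```python
import multiprocessing


def worker(worker_id, status):
    """Report progress under this worker's own key, so the sender is never ambiguous."""
    status[worker_id] = "started"
    total = sum(i * i for i in range(100_000))  # stand-in for CPU-bound work
    status[worker_id] = f"done: {total}"


def main():
    with multiprocessing.Manager() as manager:
        status = manager.dict()  # proxy to a dict living in the manager process
        workers = [multiprocessing.Process(target=worker, args=(i, status)) for i in range(3)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        return dict(status)  # copy out before the manager shuts down


if __name__ == "__main__":
    print(main())
```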