Async thread-safe Get from MemoryCache

A simple solution is to use SemaphoreSlim.WaitAsync() instead of a lock, which gets you around the restriction against awaiting inside a lock block. Note that all other methods of MemoryCache are already thread-safe, so only the populate-on-miss path needs this synchronization.

private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);

public async Task<T> GetAsync(
    string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if (!_cache.Contains(key))
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            // Double-check inside the semaphore: another thread may have
            // populated the entry while we were waiting.
            if (!_cache.Contains(key))
            {
                var data = await populator();
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    return (T)_cache.Get(key);
}
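For illustration, a hypothetical call site could look like this (the _userService.LoadUserAsync populator and userId parameter are invented for the example; they are not part of the original answer):

var user = await cache.GetAsync(
    "user",                                   // base cache key
    () => _userService.LoadUserAsync(userId), // populator, awaited only on a cache miss
    TimeSpan.FromMinutes(10),                 // expiration relative to now
    new { userId });                          // serialized and appended to the key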

How can I GetAsync in a thread-safe manner with MemoryCache in C#?

Use a SemaphoreSlim-powered lock.

Say you have a method that must be accessed by only a single thread at a time:

public async Task<MyData> GetDataAsync(string request); // not thread-safe!

And an array of requests for the data, e.g.

string[] requests;

You want to process these requests in parallel, but make the external call in a thread-safe manner by wrapping the unsafe method:

private async Task MakeRequestAsync(string request, SemaphoreSlim semaphore)
{
    await PrepareAsync();
    MyData data = null;

    await semaphore.WaitAsync(); // begin synchronized section
    try
    {
        data = await GetDataAsync(request);
    }
    finally
    {
        semaphore.Release(); // end synchronized section
    }

    if (data != null)
        await ProcessResultAsync(data);
}

List<Task> tasks = new List<Task>();
using (SemaphoreSlim semaphore = new SemaphoreSlim(1)) // 1 = concurrency degree
{
    foreach (string request in requests)
        tasks.Add(MakeRequestAsync(request, semaphore));
    await Task.WhenAll(tasks);
}
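The initial count passed to the SemaphoreSlim constructor is the degree of concurrency, so the same pattern throttles instead of serializing if you raise it. A minimal sketch, with the limit of 4 chosen arbitrarily for illustration:

// Allow up to 4 concurrent calls to the protected method instead of 1;
// the rest of the pattern is unchanged.
using (SemaphoreSlim semaphore = new SemaphoreSlim(4))
{
    List<Task> tasks = requests.Select(r => MakeRequestAsync(r, semaphore)).ToList();
    await Task.WhenAll(tasks);
}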

MemoryCache Thread Safety, Is Locking Necessary?

The default MS-provided MemoryCache is entirely thread safe. Any custom implementation that derives from MemoryCache may not be thread safe. If you're using plain MemoryCache out of the box, it is thread safe. Browse the source code of my open source distributed caching solution to see how I use it (MemCache.cs):

https://github.com/haneytron/dache/blob/master/Dache.CacheHost/Storage/MemCache.cs
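
As a quick sanity-check sketch (my own illustration, not code from the linked repository): hammering a single MemoryCache instance from many tasks is safe; only the values you store need their own protection:

var cache = MemoryCache.Default;
var policy = new CacheItemPolicy { AbsoluteExpiration = DateTimeOffset.Now.AddMinutes(5) };

// Many tasks reading and writing the same cache instance concurrently is fine.
var tasks = Enumerable.Range(0, 100).Select(i => Task.Run(() =>
{
    cache.Set("key" + (i % 10), i, policy);  // thread-safe write
    var value = cache.Get("key" + (i % 10)); // thread-safe read (any recent write, or null)
}));
await Task.WhenAll(tasks);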

Is MemoryCache.Set() thread-safe?

Yes, the MemoryCache class is thread safe:

System.Runtime.Caching.MemoryCache is threadsafe. Multiple concurrent
threads can read and write a MemoryCache instance. Internally
thread-safety is automatically handled to ensure the cache is updated
in a consistent manner.

What this might be referring to is that data stored within the cache
may itself not be threadsafe. For example if a List<T> is placed in
the cache, and two separate threads both get a reference to the cached
List<T>, the two threads will end up stepping on each other if they
both attempt to update the list simultaneously.

That being said, the Get and Set methods are thread safe, but if the data structure you store in the cache is not thread safe, you can get into trouble. Imagine, for example, that you stored a dictionary in the cache. Then while thread1 uses Get to fetch the dictionary and starts reading from it, thread2 uses Get to fetch the same dictionary and tries to write to it. The Get operation itself is thread safe, but what happens next could be pretty nasty.
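
A minimal sketch of the safe variant, assuming you control what goes into the cache: store a thread-safe structure such as ConcurrentDictionary, so the cached value itself tolerates concurrent readers and writers:

// Store a thread-safe structure so the cached *value* tolerates concurrent use.
var cache = MemoryCache.Default;
cache.Set("lookup", new ConcurrentDictionary<string, int>(), new CacheItemPolicy());

// These can run on different threads without corrupting the cached value.
var lookup = (ConcurrentDictionary<string, int>)cache.Get("lookup");
lookup.TryAdd("answer", 42);             // writer thread
lookup.TryGetValue("answer", out var v); // reader thread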

Thread safety on async/await with memory caching

This is actually a compiler optimization issue. When you compile in Release, the optimizer assumes complete will never change, so the loop never sees the update and your application runs forever. Since you've based this on another example, I'm guessing you already knew that. But async / await can't be blamed for it.

To make this work you still need to declare complete as a volatile field, like so:

        static volatile bool complete = false;

This tells the compiler that the field can change at any time, so it re-reads the value on every iteration, and the loop works as expected.

I'm not saying I agree with it, but what's happening is that the compiler sees complete go unchanged all the way up to the while (!complete) loop, and since there is no volatile keyword it decides the value will never change and optimizes the read for performance.

Another way to make this work is to remove the compiler optimizations. You can click on the project Properties and then the Build tab and uncheck 'Optimize code'. Then it will work in release.
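
A minimal repro sketch of the pattern under discussion (the names here are illustrative, not taken from the original question):

static volatile bool complete = false; // without 'volatile', Release builds may spin forever

static async Task Main()
{
    var worker = Task.Run(async () =>
    {
        await Task.Delay(1000); // simulate async work
        complete = true;        // signal the spinning loop
    });

    while (!complete) { } // 'volatile' forces a fresh read on every iteration

    Console.WriteLine("Done.");
    await worker;
}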

Locking when returning a cache entry from an asynchronous request

Use a SemaphoreSlim so you can lock asynchronously, and acquire the lock before you make the call to the web service.

private static readonly SemaphoreSlim CompanyInfoLock = new SemaphoreSlim(1);

public async Task<CompanyDto> GetCompanyInfo()
{
    const string cacheKey = "_COMPANYINFO_";

    var companyInfo = MemoryCache.Default[cacheKey] as CompanyDto;
    if (companyInfo != null) return companyInfo;

    await CompanyInfoLock.WaitAsync();
    try
    {
        // Check a 2nd time inside the lock to see if the cache has the data.
        companyInfo = MemoryCache.Default[cacheKey] as CompanyDto;
        if (companyInfo != null) return companyInfo;

        companyInfo = await _userManagementService.InvokeAsync(
            x => x.GetCompanyAsync(AppPrincipal.Current.CurrentCompanyId));

        MemoryCache.Default.Add(cacheKey, companyInfo, new CacheItemPolicy
        {
            SlidingExpiration = new TimeSpan(2, 0, 0)
        });

        return companyInfo;
    }
    finally
    {
        CompanyInfoLock.Release();
    }
}
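
The same double-checked pattern generalizes to any cache key. A hypothetical reusable helper, sketched under the assumption of a single shared semaphore (the GetOrAddAsync name is invented, not part of the original answer):

private static readonly SemaphoreSlim CacheLock = new SemaphoreSlim(1);

public static async Task<T> GetOrAddAsync<T>(
    string cacheKey, Func<Task<T>> factory, CacheItemPolicy policy) where T : class
{
    if (MemoryCache.Default[cacheKey] is T cached) return cached;

    await CacheLock.WaitAsync();
    try
    {
        // Re-check inside the lock: another caller may have won the race.
        if (MemoryCache.Default[cacheKey] is T raced) return raced;

        var value = await factory();
        MemoryCache.Default.Add(cacheKey, value, policy);
        return value;
    }
    finally
    {
        CacheLock.Release();
    }
}

Note that a single semaphore serializes cache misses for all keys; a per-key semaphore (for example, held in a ConcurrentDictionary) avoids that at the cost of extra bookkeeping.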

Thread-safe Cached Enumerator - lock with yield

Your class is not thread safe, because shared state is mutated in unprotected regions inside your class. The unprotected regions are:

  1. The constructor
  2. The Dispose method

The shared state is:

  1. The _enumerator private field
  2. The _cache private field
  3. The CachingComplete public property

Some other issues regarding your class:

  1. Implementing IDisposable creates the responsibility for the caller to dispose your class. There is no need for IEnumerables to be disposable. On the contrary, IEnumerators are disposable, but there is language support for their automatic disposal (a feature of the foreach statement).
  2. Your class offers extended functionality not expected from an IEnumerable (ElementAt, Count etc.). Maybe you intended to implement a CachedList instead? Without implementing the IList<T> interface, LINQ methods like Count() and ToArray() cannot take advantage of your extended functionality, and will use the slow path like they do with plain vanilla IEnumerables (see the sketch after this list).
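
To make the second point concrete, a small sketch of how LINQ picks the fast path at runtime (my illustration, not code from the question):

IEnumerable<int> iterator = Enumerable.Range(0, 1000).Where(x => x >= 0); // no ICollection<T>
int slow = iterator.Count(); // walks all 1000 items

IEnumerable<int> list = Enumerable.Range(0, 1000).ToList(); // List<T> implements ICollection<T>
int fast = list.Count();     // O(1): LINQ detects ICollection<T> and reads its Count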

Update: I just noticed another thread-safety issue. This one is related to the public IEnumerator<T> GetEnumerator() method. The enumerator is compiler-generated, since the method is an iterator (utilizes yield return). Compiler-generated enumerators are not thread safe. Consider this code for example:

var enumerable = Enumerable.Range(0, 1_000_000);
var cachedEnumerable = new CachedEnumerable<int>(enumerable);
var enumerator = cachedEnumerable.GetEnumerator();
var tasks = Enumerable.Range(1, 4).Select(id => Task.Run(() =>
{
    int count = 0;
    while (enumerator.MoveNext())
    {
        count++;
    }
    Console.WriteLine($"Task #{id} count: {count}");
})).ToArray();
Task.WaitAll(tasks);

Four tasks are concurrently using the same IEnumerator. The enumerable has 1,000,000 items, so you might expect each task to enumerate roughly 250,000 of them, but that's not what happens.

Output:

Task #1 count: 0
Task #4 count: 0
Task #3 count: 0
Task #2 count: 1000000

The MoveNext in the line while (enumerator.MoveNext()) is not your safe MoveNext; it is the compiler-generated, unsafe MoveNext. Although unsafe, it includes a mechanism, probably intended for dealing with exceptions, that temporarily marks the enumerator as finished before calling the externally provided code. So when multiple threads call MoveNext concurrently, all but the first get a return value of false and instantly terminate the enumeration, having completed zero loops. To solve this you would probably have to write your own IEnumerator class.


Update: Actually my last point about thread-safe enumeration is a bit unfair, because enumerating through the IEnumerator interface is an inherently unsafe operation that is impossible to fix without the cooperation of the calling code. This is because obtaining the next element is not an atomic operation: it involves two steps (calling MoveNext() and then reading Current). So your thread-safety concerns are limited to the protection of the internal state of your class (the fields _enumerator and _cache, and the property CachingComplete). These are left unprotected only in the constructor and in the Dispose method, but I suppose that normal use of your class may not follow the code paths that create the race conditions which would result in internal state corruption.

Personally I would prefer to take care of these code paths too, rather than leave it to chance.
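
For callers that do need atomic consumption, the usual workaround is to replace the two-step MoveNext()/Current dance with a single locked operation. A minimal sketch of that idea (the SafeEnumeratorWrapper type and TryGetNext name are invented for illustration):

// Hypothetical wrapper that makes "advance and read" a single atomic step.
public class SafeEnumeratorWrapper<T>
{
    private readonly IEnumerator<T> _inner;
    private readonly object _locker = new object();

    public SafeEnumeratorWrapper(IEnumerator<T> inner) => _inner = inner;

    // MoveNext + Current happen under one lock, so no other thread can
    // interleave between the two steps.
    public bool TryGetNext(out T item)
    {
        lock (_locker)
        {
            if (_inner.MoveNext())
            {
                item = _inner.Current;
                return true;
            }
            item = default;
            return false;
        }
    }
}

With this shape, the four tasks from the earlier example would each call TryGetNext in a loop and share the items between them instead of racing.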


Update: I wrote a cache for IAsyncEnumerables, to demonstrate an alternative technique. The enumeration of the source IAsyncEnumerable is not driven by the callers, using locks or semaphores to obtain exclusive access, but by a separate worker task. The first caller starts the worker task. Each caller first yields all items that are already cached, and then awaits more items, or a notification that there are no more items. As the notification mechanism I used a TaskCompletionSource<bool>. A lock is still used to ensure that all access to shared resources is synchronized.

public class CachedAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    private readonly object _locker = new object();
    private IAsyncEnumerable<T> _source;
    private Task _sourceEnumerationTask;
    private List<T> _buffer;
    private TaskCompletionSource<bool> _moveNextTCS;
    private Exception _sourceEnumerationException;
    private int _sourceEnumerationVersion; // Incremented on exception

    public CachedAsyncEnumerable(IAsyncEnumerable<T> source)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        lock (_locker)
        {
            if (_sourceEnumerationTask == null)
            {
                _buffer = new List<T>();
                _moveNextTCS = new TaskCompletionSource<bool>();
                _sourceEnumerationTask = Task.Run(
                    () => EnumerateSourceAsync(cancellationToken));
            }
        }
        int index = 0;
        int localVersion = -1;
        while (true)
        {
            T current = default;
            Task<bool> moveNextTask = null;
            lock (_locker)
            {
                if (localVersion == -1)
                {
                    localVersion = _sourceEnumerationVersion;
                }
                else if (_sourceEnumerationVersion != localVersion)
                {
                    ExceptionDispatchInfo
                        .Capture(_sourceEnumerationException).Throw();
                }
                if (index < _buffer.Count)
                {
                    current = _buffer[index];
                    index++;
                }
                else
                {
                    moveNextTask = _moveNextTCS.Task;
                }
            }
            if (moveNextTask == null)
            {
                yield return current;
                continue;
            }
            var moved = await moveNextTask;
            if (!moved) yield break;
            lock (_locker)
            {
                current = _buffer[index];
                index++;
            }
            yield return current;
        }
    }

    private async Task EnumerateSourceAsync(CancellationToken cancellationToken)
    {
        TaskCompletionSource<bool> localMoveNextTCS;
        try
        {
            await foreach (var item in _source.WithCancellation(cancellationToken))
            {
                lock (_locker)
                {
                    _buffer.Add(item);
                    localMoveNextTCS = _moveNextTCS;
                    _moveNextTCS = new TaskCompletionSource<bool>();
                }
                localMoveNextTCS.SetResult(true);
            }
            lock (_locker)
            {
                localMoveNextTCS = _moveNextTCS;
                _buffer.TrimExcess();
                _source = null;
            }
            localMoveNextTCS.SetResult(false);
        }
        catch (Exception ex)
        {
            lock (_locker)
            {
                localMoveNextTCS = _moveNextTCS;
                _sourceEnumerationException = ex;
                _sourceEnumerationVersion++;
                _sourceEnumerationTask = null;
            }
            localMoveNextTCS.SetException(ex);
        }
    }
}

This implementation follows a specific strategy for dealing with exceptions. If an exception occurs while enumerating the source IAsyncEnumerable, the exception is propagated to all current callers, the currently used IAsyncEnumerator is discarded, and the incomplete cached data is discarded too. A new worker task may start again later, when the next enumeration request is received.
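
A quick usage sketch, assuming a hypothetical slow source sequence, showing that only the first enumeration pays the production cost:

// Hypothetical slow source: each item takes 100 ms to produce.
static async IAsyncEnumerable<int> SlowSourceAsync()
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(100);
        yield return i;
    }
}

var cached = new CachedAsyncEnumerable<int>(SlowSourceAsync());

// The first enumeration starts the worker task and fills the buffer.
await foreach (var x in cached) Console.WriteLine($"first: {x}");

// The second enumeration replays the buffered items; the source is not re-run.
await foreach (var x in cached) Console.WriteLine($"second: {x}");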


