.NET Asynchronous Stream Read/Write

.NET Asynchronous stream read/write

You are going to need to use the callback from the NetworkStream read to handle this. And frankly it might be easier to wrap the copying logic in its own class so that you can hold on to the active streams for the duration of the copy.

This is how I'd approach it (not tested):

using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public class Assignment1
{
    // Starts the copy and returns immediately; the copy continues on I/O callbacks.
    public static void NetToFile(NetworkStream net, FileStream file)
    {
        var copier = new AsyncStreamCopier(net, file);
        copier.Start();
    }

    // Starts the copy and blocks the calling thread until it has completed.
    public static void NetToFile_Option2(NetworkStream net, FileStream file)
    {
        var completedEvent = new ManualResetEvent(false);

        // copy as usual but listen for completion
        var copier = new AsyncStreamCopier(net, file);
        copier.Completed += (s, e) => completedEvent.Set();
        copier.Start();

        completedEvent.WaitOne();
    }

    /// <summary>
    /// The Async Copier class reads the input Stream asynchronously and writes synchronously.
    /// Note: exceptions thrown in the read callback are not handled here, so a failed
    /// read or write means Completed is never raised.
    /// </summary>
    public class AsyncStreamCopier
    {
        public event EventHandler Completed;

        private readonly Stream input;
        private readonly Stream output;

        private readonly byte[] buffer = new byte[4096];

        public AsyncStreamCopier(Stream input, Stream output)
        {
            this.input = input;
            this.output = output;
        }

        public void Start()
        {
            GetNextChunk();
        }

        private void GetNextChunk()
        {
            input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
        }

        private void InputReadComplete(IAsyncResult ar)
        {
            // input read asynchronously completed
            int bytesRead = input.EndRead(ar);

            // zero bytes means the end of the input stream
            if (bytesRead == 0)
            {
                RaiseCompleted();
                return;
            }

            // write synchronously
            output.Write(buffer, 0, bytesRead);

            // get next
            GetNextChunk();
        }

        private void RaiseCompleted()
        {
            // copy to a local so a concurrent unsubscribe cannot cause a null dereference
            EventHandler handler = Completed;
            if (handler != null)
            {
                handler(this, EventArgs.Empty);
            }
        }
    }
}

Write and read from a stream asynchronously

Is there another option for this?

The code is currently downloading to a memory stream and then writing it out to disk. If that's all you need to do with it, then passing a file stream instead of a memory stream should be sufficient.

What I am struggling with is the reading since it seems streams (MemoryStream in this case) does not support simultaneous reading and writing.

This is correct. Built-in buffer-style streams such as MemoryStream do not support simultaneous reads and writes. One reason is that there's a notion of a single "position" that is updated by both reads and writes.

It should be possible to write a concurrent stream type. You'd need to handle concurrent access as well as having two positions rather than one, and some operations might not be supported for concurrent streams. I've thought about writing a type like this a few times, but didn't feel it was sufficiently useful in a world where Pipelines, Channels, Dataflow, and async streams already exist.

So, I'd say:

  1. If possible, just pass it a file stream and get rid of the memory stream completely.
  2. Otherwise, explore the GCP API and see if there's a non-stream-based solution you can use to download.
  3. If the GCP API must download to a stream and your code must do in-memory processing during the save, then you'll need to write a concurrent stream type.
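
As a rough illustration of the Pipelines alternative mentioned above, here is a minimal sketch. It assumes the System.IO.Pipelines package is referenced, and DownloadToAsync is a hypothetical stand-in for whatever API insists on writing into a Stream; it is not part of any real SDK. One task writes into the pipe while another reads from it concurrently, which is the behaviour MemoryStream cannot offer.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeSketch
{
    // Hypothetical stand-in for an API that "downloads" into a Stream it is given.
    private static async Task DownloadToAsync(Stream destination)
    {
        byte[] payload = Encoding.UTF8.GetBytes("hello from the producer");
        await destination.WriteAsync(payload, 0, payload.Length);
    }

    public static async Task<string> RunAsync()
    {
        var pipe = new Pipe();

        // Producer: writes into the pipe, then signals that no more data will come.
        Task producer = Task.Run(async () =>
        {
            Stream writeSide = pipe.Writer.AsStream(leaveOpen: true);
            try
            {
                await DownloadToAsync(writeSide);
            }
            finally
            {
                // Completing the writer lets the reader observe end-of-stream.
                await pipe.Writer.CompleteAsync();
            }
        });

        // Consumer: reads concurrently with the producer until the writer completes.
        var result = new MemoryStream();
        using (Stream readSide = pipe.Reader.AsStream())
        {
            await readSide.CopyToAsync(result);
        }

        await producer; // surfaces any producer exception
        return Encoding.UTF8.GetString(result.ToArray());
    }
}
```

In a real program the consumer side would copy to a FileStream or process the bytes as they arrive instead of collecting them in memory.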

Asynchronous Reading/Writing to same stream?

It's not clear why you would want to use MemoryStream. The Stream.CopyTo method in .NET 4 doesn't need to use an intermediate stream - it will just read into a local buffer of a fixed size, then write that buffer to the output stream, then read more data (overwriting the buffer) etc.

If you're not using .NET 4, it's easy to implement something similar, e.g.

public static void CopyTo(this Stream input, Stream output)
{
    byte[] buffer = new byte[64 * 1024]; // 64K buffer
    int bytesRead;
    while ((bytesRead = input.Read(buffer, 0, buffer.Length)) > 0)
    {
        output.Write(buffer, 0, bytesRead);
    }
}

Can starting multiple asynchronous read/write operations on the same Stream corrupt the data?

It depends on the implementation of the Stream. For instance, sockets support multiple overlapped requests both for read and for write, and so does the file API. They guarantee consistency of each operation (no interleaving content) and also order of operations: for instance, for socket reads the bytes received will be placed in the posted buffers in the order they were posted. If these guarantees were not provided, it would be impossible to write high-performance network applications, as overlapping Sends are desired but overlapping Receives are actually required for high-performance network IO. This behavior is described in many articles, including good ole' Windows Sockets 2.0: Write Scalable Winsock Apps Using Completion Ports, and is documented on MSDN under Overlapped Input/Output:

Both send and receive operations can be overlapped. The receive functions may be invoked multiple times to post receive buffers in preparation for incoming data, and the send functions may be invoked multiple times to queue up multiple buffers to be sent. Note that while a series of overlapped send buffers will be sent in the order supplied, the corresponding completion indications may occur in a different order. Likewise, on the receiving side, buffers will be filled in the order they are supplied, but completion indications may occur in a different order.

Not surprisingly, the same guarantees carry over to the managed side of the world, e.g. the NetworkStream class:

Read and write operations can be performed simultaneously on an instance of the NetworkStream class without the need for synchronization. As long as there is one unique thread for the write operations and one unique thread for the read operations, there will be no cross-interference between read and write threads and no synchronization is required.

That being said, randomly throwing async reads and writes at a Stream will result in chaos pretty quickly. Applications need to carefully orchestrate the order in which threads submit the operations so that the order is deterministic. Usually this means keeping accounting in a (synchronized) list, taking special care to invoke the async operation while holding the synchronization lock.
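
One simple way to get a deterministic order is sketched below. The OrderedWriter class and its members are hypothetical names invented for this illustration. Note that it deliberately uses a simpler technique than overlapped I/O: rather than keeping several writes in flight, it fixes the order under a lock at submission time and then runs the writes one after another.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: callers on any thread may enqueue writes; the order in
// which EnqueueAsync calls acquire the lock is the order the bytes are written.
public sealed class OrderedWriter
{
    private readonly Stream _stream;
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask; // last write submitted so far

    public OrderedWriter(Stream stream)
    {
        _stream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    public Task EnqueueAsync(byte[] data)
    {
        lock (_gate)
        {
            // The submission order is recorded while the lock is held.
            _tail = WriteAfterAsync(_tail, data);
            return _tail;
        }
    }

    private async Task WriteAfterAsync(Task previous, byte[] data)
    {
        // Wait for the previously submitted write; if it failed, this one fails too.
        await previous.ConfigureAwait(false);
        await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
    }
}
```

The trade-off is throughput: this gives up the overlapping that sockets allow in exchange for an order that is trivially predictable.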

One final note is that all these async APIs make a special note in calling out that the order of completion is not guaranteed to match the order of submission.

Write binary asynchronously?

BinaryWriter is unnecessary here. Just WriteAsync the byte[] to the stream:

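using (FileStream fs = new FileStream(fileFullPath, FileMode.OpenOrCreate, FileAccess.Write,
    FileShare.None, bufferSize: 4096, useAsync: true)) // useAsync requests asynchronous file I/O
{
    // Note: OpenOrCreate does not truncate an existing file; use FileMode.Create to overwrite it.
    await fs.WriteAsync(data, 0, data.Length);
    logger.Debug("Finished writing file {0} to disk", fileFullPath);
}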

Where do I gain something with asynchronous stream copy in .net

Async processing helps whenever you're dealing with non-CPU-related latencies and want to avoid them without spawning an insane number of threads. That approach doesn't scale well: there shouldn't be too many threads per CPU, because threads are "expensive" resources and excessive context switching can kill your performance.

Typical beneficiaries are therefore IO operations, since both disk and network are "slow" compared to the computing effort required for processing the data. So whenever you may have many parallel streams (network and disk), you should use async operations to deal with them.

That said, proper async programming is hard, especially without the async/await support baked into the newer .NET compilers.

A good async copy should use at least two buffers; one reads data (async) and one writes data (async) concurrently. Of course the idle writer will only start after the reader is done with its block, and the idle reader will only start when there is at least a spare buffer (e.g. writer has finished writing). When you have one buffer only, there will be breaks between consecutive read and write operations, e.g. it will always be "read, write, read, ...write" instead of "read, read+write, read+write, ...write".
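
The two-buffer scheme described above can be sketched roughly as follows. DoubleBufferedCopy and CopyAsync are names made up for this example, and the sketch assumes the input and output are different streams. While one buffer is being written, the next read fills the other, giving the "read, read+write, read+write, ...write" pattern.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class DoubleBufferedCopy
{
    public static async Task CopyAsync(Stream input, Stream output, int bufferSize = 64 * 1024)
    {
        byte[] readBuffer = new byte[bufferSize];
        byte[] writeBuffer = new byte[bufferSize];
        Task pendingWrite = Task.CompletedTask; // no write in flight yet

        while (true)
        {
            // This read overlaps with the write started in the previous iteration.
            int bytesRead = await input.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false);

            // The other buffer may only be reused once its write has finished.
            await pendingWrite.ConfigureAwait(false);

            if (bytesRead == 0)
            {
                break; // end of input, and the last write has completed
            }

            // Swap roles: the freshly filled buffer becomes the one being written.
            (readBuffer, writeBuffer) = (writeBuffer, readBuffer);
            pendingWrite = output.WriteAsync(writeBuffer, 0, bytesRead);
        }
    }
}
```

Whether this beats a single-buffer copy depends on the streams involved; the gain only appears when both the read and the write have real latency.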

Constantly read from NetworkStream async

async void should at the very least be async Task with the return value thrown away. That makes the method adhere to sane standards and pushes the responsibility onto the caller, which is better equipped to make decisions about waiting and error handling.

But you don't have to throw away the return value. You can attach a logging continuation:

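async Task Log(Task t)
{
    try { await t; }
    // Console.Error is only a placeholder here; substitute your own logging.
    catch (Exception ex) { Console.Error.WriteLine(ex); }
}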

And use it like this:

Log(Listen());

Throw away the task returned by Log (or await it if you wish to logically wait).

Or, simply wrap everything in Listen in a try-catch. This appears to be the case already.

Can I catch exceptions thrown in Listen if I don't await it?

You can find out about exceptions using any way that attaches a continuation or waits synchronously (the latter is not your strategy).

Is there a better way to constantly read from a network stream using async/await?

No, this is the way it's supposed to be done. At any given time there should be one read IO outstanding. (Or zero for a brief period of time.)
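
For illustration, a read loop with exactly one outstanding read might look like the sketch below. ReadLoop, ListenAsync and the onData callback are hypothetical names, not taken from the question's code. Each iteration awaits a single ReadAsync before issuing the next, and a return value of zero means the remote side closed the connection.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class ReadLoop
{
    // Reads until end of stream and returns the total number of bytes seen.
    public static async Task<long> ListenAsync(
        Stream stream, Action<byte[], int> onData, CancellationToken ct = default)
    {
        var buffer = new byte[4096];
        long total = 0;

        while (true)
        {
            // Only one read is ever in flight on this stream.
            int n = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
            if (n == 0)
            {
                return total; // end of stream
            }

            // The buffer is reused, so the callback must consume or copy the data now.
            onData(buffer, n);
            total += n;
        }
    }
}
```

Because it returns a Task, the caller can await it, or hand it to a logging continuation like Log above.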

Is it actually sane to try to continuously read from a network stream using async/await or is a thread a better option?

Both will work correctly. There is a trade-off to be made. Synchronous code can be simpler, easier to debug and even less CPU intensive. Asynchronous code saves on thread stack memory and context switches. In UI apps await has significant benefits.

Stream reading with await async

You've got a never-ending recursion.

You're calling ReadAsync() forever and never returning from the method, so nothing ever breaks the infinite recursion.

A possible solution is:

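public async void ReadAsync()
{
    byte[] data = new byte[1024];
    int numBytesRead = await _stream.ReadAsync(data, 0, data.Length);

    // Process data ...

    // Read again, but only while the stream still delivers data;
    // zero bytes means end of stream, which ends the recursion.
    if (numBytesRead > 0)
    {
        ReadAsync();
    }
}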

To understand recursion better, check this.

Process incoming FileStream asynchronously

The point here is that you're working within the constraints of ASP.NET, which abstracts away a lot of the underlying HTTP stuff.

When you say you want to process a user-uploaded file asynchronously, you want to step out of the normal order of doing things with HTTP and ASP.NET. You see, when a client sends a request with a body (the file), the server receives the request headers and kicks off ASP.NET to tell your application code that there's a new request incoming.

It hasn't even (fully) read the request body at this point. This is why you get a Stream to deal with the request, and not a string or a filename: the data doesn't have to have arrived at the server yet! Only the request headers have, informing the web server about the request.

If you return a response at that point, for all HTTP and ASP.NET care, you're done with the request, and you cannot continue reading its body.

Now what you want to do, is to read the request body (the file), and process that after sending a response to the client. You can do that, but then you'll still have to read the request body - because if you return something from your action method before reading the request, the framework will think you're done with it and dispose the request stream. That's what's causing your exception.

If you'd use a string, or model binding, or anything that involves the framework reading the request body, then yes, your code will only execute once the body has been read.

The short-term solution that would appear to get you going is to read the request stream into a stream that you own, not the framework:

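var myStream = new MemoryStream();
await stream.CopyToAsync(myStream); // CopyTo is synchronous and cannot be awaited
myStream.Position = 0; // rewind so processing starts at the beginning
_ = Task.Run(() => ProcessFileAsync(myStream)); // fire-and-forget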

Now you'll have read the entire request body and saved it in memory, so ASP.NET can safely dispose the request stream and send a response to the client.

But don't do this. Starting fire-and-forget tasks from a controller is a bad idea. Keeping uploaded files in memory is a bad idea.

What you actually should do, if you still want to do this out-of-band:

  • Save the incoming file as an actual, temporary file on your server
  • Send a response to the client with an identifier (the temporarily generated filename, for example a GUID)
  • Expose an endpoint that clients can use to request the status using said GUID
  • Have a background process continuously scan the directory for newly uploaded files and process them

For the latter you could use hosted services or third-party tools like Hangfire.
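
The first two steps of that list could look roughly like the framework-independent sketch below. UploadStore, its methods and the ".partial"/".upload" file extensions are all assumptions made for this example. The file is written under a temporary name and renamed once complete, so that a background scanner never picks up a half-written upload.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class UploadStore
{
    // Saves the request body to disk and returns the identifier to hand to the client.
    public static async Task<Guid> SaveAsync(Stream requestBody, string directory)
    {
        Directory.CreateDirectory(directory);

        Guid id = Guid.NewGuid();
        string tempPath = Path.Combine(directory, id.ToString("N") + ".partial");
        string finalPath = Path.Combine(directory, id.ToString("N") + ".upload");

        using (var file = new FileStream(tempPath, FileMode.CreateNew, FileAccess.Write,
            FileShare.None, bufferSize: 81920, useAsync: true))
        {
            await requestBody.CopyToAsync(file).ConfigureAwait(false);
        }

        // Rename only after the whole body has been written.
        File.Move(tempPath, finalPath);
        return id;
    }

    // True while the upload is waiting for the background process to pick it up.
    public static bool IsPending(Guid id, string directory)
    {
        return File.Exists(Path.Combine(directory, id.ToString("N") + ".upload"));
    }
}
```

In a controller, the action would await SaveAsync on the request stream and return the GUID; a status endpoint could build on something like IsPending.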


