How to Concatenate Two System.IO.Stream Instances into One

How do I concatenate two System.IO.Stream instances into one?

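using System;
using System.Collections.Generic;
using System.IO;

// A read-only, forward-only stream that returns the contents of each
// inner stream in turn, disposing each one once it is exhausted.
class ConcatenatedStream : Stream
{
    private readonly Queue<Stream> streams;

    public ConcatenatedStream(IEnumerable<Stream> streams)
    {
        this.streams = new Queue<Stream>(streams);
    }

    public override bool CanRead
    {
        get { return true; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int totalBytesRead = 0;

        // Keep filling the caller's buffer, crossing stream boundaries as needed.
        while (count > 0 && streams.Count > 0)
        {
            int bytesRead = streams.Peek().Read(buffer, offset, count);
            if (bytesRead == 0)
            {
                // The current stream is exhausted: dispose it and move on.
                streams.Dequeue().Dispose();
                continue;
            }

            totalBytesRead += bytesRead;
            offset += bytesRead;
            count -= bytesRead;
        }

        return totalBytesRead;
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return false; }
    }

    // Nothing is ever written, so there is nothing to flush.
    public override void Flush()
    {
    }

    // Per the Stream contract, unsupported members throw NotSupportedException.
    public override long Length
    {
        get { throw new NotSupportedException(); }
    }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    // Dispose any inner streams that were never read to the end.
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            while (streams.Count > 0)
                streams.Dequeue().Dispose();
        }

        base.Dispose(disposing);
    }
}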
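For readers on the Java side of this collection, the same queue-draining idea can be sketched in Java. ConcatenatedInputStream is a hypothetical name, and in practice java.io.SequenceInputStream already covers this case. The sketch only mirrors the C# Read loop, with one real difference: Java reports end of stream as -1 rather than 0.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical Java counterpart of the C# ConcatenatedStream: drains a queue of streams in order.
class ConcatenatedInputStream extends InputStream {
    private final Deque<InputStream> streams;

    ConcatenatedInputStream(List<InputStream> streams) {
        this.streams = new ArrayDeque<>(streams);
    }

    @Override
    public int read(byte[] buffer, int offset, int count) throws IOException {
        if (count == 0) {
            return 0;
        }
        int total = 0;
        // Fill the caller's buffer, crossing stream boundaries as needed.
        while (count > 0 && !streams.isEmpty()) {
            int n = streams.peek().read(buffer, offset, count);
            if (n == -1) {
                // Current stream is exhausted: close it and continue with the next one.
                streams.poll().close();
                continue;
            }
            total += n;
            offset += n;
            count -= n;
        }
        // Unlike .NET, Java signals end of stream with -1 rather than 0.
        return total == 0 ? -1 : total;
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
    }

    @Override
    public void close() throws IOException {
        // Close any streams that were never read to the end.
        while (!streams.isEmpty()) {
            streams.poll().close();
        }
    }

    // Helper for the example: reads the whole stream into memory.
    static byte[] readAll(InputStream in) {
        try {
            return in.readAllBytes(); // Java 9+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```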

How to merge two memory streams?

Spectacularly easy with CopyTo or CopyToAsync:

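var streamOne = new MemoryStream();
FillThisStreamUp(streamOne);
var streamTwo = new MemoryStream();
DoSomethingToThisStreamLol(streamTwo);
streamTwo.Position = 0;      // CopyTo reads from the current position, so rewind first
streamTwo.CopyTo(streamOne); // writes at streamOne's current position; streamOne now holds the contents of both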

The framework, people. The framework.
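Java's in-memory counterpart has an equally direct method: ByteArrayOutputStream.writeTo(OutputStream) copies one buffer's entire contents into another stream. A minimal sketch, where MemoryStreamMerge and appendTo are hypothetical names:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class MemoryStreamMerge {
    // Appends everything written to 'second' onto the end of 'first'.
    static void appendTo(ByteArrayOutputStream first, ByteArrayOutputStream second) {
        try {
            // writeTo copies the whole buffer; there is no read position involved.
            second.writeTo(first);
        } catch (IOException e) {
            // Writing to a ByteArrayOutputStream does not fail, but writeTo declares IOException.
            throw new UncheckedIOException(e);
        }
    }
}
```

Because writeTo always copies the whole buffer, there is no position to rewind, unlike the CopyTo call in C#.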

Is there a way to merge two OutputStreams?

In io-streams, OutputStream a is a wrapper around a function of type Maybe a -> IO (), so two streams can be merged as follows:

import System.IO.Streams (OutputStream, makeOutputStream, writeTo)

-- Let the types guide you.
merge_ :: (Maybe a -> IO ()) -> (Maybe b -> IO ()) -> (Maybe (a, b) -> IO ())
merge_ eatA eatB Nothing = eatA Nothing >> eatB Nothing
merge_ eatA eatB (Just (a, b)) = eatA (Just a) >> eatB (Just b)

-- Use `writeTo` and `makeOutputStream` to convert between `OutputStream a` and its underlying representation `Maybe a -> IO ()`.
-- `makeOutputStream` runs in IO (the stream it builds passes Nothing to the function at most once), so `merge` returns its result in IO.
merge :: OutputStream a -> OutputStream b -> IO (OutputStream (a, b))
merge streamA streamB = makeOutputStream (merge_ (writeTo streamA) (writeTo streamB))
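The same construction can be mimicked outside Haskell. This Java sketch relies on a modeling choice that is not the io-streams API: a sink of A is a Consumer<Optional<A>>, and Optional.empty() stands in for Nothing (end of stream). OutputStreams, Pair and merge are hypothetical names. Values must be non-null because Optional.of rejects null.

```java
import java.util.Optional;
import java.util.function.Consumer;

final class OutputStreams {
    // Minimal pair type for the example.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Mirrors merge_: end of stream goes to both sinks, and a pair is split between them.
    static <A, B> Consumer<Optional<Pair<A, B>>> merge(
            Consumer<Optional<A>> eatA, Consumer<Optional<B>> eatB) {
        return maybePair -> {
            if (maybePair.isPresent()) {
                Pair<A, B> p = maybePair.get();
                eatA.accept(Optional.of(p.first));
                eatB.accept(Optional.of(p.second));
            } else {
                eatA.accept(Optional.empty());
                eatB.accept(Optional.empty());
            }
        };
    }
}
```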

Sidetrack

This appendix assumes some familiarity with category theory.

There was another question in the comments:

would it be correct to call this an instantiation of a monoidal functor in the Kleisli category of IO?

OutputStream and merge together form a monoidal functor, but not in Kleisli IO.

A monoidal functor is defined between monoidal categories, so first we must spell out those categories. A monoidal category is a category with a tensor product. In the rest of this post we will refer to a monoidal category as "_ with _"; in practice, the tensor product ("with _") is often left implicit if it can be inferred from the context or after it's been said at least once.

The obvious candidate for a monoidal category here is Kleisli IO with (,), but that is not a monoidal category: (,) is not even a bifunctor (when Kleisli IO is the relevant category), which it would have to be to serve as a tensor product. When you try to satisfy the definition of a bifunctor, you run into the problem that there is no function with the following signature that satisfies the composition law. Intuitively, bimap f g has to run the effects of f and g in some fixed order, and the two sides of the law then interleave those effects differently:

bimap :: (a -> IO c) -> (b -> IO d) -> ((a, b) -> IO (c, d))

-- Identity law (OK):
-- bimap pure pure = pure

-- Composition law (BROKEN):
-- bimap f1 g1 >=> bimap f2 g2 = bimap (f1 >=> f2) (g1 >=> g2)
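To make the effect-ordering problem concrete, here is a small Java experiment. It assumes a model in which an arrow a -> IO c is a Function whose side effect appends its name to a log, Kleisli composition is Function.andThen, and bimap is the natural candidate that runs f before g (running g first fails in the same way). All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class KleisliBimap {
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // An "effectful arrow": records its name in the log, then returns its input unchanged.
    static <T> Function<T, T> logging(List<String> log, String name) {
        return x -> {
            log.add(name);
            return x;
        };
    }

    // Candidate bimap: run f's effect, then g's effect.
    static <A, B, C, D> Function<Pair<A, B>, Pair<C, D>> bimap(Function<A, C> f, Function<B, D> g) {
        return p -> {
            C c = f.apply(p.first);
            D d = g.apply(p.second);
            return new Pair<>(c, d);
        };
    }

    // Evaluates both sides of the composition law and returns their effect logs.
    static List<List<String>> compareSides() {
        List<String> left = new ArrayList<>();
        bimap(logging(left, "f1"), logging(left, "g1"))
            .andThen(bimap(logging(left, "f2"), logging(left, "g2")))
            .apply(new Pair<>(0, 0));

        List<String> right = new ArrayList<>();
        bimap(logging(right, "f1").andThen(logging(right, "f2")),
              logging(right, "g1").andThen(logging(right, "g2")))
            .apply(new Pair<>(0, 0));

        List<List<String>> result = new ArrayList<>();
        result.add(left);
        result.add(right);
        return result;
    }
}
```

compareSides() returns [f1, g1, f2, g2] for the left-hand side and [f1, f2, g1, g2] for the right-hand side, so the composition law fails for this bimap.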

The monoidal category you do have is the category of functions, with cartesian products (i.e., tuples) as the tensor product, hereafter denoted "(->) with (,)". And it happens that OutputStream with merge is a monoidal functor between the monoidal category (->) and its opposite.

The only way to confirm that is to walk through the definitions yourself. Here's a rough list of all the things you have to check, which implicitly involves defining many functions and proving that they satisfy various laws, so each item hides a nontrivial amount of content if you're new to the concepts:

  • (->) is a category.
  • (->) with (,) is a monoidal category.
  • We can take the opposite of a monoidal category, which is another monoidal category; here that gives us (<-) with (,).
  • OutputStream is a functor between the categories (->) and its opposite (<-).
  • (OutputStream _, OutputStream _) and OutputStream (_, _) are (bi)functors, where the domain is the product category (->) x (->) and the codomain is (<-). (They are two ways of composing the bifunctor (,) with the functor OutputStream, so this comes for free if you accept that you can compose functors, but it's important to be able to spell them out to follow the other points below).
  • merge (or more technically, uncurry merge) is a natural transformation between (OutputStream _, OutputStream _) and OutputStream (_, _).
  • merge satisfies some additional coherence laws (involving the monoidal categorical structure of (->) with (,)): OutputStream with merge is a monoidal functor between (->) (with (,)) and its opposite (<-) (with (,)).

How to combine partially pre-cached MemoryStream with FileStream?

I would suggest something like the following solution:

  • Derive your own CachableFileStream from FileStream
  • Implement a very simple Cache backed by a data structure of your choice (such as a queue)
  • Allow preloading data into the internal cache
  • Allow reloading data into the internal cache
  • Override Read so that it serves data from the cache first

To illustrate the idea, here is a possible implementation.

Usage could look like this:

CachableFileStream cachedStream = new CachableFileStream(...)
{
PreloadSize = 8192,
ReloadSize = 4096,
};

// Force preloading data into the cache
cachedStream.Preload();

...
cachedStream.Read(buffer, 0, buffer.Length);
...

Warning: the code below is neither properly tested nor ideal; it is only meant to give you an idea!

The CachableFileStream class:

using System;
using System.IO;
using System.Threading.Tasks;

/// <summary>
/// Represents a filestream with cache.
/// </summary>
public class CachableFileStream : FileStream
{
private Cache<byte> cache;
private int preloadSize;
private int reloadSize;

/// <summary>
/// Gets or sets the amount of bytes to be preloaded.
/// </summary>
public int PreloadSize
{
get
{
return this.preloadSize;
}

set
{
if (value <= 0)
throw new ArgumentOutOfRangeException(nameof(value), "The preload size must be greater than zero.");

this.preloadSize = value;
}
}

/// <summary>
/// Gets or sets the amount of bytes to be reloaded.
/// </summary>
public int ReloadSize
{
get
{
return this.reloadSize;
}

set
{
if (value <= 0)
throw new ArgumentOutOfRangeException(nameof(value), "The reload size must be greater than zero.");

this.reloadSize = value;
}
}

/// <summary>
/// Initializes a new instance of the <see cref="CachableFileStream"/> class with the specified path and creation mode.
/// </summary>
/// <param name="path">A relative or absolute path for the file that the current CachableFileStream object will encapsulate</param>
/// <param name="mode">A constant that determines how to open or create the file.</param>
/// <exception cref="System.ArgumentException">
/// Path is an empty string (""), contains only white space, or contains one or more invalid characters.
/// -or- path refers to a non-file device, such as "con:", "com1:", "lpt1:", etc. in an NTFS environment.
/// </exception>
/// <exception cref="System.NotSupportedException">
/// Path refers to a non-file device, such as "con:", "com1:", "lpt1:", etc. in a non-NTFS environment.
/// </exception>
/// <exception cref="System.ArgumentNullException">
/// Path is null.
/// </exception>
/// <exception cref="System.Security.SecurityException">
/// The caller does not have the required permission.
/// </exception>
/// <exception cref="System.IO.FileNotFoundException">
/// The file cannot be found, such as when mode is FileMode.Truncate or FileMode.Open, and the file specified by path does not exist.
/// The file must already exist in these modes.
/// </exception>
/// <exception cref="System.IO.IOException">
/// An I/O error, such as specifying FileMode.CreateNew when the file specified by path already exists, occurred.-or-The stream has been closed.
/// </exception>
/// <exception cref="System.IO.DirectoryNotFoundException">
/// The specified path is invalid, such as being on an unmapped drive.
/// </exception>
/// <exception cref="System.IO.PathTooLongException">
/// The specified path, file name, or both exceed the system-defined maximum length.
/// For example, on Windows-based platforms, paths must be less than 248 characters, and file names must be less than 260 characters.
/// </exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Mode contains an invalid value
/// </exception>
public CachableFileStream(string path, FileMode mode) : base(path, mode)
{
this.cache = new Cache<byte>();
this.cache.CacheIsRunningLow += CacheIsRunningLow;
}

/// <summary>
/// Reads a block of bytes from the stream and writes the data in a given buffer.
/// </summary>
/// <param name="array">
/// When this method returns, contains the specified byte array with the values between
/// offset and (offset + count - 1) replaced by the bytes read from the current source.
/// </param>
/// <param name="offset">The byte offset in array at which the read bytes will be placed.</param>
/// <param name="count">The maximum number of bytes to read.</param>
/// <returns>
/// The total number of bytes read into the buffer. This might be less than the number
/// of bytes requested if that number of bytes are not currently available, or zero
/// if the end of the stream is reached.
/// </returns>
/// <exception cref="System.ArgumentNullException">
/// Array is null.
/// </exception>
/// <exception cref="System.ArgumentOutOfRangeException">
/// Offset or count is negative.
/// </exception>
/// <exception cref="System.NotSupportedException">
/// The stream does not support reading.
/// </exception>
/// <exception cref="System.IO.IOException">
/// An I/O error occurred.
/// </exception>
/// <exception cref="System.ArgumentException">
/// Offset and count describe an invalid range in array.
/// </exception>
/// <exception cref="System.ObjectDisposedException">
/// Methods were called after the stream was closed.
/// </exception>
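public override int Read(byte[] array, int offset, int count)
{
// Hold the lock for the whole read so a background Reload() cannot add bytes
// to the cache between draining it and reading from the file, which would
// return data out of order.
lock (this.cache)
{
int readBytesFromCache;

for (readBytesFromCache = 0; readBytesFromCache < count; readBytesFromCache++)
{
if (this.cache.Size == 0)
break;

array[offset + readBytesFromCache] = this.cache.Read();
}

// Cache exhausted: read the rest directly from the file.
if (readBytesFromCache < count)
readBytesFromCache += base.Read(array, offset + readBytesFromCache, count - readBytesFromCache);

return readBytesFromCache;
}
}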

/// <summary>
/// Preload data into the cache.
/// </summary>
public void Preload()
{
this.LoadBytesFromStreamIntoCache(this.PreloadSize);
}

/// <summary>
/// Reload data into the cache.
/// </summary>
public void Reload()
{
this.LoadBytesFromStreamIntoCache(this.ReloadSize);
}

/// <summary>
/// Loads bytes from the stream into the cache.
/// </summary>
/// <param name="count">The number of bytes to read.</param>
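private void LoadBytesFromStreamIntoCache(int count)
{
byte[] buffer = new byte[count];

// Serialize with Read(): the file position and the cache contents must change together.
lock (this.cache)
{
int readBytes = base.Read(buffer, 0, buffer.Length);
this.cache.AddRange(buffer, 0, readBytes);
}
}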

/// <summary>
/// Handles the CacheIsRunningLow event by reloading data on a background task.
/// </summary>
/// <param name="sender">The sender of the event.</param>
/// <param name="e">Event arguments.</param>
private void CacheIsRunningLow(object sender, EventArgs e)
{
// Suppress further warnings until this reload has finished.
this.cache.WarnIfRunningLow = false;

Task.Run(() =>
{
try
{
Reload();
}
finally
{
// Re-enable warnings even if the reload failed.
this.cache.WarnIfRunningLow = true;
}
});
}
}

The Cache class:

using System;
using System.Collections.Concurrent;

/// <summary>
/// Represents a generic cache.
/// </summary>
/// <typeparam name="T">Defines the type of the items in the cache.</typeparam>
public class Cache<T>
{
private ConcurrentQueue<T> queue;

/// <summary>
/// Occurs when the number of items in the cache falls below
/// LowerWarningLimit and WarnIfRunningLow is set.
/// </summary>
public event EventHandler CacheIsRunningLow;

/// <summary>
/// Gets or sets a value indicating whether the CacheIsRunningLow event shall be fired or not.
/// </summary>
public bool WarnIfRunningLow
{
get;
set;
}

/// <summary>
/// Gets or sets a value that represents the lower warning limit.
/// </summary>
public int LowerWarningLimit
{
get;
set;
}

/// <summary>
/// Gets the number of items currently stored in the cache.
/// </summary>
public int Size
{
get;
private set;
}

/// <summary>
/// Initializes a new instance of the <see cref="Cache{T}"/> class.
/// </summary>
public Cache()
{
this.queue = new ConcurrentQueue<T>();
this.Size = 0;
this.LowerWarningLimit = 1024;
this.WarnIfRunningLow = true;
}

/// <summary>
/// Adds an item into the cache.
/// </summary>
/// <param name="item">The item to be added to the cache.</param>
public void Add(T item)
{
this.queue.Enqueue(item);
this.Size++;
}

/// <summary>
/// Adds the items of the specified array to the end of the cache.
/// </summary>
/// <param name="items">The items to be added.</param>
public void AddRange(T[] items)
{
this.AddRange(items, 0, items.Length);
}

/// <summary>
/// Adds the specified count of items of the specified array starting
/// from offset to the end of the cache.
/// </summary>
/// <param name="items">The array that contains the items.</param>
/// <param name="offset">The offset that shall be used.</param>
/// <param name="count">The number of items that shall be added.</param>
public void AddRange(T[] items, int offset, int count)
{
// Add items[offset] through items[offset + count - 1].
for (int i = offset; i < offset + count; i++)
this.Add(items[i]);
}

/// <summary>
/// Reads one item from the cache.
/// </summary>
/// <returns>The item that has been read from the cache.</returns>
/// <exception cref="System.InvalidOperationException">
/// The cache is empty.
/// </exception>
public T Read()
{
T item;

if (!this.queue.TryDequeue(out item))
throw new InvalidOperationException("The cache is empty.");

this.Size--;

if (this.WarnIfRunningLow &&
this.Size < this.LowerWarningLimit)
{
this.CacheIsRunningLow?.Invoke(this, EventArgs.Empty);
}

return item;
}

/// <summary>
/// Returns the next item in the cache without removing it.
/// </summary>
/// <returns>The item that has been read from the cache (without deletion).</returns>
/// <exception cref="System.InvalidOperationException">
/// The cache is empty.
/// </exception>
public T Peek()
{
T item;

if (!this.queue.TryPeek(out item))
throw new InvalidOperationException("The cache is empty.");

return item;
}
}
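If the background reload is not essential, the core of the idea (serve preloaded bytes first, then fall through to the underlying stream without reordering anything) fits in a much smaller, single-threaded class. A sketch in Java, assuming a FilterInputStream wrapper instead of a FileStream subclass; PreloadingInputStream, preload and cachedBytes are hypothetical names:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical single-threaded variant of the CachableFileStream idea.
class PreloadingInputStream extends FilterInputStream {
    private byte[] cache = new byte[0];
    private int cachePos = 0; // index of the next unread cached byte

    PreloadingInputStream(InputStream in) {
        super(in);
    }

    // Appends up to 'size' bytes from the wrapped stream to the cache,
    // keeping any unread cached bytes in front so the byte order is preserved.
    void preload(int size) {
        try {
            byte[] fresh = new byte[size];
            int n = in.readNBytes(fresh, 0, size); // Java 9+; stops early only at EOF
            int remaining = cachedBytes();
            byte[] merged = new byte[remaining + n];
            System.arraycopy(cache, cachePos, merged, 0, remaining);
            System.arraycopy(fresh, 0, merged, remaining, n);
            cache = merged;
            cachePos = 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int cachedBytes() {
        return cache.length - cachePos;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        int fromCache = Math.min(len, cachedBytes());
        System.arraycopy(cache, cachePos, b, off, fromCache);
        cachePos += fromCache;
        if (fromCache == len) {
            return len;
        }
        // Cache exhausted: read the rest directly from the wrapped stream.
        int fromStream = in.read(b, off + fromCache, len - fromCache);
        if (fromStream == -1) {
            return fromCache == 0 ? -1 : fromCache;
        }
        return fromCache + fromStream;
    }

    @Override
    public int read() throws IOException {
        return cachedBytes() > 0 ? cache[cachePos++] & 0xFF : in.read();
    }

    @Override
    public long skip(long n) throws IOException {
        long fromCache = Math.min(Math.max(n, 0), cachedBytes());
        cachePos += (int) fromCache;
        return n > fromCache ? fromCache + in.skip(n - fromCache) : fromCache;
    }

    @Override
    public int available() throws IOException {
        return cachedBytes() + in.available();
    }

    // mark/reset would ignore the cache, so they are disabled.
    @Override
    public boolean markSupported() {
        return false;
    }

    @Override
    public void mark(int readlimit) {
    }

    @Override
    public void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }
}
```

Calling preload() again appends to whatever is still cached, which plays the role of Reload() in the C# version, only without the event and the background task.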

I hope this helps, have fun ;-)

How do you merge two input streams in Java?

As commented, it's not clear what you mean by merge.

Taking whatever input is available from either stream, in no particular order, is complicated: InputStream.available() does not necessarily give you a useful answer, and reads block. You would need two threads reading from the streams and passing the data back through, say, java.io.PipedInputStream/PipedOutputStream (although those classes have issues). Alternatively, for some types of stream it may be possible to use a different interface, for instance java.nio non-blocking channels.
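The two-thread approach can be sketched in Java. This version swaps the Piped streams for a java.util.concurrent.BlockingQueue of chunks (a substitution, chosen to sidestep the Piped classes' issues), reads each source on its own pool thread, and appends chunks in whatever order they arrive. InterleavingMerge and mergeAsAvailable are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

final class InterleavingMerge {
    // Sentinel chunk meaning "this source is finished"; compared by identity.
    private static final byte[] DONE = new byte[0];

    // Reads every source on its own thread and concatenates chunks in arrival order.
    static byte[] mergeAsAvailable(List<InputStream> sources) {
        if (sources.isEmpty()) {
            return new byte[0];
        }
        BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(sources.size());
        List<Future<?>> readers = new ArrayList<>();
        for (InputStream source : sources) {
            readers.add(pool.submit(() -> {
                try {
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = source.read(buf)) != -1) {
                        chunks.add(Arrays.copyOf(buf, n));
                    }
                } finally {
                    chunks.add(DONE); // always signal completion, even after an error
                }
                return null;
            }));
        }
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        try {
            int finished = 0;
            while (finished < sources.size()) {
                byte[] chunk = chunks.take(); // blocks until some source delivers
                if (chunk == DONE) {
                    finished++;
                } else {
                    merged.write(chunk, 0, chunk.length);
                }
            }
            for (Future<?> reader : readers) {
                reader.get(); // rethrows a reader's failure, wrapped in ExecutionException
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while merging", e);
        } catch (ExecutionException e) {
            throw new UncheckedIOException(new IOException(e.getCause()));
        } finally {
            pool.shutdownNow();
        }
        return merged.toByteArray();
    }
}
```

Bytes from each source stay in order, but how chunks from different sources interleave depends on thread timing. The queue is unbounded, so a fast source can buffer a lot of data in memory; limiting that would need a bounded queue.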

If you want the full contents of the first input stream followed by the second: new java.io.SequenceInputStream(s1, s2).
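A minimal Java sketch of that, where SequenceExample, concatenate and utf8 are hypothetical helper names:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class SequenceExample {
    // Returns the full contents of s1 followed by the full contents of s2.
    static byte[] concatenate(InputStream s1, InputStream s2) {
        // Closing the SequenceInputStream also closes the component streams.
        try (InputStream both = new SequenceInputStream(s1, s2)) {
            return both.readAllBytes(); // Java 9+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static InputStream utf8(String s) {
        return new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));
    }
}
```

For more than two streams, SequenceInputStream also accepts an Enumeration, for example Collections.enumeration(list).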

Pass the Length of uncertain Stream to WCF Service

Let's make it simple:

  1. If the client knows the length of a part of the stream before it starts pushing that part to the server, it can prepend a small header structure to the payload, and the server reads that structure first. This is a standard data transfer pattern: by prepending a header to each payload, you tell the server how long the next part is going to be.

  2. If the client does not know the length of a part of the stream before it starts pushing it to the server, you are going to have to 'insert' the header inside the payload. That's not very intuitive, and not that useful in general, but it does work. I used this approach when my data was prepared asynchronously on the client and the first buffers were ready before the length was known. In this scenario you need a so-called marker, i.e. a byte sequence that never occurs anywhere in the stream except immediately before a header.

    This is the toughest of the three scenarios to implement for the first time. Buckle up. To do it right, you should give your stream an artificial structure. Such a structure is used for streaming video over a network and is called the Network Abstraction Layer, or NAL; read about it. The corresponding byte-stream format is described in Annex B of the H.264 standard. Don't be put off by the video context: the idea is very versatile.

    In short, the payload is divided into parts, so-called NAL units (NALUs). Each part starts with a byte sequence that marks its start, followed by a type indicator and (in the scheme proposed here) the length of the current NALU, followed by the payload of the NALU. For your purposes you would need to implement NALUs of two types:

    • Main data payload
    • Metadata

    Once you have pictured what your stream should look like, you have to grasp the idea of "stream encoding". Those are fearsome words, but do not worry. You just have to ensure that the byte sequence used to mark the start of a NALU never occurs inside the payload of a NALU. To achieve that, you implement an escaping (replacement) scheme; H.264 calls its version emulation prevention. Browse for samples.

    When you have thought this through, and before you dive in, think twice: scenario 3 might suit you better.

    If you are sure you will never have to process only a part of the streamed data, you can greatly simplify this scenario, i.e. skip the stream encoding entirely and implement something like this:

Client stream (outline):

private byte[] mabytPayload;
private int mintCurrentPayloadPosition;

private int? mintTotalPayloadLength;
private bool mblnTotalPayloadLengthSent;

public int Read(byte[] iBuffer, int iStart, int iLength)
{
if (mintTotalPayloadLength.HasValue && !mblnTotalPayloadLengthSent)
{
//1. Write the packet type (0)
//2. Write the total stream length (4 bytes).
...
mblnTotalPayloadLengthSent = true;
}
else
{
//1. Write the packet type (1)
//2. Write the packet length (iLength - 1 for example, 1 byte is for
//the type specification)
//3. Write the payload packet.
...
}
}

public void TotalStreamLengthSet(int iTotalStreamLength)
{
mintTotalPayloadLength = iTotalStreamLength;
}

Server stream reader:

public void WCFUploadCallback(Stream iUploadStream)
{
while(!endOfStream)
{
//1. Read the packet type.
if (normalPayload)
{
//2.a Read the payload packet length.
//2.b Read the payload.
}
else
{
//2.c Read the total stream length.
}
}
}
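The client and server outlines above can be made concrete with length-prefixed packets. A minimal sketch in Java, assuming a hypothetical wire format: one type byte (0 = total stream length, 1 = payload chunk) followed by a 4-byte big-endian int holding either the total length or the chunk length, with the chunk bytes after it. DataOutputStream and DataInputStream handle the big-endian ints; PacketFraming and its members are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical wire format: [type byte][4-byte big-endian int][chunk bytes if type == 1].
final class PacketFraming {
    static final int TYPE_TOTAL_LENGTH = 0; // int = total stream length, no payload follows
    static final int TYPE_PAYLOAD = 1;      // int = chunk length, followed by the chunk

    // Client side: send one chunk of payload.
    static void writePayload(DataOutputStream out, byte[] data, int offset, int length) throws IOException {
        out.writeByte(TYPE_PAYLOAD);
        out.writeInt(length);
        out.write(data, offset, length);
    }

    // Client side: send the total length as soon as it becomes known.
    static void writeTotalLength(DataOutputStream out, int totalLength) throws IOException {
        out.writeByte(TYPE_TOTAL_LENGTH);
        out.writeInt(totalLength);
    }

    static final class Result {
        final byte[] payload;
        final int announcedTotal; // -1 if no total-length packet was received

        Result(byte[] payload, int announcedTotal) {
            this.payload = payload;
            this.announcedTotal = announcedTotal;
        }
    }

    // Server side: read packets until the client closes the stream.
    static Result readAll(InputStream upload) throws IOException {
        DataInputStream in = new DataInputStream(upload);
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        int announcedTotal = -1;
        int type;
        while ((type = in.read()) != -1) { // -1: clean end of stream between packets
            if (type == TYPE_PAYLOAD) {
                int length = in.readInt();
                if (length < 0) {
                    throw new IOException("Negative chunk length: " + length);
                }
                byte[] chunk = new byte[length];
                in.readFully(chunk); // EOFException if the packet was cut short
                payload.write(chunk, 0, chunk.length);
            } else if (type == TYPE_TOTAL_LENGTH) {
                announcedTotal = in.readInt();
            } else {
                throw new IOException("Unknown packet type: " + type);
            }
        }
        return new Result(payload.toByteArray(), announcedTotal);
    }
}
```

Once the upload ends, the server can compare announcedTotal with payload.length.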

  3. If your upload is non-stop and the metadata about the stream becomes available on the client long after the payload (that happens as well), you are going to need two channels: one for the payload stream and another for metadata. On the metadata channel, your server answers the client with a question like 'what did you just start sending me?' or 'what have you sent me?', and the client explains itself in the next message.

Once you have settled on one of the scenarios, I can give you further details and/or recommendations.
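The "stream encoding" step from scenario 2 can be illustrated with the escaping rule H.264 uses, called emulation prevention. Whenever two zero bytes would be followed by a byte in the range 0x00 to 0x03, the encoder inserts an extra 0x03, so the escaped payload can never contain the start code 0x00 0x00 0x01. Below is a sketch in Java; EmulationPrevention, escape and unescape are hypothetical names. A real H.264 encoder has extra rules (for example at the end of a NAL unit) that this sketch omits.

```java
import java.io.ByteArrayOutputStream;

final class EmulationPrevention {
    // Inserts 0x03 after any two consecutive zero bytes that are followed by a byte <= 0x03,
    // so the escaped payload never contains 0x00 0x00 0x00, 0x00 0x00 0x01 or 0x00 0x00 0x02.
    static byte[] escape(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + payload.length / 2);
        int zeros = 0; // consecutive zero bytes just written
        for (byte b : payload) {
            if (zeros == 2 && (b & 0xFF) <= 0x03) {
                out.write(0x03);
                zeros = 0;
            }
            out.write(b);
            zeros = (b == 0) ? zeros + 1 : 0;
        }
        return out.toByteArray();
    }

    // Reverses escape(): drops a 0x03 that directly follows two zero bytes.
    static byte[] unescape(byte[] escaped) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(escaped.length);
        int zeros = 0;
        for (byte b : escaped) {
            if (zeros == 2 && b == 0x03) {
                zeros = 0; // skip the inserted escape byte
                continue;
            }
            out.write(b);
            zeros = (b == 0) ? zeros + 1 : 0;
        }
        return out.toByteArray();
    }
}
```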

What would be the fastest way to concatenate three files in C#?

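const int BUFFER_SIZE = 81920; // any reasonably large buffer works; 81920 is also Stream.CopyTo's default

// Copies everything from source's current position to its end into destination.
void CopyStream(Stream destination, Stream source)
{
    int count;
    byte[] buffer = new byte[BUFFER_SIZE];
    while ((count = source.Read(buffer, 0, buffer.Length)) > 0)
        destination.Write(buffer, 0, count);
}

CopyStream(outputFileStream, fileStream1);
CopyStream(outputFileStream, fileStream2);
CopyStream(outputFileStream, fileStream3);
// On .NET Framework 4.0 and later, fileStream1.CopyTo(outputFileStream) etc. does the same.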
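In Java, the same copy loop is available as Files.copy(Path, OutputStream). A sketch that concatenates any number of files into one, where FileConcat and concatenate are hypothetical names:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class FileConcat {
    // Creates (or truncates) 'target' and appends each input file to it, in order.
    static void concatenate(Path target, List<Path> inputs) {
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path input : inputs) {
                Files.copy(input, out); // buffered copy loop, like CopyStream
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```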

How do I convert struct System.Byte byte[] to a System.IO.Stream object in C#?

The easiest way to convert a byte array to a stream is using the MemoryStream class:

Stream stream = new MemoryStream(byteArray);
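In Java, the counterpart is ByteArrayInputStream. It wraps the array without copying it and can also expose just a slice of it. A minimal sketch, where BytesToStream, whole and slice are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

final class BytesToStream {
    // Wraps the array without copying it; reading starts at index 0.
    static InputStream whole(byte[] bytes) {
        return new ByteArrayInputStream(bytes);
    }

    // Exposes only bytes[offset] .. bytes[offset + length - 1] as a stream.
    static InputStream slice(byte[] bytes, int offset, int length) {
        return new ByteArrayInputStream(bytes, offset, length);
    }
}
```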

