Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using CommunityToolkit.HighPerformance.Streams;
using System;
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;

namespace CommunityToolkit.HighPerformance;

/// <summary>
/// Helpers for working with the <see cref="ReadOnlySequence{T}"/> type.
/// </summary>
public static class ReadOnlySequenceExtensions
{
/// <summary>
/// Returns a <see cref="Stream"/> wrapping the contents of the given <see cref="Memory{T}"/> of <see cref="byte"/> instance.
/// </summary>
/// <param name="sequence">The input <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.</param>
/// <returns>A <see cref="Stream"/> wrapping the data within <paramref name="sequence"/>.</returns>
/// <remarks>
/// Since this method only receives a <see cref="ReadOnlySequence{T}"/> instance, which does not track
/// the lifetime of its underlying buffer, it is responsibility of the caller to manage that.
/// In particular, the caller must ensure that the target buffer is not disposed as long
/// as the returned <see cref="Stream"/> is in use, to avoid unexpected issues.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static Stream AsStream(this ReadOnlySequence<byte> sequence)
{
return ReadOnlySequenceStream.Create(sequence);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;

Expand All @@ -24,14 +25,28 @@ public static void ValidatePosition(long position, int length)
}
}

/// <summary>
/// Validates the <see cref="Stream.Position"/> argument (it needs to be in the [0, length]) range.
/// </summary>
/// <param name="position">The new <see cref="Stream.Position"/> value being set.</param>
/// <param name="length">The maximum length of the target <see cref="Stream"/>.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void ValidatePosition(long position, long length)
{
if ((ulong)position > (ulong)length)
{
ThrowArgumentOutOfRangeExceptionForPosition();
}
}

/// <summary>
/// Validates the <see cref="Stream.Read(byte[],int,int)"/> or <see cref="Stream.Write(byte[],int,int)"/> arguments.
/// </summary>
/// <param name="buffer">The target array.</param>
/// <param name="offset">The offset within <paramref name="buffer"/>.</param>
/// <param name="count">The number of elements to process within <paramref name="buffer"/>.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void ValidateBuffer(byte[]? buffer, int offset, int count)
public static void ValidateBuffer([NotNull] byte[]? buffer, int offset, int count)
{
if (buffer is null)
{
Expand Down
304 changes: 304 additions & 0 deletions src/CommunityToolkit.HighPerformance/Streams/ReadOnlySequenceStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace CommunityToolkit.HighPerformance.Streams;

/// <summary>
/// A <see cref="Stream"/> implementation wrapping a <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.
/// </summary>
internal sealed partial class ReadOnlySequenceStream : Stream
{
/// <summary>
/// The <see cref="ReadOnlySequence{T}"/> instance currently in use.
/// </summary>
private readonly ReadOnlySequence<byte> source;

/// <summary>
/// The current position within <see cref="source"/>.
/// </summary>
private long position;

/// <summary>
/// Indicates whether or not the current instance has been disposed
/// </summary>
private bool disposed;

/// <summary>
/// Initializes a new instance of the <see cref="ReadOnlySequenceStream"/> class with the specified <see cref="ReadOnlySequence{T}"/> source.
/// </summary>
/// <param name="source">The <see cref="ReadOnlySequence{T}"/> source.</param>
public ReadOnlySequenceStream(ReadOnlySequence<byte> source)
{
this.source = source;
}

/// <inheritdoc/>
public sealed override bool CanRead
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => !this.disposed;
}

/// <inheritdoc/>
public sealed override bool CanSeek
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => !this.disposed;
}

/// <inheritdoc/>
public sealed override bool CanWrite
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => false;
}

/// <inheritdoc/>
public sealed override long Length
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get
{
MemoryStream.ValidateDisposed(this.disposed);

return this.source.Length;
}
}

/// <inheritdoc/>
public sealed override long Position
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get
{
MemoryStream.ValidateDisposed(this.disposed);

return this.position;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
set
{
MemoryStream.ValidateDisposed(this.disposed);
MemoryStream.ValidatePosition(value, this.source.Length);

this.position = value;
}
}

/// <summary>
/// Creates a new <see cref="Stream"/> from the input <see cref="ReadOnlySequence{T}"/> of <see cref="byte"/> instance.
/// </summary>
/// <param name="sequence">The input <see cref="ReadOnlySequence{T}"/> instance.</param>
/// <returns>A <see cref="Stream"/> wrapping the underlying data for <paramref name="sequence"/>.</returns>
public static Stream Create(ReadOnlySequence<byte> sequence)
{
return new ReadOnlySequenceStream(sequence);
}

/// <inheritdoc/>
public sealed override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}

try
{
MemoryStream.ValidateDisposed(this.disposed);

if (this.position >= this.source.Length)
{
return Task.CompletedTask;
}

if (this.source.IsSingleSegment)
{
ReadOnlyMemory<byte> buffer = this.source.First.Slice(unchecked((int)this.position));

this.position = this.source.Length;

return destination.WriteAsync(buffer, cancellationToken).AsTask();
}

async Task CoreCopyToAsync(Stream destination, CancellationToken cancellationToken)
{
ReadOnlySequence<byte> sequence = this.source.Slice(this.position);

this.position = this.source.Length;

foreach (ReadOnlyMemory<byte> segment in sequence)
{
await destination.WriteAsync(segment, cancellationToken).ConfigureAwait(false);
}
}

return CoreCopyToAsync(destination, cancellationToken);
}
catch (OperationCanceledException e)
{
return Task.FromCanceled(e.CancellationToken);
}
catch (Exception e)
{
return Task.FromException(e);
}
}

/// <inheritdoc/>
public sealed override void Flush()
{
}

/// <inheritdoc/>
public sealed override Task FlushAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}

return Task.CompletedTask;
}

/// <inheritdoc/>
public sealed override Task<int> ReadAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
}

try
{
int result = Read(buffer, offset, count);

return Task.FromResult(result);
}
catch (OperationCanceledException e)
{
return Task.FromCanceled<int>(e.CancellationToken);
}
catch (Exception e)
{
return Task.FromException<int>(e);
}
}

public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public sealed override long Seek(long offset, SeekOrigin origin)
{
MemoryStream.ValidateDisposed(this.disposed);

long index = origin switch
{
SeekOrigin.Begin => offset,
SeekOrigin.Current => this.position + offset,
SeekOrigin.End => this.source.Length + offset,
_ => MemoryStream.ThrowArgumentExceptionForSeekOrigin()
};

MemoryStream.ValidatePosition(index, this.source.Length);

this.position = index;

return index;
}

/// <inheritdoc/>
public sealed override void SetLength(long value)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public sealed override int Read(byte[]? buffer, int offset, int count)
{
MemoryStream.ValidateDisposed(this.disposed);
MemoryStream.ValidateBuffer(buffer, offset, count);

if (this.position >= this.source.Length)
{
return 0;
}

ReadOnlySequence<byte> sequence = this.source.Slice(this.position);
Span<byte> destination = buffer.AsSpan(offset, count);
int bytesCopied = 0;

foreach (ReadOnlyMemory<byte> segment in sequence)
{
int bytesToCopy = Math.Min(segment.Length, destination.Length);

segment.Span.Slice(0, bytesToCopy).CopyTo(destination);

destination = destination.Slice(bytesToCopy);

bytesCopied += bytesToCopy;

this.position += bytesToCopy;

if (destination.Length == 0)
{
break;
}
}

return bytesCopied;
}

/// <inheritdoc/>
public sealed override int ReadByte()
{
MemoryStream.ValidateDisposed(this.disposed);

if (this.position == this.source.Length)
{
return -1;
}

ReadOnlySequence<byte> sequence = this.source.Slice(this.position);

this.position++;

return sequence.First.Span[0];
}

/// <inheritdoc/>
public sealed override void Write(byte[]? buffer, int offset, int count)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
public sealed override void WriteByte(byte value)
{
throw MemoryStream.GetNotSupportedException();
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
this.disposed = true;
}
}
Loading