Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2cdf16b
Code changes to support binary encoding for point operations.
kundadebdatta Aug 23, 2024
387005b
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Sep 18, 2024
c0e5952
Code changes to introduce cosmos buffered stream wrapper.
kundadebdatta Sep 25, 2024
07eff74
Code changes to remove unnecessary using statement.
kundadebdatta Sep 25, 2024
265a7d3
Code changes to remove the pooling logic.
kundadebdatta Sep 25, 2024
bcdc75e
Code changes to port fixes into newtonsoft reader and writer to addre…
kundadebdatta Sep 25, 2024
4e35d0f
Code changes to set inner stream on disposal of buffered stream.
kundadebdatta Sep 26, 2024
4ac933f
Minor cosmetic code changes.
kundadebdatta Sep 26, 2024
8033075
Code changes to use clonable stream instead of buffered stream.
kundadebdatta Sep 27, 2024
4359dc8
Revert "Code changes to use clonable stream instead of buffered stream."
kundadebdatta Oct 1, 2024
88d5431
Code changes to use clonable stream from request handler when the out…
kundadebdatta Oct 2, 2024
abbcb11
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
16583c3
Code changes to address review comments.
kundadebdatta Oct 2, 2024
b3da27d
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
7fc7407
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 3, 2024
c275cf1
Code changes to fix test failures.
kundadebdatta Oct 3, 2024
da1f135
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 7, 2024
fbf9ffc
Code changes to address review comments.
kundadebdatta Oct 7, 2024
96c8581
Adding more stream tests.
kundadebdatta Oct 7, 2024
2baa15c
Code changes to fix batch item emulator tests.
kundadebdatta Oct 7, 2024
a97101a
Code changes to use cloneable stream in buffered stream.
kundadebdatta Oct 9, 2024
6981e01
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
9ed19b6
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
dbc9596
Code changes to match ContainerCore.Items to master
kundadebdatta Oct 9, 2024
3929404
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
b8151e0
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
cc67f67
Code changes to address review comments.
kundadebdatta Oct 10, 2024
de8aab6
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
d72dbe7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
c932a1f
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
8855ebd
Code changes to fix container creation using binary encoding.
kundadebdatta Oct 16, 2024
7009570
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 18, 2024
f67ae93
Code changes to add internal types in a common place.
kundadebdatta Oct 18, 2024
b2b6af7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 22, 2024
aea8a3a
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
3245664
Code changes to remove ContentSerializationFormat. Addressing minor r…
kundadebdatta Oct 23, 2024
cce11dd
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Code changes to use cloneable stream in buffered stream.
  • Loading branch information
kundadebdatta committed Oct 9, 2024
commit a97101a7c7f5851cef46955ff4c0342d8818b390
6 changes: 2 additions & 4 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ public override async Task<ResponseMessage> SendAsync(
if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
&& response.Content != null
&& (response.Content is not MemoryStream
|| response.Content is not CloneableStream))
&& response.Content is not CloneableStream)
{
CloneableStream clonableStream = await StreamExtension.AsClonableStreamAsync(response.Content, default);
response.Content = clonableStream;
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
OperationType operationType,
ItemRequestOptions requestOptions,
ITrace trace,
JsonSerializationFormat? targetResponseSerializationFormat,
JsonSerializationFormat targetResponseSerializationFormat,
CancellationToken cancellationToken)
{
if (trace == null)
Expand All @@ -924,14 +924,9 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
Stream convertedStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(
streamPayload = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload);

if (convertedStream != null)
{
streamPayload = convertedStream;
}
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand All @@ -947,16 +942,12 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
cancellationToken: cancellationToken);

// Convert Binary Stream to Text.
if (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
if ((requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& responseMessage?.Content is CloneableStream outputCloneableStream)
{
Stream outputResponseStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(
responseMessage.Content = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(
targetSerializationFormat: targetResponseSerializationFormat,
inputStream: responseMessage?.Content);

if (outputResponseStream != null)
{
responseMessage.Content = outputResponseStream;
}
inputStream: outputCloneableStream);
}

return responseMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.Serializer
{
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -20,7 +19,7 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <summary>
/// The inner stream being wrapped.
/// </summary>
private readonly Stream innerStream;
private readonly CloneableStream innerStream;

/// <summary>
/// Indicates whether the inner stream should be disposed.
Expand All @@ -43,13 +42,9 @@ internal class CosmosBufferedStreamWrapper : Stream
/// <param name="inputStream">The input stream to wrap.</param>
/// <param name="shouldDisposeInnerStream">Indicates whether the inner stream should be disposed.</param>
public CosmosBufferedStreamWrapper(
Stream inputStream,
CloneableStream inputStream,
bool shouldDisposeInnerStream)
{
Debug.Assert(
inputStream is CloneableStream || inputStream is MemoryStream,
"The inner stream is neither a memory stream nor a cloneable stream.");

this.innerStream = inputStream ?? throw new ArgumentNullException(nameof(inputStream));
this.shouldDisposeInnerStream = shouldDisposeInnerStream;
}
Expand Down Expand Up @@ -99,33 +94,7 @@ public override int Read(byte[] buffer, int offset, int count)
throw new ArgumentNullException(nameof(buffer));
}

if (offset < 0
|| count < 0
|| (buffer.Length - offset) < count
|| this.innerStream.Position == this.innerStream.Length)
{
return 0;
}

int bytesRead = 0;
if (this.hasReadFirstByte
&& this.innerStream.Position == 1
&& offset == 0
&& count > 0)
{
buffer[0] = this.firstByteBuffer[0];
bytesRead = 1;
offset++;
count--;
}

if (count > 0)
{
int innerBytesRead = this.innerStream.Read(buffer, offset, count);
bytesRead += innerBytesRead;
}

return bytesRead;
return this.innerStream.Read(buffer, offset, count);
}

/// <inheritdoc />
Expand All @@ -146,10 +115,7 @@ protected override void Dispose(bool disposing)
}
else
{
if (this.innerStream.CanSeek)
{
this.innerStream.Position = 0;
}
this.ResetStreamPosition();
}
}

Expand All @@ -173,12 +139,6 @@ public byte[] ReadAll()
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return totalBytes > 0 ? bytes : default;
}

Expand All @@ -199,12 +159,6 @@ public async Task<byte[]> ReadAllAsync(CancellationToken cancellationToken = def
totalBytes += count;
}

if (this.hasReadFirstByte)
{
bytes[0] = this.firstByteBuffer[0];
totalBytes += 1;
}

return totalBytes > 0 ? bytes : default;
}

Expand All @@ -216,7 +170,7 @@ public async Task<byte[]> ReadAllAsync(CancellationToken cancellationToken = def
/// </returns>
public JsonSerializationFormat GetJsonSerializationFormat()
{
this.ReadFirstByte();
this.ReadFirstByteAndResetStream();
if (this.firstByteBuffer[0] == (byte)JsonSerializationFormat.Binary)
{
return JsonSerializationFormat.Binary;
Expand All @@ -230,17 +184,29 @@ public JsonSerializationFormat GetJsonSerializationFormat()
}

/// <summary>
/// Reads the first byte from the inner stream and stores it in the buffer.
/// Reads the first byte from the inner stream and stores it in the buffer. It also resets the stream position to zero.
/// </summary>
/// <remarks>
/// This method sets the <see cref="hasReadFirstByte"/> flag to true if the first byte is successfully read.
/// </remarks>
private void ReadFirstByte()
private void ReadFirstByteAndResetStream()
{
if (!this.hasReadFirstByte
&& this.innerStream.Read(this.firstByteBuffer, 0, 1) > 0)
{
this.hasReadFirstByte = true;
this.ResetStreamPosition();
}
}

/// <summary>
/// Resets the inner stream position to zero.
/// </summary>
private void ResetStreamPosition()
{
if (this.innerStream.CanSeek)
{
this.innerStream.Position = 0;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.IO;
using System.Text;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Documents;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

Expand Down Expand Up @@ -88,7 +89,7 @@ internal CosmosJsonDotNetSerializer(
/// <returns>The object representing the deserialized stream</returns>
public override T FromStream<T>(Stream stream)
{
using (CosmosBufferedStreamWrapper bufferedStream = new (stream, shouldDisposeInnerStream: true))
using (stream)
{
if (typeof(Stream).IsAssignableFrom(typeof(T)))
{
Expand All @@ -97,25 +98,29 @@ public override T FromStream<T>(Stream stream)

JsonSerializer jsonSerializer = this.GetSerializer();

if (bufferedStream.GetJsonSerializationFormat() == Json.JsonSerializationFormat.Binary)
if (stream is CloneableStream cloneableStream)
{
byte[] content = bufferedStream.ReadAll();
using (CosmosBufferedStreamWrapper bufferedStream = new (cloneableStream, shouldDisposeInnerStream: false))
{
if (bufferedStream.GetJsonSerializationFormat() == Json.JsonSerializationFormat.Binary)
{
byte[] content = bufferedStream.ReadAll();

using Json.Interop.CosmosDBToNewtonsoftReader reader = new (
jsonReader: Json.JsonReader.Create(
jsonSerializationFormat: Json.JsonSerializationFormat.Binary,
buffer: content));
using Json.Interop.CosmosDBToNewtonsoftReader reader = new (
jsonReader: Json.JsonReader.Create(
jsonSerializationFormat: Json.JsonSerializationFormat.Binary,
buffer: content));

return jsonSerializer.Deserialize<T>(reader);
return jsonSerializer.Deserialize<T>(reader);
}
}
}
else

using (StreamReader sr = new (stream))
{
using (StreamReader sr = new (bufferedStream))
using (JsonTextReader jsonTextReader = new (sr))
{
using (JsonTextReader jsonTextReader = new (sr))
{
return jsonSerializer.Deserialize<T>(jsonTextReader);
}
return jsonSerializer.Deserialize<T>(jsonTextReader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Documents;
using Newtonsoft.Json.Serialization;

internal static class CosmosSerializationUtil
Expand Down Expand Up @@ -45,10 +46,10 @@ internal static string GetStringWithPropertyNamingPolicy(CosmosPropertyNamingPol
/// <param name="inputStream">The input stream containing the data to be serialized.</param>
/// <returns>Returns true if the input stream is successfully serialized to the target format, otherwise false.</returns>
internal static async Task<Stream> TrySerializeStreamToTargetFormatAsync(
JsonSerializationFormat? targetSerializationFormat,
Stream inputStream)
JsonSerializationFormat targetSerializationFormat,
CloneableStream inputStream)
{
if (targetSerializationFormat == null || inputStream == null)
if (inputStream == null)
{
return null;
}
Expand All @@ -68,12 +69,12 @@ internal static async Task<Stream> TrySerializeStreamToTargetFormatAsync(
{
return CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(
targetContent,
targetSerializationFormat.Value);
targetSerializationFormat);
}
}
}

return null;
return inputStream;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,30 @@ public override T FromStream<T>(Stream stream)
return default;
}

using (CosmosBufferedStreamWrapper bufferedStream = new (stream, shouldDisposeInnerStream: true))
using (stream)
{
if (bufferedStream.GetJsonSerializationFormat() == Json.JsonSerializationFormat.Binary)
if (stream is Documents.CloneableStream cloneableStream)
{
byte[] content = bufferedStream.ReadAll();

if (CosmosObject.TryCreateFromBuffer(content, out CosmosObject cosmosObject))
{
return System.Text.Json.JsonSerializer.Deserialize<T>(cosmosObject.ToString(), this.jsonSerializerOptions);
}
else
using (CosmosBufferedStreamWrapper bufferedStream = new (cloneableStream, shouldDisposeInnerStream: false))
{
using Stream textStream = CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(content, JsonSerializationFormat.Text);
return this.DeserializeStream<T>(textStream);
if (bufferedStream.GetJsonSerializationFormat() == JsonSerializationFormat.Binary)
{
byte[] content = bufferedStream.ReadAll();

if (CosmosObject.TryCreateFromBuffer(content, out CosmosObject cosmosObject))
{
return System.Text.Json.JsonSerializer.Deserialize<T>(cosmosObject.ToString(), this.jsonSerializerOptions);
}
else
{
using Stream textStream = CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(content, JsonSerializationFormat.Text);
return this.DeserializeStream<T>(textStream);
}
}
}
}

return this.DeserializeStream<T>(bufferedStream);
return this.DeserializeStream<T>(stream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;

/// <summary>
Expand Down Expand Up @@ -114,7 +115,8 @@ public async Task TrySerializeStreamToTargetFormat_Success(string expected, stri
: CosmosSerializerUtils.ConvertInputToBinaryStream(json, Newtonsoft.Json.JsonSerializer.Create());

// Act
Stream outputStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(targetFormat, inputStream);
CloneableStream cloneableStream = await StreamExtension.AsClonableStreamAsync(inputStream);
Stream outputStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(targetFormat, cloneableStream);

// Assert
Assert.IsNotNull(outputStream);
Expand All @@ -132,7 +134,8 @@ public async Task TrySerializeStreamToTargetFormat_Failure()
JsonSerializationFormat targetFormat = JsonSerializationFormat.Text;

// Act
Stream outputStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(targetFormat, inputStream);
CloneableStream cloneableStream = await StreamExtension.AsClonableStreamAsync(inputStream);
Stream outputStream = await CosmosSerializationUtil.TrySerializeStreamToTargetFormatAsync(targetFormat, cloneableStream);

// Assert
Assert.IsNull(outputStream);
Expand Down