Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.azure.cosmos.implementation.UserAgentContainer;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.rntbd.AsyncRntbdRequestRecord;
import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord;
Expand Down Expand Up @@ -984,7 +985,7 @@ private static final class FakeEndpoint implements RntbdEndpoint {
private final RntbdDurableEndpointMetrics durableEndpointMetrics;

private FakeEndpoint(
final Config config, final RntbdRequestTimer timer, final Uri addressUri,
final Config config, final ClientTelemetry clientTelemetry, RntbdRequestTimer timer, final Uri addressUri,
final RntbdResponse... expected
) {

Expand Down Expand Up @@ -1013,7 +1014,7 @@ private FakeEndpoint(
);

RntbdRequestManager requestManager = new RntbdRequestManager(
new RntbdClientChannelHealthChecker(config),
new RntbdClientChannelHealthChecker(config, clientTelemetry),
config,
null,
null);
Expand Down Expand Up @@ -1183,12 +1184,14 @@ public OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs openConn
static class Provider implements RntbdEndpoint.Provider {

final Config config;
final ClientTelemetry clientTelemetry;
final RntbdResponse expected;
final RntbdRequestTimer timer;
final IAddressResolver addressResolver;

Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected, IAddressResolver addressResolver) {
this.config = new Config(options, sslContext, LogLevel.WARN);
this.clientTelemetry = new ClientTelemetry(mockDiagnosticsClientContext(), false, null, null, null, null, null, null, null, null, null, null);
this.timer = new RntbdRequestTimer(
config.tcpNetworkRequestTimeoutInNanos(),
config.requestTimerResolutionInNanos());
Expand Down Expand Up @@ -1218,12 +1221,12 @@ public int evictions() {

@Override
public RntbdEndpoint createIfAbsent(URI serviceEndpoint, Uri addressUri, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor, int minRequiredChannelsForEndpoint, AddressSelector addressSelector) {
return new FakeEndpoint(config, timer, addressUri, expected);
return new FakeEndpoint(config, clientTelemetry, timer, addressUri, expected);
}

@Override
public RntbdEndpoint get(URI physicalAddress) {
return new FakeEndpoint(config, timer, new Uri(physicalAddress.toString()), expected);
return new FakeEndpoint(config, clientTelemetry, timer, new Uri(physicalAddress.toString()), expected);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.azure.cosmos.implementation.directconnectivity.rntbd;

import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo;
import com.azure.cosmos.implementation.cpu.CpuLoadHistory;
import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
Expand All @@ -16,6 +18,7 @@
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -48,7 +51,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand All @@ -74,6 +82,11 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru
assertThat(healthyResult.isSuccess()).isTrue();
assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
assertThat(healthyResult.getNow().contains("health check failed due to non-responding write"));
assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId"));
assertThat(healthyResult.getNow().contains("clientUsedMemory"));
assertThat(healthyResult.getNow().contains("clientAvailableMemory"));
assertThat(healthyResult.getNow().contains("clientSystemCpuLoad"));
assertThat(healthyResult.getNow().contains("clientAvailableProcessors"));
} else {
Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
Expand All @@ -90,7 +103,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand All @@ -115,6 +133,11 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup
assertThat(healthyResult.isSuccess()).isTrue();
assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue);
assertThat(healthyResult.getNow().contains("health check failed due to non-responding read"));
assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId"));
assertThat(healthyResult.getNow().contains("clientUsedMemory"));
assertThat(healthyResult.getNow().contains("clientAvailableMemory"));
assertThat(healthyResult.getNow().contains("clientSystemCpuLoad"));
assertThat(healthyResult.getNow().contains("clientAvailableProcessors"));
} else {
Future<Boolean> healthyResult = healthChecker.isHealthy(channelMock).sync();
assertThat(healthyResult.isSuccess()).isTrue();
Expand All @@ -131,7 +154,12 @@ public void transitTimeoutTimeLimitTests(boolean withFailureReason) throws Inter
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -174,7 +202,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -218,7 +251,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -264,7 +302,12 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -322,7 +365,12 @@ public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throw
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down Expand Up @@ -375,7 +423,12 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu
sslContextMock,
LogLevel.INFO);

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);
ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);
Channel channelMock = Mockito.mock(Channel.class);
ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class);
RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.Uri;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -40,7 +42,13 @@ public void transitTimeoutTimestampTests() throws URISyntaxException {
new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
sslContextMock,
LogLevel.INFO);
RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);

ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);

RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class);

Expand Down Expand Up @@ -119,7 +127,13 @@ public void rntbdContextResponseTests() {
new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(),
sslContextMock,
LogLevel.INFO);
RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config);

ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class);
ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class);
Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock);
Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId");

RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock);

RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class);

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Added client vmId info to Rntbd health check logs - See [43079](https://github.com/Azure/azure-sdk-for-java/pull/43079)

### 4.65.0 (2024-11-19)

Expand Down
Loading
Loading