Skip to content
Merged
Prev Previous commit
Include kafka client id consistently in all kafka protocol encoders
  • Loading branch information
jfallows committed Dec 6, 2023
commit ab92a2fc7ad5b821f3fd4ecb90ad229297c3ae3b
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2606,7 +2606,7 @@ private void doEncodeOffsetsRequest(
.apiKey(OFFSETS_API_KEY)
.apiVersion(OFFSETS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2645,7 +2645,7 @@ private void doEncodeOffsetsRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ private void doEncodeFetchRequest(
.apiKey(FETCH_API_KEY)
.apiVersion(FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2712,7 +2712,7 @@ private void doEncodeFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private final Long2ObjectHashMap<GroupMembership> instanceIds;
private final Object2ObjectHashMap<String, KafkaGroupStream> groupStreams;
private final Map<String, String> configs;
private final String clientId;
private final Duration rebalanceTimeout;
private final String groupMinSessionTimeoutDefault;
private final String groupMaxSessionTimeoutDefault;
Expand All @@ -291,7 +290,6 @@ public KafkaClientGroupFactory(
{
super(config, context);
this.rebalanceTimeout = config.clientGroupRebalanceTimeout();
this.clientId = config.clientId();
this.supplyInstanceId = config.clientInstanceIdSupplier();
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.proxyTypeId = context.supplyTypeId("proxy");
Expand Down Expand Up @@ -2123,7 +2121,7 @@ private void doEncodeFindCoordinatorRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -2823,7 +2821,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2860,7 +2858,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3627,7 +3625,7 @@ private void doEncodeJoinGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3833,7 +3831,7 @@ private void doEncodeSyncGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress.get());
Expand Down Expand Up @@ -3881,7 +3879,7 @@ private void doEncodeHeartbeatRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3934,7 +3932,7 @@ private void doEncodeLeaveGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ private void doEncodeMetaRequest(
.apiKey(METADATA_API_KEY)
.apiVersion(METADATA_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -1501,7 +1501,7 @@ private void doEncodeMetaRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(OFFSET_COMMIT_API_KEY)
.apiVersion(OFFSET_COMMIT_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1363,7 +1363,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(OFFSET_FETCH_API_KEY)
.apiVersion(OFFSET_FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1430,7 +1430,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
private static final byte RECORD_ATTRIBUTES_NONE = 0;

private static final String TRANSACTION_ID_NONE = null;
private static final String CLIENT_ID_NONE = null;

private static final int TIMESTAMP_NONE = 0;

Expand Down Expand Up @@ -1834,7 +1833,7 @@ private void doEncodeProduceRequest(
.apiKey(PRODUCE_API_KEY)
.apiVersion(PRODUCE_API_VERSION)
.correlationId(0)
.clientId(CLIENT_ID_NONE)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1902,7 +1901,7 @@ private void doEncodeProduceRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaScramMechanism;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.sasl.SaslAuthenticateRequestFW;
Expand Down Expand Up @@ -94,6 +95,7 @@ public abstract class KafkaClientSaslHandshaker
private Matcher serverResponseMatcher;
private byte[] result, ui, prev;

protected final String16FW clientId;
protected final LongUnaryOperator supplyInitialId;
protected final LongUnaryOperator supplyReplyId;
protected final MutableDirectBuffer writeBuffer;
Expand All @@ -102,6 +104,7 @@ public KafkaClientSaslHandshaker(
KafkaConfiguration config,
EngineContext context)
{
this.clientId = new String16FW(config.clientId());
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
Expand Down Expand Up @@ -157,7 +160,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(SASL_HANDSHAKE_API_KEY)
.apiVersion(SASL_HANDSHAKE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -177,7 +180,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -212,7 +215,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -242,7 +245,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -270,7 +273,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -304,7 +307,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -359,7 +362,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -379,7 +382,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ connect "zilla://streams/net0"

connected

write 17 # size
write 22 # size
17s # sasl.handshake
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

read 17 # size
Expand All @@ -41,11 +41,11 @@ read 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

write 32 # size
write 37 # size
36s # sasl.authenticate
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -57,11 +57,11 @@ read 20 # size
-1s # authentication bytes
0L # session lifetime

write 41 # size
write 46 # size
32s # describe configs
0s # v0
${newRequestId}
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ accepted

connected

read 17 # size
read 22 # size
17s # sasl.handshake
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

write 17 # size
Expand All @@ -38,11 +38,11 @@ write 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

read 32 # size
read 37 # size
36s # sasl.authenticate
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -54,11 +54,11 @@ write 20 # size
-1s # authentication bytes
0L # session lifetime

read 41 # size
read 46 # size
32s # describe configs
0s # v0
(int:requestId)
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Loading