Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
akrambek committed Mar 12, 2024
commit 08be1c47f8bd4933803457602b8b11774b2eae95
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class KafkaConfiguration extends Configuration
static
{
final ConfigurationDef config = new ConfigurationDef("zilla.binding.kafka");
KAFKA_CLIENT_ID = config.property("client.id", "zilla");
KAFKA_CLIENT_ID = config.property("client.id");
KAFKA_CLIENT_INSTANCE_ID = config.property(InstanceIdSupplier.class, "client.instance.id",
KafkaConfiguration::decodeInstanceId, KafkaConfiguration::defaultInstanceId);
KAFKA_CLIENT_MAX_IDLE_MILLIS = config.property("client.max.idle.ms", 1 * 60 * 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -76,6 +75,8 @@ public KafkaClientFactory(
final Signaler signaler = config.clientConnectionPool() ? connectionPool.signaler() :
context.signaler();

final Function<Integer, Signaler> signalSupplier = d -> d == 0 ? signaler : context.signaler();

final KafkaClientMetaFactory clientMetaFactory = new KafkaClientMetaFactory(
config, context, bindings::get, accountant::supplyDebitor, supplyClientRoute,
signaler, streamFactory, resolveSasl);
Expand All @@ -84,7 +85,7 @@ public KafkaClientFactory(
config, context, bindings::get, accountant::supplyDebitor, signaler, streamFactory, resolveSasl);

final KafkaClientGroupFactory clientGroupFactory = new KafkaClientGroupFactory(
config, context, bindings::get, accountant::supplyDebitor, signaler, streamFactorySupplier,
config, context, bindings::get, accountant::supplyDebitor, signalSupplier, streamFactorySupplier,
resolveSaslSupplier, supplyClientRoute);

final KafkaClientFetchFactory clientFetchFactory = new KafkaClientFetchFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private final MutableDirectBuffer userdataBuffer;
private final BufferPool decodePool;
private final BufferPool encodePool;
private final Signaler signaler;
private final Function<Integer, BindingHandler> streamFactory;
private final Function<Integer, UnaryOperator<KafkaSaslConfig>> resolveSasl;
private final Function<Integer, Signaler> signalerSupplier;
private final Function<Integer, BindingHandler> streamFactorySupplier;
private final Function<Integer, UnaryOperator<KafkaSaslConfig>> resolveSaslSupplier;
private final LongFunction<KafkaBindingConfig> supplyBinding;
private final Supplier<String> supplyInstanceId;
private final LongFunction<BudgetDebitor> supplyDebitor;
Expand All @@ -314,9 +314,9 @@ public KafkaClientGroupFactory(
EngineContext context,
LongFunction<KafkaBindingConfig> supplyBinding,
LongFunction<BudgetDebitor> supplyDebitor,
Signaler signaler,
Function<Integer, BindingHandler> streamFactory,
Function<Integer, UnaryOperator<KafkaSaslConfig>> resolveSasl,
Function<Integer, Signaler> signalerSupplier,
Function<Integer, BindingHandler> streamFactorySupplier,
Function<Integer, UnaryOperator<KafkaSaslConfig>> resolveSaslSupplier,
LongFunction<KafkaClientRoute> supplyClientRoute)
{
super(config, context);
Expand All @@ -331,9 +331,9 @@ public KafkaClientGroupFactory(
this.encodePool = context.bufferPool();
this.supplyBinding = supplyBinding;
this.supplyDebitor = supplyDebitor;
this.signaler = signaler;
this.streamFactory = streamFactory;
this.resolveSasl = resolveSasl;
this.signalerSupplier = signalerSupplier;
this.streamFactorySupplier = streamFactorySupplier;
this.resolveSaslSupplier = resolveSaslSupplier;
this.instanceIds = new Long2ObjectHashMap<>();
this.groupStreams = new Object2ObjectHashMap<>();
this.configs = new LinkedHashMap<>();
Expand Down Expand Up @@ -381,7 +381,7 @@ public MessageConsumer newStream(
{
final long resolvedId = resolved.id;
final List<KafkaServerConfig> servers = binding.servers();
final KafkaSaslConfig sasl = resolveSasl.apply(0).apply(binding.sasl());
final KafkaSaslConfig sasl = resolveSaslSupplier.apply(0).apply(binding.sasl());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please define a constant private static final int NO_REBALANCE_DELAY = 0 and use it here instead of 0.


final GroupMembership groupMembership = instanceIds.get(binding.id);
assert groupMembership != null;
Expand Down Expand Up @@ -458,7 +458,7 @@ private MessageConsumer newStream(
.build();

final MessageConsumer receiver =
streamFactory.apply(initialDelay)
streamFactorySupplier.apply(initialDelay)
.newStream(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof(), sender);

receiver.accept(begin.typeId(), begin.buffer(), begin.offset(), begin.sizeof());
Expand Down Expand Up @@ -1825,6 +1825,7 @@ private final class ClusterClient extends KafkaGroupClient
private final LongLongConsumer encodeSaslAuthenticateRequest = this::doEncodeSaslAuthenticateRequest;
private final LongLongConsumer encodeFindCoordinatorRequest = this::doEncodeFindCoordinatorRequest;
private final KafkaGroupStream delegate;
private final Signaler signaler;

private MessageConsumer network;

Expand Down Expand Up @@ -1870,6 +1871,7 @@ private final class ClusterClient extends KafkaGroupClient

this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeFindCoordinatorRequest;
this.delegate = delegate;
this.signaler = signalerSupplier.apply(GROUP_REBALANCE_INITIAL_DELAY_MS);
this.decoder = decodeClusterReject;
}

Expand Down Expand Up @@ -2571,6 +2573,7 @@ private final class DescribeClient extends KafkaGroupClient
private MessageConsumer network;
private final Map<String, String> configs;
private final KafkaGroupStream delegate;
private final Signaler signaler;

private int state;
private long authorization;
Expand Down Expand Up @@ -2612,6 +2615,7 @@ private final class DescribeClient extends KafkaGroupClient
super(server, sasl, originId, routedId);
this.configs = new LinkedHashMap<>();
this.delegate = delegate;
this.signaler = signalerSupplier.apply(GROUP_REBALANCE_INITIAL_DELAY_MS);

this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeDescribeRequest;
this.decoder = decodeReject;
Expand Down Expand Up @@ -3341,6 +3345,7 @@ private final class JoinGroupClient extends KafkaGroupClient
private final LongLongConsumer encodeJoinGroupRequest = this::doEncodeJoinGroupRequest;

private final KafkaGroupStream delegate;
private final Signaler signaler;
private final List<MemberProtocol> members;
private final int initialDelay;

Expand Down Expand Up @@ -3388,6 +3393,7 @@ private final class JoinGroupClient extends KafkaGroupClient

this.initialDelay = initialDelay;
this.delegate = delegate;
this.signaler = signalerSupplier.apply(initialDelay);
this.decoder = decodeJoinGroupReject;
this.members = new ArrayList<>();
this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeJoinGroupRequest;
Expand Down Expand Up @@ -4263,6 +4269,7 @@ private final class CoordinatorClient extends KafkaGroupClient

private final JoinGroupClient joinGroup;
private final KafkaGroupStream delegate;
private final Signaler signaler;
private final ArrayDeque<LongLongConsumer> encoders;

private List<MemberProtocol> members;
Expand Down Expand Up @@ -4310,6 +4317,7 @@ private final class CoordinatorClient extends KafkaGroupClient

this.joinGroup = joinGroup;
this.delegate = delegate;
this.signaler = signalerSupplier.apply(GROUP_REBALANCE_INITIAL_DELAY_MS);
this.decoder = decodeCoordinatorReject;
this.encoders = new ArrayDeque<>();
if (sasl != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ public void shouldRebalanceProtocolHighlanderUnknownMemberId() throws Exception
k3po.finish();
}

@Test
@Configuration("client.yaml")
@Specification({
"${app}/leader.assignment/client",
"${net}/initial.delay.config/server"})
public void shouldCreateConnectionForJoinGroup() throws Exception
{
k3po.finish();
}


@Test
@Configuration("client.yaml")
@Specification({
Expand Down
Loading