Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.identity;

import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CLIENT_ID_DEFAULT;
import static io.aklivity.zilla.runtime.common.feature.FeatureFilter.filter;
import static java.util.ServiceLoader.load;

import java.util.ArrayList;
import java.util.List;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.engine.Configuration;

public final class KafkaClientIdSupplier
{
public static KafkaClientIdSupplier instantiate(
Configuration config)
{
return instantiate(config, filter(load(KafkaClientIdSupplierFactorySpi.class)));
}

private final List<KafkaClientIdSupplierSpi> suppliers;

public String get(
KafkaServerConfig server)
{
String clientId = null;

match:
for (int index = 0; index < suppliers.size(); index++)
{
KafkaClientIdSupplierSpi supplier = suppliers.get(index);
if (supplier.matches(server))
{
clientId = supplier.get();
break match;
}
}

return clientId;
}

private KafkaClientIdSupplier(
List<KafkaClientIdSupplierSpi> suppliers)
{
this.suppliers = suppliers;
}

private static KafkaClientIdSupplier instantiate(
Configuration config,
Iterable<KafkaClientIdSupplierFactorySpi> factories)
{
List<KafkaClientIdSupplierSpi> suppliers = new ArrayList<>();

KafkaConfiguration kafka = new KafkaConfiguration(config);
String clientId = kafka.clientId();

if (clientId != null)
{
suppliers.add(new Fixed(clientId));
}

for (KafkaClientIdSupplierFactorySpi factory : factories)
{
suppliers.add(factory.create(config));
}

if (clientId == null)
{
suppliers.add(new Fixed(KAFKA_CLIENT_ID_DEFAULT));
}

return new KafkaClientIdSupplier(suppliers);
}

private static final class Fixed implements KafkaClientIdSupplierSpi
{
private final String clientId;

private Fixed(
String clientId)
{
this.clientId = clientId;
}

@Override
public boolean matches(
KafkaServerConfig server)
{
return true;
}

@Override
public String get()
{
return clientId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.identity;

import io.aklivity.zilla.runtime.engine.Configuration;

public interface KafkaClientIdSupplierFactorySpi
{
KafkaClientIdSupplierSpi create(
Configuration conifg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,16 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;
package io.aklivity.zilla.runtime.binding.kafka.identity;

final class KafkaBrokerInfo
import java.util.function.Supplier;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;

public interface KafkaClientIdSupplierSpi extends Supplier<String>
{
final int brokerId;
final String host;
final int port;
boolean matches(
KafkaServerConfig server);

KafkaBrokerInfo(
int brokerId,
String host,
int port)
{
this.brokerId = brokerId;
this.host = host;
this.port = port;
}
String get();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class KafkaConfiguration extends Configuration
public static final boolean DEBUG = Boolean.getBoolean("zilla.binding.kafka.debug");
public static final boolean DEBUG_PRODUCE = DEBUG || Boolean.getBoolean("zilla.binding.kafka.debug.produce");

public static final String KAFKA_CLIENT_ID_DEFAULT = "zilla";

public static final IntPropertyDef KAFKA_CLIENT_MAX_IDLE_MILLIS;
public static final LongPropertyDef KAFKA_CLIENT_CONNECTION_POOL_CLEANUP_MILLIS;
public static final IntPropertyDef KAFKA_CLIENT_META_MAX_AGE_MILLIS;
Expand Down Expand Up @@ -79,7 +81,7 @@ public class KafkaConfiguration extends Configuration
static
{
final ConfigurationDef config = new ConfigurationDef("zilla.binding.kafka");
KAFKA_CLIENT_ID = config.property("client.id", "zilla");
KAFKA_CLIENT_ID = config.property("client.id");
KAFKA_CLIENT_INSTANCE_ID = config.property(InstanceIdSupplier.class, "client.instance.id",
KafkaConfiguration::decodeInstanceId, KafkaConfiguration::defaultInstanceId);
KAFKA_CLIENT_MAX_IDLE_MILLIS = config.property("client.max.idle.ms", 1 * 60 * 1000);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.identity;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.identity.KafkaClientIdSupplierSpi;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.EngineConfiguration;

final class KafkaConfluentClientIdSupplier implements KafkaClientIdSupplierSpi
{
private final String clientId;

KafkaConfluentClientIdSupplier(
Configuration config)
{
EngineConfiguration engine = new EngineConfiguration(config);
clientId = String.format("cwc|0014U00003IYePAQA1|%s", engine.name());
}

public boolean matches(
KafkaServerConfig server)
{
return
server != null &&
server.host != null &&
server.host.endsWith(".confluent.cloud");
}

public String get()
{
return clientId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2021-2023 Aklivity Inc.
*
* Aklivity licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.identity;

import io.aklivity.zilla.runtime.binding.kafka.identity.KafkaClientIdSupplierFactorySpi;
import io.aklivity.zilla.runtime.binding.kafka.identity.KafkaClientIdSupplierSpi;
import io.aklivity.zilla.runtime.engine.Configuration;

public final class KafkaConfluentClientIdSupplierFactory implements KafkaClientIdSupplierFactorySpi
{
@Override
public KafkaClientIdSupplierSpi create(
Configuration config)
{
return new KafkaConfluentClientIdSupplier(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.aklivity.zilla.runtime.engine.concurrent.Signaler.NO_CANCEL_ID;
import static java.lang.System.currentTimeMillis;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.LongFunction;
Expand All @@ -37,6 +38,7 @@
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaServerConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.budget.MergedBudgetCreditor;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig;
Expand Down Expand Up @@ -236,9 +238,10 @@ private KafkaClientConnection newConnection(
long authorization)
{
final KafkaBindingConfig binding = supplyBinding.apply(originId);
final List<KafkaServerConfig> servers = binding.servers();
final KafkaSaslConfig sasl = binding.sasl();

return new KafkaClientConnection(originId, routedId, authorization, sasl);
return new KafkaClientConnection(originId, routedId, authorization, servers, sasl);
}

private MessageConsumer newNetworkStream(
Expand Down Expand Up @@ -1217,9 +1220,10 @@ private KafkaClientConnection(
long originId,
long routedId,
long authorization,
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
super(sasl, originId, routedId);
super(servers, sasl, originId, routedId);

this.originId = originId;
this.routedId = routedId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static java.util.Objects.requireNonNull;

import java.nio.ByteOrder;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -134,8 +133,6 @@ public final class KafkaClientDescribeFactory extends KafkaClientSaslHandshaker
private final KafkaDescribeClientDecoder decodeIgnoreAll = this::decodeIgnoreAll;
private final KafkaDescribeClientDecoder decodeReject = this::decodeReject;

private final SecureRandom randomServerIdGenerator = new SecureRandom();

private final long maxAgeMillis;
private final int kafkaTypeId;
private final int proxyTypeId;
Expand Down Expand Up @@ -909,7 +906,6 @@ private final class KafkaDescribeClient extends KafkaSaslClient
private MessageConsumer network;
private final String topic;
private final Map<String, String> configs;
private final List<KafkaServerConfig> servers;

private int state;
private long authorization;
Expand Down Expand Up @@ -948,10 +944,9 @@ private final class KafkaDescribeClient extends KafkaSaslClient
List<KafkaServerConfig> servers,
KafkaSaslConfig sasl)
{
super(sasl, originId, routedId);
super(servers, sasl, originId, routedId);
this.topic = requireNonNull(topic);
this.configs = new LinkedHashMap<>(configs.size());
this.servers = servers;
configs.forEach(c -> this.configs.put(c, null));

this.encoder = sasl != null ? encodeSaslHandshakeRequest : encodeDescribeRequest;
Expand Down Expand Up @@ -1196,19 +1191,16 @@ private void doNetworkBegin(

Consumer<OctetsFW.Builder> extension = EMPTY_EXTENSION;

final KafkaServerConfig kafkaServerConfig =
servers != null ? servers.get(randomServerIdGenerator.nextInt(servers.size())) : null;

if (kafkaServerConfig != null)
if (server != null)
{
extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
.typeId(proxyTypeId)
.address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
.source("0.0.0.0")
.destination(kafkaServerConfig.host)
.destination(server.host)
.sourcePort(0)
.destinationPort(kafkaServerConfig.port)))
.infos(i -> i.item(ii -> ii.authority(kafkaServerConfig.host)))
.destinationPort(server.port)))
.infos(i -> i.item(ii -> ii.authority(server.host)))
.build()
.sizeof());
}
Expand Down
Loading