Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
a370a7b
WIP reconfigure only changed namespaces
attilakreiner May 31, 2024
3757300
WIP watch resources
attilakreiner Jun 5, 2024
c02e5c4
WIP refactoring watch resources
attilakreiner Jun 5, 2024
8fcae70
WIP refactoring 2
attilakreiner Jun 5, 2024
96f070e
WIP fix
attilakreiner Jun 6, 2024
df0b998
WIP EngineConfigWatcher
attilakreiner Jun 6, 2024
c0a554f
WIP ResourceResolver
attilakreiner Jun 6, 2024
b726267
WIP fix ResourceResolver
attilakreiner Jun 6, 2024
9c2cbbe
fix indentation
attilakreiner Jun 10, 2024
abb923f
fix EngineManager method names
attilakreiner Jun 10, 2024
b9355c6
rm ResourceResolver
attilakreiner Jun 10, 2024
c58ebc9
WIP EngineManager addResources
attilakreiner Jun 10, 2024
2285123
Revert "WIP reconfigure only changed namespaces"
attilakreiner Jun 11, 2024
39f34d0
fix EngineConfigWatcher addResources
attilakreiner Jun 11, 2024
b482fac
Merge branch 'develop' into config-reload
attilakreiner Jun 11, 2024
fd17488
Merge branch 'develop' into config-reload
attilakreiner Jun 13, 2024
b6c2ff2
Merge branch 'develop' into config-reload
attilakreiner Jun 14, 2024
2da3603
WIP readPath resolvePath readLocation
attilakreiner Jun 11, 2024
c85bdc1
WIP filesystem-http
attilakreiner Jun 5, 2024
ab94dac
WIP
attilakreiner Jun 18, 2024
22a2d03
fix
attilakreiner Jun 18, 2024
dfe9610
Merge branch 'develop' into config-reload
attilakreiner Jun 18, 2024
e4cf407
fix
attilakreiner Jun 18, 2024
df172d8
fix EngineConfiguration configPath
attilakreiner Jun 19, 2024
bef04ad
WIP fix
attilakreiner Jun 19, 2024
3177c80
WIP hfs 1
attilakreiner Jun 20, 2024
2a217f8
WIP hfs uri
attilakreiner Jun 20, 2024
4f5737a
WIP hfs resolveSibling
attilakreiner Jun 20, 2024
6395780
WIP hfs rm timeout
attilakreiner Jun 20, 2024
54a2b87
WIP hfs revert send
attilakreiner Jun 20, 2024
a5d0a39
WIP hfs HP readBody resolveBody
attilakreiner Jun 20, 2024
f19e623
WIP hfs HP readBody etag
attilakreiner Jun 20, 2024
a8a4015
WIP hfs HP
attilakreiner Jun 20, 2024
c2efe21
WIP hfs ignore AppIT
attilakreiner Jun 20, 2024
873b5e1
WIP hfs fix AppIT
attilakreiner Jun 20, 2024
c166431
WIP hfs watchBody 1
attilakreiner Jun 20, 2024
0c0f7f4
WIP hfs watchBody 2
attilakreiner Jun 20, 2024
31b420c
WIP hfs watchBody 3
attilakreiner Jun 21, 2024
d3109f7
WIP hfs watchBody 4
attilakreiner Jun 21, 2024
3198b10
WIP hfs watchBody 5
attilakreiner Jun 21, 2024
ef2d377
WIP hfs watchBody 6
attilakreiner Jun 21, 2024
8f982fe
WIP hfs getPath
attilakreiner Jun 21, 2024
50c8792
WIP hfs watchBody 7
attilakreiner Jun 21, 2024
71231a9
WIP fix
attilakreiner Jun 21, 2024
7535666
Remove BindingConfig.readLocation
jfallows Jun 26, 2024
e7c6c56
No watch event needed for identical response body
jfallows Jun 26, 2024
5423718
Handle status 204 with null body and infer delay for optional prefer …
jfallows Jun 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP readPath resolvePath readLocation
  • Loading branch information
attilakreiner committed Jun 14, 2024
commit 2da3603eb940efe4460fd3d0b9333fe917a4ede2
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private void attachProxyBinding(
namespaceGenerator.init(binding);
final List<String> labels = configs.stream().map(c -> c.apiLabel).collect(toList());
final NamespaceConfig composite = namespaceGenerator.generateProxy(binding, asyncapis, schemaIdsByApiId::get, labels);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
updateNamespace(configs, composite, new ArrayList<>(asyncapis.values()));
}
Expand Down Expand Up @@ -256,7 +256,7 @@ private void attachServerClientBinding(
namespaceConfig.servers.forEach(s -> s.setAsyncapiProtocol(
namespaceGenerator.resolveProtocol(s.protocol(), options, namespaceConfig.asyncapis, namespaceConfig.servers)));
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
updateNamespace(namespaceConfig.configs, composite, namespaceConfig.asyncapis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.aklivity.zilla.runtime.binding.echo.internal.bench;

import java.net.InetAddress;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -319,8 +319,8 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class GrpcOptionsConfigAdapter implements OptionsConfigAdapterSpi,

private final GrpcProtobufParser parser = new GrpcProtobufParser();

private Function<String, String> readURL;
private Function<String, String> readLocation;

@Override
public Kind kind()
Expand Down Expand Up @@ -88,7 +88,7 @@ public OptionsConfig adaptFromJson(
public void adaptContext(
ConfigAdapterContext context)
{
this.readURL = context::readURL;
this.readLocation = context::readLocation;
}

private List<GrpcProtobufConfig> asListProtobufs(
Expand All @@ -103,7 +103,7 @@ private GrpcProtobufConfig asProtobuf(
JsonValue value)
{
final String location = ((JsonString) value).getString();
final String protobuf = readURL.apply(location);
final String protobuf = readLocation.apply(location);

return parser.parse(location, protobuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void initJson() throws IOException
{
content = new String(resource.readAllBytes(), UTF_8);
}
Mockito.doReturn(content).when(context).readURL("protobuf/echo.proto");
Mockito.doReturn(content).when(context).readLocation("protobuf/echo.proto");
adapter = new OptionsConfigAdapter(OptionsConfigAdapterSpi.Kind.BINDING, context);
adapter.adaptType("grpc");
JsonbConfig config = new JsonbConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void attach(
Object2ObjectHashMap::new));

this.composite = namespaceGenerator.generate(binding, openapis, asyncapis, openapiSchemaIdsByApiId::get);
this.composite.readURL = binding.readURL;
this.composite.readLocation = binding.readLocation;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get rid of composite.readURL / composite.readLocation entirely?

attach.accept(this.composite);

BindingConfig mappingBinding = composite.bindings.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void attach(
for (OpenapiNamespaceConfig namespaceConfig : namespaceConfigs.values())
{
final NamespaceConfig composite = namespaceGenerator.generate(binding, namespaceConfig);
composite.readURL = binding.readURL;
composite.readLocation = binding.readLocation;
attach.accept(composite);
namespaceConfig.configs.forEach(c ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import static io.aklivity.zilla.runtime.engine.internal.stream.StreamId.isInitial;
import static java.lang.ThreadLocal.withInitial;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.IntConsumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -85,7 +83,7 @@ public class TlsWorker implements EngineContext
private final BindingFactory factory;
private final VaultFactory vaultFactory;
private final Configuration config;
private final URL configURL;
private final Path configPath;

private final TlsSignaler signaler;

Expand All @@ -105,7 +103,7 @@ public TlsWorker(
.readonly(false)
.build()
.bufferPool();
this.configURL = config.configURL();
this.configPath = config.configPath();

this.signaler = new TlsSignaler();

Expand Down Expand Up @@ -387,19 +385,10 @@ public ConverterHandler supplyWriteConverter(
}

@Override
public URL resolvePath(
String path)
public Path resolvePath(
String location)
{
URL resolved = null;
try
{
resolved = new URL(configURL, path);
}
catch (MalformedURLException ex)
{
rethrowUnchecked(ex);
}
return resolved;
return configPath.resolveSibling(location);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +35,7 @@ public class FilesystemCatalogHandler implements CatalogHandler
private final CRC32C crc32c;
private final FilesystemEventContext event;
private final long catalogId;
private final Function<String, URL> resolvePath;
private final Function<String, Path> resolvePath;

public FilesystemCatalogHandler(
FilesystemOptionsConfig config,
Expand Down Expand Up @@ -72,8 +73,8 @@ private void registerSchema(
{
try
{
URL storeURL = resolvePath.apply(config.path);
try (InputStream input = storeURL.openStream())
Path storePath = resolvePath.apply(config.path);
try (InputStream input = Files.newInputStream(storePath))
{
String schema = new String(input.readAllBytes());
int schemaId = generateCRC32C(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;

import java.net.URL;
import java.nio.file.Path;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -38,7 +39,7 @@
public class FilesystemCatalogFactoryTest
{
@Test
public void shouldLoadAndCreate()
public void shouldLoadAndCreate() throws Exception
{
Configuration config = new Configuration();
CatalogFactory factory = CatalogFactory.instantiate();
Expand All @@ -50,7 +51,8 @@ public void shouldLoadAndCreate()
EngineContext engineContext = mock(EngineContext.class);
URL url = FilesystemCatalogFactoryTest.class
.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(engineContext).resolvePath("asyncapi/mqtt.yaml");
Path path = Path.of(url.toURI());
Mockito.doReturn(path).when(engineContext).resolvePath("asyncapi/mqtt.yaml");

CatalogContext context = catalog.supply(engineContext);
assertThat(context, instanceOf(FilesystemCatalogContext.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.mock;

import java.net.URL;
import java.nio.file.Path;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
Expand All @@ -41,13 +42,14 @@ public class FilesystemIT
private EngineContext context = mock(EngineContext.class);

@Before
public void setup()
public void setup() throws Exception
{
config = new FilesystemOptionsConfig(singletonList(
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")));

URL url = FilesystemIT.class.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(context).resolvePath("asyncapi/mqtt.yaml");
Path path = Path.of(url.toURI());
Mockito.doReturn(path).when(context).resolvePath("asyncapi/mqtt.yaml");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
package io.aklivity.zilla.runtime.engine;

import static io.aklivity.zilla.runtime.engine.internal.layouts.metrics.HistogramsLayout.BUCKETS;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.stream.Collectors.toList;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -80,7 +79,7 @@ public final class Engine implements Collector, AutoCloseable
private final AtomicInteger nextTaskId;
private final ThreadFactory factory;

private final URL configURL;
private final Path configPath;
private final List<EngineWorker> workers;
private final boolean readonly;
private final EngineConfiguration config;
Expand Down Expand Up @@ -149,7 +148,7 @@ public final class Engine implements Collector, AutoCloseable
for (int workerIndex = 0; workerIndex < workerCount; workerIndex++)
{
EngineWorker worker =
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, this::resolvePath, bindings, exporters,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, this::resolvePath, bindings, exporters,
new EngineWorker(config, tasks, labels, errorHandler, tuning::affinity, bindings, exporters,

Move resolvePath to EngineWorker implementation instead.

guards, vaults, catalogs, models, metricGroups, this, this::supplyEventReader,
eventFormatterFactory, workerIndex, readonly, this::process);
workers.add(worker);
Expand Down Expand Up @@ -178,7 +177,7 @@ public final class Engine implements Collector, AutoCloseable
final Map<String, Guard> guardsByType = guards.stream()
.collect(Collectors.toMap(g -> g.name(), g -> g));

this.configURL = config.configURL();
this.configPath = config.configPath();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like configPath is no longer needed as Engine field after resovlePath(...) method is moved to EngineWorker implementation directly.

EngineManager manager = new EngineManager(
schemaTypes,
bindingsByType::get,
Expand All @@ -192,8 +191,9 @@ public final class Engine implements Collector, AutoCloseable
context,
config,
extensions,
this.configURL,
this::readURL);
this.configPath,
this::readPath,
this::readLocation);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of these need to be passed to EngineManager constructor.

Only configPath is needed and can be obtained from EngineConfiguration (already passed).


this.bindings = bindings;
this.tasks = tasks;
Expand Down Expand Up @@ -285,18 +285,13 @@ public static EngineBuilder builder()
return new EngineBuilder();
}

private String readURL(
String location)
private String readPath(
Path path)
{
String result;
try
{
URL url = new URL(configURL, location);
URLConnection connection = url.openConnection();
try (InputStream input = connection.getInputStream())
{
result = new String(input.readAllBytes(), UTF_8);
}
result = Files.readString(path);
}
catch (Exception ex)
{
Expand All @@ -305,6 +300,18 @@ private String readURL(
return result;
}

public Path resolvePath(
String location)
{
return configPath.resolveSibling(location);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method implementation should move to EngineWorker instead of passing a lambda from Engine.

Note: EngineWorker already has EngineConfig and can call config.configPath() in EngineWorker constructor instead of passing as an explicit constructor parameter.


private String readLocation(
String location)
{
return readPath(resolvePath(location));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove readLocation everywhere, and use Files.readString(path) instead at the call sites.

We should no longer need BindingConfig.readLocation directly given that EngineWorker.resolvePath(location) is available to all handlers, so BindingConfig.readLocation can be removed as well.

}

private Thread newTaskThread(
Runnable r)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.agrona.LangUtil.rethrowUnchecked;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -146,6 +147,34 @@ public URL configURL()
return ENGINE_CONFIG_URL.get(this);
}

public Path configPath()
{
Path configPath = null;
try
{
URI configUri = configURL().toURI();
if ("file".equals(configUri.getScheme()) && !Path.of(configUri.getSchemeSpecificPart()).isAbsolute())
{
// this works for relative file e.g. file:zilla.yaml
Path basePath = Path.of("").toAbsolutePath();
configPath = basePath.resolve(configUri.getSchemeSpecificPart());
}
else
{
// this works for absolute file e.g. file:/path/dir/zilla.yaml
// this works for http e.g. http://localhost:7115/zilla.yaml
// this works for jar e.g. jar:file:/path/engine.jar!/package/zilla.yaml
// (the jar filesystem is opened and closed by EngineRule)
configPath = Path.of(configUri);
}
}
catch (Exception ex)
{
rethrowUnchecked(ex);
}
return configPath;
}

Comment thread
attilakreiner marked this conversation as resolved.
Outdated
public int configPollIntervalSeconds()
{
return ENGINE_CONFIG_POLL_INTERVAL_SECONDS.getAsInt(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.aklivity.zilla.runtime.engine;

import java.net.InetAddress;
import java.net.URL;
import java.nio.channels.SelectableChannel;
import java.nio.file.Path;
import java.time.Clock;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -157,8 +157,8 @@ ConverterHandler supplyReadConverter(
ConverterHandler supplyWriteConverter(
ModelConfig config);

URL resolvePath(
String path);
Path resolvePath(
String location);

Metric resolveMetric(
String name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class BindingConfig
public transient long id;
public transient long entryId;
public transient ToLongFunction<String> resolveId;
public transient Function<String, String> readURL;
public transient Function<String, String> readLocation;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove this completely?


public transient long vaultId;
public transient String qvault;
Expand Down
Loading