Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Review comments
  • Loading branch information
bmaidics committed Apr 19, 2024
commit 142b6998496f2bcf32b6a44403bffb7b3693e1b8
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ servers:
plain:
host: mqtt://localhost:7183
protocol: mqtt
defaultContentType: application/json
defaultContentType: application/json
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ catalogs:
options:
subjects:
subject1:
url: asyncapi/mqtt.yaml
path: asyncapi/mqtt.yaml

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ catalogs:
options:
subjects:
subject1:
url: asyncapi/kafka.yaml
path: asyncapi/kafka.yaml
bindings:
net0:
type: test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@
"type": "object",
"properties":
{
"url":
"path":
{
"type": "string"
}
},
"required":
[
"url"
"path"
],
"additionalProperties": false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
*/
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import io.aklivity.zilla.runtime.common.feature.Incubating;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.catalog.Catalog;
import io.aklivity.zilla.runtime.engine.catalog.CatalogFactorySpi;

@Incubating
public class FilesystemCatalogFactorySpi implements CatalogFactorySpi
Comment thread
bmaidics marked this conversation as resolved.
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -32,7 +34,7 @@ public class FilesystemCatalogHandler implements CatalogHandler
private final CRC32C crc32c;
private final FilesystemEventContext event;
private final long catalogId;
private final Function<String, String> readURL;
private final Function<String, URL> resolvePath;

public FilesystemCatalogHandler(
FilesystemOptionsConfig config,
Expand All @@ -43,7 +45,7 @@ public FilesystemCatalogHandler(
this.schemaIds = new HashMap<>();
this.crc32c = new CRC32C();
this.event = new FilesystemEventContext(context);
this.readURL = config.readURL;
this.resolvePath = context::resolvePath;
this.catalogId = catalogId;
registerSchema(config.subjects);
}
Expand All @@ -63,22 +65,25 @@ public int resolve(
return schemaIds.getOrDefault(subject, NO_SCHEMA_ID);
}



private void registerSchema(List<FilesystemSchemaConfig> configs)
private void registerSchema(
List<FilesystemSchemaConfig> configs)
{
for (FilesystemSchemaConfig config : configs)
{
String schema = readURL.apply(config.url);
if (schema != null && !schema.isEmpty())
try
{
int schemaId = generateCRC32C(schema);
schemas.put(schemaId, schema);
schemaIds.put(config.subject, schemaId);
URL storeURL = resolvePath.apply(config.path);
try (InputStream input = storeURL.openStream())
{
String schema = new String(input.readAllBytes());
int schemaId = generateCRC32C(schema);
schemas.put(schemaId, schema);
schemaIds.put(config.subject, schemaId);
}
}
else
catch (Exception ex)
{
event.fileNotFound(catalogId, config.url);
event.fileNotFound(catalogId, config.path);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
public class FilesystemOptionsConfig extends OptionsConfig
{
public final List<FilesystemSchemaConfig> subjects;
public final Function<String, String> readURL;

public static FilesystemOptionsConfigBuilder<FilesystemOptionsConfig> builder()
{
Expand All @@ -36,10 +35,8 @@ public static <T> FilesystemOptionsConfigBuilder<T> builder(
}

public FilesystemOptionsConfig(
List<FilesystemSchemaConfig> subjects,
Function<String, String> readURL)
List<FilesystemSchemaConfig> subjects)
{
this.subjects = subjects;
this.readURL = readURL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,18 @@
*/
package io.aklivity.zilla.runtime.catalog.filesystem.internal.config;

import java.util.function.Function;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.bind.adapter.JsonbAdapter;

import io.aklivity.zilla.runtime.engine.config.ConfigAdapterContext;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;

public class FilesystemOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
{
private static final String SUBJECTS_NAME = "subjects";
private static final String URL_NAME = "url";
private Function<String, String> readURL;
private static final String PATH_NAME = "path";

@Override
public Kind kind()
Expand Down Expand Up @@ -57,7 +53,7 @@ public JsonObject adaptToJson(
{
JsonObjectBuilder schemaJson = Json.createObjectBuilder();

schemaJson.add(URL_NAME, schema.url);
schemaJson.add(PATH_NAME, schema.path);

catalogs.add(schema.subject, schemaJson);
}
Expand All @@ -80,21 +76,13 @@ public OptionsConfig adaptFromJson(
{
JsonObject schemaJson = subjectsJson.getJsonObject(subject);

String url = schemaJson.getString(URL_NAME);
String url = schemaJson.getString(PATH_NAME);

options.subjects(new FilesystemSchemaConfig(subject, url));
}
}
}
options.readURL(readURL);

return options.build();
}

@Override
public void adaptContext(
ConfigAdapterContext context)
{
this.readURL = context::readURL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public final class FilesystemOptionsConfigBuilder<T> extends ConfigBuilder<T, Fi
private final Function<OptionsConfig, T> mapper;

private List<FilesystemSchemaConfig> subjects;
private Function<String, String> readURL;

FilesystemOptionsConfigBuilder(
Function<OptionsConfig, T> mapper)
Expand Down Expand Up @@ -57,16 +56,9 @@ public FilesystemOptionsConfigBuilder<T> subjects(
return this;
}

public FilesystemOptionsConfigBuilder<T> readURL(
Function<String, String> readURL)
{
this.readURL = readURL;
return this;
}

@Override
public T build()
{
return mapper.apply(new FilesystemOptionsConfig(subjects, readURL));
return mapper.apply(new FilesystemOptionsConfig(subjects));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
public class FilesystemSchemaConfig
{
public final String subject;
public final String url;
public final String path;

public FilesystemSchemaConfig(
String subject,
String url)
String path)
{
this.subject = subject;
this.url = url;
this.path = path;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class FilesystemSchemaConfigBuilder<T> extends ConfigBuilder<T, Filesyste
private final Function<FilesystemSchemaConfig, T> mapper;

private String subject;
private String url;
private String path;

public FilesystemSchemaConfigBuilder(
Function<FilesystemSchemaConfig, T> mapper)
Expand All @@ -45,16 +45,16 @@ public FilesystemSchemaConfigBuilder<T> subject(
return this;
}

public FilesystemSchemaConfigBuilder<T> url(
String url)
public FilesystemSchemaConfigBuilder<T> path(
String path)
{
this.url = url;
this.path = path;
return this;
}

@Override
public T build()
{
return mapper.apply(new FilesystemSchemaConfig(subject, url));
return mapper.apply(new FilesystemSchemaConfig(subject, path));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.util.function.Function;
import java.net.URL;

import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -47,15 +47,17 @@ public void shouldLoadAndCreate()
assertThat(catalog, instanceOf(FilesystemCatalog.class));
assertEquals("filesystem", catalog.name());

CatalogContext context = catalog.supply(mock(EngineContext.class));
assertThat(context, instanceOf(FilesystemCatalogContext.class));
EngineContext engineContext = mock(EngineContext.class);
URL url = FilesystemCatalogFactoryTest.class
.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(engineContext).resolvePath("asyncapi/mqtt.yaml");

Function<String, String> readURL = mock(Function.class);
Mockito.doReturn("test").when(readURL).apply("asyncapi/mqtt.yaml");
CatalogContext context = catalog.supply(engineContext);
assertThat(context, instanceOf(FilesystemCatalogContext.class));

FilesystemOptionsConfig catalogConfig =
new FilesystemOptionsConfig(singletonList(
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")), readURL);
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")));

CatalogConfig options = new CatalogConfig("test", "catalog0", "filesystem", catalogConfig);
CatalogHandler handler = context.attach(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,19 @@
*/
package io.aklivity.zilla.runtime.catalog.filesystem.internal;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;
import java.net.URL;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import io.aklivity.zilla.runtime.catalog.filesystem.internal.config.FilesystemOptionsConfig;
Expand All @@ -43,22 +39,15 @@ public class FilesystemIT
{
private FilesystemOptionsConfig config;
private EngineContext context = mock(EngineContext.class);
@Mock
private Function<String, String> readURL = mock(Function.class);

@Before
public void setup() throws IOException
public void setup()
{
config = new FilesystemOptionsConfig(singletonList(
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")), readURL);

String content;
try (InputStream resource = FilesystemIT.class
.getResourceAsStream("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml"))
{
content = new String(resource.readAllBytes(), UTF_8);
}
Mockito.doReturn(content).when(readURL).apply("asyncapi/mqtt.yaml");
new FilesystemSchemaConfig("subject1", "asyncapi/mqtt.yaml")));

URL url = FilesystemIT.class.getResource("../../../../specs/catalog/filesystem/config/asyncapi/mqtt.yaml");
Mockito.doReturn(url).when(context).resolvePath("asyncapi/mqtt.yaml");
}

@Test
Expand All @@ -74,7 +63,7 @@ public void shouldResolveSchemaViaSchemaId()
" plain:\n" +
" host: mqtt://localhost:7183\n" +
" protocol: mqtt\n" +
"defaultContentType: application/json";
"defaultContentType: application/json\n";

FilesystemCatalogHandler catalog = new FilesystemCatalogHandler(config, context, 0L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void shouldReadCondition()
"{" +
"\"subject1\":" +
"{" +
"\"url\":\"asyncapi/mqtt.yaml\"" +
"\"path\":\"asyncapi/mqtt.yaml\"" +
"}" +
"}" +
"}";
Expand All @@ -57,7 +57,7 @@ public void shouldReadCondition()
assertThat(catalog, not(nullValue()));
FilesystemSchemaConfig schema = catalog.subjects.get(0);
assertThat(schema.subject, equalTo("subject1"));
assertThat(schema.url, equalTo("asyncapi/mqtt.yaml"));
assertThat(schema.path, equalTo("asyncapi/mqtt.yaml"));
}

@Test
Expand All @@ -68,15 +68,15 @@ public void shouldWriteCondition()
"{" +
"\"subject1\":" +
"{" +
"\"url\":\"asyncapi/mqtt.yaml\"" +
"\"path\":\"asyncapi/mqtt.yaml\"" +
"}" +
"}" +
"}";

FilesystemOptionsConfig catalog = (FilesystemOptionsConfig) new FilesystemOptionsConfigBuilder<>(identity())
.subjects()
.subject("subject1")
.url("asyncapi/mqtt.yaml")
.path("asyncapi/mqtt.yaml")
.build()
.build();

Expand Down