Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updating IT to use zilla.yaml instead of mock & handler
  • Loading branch information
ankitk-me committed May 9, 2024
commit 62c82207545b23401e5563ef17f22f68d8f31672
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ bindings:
options:
catalog:
catalog0:
- id: 1
- subject: not-subject1
exit: app0
2 changes: 1 addition & 1 deletion runtime/catalog-karapace/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.84</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.95</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public int resolve(
event.onUnretrievableSchemaSubjectVersion(catalogId, subject, version);
if (cachedSchemaId != null && cachedSchemaId.id != NO_SCHEMA_ID)
{
event.onUnretrievableSchemaSubjectVersionStaleSchema(catalogId, subject, version, schemaId);
event.onUnretrievableSchemaSubjectVersionStaleSchema(catalogId, subject, version,
cachedSchemaId.id);
}
}

Expand Down

This file was deleted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer this IT done via scripts not mocks.

Note: if using mocks then it is a unit test, not an integration test - unit tests with mocks are not stable after a refactor of the code as they also need to be changed, whereas integration tests remain stable, providing better confidence after changing implementation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be no need to drive any direct calls to KarapaceCatalogHandler via code and mocks.

Instead, this should be driven by test binding in zilla.yaml for the test. It may be necessary to add support for expected schema in test binding options so that it can verify the retrieved schema matches expectations.

Note: this would be for test binding only, as other bindings have no need to validate expectations.

Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,11 @@
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.rules.RuleChain.outerRule;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
Expand All @@ -41,19 +34,29 @@

import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.engine.test.EngineRule;
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;

public class KarapaceIT
{
private final K3poRule k3po = new K3poRule()
.addScriptRoot("net", "io/aklivity/zilla/specs/engine/streams/network")
.addScriptRoot("app", "io/aklivity/zilla/specs/engine/streams/application")
.addScriptRoot("local", "io/aklivity/zilla/runtime/catalog/karapace/internal");

private final TestRule timeout = new DisableOnDebug(new Timeout(10, SECONDS));

private final EngineRule engine = new EngineRule()
.directory("target/zilla-itests")
.countersBufferCapacity(4096)
.configurationRoot("io/aklivity/zilla/specs/catalog/karapace/config")
.external("app0")
.clean();

@Rule
public final TestRule chain = outerRule(k3po).around(timeout);
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);

private KarapaceOptionsConfig config;
private EngineContext context = mock(EngineContext.class);
Expand All @@ -69,169 +72,91 @@ public void setup()
}

@Test
@Configuration("resolve/schema/id/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.schema.id" })
public void shouldResolveSchemaViaSchemaId() throws Exception
{
String expected = "{\"fields\":[{\"name\":\"id\",\"type\":\"string\"}," +
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L);

String schema = catalog.resolve(9);

k3po.finish();

assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
}

@Test
@Configuration("resolve/subject/version/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.subject.version" })
public void shouldResolveSchemaViaSubjectVersion() throws Exception
public void shouldResolveSchemaIdViaSubjectVersion() throws Exception
{
String expected = "{\"fields\":[{\"name\":\"id\",\"type\":\"string\"}," +
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L);

int schemaId = catalog.resolve("items-snapshots-value", "latest");

String schema = catalog.resolve(schemaId);

k3po.finish();

assertEquals(schemaId, 9);
assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
}

@Test
@Configuration("resolve/schema/id/cache/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.schema.id" })
public void shouldResolveSchemaViaSchemaIdFromCache() throws Exception
{
String expected = "{\"fields\":[{\"name\":\"id\",\"type\":\"string\"}," +
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L);

catalog.resolve(9);

k3po.finish();

String schema = catalog.resolve(9);

assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
}

@Test
@Configuration("resolve/subject/version/cache/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.subject.version" })
public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
public void shouldResolveSchemaIdViaSubjectVersionFromCache() throws Exception
{
String expected = "{\"fields\":[{\"name\":\"id\",\"type\":\"string\"}," +
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L);

catalog.resolve(catalog.resolve("items-snapshots-value", "latest"));

k3po.finish();

int schemaId = catalog.resolve("items-snapshots-value", "latest");

String schema = catalog.resolve(schemaId);

assertEquals(schemaId, 9);
assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
}

@Test
@Configuration("unretrievable/schema/id/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.schema.id.failed"})
public void shouldLogFailedRegistryResponseForSchema() throws Exception
{
when(context.clock()).thenReturn(Clock.systemUTC());
when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
KarapaceCache cache = new KarapaceCache();
KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L, cache);

String schema = catalog.resolve(1);

k3po.finish();

assertEquals(schema, null);
assertEquals(cache.schemas.get(1).get().retryAttempts.get(), 1);
}

@Test
@Configuration("unretrievable/schema/subject/version/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.subject.version.failed"})
public void shouldServeStaleSchemaIdFromCacheAfterFailedRegistryResponseForId() throws Exception
public void shouldLogFailedRegistryResponseForSchemaId() throws Exception
{
when(context.clock()).thenReturn(Clock.systemUTC());
when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
KarapaceCache cache = new KarapaceCache();
CompletableFuture<CachedSchemaId> future = new CompletableFuture<>();
future.complete(new CachedSchemaId(0L, 1, new AtomicInteger(0), 0L));
cache.schemaIds.put(-754089167, future);
KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L, cache);

int schemaId = catalog.resolve("items-snapshots-value", "latest");

k3po.finish();

assertEquals(schemaId, 1);

for (int schemaKey: cache.schemaIds.keySet())
{
assertEquals(cache.schemaIds.get(schemaKey).get().retryAfter, 1000L);
assertEquals(cache.schemaIds.get(schemaKey).get().retryAttempts.get(), 1);
}
}

@Test
@Configuration("resolve/schema/id/retry/zilla.yaml")
@Specification({
"${local}/resolve.schema.via.subject.version" })
public void shouldRetryToResolveSchemaViaSubjectVersionFromCache() throws Exception
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.schema.id.on.retry" })
public void shouldResolveSchemaViaSchemaIdOnRetry() throws Exception
{
String expected = "{\"fields\":[{\"name\":\"id\",\"type\":\"string\"}," +
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";
when(context.clock()).thenReturn(Clock.systemUTC());
when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
KarapaceCache cache = new KarapaceCache();
CompletableFuture<CachedSchemaId> future = new CompletableFuture<>();
future.complete(new CachedSchemaId(System.currentTimeMillis(), 1, new AtomicInteger(2), 2000L));
cache.schemaIds.put(-754089167, future);

KarapaceCatalogHandler catalog = new KarapaceCatalogHandler(config, context, 0L, cache);

int schemaId = catalog.resolve("items-snapshots-value", "latest");

Thread.sleep(2000);

assertEquals(schemaId, 1);

k3po.start();

schemaId = catalog.resolve("items-snapshots-value", "latest");

String schema = catalog.resolve(schemaId);

k3po.finish();
}

assertEquals(schemaId, 9);
assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
@Test
@Configuration("resolve/subject/version/retry/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${local}/resolve.schema.via.subject.version.retry"})
public void shouldResolveSchemaIdFromCacheAndRetry() throws Exception
{
k3po.finish();
}

@Test
Expand Down
Loading