Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
support schema registration
  • Loading branch information
akrambek committed Sep 13, 2024
commit e061df897e020f2ef76e4e808d364e30518d56d3
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public class SchemaRegistryCatalogHandler implements CatalogHandler
{
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
private static final String SUBJECT_PATH = "subjects/{0}/versions";

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing leading slash?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, no slash required. Spec is also correct accept "http://localhost:8081/subjects/items-snapshots-value/versions"

@jfallows jfallows Sep 13, 2024

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does SUBJECT_VERSION_PATH start with /subjects/... but SUBJECT_PATH starts with subjects/...?
Seems like these should be consistent, and paths tend to start with /, no?
It's possible that the HTTP client is conveniently injecting the / for us as a workaround for missing leading slash perhaps?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see what you mean I got it wrong yep :)

private static final String SCHEMA_PATH = "/schemas/ids/{0}";

private static final int MAX_PADDING_LENGTH = 5;
Expand Down Expand Up @@ -80,6 +81,22 @@ public SchemaRegistryCatalogHandler(
this.cachedSchemaIds = catalog.cache.schemaIds;
}

@Override
public int register(
String subject,
String schema)
{
int versionId = NO_VERSION_ID;

String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema);
if (response != null)
{
versionId = request.resolveResponse(response);
}

return versionId;
}

@Override
public String resolve(
int schemaId)
Expand Down Expand Up @@ -348,6 +365,30 @@ private String sendHttpRequest(
return responseBody;
}

private String sendPostHttpRequest(
String path,
String body)
{
HttpRequest httpRequest = HttpRequest
.newBuilder(toURI(baseUrl, path))
.header("content-type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(body))
.build();
// TODO: introduce interrupt/timeout for request to schema registry

String responseBody;
try
{
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;
}
catch (Exception ex)
{
responseBody = null;
}
return responseBody;
}

private URI toURI(
String baseUrl,
String path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,15 @@ public void shouldResolveSchemaIdFromCacheAndRetry() throws Exception
{
k3po.finish();
}

@Test
@Configuration("register/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${remote}/register.schema" })
public void shouldRegisterSchema() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public interface CatalogHandler
{
int NO_SCHEMA_ID = 0;
int NO_VERSION_ID = 0;

@FunctionalInterface
interface Decoder
Expand Down Expand Up @@ -61,9 +62,12 @@ int accept(
ValueConsumer next);
}

int register(
default int register(
String subject,
String schema);
String schema)
{
return NO_VERSION_ID;
}

Comment on lines +65 to +71

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent.

String resolve(
int schemaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void attach(

if (options.cataloged != null)
{
this.catalog = options.cataloged.size() != 0 ? options.cataloged.get(0).schemas.get(0) : null;
this.catalog = !options.cataloged.isEmpty() ? options.cataloged.get(0).schemas.get(0) : null;
this.catalogs = new LinkedList<>();
for (CatalogedConfig catalog : options.cataloged)
{
Expand Down Expand Up @@ -347,7 +347,11 @@ else if (assertion.schema != null && !assertion.schema.equals(schema))
}
else
{
if (catalog.subject != null && catalog.version != null)
if (catalog.subject != null && catalog.record != null)
{
handler.register(catalog.subject, catalog.record);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (catalog.subject != null && catalog.record != null)
{
handler.register(catalog.subject, catalog.record);
}
if (catalog.subject != null && catalog.schema != null)
{
handler.register(catalog.subject, catalog.schema);
}

else if (catalog.subject != null && catalog.version != null)
{
handler.resolve(catalog.subject, catalog.version);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
catalogs:
catalog0:
type: schema-registry
options:
url: http://localhost:8081
bindings:
net0:
type: test
kind: server
options:
catalog:
catalog0:
- subject: items-snapshots-value
record: |

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record is something else, this should be schema.

{
"schema":
{
"type": "record",
"name": "test",
"fields":
[
{
"type": "string",
"name": "field1"
},
{
"type": "com.acme.Referenced",
"name": "int"
}
]
},
"schemaType":"AVRO"
}
exit: app0
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@ read http:version "HTTP/1.1"
read http:header "Host" "localhost:8081"
read http:header "content-type" "application/json"

read '{'
'"schema":"'
'{'
'\\"type\\": \\"record\\",'
'\\"name\\": \\"test\\",'
'\\"fields\\":'
'['
'{'
'\\"type\\": \\"string\\",'
'\\"name\\": \\"field1\\"'
'},'
'{'
'\\"type\\": \\"com.acme.Referenced\\",'
'\\"name\\": \\"int\\"'
'}'
']'
'}",'
'"schemaType":"AVRO"'
'}'
read "{\n"
" \"schema\":\n"
" {\n"
" \"type\": \"record\",\n"
" \"name\": \"test\",\n"
" \"fields\":\n"
" [\n"
" {\n"
" \"type\": \"string\",\n"
" \"name\": \"field1\"\n"
" },\n"
" {\n"
" \"type\": \"com.acme.Referenced\",\n"
" \"name\": \"int\"\n"
" }\n"
" ]\n"
" },\n"
" \"schemaType\":\"AVRO\"\n"
"}\n"

read closed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,26 @@
"subject"
],
"additionalProperties": false
},
{
"type": "object",
"properties":
{
"subject":
{
"type": "string"
},
"record":
{
"type": "string",
}
},
"required":
[
"subject",
"record"
],
"additionalProperties": false
}
]
},
Expand Down