diff --git a/cloud/docker-image/pom.xml b/cloud/docker-image/pom.xml
index 4d3ace0346..47e0d218d3 100644
--- a/cloud/docker-image/pom.xml
+++ b/cloud/docker-image/pom.xml
@@ -360,6 +360,12 @@
${project.version}
runtime
+
+ ${project.groupId}
+ validator-protobuf
+ ${project.version}
+ runtime
+
diff --git a/cloud/docker-image/src/main/docker/assembly.xml b/cloud/docker-image/src/main/docker/assembly.xml
index bb5bbfbfe8..793a868b4e 100644
--- a/cloud/docker-image/src/main/docker/assembly.xml
+++ b/cloud/docker-image/src/main/docker/assembly.xml
@@ -61,6 +61,8 @@
com/fasterxml/jackson/**
org/yaml/snakeyaml/**
org/junit/**
+ com/google/**
+ org/checkerframework/**
diff --git a/cloud/docker-image/src/main/docker/incubator/zpm.json.template b/cloud/docker-image/src/main/docker/incubator/zpm.json.template
index 4d0a518aaa..79c3395b3d 100644
--- a/cloud/docker-image/src/main/docker/incubator/zpm.json.template
+++ b/cloud/docker-image/src/main/docker/incubator/zpm.json.template
@@ -51,6 +51,7 @@
"io.aklivity.zilla:validator-avro",
"io.aklivity.zilla:validator-core",
"io.aklivity.zilla:validator-json",
+ "io.aklivity.zilla:validator-protobuf",
"io.aklivity.zilla:vault-filesystem",
"org.slf4j:slf4j-simple",
"org.antlr:antlr4-runtime"
diff --git a/incubator/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryCatalogHandler.java b/incubator/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryCatalogHandler.java
index ab28a26a04..e7235a7d29 100644
--- a/incubator/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryCatalogHandler.java
+++ b/incubator/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryCatalogHandler.java
@@ -190,7 +190,7 @@ public int encode(
SchemaRegistryPrefixFW prefix = prefixRW.rewrap().schemaId(schemaId).build();
next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof());
int valLength = encoder.accept(schemaId, data, index, length, next);
- return valLength != 0 ? prefix.sizeof() + valLength : -1;
+ return valLength > 0 ? prefix.sizeof() + valLength : -1;
}
@Override
diff --git a/incubator/command-generate/pom.xml b/incubator/command-generate/pom.xml
index c98be759eb..24b216abdd 100644
--- a/incubator/command-generate/pom.xml
+++ b/incubator/command-generate/pom.xml
@@ -103,6 +103,12 @@
${project.version}
provided
+
+ io.aklivity.zilla
+ validator-protobuf
+ ${project.version}
+ provided
+
io.aklivity.zilla
vault-filesystem
diff --git a/incubator/command-generate/src/main/moditect/module-info.java b/incubator/command-generate/src/main/moditect/module-info.java
index fe511b5c48..734ccaa7d5 100644
--- a/incubator/command-generate/src/main/moditect/module-info.java
+++ b/incubator/command-generate/src/main/moditect/module-info.java
@@ -26,6 +26,7 @@
requires io.aklivity.zilla.runtime.validator.avro;
requires io.aklivity.zilla.runtime.validator.core;
requires io.aklivity.zilla.runtime.validator.json;
+ requires io.aklivity.zilla.runtime.validator.protobuf;
requires com.fasterxml.jackson.dataformat.yaml;
requires com.fasterxml.jackson.databind;
diff --git a/incubator/pom.xml b/incubator/pom.xml
index 4db4a32fd0..825da641f8 100644
--- a/incubator/pom.xml
+++ b/incubator/pom.xml
@@ -24,6 +24,7 @@
validator-avro.spec
validator-core.spec
validator-json.spec
+ validator-protobuf.spec
binding-amqp
@@ -40,6 +41,7 @@
validator-avro
validator-core
validator-json
+ validator-protobuf
@@ -99,6 +101,11 @@
validator-json
${project.version}
+
+ ${project.groupId}
+ validator-protobuf
+ ${project.version}
+
diff --git a/incubator/validator-protobuf.spec/COPYRIGHT b/incubator/validator-protobuf.spec/COPYRIGHT
new file mode 100644
index 0000000000..0cb10b6f62
--- /dev/null
+++ b/incubator/validator-protobuf.spec/COPYRIGHT
@@ -0,0 +1,12 @@
+Copyright ${copyrightYears} Aklivity Inc
+
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
diff --git a/incubator/validator-protobuf.spec/LICENSE b/incubator/validator-protobuf.spec/LICENSE
new file mode 100644
index 0000000000..f6abb6327b
--- /dev/null
+++ b/incubator/validator-protobuf.spec/LICENSE
@@ -0,0 +1,114 @@
+ Aklivity Community License Agreement
+ Version 1.0
+
+This Aklivity Community License Agreement Version 1.0 (the “Agreement”) sets
+forth the terms on which Aklivity, Inc. (“Aklivity”) makes available certain
+software made available by Aklivity under this Agreement (the “Software”). BY
+INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY OF THE SOFTWARE,
+YOU AGREE TO THE TERMS AND CONDITIONS OF THIS AGREEMENT. IF YOU DO NOT AGREE TO
+SUCH TERMS AND CONDITIONS, YOU MUST NOT USE THE SOFTWARE. IF YOU ARE RECEIVING
+THE SOFTWARE ON BEHALF OF A LEGAL ENTITY, YOU REPRESENT AND WARRANT THAT YOU
+HAVE THE ACTUAL AUTHORITY TO AGREE TO THE TERMS AND CONDITIONS OF THIS
+AGREEMENT ON BEHALF OF SUCH ENTITY. “Licensee” means you, an individual, or
+the entity on whose behalf you are receiving the Software.
+
+ 1. LICENSE GRANT AND CONDITIONS.
+
+ 1.1 License. Subject to the terms and conditions of this Agreement,
+ Aklivity hereby grants to Licensee a non-exclusive, royalty-free,
+ worldwide, non-transferable, non-sublicenseable license during the term
+ of this Agreement to: (a) use the Software; (b) prepare modifications and
+ derivative works of the Software; (c) distribute the Software (including
+ without limitation in source code or object code form); and (d) reproduce
+ copies of the Software (the “License”). Licensee is not granted the
+ right to, and Licensee shall not, exercise the License for an Excluded
+ Purpose. For purposes of this Agreement, “Excluded Purpose” means making
+ available any software-as-a-service, platform-as-a-service,
+ infrastructure-as-a-service or other similar online service that competes
+ with Aklivity products or services that provide the Software.
+
+ 1.2 Conditions. In consideration of the License, Licensee’s distribution
+ of the Software is subject to the following conditions:
+
+ (a) Licensee must cause any Software modified by Licensee to carry
+ prominent notices stating that Licensee modified the Software.
+
+ (b) On each Software copy, Licensee shall reproduce and not remove or
+ alter all Aklivity or third party copyright or other proprietary
+ notices contained in the Software, and Licensee must provide the
+ notice below with each copy.
+
+ “This software is made available by Aklivity, Inc., under the
+ terms of the Aklivity Community License Agreement, Version 1.0
+ located at http://www.Aklivity.io/Aklivity-community-license. BY
+ INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY OF
+ THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.”
+
+ 1.3 Licensee Modifications. Licensee may add its own copyright notices
+ to modifications made by Licensee and may provide additional or different
+ license terms and conditions for use, reproduction, or distribution of
+ Licensee’s modifications. While redistributing the Software or
+ modifications thereof, Licensee may choose to offer, for a fee or free of
+ charge, support, warranty, indemnity, or other obligations. Licensee, and
+ not Aklivity, will be responsible for any such obligations.
+
+ 1.4 No Sublicensing. The License does not include the right to
+ sublicense the Software, however, each recipient to which Licensee
+ provides the Software may exercise the Licenses so long as such recipient
+ agrees to the terms and conditions of this Agreement.
+
+ 2. TERM AND TERMINATION. This Agreement will continue unless and until
+ earlier terminated as set forth herein. If Licensee breaches any of its
+ conditions or obligations under this Agreement, this Agreement will
+ terminate automatically and the License will terminate automatically and
+ permanently.
+
+ 3. INTELLECTUAL PROPERTY. As between the parties, Aklivity will retain all
+ right, title, and interest in the Software, and all intellectual property
+ rights therein. Aklivity hereby reserves all rights not expressly granted
+ to Licensee in this Agreement. Aklivity hereby reserves all rights in its
+ trademarks and service marks, and no licenses therein are granted in this
+ Agreement.
+
+ 4. DISCLAIMER. Aklivity HEREBY DISCLAIMS ANY AND ALL WARRANTIES AND
+ CONDITIONS, EXPRESS, IMPLIED, STATUTORY, OR OTHERWISE, AND SPECIFICALLY
+ DISCLAIMS ANY WARRANTY OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+ PURPOSE, WITH RESPECT TO THE SOFTWARE.
+
+ 5. LIMITATION OF LIABILITY. Aklivity WILL NOT BE LIABLE FOR ANY DAMAGES OF
+ ANY KIND, INCLUDING BUT NOT LIMITED TO, LOST PROFITS OR ANY CONSEQUENTIAL,
+ SPECIAL, INCIDENTAL, INDIRECT, OR DIRECT DAMAGES, HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, ARISING OUT OF THIS AGREEMENT. THE FOREGOING SHALL
+ APPLY TO THE EXTENT PERMITTED BY APPLICABLE LAW.
+
+ 6.GENERAL.
+
+ 6.1 Governing Law. This Agreement will be governed by and interpreted in
+ accordance with the laws of the state of California, without reference to
+ its conflict of laws principles. If Licensee is located within the
+ United States, all disputes arising out of this Agreement are subject to
+ the exclusive jurisdiction of courts located in Santa Clara County,
+ California. USA. If Licensee is located outside of the United States,
+ any dispute, controversy or claim arising out of or relating to this
+ Agreement will be referred to and finally determined by arbitration in
+ accordance with the JAMS International Arbitration Rules. The tribunal
+ will consist of one arbitrator. The place of arbitration will be Palo
+ Alto, California. The language to be used in the arbitral proceedings
+ will be English. Judgment upon the award rendered by the arbitrator may
+ be entered in any court having jurisdiction thereof.
+
+ 6.2 Assignment. Licensee is not authorized to assign its rights under
+ this Agreement to any third party. Aklivity may freely assign its rights
+ under this Agreement to any third party.
+
+ 6.3 Other. This Agreement is the entire agreement between the parties
+ regarding the subject matter hereof. No amendment or modification of
+ this Agreement will be valid or binding upon the parties unless made in
+ writing and signed by the duly authorized representatives of both
+ parties. In the event that any provision, including without limitation
+ any condition, of this Agreement is held to be unenforceable, this
+ Agreement and all licenses and rights granted hereunder will immediately
+ terminate. Waiver by Aklivity of a breach of any provision of this
+ Agreement or the failure by Aklivity to exercise any right hereunder
+ will not be construed as a waiver of any subsequent breach of that right
+ or as a waiver of any other right.
\ No newline at end of file
diff --git a/incubator/validator-protobuf.spec/NOTICE b/incubator/validator-protobuf.spec/NOTICE
new file mode 100644
index 0000000000..ed4c502c75
--- /dev/null
+++ b/incubator/validator-protobuf.spec/NOTICE
@@ -0,0 +1,23 @@
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+This project includes:
+ agrona under The Apache License, Version 2.0
+ ANTLR 4 Runtime under BSD-3-Clause
+ ICU4J under Unicode/ICU License
+ Jakarta JSON Processing API under Eclipse Public License 2.0 or GNU General Public License, version 2 with the GNU Classpath Exception
+ Java Unified Expression Language API under The Apache Software License, Version 2.0
+ Java Unified Expression Language Implementation under The Apache Software License, Version 2.0
+ k3po/lang under The Apache Software License, Version 2.0
+ Kaazing Corporation License under The Apache Software License, Version 2.0
+ org.leadpony.justify under The Apache Software License, Version 2.0
+ zilla::specs::engine.spec under The Apache Software License, Version 2.0
+
diff --git a/incubator/validator-protobuf.spec/NOTICE.template b/incubator/validator-protobuf.spec/NOTICE.template
new file mode 100644
index 0000000000..209ca12f74
--- /dev/null
+++ b/incubator/validator-protobuf.spec/NOTICE.template
@@ -0,0 +1,13 @@
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+This project includes:
+#GENERATED_NOTICES#
diff --git a/incubator/validator-protobuf.spec/mvnw b/incubator/validator-protobuf.spec/mvnw
new file mode 100755
index 0000000000..d2f0ea3808
--- /dev/null
+++ b/incubator/validator-protobuf.spec/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven2 Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ if [ -n "$MVNW_REPOURL" ]; then
+ jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ else
+ jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ fi
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+ if $cygwin; then
+ wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+ fi
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ wget "$jarUrl" -O "$wrapperJarPath"
+ else
+ wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+ fi
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ curl -o "$wrapperJarPath" "$jarUrl" -f
+ else
+ curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+ fi
+
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ # For Cygwin, switch paths to Windows format before running javac
+ if $cygwin; then
+ javaClass=`cygpath --path --windows "$javaClass"`
+ fi
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/incubator/validator-protobuf.spec/mvnw.cmd b/incubator/validator-protobuf.spec/mvnw.cmd
new file mode 100644
index 0000000000..b26ab24f03
--- /dev/null
+++ b/incubator/validator-protobuf.spec/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Found %WRAPPER_JAR%
+ )
+) else (
+ if not "%MVNW_REPOURL%" == "" (
+ SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ )
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ )
+
+ powershell -Command "&{"^
+ "$webclient = new-object System.Net.WebClient;"^
+ "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+ "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+ "}"^
+ "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+ "}"
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Finished downloading %WRAPPER_JAR%
+ )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/incubator/validator-protobuf.spec/pom.xml b/incubator/validator-protobuf.spec/pom.xml
new file mode 100644
index 0000000000..aaf5571fe3
--- /dev/null
+++ b/incubator/validator-protobuf.spec/pom.xml
@@ -0,0 +1,111 @@
+
+
+
+4.0.0
+
+ io.aklivity.zilla
+ incubator
+ develop-SNAPSHOT
+ ../pom.xml
+
+
+validator-protobuf.spec
+zilla::incubator::validator-protobuf.spec
+
+
+
+ Aklivity Community License Agreement
+ https://www.aklivity.io/aklivity-community-license/
+ repo
+
+
+
+
+ 11
+ 11
+ 0.98
+ 0
+
+
+
+
+ ${project.groupId}
+ engine.spec
+ ${project.version}
+
+
+ junit
+ junit
+ test
+
+
+ org.hamcrest
+ hamcrest-library
+ test
+
+
+
+
+
+
+ src/main/resources
+
+
+ src/main/scripts
+
+
+
+
+
+ org.jasig.maven
+ maven-notice-plugin
+
+
+ com.mycila
+ license-maven-plugin
+
+
+ maven-checkstyle-plugin
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ org.moditect
+ moditect-maven-plugin
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+ BUNDLE
+
+
+ INSTRUCTION
+ COVEREDRATIO
+ ${jacoco.coverage.ratio}
+
+
+ CLASS
+ MISSEDCOUNT
+ ${jacoco.missed.count}
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/incubator/validator-protobuf.spec/src/main/moditect/module-info.java b/incubator/validator-protobuf.spec/src/main/moditect/module-info.java
new file mode 100644
index 0000000000..9c10b90fa0
--- /dev/null
+++ b/incubator/validator-protobuf.spec/src/main/moditect/module-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+open module io.aklivity.zilla.specs.validator.protobuf
+{
+ requires transitive io.aklivity.zilla.specs.engine;
+}
diff --git a/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/config/validator.yaml b/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/config/validator.yaml
new file mode 100644
index 0000000000..e7f8d62b55
--- /dev/null
+++ b/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/config/validator.yaml
@@ -0,0 +1,42 @@
+#
+# Copyright 2021-2023 Aklivity Inc
+#
+# Licensed under the Aklivity Community License (the "License"); you may not use
+# this file except in compliance with the License. You may obtain a copy of the
+# License at
+#
+# https://www.aklivity.io/aklivity-community-license/
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+#
+
+---
+name: test
+catalogs:
+ test0:
+ type: test
+ options:
+ schema: |
+ syntax = "proto3";
+ message example
+ {
+ string id = 1;
+ string status = 2;
+ }
+bindings:
+ test:
+ kind: server
+ type: test
+ options:
+ value:
+ type: protobuf
+ format: json
+ catalog:
+ catalog0:
+ - subject: test0
+ version: latest
+ record: example
+ exit: test
diff --git a/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/schema/protobuf.schema.patch.json b/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/schema/protobuf.schema.patch.json
new file mode 100644
index 0000000000..92ec64be6d
--- /dev/null
+++ b/incubator/validator-protobuf.spec/src/main/scripts/io/aklivity/zilla/specs/validator/protobuf/schema/protobuf.schema.patch.json
@@ -0,0 +1,152 @@
+[
+ {
+ "op": "add",
+ "path": "/$defs/validator/types/enum/-",
+ "value": "protobuf"
+ },
+ {
+ "op": "add",
+ "path": "/$defs/validator/allOf/-",
+ "value":
+ {
+ "if":
+ {
+ "properties":
+ {
+ "type":
+ {
+ "const": "protobuf"
+ }
+ }
+ },
+ "then":
+ {
+ "properties":
+ {
+ "type":
+ {
+ "const": "protobuf"
+ },
+ "format":
+ {
+ "type": "string",
+ "enum":
+ [
+ "json"
+ ]
+ },
+ "catalog":
+ {
+ "type": "object",
+ "patternProperties":
+ {
+ "^[a-zA-Z]+[a-zA-Z0-9\\._\\-]*$":
+ {
+ "type": "array",
+ "items":
+ {
+ "oneOf":
+ [
+ {
+ "type": "object",
+ "properties":
+ {
+ "id":
+ {
+ "type": "integer"
+ },
+ "record":
+ {
+ "type": "string"
+ }
+ },
+ "required":
+ [
+ "id"
+ ],
+ "additionalProperties": false
+ },
+ {
+ "type": "object",
+ "properties":
+ {
+ "schema":
+ {
+ "type": "string"
+ },
+ "version":
+ {
+ "type": "string",
+ "default": "latest"
+ },
+ "record":
+ {
+ "type": "string"
+ }
+ },
+ "required":
+ [
+ "schema"
+ ],
+ "additionalProperties": false
+ },
+ {
+ "type": "object",
+ "properties":
+ {
+ "strategy":
+ {
+ "type": "string"
+ },
+ "version":
+ {
+ "type": "string",
+ "default": "latest"
+ },
+ "record":
+ {
+ "type": "string"
+ }
+ },
+ "required":
+ [
+ "strategy"
+ ],
+ "additionalProperties": false
+ },
+ {
+ "type": "object",
+ "properties":
+ {
+ "subject":
+ {
+ "type": "string"
+ },
+ "version":
+ {
+ "type": "string",
+ "default": "latest"
+ },
+ "record":
+ {
+ "type": "string"
+ }
+ },
+ "required":
+ [
+ "subject"
+ ],
+ "additionalProperties": false
+ }
+ ]
+ }
+ }
+ },
+ "maxProperties": 1
+ }
+ },
+ "additionalProperties": false
+ }
+ }
+ }
+]
diff --git a/incubator/validator-protobuf.spec/src/test/java/io/aklivity/zilla/specs/validator/protobuf/config/SchemaTest.java b/incubator/validator-protobuf.spec/src/test/java/io/aklivity/zilla/specs/validator/protobuf/config/SchemaTest.java
new file mode 100644
index 0000000000..38111b881a
--- /dev/null
+++ b/incubator/validator-protobuf.spec/src/test/java/io/aklivity/zilla/specs/validator/protobuf/config/SchemaTest.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.specs.validator.protobuf.config;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+
+import jakarta.json.JsonObject;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import io.aklivity.zilla.specs.engine.config.ConfigSchemaRule;
+
+public class SchemaTest
+{
+ @Rule
+ public final ConfigSchemaRule schema = new ConfigSchemaRule()
+ .schemaPatch("io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json")
+ .schemaPatch("io/aklivity/zilla/specs/engine/schema/catalog/test.schema.patch.json")
+ .schemaPatch("io/aklivity/zilla/specs/validator/protobuf/schema/protobuf.schema.patch.json")
+ .configurationRoot("io/aklivity/zilla/specs/validator/protobuf/config");
+
+ @Test
+ public void shouldValidateCatalog()
+ {
+ JsonObject config = schema.validate("validator.yaml");
+
+ assertThat(config, not(nullValue()));
+ }
+}
diff --git a/incubator/validator-protobuf/COPYRIGHT b/incubator/validator-protobuf/COPYRIGHT
new file mode 100644
index 0000000000..0cb10b6f62
--- /dev/null
+++ b/incubator/validator-protobuf/COPYRIGHT
@@ -0,0 +1,12 @@
+Copyright ${copyrightYears} Aklivity Inc
+
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
diff --git a/incubator/validator-protobuf/LICENSE b/incubator/validator-protobuf/LICENSE
new file mode 100644
index 0000000000..f6abb6327b
--- /dev/null
+++ b/incubator/validator-protobuf/LICENSE
@@ -0,0 +1,114 @@
+ Aklivity Community License Agreement
+ Version 1.0
+
+This Aklivity Community License Agreement Version 1.0 (the “Agreement”) sets
+forth the terms on which Aklivity, Inc. (“Aklivity”) makes available certain
+software made available by Aklivity under this Agreement (the “Software”). BY
+INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY OF THE SOFTWARE,
+YOU AGREE TO THE TERMS AND CONDITIONS OF THIS AGREEMENT. IF YOU DO NOT AGREE TO
+SUCH TERMS AND CONDITIONS, YOU MUST NOT USE THE SOFTWARE. IF YOU ARE RECEIVING
+THE SOFTWARE ON BEHALF OF A LEGAL ENTITY, YOU REPRESENT AND WARRANT THAT YOU
+HAVE THE ACTUAL AUTHORITY TO AGREE TO THE TERMS AND CONDITIONS OF THIS
+AGREEMENT ON BEHALF OF SUCH ENTITY. “Licensee” means you, an individual, or
+the entity on whose behalf you are receiving the Software.
+
+ 1. LICENSE GRANT AND CONDITIONS.
+
+ 1.1 License. Subject to the terms and conditions of this Agreement,
+ Aklivity hereby grants to Licensee a non-exclusive, royalty-free,
+ worldwide, non-transferable, non-sublicenseable license during the term
+ of this Agreement to: (a) use the Software; (b) prepare modifications and
+ derivative works of the Software; (c) distribute the Software (including
+ without limitation in source code or object code form); and (d) reproduce
+ copies of the Software (the “License”). Licensee is not granted the
+ right to, and Licensee shall not, exercise the License for an Excluded
+ Purpose. For purposes of this Agreement, “Excluded Purpose” means making
+ available any software-as-a-service, platform-as-a-service,
+ infrastructure-as-a-service or other similar online service that competes
+ with Aklivity products or services that provide the Software.
+
+ 1.2 Conditions. In consideration of the License, Licensee’s distribution
+ of the Software is subject to the following conditions:
+
+ (a) Licensee must cause any Software modified by Licensee to carry
+ prominent notices stating that Licensee modified the Software.
+
+ (b) On each Software copy, Licensee shall reproduce and not remove or
+ alter all Aklivity or third party copyright or other proprietary
+ notices contained in the Software, and Licensee must provide the
+ notice below with each copy.
+
+ “This software is made available by Aklivity, Inc., under the
+ terms of the Aklivity Community License Agreement, Version 1.0
+ located at http://www.Aklivity.io/Aklivity-community-license. BY
+ INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY OF
+ THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.”
+
+ 1.3 Licensee Modifications. Licensee may add its own copyright notices
+ to modifications made by Licensee and may provide additional or different
+ license terms and conditions for use, reproduction, or distribution of
+ Licensee’s modifications. While redistributing the Software or
+ modifications thereof, Licensee may choose to offer, for a fee or free of
+ charge, support, warranty, indemnity, or other obligations. Licensee, and
+ not Aklivity, will be responsible for any such obligations.
+
+ 1.4 No Sublicensing. The License does not include the right to
+ sublicense the Software, however, each recipient to which Licensee
+ provides the Software may exercise the Licenses so long as such recipient
+ agrees to the terms and conditions of this Agreement.
+
+ 2. TERM AND TERMINATION. This Agreement will continue unless and until
+ earlier terminated as set forth herein. If Licensee breaches any of its
+ conditions or obligations under this Agreement, this Agreement will
+ terminate automatically and the License will terminate automatically and
+ permanently.
+
+ 3. INTELLECTUAL PROPERTY. As between the parties, Aklivity will retain all
+ right, title, and interest in the Software, and all intellectual property
+ rights therein. Aklivity hereby reserves all rights not expressly granted
+ to Licensee in this Agreement. Aklivity hereby reserves all rights in its
+ trademarks and service marks, and no licenses therein are granted in this
+ Agreement.
+
+ 4. DISCLAIMER. Aklivity HEREBY DISCLAIMS ANY AND ALL WARRANTIES AND
+ CONDITIONS, EXPRESS, IMPLIED, STATUTORY, OR OTHERWISE, AND SPECIFICALLY
+ DISCLAIMS ANY WARRANTY OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+ PURPOSE, WITH RESPECT TO THE SOFTWARE.
+
+ 5. LIMITATION OF LIABILITY. Aklivity WILL NOT BE LIABLE FOR ANY DAMAGES OF
+ ANY KIND, INCLUDING BUT NOT LIMITED TO, LOST PROFITS OR ANY CONSEQUENTIAL,
+ SPECIAL, INCIDENTAL, INDIRECT, OR DIRECT DAMAGES, HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, ARISING OUT OF THIS AGREEMENT. THE FOREGOING SHALL
+ APPLY TO THE EXTENT PERMITTED BY APPLICABLE LAW.
+
+ 6.GENERAL.
+
+ 6.1 Governing Law. This Agreement will be governed by and interpreted in
+ accordance with the laws of the state of California, without reference to
+ its conflict of laws principles. If Licensee is located within the
+ United States, all disputes arising out of this Agreement are subject to
+ the exclusive jurisdiction of courts located in Santa Clara County,
+ California. USA. If Licensee is located outside of the United States,
+ any dispute, controversy or claim arising out of or relating to this
+ Agreement will be referred to and finally determined by arbitration in
+ accordance with the JAMS International Arbitration Rules. The tribunal
+ will consist of one arbitrator. The place of arbitration will be Palo
+ Alto, California. The language to be used in the arbitral proceedings
+ will be English. Judgment upon the award rendered by the arbitrator may
+ be entered in any court having jurisdiction thereof.
+
+ 6.2 Assignment. Licensee is not authorized to assign its rights under
+ this Agreement to any third party. Aklivity may freely assign its rights
+ under this Agreement to any third party.
+
+ 6.3 Other. This Agreement is the entire agreement between the parties
+ regarding the subject matter hereof. No amendment or modification of
+ this Agreement will be valid or binding upon the parties unless made in
+ writing and signed by the duly authorized representatives of both
+ parties. In the event that any provision, including without limitation
+ any condition, of this Agreement is held to be unenforceable, this
+ Agreement and all licenses and rights granted hereunder will immediately
+ terminate. Waiver by Aklivity of a breach of any provision of this
+ Agreement or the failure by Aklivity to exercise any right hereunder
+ will not be construed as a waiver of any subsequent breach of that right
+ or as a waiver of any other right.
\ No newline at end of file
diff --git a/incubator/validator-protobuf/NOTICE b/incubator/validator-protobuf/NOTICE
new file mode 100644
index 0000000000..aa95b451ff
--- /dev/null
+++ b/incubator/validator-protobuf/NOTICE
@@ -0,0 +1,23 @@
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+This project includes:
+ error-prone annotations under Apache 2.0
+ FindBugs-jsr305 under The Apache Software License, Version 2.0
+ Gson under Apache-2.0
+ Guava: Google Core Libraries for Java under Apache License, Version 2.0
+ J2ObjC Annotations under Apache License, Version 2.0
+ Protocol Buffers [Core] under BSD-3-Clause
+ Protocol Buffers [Util] under BSD-3-Clause
+
+
+This project also includes code under copyright of the following entities:
+ https://github.com/reaktivity/
diff --git a/incubator/validator-protobuf/NOTICE.template b/incubator/validator-protobuf/NOTICE.template
new file mode 100644
index 0000000000..ff901de01b
--- /dev/null
+++ b/incubator/validator-protobuf/NOTICE.template
@@ -0,0 +1,16 @@
+Licensed under the Aklivity Community License (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of the
+License at
+
+ https://www.aklivity.io/aklivity-community-license/
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+
+This project includes:
+#GENERATED_NOTICES#
+
+This project also includes code under copyright of the following entities:
+ https://github.com/reaktivity/
\ No newline at end of file
diff --git a/incubator/validator-protobuf/mvnw b/incubator/validator-protobuf/mvnw
new file mode 100755
index 0000000000..d2f0ea3808
--- /dev/null
+++ b/incubator/validator-protobuf/mvnw
@@ -0,0 +1,310 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# ----------------------------------------------------------------------------
+
+# ----------------------------------------------------------------------------
+# Maven2 Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+# JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+# M2_HOME - location of maven2's installed home dir
+# MAVEN_OPTS - parameters passed to the Java VM when running Maven
+# e.g. to debug Maven itself, use
+# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+ if [ -f /etc/mavenrc ] ; then
+ . /etc/mavenrc
+ fi
+
+ if [ -f "$HOME/.mavenrc" ] ; then
+ . "$HOME/.mavenrc"
+ fi
+
+fi
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ export JAVA_HOME="`/usr/libexec/java_home`"
+ else
+ export JAVA_HOME="/Library/Java/Home"
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+if [ -z "$M2_HOME" ] ; then
+ ## resolve links - $0 may be a link to maven's home
+ PRG="$0"
+
+ # need this for relative symlinks
+ while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG="`dirname "$PRG"`/$link"
+ fi
+ done
+
+ saveddir=`pwd`
+
+ M2_HOME=`dirname "$PRG"`/..
+
+ # make it fully qualified
+ M2_HOME=`cd "$M2_HOME" && pwd`
+
+ cd "$saveddir"
+ # echo Using m2 at $M2_HOME
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --unix "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME="`(cd "$M2_HOME"; pwd)`"
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="`which javac`"
+ if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=`which readlink`
+ if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
+ if $darwin ; then
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
+ else
+ javaExecutable="`readlink -f \"$javaExecutable\"`"
+ fi
+ javaHome="`dirname \"$javaExecutable\"`"
+ javaHome=`expr "$javaHome" : '\(.*\)/bin'`
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="`which java`"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." >&2
+ echo " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo "Warning: JAVA_HOME environment variable is not set."
+fi
+
+CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
+
+# traverses directory structure from process work directory to filesystem root
+# first directory with .mvn subdirectory is considered project base directory
+find_maven_basedir() {
+
+ if [ -z "$1" ]
+ then
+ echo "Path not specified to find_maven_basedir"
+ return 1
+ fi
+
+ basedir="$1"
+ wdir="$1"
+ while [ "$wdir" != '/' ] ; do
+ if [ -d "$wdir"/.mvn ] ; then
+ basedir=$wdir
+ break
+ fi
+ # workaround for JBEAP-8937 (on Solaris 10/Sparc)
+ if [ -d "${wdir}" ]; then
+ wdir=`cd "$wdir/.."; pwd`
+ fi
+ # end of workaround
+ done
+ echo "${basedir}"
+}
+
+# concatenates all lines of a file
+concat_lines() {
+ if [ -f "$1" ]; then
+ echo "$(tr -s '\n' ' ' < "$1")"
+ fi
+}
+
+BASE_DIR=`find_maven_basedir "$(pwd)"`
+if [ -z "$BASE_DIR" ]; then
+ exit 1;
+fi
+
+##########################################################################################
+# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+# This allows using the maven wrapper in projects that prohibit checking in binary data.
+##########################################################################################
+if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found .mvn/wrapper/maven-wrapper.jar"
+ fi
+else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
+ fi
+ if [ -n "$MVNW_REPOURL" ]; then
+ jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ else
+ jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ fi
+ while IFS="=" read key value; do
+ case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
+ esac
+ done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties"
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Downloading from: $jarUrl"
+ fi
+ wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+ if $cygwin; then
+ wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+ fi
+
+ if command -v wget > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found wget ... using wget"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ wget "$jarUrl" -O "$wrapperJarPath"
+ else
+ wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+ fi
+ elif command -v curl > /dev/null; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Found curl ... using curl"
+ fi
+ if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+ curl -o "$wrapperJarPath" "$jarUrl" -f
+ else
+ curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+ fi
+
+ else
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo "Falling back to using Java to download"
+ fi
+ javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+ # For Cygwin, switch paths to Windows format before running javac
+ if $cygwin; then
+ javaClass=`cygpath --path --windows "$javaClass"`
+ fi
+ if [ -e "$javaClass" ]; then
+ if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Compiling MavenWrapperDownloader.java ..."
+ fi
+ # Compiling the Java class
+ ("$JAVA_HOME/bin/javac" "$javaClass")
+ fi
+ if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
+ # Running the downloader
+ if [ "$MVNW_VERBOSE" = true ]; then
+ echo " - Running MavenWrapperDownloader.java ..."
+ fi
+ ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR")
+ fi
+ fi
+ fi
+fi
+##########################################################################################
+# End of extension
+##########################################################################################
+
+export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}
+if [ "$MVNW_VERBOSE" = true ]; then
+ echo $MAVEN_PROJECTBASEDIR
+fi
+MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$M2_HOME" ] &&
+ M2_HOME=`cygpath --path --windows "$M2_HOME"`
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$MAVEN_PROJECTBASEDIR" ] &&
+ MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
+fi
+
+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
+WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+exec "$JAVACMD" \
+ $MAVEN_OPTS \
+ -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
+ "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
+ ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@"
diff --git a/incubator/validator-protobuf/mvnw.cmd b/incubator/validator-protobuf/mvnw.cmd
new file mode 100644
index 0000000000..b26ab24f03
--- /dev/null
+++ b/incubator/validator-protobuf/mvnw.cmd
@@ -0,0 +1,182 @@
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements. See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership. The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied. See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven2 Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+ IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Found %WRAPPER_JAR%
+ )
+) else (
+ if not "%MVNW_REPOURL%" == "" (
+ SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.5/maven-wrapper-0.5.5.jar"
+ )
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Couldn't find %WRAPPER_JAR%, downloading it ...
+ echo Downloading from: %DOWNLOAD_URL%
+ )
+
+ powershell -Command "&{"^
+ "$webclient = new-object System.Net.WebClient;"^
+ "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+ "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+ "}"^
+ "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+ "}"
+ if "%MVNW_VERBOSE%" == "true" (
+ echo Finished downloading %WRAPPER_JAR%
+ )
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
diff --git a/incubator/validator-protobuf/pom.xml b/incubator/validator-protobuf/pom.xml
new file mode 100644
index 0000000000..6d1f1bfef6
--- /dev/null
+++ b/incubator/validator-protobuf/pom.xml
@@ -0,0 +1,209 @@
+
+
+
+4.0.0
+
+ io.aklivity.zilla
+ incubator
+ develop-SNAPSHOT
+ ../pom.xml
+
+
+validator-protobuf
+zilla::incubator::validator-protobuf
+
+
+
+ Aklivity Community License Agreement
+ https://www.aklivity.io/aklivity-community-license/
+ repo
+
+
+
+
+ 11
+ 11
+ 0.90
+ 0
+
+
+
+
+ ${project.groupId}
+ validator-protobuf.spec
+ ${project.version}
+ provided
+
+
+ ${project.groupId}
+ engine
+ ${project.version}
+ provided
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.24.4
+
+
+ com.google.protobuf
+ protobuf-java-util
+ 3.24.4
+
+
+ org.antlr
+ antlr4-runtime
+ provided
+
+
+ ${project.groupId}
+ engine
+ test-jar
+ ${project.version}
+ test
+
+
+ org.kaazing
+ k3po.junit
+ test
+
+
+ org.kaazing
+ k3po.lang
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
+
+
+ org.jasig.maven
+ maven-notice-plugin
+
+
+ com.mycila
+ license-maven-plugin
+
+
+ maven-checkstyle-plugin
+
+
+ org.antlr
+ antlr4-maven-plugin
+
+
+ maven-dependency-plugin
+
+
+ process-resources
+
+ unpack
+
+
+
+
+ ${project.groupId}
+ validator-protobuf.spec
+
+
+ ^\Qio/aklivity/zilla/specs/validator/protobuf/\E
+ io/aklivity/zilla/runtime/validator/protobuf/
+
+
+
+
+ io/aklivity/zilla/specs/validator/protobuf/schema/protobuf.schema.patch.json
+ ${project.build.directory}/classes
+
+
+
+ unpack-proto
+ generate-sources
+
+ unpack
+
+
+
+
+ ${project.groupId}
+ validator-protobuf.spec
+ ${project.version}
+ ${basedir}/target/test-classes
+ **\/*.proto
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ org.moditect
+ moditect-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+ io/aklivity/zilla/runtime/validator/protobuf/internal/parser/**/*.class
+
+
+
+ BUNDLE
+
+
+ INSTRUCTION
+ COVEREDRATIO
+ ${jacoco.coverage.ratio}
+
+
+ CLASS
+ MISSEDCOUNT
+ ${jacoco.missed.count}
+
+
+
+
+
+
+
+ org.kaazing
+ k3po-maven-plugin
+
+
+ ${project.groupId}
+ engine
+ ${project.version}
+ test-jar
+
+
+ ${project.groupId}
+ engine
+ ${project.version}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/incubator/validator-protobuf/src/main/antlr4/io/aklivity/zilla/runtime/validator/protobuf/internal/parser/Protobuf3.g4 b/incubator/validator-protobuf/src/main/antlr4/io/aklivity/zilla/runtime/validator/protobuf/internal/parser/Protobuf3.g4
new file mode 100644
index 0000000000..ce9835dbee
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/antlr4/io/aklivity/zilla/runtime/validator/protobuf/internal/parser/Protobuf3.g4
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+grammar Protobuf3;
+
+proto
+ : syntax
+ (
+ importStatement
+ | packageStatement
+ | optionStatement
+ | topLevelDef
+ | emptyStatement_
+ )* EOF
+ ;
+
+// Syntax
+
+syntax
+ : SYNTAX EQ (PROTO3_LIT_SINGLE | PROTO3_LIT_DOUBLE) SEMI
+ ;
+
+// Import Statement
+
+importStatement
+ : IMPORT ( WEAK | PUBLIC )? strLit SEMI
+ ;
+
+// Package
+
+packageStatement
+ : PACKAGE fullIdent SEMI
+ ;
+
+// Option
+
+optionStatement
+ : OPTION optionName EQ constant SEMI
+ ;
+
+optionName
+ : fullIdent
+ | LP fullIdent RP ( DOT fullIdent )?
+ ;
+
+// Normal Field
+fieldLabel
+ : OPTIONAL | REPEATED
+ ;
+
+field
+ : fieldLabel? type_ fieldName EQ fieldNumber ( LB fieldOptions RB )? SEMI
+ ;
+
+fieldOptions
+ : fieldOption ( COMMA fieldOption )*
+ ;
+
+fieldOption
+ : optionName EQ constant
+ ;
+
+fieldNumber
+ : intLit
+ ;
+
+// Oneof and oneof field
+
+oneof
+ : ONEOF oneofName LC ( optionStatement | oneofField | emptyStatement_ )* RC
+ ;
+
+oneofField
+ : type_ fieldName EQ fieldNumber ( LB fieldOptions RB )? SEMI
+ ;
+
+// Map field
+
+mapField
+ : MAP LT keyType COMMA type_ GT mapName
+ EQ fieldNumber ( LB fieldOptions RB )? SEMI
+ ;
+keyType
+ : INT32
+ | INT64
+ | UINT32
+ | UINT64
+ | SINT32
+ | SINT64
+ | FIXED32
+ | FIXED64
+ | SFIXED32
+ | SFIXED64
+ | BOOL
+ | STRING
+ ;
+
+// field types
+
+type_
+ : DOUBLE
+ | FLOAT
+ | INT32
+ | INT64
+ | UINT32
+ | UINT64
+ | SINT32
+ | SINT64
+ | FIXED32
+ | FIXED64
+ | SFIXED32
+ | SFIXED64
+ | BOOL
+ | STRING
+ | BYTES
+ | messageType
+ | enumType
+ ;
+
+// Reserved
+
+reserved
+ : RESERVED ( ranges | reservedFieldNames ) SEMI
+ ;
+
+ranges
+ : range_ ( COMMA range_ )*
+ ;
+
+range_
+ : intLit ( TO ( intLit | MAX ) )?
+ ;
+
+reservedFieldNames
+ : strLit ( COMMA strLit )*
+ ;
+
+// Top Level definitions
+
+topLevelDef
+ : messageDef
+ | enumDef
+ | extendDef
+ | serviceDef
+ ;
+
+// enum
+
+enumDef
+ : ENUM enumName enumBody
+ ;
+
+enumBody
+ : LC enumElement* RC
+ ;
+
+enumElement
+ : optionStatement
+ | enumField
+ | emptyStatement_
+ ;
+
+enumField
+ : ident EQ ( MINUS )? intLit enumValueOptions?SEMI
+ ;
+
+enumValueOptions
+ : LB enumValueOption ( COMMA enumValueOption )* RB
+ ;
+
+enumValueOption
+ : optionName EQ constant
+ ;
+
+// message
+
+messageDef
+ : MESSAGE messageName messageBody
+ ;
+
+messageBody
+ : LC messageElement* RC
+ ;
+
+messageElement
+ : field
+ | enumDef
+ | messageDef
+ | extendDef
+ | optionStatement
+ | oneof
+ | mapField
+ | reserved
+ | emptyStatement_
+ ;
+
+// Extend definition
+//
+// NB: not defined in the spec but supported by protoc and covered by protobuf3 tests
+// see e.g. php/tests/proto/test_import_descriptor_proto.proto
+// of https://github.com/protocolbuffers/protobuf
+// it also was discussed here: https://github.com/protocolbuffers/protobuf/issues/4610
+
+extendDef
+ : EXTEND messageType LC ( field
+ | emptyStatement_
+ )* RC
+ ;
+
+// service
+
+serviceDef
+ : SERVICE serviceName LC serviceElement* RC
+ ;
+
+serviceElement
+ : optionStatement
+ | rpc
+ | emptyStatement_
+ ;
+
+rpc
+ : RPC rpcName LP ( clientStreaming=STREAM )? messageType RP
+ RETURNS LP ( serverStreaming=STREAM )? messageType RP
+ (LC ( optionStatement | emptyStatement_ )* RC | SEMI)
+ ;
+
+// lexical
+
+constant
+ : fullIdent
+ | (MINUS | PLUS )? intLit
+ | ( MINUS | PLUS )? floatLit
+ | strLit
+ | boolLit
+ | blockLit
+ ;
+
+// not specified in specification but used in tests
+blockLit
+ : LC ( ident COLON constant )* RC
+ ;
+
+emptyStatement_: SEMI;
+
+// Lexical elements
+
+ident: IDENTIFIER | keywords;
+fullIdent: ident ( DOT ident )*;
+messageName: ident;
+enumName: ident;
+fieldName: ident;
+oneofName: ident;
+mapName: ident;
+serviceName: ident;
+rpcName: ident;
+messageType: ( DOT )? ( ident DOT )* messageName;
+enumType: ( DOT )? ( ident DOT )* enumName;
+
+intLit: INT_LIT;
+strLit: STR_LIT | PROTO3_LIT_SINGLE | PROTO3_LIT_DOBULE;
+boolLit: BOOL_LIT;
+floatLit: FLOAT_LIT;
+
+// keywords
+SYNTAX: 'syntax';
+IMPORT: 'import';
+WEAK: 'weak';
+PUBLIC: 'public';
+PACKAGE: 'package';
+OPTION: 'option';
+OPTIONAL: 'optional';
+REPEATED: 'repeated';
+ONEOF: 'oneof';
+MAP: 'map';
+INT32: 'int32';
+INT64: 'int64';
+UINT32: 'uint32';
+UINT64: 'uint64';
+SINT32: 'sint32';
+SINT64: 'sint64';
+FIXED32: 'fixed32';
+FIXED64: 'fixed64';
+SFIXED32: 'sfixed32';
+SFIXED64: 'sfixed64';
+BOOL: 'bool';
+STRING: 'string';
+DOUBLE: 'double';
+FLOAT: 'float';
+BYTES: 'bytes';
+RESERVED: 'reserved';
+TO: 'to';
+MAX: 'max';
+ENUM: 'enum';
+MESSAGE: 'message';
+SERVICE: 'service';
+EXTEND: 'extend';
+RPC: 'rpc';
+STREAM: 'stream';
+RETURNS: 'returns';
+
+PROTO3_LIT_SINGLE: '"proto3"';
+PROTO3_LIT_DOBULE: '\'proto3\'';
+
+// symbols
+
+SEMI: ';';
+EQ: '=';
+LP: '(';
+RP: ')';
+LB: '[';
+RB: ']';
+LC: '{';
+RC: '}';
+LT: '<';
+GT: '>';
+DOT: '.';
+COMMA: ',';
+COLON: ':';
+PLUS: '+';
+MINUS: '-';
+
+STR_LIT: ( '\'' ( CHAR_VALUE )*? '\'' ) | ( '"' ( CHAR_VALUE )*? '"' );
+fragment CHAR_VALUE: HEX_ESCAPE | OCT_ESCAPE | CHAR_ESCAPE | ~[\u0000\n\\];
+fragment HEX_ESCAPE: '\\' ( 'x' | 'X' ) HEX_DIGIT HEX_DIGIT;
+fragment OCT_ESCAPE: '\\' OCTAL_DIGIT OCTAL_DIGIT OCTAL_DIGIT;
+fragment CHAR_ESCAPE: '\\' ( 'a' | 'b' | 'f' | 'n' | 'r' | 't' | 'v' | '\\' | '\'' | '"' );
+
+BOOL_LIT: 'true' | 'false';
+
+FLOAT_LIT : ( DECIMALS DOT DECIMALS? EXPONENT? | DECIMALS EXPONENT | DOT DECIMALS EXPONENT? ) | 'inf' | 'nan';
+fragment EXPONENT : ( 'e' | 'E' ) (PLUS | MINUS)? DECIMALS;
+fragment DECIMALS : DECIMAL_DIGIT+;
+
+INT_LIT : DECIMAL_LIT | OCTAL_LIT | HEX_LIT;
+fragment DECIMAL_LIT : ( [1-9] ) DECIMAL_DIGIT*;
+fragment OCTAL_LIT : '0' OCTAL_DIGIT*;
+fragment HEX_LIT : '0' ( 'x' | 'X' ) HEX_DIGIT+ ;
+
+IDENTIFIER: LETTER ( LETTER | DECIMAL_DIGIT )*;
+
+fragment LETTER: [A-Za-z_];
+fragment DECIMAL_DIGIT: [0-9];
+fragment OCTAL_DIGIT: [0-7];
+fragment HEX_DIGIT: [0-9A-Fa-f];
+
+// comments
+WS : [ \t\r\n\u000C]+ -> skip;
+LINE_COMMENT: '//' ~[\r\n]* -> channel(HIDDEN);
+COMMENT: '/*' .*? '*/' -> channel(HIDDEN);
+
+keywords
+ : SYNTAX
+ | IMPORT
+ | WEAK
+ | PUBLIC
+ | PACKAGE
+ | OPTION
+ | OPTIONAL
+ | REPEATED
+ | ONEOF
+ | MAP
+ | INT32
+ | INT64
+ | UINT32
+ | UINT64
+ | SINT32
+ | SINT64
+ | FIXED32
+ | FIXED64
+ | SFIXED32
+ | SFIXED64
+ | BOOL
+ | STRING
+ | DOUBLE
+ | FLOAT
+ | BYTES
+ | RESERVED
+ | TO
+ | MAX
+ | ENUM
+ | MESSAGE
+ | SERVICE
+ | EXTEND
+ | RPC
+ | STREAM
+ | RETURNS
+ | BOOL_LIT
+ ;
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/DescriptorTree.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/DescriptorTree.java
new file mode 100644
index 0000000000..fcf57c1c39
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/DescriptorTree.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+
+public class DescriptorTree
+{
+ protected final Map children;
+ protected final List indexes;
+
+ protected Descriptors.Descriptor descriptor;
+ protected String name;
+
+ private DescriptorTree()
+ {
+ this.children = new LinkedHashMap<>();
+ this.indexes = new LinkedList<>();
+ }
+
+ protected DescriptorTree(
+ FileDescriptor fileDescriptors)
+ {
+ this();
+ this.name = fileDescriptors.getPackage();
+ for (Descriptor descriptor : fileDescriptors.getMessageTypes())
+ {
+ addDescriptor(descriptor);
+ addNestedDescriptors(descriptor);
+ }
+ }
+
+ protected DescriptorTree findByName(
+ String path)
+ {
+ DescriptorTree current = this;
+ int start = 0;
+ int end;
+
+ while (start < path.length())
+ {
+ end = path.indexOf('.', start);
+ if (end == -1)
+ {
+ end = path.length();
+ }
+
+ String part = path.substring(start, end);
+ current = current.children.get(part);
+
+ if (current == null)
+ {
+ break;
+ }
+ start = end + 1;
+ }
+ return current;
+ }
+
+ protected Descriptor findByIndexes(
+ List indexes)
+ {
+ DescriptorTree current = this;
+
+ for (Integer index : indexes)
+ {
+ current = current.findChild(index);
+ if (current == null)
+ {
+ break;
+ }
+ }
+ return current != null ? current.descriptor : null;
+ }
+
+ private DescriptorTree findParent(
+ String path)
+ {
+ int index = path.lastIndexOf('.');
+ String part = index >= 0 ? path.substring(index + 1) : path;
+ return this.children.getOrDefault(part, null);
+ }
+
+ private DescriptorTree findChild(
+ int index)
+ {
+ DescriptorTree tree = this;
+ int currentIndex = 0;
+ for (Map.Entry entry : children.entrySet())
+ {
+ if (currentIndex == index)
+ {
+ tree = entry.getValue();
+ break;
+ }
+ currentIndex++;
+ }
+ return tree;
+ }
+
+ private void addNestedDescriptor(
+ Descriptor parent,
+ int index)
+ {
+ DescriptorTree parentNode = findParent(parent.getFullName());
+ if (parentNode != null)
+ {
+ Descriptors.Descriptor nestedDescriptor = parent.getNestedTypes().get(index);
+ parentNode.addDescriptor(nestedDescriptor);
+ parentNode.addNestedDescriptors(nestedDescriptor);
+ }
+ }
+
+ private void addDescriptor(
+ Descriptor descriptor)
+ {
+ DescriptorTree node = new DescriptorTree();
+ node.descriptor = descriptor;
+ node.name = name;
+ node.indexes.addAll(this.indexes);
+ node.indexes.add(this.children.size());
+ this.children.put(descriptor.getName(), node);
+ }
+
+ private void addNestedDescriptors(Descriptor descriptor)
+ {
+ for (int i = 0; i < descriptor.getNestedTypes().size(); i++)
+ {
+ addNestedDescriptor(descriptor, i);
+ }
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtoListener.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtoListener.java
new file mode 100644
index 0000000000..8ab28e564d
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtoListener.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import static java.util.Map.entry;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+
+import io.aklivity.zilla.runtime.validator.protobuf.internal.parser.Protobuf3BaseListener;
+import io.aklivity.zilla.runtime.validator.protobuf.internal.parser.Protobuf3Parser;
+
+public class ProtoListener extends Protobuf3BaseListener
+{
+ private static final Map TYPES = Map.ofEntries(
+ entry("double", Type.TYPE_DOUBLE),
+ entry("float", Type.TYPE_FLOAT),
+ entry("int32", Type.TYPE_INT32),
+ entry("int64", Type.TYPE_INT64),
+ entry("uint32", Type.TYPE_UINT32),
+ entry("uint64", Type.TYPE_UINT64),
+ entry("sint32", Type.TYPE_SINT32),
+ entry("sint64", Type.TYPE_SINT64),
+ entry("fixed32", Type.TYPE_FIXED32),
+ entry("fixed64", Type.TYPE_FIXED64),
+ entry("sfixed32", Type.TYPE_SFIXED32),
+ entry("sfixed64", Type.TYPE_SFIXED64),
+ entry("bool", Type.TYPE_BOOL),
+ entry("string", Type.TYPE_STRING),
+ entry("bytes", Type.TYPE_BYTES)
+ );
+
+ private static final Map LABELS = Map.ofEntries(
+ entry("optional", Label.LABEL_OPTIONAL),
+ entry("required", Label.LABEL_REQUIRED),
+ entry("repeated", Label.LABEL_REPEATED)
+ );
+
+ private String packageName;
+ private List imports;
+ private final FileDescriptorProto.Builder builder;
+ private Stack messageHierarchy = new Stack<>();
+
+ public ProtoListener()
+ {
+ this.imports = new ArrayList<>();
+ this.builder = FileDescriptorProto.newBuilder();
+ }
+
+ @Override
+ public void enterSyntax(
+ Protobuf3Parser.SyntaxContext ctx)
+ {
+ builder.setSyntax(ctx.getChild(2).getText());
+ }
+
+ @Override
+ public void enterPackageStatement(
+ Protobuf3Parser.PackageStatementContext ctx)
+ {
+ packageName = ctx.fullIdent().getText();
+ builder.setPackage(packageName);
+ }
+
+ @Override
+ public void enterImportStatement(
+ Protobuf3Parser.ImportStatementContext ctx)
+ {
+ String importStatement = ctx.strLit().getText();
+ imports.add(importStatement);
+ System.out.println("Import statements are currently not supported");
+ }
+
+ @Override
+ public void enterMessageDef(
+ Protobuf3Parser.MessageDefContext ctx)
+ {
+ DescriptorProto.Builder builder = DescriptorProto.newBuilder();
+ String name = ctx.messageName().getText();
+ builder.setName(name);
+ messageHierarchy.push(name);
+
+ for (Protobuf3Parser.MessageElementContext element : ctx.messageBody().messageElement())
+ {
+ if (element.field() != null)
+ {
+ builder.addField(processFieldElement(element.field()));
+ }
+ if (element.messageDef() != null)
+ {
+ builder.addNestedType(processNestedMessage(element.messageDef()));
+ }
+ }
+ if (messageHierarchy.size() == 1)
+ {
+ this.builder.addMessageType(builder.build());
+ builder.clear();
+ }
+ }
+
+ @Override
+ public void exitMessageDef(
+ Protobuf3Parser.MessageDefContext ctx)
+ {
+ messageHierarchy.pop();
+ }
+
+ public DescriptorProtos.FileDescriptorProto build()
+ {
+ return builder.build();
+ }
+
+ private DescriptorProto processNestedMessage(
+ Protobuf3Parser.MessageDefContext ctx)
+ {
+ DescriptorProto.Builder builder = DescriptorProto.newBuilder();
+ String name = ctx.messageName().getText();
+ builder.setName(name);
+
+ for (Protobuf3Parser.MessageElementContext element : ctx.messageBody().messageElement())
+ {
+ if (element.field() != null)
+ {
+ builder.addField(processFieldElement(element.field()));
+ }
+ if (element.messageDef() != null)
+ {
+ builder.addNestedType(processNestedMessage(element.messageDef()));
+ }
+ }
+ return builder.build();
+ }
+
+ private FieldDescriptorProto processFieldElement(
+ Protobuf3Parser.FieldContext ctx)
+ {
+ FieldDescriptorProto.Builder builder = FieldDescriptorProto.newBuilder();
+ String type = ctx.type_().getText();
+ String name = ctx.fieldName().getText();
+ String label = ctx.fieldLabel() != null ? ctx.fieldLabel().getText() : null;
+ int number = Integer.parseInt(ctx.fieldNumber().getText());
+
+ builder.setName(name);
+ builder.setNumber(number);
+ if (label != null)
+ {
+ builder.setLabel(LABELS.get(label));
+ }
+ if (TYPES.containsKey(type))
+ {
+ builder.setType(TYPES.get(type));
+ }
+ else
+ {
+ builder.setTypeName(type);
+ }
+ return builder.build();
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufReadValidator.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufReadValidator.java
new file mode 100644
index 0000000000..8fc7d49d93
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufReadValidator.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.function.LongFunction;
+
+import org.agrona.DirectBuffer;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.util.JsonFormat;
+
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
+import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
+import io.aklivity.zilla.runtime.engine.validator.function.FragmentConsumer;
+import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+
+public class ProtobufReadValidator extends ProtobufValidator implements ValueValidator, FragmentValidator
+{
+ private final JsonFormat.Printer printer;
+ private final OutputStreamWriter output;
+
+ public ProtobufReadValidator(
+ ProtobufValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ super(config, supplyCatalog);
+ this.printer = JsonFormat.printer()
+ .omittingInsignificantWhitespace()
+ .preservingProtoFieldNames()
+ .includingDefaultValueFields();
+ this.output = new OutputStreamWriter(out);
+ }
+
+ @Override
+ public int padding(
+ DirectBuffer data,
+ int index,
+ int length)
+ {
+ int padding = 0;
+ if (FORMAT_JSON.equals(format))
+ {
+ int schemaId = handler.resolve(data, index, length);
+
+ if (schemaId == NO_SCHEMA_ID)
+ {
+ schemaId = catalog.id != NO_SCHEMA_ID
+ ? catalog.id
+ : handler.resolve(subject, catalog.version);
+ }
+ padding = supplyJsonFormatPadding(schemaId);
+ }
+ return padding;
+ }
+
+ @Override
+ public int validate(
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ return validateComplete(data, index, length, next);
+ }
+
+ @Override
+ public int validate(
+ int flags,
+ DirectBuffer data,
+ int index,
+ int length,
+ FragmentConsumer next)
+ {
+ return (flags & FLAGS_FIN) != 0x00
+ ? validateComplete(data, index, length, (b, i, l) -> next.accept(FLAGS_COMPLETE, b, i, l))
+ : 0;
+ }
+
+ private int validateComplete(
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ return handler.decode(data, index, length, next, this::decodePayload);
+ }
+
+ private int decodePayload(
+ int schemaId,
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ if (schemaId == NO_SCHEMA_ID)
+ {
+ if (catalog.id != NO_SCHEMA_ID)
+ {
+ schemaId = catalog.id;
+ }
+ else
+ {
+ schemaId = handler.resolve(subject, catalog.version);
+ }
+ }
+
+ int progress = decodeIndexes(data, index, length);
+
+ return validate(schemaId, data, index + progress, length - progress, next);
+ }
+
+ private int validate(
+ int schemaId,
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ int valLength = -1;
+ DescriptorTree tree = supplyDescriptorTree(schemaId);
+ if (tree != null)
+ {
+ Descriptors.Descriptor descriptor = tree.findByIndexes(indexes);
+ if (descriptor != null)
+ {
+ in.wrap(data, index, length);
+ DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
+ validate:
+ try
+ {
+ DynamicMessage message = builder.mergeFrom(in).build();
+ builder.clear();
+ if (!message.getUnknownFields().asMap().isEmpty())
+ {
+ break validate;
+ }
+
+ if (FORMAT_JSON.equals(format))
+ {
+ out.wrap(out.buffer());
+ printer.appendTo(message, output);
+ output.flush();
+ valLength = out.position();
+ next.accept(out.buffer(), 0, valLength);
+ }
+ else
+ {
+ next.accept(data, index, length);
+ valLength = length;
+ }
+ }
+ catch (IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+ return valLength;
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidator.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidator.java
new file mode 100644
index 0000000000..a699cc457e
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidator.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.LongFunction;
+
+import org.agrona.BitUtil;
+import org.agrona.DirectBuffer;
+import org.agrona.ExpandableDirectByteBuffer;
+import org.agrona.collections.Int2IntHashMap;
+import org.agrona.collections.Int2ObjectCache;
+import org.agrona.collections.Object2ObjectHashMap;
+import org.agrona.io.DirectBufferInputStream;
+import org.agrona.io.ExpandableDirectBufferOutputStream;
+import org.antlr.v4.runtime.BailErrorStrategy;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.tree.ParseTreeWalker;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
+import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+import io.aklivity.zilla.runtime.validator.protobuf.internal.parser.Protobuf3Lexer;
+import io.aklivity.zilla.runtime.validator.protobuf.internal.parser.Protobuf3Parser;
+
+public class ProtobufValidator
+{
+ protected static final byte[] ZERO_INDEX = new byte[]{0x0};
+ protected static final String FORMAT_JSON = "json";
+
+ private static final int JSON_FIELD_STRUCTURE_LENGTH = "\"\":\"\",".length();
+ private static final int JSON_OBJECT_CURLY_BRACES = 2;
+
+ protected final SchemaConfig catalog;
+ protected final CatalogHandler handler;
+ protected final String subject;
+ protected final String format;
+ protected final List indexes;
+ protected final DirectBufferInputStream in;
+ protected final ExpandableDirectBufferOutputStream out;
+
+ private final Int2ObjectCache descriptors;
+ private final Int2ObjectCache tree;
+ private final Object2ObjectHashMap builders;
+ private final FileDescriptor[] dependencies;
+ private final Int2IntHashMap paddings;
+
+ protected ProtobufValidator(
+ ProtobufValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ CatalogedConfig cataloged = config.cataloged.get(0);
+ this.handler = supplyCatalog.apply(cataloged.id);
+ this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
+ this.subject = catalog != null && catalog.subject != null
+ ? catalog.subject
+ : config.subject;
+ this.format = config.format;
+ this.descriptors = new Int2ObjectCache<>(1, 1024, i -> {});
+ this.tree = new Int2ObjectCache<>(1, 1024, i -> {});
+ this.builders = new Object2ObjectHashMap<>();
+ this.in = new DirectBufferInputStream();
+ this.dependencies = new FileDescriptor[0];
+ this.indexes = new LinkedList<>();
+ this.paddings = new Int2IntHashMap(-1);
+ this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
+ }
+
+ protected FileDescriptor supplyDescriptor(
+ int schemaId)
+ {
+ return descriptors.computeIfAbsent(schemaId, this::createDescriptors);
+ }
+
+ protected DescriptorTree supplyDescriptorTree(
+ int schemaId)
+ {
+ return tree.computeIfAbsent(schemaId, this::createDescriptorTree);
+ }
+
+ protected byte[] encodeIndexes()
+ {
+ int size = indexes.size();
+
+ byte[] indexes = new byte[size * 5];
+
+ int index = 0;
+ for (int i = 0; i < size; i++)
+ {
+ int entry = this.indexes.get(i);
+ int value = (entry << 1) ^ (entry >> 31);
+ while ((value & ~0x7F) != 0)
+ {
+ indexes[index++] = (byte) ((value & 0x7F) | 0x80);
+ value >>>= 7;
+ }
+ indexes[index++] = (byte) value;
+ }
+
+ return Arrays.copyOf(indexes, index);
+ }
+
+ protected int decodeIndexes(
+ DirectBuffer data,
+ int index,
+ int length)
+ {
+ int progress = 0;
+ indexes.clear();
+ int encodedLength = decodeIndex(data.getByte(index));
+ progress += BitUtil.SIZE_OF_BYTE;
+ if (encodedLength == 0)
+ {
+ indexes.add(encodedLength);
+ }
+ for (int i = 0; i < encodedLength; i++)
+ {
+ indexes.add(decodeIndex(data.getByte(index + progress)));
+ progress += BitUtil.SIZE_OF_BYTE;
+ }
+ return progress;
+ }
+
+ protected int supplyIndexPadding(
+ int schemaId)
+ {
+ return paddings.computeIfAbsent(schemaId, this::calculateIndexPadding);
+ }
+
+ protected int supplyJsonFormatPadding(
+ int schemaId)
+ {
+ return paddings.computeIfAbsent(schemaId, id -> calculateJsonFormatPadding(supplyDescriptor(id)));
+ }
+
+ protected DynamicMessage.Builder supplyDynamicMessageBuilder(
+ Descriptors.Descriptor descriptor)
+ {
+ DynamicMessage.Builder builder;
+ if (builders.containsKey(descriptor.getFullName()))
+ {
+ builder = builders.get(descriptor.getFullName());
+ }
+ else
+ {
+ builder = createDynamicMessageBuilder(descriptor);
+ builders.put(descriptor.getFullName(), builder);
+ }
+ return builder;
+ }
+
+ private DynamicMessage.Builder createDynamicMessageBuilder(
+ Descriptors.Descriptor descriptor)
+ {
+ return DynamicMessage.newBuilder(descriptor);
+ }
+
+ private int decodeIndex(
+ byte encodedByte)
+ {
+ int result = 0;
+ int shift = 0;
+ do
+ {
+ result |= (encodedByte & 0x7F) << shift;
+ shift += 7;
+ }
+ while ((encodedByte & 0x80) != 0);
+ return (result >>> 1) ^ -(result & 1);
+ }
+
+ private int calculateIndexPadding(
+ int schemaId)
+ {
+ int padding = 0;
+ DescriptorTree trees = supplyDescriptorTree(schemaId);
+ if (trees != null && catalog.record != null)
+ {
+ DescriptorTree tree = trees.findByName(catalog.record);
+ if (tree != null)
+ {
+ padding = tree.indexes.size() + 1;
+ }
+ }
+ return padding;
+ }
+
+ private int calculateJsonFormatPadding(
+ FileDescriptor descriptor)
+ {
+ int padding = 0;
+
+ if (descriptor != null)
+ {
+ for (Descriptors.Descriptor message : descriptor.getMessageTypes())
+ {
+ padding += JSON_OBJECT_CURLY_BRACES;
+ for (Descriptors.FieldDescriptor field : message.getFields())
+ {
+ padding += field.getName().getBytes().length + JSON_FIELD_STRUCTURE_LENGTH;
+ }
+ }
+
+ }
+ return padding;
+ }
+
+ private FileDescriptor createDescriptors(
+ int schemaId)
+ {
+ FileDescriptor descriptor = null;
+
+ String schemaText = handler.resolve(schemaId);
+ if (schemaText != null)
+ {
+ CharStream input = CharStreams.fromString(schemaText);
+ Protobuf3Lexer lexer = new Protobuf3Lexer(input);
+ CommonTokenStream tokens = new CommonTokenStream(lexer);
+
+ Protobuf3Parser parser = new Protobuf3Parser(tokens);
+ parser.setErrorHandler(new BailErrorStrategy());
+ ParseTreeWalker walker = new ParseTreeWalker();
+
+ ProtoListener listener = new ProtoListener();
+ walker.walk(listener, parser.proto());
+
+ try
+ {
+ descriptor = FileDescriptor.buildFrom(listener.build(), dependencies);
+ }
+ catch (DescriptorValidationException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ return descriptor;
+ }
+
+ private DescriptorTree createDescriptorTree(
+ int schemaId)
+ {
+ DescriptorTree tree = null;
+ FileDescriptor descriptor = supplyDescriptor(schemaId);
+
+ if (descriptor != null)
+ {
+ tree = new DescriptorTree(descriptor);
+ }
+ return tree;
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactory.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactory.java
new file mode 100644
index 0000000000..6b6bd34b37
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import java.net.URL;
+import java.util.function.LongFunction;
+
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
+import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
+import io.aklivity.zilla.runtime.engine.validator.ValidatorFactorySpi;
+import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+
+public final class ProtobufValidatorFactory implements ValidatorFactorySpi
+{
+ @Override
+ public String type()
+ {
+ return "protobuf";
+ }
+
+ public URL schema()
+ {
+ return getClass().getResource("schema/protobuf.schema.patch.json");
+ }
+
+ @Override
+ public ValueValidator createValueReader(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return createReader(config, supplyCatalog);
+ }
+
+ @Override
+ public ValueValidator createValueWriter(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return createWriter(config, supplyCatalog);
+ }
+
+ @Override
+ public FragmentValidator createFragmentReader(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return createReader(config, supplyCatalog);
+ }
+
+ @Override
+ public FragmentValidator createFragmentWriter(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return createWriter(config, supplyCatalog);
+ }
+
+ private ProtobufReadValidator createReader(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return new ProtobufReadValidator(ProtobufValidatorConfig.class.cast(config), supplyCatalog);
+ }
+
+ private ProtobufWriteValidator createWriter(
+ ValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ return new ProtobufWriteValidator(ProtobufValidatorConfig.class.cast(config), supplyCatalog);
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufWriteValidator.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufWriteValidator.java
new file mode 100644
index 0000000000..2a785c7d74
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufWriteValidator.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.function.LongFunction;
+
+import org.agrona.DirectBuffer;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.agrona.io.DirectBufferInputStream;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.util.JsonFormat;
+
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
+import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
+import io.aklivity.zilla.runtime.engine.validator.function.FragmentConsumer;
+import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+
+public class ProtobufWriteValidator extends ProtobufValidator implements ValueValidator, FragmentValidator
+{
+ private final DirectBuffer indexesRO;
+ private final InputStreamReader input;
+ private final DirectBufferInputStream in;
+ private final JsonFormat.Parser parser;
+
+ public ProtobufWriteValidator(
+ ProtobufValidatorConfig config,
+ LongFunction supplyCatalog)
+ {
+ super(config, supplyCatalog);
+ this.indexesRO = new UnsafeBuffer();
+ this.in = new DirectBufferInputStream();
+ this.input = new InputStreamReader(in);
+ this.parser = JsonFormat.parser();
+ }
+
+ @Override
+ public int padding(
+ DirectBuffer data,
+ int index,
+ int length)
+ {
+ int schemaId = catalog != null && catalog.id > 0
+ ? catalog.id
+ : handler.resolve(subject, catalog.version);
+
+ return handler.encodePadding() + supplyIndexPadding(schemaId);
+ }
+
+ @Override
+ public int validate(
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ return validateComplete(data, index, length, next);
+ }
+
+ @Override
+ public int validate(
+ int flags,
+ DirectBuffer data,
+ int index,
+ int length,
+ FragmentConsumer next)
+ {
+ return (flags & FLAGS_FIN) != 0x00
+ ? validateComplete(data, index, length, (b, i, l) -> next.accept(FLAGS_COMPLETE, b, i, l))
+ : 0;
+ }
+
+ private int validateComplete(
+ DirectBuffer data,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ int valLength = -1;
+
+ int schemaId = catalog != null && catalog.id > 0
+ ? catalog.id
+ : handler.resolve(subject, catalog.version);
+
+ if (FORMAT_JSON.equals(format))
+ {
+ valLength = handler.encode(schemaId, data, index, length, next, this::serializeJsonRecord);
+ }
+ else if (validate(schemaId, data, index, length))
+ {
+ valLength = handler.encode(schemaId, data, index, length, next, this::encode);
+ }
+ return valLength;
+ }
+
+ private boolean validate(
+ int schemaId,
+ DirectBuffer buffer,
+ int index,
+ int length)
+ {
+ boolean status = false;
+ DescriptorTree trees = supplyDescriptorTree(schemaId);
+ if (trees != null && catalog.record != null)
+ {
+ DescriptorTree tree = trees.findByName(catalog.record);
+ if (tree != null)
+ {
+ Descriptors.Descriptor descriptor = tree.descriptor;
+ indexes.add(tree.indexes.size());
+ indexes.addAll(tree.indexes);
+ in.wrap(buffer, index, length);
+ DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
+ try
+ {
+ DynamicMessage message = builder.mergeFrom(in).build();
+ builder.clear();
+ status = message.getUnknownFields().asMap().isEmpty();
+ }
+ catch (IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+ return status;
+ }
+
+ private int encode(
+ int schemaId,
+ DirectBuffer buffer,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ int valLength = -1;
+ if (indexes.size() == 2 && indexes.get(0) == 1 && indexes.get(1) == 0)
+ {
+ indexesRO.wrap(ZERO_INDEX);
+ valLength = 1;
+ }
+ else
+ {
+ indexesRO.wrap(encodeIndexes());
+ valLength = indexes.size();
+ }
+ indexes.clear();
+ next.accept(indexesRO, 0, valLength);
+ next.accept(buffer, index, length);
+ return valLength + length;
+ }
+
+ private int serializeJsonRecord(
+ int schemaId,
+ DirectBuffer buffer,
+ int index,
+ int length,
+ ValueConsumer next)
+ {
+ int valLength = -1;
+ DescriptorTree tree = supplyDescriptorTree(schemaId);
+ if (tree != null && catalog.record != null)
+ {
+ tree = tree.findByName(catalog.record);
+ if (tree != null)
+ {
+ Descriptors.Descriptor descriptor = tree.descriptor;
+ indexes.clear();
+ indexes.add(tree.indexes.size());
+ indexes.addAll(tree.indexes);
+ DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
+ in.wrap(buffer, index, length);
+ try
+ {
+ parser.merge(input, builder);
+ DynamicMessage message = builder.build();
+ builder.clear();
+ if (message.isInitialized() && message.getUnknownFields().asMap().isEmpty())
+ {
+ out.wrap(out.buffer());
+ message.writeTo(out);
+ valLength = encode(schemaId, out.buffer(), 0, out.position(), next);
+ }
+ }
+ catch (IOException ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ }
+ return valLength;
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfig.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfig.java
new file mode 100644
index 0000000000..89a7827dd6
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf.config;
+
+import java.util.List;
+import java.util.function.Function;
+
+import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
+import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
+
+public final class ProtobufValidatorConfig extends ValidatorConfig
+{
+ public final String subject;
+ public final String format;
+
+ ProtobufValidatorConfig(
+ List cataloged,
+ String subject,
+ String format)
+ {
+ super("protobuf", cataloged);
+ this.subject = subject;
+ this.format = format;
+ }
+
+ public static ProtobufValidatorConfigBuilder builder(
+ Function mapper)
+ {
+ return new ProtobufValidatorConfigBuilder<>(mapper::apply);
+ }
+
+ public static ProtobufValidatorConfigBuilder builder()
+ {
+ return new ProtobufValidatorConfigBuilder<>(ProtobufValidatorConfig.class::cast);
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapter.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapter.java
new file mode 100644
index 0000000000..6471d50cc9
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf.config;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import jakarta.json.Json;
+import jakarta.json.JsonArray;
+import jakarta.json.JsonArrayBuilder;
+import jakarta.json.JsonObject;
+import jakarta.json.JsonObjectBuilder;
+import jakarta.json.JsonValue;
+import jakarta.json.bind.adapter.JsonbAdapter;
+
+import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
+import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
+import io.aklivity.zilla.runtime.engine.config.SchemaConfigAdapter;
+import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
+import io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi;
+
+public final class ProtobufValidatorConfigAdapter implements ValidatorConfigAdapterSpi, JsonbAdapter
+{
+ private static final String PROTOBUF = "protobuf";
+ private static final String TYPE_NAME = "type";
+ private static final String CATALOG_NAME = "catalog";
+ private static final String SUBJECT_NAME = "subject";
+ private static final String FORMAT = "format";
+
+ private final SchemaConfigAdapter schema = new SchemaConfigAdapter();
+
+ @Override
+ public String type()
+ {
+ return PROTOBUF;
+ }
+
+ @Override
+ public JsonValue adaptToJson(
+ ValidatorConfig config)
+ {
+ ProtobufValidatorConfig protobufConfig = (ProtobufValidatorConfig) config;
+ JsonObjectBuilder validator = Json.createObjectBuilder();
+ validator.add(TYPE_NAME, PROTOBUF);
+
+ if (protobufConfig.format != null)
+ {
+ validator.add(FORMAT, protobufConfig.format);
+ }
+
+ if (protobufConfig.cataloged != null && !protobufConfig.cataloged.isEmpty())
+ {
+ JsonObjectBuilder catalogs = Json.createObjectBuilder();
+ for (CatalogedConfig catalog : protobufConfig.cataloged)
+ {
+ JsonArrayBuilder array = Json.createArrayBuilder();
+ for (SchemaConfig schemaItem: catalog.schemas)
+ {
+ array.add(schema.adaptToJson(schemaItem));
+ }
+ catalogs.add(catalog.name, array);
+ }
+ validator.add(CATALOG_NAME, catalogs);
+ }
+ return validator.build();
+ }
+
+ @Override
+ public ValidatorConfig adaptFromJson(
+ JsonValue value)
+ {
+ JsonObject object = (JsonObject) value;
+
+ assert object.containsKey(CATALOG_NAME);
+
+ JsonObject catalogsJson = object.getJsonObject(CATALOG_NAME);
+ List catalogs = new LinkedList<>();
+ for (String catalogName: catalogsJson.keySet())
+ {
+ JsonArray schemasJson = catalogsJson.getJsonArray(catalogName);
+ List schemas = new LinkedList<>();
+ for (JsonValue item : schemasJson)
+ {
+ JsonObject schemaJson = (JsonObject) item;
+ SchemaConfig schemaElement = schema.adaptFromJson(schemaJson);
+ schemas.add(schemaElement);
+ }
+ catalogs.add(new CatalogedConfig(catalogName, schemas));
+ }
+
+ String subject = object.containsKey(SUBJECT_NAME)
+ ? object.getString(SUBJECT_NAME)
+ : null;
+
+ String format = object.containsKey(FORMAT)
+ ? object.getString(FORMAT)
+ : null;
+
+ return new ProtobufValidatorConfig(catalogs, subject, format);
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigBuilder.java b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigBuilder.java
new file mode 100644
index 0000000000..be37990c94
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Function;
+
+import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
+import io.aklivity.zilla.runtime.engine.config.CatalogedConfigBuilder;
+import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
+
+public class ProtobufValidatorConfigBuilder extends ConfigBuilder>
+{
+ private final Function mapper;
+
+ private List catalogs;
+ private String subject;
+ private String format;
+
+ ProtobufValidatorConfigBuilder(
+ Function mapper)
+ {
+ this.mapper = mapper;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Class> thisType()
+ {
+ return (Class>) getClass();
+ }
+
+ public CatalogedConfigBuilder> catalog()
+ {
+ return CatalogedConfig.builder(this::catalog);
+ }
+
+ public ProtobufValidatorConfigBuilder subject(
+ String subject)
+ {
+ this.subject = subject;
+ return this;
+ }
+
+ public ProtobufValidatorConfigBuilder catalog(
+ CatalogedConfig catalog)
+ {
+ if (catalogs == null)
+ {
+ catalogs = new LinkedList<>();
+ }
+ catalogs.add(catalog);
+ return this;
+ }
+
+ public ProtobufValidatorConfigBuilder format(
+ String format)
+ {
+ this.format = format;
+ return this;
+ }
+
+ @Override
+ public T build()
+ {
+ return mapper.apply(new ProtobufValidatorConfig(catalogs, subject, format));
+ }
+}
diff --git a/incubator/validator-protobuf/src/main/moditect/module-info.java b/incubator/validator-protobuf/src/main/moditect/module-info.java
new file mode 100644
index 0000000000..4781ede24e
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/moditect/module-info.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+module io.aklivity.zilla.runtime.validator.protobuf
+{
+ requires org.antlr.antlr4.runtime;
+ requires protobuf.java;
+ requires io.aklivity.zilla.runtime.engine;
+
+ exports io.aklivity.zilla.runtime.validator.protobuf.config;
+
+ provides io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi
+ with io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfigAdapter;
+
+ provides io.aklivity.zilla.runtime.engine.validator.ValidatorFactorySpi
+ with io.aklivity.zilla.runtime.validator.protobuf.ProtobufValidatorFactory;
+}
diff --git a/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi b/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi
new file mode 100644
index 0000000000..f159a5ed34
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi
@@ -0,0 +1 @@
+io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfigAdapter
diff --git a/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.validator.ValidatorFactorySpi b/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.validator.ValidatorFactorySpi
new file mode 100644
index 0000000000..903ff8382f
--- /dev/null
+++ b/incubator/validator-protobuf/src/main/resources/META-INF/services/io.aklivity.zilla.runtime.engine.validator.ValidatorFactorySpi
@@ -0,0 +1 @@
+io.aklivity.zilla.runtime.validator.protobuf.ProtobufValidatorFactory
diff --git a/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactoryTest.java b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactoryTest.java
new file mode 100644
index 0000000000..02e4824417
--- /dev/null
+++ b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorFactoryTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.function.LongFunction;
+
+import org.junit.Test;
+
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
+import io.aklivity.zilla.runtime.engine.test.internal.catalog.TestCatalogHandler;
+import io.aklivity.zilla.runtime.engine.test.internal.catalog.config.TestCatalogOptionsConfig;
+import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+
+
+public class ProtobufValidatorFactoryTest
+{
+ @Test
+ public void shouldCreateReadValidator()
+ {
+ // GIVEN
+ ValidatorConfig validator = ProtobufValidatorConfig.builder()
+ .subject("test-value")
+ .catalog()
+ .name("test0")
+ .schema()
+ .subject("subject1")
+ .version("latest")
+ .build()
+ .build()
+ .build();
+ LongFunction supplyCatalog = i -> new TestCatalogHandler(
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema("schema0")
+ .build());
+ ProtobufValidatorFactory factory = new ProtobufValidatorFactory();
+
+ // WHEN
+ ValueValidator reader = factory.createValueReader(validator, supplyCatalog);
+
+ // THEN
+ assertThat(reader, instanceOf(ProtobufReadValidator.class));
+ }
+
+ @Test
+ public void shouldCreateWriteValidator()
+ {
+ // GIVEN
+ ValidatorConfig validator = ProtobufValidatorConfig.builder()
+ .subject("test-value")
+ .catalog()
+ .name("test0")
+ .schema()
+ .subject("subject1")
+ .version("latest")
+ .build()
+ .build()
+ .build();
+ LongFunction supplyCatalog = i -> new TestCatalogHandler(
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema("schema0")
+ .build());
+ ProtobufValidatorFactory factory = new ProtobufValidatorFactory();
+
+ // WHEN
+ ValueValidator writer = factory.createValueWriter(validator, supplyCatalog);
+
+ // THEN
+ assertThat(writer, instanceOf(ProtobufWriteValidator.class));
+ }
+}
diff --git a/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorTest.java b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorTest.java
new file mode 100644
index 0000000000..d464df6583
--- /dev/null
+++ b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/ProtobufValidatorTest.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf;
+
+import static io.aklivity.zilla.runtime.engine.EngineConfiguration.ENGINE_DIRECTORY;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.function.LongFunction;
+
+import org.agrona.DirectBuffer;
+import org.agrona.concurrent.UnsafeBuffer;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.aklivity.zilla.runtime.engine.Configuration;
+import io.aklivity.zilla.runtime.engine.EngineContext;
+import io.aklivity.zilla.runtime.engine.catalog.Catalog;
+import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;
+import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
+import io.aklivity.zilla.runtime.engine.config.CatalogConfig;
+import io.aklivity.zilla.runtime.engine.test.internal.catalog.TestCatalog;
+import io.aklivity.zilla.runtime.engine.test.internal.catalog.config.TestCatalogOptionsConfig;
+import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
+import io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfig;
+
+public class ProtobufValidatorTest
+{
+ private static final String SCHEMA = "syntax = \"proto3\";" +
+ "package io.aklivity.examples.clients.proto;" +
+ "message SimpleMessage " +
+ "{ " +
+ "string content = 1;" +
+ "optional string date_time = 2;" +
+ "message DeviceMessage2 " +
+ "{ " +
+ "int32 id = 1;" +
+ "message DeviceMessage6 " +
+ "{ " +
+ "int32 id = 1;" +
+ "}" +
+ "}" +
+ "DeviceMessage2 device = 3;" +
+ "}" +
+ "message DemoMessage " +
+ "{ " +
+ "string status = 1;" +
+ "message DeviceMessage " +
+ "{ " +
+ "int32 id = 1;" +
+ "}" +
+ "message DeviceMessage1 " +
+ "{ " +
+ "int32 id = 1;" +
+ "}" +
+ "optional string date_time = 2;" +
+ "message SimpleMessage " +
+ "{ " +
+ "string content = 1;" +
+ "optional string date_time = 2;" +
+ "}" +
+ "}";
+ private CatalogContext context;
+
+ @Before
+ public void init()
+ {
+ Properties properties = new Properties();
+ properties.setProperty(ENGINE_DIRECTORY.name(), "target/zilla-itests");
+ Configuration config = new Configuration(properties);
+ Catalog catalog = new TestCatalog(config);
+ context = catalog.supply(mock(EngineContext.class));
+ }
+
+ @Test
+ public void shouldWriteValidProtobufEvent()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .record("SimpleMessage")
+ .build()
+ .build()
+ .build();
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufWriteValidator validator = new ProtobufWriteValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(data.capacity() + 1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+
+ assertEquals(data.capacity() + 1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldWriteValidProtobufEventNestedMessage()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .record("DemoMessage.SimpleMessage")
+ .build()
+ .build()
+ .build();
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufWriteValidator validator = new ProtobufWriteValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(data.capacity() + 3, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldWriteValidProtobufEventIncorrectRecordName()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .record("DemoMessage.IncorrectRecord")
+ .build()
+ .build()
+ .build();
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufWriteValidator validator = new ProtobufWriteValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(-1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldReadValidProtobufEvent()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufReadValidator validator = new ProtobufReadValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x00, 0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(data.capacity() - 1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+
+ assertEquals(data.capacity() - 1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldReadValidProtobufEventNestedMessage()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufReadValidator validator = new ProtobufReadValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x04, 0x02, 0x04, 0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(data.capacity() - 3, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldReadValidProtobufEventFormatJson()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .format("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufReadValidator validator = new ProtobufReadValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ byte[] bytes = {0x00, 0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ data.wrap(bytes, 0, bytes.length);
+
+ String json =
+ "{" +
+ "\"content\":\"OK\"," +
+ "\"date_time\":\"01012024\"" +
+ "}";
+
+ final ValueConsumer consumer = (buffer, index, length) ->
+ {
+ byte[] jsonBytes = new byte[length];
+ buffer.getBytes(index, jsonBytes);
+ assertEquals(json, new String(jsonBytes, StandardCharsets.UTF_8));
+ };
+ validator.validate(data, 0, data.capacity(), consumer);
+
+ validator.validate(data, 0, data.capacity(), consumer);
+ }
+
+ @Test
+ public void shouldWriteValidProtobufEventFormatJson()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(1)
+ .schema(SCHEMA)
+ .build());
+
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .format("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .record("SimpleMessage")
+ .build()
+ .build()
+ .build();
+
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufWriteValidator validator = new ProtobufWriteValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ String json =
+ "{" +
+ "\"content\":\"OK\"," +
+ "\"date_time\":\"01012024\"" +
+ "}";
+ data.wrap(json.getBytes(), 0, json.getBytes().length);
+
+ byte[] expectedBytes = {0x00, 0x0a, 0x02, 0x4f, 0x4b, 0x12, 0x08, 0x30, 0x31, 0x30, 0x31, 0x32, 0x30, 0x32, 0x34};
+ DirectBuffer expected = new UnsafeBuffer();
+ expected.wrap(expectedBytes, 0, expectedBytes.length);
+
+ assertEquals(expected.capacity(), validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+
+ assertEquals(expected.capacity(), validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ @Test
+ public void shouldVerifyJsonFormatPaddingLength()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(9)
+ .schema(SCHEMA)
+ .build());
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .format("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ ProtobufReadValidator validator = new ProtobufReadValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ assertEquals(71, validator.padding(data, 0, data.capacity()));
+ }
+
+ @Test
+ public void shouldVerifyIndexPaddingLength()
+ {
+ CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
+ TestCatalogOptionsConfig.builder()
+ .id(9)
+ .schema(SCHEMA)
+ .build());
+ LongFunction handler = value -> context.attach(catalogConfig);
+ ProtobufValidatorConfig config = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .record("DemoMessage.SimpleMessage")
+ .build()
+ .build()
+ .build();
+ ProtobufWriteValidator validator = new ProtobufWriteValidator(config, handler);
+
+ DirectBuffer data = new UnsafeBuffer();
+
+ assertEquals(3, validator.padding(data, 0, data.capacity()));
+
+ }
+}
diff --git a/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapterTest.java b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapterTest.java
new file mode 100644
index 0000000000..e9c8ce0de3
--- /dev/null
+++ b/incubator/validator-protobuf/src/test/java/io/aklivity/zilla/runtime/validator/protobuf/config/ProtobufValidatorConfigAdapterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2021-2023 Aklivity Inc
+ *
+ * Licensed under the Aklivity Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * https://www.aklivity.io/aklivity-community-license/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.aklivity.zilla.runtime.validator.protobuf.config;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+
+import jakarta.json.bind.Jsonb;
+import jakarta.json.bind.JsonbBuilder;
+import jakarta.json.bind.JsonbConfig;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ProtobufValidatorConfigAdapterTest
+{
+ private Jsonb jsonb;
+
+ @Before
+ public void initJson()
+ {
+ JsonbConfig config = new JsonbConfig()
+ .withAdapters(new ProtobufValidatorConfigAdapter());
+ jsonb = JsonbBuilder.create(config);
+ }
+
+ @Test
+ public void shouldReadAvroValidator()
+ {
+ // GIVEN
+ String json =
+ "{" +
+ "\"type\": \"protobuf\"," +
+ "\"catalog\":" +
+ "{" +
+ "\"test0\":" +
+ "[" +
+ "{" +
+ "\"strategy\": \"topic\"," +
+ "\"version\": \"latest\"" +
+ "}," +
+ "{" +
+ "\"subject\": \"cat\"," +
+ "\"version\": \"latest\"" +
+ "}," +
+ "{" +
+ "\"id\": 42" +
+ "}" +
+ "]" +
+ "}" +
+ "}";
+
+ // WHEN
+ ProtobufValidatorConfig validator = jsonb.fromJson(json, ProtobufValidatorConfig.class);
+
+ // THEN
+ assertThat(validator, not(nullValue()));
+ assertThat(validator.type, equalTo("protobuf"));
+ assertThat(validator.cataloged.size(), equalTo(1));
+ assertThat(validator.cataloged.get(0).name, equalTo("test0"));
+ assertThat(validator.cataloged.get(0).schemas.get(0).strategy, equalTo("topic"));
+ assertThat(validator.cataloged.get(0).schemas.get(0).version, equalTo("latest"));
+ assertThat(validator.cataloged.get(0).schemas.get(0).id, equalTo(0));
+ assertThat(validator.cataloged.get(0).schemas.get(1).subject, equalTo("cat"));
+ assertThat(validator.cataloged.get(0).schemas.get(1).strategy, nullValue());
+ assertThat(validator.cataloged.get(0).schemas.get(1).version, equalTo("latest"));
+ assertThat(validator.cataloged.get(0).schemas.get(1).id, equalTo(0));
+ assertThat(validator.cataloged.get(0).schemas.get(2).strategy, nullValue());
+ assertThat(validator.cataloged.get(0).schemas.get(2).version, nullValue());
+ assertThat(validator.cataloged.get(0).schemas.get(2).id, equalTo(42));
+ }
+
+ @Test
+ public void shouldWriteAvroValidator()
+ {
+ // GIVEN
+ String expectedJson =
+ "{" +
+ "\"type\":\"protobuf\"," +
+ "\"catalog\":" +
+ "{" +
+ "\"test0\":" +
+ "[" +
+ "{" +
+ "\"strategy\":\"topic\"," +
+ "\"version\":\"latest\"" +
+ "}," +
+ "{" +
+ "\"subject\":\"cat\"," +
+ "\"version\":\"latest\"" +
+ "}," +
+ "{" +
+ "\"id\":42" +
+ "}" +
+ "]" +
+ "}" +
+ "}";
+ ProtobufValidatorConfig validator = ProtobufValidatorConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .build()
+ .schema()
+ .subject("cat")
+ .version("latest")
+ .build()
+ .schema()
+ .id(42)
+ .build()
+ .build()
+ .build();
+
+ // WHEN
+ String json = jsonb.toJson(validator);
+
+ // THEN
+ assertThat(json, not(nullValue()));
+ assertThat(json, equalTo(expectedJson));
+ }
+}
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfig.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfig.java
index 620e43b35c..d9c85a8bf8 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfig.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfig.java
@@ -23,17 +23,20 @@ public class SchemaConfig
public final String version;
public final String subject;
public final int id;
+ public final String record;
SchemaConfig(
String strategy,
String subject,
String version,
- int id)
+ int id,
+ String record)
{
this.strategy = strategy;
this.version = version;
this.subject = subject;
this.id = id;
+ this.record = record;
}
public static SchemaConfigBuilder builder(
diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfigAdapter.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfigAdapter.java
index 1a16a922b3..7ff7c71efe 100644
--- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfigAdapter.java
+++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/config/SchemaConfigAdapter.java
@@ -27,6 +27,7 @@ public class SchemaConfigAdapter implements JsonbAdapter extends ConfigBuilder mapper)
@@ -67,9 +68,16 @@ public SchemaConfigBuilder id(
return this;
}
+ public SchemaConfigBuilder record(
+ String record)
+ {
+ this.record = record;
+ return this;
+ }
+
@Override
public T build()
{
- return mapper.apply(new SchemaConfig(strategy, subject, version, id));
+ return mapper.apply(new SchemaConfig(strategy, subject, version, id, record));
}
}