Skip to content

Conversation

@artemklevtsov
Copy link
Contributor

@artemklevtsov artemklevtsov commented Oct 5, 2025

Hi,

This PR adds a new template processor to Benthos that allows users to apply Go text/template templates to the structured content of messages. The processor supports optional Bloblang mapping for transforming data before templating, and can load templates from files using glob patterns.

This addresses the need for dynamic content generation using templating in Benthos pipelines, enabling use cases like generating dynamic JSON, XML, or text outputs based on message data.

Examples:

# example_1.yaml
input:
  generate:
    count: 1
    mapping: root.foo = "bar"

pipeline:
  processors:
    - template:
        code: "{{.foo}}"

output:
  stdout: {}
# example_2.yaml
input:
  generate:
    count: 1
    mapping: root.foo = "bar"

pipeline:
  processors:
    - template:
        code: "{{.value}}"
        mapping: "root.value = this.foo"

output:
  stdout: {}
# example_3.yaml
input:
  generate:
    count: 1
    mapping: root.foo = "bar"
  processors:
    - template:
        code: |
          {{ template "greeting" . }}
        files: ["./*.tmpl"]
output:
  stdout: {}

greeteing.tmpl:

{{- define "greeting" -}}
Hello, {{ .foo }}!
{{- end -}}

Output:

./target/benthos run example_3.yaml
INFO[2025-10-05T12:32:21+07:00] Running main config from specified file       @service=benthos benthos_version=ffd62dce19ed4f4211fa9e66c0600acbea59d569 path=tmp/glob.yaml
INFO[2025-10-05T12:32:21+07:00] Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO[2025-10-05T12:32:21+07:00] Input type generate is now active             @service=benthos label="" path=root.input
INFO[2025-10-05T12:32:21+07:00] Launching a Benthos instance, use CTRL+C to close  @service=benthos
INFO[2025-10-05T12:32:21+07:00] Output type stdout is now active              @service=benthos label="" path=root.output
Hello, bar!
INFO[2025-10-05T12:32:21+07:00] Pipeline has terminated. Shutting down the service  @service=benthos

Regards

@artemklevtsov
Copy link
Contributor Author

@Jeffail, @mihaitodor could you give your feedback on the PR?

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new template processor for Benthos that enables users to apply Go text/template templates to structured message content. The processor supports optional Bloblang mapping for pre-templating data transformation and can load template definitions from files using glob patterns.

Key Changes

  • New template processor with support for inline code and file-based template loading
  • Integration with Bloblang for optional pre-processing of message data
  • Configuration examples demonstrating basic templating, mapping integration, and file-based templates

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

artemklevtsov and others added 24 commits December 9, 2025 00:23
* Allow templates to be marked deprecated

* Remove version field from jsonspec
* utils/netutil: ingest netclient

* utils/netutil: refactoring

* Prepare package to host more network tools
* Improve spec descriptions
* Refactor to decorator pattern, and support wrapping existing control functions
* Detect linux based on GOOS vs build tags
* bloblang: add "".repeat(N) method

* update changelog
Make net.Dialer.Timeout configurable.
Without this patch Windows build fail with

    error=
    │ build failed: exit status 1: # github.com/redpanda-data/benthos/v4/public/utils/netutil
    │ ../../../Go/pkg/mod/github.com/redpanda-data/benthos/[email protected]/public/utils/netutil/dial.go:146:39: cannot use int(fd) (value of type int) as syscall.Handle value in argument to syscall.SetsockoptInt
    │ ../../../Go/pkg/mod/github.com/redpanda-data/benthos/[email protected]/public/utils/netutil/listen.go:45:40: cannot use int(fd) (value of type int) as syscall.Handle value in argument to syscall.SetsockoptInt
    │ ../../../Go/pkg/mod/github.com/redpanda-data/benthos/[email protected]/public/utils/netutil/listen.go:54:40: cannot use int(fd) (value of type int) as syscall.Handle value in argument to syscall.SetsockoptInt
    target=windows_amd64_v1
josephwoodward and others added 25 commits December 9, 2025 01:17
Bumps [actions/setup-go](https://github.com/actions/setup-go) from 5 to 6.
- [Release notes](https://github.com/actions/setup-go/releases)
- [Commits](actions/setup-go@v5...v6)

---
updated-dependencies:
- dependency-name: actions/setup-go
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 8 to 9.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](golangci/golangci-lint-action@v8...v9)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-version: '9'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 6.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](actions/checkout@v4...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
…schema

$ ./target/benthos list --format jsonschema bloblang-functions uuid_v4 now | jq
{
  "bloblang-functions": [
    {
      "status": "stable",
      "category": "Environment",
      "name": "now",
      "description": "Returns the current timestamp as a string in RFC 3339 format with the local timezone. Use the method `ts_format` in order to change the format and timezone.",
      "params": {},
      "examples": [
        {
          "mapping": "root.received_at = now()",
          "summary": "",
          "results": [],
          "skip_testing": false
        },
        {
          "mapping": "root.received_at = now().ts_format(\"Mon Jan 2 15:04:05 -0700 MST 2006\", \"UTC\")",
          "summary": "",
          "results": [],
          "skip_testing": false
        }
      ],
      "impure": false
    },
    {
      "status": "stable",
      "category": "General",
      "name": "uuid_v4",
      "description": "Generates a new RFC-4122 UUID each time it is invoked and prints a string representation.",
      "params": {},
      "examples": [
        {
          "mapping": "root.id = uuid_v4()",
          "summary": "",
          "results": [],
          "skip_testing": false
        }
      ],
      "impure": false
    }
  ]
}
$ rpk connect blobl -h
NAME:
   redpanda-connect blobl - Execute a Redpanda Connect mapping on documents consumed via stdin or file

USAGE:
   redpanda-connect blobl [command options]

DESCRIPTION:
   Provides a convenient tool for mapping JSON documents over the command line:

     cat documents.jsonl | redpanda-connect blobl 'foo.bar.map_each(this.uppercase())'

     echo '{"foo":"bar"}' | redpanda-connect blobl -f ./mapping.blobl

     redpanda-connect blobl -i input.jsonl -f ./mapping.blobl

   Find out more about Bloblang at: https://docs.redpanda.com/redpanda-connect/guides/bloblang/about

COMMANDS:
   server   EXPERIMENTAL: Run a web server that hosts a Bloblang app
   help, h  Shows a list of commands or help for one command

OPTIONS:
   --threads value, -t value                                  the number of processing threads to use, when >1 ordering is no longer guaranteed. (default: 1)
   --raw, -r                                                  consume raw strings. (default: false)
   --pretty, -p                                               pretty-print output. (default: false)
   --file value, -f value                                     execute a mapping from a file.
   --input value, -i value                                    read input from a file instead of stdin.
   --output value, -o value                                   write output to a file instead of stdout.
   --max-token-length value                                   Set the buffer size for document lines. (default: 65536)
   --env-file value, -e value [ --env-file value, -e value ]  import environment variables from a dotenv file
   --help, -h
Core Benthos Functions/Methods (94 items)

1. Functions (25): batch_index, batch_size, bytes, content, counter, deleted, env, error, errored, error_source_*, file, file_rel, hostname, json, ksuid, metadata, nanoid, nothing,
  now, pi, random_int, range, throw, timestamp_unix*, tracing_id, tracing_span, uuid_v4, uuid_v7, var
2. String Methods (31): capitalize, contains, escape_html, escape_url_query, filepath_join, filepath_split, format, has_prefix, has_suffix, index_of, join, lowercase, quote,
  unquote, repeat, replace, replace_all, replace_many, replace_all_many, reverse, re_find_all, re_match, re_replace*, split, string, trim, trim_prefix, trim_suffix, unescape_html,
  unescape_url_query, uppercase
3. Structured Methods (28): all, any, append, assign, collapse, enumerated, exists, explode, filter, find, find_all, find_by, find_all_by, flatten, fold, key_values, keys, length,
  map_each, map_each_key, merge, sort, sort_by, sum, unique, values, without
  4. Number Methods (10): ceil, floor, log, log10, max, min, round, bitwise_and, bitwise_or, bitwise_xor

Benthos Implementation Methods (22 items)

1. Encoding Methods (2): compress, decompress
2. Time Methods (15): ts_round, ts_tz, ts_add_iso8601, ts_sub_iso8601, parse_duration, parse_duration_iso8601, ts_parse, ts_strptime, ts_format, ts_strftime, ts_unix, ts_unix_milli, ts_unix_micro, ts_unix_nano, ts_sub
3. General Functions (1): counter
4. I/O Functions (4): hostname, env, file, file_rel
…y with 15 updates

Bumps the production-dependencies group with 10 updates in the / directory:

| Package | From | To |
| --- | --- | --- |
| cuelang.org/go | `0.13.2` | `0.15.1` |
| [github.com/golang-jwt/jwt/v5](https://github.com/golang-jwt/jwt) | `5.2.2` | `5.3.0` |
| [github.com/itchyny/gojq](https://github.com/itchyny/gojq) | `0.12.17` | `0.12.18` |
| [github.com/klauspost/compress](https://github.com/klauspost/compress) | `1.18.0` | `1.18.2` |
| [github.com/linkedin/goavro/v2](https://github.com/linkedin/goavro) | `2.14.0` | `2.14.1` |
| [github.com/rickb777/period](https://github.com/rickb777/period) | `1.0.15` | `1.0.21` |
| [github.com/stretchr/testify](https://github.com/stretchr/testify) | `1.10.0` | `1.11.1` |
| [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go) | `1.37.0` | `1.38.0` |
| [golang.org/x/crypto](https://github.com/golang/crypto) | `0.43.0` | `0.45.0` |
| [github.com/gofrs/uuid/v5](https://github.com/gofrs/uuid) | `5.3.2` | `5.4.0` |



Updates `cuelang.org/go` from 0.13.2 to 0.15.1

Updates `github.com/golang-jwt/jwt/v5` from 5.2.2 to 5.3.0
- [Release notes](https://github.com/golang-jwt/jwt/releases)
- [Commits](golang-jwt/jwt@v5.2.2...v5.3.0)

Updates `github.com/itchyny/gojq` from 0.12.17 to 0.12.18
- [Release notes](https://github.com/itchyny/gojq/releases)
- [Changelog](https://github.com/itchyny/gojq/blob/main/CHANGELOG.md)
- [Commits](itchyny/gojq@v0.12.17...v0.12.18)

Updates `github.com/itchyny/timefmt-go` from 0.1.6 to 0.1.7
- [Release notes](https://github.com/itchyny/timefmt-go/releases)
- [Changelog](https://github.com/itchyny/timefmt-go/blob/main/CHANGELOG.md)
- [Commits](itchyny/timefmt-go@v0.1.6...v0.1.7)

Updates `github.com/klauspost/compress` from 1.18.0 to 1.18.2
- [Release notes](https://github.com/klauspost/compress/releases)
- [Commits](klauspost/compress@v1.18.0...v1.18.2)

Updates `github.com/linkedin/goavro/v2` from 2.14.0 to 2.14.1
- [Release notes](https://github.com/linkedin/goavro/releases)
- [Commits](linkedin/goavro@v2.14.0...v2.14.1)

Updates `github.com/rickb777/period` from 1.0.15 to 1.0.21
- [Release notes](https://github.com/rickb777/period/releases)
- [Commits](rickb777/period@v1.0.15...v1.0.21)

Updates `github.com/stretchr/testify` from 1.10.0 to 1.11.1
- [Release notes](https://github.com/stretchr/testify/releases)
- [Commits](stretchr/testify@v1.10.0...v1.11.1)

Updates `go.opentelemetry.io/otel` from 1.37.0 to 1.38.0
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.37.0...v1.38.0)

Updates `go.opentelemetry.io/otel/trace` from 1.37.0 to 1.38.0
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](open-telemetry/opentelemetry-go@v1.37.0...v1.38.0)

Updates `golang.org/x/crypto` from 0.43.0 to 0.45.0
- [Commits](golang/crypto@v0.43.0...v0.45.0)

Updates `golang.org/x/oauth2` from 0.30.0 to 0.32.0
- [Commits](golang/oauth2@v0.30.0...v0.32.0)

Updates `golang.org/x/sync` from 0.17.0 to 0.18.0
- [Commits](golang/sync@v0.17.0...v0.18.0)

Updates `golang.org/x/text` from 0.30.0 to 0.31.0
- [Release notes](https://github.com/golang/text/releases)
- [Commits](golang/text@v0.30.0...v0.31.0)

Updates `github.com/gofrs/uuid/v5` from 5.3.2 to 5.4.0
- [Release notes](https://github.com/gofrs/uuid/releases)
- [Commits](gofrs/uuid@v5.3.2...v5.4.0)

---
updated-dependencies:
- dependency-name: cuelang.org/go
  dependency-version: 0.15.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/golang-jwt/jwt/v5
  dependency-version: 5.3.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/itchyny/gojq
  dependency-version: 0.12.18
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/itchyny/timefmt-go
  dependency-version: 0.1.7
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/klauspost/compress
  dependency-version: 1.18.2
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/linkedin/goavro/v2
  dependency-version: 2.14.1
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/rickb777/period
  dependency-version: 1.0.21
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: production-dependencies
- dependency-name: github.com/stretchr/testify
  dependency-version: 1.11.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel
  dependency-version: 1.38.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: go.opentelemetry.io/otel/trace
  dependency-version: 1.38.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/crypto
  dependency-version: 0.45.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/oauth2
  dependency-version: 0.32.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/sync
  dependency-version: 0.18.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/text
  dependency-version: 0.31.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: github.com/gofrs/uuid/v5
  dependency-version: 5.4.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
...

Signed-off-by: dependabot[bot] <[email protected]>
The test's Eventually check was passing immediately on array length instead of waiting for the 5ms timeout to fire. This caused the test to call ackFn before the automatic nack, canceling the timeout and setting the ack result to nil instead of the expected error.

Update the check to verify the specific timeout error message, ensuring the automatic nack completes before proceeding.
Bumps the production-dependencies group with 2 updates: [golang.org/x/oauth2](https://github.com/golang/oauth2) and [golang.org/x/sync](https://github.com/golang/sync).


Updates `golang.org/x/oauth2` from 0.32.0 to 0.34.0
- [Commits](golang/oauth2@v0.32.0...v0.34.0)

Updates `golang.org/x/sync` from 0.18.0 to 0.19.0
- [Commits](golang/sync@v0.18.0...v0.19.0)

---
updated-dependencies:
- dependency-name: golang.org/x/oauth2
  dependency-version: 0.34.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
- dependency-name: golang.org/x/sync
  dependency-version: 0.19.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: production-dependencies
...

Signed-off-by: dependabot[bot] <[email protected]>
@artemklevtsov
Copy link
Contributor Author

artemklevtsov commented Dec 9, 2025

Looks like I merged the main upstream branch. My mistake.
I can send a new request if review of the current one is difficult.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants