Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: handle tlsconfig properly for decoding metadata
Signed-off-by: Samantha Coyle <[email protected]>
  • Loading branch information
sicoyle committed Aug 11, 2025
commit 2ce4cd5e1590e62a7e65d1d4bef510408ac0e827
73 changes: 72 additions & 1 deletion bindings/rethinkdb/statechange/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ authenticationProfiles:
required: false
description: The password for authentication. This is only used for v1 handshake protocol.
example: "password"
- title: "TLS Authentication"
description: "Authenticate using client certificate and key."
metadata:
- name: enableTLS
type: bool
required: false
description: Whether to enable TLS encryption.
example: false
default: false
- name: clientCert
type: string
required: true
description: The client certificate for TLS authentication.
example: "-----BEGIN CERTIFICATE-----\nXXX..."
- name: clientKey
type: string
required: true
description: The client key for TLS authentication.
example: "-----BEGIN PRIVATE KEY-----\nXXX..."
sensitive: true
metadata:
- name: table
type: string
Expand All @@ -69,7 +89,7 @@ metadata:
example: false
default: false
- name: numRetries
type: int
type: number
required: false
description: Number of times to retry queries on connection errors.
example: 3
Expand All @@ -85,3 +105,54 @@ metadata:
description: Whether to enable opentracing for queries.
example: false
default: false
- name: writeTimeout
type: string
required: false
description: Write timeout duration."
example: "10s"
- name: readTimeout
type: string
required: false
description: Read timeout duration."
example: "10s"
- name: handshakeVersion
type: number
required: false
description: Handshake version for RethinkDB."
example: 1
- name: keepAlivePeriod
type: string
required: false
description: Keep alive period duration."
example: "30s"
- name: maxIdle
type: number
required: false
description: Maximum number of idle connections."
example: 5
- name: authKey
type: string
required: false
description: The authentication key for RethinkDB. This field is now deprecated."
example: "auth-key"
sensitive: true
- name: initialCap
type: number
required: false
description: Initial connection pool capacity."
example: 5
- name: maxOpen
type: number
required: false
description: Maximum number of open connections."
example: 10
- name: discoverHosts
type: bool
required: false
description: Whether to discover hosts."
example: false
- name: nodeRefreshInterval
type: string
required: false
description: Node refresh interval duration."
example: "5m"
91 changes: 86 additions & 5 deletions bindings/rethinkdb/statechange/statechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package statechange

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"time"

r "github.com/dancannon/gorethink"

Expand All @@ -44,8 +46,37 @@ type Binding struct {

// StateConfig is the binding config.
type StateConfig struct {
r.ConnectOpts `mapstructure:",squash"`
Table string `mapstructure:"table"`
ConnectOptsWrapper `mapstructure:",squash"`
Table string `mapstructure:"table"`
}

// ConnectOptsWrapper wraps r.ConnectOpts but excludes TLSConfig
// This is needed because the metadata decoder does not support nested structs with tags as inputs in the metadata.yaml file
type ConnectOptsWrapper struct {
Address string `gorethink:"address,omitempty"`
Addresses []string `gorethink:"addresses,omitempty"`
Database string `gorethink:"database,omitempty"`
Username string `gorethink:"username,omitempty"`
Password string `gorethink:"password,omitempty"`
AuthKey string `gorethink:"authkey,omitempty"`
Timeout time.Duration `gorethink:"timeout,omitempty"`
WriteTimeout time.Duration `gorethink:"write_timeout,omitempty"`
ReadTimeout time.Duration `gorethink:"read_timeout,omitempty"`
KeepAlivePeriod time.Duration `gorethink:"keep_alive_timeout,omitempty"`
HandshakeVersion int `gorethink:"handshake_version,omitempty"`
MaxIdle int `gorethink:"max_idle,omitempty"`
InitialCap int `gorethink:"initial_cap,omitempty"`
MaxOpen int `gorethink:"max_open,omitempty"`
DiscoverHosts bool `gorethink:"discover_hosts,omitempty"`
NodeRefreshInterval time.Duration `gorethink:"node_refresh_interval,omitempty"`
UseJSONNumber bool `gorethink:"use_json_number,omitempty"`
NumRetries int `gorethink:"num_retries,omitempty"`
HostDecayDuration time.Duration `gorethink:"host_decay_duration,omitempty"`
UseOpentracing bool `gorethink:"use_opentracing,omitempty"`
// TLS fields must be brought in as separate fields as they will not be processed by the metadata decoder properly without this
EnableTLS bool `gorethink:"enable_tls,omitempty"`
ClientCert string `gorethink:"client_cert,omitempty"`
ClientKey string `gorethink:"client_key,omitempty"`
}

// NewRethinkDBStateChangeBinding returns a new RethinkDB actor event input binding.
Expand All @@ -64,7 +95,40 @@ func (b *Binding) Init(ctx context.Context, metadata bindings.Metadata) error {
}
b.config = cfg

ses, err := r.Connect(b.config.ConnectOpts)
// Convert wrapper to r.ConnectOpts
connectOpts := r.ConnectOpts{
Address: cfg.Address,
Addresses: cfg.Addresses,
Database: cfg.Database,
Username: cfg.Username,
Password: cfg.Password,
AuthKey: cfg.AuthKey,
Timeout: cfg.Timeout,
WriteTimeout: cfg.WriteTimeout,
ReadTimeout: cfg.ReadTimeout,
KeepAlivePeriod: cfg.KeepAlivePeriod,
HandshakeVersion: r.HandshakeVersion(cfg.HandshakeVersion),
MaxIdle: cfg.MaxIdle,
InitialCap: cfg.InitialCap,
MaxOpen: cfg.MaxOpen,
DiscoverHosts: cfg.DiscoverHosts,
NodeRefreshInterval: cfg.NodeRefreshInterval,
UseJSONNumber: cfg.UseJSONNumber,
NumRetries: cfg.NumRetries,
HostDecayDuration: cfg.HostDecayDuration,
UseOpentracing: cfg.UseOpentracing,
}

// Configure TLS if enabled
if cfg.EnableTLS {
tlsConfig, tlsErr := createTLSConfig(cfg.ClientCert, cfg.ClientKey)
if tlsErr != nil {
return fmt.Errorf("error creating TLS config: %w", tlsErr)
}
connectOpts.TLSConfig = tlsConfig
}

ses, err := r.Connect(connectOpts)
if err != nil {
return fmt.Errorf("error connecting to the database: %w", err)
}
Expand All @@ -73,6 +137,23 @@ func (b *Binding) Init(ctx context.Context, metadata bindings.Metadata) error {
return nil
}

// createTLSConfig creates a tls.Config from client certificate and key
func createTLSConfig(clientCert, clientKey string) (*tls.Config, error) {
if clientCert == "" || clientKey == "" {
return nil, errors.New("both client certificate and key are required for TLS")
}

cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
return nil, fmt.Errorf("error parsing client certificate and key: %w", err)
}

return &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
}, nil
}

// Read triggers the RethinkDB scheduler.
func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
if b.closed.Load() {
Expand Down Expand Up @@ -107,7 +188,7 @@ func (b *Binding) Read(ctx context.Context, handler bindings.Handler) error {
go func() {
defer b.wg.Done()
for readCtx.Err() == nil {
var change interface{}
var change any
ok := cursor.Next(&change)
if !ok {
b.logger.Errorf("error detecting change: %v", cursor.Err())
Expand Down Expand Up @@ -149,7 +230,7 @@ func (b *Binding) Close() error {
return b.session.Close()
}

func metadataToConfig(cfg map[string]string, logger logger.Logger) (StateConfig, error) {
func metadataToConfig(cfg map[string]string, _ logger.Logger) (StateConfig, error) {
c := StateConfig{}

// prepare metadata keys for decoding
Expand Down