Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jaegertracing/jaeger-idl v0.6.0 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw=
github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
Expand Down
64 changes: 64 additions & 0 deletions retriever/postgresqlretriever/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package postgresqlretriever

import (
"context"
"sync"

"github.com/jackc/pgx/v5/pgxpool"
)

type poolEntry struct {
pool *pgxpool.Pool
refCount int
}

var (
mu sync.Mutex
poolMap = make(map[string]*poolEntry)
Comment on lines +15 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The use of global variables (mu and poolMap) to implement the singleton pattern for the connection pool is a common approach for shared resources. However, global state can sometimes make testing more complex and introduce implicit dependencies. While acceptable for this specific use case, it's a design choice that should be considered carefully in larger architectures.

)

// GetPool returns a pool for a given URI, creating it if needed.
func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) {
mu.Lock()
defer mu.Unlock()

// If already exists, bump refCount
if entry, ok := poolMap[uri]; ok {
entry.refCount++
return entry.pool, nil
}

// Create a new pool
pool, err := pgxpool.New(ctx, uri)
if err != nil {
return nil, err
}
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, err
}

poolMap[uri] = &poolEntry{
pool: pool,
refCount: 1,
}

return pool, nil
}

// ReleasePool decreases refCount and closes/removes when it hits zero.
func ReleasePool(ctx context.Context, uri string) {

Check warning on line 50 in retriever/postgresqlretriever/postgres.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Unused parameter 'ctx' should be removed.

See more on https://sonarcloud.io/project/issues?id=thomaspoignant_go-feature-flag&issues=AZr-Pb2bvZq0Wtfo_duT&open=AZr-Pb2bvZq0Wtfo_duT&pullRequest=4393
mu.Lock()
defer mu.Unlock()

entry, ok := poolMap[uri]
if !ok {
return // nothing to do
}

entry.refCount--
if entry.refCount <= 0 {
entry.pool.Close()
delete(poolMap, uri)
}
}
85 changes: 85 additions & 0 deletions retriever/postgresqlretriever/postgres_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package postgresqlretriever_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
rr "github.com/thomaspoignant/go-feature-flag/retriever/postgresqlretriever"
)

func TestGetPool_MultipleURIsAndReuse(t *testing.T) {
ctx := context.Background()

// Setup Container
req := testcontainers.ContainerRequest{
Image: "postgres:15-alpine",
ExposedPorts: []string{"5432/tcp"},
Env: map[string]string{"POSTGRES_PASSWORD": "password"},
// This waits until the log says the system is ready, preventing connection errors
WaitingFor: wait.ForAll(
wait.ForLog("database system is ready to accept connections").WithStartupTimeout(10*time.Second),
wait.ForListeningPort("5432/tcp").WithStartupTimeout(10*time.Second),
),
}

pg, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
assert.NoError(t, err)
defer pg.Terminate(ctx)

endpoint, err := pg.Endpoint(ctx, "")
assert.NoError(t, err)
baseURI := "postgres://postgres:password@" + endpoint + "/postgres?sslmode=disable"

// Different URIs
uri1 := baseURI + "&application_name=A"
uri2 := baseURI + "&application_name=B"

// First Calls (RefCount = 1)
pool1a, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.NotNil(t, pool1a)

pool2a, err := rr.GetPool(ctx, uri2)
assert.NoError(t, err)
assert.NotNil(t, pool2a)

// Verify distinct pools
assert.NotEqual(t, pool1a, pool2a)

// Reuse Logic (RefCount = 2 for URI1)
pool1b, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.Equal(t, pool1a, pool1b, "Should return exact same pool instance")

// Release Logic
// URI1 RefCount: 2 -> 1
rr.ReleasePool(ctx, uri1)

// URI1 RefCount: 1 -> 0 (Closed & Removed)
rr.ReleasePool(ctx, uri1)

// Recreation Logic
// URI1 should now create a NEW pool
pool1c, err := rr.GetPool(ctx, uri1)
assert.NoError(t, err)
assert.NotEqual(t, pool1a, pool1c, "Should be a new pool instance after full release")

// Cleanup new pool
rr.ReleasePool(ctx, uri1)

// URI2 Cleanup verification
rr.ReleasePool(ctx, uri2) // RefCount -> 0

pool2b, err := rr.GetPool(ctx, uri2)
assert.NoError(t, err)
assert.NotEqual(t, pool2a, pool2b, "URI2 should be recreated")

rr.ReleasePool(ctx, uri2)
}
36 changes: 18 additions & 18 deletions retriever/postgresqlretriever/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/thomaspoignant/go-feature-flag/retriever"
"github.com/thomaspoignant/go-feature-flag/utils"
"github.com/thomaspoignant/go-feature-flag/utils/fflog"
Expand All @@ -26,7 +27,7 @@ type Retriever struct {
logger *fflog.FFLogger
status retriever.Status
columns map[string]string
conn *pgx.Conn
pool *pgxpool.Pool
flagset *string
}

Expand All @@ -40,23 +41,19 @@ func (r *Retriever) Init(ctx context.Context, logger *fflog.FFLogger, flagset *s
r.columns = r.getColumnNames()
r.flagset = flagset

if r.conn == nil {
if r.pool == nil {
r.logger.Info("Initializing PostgreSQL retriever")
r.logger.Debug("Using columns", "columns", r.columns)

conn, err := pgx.Connect(ctx, r.URI)
pool, err := GetPool(ctx, r.URI)
if err != nil {
r.status = retriever.RetrieverError
return err
}

if err := conn.Ping(ctx); err != nil {
r.status = retriever.RetrieverError
return err
}

r.conn = conn
r.pool = pool
}

r.status = retriever.RetrieverReady
return nil
}
Expand All @@ -71,37 +68,39 @@ func (r *Retriever) Status() retriever.Status {

// Shutdown closes the database connection.
func (r *Retriever) Shutdown(ctx context.Context) error {
if r.conn == nil {
return nil
if r.pool != nil {
ReleasePool(ctx, r.URI)
r.pool = nil
}
return r.conn.Close(ctx)
return nil
}

// Retrieve fetches flag configuration from PostgreSQL.
func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
if r.conn == nil {
return nil, fmt.Errorf("database connection is not initialized")
if r.pool == nil {
return nil, fmt.Errorf("database connection pool is not initialized")
}

// Build the query using the configured table and column names
query := r.buildQuery()

// Build the arguments for the query
args := []interface{}{}
args := []any{}
if r.getFlagset() != "" {
args = []interface{}{r.getFlagset()}
// If a flagset is defined, it becomes the first ($1) argument.
args = []any{r.getFlagset()}
}

r.logger.Debug("Executing PostgreSQL query", slog.String("query", query), slog.Any("args", args))

rows, err := r.conn.Query(ctx, query, args...)
rows, err := r.pool.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to execute query: %w", err)
}
defer rows.Close()

// Map to store flag configurations with flag_name as key
flagConfigs := make(map[string]interface{})
flagConfigs := make(map[string]any)

for rows.Next() {
var flagName string
Expand All @@ -121,6 +120,7 @@ func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) {
flagConfigs[flagName] = config
}

// Check for any errors that occurred during row iteration
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration error: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion retriever/postgresqlretriever/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestRetrieverDataHandlingErrors(t *testing.T) {
assert.Contains(t, err.Error(), "failed to execute query")
}
})
}
}

// TestRetrieverEdgeCases tests additional edge cases and boundary conditions
func TestRetrieverEdgeCases(t *testing.T) {
Expand Down
Loading