-
-
Notifications
You must be signed in to change notification settings - Fork 185
feat: shared pgx-pool #4393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: shared pgx-pool #4393
Changes from 6 commits
6823c96
7a3eb53
a0082b1
e9ff21c
117ae1a
4617a05
5d97bb0
cde1664
2852577
fa090e2
228208a
de4ed7f
e3f6a11
a0bd5ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package postgresqlretriever | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
|
|
||
| "github.com/jackc/pgx/v5/pgxpool" | ||
| ) | ||
|
|
||
| var ( | ||
| pool *pgxpool.Pool | ||
| mu sync.Mutex | ||
| refCount int | ||
| ) | ||
|
|
||
| func GetPool(ctx context.Context, uri string) (*pgxpool.Pool, error) { | ||
|
Check warning on line 16 in retriever/postgresqlretriever/database.go
|
||
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| if pool == nil { | ||
| p, err := pgxpool.New(ctx, uri) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| if err := p.Ping(ctx); err != nil { | ||
| p.Close() | ||
| return nil, err | ||
| } | ||
|
|
||
| pool = p | ||
| } | ||
|
|
||
| refCount++ | ||
| return pool, nil | ||
| } | ||
|
|
||
| func ReleasePool() { | ||
| mu.Lock() | ||
| defer mu.Unlock() | ||
|
|
||
| refCount-- | ||
| if refCount <= 0 { | ||
| if pool != nil { | ||
| pool.Close() | ||
| pool = nil | ||
| } | ||
| refCount = 0 | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |||||||||||||||||||||||||||||
| "log/slog" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| "github.com/jackc/pgx/v5" | ||||||||||||||||||||||||||||||
| "github.com/jackc/pgx/v5/pgxpool" | ||||||||||||||||||||||||||||||
| "github.com/thomaspoignant/go-feature-flag/retriever" | ||||||||||||||||||||||||||||||
| "github.com/thomaspoignant/go-feature-flag/utils" | ||||||||||||||||||||||||||||||
| "github.com/thomaspoignant/go-feature-flag/utils/fflog" | ||||||||||||||||||||||||||||||
|
|
@@ -26,7 +27,7 @@ | |||||||||||||||||||||||||||||
| logger *fflog.FFLogger | ||||||||||||||||||||||||||||||
| status retriever.Status | ||||||||||||||||||||||||||||||
| columns map[string]string | ||||||||||||||||||||||||||||||
| conn *pgx.Conn | ||||||||||||||||||||||||||||||
| pool *pgxpool.Pool | ||||||||||||||||||||||||||||||
| flagset *string | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
@@ -40,23 +41,19 @@ | |||||||||||||||||||||||||||||
| r.columns = r.getColumnNames() | ||||||||||||||||||||||||||||||
| r.flagset = flagset | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if r.conn == nil { | ||||||||||||||||||||||||||||||
| if r.pool == nil { | ||||||||||||||||||||||||||||||
| r.logger.Info("Initializing PostgreSQL retriever") | ||||||||||||||||||||||||||||||
| r.logger.Debug("Using columns", "columns", r.columns) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| conn, err := pgx.Connect(ctx, r.URI) | ||||||||||||||||||||||||||||||
| pool, err := GetPool(ctx, r.URI) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| r.status = retriever.RetrieverError | ||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if err := conn.Ping(ctx); err != nil { | ||||||||||||||||||||||||||||||
| r.status = retriever.RetrieverError | ||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| r.conn = conn | ||||||||||||||||||||||||||||||
| r.pool = pool | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| r.status = retriever.RetrieverReady | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
@@ -70,38 +67,37 @@ | |||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Shutdown closes the database connection. | ||||||||||||||||||||||||||||||
| func (r *Retriever) Shutdown(ctx context.Context) error { | ||||||||||||||||||||||||||||||
|
Check warning on line 70 in retriever/postgresqlretriever/retriever.go
|
||||||||||||||||||||||||||||||
| if r.conn == nil { | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return r.conn.Close(ctx) | ||||||||||||||||||||||||||||||
| ReleasePool() | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
70
to
77
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To support the URI-aware pool manager, the I've also added a
Suggested change
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Retrieve fetches flag configuration from PostgreSQL. | ||||||||||||||||||||||||||||||
| func (r *Retriever) Retrieve(ctx context.Context) ([]byte, error) { | ||||||||||||||||||||||||||||||
| if r.conn == nil { | ||||||||||||||||||||||||||||||
| return nil, fmt.Errorf("database connection is not initialized") | ||||||||||||||||||||||||||||||
| if r.pool == nil { | ||||||||||||||||||||||||||||||
| return nil, fmt.Errorf("database connection pool is not initialized") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Build the query using the configured table and column names | ||||||||||||||||||||||||||||||
| query := r.buildQuery() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Build the arguments for the query | ||||||||||||||||||||||||||||||
| args := []interface{}{} | ||||||||||||||||||||||||||||||
| args := []any{} | ||||||||||||||||||||||||||||||
gogbog marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||
| if r.getFlagset() != "" { | ||||||||||||||||||||||||||||||
| args = []interface{}{r.getFlagset()} | ||||||||||||||||||||||||||||||
| // If a flagset is defined, it becomes the first ($1) argument. | ||||||||||||||||||||||||||||||
| args = []any{r.getFlagset()} | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| r.logger.Debug("Executing PostgreSQL query", slog.String("query", query), slog.Any("args", args)) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| rows, err := r.conn.Query(ctx, query, args...) | ||||||||||||||||||||||||||||||
| rows, err := r.pool.Query(ctx, query, args...) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return nil, fmt.Errorf("failed to execute query: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| defer rows.Close() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Map to store flag configurations with flag_name as key | ||||||||||||||||||||||||||||||
| flagConfigs := make(map[string]interface{}) | ||||||||||||||||||||||||||||||
| flagConfigs := make(map[string]any) | ||||||||||||||||||||||||||||||
gogbog marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for rows.Next() { | ||||||||||||||||||||||||||||||
| var flagName string | ||||||||||||||||||||||||||||||
|
|
@@ -121,6 +117,7 @@ | |||||||||||||||||||||||||||||
| flagConfigs[flagName] = config | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Check for any errors that occurred during row iteration | ||||||||||||||||||||||||||||||
| if err := rows.Err(); err != nil { | ||||||||||||||||||||||||||||||
| return nil, fmt.Errorf("rows iteration error: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current singleton implementation for the connection pool doesn't support multiple database URIs. The global
poolvariable will be initialized once with the URI from the first retriever, and all subsequent retrievers, regardless of their configuredURI, will receive this same pool. This can lead to retrievers connecting to the wrong database.To fix this, the pool management should be URI-aware. I suggest using maps to store a connection pool and a reference count for each unique URI. This ensures that each database has its own dedicated pool, which is shared only by retrievers connecting to that same database.