1414
1515package com .github .pgasync ;
1616
17- import com .github .pgasync .conversion .DataConverter ;
1817import com .pgasync .Connection ;
1918import com .pgasync .Listening ;
2019import com .pgasync .PreparedStatement ;
2120import com .pgasync .Row ;
2221import com .pgasync .SqlException ;
23- import com .pgasync .ConnectionPool ;
24- import com .pgasync .ConnectionPoolBuilder ;
22+ import com .pgasync .ConnectibleBuilder ;
2523import com .pgasync .ResultSet ;
2624import com .pgasync .Transaction ;
2725
2826import javax .annotation .concurrent .GuardedBy ;
29- import java .net .InetSocketAddress ;
30- import java .nio .charset .Charset ;
3127import java .util .ArrayList ;
3228import java .util .Collection ;
29+ import java .util .Iterator ;
3330import java .util .LinkedHashMap ;
3431import java .util .LinkedList ;
3532import java .util .Map ;
4239import java .util .function .Function ;
4340import java .util .logging .Level ;
4441import java .util .logging .Logger ;
45- import java .util .stream .Collectors ;
4642
4743/**
48- * Pool for backend connections.
44+ * Resource pool for backend connections.
4945 *
5046 * @author Antti Laisi
5147 */
52- public abstract class PgConnectionPool implements ConnectionPool {
48+ public abstract class PgConnectionPool extends PgConnectible {
5349
5450 private class PooledPgConnection implements Connection {
5551
@@ -118,15 +114,29 @@ boolean isConnected() {
118114 return delegate .isConnected ();
119115 }
120116
117+ private void closeNextStatement (Iterator <PooledPgPreparedStatement > statementsSource , CompletableFuture <Void > onComplete ) {
118+ if (statementsSource .hasNext ()) {
119+ statementsSource .next ().delegate .close ()
120+ .thenAccept (v -> {
121+ statementsSource .remove ();
122+ closeNextStatement (statementsSource , onComplete );
123+ })
124+ .exceptionally (th -> {
125+ futuresExecutor .execute (() -> onComplete .completeExceptionally (th ));
126+ return null ;
127+ });
128+ } else {
129+ onComplete .completeAsync (() -> null , futuresExecutor );
130+ }
131+ }
132+
121133 CompletableFuture <Void > shutdown () {
122- CompletableFuture <?>[] closeTasks = statements .values ().stream ()
123- .map (stmt -> stmt .delegate .close ())
124- .collect (Collectors .toList ()).toArray (new CompletableFuture <?>[]{});
125- statements .clear ();
126- return CompletableFuture .allOf (closeTasks )
134+ CompletableFuture <Void > onComplete = new CompletableFuture <>();
135+ closeNextStatement (statements .values ().iterator (), onComplete );
136+ return onComplete
127137 .thenApply (v -> {
128138 if (!statements .isEmpty ()) {
129- Logger . getLogger ( PooledPgConnection . class . getName ()). log ( Level . WARNING , "Stale prepared statements detected {0}" , statements .size ());
139+ throw new IllegalStateException ( "Stale prepared statements detected (" + statements .size () + ")" );
130140 }
131141 return delegate .close ();
132142 })
@@ -155,7 +165,20 @@ public CompletableFuture<Void> script(BiConsumer<Map<String, PgColumn>, PgColumn
155165
156166 @ Override
157167 public CompletableFuture <Integer > query (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , String sql , Object ... params ) {
158- return delegate .query (onColumns , onRow , sql , params );
168+ return prepareStatement (sql , dataConverter .assumeTypes (params ))
169+ .thenApply (stmt ->
170+ stmt .fetch (onColumns , onRow , params )
171+ .handle ((affected , th ) ->
172+ stmt .close ()
173+ .thenApply (v -> {
174+ if (th == null ) {
175+ return affected ;
176+ } else {
177+ throw new RuntimeException (th );
178+ }
179+ })
180+ ).thenCompose (Function .identity ())
181+ ).thenCompose (Function .identity ());
159182 }
160183
161184 @ Override
@@ -171,6 +194,7 @@ public CompletableFuture<PreparedStatement> prepareStatement(String sql, Oid...
171194
172195 private class PooledPgPreparedStatement implements PreparedStatement {
173196
197+ private static final String DUPLICATED_PREPARED_STATEMENT_DETECTED = "Duplicated prepared statement detected. Closing extra instance. \n {0}" ;
174198 private final String sql ;
175199 private final PgConnection .PgPreparedStatement delegate ;
176200
@@ -181,15 +205,27 @@ private PooledPgPreparedStatement(String sql, PgConnection.PgPreparedStatement d
181205
182206 @ Override
183207 public CompletableFuture <Void > close () {
184- statements .put (sql , this );
208+ PooledPgPreparedStatement already = statements .put (sql , this );
185209 if (evicted != null ) {
186210 try {
187- return evicted .delegate .close ();
211+ if (already != null && already != evicted ) {
212+ Logger .getLogger (PgConnectionPool .class .getName ()).log (Level .WARNING , DUPLICATED_PREPARED_STATEMENT_DETECTED , already .sql );
213+ return evicted .delegate .close ()
214+ .thenApply (v -> already .delegate .close ())
215+ .thenCompose (Function .identity ());
216+ } else {
217+ return evicted .delegate .close ();
218+ }
188219 } finally {
189220 evicted = null ;
190221 }
191222 } else {
192- return CompletableFuture .completedFuture (null );
223+ if (already != null ) {
224+ Logger .getLogger (PgConnectionPool .class .getName ()).log (Level .WARNING , DUPLICATED_PREPARED_STATEMENT_DETECTED , already .sql );
225+ return already .delegate .close ();
226+ } else {
227+ return CompletableFuture .completedFuture (null );
228+ }
193229 }
194230 }
195231
@@ -207,9 +243,7 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
207243
208244 private final int maxConnections ;
209245 private final int maxStatements ;
210- private final String validationQuery ;
211246 private final ReentrantLock lock = new ReentrantLock ();
212- protected final Charset encoding ;
213247
214248 @ GuardedBy ("lock" )
215249 private int size ;
@@ -220,25 +254,10 @@ public CompletableFuture<Integer> fetch(BiConsumer<Map<String, PgColumn>, PgColu
220254 @ GuardedBy ("lock" )
221255 private final Queue <PooledPgConnection > connections = new LinkedList <>();
222256
223- private final InetSocketAddress address ;
224- private final String username ;
225- private final String password ;
226- private final String database ;
227- private final DataConverter dataConverter ;
228-
229- protected final Executor futuresExecutor ;
230-
231- public PgConnectionPool (ConnectionPoolBuilder .PoolProperties properties , Executor futuresExecutor ) {
232- this .address = InetSocketAddress .createUnresolved (properties .getHostname (), properties .getPort ());
233- this .username = properties .getUsername ();
234- this .password = properties .getPassword ();
235- this .database = properties .getDatabase ();
257+ public PgConnectionPool (ConnectibleBuilder .ConnectibleProperties properties , Executor futuresExecutor ) {
258+ super (properties , futuresExecutor );
236259 this .maxConnections = properties .getMaxConnections ();
237260 this .maxStatements = properties .getMaxStatements ();
238- this .dataConverter = properties .getDataConverter ();
239- this .validationQuery = properties .getValidationQuery ();
240- this .encoding = Charset .forName (properties .getEncoding ());
241- this .futuresExecutor = futuresExecutor ;
242261 }
243262
244263 @ Override
@@ -259,7 +278,7 @@ public CompletableFuture<Void> close() {
259278 } finally {
260279 lock .unlock ();
261280 }
262- return CompletableFuture .allOf (shutdownTasks .toArray (size -> new CompletableFuture <?>[size ] ));
281+ return CompletableFuture .allOf (shutdownTasks .toArray (CompletableFuture <?>[]:: new ));
263282 }
264283
265284 @ Override
@@ -280,7 +299,17 @@ public CompletableFuture<Connection> getConnection() {
280299 .connect (username , password , database )
281300 .thenApply (pooledConnection -> {
282301 if (validationQuery != null && !validationQuery .isBlank ()) {
283- return pooledConnection .completeQuery (validationQuery ).thenApply (rs -> pooledConnection );
302+ return pooledConnection .completeScript (validationQuery )
303+ .handle ((rss , th ) -> {
304+ if (th != null ) {
305+ return ((PooledPgConnection ) pooledConnection ).delegate .close ()
306+ .thenApply (v -> CompletableFuture .<Connection >failedFuture (th ))
307+ .thenCompose (Function .identity ());
308+ } else {
309+ return CompletableFuture .completedFuture (pooledConnection );
310+ }
311+ })
312+ .thenCompose (Function .identity ());
284313 } else {
285314 return CompletableFuture .completedFuture (pooledConnection );
286315 }
@@ -347,56 +376,4 @@ private CompletableFuture<Void> release(PooledPgConnection connection) {
347376 return shutdownTask ;
348377 }
349378
350- @ Override
351- public CompletableFuture <Transaction > begin () {
352- return getConnection ()
353- .thenApply (Connection ::begin )
354- .thenCompose (Function .identity ());
355- }
356-
357- @ Override
358- public CompletableFuture <Void > script (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , Consumer <Integer > onAffected , String sql ) {
359- return getConnection ()
360- .thenApply (connection ->
361- connection .script (onColumns , onRow , onAffected , sql )
362- .handle ((message , th ) ->
363- connection .close ()
364- .thenApply (v -> {
365- if (th == null ) {
366- return message ;
367- } else {
368- throw new RuntimeException (th );
369- }
370- })
371- ).thenCompose (Function .identity ())
372- )
373- .thenCompose (Function .identity ());
374- }
375-
376- @ Override
377- public CompletableFuture <Integer > query (BiConsumer <Map <String , PgColumn >, PgColumn []> onColumns , Consumer <Row > onRow , String sql , Object ... params ) {
378- return getConnection ()
379- .thenApply (connection ->
380- connection .query (onColumns , onRow , sql , params )
381- .handle ((affected , th ) ->
382- connection .close ()
383- .thenApply (v -> {
384- if (th == null ) {
385- return affected ;
386- } else {
387- throw new RuntimeException (th );
388- }
389- })
390- ).thenCompose (Function .identity ())
391- )
392- .thenCompose (Function .identity ());
393- }
394-
395- /**
396- * Creates a new socket stream to the backend.
397- *
398- * @param address Server address
399- * @return Stream with no pending messages
400- */
401- protected abstract PgProtocolStream openStream (InetSocketAddress address );
402379}
0 commit comments