Skip to content

Commit 5353239

Browse files
Performance test refactored
1 parent 25ed16f commit 5353239

File tree

4 files changed

+101
-140
lines changed

4 files changed

+101
-140
lines changed

src/main/java/com/github/pgasync/PgConnection.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.atomic.AtomicReference;
2726
import java.util.function.BiConsumer;
2827
import java.util.function.Consumer;
2928
import java.util.function.Function;
@@ -133,6 +132,8 @@ private String next() {
133132
private final DataConverter dataConverter;
134133
private final Charset encoding;
135134

135+
private Columns currentColumns;
136+
136137
PgConnection(PgProtocolStream stream, DataConverter dataConverter, Charset encoding) {
137138
this.stream = stream;
138139
this.dataConverter = dataConverter;
@@ -179,16 +180,17 @@ public CompletableFuture<Void> script(BiConsumer<Map<String, PgColumn>, PgColumn
179180
if (sql == null || sql.isBlank()) {
180181
throw new IllegalArgumentException("'sql' shouldn't be null or empty or blank string");
181182
}
182-
AtomicReference<Columns> columnsRef = new AtomicReference<>();
183183
return stream.send(
184184
new Query(sql),
185185
columnDescriptions -> {
186-
Columns columns = calcColumns(columnDescriptions);
187-
columnsRef.set(columns);
188-
onColumns.accept(columns.byName, columns.ordered);
186+
currentColumns = calcColumns(columnDescriptions);
187+
onColumns.accept(currentColumns.byName, currentColumns.ordered);
189188
},
190-
message -> onRow.accept(new PgRow(message, columnsRef.get().byName, columnsRef.get().ordered, dataConverter)),
191-
message -> onAffected.accept(message.getAffectedRows())
189+
message -> onRow.accept(new PgRow(message, currentColumns.byName, currentColumns.ordered, dataConverter)),
190+
message -> {
191+
currentColumns = null;
192+
onAffected.accept(message.getAffectedRows());
193+
}
192194
);
193195
}
194196

src/main/java/com/github/pgasync/netty/NettyPgProtocolStream.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public CompletableFuture<Integer> send(Bind bind, Describe describe, Consumer<Ro
155155
this.onAffected = null;
156156
return offerRoundTrip(() -> {
157157
lastSentMessage = new Execute();
158-
write(bind, describe, lastSentMessage);
158+
write(bind, describe, lastSentMessage, FIndicators.SYNC);
159159
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
160160
}
161161

@@ -166,7 +166,7 @@ public CompletableFuture<Integer> send(Bind bind, Consumer<DataRow> onRow) {
166166
this.onAffected = null;
167167
return offerRoundTrip(() -> {
168168
lastSentMessage = new Execute();
169-
write(bind, lastSentMessage);
169+
write(bind, lastSentMessage, FIndicators.SYNC);
170170
}).thenApply(commandComplete -> ((CommandComplete) commandComplete).getAffectedRows());
171171
}
172172

@@ -239,9 +239,6 @@ private void respondWithMessage(Message message) {
239239
} else if (message instanceof ErrorResponse) {
240240
if (seenReadyForQuery) {
241241
readyForQueryPendingMessage = message;
242-
if (isExtendedQueryInProgress()) {
243-
write(FIndicators.SYNC);
244-
}
245242
} else {
246243
respondWithException(toSqlException((ErrorResponse) message));
247244
}
@@ -253,7 +250,6 @@ private void respondWithMessage(Message message) {
253250
// "During simple query message flow, CommandComplete message should be consumed only by dedicated callback,
254251
// due to possibility of multiple CommandComplete messages, one per sql clause.";
255252
readyForQueryPendingMessage = message;
256-
write(FIndicators.SYNC);
257253
}
258254
} else if (message instanceof Authentication) {
259255
Authentication authentication = (Authentication) message;

src/main/java/com/pgasync/QueryExecutor.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,23 @@ public interface QueryExecutor {
3434
*/
3535
default CompletableFuture<Collection<ResultSet>> completeScript(String sql) {
3636
List<ResultSet> results = new ArrayList<>();
37-
AtomicReference<Map<String, PgColumn>> columnsByNameRef = new AtomicReference<>();
38-
AtomicReference<PgColumn[]> orderedColumnsRef = new AtomicReference<>();
39-
AtomicReference<List<Row>> rowsRef = new AtomicReference<>();
37+
class ResultSetAssembly {
38+
private Map<String, PgColumn> columnsByName;
39+
private PgColumn[] orderedColumns;
40+
private List<Row> rows;
41+
}
42+
ResultSetAssembly assembly = new ResultSetAssembly();
4043
return script(
4144
(columnsByName, orderedColumns) -> {
42-
columnsByNameRef.set(columnsByName);
43-
orderedColumnsRef.set(orderedColumns);
44-
rowsRef.set(new ArrayList<>());
45+
assembly.columnsByName = columnsByName;
46+
assembly.orderedColumns = orderedColumns;
47+
assembly.rows = new ArrayList<>();
4548
},
46-
row -> rowsRef.get().add(row),
49+
row -> assembly.rows.add(row),
4750
affected -> results.add(new PgResultSet(
48-
columnsByNameRef.get() != null ? columnsByNameRef.get() : Map.of(),
49-
orderedColumnsRef.get() != null ? List.of(orderedColumnsRef.get()) : List.of(),
50-
rowsRef.get() != null ? rowsRef.get() : List.of(),
51+
assembly.columnsByName != null ? assembly.columnsByName : Map.of(),
52+
assembly.orderedColumns != null ? List.of(assembly.orderedColumns) : List.of(),
53+
assembly.rows != null ? assembly.rows : List.of(),
5154
affected
5255
)),
5356
sql

src/test/java/com/github/pgasync/PerformanceTest.java

Lines changed: 77 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,31 @@
1414

1515
package com.github.pgasync;
1616

17-
import com.pgasync.Connection;
1817
import com.pgasync.ConnectionPool;
18+
import com.pgasync.PreparedStatement;
1919
import org.junit.*;
2020
import org.junit.runner.RunWith;
2121
import org.junit.runners.Parameterized;
2222
import org.junit.runners.Parameterized.Parameters;
2323

2424
import java.util.*;
2525
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.LongAdder;
2627
import java.util.function.Function;
2728
import java.util.stream.IntStream;
29+
import java.util.stream.LongStream;
2830

2931
import static com.github.pgasync.DatabaseRule.createPoolBuilder;
3032
import static java.lang.System.currentTimeMillis;
3133
import static java.lang.System.out;
32-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3334
import static org.junit.runners.MethodSorters.NAME_ASCENDING;
3435

3536
@RunWith(Parameterized.class)
3637
@FixMethodOrder(NAME_ASCENDING)
3738
public class PerformanceTest {
3839

40+
private static final String SELECT_42 = "select 42";
41+
3942
@ClassRule
4043
public static DatabaseRule dbr = new DatabaseRule(createPoolBuilder(1));
4144

@@ -69,7 +72,8 @@ public PerformanceTest(int poolSize, int numThreads) {
6972
this.numThreads = numThreads;
7073
pool = dbr.builder
7174
.password("async-pg")
72-
.maxConnections(poolSize).build();
75+
.maxConnections(poolSize)
76+
.build();
7377
}
7478

7579
@After
@@ -79,140 +83,96 @@ public void close() {
7983

8084
@Test(timeout = 2000)
8185
public void t1_preAllocatePool() throws Exception {
82-
List<Connection> connections = new ArrayList<>();
8386
CompletableFuture.allOf((CompletableFuture<?>[]) IntStream.range(0, poolSize)
84-
.mapToObj(i -> pool.getConnection().thenAccept(connections::add))
87+
.mapToObj(i -> pool.getConnection()
88+
.thenApply(connection ->
89+
connection.prepareStatement(SELECT_42)
90+
.thenApply(PreparedStatement::close)
91+
.thenCompose(Function.identity())
92+
.thenApply(v -> connection.close())
93+
.thenCompose(Function.identity())
94+
)
95+
.thenCompose(Function.identity())
96+
)
8597
.toArray(size -> new CompletableFuture<?>[size])
8698
).get();
87-
connections.forEach(Connection::close);
8899
}
89100

90101
@Test
91-
public void t3_run() throws Exception {
92-
Collection<Callable<Long>> tasks = new ArrayList<>();
93-
for (int i = 0; i < batchSize; ++i) {
94-
tasks.add(new Callable<>() {
95-
final Exchanger<Long> swap = new Exchanger<>();
96-
97-
@Override
98-
public Long call() throws Exception {
99-
100-
pool.getConnection()
101-
.thenApply(connection -> connection.prepareStatement("select 42")
102-
.thenApply(stmt ->
103-
stmt.query()
104-
.thenAccept(res -> {
105-
try {
106-
swap.exchange(currentTimeMillis());
107-
} catch (Exception e) {
108-
throw new AssertionError(e);
109-
}
110-
})
111-
.handle((v, th) ->
112-
stmt.close()
113-
.thenAccept(_v -> {
114-
if (th != null)
115-
throw new RuntimeException(th);
116-
})
117-
)
118-
.thenCompose(Function.identity())
119-
)
120-
.thenCompose(Function.identity())
121-
.handle((v, th) -> connection.close()
122-
.thenAccept(_v -> {
123-
if (th != null) {
124-
throw new RuntimeException(th);
125-
}
126-
}))
127-
.thenCompose(Function.identity())
128-
)
129-
.thenCompose(Function.identity())
130-
.exceptionally(th -> {
131-
throw new AssertionError(th);
132-
});
133-
134-
/*
135-
pool.completeScript("select 42")
136-
.thenAccept(r -> {
137-
try {
138-
swap.exchange(currentTimeMillis());
139-
} catch (Exception e) {
140-
throw new AssertionError(e);
141-
}
142-
})
143-
.exceptionally(th -> {
144-
throw new AssertionError(th);
145-
});
146-
*/
147-
return swap.exchange(null);
148-
}
149-
});
150-
}
151-
152-
long minTime = Long.MAX_VALUE;
153-
154-
for (int r = 0; r < repeats; ++r) {
155-
MILLISECONDS.sleep(300);
156-
157-
final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
158-
159-
final Queue<Callable<Long>> taskQueue = new LinkedBlockingQueue<>(tasks);
160-
final Queue<Long> endTimes = new ArrayBlockingQueue<>(batchSize);
161-
162-
Thread[] threads = new Thread[numThreads];
163-
for (int i = 0; i < numThreads; ++i) {
164-
threads[i] = new Thread("tester" + i) {
165-
public void run() {
166-
try {
167-
barrier.await();
168-
} catch (InterruptedException | BrokenBarrierException e) {
169-
e.printStackTrace();
170-
}
171-
172-
Callable<Long> c;
173-
try {
174-
while ((c = taskQueue.poll()) != null) {
175-
endTimes.add(c.call());
176-
}
177-
} catch (Exception e) {
178-
e.printStackTrace();
179-
}
102+
public void t3_run() {
103+
double mean = LongStream.range(0, repeats)
104+
.map(i -> {
105+
try {
106+
return performBatch();
107+
} catch (Exception ex) {
108+
throw new RuntimeException(ex);
180109
}
181-
};
182-
threads[i].start();
183-
}
184-
185-
long start = currentTimeMillis();
186-
barrier.await();
110+
})
111+
.average().getAsDouble();
112+
results.computeIfAbsent(poolSize + " conn", k -> new TreeMap<>())
113+
.put(numThreads, Math.round(mean));
114+
}
187115

188-
for (Thread thread : threads) {
189-
thread.join();
190-
}
116+
private long performBatch() throws Exception {
117+
List<CompletableFuture<Void>> batchFutures = new ArrayList<>();
118+
long startTime = currentTimeMillis();
119+
for (int i = 0; i < batchSize; i++) {
120+
121+
batchFutures.add(pool.getConnection()
122+
.thenApply(connection -> connection.prepareStatement(SELECT_42)
123+
.thenApply(stmt -> {
124+
return stmt.query()
125+
.handle((v, th) ->
126+
stmt.close()
127+
.thenAccept(_v -> {
128+
if (th != null) {
129+
throw new RuntimeException(th);
130+
}
131+
})
132+
)
133+
.thenCompose(Function.identity());
134+
}
135+
)
136+
.thenCompose(Function.identity())
137+
.handle((v, th) -> connection.close()
138+
.thenAccept(_v -> {
139+
if (th != null) {
140+
throw new RuntimeException(th);
141+
}
142+
}))
143+
.thenCompose(Function.identity())
144+
)
145+
.thenCompose(Function.identity())
146+
.exceptionally(th -> {
147+
throw new AssertionError(th);
148+
}));
191149

192-
OptionalLong end = endTimes.stream().mapToLong(f -> f).max();
193-
long time = end.getAsLong() - start;
194-
minTime = Math.min(minTime, time);
150+
/*
151+
batchFutures.add(pool.completeScript("select 42").thenAccept(rs -> {
152+
}));
153+
*/
195154
}
196-
197-
results.get(key(poolSize)).put(numThreads, minTime);
198-
199-
out.printf("\t%d\t%2d\t%4.3f\t%n", poolSize, numThreads, minTime / 1000.0);
155+
CompletableFuture
156+
.allOf(batchFutures.toArray(new CompletableFuture<?>[]{}))
157+
.get();
158+
long duration = currentTimeMillis() - startTime;
159+
return duration;
200160
}
201161

202162
@AfterClass
203163
public static void printResults() {
204164
out.println();
205165
out.println("Requests per second, Hz:");
206-
out.print(" threads");
207-
results.keySet().forEach(i -> out.printf("\t\t%s\t", i));
166+
out.print(" threads");
167+
results.keySet().forEach(i -> out.printf("\t%s\t", i));
208168
out.println();
209169

210170
results.values().iterator().next().keySet().forEach(threads -> {
211171
out.print(" " + threads);
212-
results.keySet().forEach(conns -> {
213-
long millis = results.get(conns).get(threads);
214-
double rps = batchSize * 1000 / (double) millis;
215-
out.printf("\t\t%f", rps);
172+
results.keySet().forEach(connections -> {
173+
long batchDuration = results.get(connections).get(threads);
174+
double rps = 1000 * batchSize / (double) batchDuration;
175+
out.printf("\t\t%d", Math.round(rps));
216176
});
217177
out.println();
218178
});

0 commit comments

Comments
 (0)