From ed8d7677d43c9ef6e601ea3cfef01aa11c5f4d98 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Wed, 26 Mar 2025 19:00:14 -0400 Subject: [PATCH] Quick test of compression buffer reuse --- .../api/data_formats/NativeFormatReader.java | 8 +- ...owBinaryWithNamesAndTypesFormatReader.java | 2 +- .../RowBinaryWithNamesFormatReader.java | 4 +- .../internal/AbstractBinaryFormatReader.java | 3 +- .../internal/BinaryStreamReader.java | 166 ++++++++++++++++-- .../api/internal/HttpAPIClientHelper.java | 34 ++-- .../clickhouse/benchmark/BenchmarkRunner.java | 25 +-- .../benchmark/clients/BenchmarkBase.java | 3 +- 8 files changed, 181 insertions(+), 64 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java index 0d837f0a1..c0e3e7aab 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/NativeFormatReader.java @@ -64,12 +64,12 @@ protected boolean readRecord(Object[] record) throws IOException { private boolean readBlock() throws IOException { int nColumns; try { - nColumns = BinaryStreamReader.readVarInt(input); + nColumns = binaryStreamReader.readVarInt(input); } catch (EOFException e) { endReached(); return false; } - int nRows = BinaryStreamReader.readVarInt(input); + int nRows = binaryStreamReader.readVarInt(input); List names = new ArrayList<>(nColumns); List types = new ArrayList<>(nColumns); @@ -77,8 +77,8 @@ private boolean readBlock() throws IOException { TableSchema schema = new TableSchema(); for (int i = 0; i < nColumns; i++) { - schema.addColumn(BinaryStreamReader.readString(input), - BinaryStreamReader.readString(input)); + schema.addColumn(binaryStreamReader.readString(input), + binaryStreamReader.readString(input)); ClickHouseColumn column = schema.getColumns().get(i); names.add(column.getColumnName()); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java index e5272602f..9fe3f5a0e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesAndTypesFormatReader.java @@ -28,7 +28,7 @@ private void readSchema() { List columns = new ArrayList<>(); int nCol; try { - nCol = BinaryStreamReader.readVarInt(input); + nCol = binaryStreamReader.readVarInt(input); } catch (EOFException e) { endReached(); return; diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java index 3fe8769ee..09a3df2b3 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/RowBinaryWithNamesFormatReader.java @@ -21,7 +21,7 @@ public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings que super(inputStream, querySettings, schema, byteBufferAllocator); int nCol = 0; try { - nCol = BinaryStreamReader.readVarInt(input); + nCol = binaryStreamReader.readVarInt(input); } catch (EOFException e) { endReached(); columns = Collections.emptyList(); @@ -33,7 +33,7 @@ public RowBinaryWithNamesFormatReader(InputStream inputStream, QuerySettings que columns = new ArrayList<>(nCol); try { for (int i = 0; i < nCol; i++) { - columns.add(BinaryStreamReader.readString(input)); + columns.add(binaryStreamReader.readString(input)); } } catch (IOException e) { throw new RuntimeException("Failed to read header", e); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java index cebd72bb3..d74157112 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java @@ -76,7 +76,8 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer } boolean jsonAsString = MapUtils.getFlag(settings, ClientConfigProperties.serverSetting(ServerSettings.OUTPUT_FORMAT_BINARY_WRITE_JSON_AS_STRING), false); - this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString); + this.binaryStreamReader = new BinaryStreamReader(inputStream, timeZone, LOG, byteBufferAllocator, jsonAsString, + MapUtils.getFlag(querySettings == null ? Collections.emptyMap() : querySettings.getAllSettings(), ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey(), false)); if (schema != null) { setSchema(schema); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java index a6e7630b4..3499119cd 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/BinaryStreamReader.java @@ -1,10 +1,18 @@ package com.clickhouse.client.api.data_formats.internal; +import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.internal.ClickHouseLZ4OutputStream; +import com.clickhouse.client.api.internal.MapUtils; +import com.clickhouse.data.ClickHouseByteUtils; +import com.clickhouse.data.ClickHouseCityHash; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseDataType; import com.clickhouse.data.ClickHouseEnum; +import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.value.ClickHouseBitmap; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; import org.slf4j.Logger; import org.slf4j.helpers.NOPLogger; @@ -16,6 +24,7 @@ import java.math.BigInteger; import java.net.Inet4Address; import java.net.Inet6Address; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; @@ -55,6 +64,15 @@ public class BinaryStreamReader { private final boolean jsonAsString; + private final boolean serverCompression; + private final LZ4FastDecompressor decompressor; + private ByteBuffer buffer; + static final byte MAGIC = (byte) 0x82; + static final int HEADER_LENGTH = 25; + + final byte[] headerBuff = new byte[HEADER_LENGTH]; + private byte[] tmpBuffer = new byte[1]; + /** * Createa a BinaryStreamReader instance that will use the provided buffer allocator. * @@ -63,12 +81,122 @@ public class BinaryStreamReader { * @param log - logger * @param bufferAllocator - byte buffer allocator */ - BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString) { + BinaryStreamReader(InputStream input, TimeZone timeZone, Logger log, ByteBufferAllocator bufferAllocator, boolean jsonAsString, boolean serverCompression) { this.log = log == null ? NOPLogger.NOP_LOGGER : log; this.timeZone = timeZone; this.input = input; this.bufferAllocator = bufferAllocator; this.jsonAsString = jsonAsString; + + this.serverCompression = serverCompression; + + + decompressor = LZ4Factory.fastestInstance().fastDecompressor(); + this.buffer = ByteBuffer.allocate(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE); + this.buffer.limit(0); + } + + private int read() throws IOException { + int n = read(tmpBuffer, 0, 1); + return n == -1 ? -1 : tmpBuffer[0] & 0xFF; + } + private int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException("b is null"); + } else if (off < 0) { + throw new IndexOutOfBoundsException("off is negative"); + } else if (len < 0) { + throw new IndexOutOfBoundsException("len is negative"); + } else if (off + len > b.length) { + throw new IndexOutOfBoundsException("off + len is greater than b.length"); + } else if (len == 0) { + return 0; + } + + int readBytes = 0; + do { + int remaining = Math.min(len - readBytes, buffer.remaining()); + buffer.get(b, off + readBytes, remaining); + readBytes += remaining; + } while (readBytes < len && refill() != -1); + + return readBytes == 0 ? -1 : readBytes; + } + private int refill() throws IOException { + + // read header + boolean readFully = readFully(headerBuff, 0, HEADER_LENGTH); + if (!readFully) { + return -1; + } + + if (headerBuff[16] != MAGIC) { + // 1 byte - 0x82 (shows this is LZ4) + throw new ClientException("Invalid LZ4 magic byte: '" + headerBuff[16] + "'"); + } + + // 4 bytes - size of the compressed data including 9 bytes of the header + int compressedSizeWithHeader = getInt32(headerBuff, 17); + // 4 bytes - size of uncompressed data + int uncompressedSize = getInt32(headerBuff, 21); + + int offset = 9; + final byte[] block = new byte[compressedSizeWithHeader]; + block[0] = MAGIC; + setInt32(block, 1, compressedSizeWithHeader); + setInt32(block, 5, uncompressedSize); + // compressed data: compressed_size - 9 bytes + int remaining = compressedSizeWithHeader - offset; + + readFully = readFully(block, offset, remaining); + if (!readFully) { + throw new EOFException("Unexpected end of stream"); + } + + long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader); + if (real[0] != getInt64(headerBuff, 0) || real[1] != ClickHouseByteUtils.getInt64(headerBuff, 8)) { + throw new ClientException("Corrupted stream: checksum mismatch"); + } + + if (buffer.capacity() < uncompressedSize) { + buffer = ByteBuffer.allocate(uncompressedSize); + } + decompressor.decompress(ByteBuffer.wrap(block), offset, buffer, 0, uncompressedSize); + buffer.position(0); + buffer.limit(uncompressedSize); + return uncompressedSize; + } + private boolean readFully(byte[] b, int off, int len) throws IOException { + int n = 0; + while (n < len) { + int count = input.read(b, off + n, len - n); + if (count < 0) { + if (n == 0) { + return false; + } + throw new IOException(ClickHouseUtils.format("Incomplete read: {0} of {1}", n, len)); + } + n += count; + } + + return true; + } + static int getInt32(byte[] bytes, int offset) { + return (0xFF & bytes[offset]) | ((0xFF & bytes[offset + 1]) << 8) | ((0xFF & bytes[offset + 2]) << 16) + | ((0xFF & bytes[offset + 3]) << 24); + } + static long getInt64(byte[] bytes, int offset) { + return (0xFFL & bytes[offset]) | ((0xFFL & bytes[offset + 1]) << 8) | ((0xFFL & bytes[offset + 2]) << 16) + | ((0xFFL & bytes[offset + 3]) << 24) | ((0xFFL & bytes[offset + 4]) << 32) + | ((0xFFL & bytes[offset + 5]) << 40) | ((0xFFL & bytes[offset + 6]) << 48) + | ((0xFFL & bytes[offset + 7]) << 56); + } + + static void setInt32(byte[] bytes, int offset, int value) { + bytes[offset] = (byte) (0xFF & value); + bytes[offset + 1] = (byte) (0xFF & (value >> 8)); + bytes[offset + 2] = (byte) (0xFF & (value >> 16)); + bytes[offset + 3] = (byte) (0xFF & (value >> 24)); } /** @@ -325,7 +453,7 @@ public short readShortLE() throws IOException { * @return short value * @throws IOException when IO error occurs */ - public static short readShortLE(InputStream input, byte[] buff) throws IOException { + public short readShortLE(InputStream input, byte[] buff) throws IOException { readNBytes(input, buff, 0, 2); return (short) (buff[0] & 0xFF | (buff[1] & 0xFF) << 8); } @@ -349,7 +477,7 @@ public int readIntLE() throws IOException { * @return - int value * @throws IOException when IO error occurs */ - public static int readIntLE(InputStream input, byte[] buff) throws IOException { + public int readIntLE(InputStream input, byte[] buff) throws IOException { readNBytes(input, buff, 0, 4); return (buff[0] & 0xFF) | (buff[1] & 0xFF) << 8 | (buff[2] & 0xFF) << 16 | (buff[3] & 0xFF) << 24; } @@ -374,7 +502,7 @@ public long readLongLE() throws IOException { * @return - long value * @throws IOException when IO error occurs */ - public static long readLongLE(InputStream input, byte[] buff) throws IOException { + public long readLongLE(InputStream input, byte[] buff) throws IOException { readNBytes(input, buff, 0, 8); return (0xFFL & buff[0]) | ((0xFFL & buff[1]) << 8) | ((0xFFL & buff[2]) << 16) @@ -418,7 +546,7 @@ public int readUnsignedShortLE() throws IOException { * @return - unsigned short value * @throws IOException */ - public static int readUnsignedShortLE(InputStream input, byte[] buff) throws IOException { + public int readUnsignedShortLE(InputStream input, byte[] buff) throws IOException { return readShortLE(input, buff) & 0xFFFF; } @@ -440,7 +568,7 @@ public long readUnsignedIntLE() throws IOException { * @return - unsigned int value * @throws IOException when IO error occurs */ - public static long readUnsignedIntLE(InputStream input, byte[] buff) throws IOException { + public long readUnsignedIntLE(InputStream input, byte[] buff) throws IOException { return readIntLE(input, buff) & 0xFFFFFFFFL; } @@ -474,7 +602,7 @@ public BigInteger readBigIntegerLE(int len, boolean unsigned) throws IOException * @return - big integer value * @throws IOException */ - public static BigInteger readBigIntegerLE(InputStream input, byte[] buff, int len, boolean unsigned) throws IOException { + public BigInteger readBigIntegerLE(InputStream input, byte[] buff, int len, boolean unsigned) throws IOException { byte[] bytes = readNBytesLE(input, buff, 0, len); return unsigned ? new BigInteger(1, bytes) : new BigInteger(bytes); } @@ -521,7 +649,7 @@ public BigDecimal readDecimal(int precision, int scale) throws IOException { return v; } - public static byte[] readNBytes(InputStream inputStream, int len) throws IOException { + public byte[] readNBytes(InputStream inputStream, int len) throws IOException { byte[] bytes = new byte[len]; return readNBytes(inputStream, bytes, 0, len); } @@ -536,10 +664,10 @@ public static byte[] readNBytes(InputStream inputStream, int len) throws IOExcep * @return target buffer * @throws IOException */ - public static byte[] readNBytes(InputStream inputStream, byte[] buffer, int offset, int len) throws IOException { + public byte[] readNBytes(InputStream inputStream, byte[] buffer, int offset, int len) throws IOException { int total = 0; while (total < len) { - int r = inputStream.read(buffer, offset + total, len - total); + int r = read(buffer, offset + total, len - total); if (r == -1) { throw new EOFException("End of stream reached before reading all data"); } @@ -562,7 +690,7 @@ private byte[] readNBytesLE(InputStream input, int len) throws IOException { * @return - target buffer * @throws IOException */ - public static byte[] readNBytesLE(InputStream input, byte[] buffer, int offset, int len) throws IOException { + public byte[] readNBytesLE(InputStream input, byte[] buffer, int offset, int len) throws IOException { byte[] bytes = readNBytes(input, buffer, 0, len); int s = 0; int i = len - 1; @@ -847,7 +975,7 @@ private double[][][][] readGeoMultiPolygon() throws IOException { * @throws IOException when failed to read value from input stream or reached * end of the stream */ - public static int readVarInt(InputStream input) throws IOException { + public int readVarInt(InputStream input) throws IOException { int value = 0; for (int i = 0; i < 10; i++) { @@ -880,7 +1008,7 @@ private ZonedDateTime readDate(TimeZone tz) throws IOException { * @return ZonedDateTime * @throws IOException when IO error occurs */ - public static ZonedDateTime readDate(InputStream input, byte[] buff, TimeZone tz) throws IOException { + public ZonedDateTime readDate(InputStream input, byte[] buff, TimeZone tz) throws IOException { LocalDate d = LocalDate.ofEpochDay(readUnsignedShortLE(input, buff)); return d.atStartOfDay(tz.toZoneId()).withZoneSameInstant(tz.toZoneId()); } @@ -905,7 +1033,7 @@ public ZonedDateTime readDate32(TimeZone tz) * @return ZonedDateTime * @throws IOException when IO error occurs */ - public static ZonedDateTime readDate32(InputStream input, byte[] buff, TimeZone tz) + public ZonedDateTime readDate32(InputStream input, byte[] buff, TimeZone tz) throws IOException { LocalDate d = LocalDate.ofEpochDay(readIntLE(input, buff)); return d.atStartOfDay(tz.toZoneId()).withZoneSameInstant(tz.toZoneId()); @@ -923,7 +1051,7 @@ private ZonedDateTime readDateTime32(TimeZone tz) throws IOException { * @return ZonedDateTime * @throws IOException when IO error occurs */ - public static ZonedDateTime readDateTime32(InputStream input, byte[] buff, TimeZone tz) throws IOException { + public ZonedDateTime readDateTime32(InputStream input, byte[] buff, TimeZone tz) throws IOException { long time = readUnsignedIntLE(input, buff); return LocalDateTime.ofInstant(Instant.ofEpochSecond(Math.max(time, 0L)), tz.toZoneId()).atZone(tz.toZoneId()); } @@ -956,7 +1084,7 @@ public ZonedDateTime readDateTime64(int scale, TimeZone tz) throws IOException { * @return * @throws IOException */ - public static ZonedDateTime readDateTime64(InputStream input, byte[] buff, int scale, TimeZone tz) throws IOException { + public ZonedDateTime readDateTime64(InputStream input, byte[] buff, int scale, TimeZone tz) throws IOException { long value = readLongLE(input, buff); int nanoSeconds = 0; if (scale > 0) { @@ -999,7 +1127,7 @@ public String readString() throws IOException { * @return String * @throws IOException when IO error occurs */ - public static String readString(InputStream input) throws IOException { + public String readString(InputStream input) throws IOException { int len = readVarInt(input); if (len == 0) { return ""; @@ -1007,8 +1135,8 @@ public static String readString(InputStream input) throws IOException { return new String(readNBytes(input, len), StandardCharsets.UTF_8); } - public static int readByteOrEOF(InputStream input) throws IOException { - int b = input.read(); + public int readByteOrEOF(InputStream input) throws IOException { + int b = serverCompression ? input.read() : read(); if (b < 0) { throw new EOFException("End of stream reached before reading all data"); } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 38060aa12..69f919335 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -584,23 +584,23 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompre private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression) { LOG.debug("server compression: {}, http compression: {}", serverCompression, useHttpCompression); - if (serverCompression) { - // Server doesn't compress certain errors like 403 - switch (httpStatus) { - case HttpStatus.SC_OK: - case HttpStatus.SC_CREATED: - case HttpStatus.SC_ACCEPTED: - case HttpStatus.SC_NO_CONTENT: - case HttpStatus.SC_PARTIAL_CONTENT: - case HttpStatus.SC_RESET_CONTENT: - case HttpStatus.SC_NOT_MODIFIED: - case HttpStatus.SC_BAD_REQUEST: - case HttpStatus.SC_INTERNAL_SERVER_ERROR: - case HttpStatus.SC_NOT_FOUND: - return new LZ4Entity(httpEntity, useHttpCompression, true, false, - MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), true); - } - } +// if (serverCompression) { +// // Server doesn't compress certain errors like 403 +// switch (httpStatus) { +// case HttpStatus.SC_OK: +// case HttpStatus.SC_CREATED: +// case HttpStatus.SC_ACCEPTED: +// case HttpStatus.SC_NO_CONTENT: +// case HttpStatus.SC_PARTIAL_CONTENT: +// case HttpStatus.SC_RESET_CONTENT: +// case HttpStatus.SC_NOT_MODIFIED: +// case HttpStatus.SC_BAD_REQUEST: +// case HttpStatus.SC_INTERNAL_SERVER_ERROR: +// case HttpStatus.SC_NOT_FOUND: +// return new LZ4Entity(httpEntity, useHttpCompression, true, false, +// MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), true); +// } +// } return httpEntity; } diff --git a/performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java b/performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java index b9f4766d0..c13727f5b 100644 --- a/performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java +++ b/performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java @@ -40,28 +40,15 @@ public static void main(String[] args) throws Exception { Options opt = new OptionsBuilder() .include(QueryClient.class.getName()) - .include(InsertClient.class.getName()) - .include(ConcurrentInsertClient.class.getName()) - .include(ConcurrentQueryClient.class.getName()) - .include(Compression.class.getName()) - .include(Serializers.class.getName()) - .include(Deserializers.class.getName()) - .include(MixedWorkload.class.getName()) - .include(DataTypes.class.getName()) - .include(JDBCQuery.class.getName()) - .include(JDBCInsert.class.getName()) - .forks(1) // must be a fork. No fork only for debugging - .mode(Mode.SampleTime) +// .include(Serializers.class.getName()) +// .include(Deserializers.class.getName()) + .forks(0) // must be a fork. No fork only for debugging + .mode(Mode.SingleShotTime) .timeUnit(TimeUnit.MILLISECONDS) - .addProfiler(GCProfiler.class) - .addProfiler(MemPoolProfiler.class) - .warmupIterations(3) - .warmupTime(TimeValue.seconds(5)) + .warmupIterations(0) .measurementIterations(10) - .jvmArgs("-Xms8g", "-Xmx8g") - .measurementTime(TimeValue.seconds(isCloud() ? 30 : 10)) + .measurementTime(TimeValue.seconds(3)) .resultFormat(ResultFormatType.JSON) -// .output(String.format("jmh-results-%s-%s.out", isCloud() ? "cloud" : "local", System.currentTimeMillis())) .result(String.format("jmh-results-%s-%s.json", isCloud() ? "cloud" : "local", System.currentTimeMillis())) .build(); diff --git a/performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java b/performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java index d6fc20fe8..e18929e33 100644 --- a/performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java +++ b/performance/src/test/com/clickhouse/benchmark/clients/BenchmarkBase.java @@ -14,6 +14,7 @@ import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseServerForTest; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ConnectionReuseStrategy; import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.insert.InsertResponse; @@ -106,7 +107,7 @@ public static class DataState { @Param({"file://dataset_500k.csv"}) String datasetSourceName; - @Param({"300000", "220000", "100000", "10000"}) + @Param({"300000"}) int limit; @Param({"data_filled"}) String tableNameFilled;