3232import com .clickhouse .client .api .query .QueryResponse ;
3333import com .clickhouse .client .api .query .QuerySettings ;
3434import com .clickhouse .data .format .BinaryStreamUtils ;
35+ import org .apache .commons .lang3 .time .StopWatch ;
3536import org .slf4j .Logger ;
3637import org .slf4j .LoggerFactory ;
3738import org .slf4j .MDC ;
@@ -247,7 +248,7 @@ public InsertResponse insert(String tableName,
247248 if (data == null || data .isEmpty ()) {
248249 throw new IllegalArgumentException ("Data cannot be empty" );
249250 }
250- long s1 = System . currentTimeMillis ();
251+ StopWatch watch = StopWatch . createStarted ();
251252
252253 //Add format to the settings
253254 if (settings == null ) {
@@ -271,7 +272,6 @@ public InsertResponse insert(String tableName,
271272 throw new IllegalArgumentException ("No serializer found for the given class. Please register() before calling this method." );
272273 }
273274
274- long s2 = System .currentTimeMillis ();
275275 //Call the static .serialize method on the POJOSerializer for each object in the list
276276 for (Object obj : data ) {
277277 for (POJOSerializer serializer : serializers ) {
@@ -282,9 +282,9 @@ public InsertResponse insert(String tableName,
282282 }
283283 }
284284 }
285- long s3 = System .currentTimeMillis ();
286285
287- LOG .debug ("Total serialization time: {}" , s3 - s1 );
286+ watch .stop ();
287+ LOG .debug ("Total serialization time: {}" , watch .getTime ());
288288 return insert (tableName , new ByteArrayInputStream (stream .toByteArray ()), settings );
289289 }
290290
@@ -294,18 +294,18 @@ public InsertResponse insert(String tableName,
294294 public InsertResponse insert (String tableName ,
295295 InputStream data ,
296296 InsertSettings settings ) throws IOException , ClientException {
297- long s1 = System . currentTimeMillis ();
297+ StopWatch watch = StopWatch . createStarted ();
298298 InsertResponse response ;
299299 try (ClickHouseClient client = createClient ()) {
300300 ClickHouseRequest .Mutation request = createMutationRequest (client .write (getServerNode ()), tableName , settings )
301- .format (( ClickHouseFormat ) settings .getSetting ( ClickHouseClientOption . FORMAT . getKey () ));
301+ .format (settings .getFormat ( ));
302302
303303 Future <ClickHouseResponse > future ;
304304 try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory .getInstance ().createPipedOutputStream (request .getConfig ())) {
305305 future = request .data (stream .getInputStream ()).execute ();
306306
307307 //Copy the data from the input stream to the output stream
308- byte [] buffer = new byte [8196 ];
308+ byte [] buffer = new byte [settings . getInputStreamBatchSize () ];
309309 int bytesRead ;
310310 while ((bytesRead = data .read (buffer )) != -1 ) {
311311 stream .write (buffer , 0 , bytesRead );
@@ -318,8 +318,8 @@ public InsertResponse insert(String tableName,
318318 }
319319 }
320320
321- long s2 = System . currentTimeMillis ();
322- LOG .debug ("Total insert (InputStream) time: {}" , s2 - s1 );
321+ watch . stop ();
322+ LOG .debug ("Total insert (InputStream) time: {}" , watch . getTime () );
323323 return response ;
324324 }
325325
0 commit comments