Skip to content

Commit 9ba3679

Browse files
committed
Clean up and simplify Begin/EndInsert
- Renamed `InsertData` to `SendInsertBlock`, to prevent it from appearing first when searching for `Insert...` - Removed `EndInsert` with a block. Instead, users should now use `SendInsertBlock` followed by `EndInsert`. - Removed `ReceivePreparePackets` and replaced it with a call to `ReceivePacket`, similar to what is done in `Insert`. - Renamed `inserting` to `inserting_` to follow the member variable naming convention.
1 parent b4a9d70 commit 9ba3679

File tree

3 files changed

+50
-123
lines changed

3 files changed

+50
-123
lines changed

clickhouse/client.cpp

Lines changed: 47 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class Client::Impl {
163163

164164
Block BeginInsert(Query query);
165165

166-
void InsertData(const Block& block);
166+
void SendInsertBlock(const Block& block);
167167

168168
void EndInsert();
169169

@@ -181,7 +181,6 @@ class Client::Impl {
181181
bool Handshake();
182182

183183
bool ReceivePacket(uint64_t* server_packet = nullptr);
184-
bool ReceivePreparePackets(uint64_t* server_packet = nullptr);
185184

186185
void SendQuery(const Query& query, bool finalize = true);
187186
void FinalizeQuery();
@@ -215,7 +214,6 @@ class Client::Impl {
215214
}
216215

217216
private:
218-
bool inserting;
219217
/// In case of network errors tries to reconnect to server and
220218
/// call fuc several times.
221219
void RetryGuard(std::function<void()> func);
@@ -259,6 +257,8 @@ class Client::Impl {
259257
std::optional<Endpoint> current_endpoint_;
260258

261259
ServerInfo server_info_;
260+
261+
bool inserting_;
262262
};
263263

264264
ClientOptions modifyClientOptions(ClientOptions opts)
@@ -294,9 +294,10 @@ Client::Impl::~Impl() {
294294
}
295295

296296
void Client::Impl::ExecuteQuery(Query query) {
297-
if (inserting) {
298-
throw ProtocolError("cannot execute query while inserting");
297+
if (inserting_) {
298+
throw ValidationError("cannot execute query while inserting");
299299
}
300+
300301
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
301302

302303
if (options_.ping_before_query) {
@@ -312,9 +313,10 @@ void Client::Impl::ExecuteQuery(Query query) {
312313

313314

314315
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
315-
if (inserting) {
316-
throw ProtocolError("cannot execute query while inserting");
316+
if (inserting_) {
317+
throw ValidationError("cannot execute query while inserting");
317318
}
319+
318320
if (server_info_.revision < DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
319321
throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
320322
}
@@ -378,15 +380,18 @@ std::string NameToQueryString(const std::string &input)
378380
}
379381

380382
void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) {
381-
if (inserting) {
382-
throw ProtocolError("cannot execute query while inserting");
383+
if (inserting_) {
384+
throw ValidationError("cannot execute query while inserting, use SendInsertData instead");
383385
}
386+
384387
if (options_.ping_before_query) {
385388
RetryGuard([this]() { Ping(); });
386389
}
387390

391+
inserting_ = true;
392+
388393
std::stringstream fields_section;
389-
const auto num_columns = block.GetColumnCount();
394+
const auto num_columns = block.GetColumnCount();
390395

391396
for (unsigned int i = 0; i < num_columns; ++i) {
392397
if (i == num_columns - 1) {
@@ -399,41 +404,34 @@ void Client::Impl::Insert(const std::string& table_name, const std::string& quer
399404
Query query("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id);
400405
SendQuery(query);
401406

402-
uint64_t server_packet;
403-
// Receive data packet.
404-
while (true) {
405-
bool ret = ReceivePacket(&server_packet);
406-
407-
if (!ret) {
408-
throw ProtocolError("fail to receive data packet");
409-
}
407+
// Wait for a data packet and return
408+
uint64_t server_packet = 0;
409+
while (ReceivePacket(&server_packet)) {
410410
if (server_packet == ServerCodes::Data) {
411-
break;
412-
}
413-
if (server_packet == ServerCodes::Progress) {
414-
continue;
411+
SendData(block);
412+
EndInsert();
413+
return;
415414
}
416415
}
417416

418-
// Send data.
419-
inserting = true;
420-
SendData(block);
421-
EndInsert();
417+
throw ProtocolError("fail to receive data packet");
422418
}
423419

424-
void Client::Impl::InsertData(const Block& block) {
425-
if (!inserting) {
426-
throw ProtocolError("illegal call to InsertData without first calling BeginInsert");
420+
void Client::Impl::SendInsertBlock(const Block& block) {
421+
if (!inserting_) {
422+
throw ValidationError("illegal call to InsertData without first calling BeginInsert");
427423
}
424+
428425
SendData(block);
429426
}
430427

431428
void Client::Impl::EndInsert() {
432-
if (!inserting) return;
429+
if (!inserting_) {
430+
return;
431+
}
433432

434433
// Send empty block as marker of end of data.
435434
SendData(Block());
436-
inserting = false;
437435

438436
// Wait for EOS.
439437
uint64_t eos_packet{0};
@@ -446,12 +444,14 @@ void Client::Impl::EndInsert() {
446444
throw ProtocolError(std::string{"unexpected packet from server while receiving end of query, expected (expected Exception, EndOfStream or Log, got: "}
447445
+ (eos_packet ? std::to_string(eos_packet) : "nothing") + ")");
448446
}
447+
inserting_ = false;
449448
}
450449

451450
void Client::Impl::Ping() {
452-
if (inserting) {
453-
throw ProtocolError("cannot execute query while inserting");
451+
if (inserting_) {
452+
throw ValidationError("cannot execute query while inserting");
454453
}
454+
455455
WireFormat::WriteUInt64(*output_, ClientCodes::Ping);
456456
output_->Flush();
457457

@@ -465,7 +465,7 @@ void Client::Impl::Ping() {
465465

466466
void Client::Impl::ResetConnection() {
467467
InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value()));
468-
inserting = false;
468+
inserting_ = false;
469469

470470
if (!Handshake()) {
471471
throw ProtocolError("fail to connect to " + options_.host);
@@ -685,78 +685,6 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) {
685685
}
686686
}
687687

688-
bool Client::Impl::ReceivePreparePackets(uint64_t* server_packet) {
689-
uint64_t packet_type = 0;
690-
691-
while (true) {
692-
if (!WireFormat::ReadVarint64(*input_, &packet_type)) {
693-
throw std::runtime_error("unexpected package type " +
694-
std::to_string((int)packet_type) + " for insert query");
695-
}
696-
if (server_packet) {
697-
*server_packet = packet_type;
698-
}
699-
700-
switch (packet_type) {
701-
case ServerCodes::Data: {
702-
if (!ReceiveData()) {
703-
throw ProtocolError("can't read data packet from input stream");
704-
}
705-
return true;
706-
}
707-
708-
case ServerCodes::Exception: {
709-
ReceiveException();
710-
return false;
711-
}
712-
713-
case ServerCodes::ProfileInfo:
714-
case ServerCodes::Progress:
715-
case ServerCodes::Pong:
716-
case ServerCodes::Hello:
717-
continue;
718-
719-
case ServerCodes::Log: {
720-
// log tag
721-
if (!WireFormat::SkipString(*input_)) {
722-
return false;
723-
}
724-
Block block;
725-
726-
// Use uncompressed stream since log blocks usually contain only one row
727-
if (!ReadBlock(*input_, &block)) {
728-
return false;
729-
}
730-
731-
if (events_) {
732-
events_->OnServerLog(block);
733-
}
734-
continue;
735-
}
736-
737-
case ServerCodes::TableColumns: {
738-
// external table name
739-
if (!WireFormat::SkipString(*input_)) {
740-
return false;
741-
}
742-
743-
// columns metadata
744-
if (!WireFormat::SkipString(*input_)) {
745-
return false;
746-
}
747-
continue;
748-
}
749-
750-
// No others expected.
751-
case ServerCodes::EndOfStream:
752-
case ServerCodes::ProfileEvents:
753-
default:
754-
throw UnimplementedError("unimplemented " + std::to_string((int)packet_type));
755-
break;
756-
}
757-
}
758-
}
759-
760688
bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
761689
// Additional information about block.
762690
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO) {
@@ -1194,15 +1122,18 @@ void Client::Impl::RetryGuard(std::function<void()> func) {
11941122
}
11951123

11961124
Block Client::Impl::BeginInsert(Query query) {
1197-
if (inserting) {
1125+
if (inserting_) {
11981126
throw ProtocolError("cannot execute query while inserting");
11991127
}
1128+
12001129
EnsureNull en(static_cast<QueryEvents*>(&query), &events_);
12011130

12021131
if (options_.ping_before_query) {
12031132
RetryGuard([this]() { Ping(); });
12041133
}
12051134

1135+
inserting_ = true;
1136+
12061137
// Create a callback to extract the block with the proper query columns.
12071138
Block block;
12081139
query.OnData([&block](const Block& b) {
@@ -1212,13 +1143,15 @@ Block Client::Impl::BeginInsert(Query query) {
12121143

12131144
SendQuery(query.GetText());
12141145

1215-
// Receive data packet but keep the query/connection open.
1216-
if (!ReceivePreparePackets()) {
1217-
throw std::runtime_error("fail to receive data packet");
1146+
// Wait for a data packet and return
1147+
uint64_t server_packet = 0;
1148+
while (ReceivePacket(&server_packet)) {
1149+
if (server_packet == ServerCodes::Data) {
1150+
return block;
1151+
}
12181152
}
12191153

1220-
inserting = true;
1221-
return block;
1154+
throw ProtocolError("fail to receive data packet");
12221155
}
12231156

12241157
Client::Client(const ClientOptions& opts)
@@ -1293,13 +1226,8 @@ Block Client::BeginInsert(const std::string& query, const std::string& query_id)
12931226
return impl_->BeginInsert(Query(query, query_id));
12941227
}
12951228

1296-
void Client::InsertData(const Block& block) {
1297-
impl_->InsertData(block);
1298-
}
1299-
1300-
void Client::EndInsert(const Block& block) {
1301-
impl_->InsertData(block);
1302-
impl_->EndInsert();
1229+
void Client::SendInsertBlock(const Block& block) {
1230+
impl_->SendInsertBlock(block);
13031231
}
13041232

13051233
void Client::EndInsert() {

clickhouse/client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,10 @@ class Client {
278278
Block BeginInsert(const std::string& query, const std::string& query_id);
279279

280280
/// Insert data using a \p block returned by \p BeginInsert.
281-
void InsertData(const Block& block);
281+
void SendInsertBlock(const Block& block);
282282

283283
/// End an \p INSERT session started by \p BeginInsert.
284284
void EndInsert();
285-
void EndInsert(const Block& block);
286285

287286
/// Ping server for aliveness.
288287
void Ping();

ut/client_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ TEST_P(ClientCase, InsertData) {
402402
f->Append(td.f);
403403
}
404404
block.RefreshRowCount();
405-
client_->InsertData(block);
405+
client_->SendInsertBlock(block);
406406
block.Clear();
407407

408408
// Insert some more values.
@@ -412,7 +412,7 @@ TEST_P(ClientCase, InsertData) {
412412
f->Append(td.f);
413413
}
414414
block.RefreshRowCount();
415-
client_->InsertData(block);
415+
client_->SendInsertBlock(block);
416416
block.Clear();
417417
client_->EndInsert();
418418
// Second call to EndInsert should be no-op.

0 commit comments

Comments
 (0)