Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Bulk API implementation with the help of a BulkBuilder class
  • Loading branch information
alolis committed Nov 27, 2014
commit 16761428881148b6aec4db732e1123a35e10d586
89 changes: 88 additions & 1 deletion src/elasticsearch/elasticsearch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <cstring>
#include <cassert>
#include <locale>
#include <vector>

ElasticSearch::ElasticSearch(const std::string& node, bool readOnly): _http(node, true), _readOnly(readOnly) {

Expand Down Expand Up @@ -309,7 +310,7 @@ long ElasticSearch::search(const std::string& index, const std::string& type, co
return result.getValue("hits").getObject().getValue("total").getLong();
}

/// Delete given type (and all documents, mappings)
/// Delete given type (and all documents, mappings)
bool ElasticSearch::deleteType(const std::string& index, const std::string& type){
std::ostringstream uri;
uri << index << "/" << type;
Expand Down Expand Up @@ -339,3 +340,89 @@ void ElasticSearch::refresh(const std::string& index){
Json::Object msg;
_http.get(oss.str().c_str(), 0, &msg);
}

// Bulk API of ES.
bool ElasticSearch::bulk(const char* data, Json::Object& jResult) {
if(_readOnly)
return false;

return (200 == _http.post("/_bulk", data, &jResult));
}

BulkBuilder::BulkBuilder() {}

void BulkBuilder::createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id = "") {
Json::Object command;
Json::Object commandParams;

if (id != "") {
commandParams.addMemberByKey("_id", id);
}

commandParams.addMemberByKey("_index", index);
commandParams.addMemberByKey("_type", type);

command.addMemberByKey(op, commandParams);
operations.push_back(command);
}

void BulkBuilder::index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("index", index, type, id);
operations.push_back(fields);
}

void BulkBuilder::create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("create", index, type, id);
operations.push_back(fields);
}

void BulkBuilder::index(const std::string &index, const std::string &type, const Json::Object &fields) {
createCommand("index", index, type);
operations.push_back(fields);
}

void BulkBuilder::create(const std::string &index, const std::string &type, const Json::Object &fields) {
createCommand("create", index, type);
operations.push_back(fields);
}

void BulkBuilder::update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("update", index, type, id);

Json::Object updateFields;
updateFields.addMemberByKey("doc", fields);

operations.push_back(updateFields);
}

void BulkBuilder::del(const std::string &index, const std::string &type, const std::string &id) {
createCommand("delete", index, type, id);
}

void BulkBuilder::upsert(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields) {
createCommand("update", index, type, id);

Json::Object updateFields;
updateFields.addMemberByKey("doc", fields);
updateFields.addMemberByKey("doc_as_upsert", true);

operations.push_back(updateFields);
}

std::string BulkBuilder::str() {
std::stringstream json;

for(auto &operation : operations) {
json << operation.str() << std::endl;
}

return json.str();
}

void BulkBuilder::clear() {
operations.clear();
}

bool BulkBuilder::isEmpty() {
return operations.empty();
}
24 changes: 24 additions & 0 deletions src/elasticsearch/elasticsearch.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>
#include <list>
#include <mutex>
#include <vector>

#include "http/http.h"
#include "json/json.h"
Expand Down Expand Up @@ -61,6 +62,9 @@ class ElasticSearch {
/// Perform a scan to get all results from a query.
int fullScan(const std::string& index, const std::string& type, const std::string& query, Json::Array& resultArray, int scrollSize = 1000);

// Bulk API
bool bulk(const char*, Json::Object& jResult);

public:
/// Delete given type (and all documents, mappings)
bool deleteType(const std::string& index, const std::string& type);
Expand Down Expand Up @@ -89,4 +93,24 @@ class ElasticSearch {
bool _readOnly;
};

class BulkBuilder {
private:
std::vector<Json::Object> operations;

void createCommand(const std::string &op, const std::string &index, const std::string &type, const std::string &id);

public:
BulkBuilder();
void index(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
void create(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
void index(const std::string &index, const std::string &type, const Json::Object &fields);
void create(const std::string &index, const std::string &type, const Json::Object &fields);
void update(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
void del(const std::string &index, const std::string &type, const std::string &id);
void upsert(const std::string &index, const std::string &type, const std::string &id, const Json::Object &fields);
void clear();
std::string str();
bool isEmpty();
};

#endif // ELASTICSEARCH_H