From 291f3ebf6aacc69d9d1dd3b4fdfb912d1313f731 Mon Sep 17 00:00:00 2001 From: Moazam Date: Fri, 16 Nov 2018 19:38:42 +0000 Subject: [PATCH 1/6] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 58e95a3..965803a 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,7 @@ }, "require": { "php": ">=5.6.6", - "elasticsearch/elasticsearch": "^5.0|^6.0", + "elasticsearch/elasticsearch": "^6.0", "illuminate/pagination": "*", "illuminate/support": "*", "symfony/var-dumper": "*", From a3b3d0d087352609179fdeddd48540622d4c1410 Mon Sep 17 00:00:00 2001 From: Moazam Date: Fri, 16 Nov 2018 19:39:29 +0000 Subject: [PATCH 2/6] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 965803a..12bb0db 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "basemkhirat/elasticsearch", + "name": "moazam/elasticsearch", "description": "Laravel, Lumen and Native php elasticseach query builder to build complex queries using an elegant syntax", "keywords": [ "elasticsearch", From c664809e411bcda1dc34cd56958d08e26646e501 Mon Sep 17 00:00:00 2001 From: Moazam Date: Fri, 16 Nov 2018 19:50:11 +0000 Subject: [PATCH 3/6] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 12bb0db..9171b7e 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,7 @@ }, "require": { "php": ">=5.6.6", - "elasticsearch/elasticsearch": "^6.0", + "elasticsearch/elasticsearch": "6.*", "illuminate/pagination": "*", "illuminate/support": "*", "symfony/var-dumper": "*", From f60b5c137e3c78ea5a42ff0b44e15df107999914 Mon Sep 17 00:00:00 2001 From: Moazam Date: Fri, 16 Nov 2018 21:13:23 +0000 Subject: [PATCH 4/6] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 9171b7e..a84c676 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,7 @@ }, "require": { "php": ">=5.6.6", - "elasticsearch/elasticsearch": "6.*", + "elasticsearch/elasticsearch": "*", "illuminate/pagination": "*", "illuminate/support": "*", "symfony/var-dumper": "*", From 8b659dfaaf97ddcbd04b506b8fbf6fb8113efcad Mon Sep 17 00:00:00 2001 From: Moazam Date: Wed, 10 Apr 2019 18:05:31 +0100 Subject: [PATCH 5/6] fixed bulk index issue --- src/Classes/Bulk.php | 453 ++++++++++++++++---------------- src/Commands/ReindexCommand.php | 254 +++++++++--------- src/Query.php | 2 +- 3 files changed, 354 insertions(+), 355 deletions(-) diff --git a/src/Classes/Bulk.php b/src/Classes/Bulk.php index b98f507..97903d9 100755 --- a/src/Classes/Bulk.php +++ b/src/Classes/Bulk.php @@ -11,230 +11,231 @@ class Bulk { - /** - * The query object - * @var Query - */ - public $query; - - /** - * The document key - * @var string - */ - public $_id; - - /** - * The index name - * @var string - */ - public $index; - - /** - * The type name - * @var string - */ - public $type; - - /** - * Bulk body - * @var array - */ - public $body = []; - - /** - * Number of pending operations - * @var int - */ - public $operationCount = 0; - - /** - * Operation count which will trigger autocommit - * @var int - */ - public $autocommitAfter = 0; - - - /** - * Bulk constructor. - * @param Query $query - * @param int $autocommitAfter - */ - public function __construct(Query $query, $autocommitAfter = 0) - { - - $this->query = $query; - - $this->autocommitAfter = intval($autocommitAfter); - } - - /** - * Set the index name - * @param $index - * @return $this - */ - public function index($index = false) - { - - $this->index = $index; - - return $this; - } - - /** - * Get the index name - * @return mixed - */ - protected function getIndex() - { - return $this->index ? $this->index : $this->query->getIndex(); - } - - /** - * Set the type name - * @param $type - * @return $this - */ - public function type($type = false) - { - - $this->type = $type; - - return $this; - } - - /** - * Get the type name - * @return mixed - */ - protected function getType() - { - return $this->type ? $this->type : $this->query->getType(); - } - - /** - * Filter by _id - * @param bool $_id - * @return $this - */ - public function _id($_id = false) - { - - $this->_id = $_id; - - return $this; - } - - /** - * Just an alias for _id() method - * @param bool $_id - * @return $this - */ - public function id($_id = false) - { - return $this->_id($_id); - } - - /** - * Add pending document for insert - * @param array $data - * @return mixed - */ - public function insert($data = []) - { - return $this->action('index', $data); - } - - /** - * Add pending document for update - * @param array $data - * @return mixed - */ - public function update($data = []) - { - - return $this->action('update', $data); - } - - /** - * Add pending document for deletion - */ - public function delete() - { - - return $this->action('delete'); - } - - /** - * Add pending document abstract action - * @param string $actionType - * @param array $data - * @return mixed - */ - public function action($actionType, $data = []) - { - - $this->body["body"][] = [ - - $actionType => [ - '_index' => $this->getIndex(), - '_type' => $this->getType(), - '_id' => $this->_id - ] - - ]; - - if (!empty($data)) { - if($actionType == "update"){ - $this->body["body"][] = ["doc" => $data]; - }else { - $this->body["body"][] = $data; - } - } - - $this->operationCount++; - - $this->reset(); - - if ($this->autocommitAfter > 0 && $this->operationCount >= $this->autocommitAfter) { - return $this->commit(); - } - - return true; - } - - /** - * Get Bulk body - * @return array - */ - public function body() - { - return $this->body; - } - - /** - * Reset names - * @return void - */ - public function reset() - { - $this->index(NULL); - $this->type(NULL); - } - - /** - * Commit all pending operations - */ - public function commit() - { - - if (empty($this->body)) { - return false; - } - - $result = $this->query->connection->bulk($this->body); - $this->operationCount = 0; - $this->body = []; - - return $result; - } + /** + * The query object + * @var Query + */ + public $query; + + /** + * The document key + * @var string + */ + public $_id; + + /** + * The index name + * @var string + */ + public $index; + + /** + * The type name + * @var string + */ + public $type; + + /** + * Bulk body + * @var array + */ + public $body = []; + + /** + * Number of pending operations + * @var int + */ + public $operationCount = 0; + + /** + * Operation count which will trigger autocommit + * @var int + */ + public $autocommitAfter = 0; + + + /** + * Bulk constructor. + * @param Query $query + * @param int $autocommitAfter + */ + public function __construct(Query $query, $autocommitAfter = 0) + { + + $this->query = $query; + + $this->autocommitAfter = intval($autocommitAfter); + } + + /** + * Set the index name + * @param $index + * @return $this + */ + public function index($index = false) + { + + $this->index = $index; + + return $this; + } + + /** + * Get the index name + * @return mixed + */ + protected function getIndex() + { + return $this->index ? $this->index : $this->query->getIndex(); + } + + /** + * Set the type name + * @param $type + * @return $this + */ + public function type($type = false) + { + + $this->type = $type; + + return $this; + } + + /** + * Get the type name + * @return mixed + */ + protected function getType() + { + return $this->type ? $this->type : $this->query->getType(); + } + + /** + * Filter by _id + * @param bool $_id + * @return $this + */ + public function _id($_id = false) + { + + $this->_id = $_id; + + return $this; + } + + /** + * Just an alias for _id() method + * @param bool $_id + * @return $this + */ + public function id($_id = false) + { + return $this->_id($_id); + } + + /** + * Add pending document for insert + * @param array $data + * @return mixed + */ + public function insert($data = []) + { + return $this->action('index', $data); + } + + /** + * Add pending document for update + * @param array $data + * @return mixed + */ + public function update($data = []) + { + + return $this->action('update', $data); + } + + /** + * Add pending document for deletion + */ + public function delete() + { + + return $this->action('delete'); + } + + /** + * Add pending document abstract action + * @param string $actionType + * @param array $data + * @return mixed + */ + public function action($actionType, $data = []) + { + + $this->body["body"][] = [ + + $actionType => [ + '_index' => $this->getIndex(), + '_type' => $this->getType(), + '_id' => $this->_id + ] + + ]; + + if (!empty($data)) { + if($actionType == "update"){ + $this->body["body"][] = ["doc" => $data]; + }else { + $this->body["body"][] = $data; + } + } + + $this->operationCount++; + unset($data); + + $this->reset(); + + if ($this->autocommitAfter > 0 && $this->operationCount >= $this->autocommitAfter) { + return $this->commit(); + } + + return true; + } + + /** + * Get Bulk body + * @return array + */ + public function body() + { + return $this->body; + } + + /** + * Reset names + * @return void + */ + public function reset() + { + $this->index(NULL); + $this->type(NULL); + } + + /** + * Commit all pending operations + */ + public function commit() + { + + if (empty($this->body)) { + return false; + } + + $result = $this->query->connection->bulk($this->body); + $this->operationCount = 0; + $this->body = []; + + return $result; + } } diff --git a/src/Commands/ReindexCommand.php b/src/Commands/ReindexCommand.php index 86bc2c4..eb8a8d4 100755 --- a/src/Commands/ReindexCommand.php +++ b/src/Commands/ReindexCommand.php @@ -10,185 +10,183 @@ */ class ReindexCommand extends Command { - /** - * The name and signature of the console command. - * - * @var string - */ - protected $signature = 'es:indices:reindex {index}{new_index} + /** + * The name and signature of the console command. + * + * @var string + */ + protected $signature = 'es:indices:reindex {index}{new_index} {--bulk-size=1000 : Scroll size} {--skip-errors : Skip reindexing errors} {--hide-errors : Hide reindexing errors} {--scroll=2m : query scroll time} {--connection= : Elasticsearch connection}'; - /** - * The console command description. - * - * @var string - */ - protected $description = 'Reindex indices data'; + /** + * The console command description. + * + * @var string + */ + protected $description = 'Reindex indices data'; - /** - * ES connection name - * @var string - */ - protected $connection; + /** + * ES connection name + * @var string + */ + protected $connection; - /** - * ES object - * @var object - */ - protected $es; + /** + * ES object + * @var object + */ + protected $es; - /** - * Query bulk size - * @var integer - */ - protected $size; + /** + * Query bulk size + * @var integer + */ + protected $size; - /** - * Scroll time - * @var string - */ - protected $scroll; + /** + * Scroll time + * @var string + */ + protected $scroll; - /** - * ReindexCommand constructor. - */ - function __construct() - { - parent::__construct(); - $this->es = app("es"); - } + /** + * ReindexCommand constructor. + */ + function __construct() + { + parent::__construct(); + $this->es = app("es"); + } - /** - * Execute the console command. - * - * @return mixed - */ - public function handle() - { + /** + * Execute the console command. + * + * @return mixed + */ + public function handle() + { - $this->connection = $this->option("connection") ? $this->option("connection") : config("es.default"); + $this->connection = $this->option("connection") ? $this->option("connection") : config("es.default"); - $this->size = (int)$this->option("bulk-size"); + $this->size = (int)$this->option("bulk-size"); - $this->scroll = $this->option("scroll"); + $this->scroll = $this->option("scroll"); - if ($this->size <= 0 or !is_numeric($this->size)) { - return $this->warn("Invalide size value"); - } + if ($this->size <= 0 or !is_numeric($this->size)) { + return $this->warn("Invalide size value"); + } - $original_index = $this->argument('index'); + $original_index = $this->argument('index'); - $new_index = $this->argument('new_index'); + $new_index = $this->argument('new_index'); - if (!in_array($original_index, array_keys(config("es.indices")))) { - return $this->warn("Missing configuration for index: {$original_index}"); - } + if (!in_array($original_index, array_keys(config("es.indices")))) { + return $this->warn("Missing configuration for index: {$original_index}"); + } - if (!in_array($new_index, array_keys(config("es.indices")))) { - return $this->warn("Missing configuration for index: {$new_index}"); - } + if (!in_array($new_index, array_keys(config("es.indices")))) { + return $this->warn("Missing configuration for index: {$new_index}"); + } - $this->migrate($original_index, $new_index); - } + $this->migrate($original_index, $new_index); + } - /** - * Migrate data with Scroll queries & Bulk API - * @param $original_index - * @param $new_index - * @param null $scroll_id - * @param int $errors - * @param int $page - */ - function migrate($original_index, $new_index, $scroll_id = null, $errors = 0, $page = 1) - { + /** + * Migrate data with Scroll queries & Bulk API + * @param $original_index + * @param $new_index + * @param null $scroll_id + * @param int $errors + * @param int $page + */ + function migrate($original_index, $new_index, $scroll_id = null, $errors = 0, $page = 1) + { - if ($page == 1) { + if ($page == 1) { - $pages = (int)ceil($this->es->connection($this->connection)->index($original_index)->count() / $this->size); + $pages = (int)ceil($this->es->connection($this->connection)->index($original_index)->count() / $this->size); - $this->output->progressStart($pages); + $this->output->progressStart($pages); - $documents = $this->es->connection($this->connection)->index($original_index)->type("") - ->scroll($this->scroll) - ->take($this->size) - ->response(); + $documents = $this->es->connection($this->connection)->index($original_index)->type("") + ->scroll($this->scroll) + ->take($this->size) + ->response(); - } else { + } else { - $documents = $this->es->connection($this->connection)->index($original_index)->type("") - ->scroll($this->scroll) - ->scrollID($scroll_id) - ->response(); + $documents = $this->es->connection($this->connection)->index($original_index)->type("") + ->scroll($this->scroll) + ->scrollID($scroll_id) + ->response(); - } + } - if (isset($documents["hits"]["hits"]) and count($documents["hits"]["hits"])) { + if (isset($documents["hits"]["hits"]) and count($documents["hits"]["hits"])) { + $params = []; - $data = $documents["hits"]["hits"]; + foreach ($documents["hits"]["hits"] as $row) { - $params = []; + $params["body"][] = [ - foreach ($data as $row) { + 'index' => [ + '_index' => $new_index, + '_type' => $row["_type"], + '_id' => $row["_id"] + ] - $params["body"][] = [ + ]; - 'index' => [ - '_index' => $new_index, - '_type' => $row["_type"], - '_id' => $row["_id"] - ] + $params["body"][] = $row["_source"]; - ]; + } - $params["body"][] = $row["_source"]; + $response = $this->es->connection($this->connection)->raw()->bulk($params); + $scrollId = $documents["_scroll_id"]; + unset($params); + unset($documents); - } + if (isset($response["errors"]) and $response["errors"]) { + if (!$this->option("hide-errors")) { + if ($this->option("skip-errors")) { + $this->warn("\n" . json_encode($response["items"])); + } else { + return $this->warn("\n" . json_encode($response["items"])); + } - $response = $this->es->connection($this->connection)->raw()->bulk($params); + } - if (isset($response["errors"]) and $response["errors"]) { + $errors++; + } - if (!$this->option("hide-errors")) { + $this->output->progressAdvance(); - if ($this->option("skip-errors")) { - $this->warn("\n" . json_encode($response["items"])); - } else { - return $this->warn("\n" . json_encode($response["items"])); - } + } else { - } + // Reindexing finished - $errors++; - } + $this->output->progressFinish(); - $this->output->progressAdvance(); + $total = $this->es->connection($this->connection)->index($original_index)->count(); - } else { + if ($errors > 0) { + return $this->warn("$total documents reindexed with $errors errors."); + } else { + return $this->info("$total documents reindexed $errors errors."); + } - // Reindexing finished + } - $this->output->progressFinish(); + $page++; - $total = $this->es->connection($this->connection)->index($original_index)->count(); - - if ($errors > 0) { - return $this->warn("$total documents reindexed with $errors errors."); - } else { - return $this->info("$total documents reindexed $errors errors."); - } - - } - - $page++; - - $this->migrate($original_index, $new_index, $documents["_scroll_id"], $errors, $page); - } + $this->migrate($original_index, $new_index, $scrollId, $errors, $page); + } } diff --git a/src/Query.php b/src/Query.php index 42cfa19..be9bb36 100755 --- a/src/Query.php +++ b/src/Query.php @@ -1086,7 +1086,7 @@ public function bulk($data) $params["body"][] = $value; } - + unset($data); } return (object)$this->connection->bulk($params); From 0d381ccb415eed501c612b1ff5bb3471210bd27a Mon Sep 17 00:00:00 2001 From: Moazam Date: Wed, 10 Apr 2019 23:45:32 +0100 Subject: [PATCH 6/6] fixed memory leak --- src/Commands/ReindexCommand.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Commands/ReindexCommand.php b/src/Commands/ReindexCommand.php index eb8a8d4..b151c48 100755 --- a/src/Commands/ReindexCommand.php +++ b/src/Commands/ReindexCommand.php @@ -165,7 +165,8 @@ function migrate($original_index, $new_index, $scroll_id = null, $errors = 0, $p $errors++; } - + unset($response); + gc_collect_cycles(); $this->output->progressAdvance(); } else {