diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index 1897b4d5f..f53bcd113 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -709,18 +709,31 @@ async def __call__(self, es, params): if max_num_segments: merge_params["max_num_segments"] = max_num_segments if mode == "polling": + task_id = None complete = False - try: - await es.indices.forcemerge(**merge_params) - complete = True - except elasticsearch.ConnectionTimeout: - pass + es_info = await es.info() + es_version = Version.from_string(es_info["version"]["number"]) + if es_version < Version(8, 1, 0): # before 8.1.0 wait_for_completion is not supported + try: + await es.indices.forcemerge(**merge_params) + complete = True + except elasticsearch.ConnectionTimeout: + pass + else: + complete = False + merge_params["wait_for_completion"] = False + response = await es.indices.forcemerge(**merge_params) + task_id = response.get("task") while not complete: await asyncio.sleep(params.get("poll-period")) - tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) - if len(tasks["nodes"]) == 0: - # empty nodes response indicates no tasks - complete = True + if task_id: + tasks = await es.tasks.get(task_id=task_id) + complete = tasks.get("completed", False) + else: + tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"}) + if len(tasks["nodes"]) == 0: + # empty nodes response indicates no tasks + complete = True else: await es.indices.forcemerge(**merge_params) diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index adccabaac..ae2875dd9 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -1519,6 +1519,79 @@ async def test_bulk_index_success_with_refresh_invalid(self, es): class TestForceMergeRunner: + + def _eight_cluster_info_output(self): + return { + "name": "es01", + "cluster_name": "docker-cluster", + "cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg", + "version": { + "number": "8.1.0", + "build_flavor": "default", + "build_type": "docker", + "build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a", + "build_date": "2022-03-03T14:20:00.690422633Z", + "build_snapshot": False, + "lucene_version": "9.0.0", + "minimum_wire_compatibility_version": "7.17.0", + "minimum_index_compatibility_version": "7.0.0", + }, + "tagline": "You Know, for Search", + } + + def _seven_cluster_info_output(self): + return { + "name": "es01", + "cluster_name": "escluster", + "cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A", + "version": { + "number": "7.17.3", + "build_flavor": "default", + "build_type": "tar", + "build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff", + "build_date": "2022-04-19T08:11:19.070913226Z", + "build_snapshot": False, + "lucene_version": "8.11.1", + "minimum_wire_compatibility_version": "6.8.0", + "minimum_index_compatibility_version": "6.0.0-beta1", + }, + "tagline": "You Know, for Search", + } + + def _task_list_output(self): + return { + "nodes": { + "Ap3OfntPT7qL4CBeKvamxg": { + "name": "instance-0000000001", + "transport_address": "10.46.79.231:19693", + "host": "10.46.79.231", + "ip": "10.46.79.231:19693", + "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], + "attributes": { + "logical_availability_zone": "zone-1", + "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", + "availability_zone": "us-east4-a", + "xpack.installed": "true", + "instance_configuration": "gcp.data.highio.1", + "transform.node": "true", + "region": "unknown-region", + }, + "tasks": { + "Ap3OfntPT7qL4CBeKvamxg:417009036": { + "node": "Ap3OfntPT7qL4CBeKvamxg", + "id": 417009036, + "type": "transport", + "action": "indices:admin/forcemerge", + "start_time_in_millis": 1598018980850, + "running_time_in_nanos": 3659821411, + "cancellable": False, + "headers": {}, + } + }, + } + } + } + @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_defaults(self, es): @@ -1562,97 +1635,147 @@ async def test_force_merge_with_params(self, es): @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling_no_timeout(self, es): - es.indices.forcemerge = mock.AsyncMock() + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) + es.info = mock.AsyncMock( + return_value=self._eight_cluster_info_output(), + ) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) + es.tasks.list = mock.AsyncMock( + side_effect=[ + self._task_list_output(), + { + "nodes": {}, + }, + ] + ) force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) - es.indices.forcemerge.assert_awaited_once_with(index="_all") + es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio async def test_force_merge_with_polling(self, es): - es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) + es.info = mock.AsyncMock( + return_value=self._eight_cluster_info_output(), + ) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) es.tasks.list = mock.AsyncMock( side_effect=[ + self._task_list_output, { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } + "nodes": {}, }, + ] + ) + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_and_params(self, es): + es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"}) + es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output()) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) + es.tasks.list = mock.AsyncMock( + side_effect=[ + self._task_list_output(), { "nodes": {}, }, ] ) + force_merge = runner.ForceMerge() + # request-timeout should be ignored as mode:polling + await force_merge( + es, + params={ + "index": "_all", + "mode": "polling", + "max-num-segments": 1, + "request-timeout": 50000, + "poll-period": 0, + }, + ) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False) + es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg") + es.tasks.list.assert_not_awaited() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_and_params_missing_task_id(self, es): + es.indices.forcemerge = mock.AsyncMock(return_value={}) + es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output()) + es.tasks.get = mock.AsyncMock(return_value={"completed": True}) + es.tasks.list = mock.AsyncMock( + side_effect=[ + self._task_list_output(), + { + "nodes": {}, + }, + ] + ) + force_merge = runner.ForceMerge() + # request-timeout should be ignored as mode:polling + await force_merge( + es, + params={ + "index": "_all", + "mode": "polling", + "max-num-segments": 1, + "request-timeout": 50000, + "poll-period": 0, + }, + ) + es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False) + es.tasks.get.assert_not_awaited() + es.tasks.list.assert_awaited() + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es): + es.indices.forcemerge = mock.AsyncMock() + es.info = mock.AsyncMock( + return_value=self._seven_cluster_info_output(), + ) + force_merge = runner.ForceMerge() await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) es.indices.forcemerge.assert_awaited_once_with(index="_all") @mock.patch("elasticsearch.Elasticsearch") @pytest.mark.asyncio - async def test_force_merge_with_polling_and_params(self, es): - es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) + async def test_force_merge_with_polling_pre_8_1(self, es): + es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout")) + es.info = mock.AsyncMock(return_value=self._seven_cluster_info_output()) es.tasks.list = mock.AsyncMock( side_effect=[ + self._task_list_output(), { - "nodes": { - "Ap3OfntPT7qL4CBeKvamxg": { - "name": "instance-0000000001", - "transport_address": "10.46.79.231:19693", - "host": "10.46.79.231", - "ip": "10.46.79.231:19693", - "roles": ["data", "ingest", "master", "remote_cluster_client", "transform"], - "attributes": { - "logical_availability_zone": "zone-1", - "server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526", - "availability_zone": "us-east4-a", - "xpack.installed": "true", - "instance_configuration": "gcp.data.highio.1", - "transform.node": "true", - "region": "unknown-region", - }, - "tasks": { - "Ap3OfntPT7qL4CBeKvamxg:417009036": { - "node": "Ap3OfntPT7qL4CBeKvamxg", - "id": 417009036, - "type": "transport", - "action": "indices:admin/forcemerge", - "start_time_in_millis": 1598018980850, - "running_time_in_nanos": 3659821411, - "cancellable": False, - "headers": {}, - } - }, - } - } + "nodes": {}, }, + ] + ) + force_merge = runner.ForceMerge() + await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0}) + es.indices.forcemerge.assert_awaited_once_with(index="_all") + + @mock.patch("elasticsearch.Elasticsearch") + @pytest.mark.asyncio + async def test_force_merge_with_polling_and_params_pre_8_1(self, es): + es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout")) + es.info = mock.AsyncMock( + return_value=self._seven_cluster_info_output(), + ) + es.tasks.list = mock.AsyncMock( + side_effect=[ + self._task_list_output(), { "nodes": {}, },