Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,18 +709,31 @@ async def __call__(self, es, params):
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
if mode == "polling":
task_id = None
complete = False
try:
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
es_info = await es.info()
es_version = Version.from_string(es_info["version"]["number"])
if es_version < Version(8, 1, 0): # before 8.1.0 wait_for_completion is not supported
try:
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
else:
complete = False
merge_params["wait_for_completion"] = False
response = await es.indices.forcemerge(**merge_params)
task_id = response.get("task")
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
# empty nodes response indicates no tasks
complete = True
if task_id:
tasks = await es.tasks.get(task_id=task_id)
complete = tasks.get("completed", False)
else:
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
# empty nodes response indicates no tasks
complete = True
else:
await es.indices.forcemerge(**merge_params)

Expand Down
253 changes: 188 additions & 65 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,79 @@ async def test_bulk_index_success_with_refresh_invalid(self, es):


class TestForceMergeRunner:

def _eight_cluster_info_output(self):
return {
"name": "es01",
"cluster_name": "docker-cluster",
"cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg",
"version": {
"number": "8.1.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a",
"build_date": "2022-03-03T14:20:00.690422633Z",
"build_snapshot": False,
"lucene_version": "9.0.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0",
},
"tagline": "You Know, for Search",
}

def _seven_cluster_info_output(self):
return {
"name": "es01",
"cluster_name": "escluster",
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
"version": {
"number": "7.17.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
"build_date": "2022-04-19T08:11:19.070913226Z",
"build_snapshot": False,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1",
},
"tagline": "You Know, for Search",
}

def _task_list_output(self):
return {
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
}

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_defaults(self, es):
Expand Down Expand Up @@ -1562,97 +1635,147 @@ async def test_force_merge_with_params(self, es):
@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_no_timeout(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
es.info = mock.AsyncMock(
return_value=self._eight_cluster_info_output(),
)
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output(),
{
"nodes": {},
},
]
)

force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all")
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
es.tasks.list.assert_not_awaited()

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling(self, es):
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
es.info = mock.AsyncMock(
return_value=self._eight_cluster_info_output(),
)
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output,
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
es.tasks.list.assert_not_awaited()

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params(self, es):
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output())
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output(),
{
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
# request-timeout should be ignored as mode:polling
await force_merge(
es,
params={
"index": "_all",
"mode": "polling",
"max-num-segments": 1,
"request-timeout": 50000,
"poll-period": 0,
},
)
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False)
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
es.tasks.list.assert_not_awaited()

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params_missing_task_id(self, es):
es.indices.forcemerge = mock.AsyncMock(return_value={})
es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output())
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output(),
{
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
# request-timeout should be ignored as mode:polling
await force_merge(
es,
params={
"index": "_all",
"mode": "polling",
"max-num-segments": 1,
"request-timeout": 50000,
"poll-period": 0,
},
)
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False)
es.tasks.get.assert_not_awaited()
es.tasks.list.assert_awaited()

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.info = mock.AsyncMock(
return_value=self._seven_cluster_info_output(),
)

force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all")

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params(self, es):
es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout"))
async def test_force_merge_with_polling_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
es.info = mock.AsyncMock(return_value=self._seven_cluster_info_output())
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output(),
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all")

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout"))
es.info = mock.AsyncMock(
return_value=self._seven_cluster_info_output(),
)
es.tasks.list = mock.AsyncMock(
side_effect=[
self._task_list_output(),
{
"nodes": {},
},
Expand Down