Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,18 @@ async def __call__(self, es, params):
merge_params["max_num_segments"] = max_num_segments
if mode == "polling":
complete = False
try:
es_info = await es.info()
es_version = Version.from_string(es_info["version"]["number"])
if es_version < Version(8, 1, 0): # before 8.1.0 wait_for_completion is not supported
try:
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
else:
complete = False
merge_params["wait_for_completion"] = False
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
Expand Down
270 changes: 268 additions & 2 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1563,15 +1563,262 @@ async def test_force_merge_with_params(self, es):
@pytest.mark.asyncio
async def test_force_merge_with_polling_no_timeout(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "docker-cluster",
"cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg",
"version": {
"number": "8.1.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a",
"build_date": "2022-03-03T14:20:00.690422633Z",
"build_snapshot": False,
"lucene_version": "9.0.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0",
},
"tagline": "You Know, for Search",
},
)
es.tasks.list = mock.AsyncMock(
side_effect=[
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
},
{
"nodes": {},
},
]
)

force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all")
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "docker-cluster",
"cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg",
"version": {
"number": "8.1.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a",
"build_date": "2022-03-03T14:20:00.690422633Z",
"build_snapshot": False,
"lucene_version": "9.0.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0",
},
"tagline": "You Know, for Search",
},
)
es.tasks.list = mock.AsyncMock(
side_effect=[
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
},
{
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "docker-cluster",
"cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg",
"version": {
"number": "8.1.0",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a",
"build_date": "2022-03-03T14:20:00.690422633Z",
"build_snapshot": False,
"lucene_version": "9.0.0",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0",
},
"tagline": "You Know, for Search",
},
)
es.tasks.list = mock.AsyncMock(
side_effect=[
{
"nodes": {
"Ap3OfntPT7qL4CBeKvamxg": {
"name": "instance-0000000001",
"transport_address": "10.46.79.231:19693",
"host": "10.46.79.231",
"ip": "10.46.79.231:19693",
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
"attributes": {
"logical_availability_zone": "zone-1",
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
"availability_zone": "us-east4-a",
"xpack.installed": "true",
"instance_configuration": "gcp.data.highio.1",
"transform.node": "true",
"region": "unknown-region",
},
"tasks": {
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
"node": "Ap3OfntPT7qL4CBeKvamxg",
"id": 417009036,
"type": "transport",
"action": "indices:admin/forcemerge",
"start_time_in_millis": 1598018980850,
"running_time_in_nanos": 3659821411,
"cancellable": False,
"headers": {},
}
},
}
}
},
{
"nodes": {},
},
]
)
force_merge = runner.ForceMerge()
# request-timeout should be ignored as mode:polling
await force_merge(
es,
params={
"index": "_all",
"mode": "polling",
"max-num-segments": 1,
"request-timeout": 50000,
"poll-period": 0,
},
)
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False)

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock()
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "escluster",
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
"version": {
"number": "7.17.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
"build_date": "2022-04-19T08:11:19.070913226Z",
"build_snapshot": False,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1",
},
"tagline": "You Know, for Search",
},
)

force_merge = runner.ForceMerge()
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
es.indices.forcemerge.assert_awaited_once_with(index="_all")

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "escluster",
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
"version": {
"number": "7.17.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
"build_date": "2022-04-19T08:11:19.070913226Z",
"build_snapshot": False,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1",
},
"tagline": "You Know, for Search",
},
)
es.tasks.list = mock.AsyncMock(
side_effect=[
{
Expand Down Expand Up @@ -1617,8 +1864,27 @@ async def test_force_merge_with_polling(self, es):

@mock.patch("elasticsearch.Elasticsearch")
@pytest.mark.asyncio
async def test_force_merge_with_polling_and_params(self, es):
async def test_force_merge_with_polling_and_params_pre_8_1(self, es):
es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout"))
es.info = mock.AsyncMock(
return_value={
"name": "es01",
"cluster_name": "escluster",
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
"version": {
"number": "7.17.3",
"build_flavor": "default",
"build_type": "tar",
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
"build_date": "2022-04-19T08:11:19.070913226Z",
"build_snapshot": False,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1",
},
"tagline": "You Know, for Search",
},
)
es.tasks.list = mock.AsyncMock(
side_effect=[
{
Expand Down