Skip to content

Commit c04cac3

Browse files
authored
Use wait_for_completion:False instead of relying on timeouts in ForceMerge (#2016)
* Use wait_for_completion:False instead of relying on timeouts
* Retain old behaviour if < 8.1
* Update tests to cover new behaviour
* Add ability to trace via task_id if present. Update tests
1 parent 3dfda14 commit c04cac3

File tree

2 files changed

+210
-74
lines changed

2 files changed

+210
-74
lines changed

esrally/driver/runner.py

Lines changed: 22 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -709,18 +709,31 @@ async def __call__(self, es, params):
709709
if max_num_segments:
710710
merge_params["max_num_segments"] = max_num_segments
711711
if mode == "polling":
712+
task_id = None
712713
complete = False
713-
try:
714-
await es.indices.forcemerge(**merge_params)
715-
complete = True
716-
except elasticsearch.ConnectionTimeout:
717-
pass
714+
es_info = await es.info()
715+
es_version = Version.from_string(es_info["version"]["number"])
716+
if es_version < Version(8, 1, 0): # before 8.1.0 wait_for_completion is not supported
717+
try:
718+
await es.indices.forcemerge(**merge_params)
719+
complete = True
720+
except elasticsearch.ConnectionTimeout:
721+
pass
722+
else:
723+
complete = False
724+
merge_params["wait_for_completion"] = False
725+
response = await es.indices.forcemerge(**merge_params)
726+
task_id = response.get("task")
718727
while not complete:
719728
await asyncio.sleep(params.get("poll-period"))
720-
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
721-
if len(tasks["nodes"]) == 0:
722-
# empty nodes response indicates no tasks
723-
complete = True
729+
if task_id:
730+
tasks = await es.tasks.get(task_id=task_id)
731+
complete = tasks.get("completed", False)
732+
else:
733+
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
734+
if len(tasks["nodes"]) == 0:
735+
# empty nodes response indicates no tasks
736+
complete = True
724737
else:
725738
await es.indices.forcemerge(**merge_params)
726739

tests/driver/runner_test.py

Lines changed: 188 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -1519,6 +1519,79 @@ async def test_bulk_index_success_with_refresh_invalid(self, es):
15191519

15201520

15211521
class TestForceMergeRunner:
1522+
1523+
def _eight_cluster_info_output(self):
1524+
return {
1525+
"name": "es01",
1526+
"cluster_name": "docker-cluster",
1527+
"cluster_uuid": "7KTGbgcOTgSC0_X8B57-Gg",
1528+
"version": {
1529+
"number": "8.1.0",
1530+
"build_flavor": "default",
1531+
"build_type": "docker",
1532+
"build_hash": "3700f7679f7d95e36da0b43762189bab189bc53a",
1533+
"build_date": "2022-03-03T14:20:00.690422633Z",
1534+
"build_snapshot": False,
1535+
"lucene_version": "9.0.0",
1536+
"minimum_wire_compatibility_version": "7.17.0",
1537+
"minimum_index_compatibility_version": "7.0.0",
1538+
},
1539+
"tagline": "You Know, for Search",
1540+
}
1541+
1542+
def _seven_cluster_info_output(self):
1543+
return {
1544+
"name": "es01",
1545+
"cluster_name": "escluster",
1546+
"cluster_uuid": "4BgOtWNiQ6-zap9zDW2Q1A",
1547+
"version": {
1548+
"number": "7.17.3",
1549+
"build_flavor": "default",
1550+
"build_type": "tar",
1551+
"build_hash": "5ad023604c8d7416c9eb6c0eadb62b14e766caff",
1552+
"build_date": "2022-04-19T08:11:19.070913226Z",
1553+
"build_snapshot": False,
1554+
"lucene_version": "8.11.1",
1555+
"minimum_wire_compatibility_version": "6.8.0",
1556+
"minimum_index_compatibility_version": "6.0.0-beta1",
1557+
},
1558+
"tagline": "You Know, for Search",
1559+
}
1560+
1561+
def _task_list_output(self):
1562+
return {
1563+
"nodes": {
1564+
"Ap3OfntPT7qL4CBeKvamxg": {
1565+
"name": "instance-0000000001",
1566+
"transport_address": "10.46.79.231:19693",
1567+
"host": "10.46.79.231",
1568+
"ip": "10.46.79.231:19693",
1569+
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
1570+
"attributes": {
1571+
"logical_availability_zone": "zone-1",
1572+
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
1573+
"availability_zone": "us-east4-a",
1574+
"xpack.installed": "true",
1575+
"instance_configuration": "gcp.data.highio.1",
1576+
"transform.node": "true",
1577+
"region": "unknown-region",
1578+
},
1579+
"tasks": {
1580+
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
1581+
"node": "Ap3OfntPT7qL4CBeKvamxg",
1582+
"id": 417009036,
1583+
"type": "transport",
1584+
"action": "indices:admin/forcemerge",
1585+
"start_time_in_millis": 1598018980850,
1586+
"running_time_in_nanos": 3659821411,
1587+
"cancellable": False,
1588+
"headers": {},
1589+
}
1590+
},
1591+
}
1592+
}
1593+
}
1594+
15221595
@mock.patch("elasticsearch.Elasticsearch")
15231596
@pytest.mark.asyncio
15241597
async def test_force_merge_with_defaults(self, es):
@@ -1562,97 +1635,147 @@ async def test_force_merge_with_params(self, es):
15621635
@mock.patch("elasticsearch.Elasticsearch")
15631636
@pytest.mark.asyncio
15641637
async def test_force_merge_with_polling_no_timeout(self, es):
1565-
es.indices.forcemerge = mock.AsyncMock()
1638+
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
1639+
es.info = mock.AsyncMock(
1640+
return_value=self._eight_cluster_info_output(),
1641+
)
1642+
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
1643+
es.tasks.list = mock.AsyncMock(
1644+
side_effect=[
1645+
self._task_list_output(),
1646+
{
1647+
"nodes": {},
1648+
},
1649+
]
1650+
)
15661651

15671652
force_merge = runner.ForceMerge()
15681653
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
1569-
es.indices.forcemerge.assert_awaited_once_with(index="_all")
1654+
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)
1655+
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
1656+
es.tasks.list.assert_not_awaited()
15701657

15711658
@mock.patch("elasticsearch.Elasticsearch")
15721659
@pytest.mark.asyncio
15731660
async def test_force_merge_with_polling(self, es):
1574-
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
1661+
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
1662+
es.info = mock.AsyncMock(
1663+
return_value=self._eight_cluster_info_output(),
1664+
)
1665+
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
15751666
es.tasks.list = mock.AsyncMock(
15761667
side_effect=[
1668+
self._task_list_output,
15771669
{
1578-
"nodes": {
1579-
"Ap3OfntPT7qL4CBeKvamxg": {
1580-
"name": "instance-0000000001",
1581-
"transport_address": "10.46.79.231:19693",
1582-
"host": "10.46.79.231",
1583-
"ip": "10.46.79.231:19693",
1584-
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
1585-
"attributes": {
1586-
"logical_availability_zone": "zone-1",
1587-
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
1588-
"availability_zone": "us-east4-a",
1589-
"xpack.installed": "true",
1590-
"instance_configuration": "gcp.data.highio.1",
1591-
"transform.node": "true",
1592-
"region": "unknown-region",
1593-
},
1594-
"tasks": {
1595-
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
1596-
"node": "Ap3OfntPT7qL4CBeKvamxg",
1597-
"id": 417009036,
1598-
"type": "transport",
1599-
"action": "indices:admin/forcemerge",
1600-
"start_time_in_millis": 1598018980850,
1601-
"running_time_in_nanos": 3659821411,
1602-
"cancellable": False,
1603-
"headers": {},
1604-
}
1605-
},
1606-
}
1607-
}
1670+
"nodes": {},
16081671
},
1672+
]
1673+
)
1674+
force_merge = runner.ForceMerge()
1675+
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
1676+
es.indices.forcemerge.assert_awaited_once_with(index="_all", wait_for_completion=False)
1677+
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
1678+
es.tasks.list.assert_not_awaited()
1679+
1680+
@mock.patch("elasticsearch.Elasticsearch")
1681+
@pytest.mark.asyncio
1682+
async def test_force_merge_with_polling_and_params(self, es):
1683+
es.indices.forcemerge = mock.AsyncMock(return_value={"task": "Ap3OfntPT7qL4CBeKvamxg"})
1684+
es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output())
1685+
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
1686+
es.tasks.list = mock.AsyncMock(
1687+
side_effect=[
1688+
self._task_list_output(),
16091689
{
16101690
"nodes": {},
16111691
},
16121692
]
16131693
)
1694+
force_merge = runner.ForceMerge()
1695+
# request-timeout should be ignored as mode:polling
1696+
await force_merge(
1697+
es,
1698+
params={
1699+
"index": "_all",
1700+
"mode": "polling",
1701+
"max-num-segments": 1,
1702+
"request-timeout": 50000,
1703+
"poll-period": 0,
1704+
},
1705+
)
1706+
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False)
1707+
es.tasks.get.assert_awaited_once_with(task_id="Ap3OfntPT7qL4CBeKvamxg")
1708+
es.tasks.list.assert_not_awaited()
1709+
1710+
@mock.patch("elasticsearch.Elasticsearch")
1711+
@pytest.mark.asyncio
1712+
async def test_force_merge_with_polling_and_params_missing_task_id(self, es):
1713+
es.indices.forcemerge = mock.AsyncMock(return_value={})
1714+
es.info = mock.AsyncMock(return_value=self._eight_cluster_info_output())
1715+
es.tasks.get = mock.AsyncMock(return_value={"completed": True})
1716+
es.tasks.list = mock.AsyncMock(
1717+
side_effect=[
1718+
self._task_list_output(),
1719+
{
1720+
"nodes": {},
1721+
},
1722+
]
1723+
)
1724+
force_merge = runner.ForceMerge()
1725+
# request-timeout should be ignored as mode:polling
1726+
await force_merge(
1727+
es,
1728+
params={
1729+
"index": "_all",
1730+
"mode": "polling",
1731+
"max-num-segments": 1,
1732+
"request-timeout": 50000,
1733+
"poll-period": 0,
1734+
},
1735+
)
1736+
es.indices.forcemerge.assert_awaited_once_with(index="_all", max_num_segments=1, request_timeout=50000, wait_for_completion=False)
1737+
es.tasks.get.assert_not_awaited()
1738+
es.tasks.list.assert_awaited()
1739+
1740+
@mock.patch("elasticsearch.Elasticsearch")
1741+
@pytest.mark.asyncio
1742+
async def test_force_merge_with_polling_no_timeout_pre_8_1(self, es):
1743+
es.indices.forcemerge = mock.AsyncMock()
1744+
es.info = mock.AsyncMock(
1745+
return_value=self._seven_cluster_info_output(),
1746+
)
1747+
16141748
force_merge = runner.ForceMerge()
16151749
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
16161750
es.indices.forcemerge.assert_awaited_once_with(index="_all")
16171751

16181752
@mock.patch("elasticsearch.Elasticsearch")
16191753
@pytest.mark.asyncio
1620-
async def test_force_merge_with_polling_and_params(self, es):
1621-
es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout"))
1754+
async def test_force_merge_with_polling_pre_8_1(self, es):
1755+
es.indices.forcemerge = mock.AsyncMock(side_effect=elasticsearch.ConnectionTimeout(message="connection timeout"))
1756+
es.info = mock.AsyncMock(return_value=self._seven_cluster_info_output())
16221757
es.tasks.list = mock.AsyncMock(
16231758
side_effect=[
1759+
self._task_list_output(),
16241760
{
1625-
"nodes": {
1626-
"Ap3OfntPT7qL4CBeKvamxg": {
1627-
"name": "instance-0000000001",
1628-
"transport_address": "10.46.79.231:19693",
1629-
"host": "10.46.79.231",
1630-
"ip": "10.46.79.231:19693",
1631-
"roles": ["data", "ingest", "master", "remote_cluster_client", "transform"],
1632-
"attributes": {
1633-
"logical_availability_zone": "zone-1",
1634-
"server_name": "instance-0000000001.64cb4c66f4f24d85b41f120ef2df5526",
1635-
"availability_zone": "us-east4-a",
1636-
"xpack.installed": "true",
1637-
"instance_configuration": "gcp.data.highio.1",
1638-
"transform.node": "true",
1639-
"region": "unknown-region",
1640-
},
1641-
"tasks": {
1642-
"Ap3OfntPT7qL4CBeKvamxg:417009036": {
1643-
"node": "Ap3OfntPT7qL4CBeKvamxg",
1644-
"id": 417009036,
1645-
"type": "transport",
1646-
"action": "indices:admin/forcemerge",
1647-
"start_time_in_millis": 1598018980850,
1648-
"running_time_in_nanos": 3659821411,
1649-
"cancellable": False,
1650-
"headers": {},
1651-
}
1652-
},
1653-
}
1654-
}
1761+
"nodes": {},
16551762
},
1763+
]
1764+
)
1765+
force_merge = runner.ForceMerge()
1766+
await force_merge(es, params={"index": "_all", "mode": "polling", "poll-period": 0})
1767+
es.indices.forcemerge.assert_awaited_once_with(index="_all")
1768+
1769+
@mock.patch("elasticsearch.Elasticsearch")
1770+
@pytest.mark.asyncio
1771+
async def test_force_merge_with_polling_and_params_pre_8_1(self, es):
1772+
es.indices.forcemerge = mock.AsyncMock(return_value=elasticsearch.ConnectionTimeout("connection timeout"))
1773+
es.info = mock.AsyncMock(
1774+
return_value=self._seven_cluster_info_output(),
1775+
)
1776+
es.tasks.list = mock.AsyncMock(
1777+
side_effect=[
1778+
self._task_list_output(),
16561779
{
16571780
"nodes": {},
16581781
},

0 commit comments

Comments
 (0)