Commit 004003d

Merge pull request ClickHouse#76700 from ClickHouse/backport/25.1/75149
Backport ClickHouse#75149 to 25.1: Fix double preallocation in `ConcurrentHashJoin` in case of swap join sides
2 parents: 8461fb8 + 724b061
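
What the diff below does, in short: preallocation of the hash maps used to happen eagerly in the `ConcurrentHashJoin` constructor, so in the commit title's scenario, where the planner swaps the join sides and a second join object comes into being (the header diff's `isCloneSupported` hints at how), the reservation and its `ProfileEvents` accounting could run twice. The fix defers the reservation to the first `addBlockToJoin` on each internal join, guarded by a per-slot flag. A minimal sketch of that pattern, with hypothetical stand-ins (`FakeHashJoin`, `Slot`, `add_block`, `size_hint`) for the real ClickHouse types:

```cpp
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <vector>

// Hypothetical stand-ins for HashJoin and the cached size hint; none of these
// names exist in ClickHouse.
struct FakeHashJoin
{
    std::size_t reserved = 0;
    void reserve(std::size_t n) { reserved += n; }
};

constexpr std::size_t size_hint = 100000;  // plays the role of hint->median_size

struct Slot
{
    std::mutex mutex;
    FakeHashJoin data;
    bool space_was_preallocated = false;  // mirrors InternalHashJoin::space_was_preallocated
};

// Insertion path (cf. ConcurrentHashJoin::addBlockToJoin in the diff below):
// the reservation happens at most once per slot, and only on a join object
// that actually receives data; a constructed-but-unused join never reserves.
void add_block(Slot & slot, std::size_t slots)
{
    std::lock_guard lock(slot.mutex);
    if (!slot.space_was_preallocated)
    {
        slot.data.reserve(size_hint / slots);  // each slot owns 1/slots of the buckets
        slot.space_was_preallocated = true;
    }
    // ... insert the block into slot.data ...
}

int main()
{
    constexpr std::size_t slots = 8;
    std::vector<Slot> joins(slots);  // one internal join per slot
    for (auto & slot : joins)
        add_block(slot, slots);      // a second call would not reserve again
    std::printf("%zu\n", joins[0].data.reserved);  // 12500 = 100000 / 8
}
```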

4 files changed: +38 −33 lines

src/Interpreters/ConcurrentHashJoin.cpp

Lines changed: 11 additions & 32 deletions
```diff
@@ -101,18 +101,13 @@ HashJoin::RightTableDataPtr getData(const std::shared_ptr<ConcurrentHashJoin::In
     return join->data->getJoinedData();
 }
 
-void reserveSpaceInHashMaps(
-    const std::vector<std::shared_ptr<ConcurrentHashJoin::InternalHashJoin>> & hash_joins,
-    ThreadPool * pool,
-    const StatsCollectingParams & stats_collecting_params,
-    size_t slots)
+void reserveSpaceInHashMaps(HashJoin & hash_join, size_t ind, const StatsCollectingParams & stats_collecting_params, size_t slots)
 {
     if (auto hint = getSizeHint(stats_collecting_params, slots))
     {
         /// Hash map is shared between all `HashJoin` instances, so the `median_size` is actually the total size
         /// we need to preallocate in all buckets of all hash maps.
         const size_t reserve_size = hint->median_size;
-        ProfileEvents::increment(ProfileEvents::HashJoinPreallocatedElementsInHashTables, reserve_size);
 
         /// Each `HashJoin` instance will "own" a subset of buckets during the build phase. Because of that
         /// we preallocate space only in the specific buckets of each `HashJoin` instance.
@@ -129,25 +124,9 @@ void reserveSpaceInHashMaps(
             })
         };
 
-        for (size_t i = 0; i < slots; ++i)
-        {
-            pool->scheduleOrThrow(
-                [&, i, thread_group = CurrentThread::getGroup()]()
-                {
-                    SCOPE_EXIT_SAFE({
-                        if (thread_group)
-                            CurrentThread::detachFromGroupIfNotDetached();
-                    });
-
-                    if (thread_group)
-                        CurrentThread::attachToGroupIfDetached(thread_group);
-                    setThreadName("ConcurrentJoin");
-
-                    const auto & right_data = getData(hash_joins[i]);
-                    std::visit([&](auto & maps) { return reserve_space_in_buckets(maps, right_data->type, i); }, right_data->maps.at(0));
-                });
-        }
-        pool->wait();
+        const auto & right_data = hash_join.getJoinedData();
+        std::visit([&](auto & maps) { return reserve_space_in_buckets(maps, right_data->type, ind); }, right_data->maps.at(0));
+        ProfileEvents::increment(ProfileEvents::HashJoinPreallocatedElementsInHashTables, reserve_size / slots);
     }
 }
@@ -220,12 +199,6 @@ ConcurrentHashJoin::ConcurrentHashJoin(
             });
         }
         pool->wait();
-
-        if (!hash_joins[0]->data->twoLevelMapIsUsed())
-            /// Means fixed-size hash map is used
-            return;
-
-        reserveSpaceInHashMaps(hash_joins, pool.get(), stats_collecting_params, slots);
     }
     catch (...)
     {
@@ -316,6 +289,12 @@ bool ConcurrentHashJoin::addBlockToJoin(const Block & right_block_, bool check_l
         if (!lock.owns_lock())
            continue;
 
+        if (!hash_join->space_was_preallocated && hash_join->data->twoLevelMapIsUsed())
+        {
+            reserveSpaceInHashMaps(*hash_join->data, i, stats_collecting_params, slots);
+            hash_join->space_was_preallocated = true;
+        }
+
         bool limit_exceeded = !hash_join->data->addBlockToJoin(dispatched_block, check_limits);
 
         dispatched_block = {};
@@ -636,7 +615,7 @@ void ConcurrentHashJoin::onBuildPhaseFinish()
 {
     for (auto & hash_join : hash_joins)
     {
-        // It cannot be called concurrently with other IJoin methods
+        // `onBuildPhaseFinish` cannot be called concurrently with other IJoin methods, so we don't need a lock to access internal joins.
         hash_join->data->onBuildPhaseFinish();
     }
```
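
A detail worth calling out in the hunks above: the `ProfileEvents` increment moved from a single bump of the full `reserve_size` to a `reserve_size / slots` bump inside each internal join that actually preallocates, so the counter now reflects only joins that really received data. With the numbers from the test at the bottom of this commit (a cached hint of 100000 elements, `max_threads = 8`), the per-slot increments divide evenly and sum back to the full hint; a quick arithmetic check, assuming every slot gets data:

```cpp
#include <cstddef>
#include <cstdio>

int main()
{
    const std::size_t reserve_size = 100000;  // hint->median_size cached by the first run
    const std::size_t slots = 8;              // matches max_threads in the test

    std::size_t counted = 0;
    for (std::size_t i = 0; i < slots; ++i)   // each slot increments once, on its first insert
        counted += reserve_size / slots;      // per-slot increment from the new code

    std::printf("%zu\n", counted);            // 100000: the hint is counted once, not twice
}
```

(When `slots` does not divide `reserve_size` evenly, the integer division makes the reported total slightly smaller than the hint, which seems acceptable for a profiling counter.)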

src/Interpreters/ConcurrentHashJoin.h

Lines changed: 1 addition & 1 deletion
```diff
@@ -71,7 +71,6 @@ class ConcurrentHashJoin : public IJoin
     IBlocksStreamPtr
     getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
 
-
     bool isCloneSupported() const override
     {
         return !getTotals() && getTotalRowCount() == 0;
@@ -88,6 +87,7 @@ class ConcurrentHashJoin : public IJoin
     {
         std::mutex mutex;
         std::unique_ptr<HashJoin> data;
+        bool space_was_preallocated = false;
     };
 
 private:
```
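
The new `space_was_preallocated` member deliberately sits next to the slot's `std::mutex`: per the `.cpp` hunk above, `addBlockToJoin` try-locks slots, skips busy ones, and touches the flag only while holding the lock, so a plain `bool` is enough and no atomic is needed. A reduced sketch of that locking discipline, with hypothetical names (`Slot`, `insert_into_some_slot`) standing in for the real members:

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

struct Slot
{
    std::mutex mutex;
    bool space_was_preallocated = false;  // guarded by `mutex`, like the real member
};

// Each producer walks the slots, skips the ones another thread is building,
// and performs the one-time reservation only inside the critical section.
void insert_into_some_slot(std::vector<Slot> & slots)
{
    for (std::size_t i = 0; i < slots.size(); ++i)
    {
        std::unique_lock lock(slots[i].mutex, std::try_to_lock);
        if (!lock.owns_lock())
            continue;  // busy slot: move on, same as the real loop

        if (!slots[i].space_was_preallocated)
        {
            // reserveSpaceInHashMaps(...) would run here, exactly once per slot
            slots[i].space_was_preallocated = true;
        }
        // ... add the block to this slot's hash map ...
        break;
    }
}

int main()
{
    std::vector<Slot> slots(4);
    insert_into_some_slot(slots);
}
```
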
Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+100000
```

Lines changed: 25 additions & 0 deletions

```diff
@@ -0,0 +1,25 @@
+-- Tags: no-tsan, no-asan, no-msan, no-ubsan, no-parallel-replicas
+
+drop table if exists lhs;
+drop table if exists rhs;
+
+create table lhs(a UInt64, b UInt64) Engine = MergeTree order by tuple();
+create table rhs(a UInt64, b UInt64) Engine = MergeTree order by tuple();
+
+insert into lhs select number, number from numbers_mt(1e5);
+-- rhs should be bigger to trigger tables swap (see `query_plan_join_swap_table`)
+insert into rhs select number, number from numbers_mt(1e6);
+
+set max_threads = 8, query_plan_join_swap_table = 1, join_algorithm = 'parallel_hash', enable_analyzer = 1;
+
+-- First populate the cache of hash table sizes
+select * from lhs as t1 join rhs as t2 on t1.a = t2.a format Null;
+
+-- For the next run we will preallocate the space
+select * from lhs as t1 join rhs as t2 on t1.a = t2.a format Null settings log_comment = '03319_second_query';
+
+system flush logs;
+
+select ProfileEvents['HashJoinPreallocatedElementsInHashTables']
+from system.query_log
+where event_date >= yesterday() and current_database = currentDatabase() and type = 'QueryFinish' and log_comment = '03319_second_query';
```
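
The one-line reference file above expects `100000`, which lines up with the setup: `rhs` (1e6 rows) is the larger table, so the planner swaps the sides and `lhs` (1e5 rows) becomes the build side; the first run caches a hash table size of 100000, and the second run preallocates, and now counts, exactly that amount once. As a hedged aside, assuming the query-level setting `collect_hash_table_stats_during_joins` gates this statistics cache as its name suggests, one could confirm the preallocation is hint-driven by disabling collection and checking that the counter stays at zero:

```sql
-- Hypothetical follow-up, not part of the committed test: with hash table
-- statistics disabled, no size hint is cached or used, so
-- ProfileEvents['HashJoinPreallocatedElementsInHashTables'] should stay 0
-- for an otherwise identical query.
select * from lhs as t1 join rhs as t2 on t1.a = t2.a
format Null
settings collect_hash_table_stats_during_joins = 0, log_comment = '03319_no_stats';
```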
