From 74741e6794ffedacfc9962b353ed65c8ef2e0321 Mon Sep 17 00:00:00 2001
From: Matt Jibson
Date: Tue, 19 May 2020 11:00:42 -0600
Subject: [PATCH] opt: add cost for executing geo functions in filters

Geo lookup join costing is now the only cost function that multiplies
its row count by the filter cost. The same treatment for the other
joins and SELECT will be added in another PR, since that is a more
complicated change: some plans change, and those changes must come
with justification.

Fixes #48214

Release note: None
---
 pkg/sql/opt/memo/statistics_builder.go | 24 +++++++++
 pkg/sql/opt/xform/coster.go            | 73 ++++++++++++++++++++++++--
 pkg/sql/opt/xform/custom_funcs.go      |  6 +++
 pkg/sql/opt/xform/testdata/rules/join  |  8 +--
 4 files changed, 102 insertions(+), 9 deletions(-)

diff --git a/pkg/sql/opt/memo/statistics_builder.go b/pkg/sql/opt/memo/statistics_builder.go
index 83668d8e93fc..b7827462923e 100644
--- a/pkg/sql/opt/memo/statistics_builder.go
+++ b/pkg/sql/opt/memo/statistics_builder.go
@@ -2358,6 +2358,30 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 {
 		withoutOn := e.Memo().MemoizeLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
 		return withoutOn.Relational().Stats.RowCount
 
+	case *GeoLookupJoinExpr:
+		var lookupJoinPrivate *GeoLookupJoinPrivate
+		switch t.JoinType {
+		case opt.SemiJoinOp, opt.SemiJoinApplyOp, opt.AntiJoinOp, opt.AntiJoinApplyOp:
+			// The number of rows processed for semi and anti joins is closer to the
+			// number of output rows for an equivalent inner join.
+			copy := t.GeoLookupJoinPrivate
+			copy.JoinType = semiAntiJoinToInnerJoin(t.JoinType)
+			lookupJoinPrivate = &copy
+
+		default:
+			if t.On.IsTrue() {
+				// If there are no additional ON filters, the number of rows processed
+				// equals the number of output rows.
+				return e.Relational().Stats.RowCount
+			}
+			lookupJoinPrivate = &t.GeoLookupJoinPrivate
+		}
+
+		// We need to determine the row count of the join before the
+		// ON conditions are applied.
+		withoutOn := e.Memo().MemoizeGeoLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
+		return withoutOn.Relational().Stats.RowCount
+
 	case *MergeJoinExpr:
 		var mergeJoinPrivate *MergeJoinPrivate
 		switch t.JoinType {
diff --git a/pkg/sql/opt/xform/coster.go b/pkg/sql/opt/xform/coster.go
index 0687a954b46b..634e81d184d4 100644
--- a/pkg/sql/opt/xform/coster.go
+++ b/pkg/sql/opt/xform/coster.go
@@ -133,6 +133,11 @@ const (
 	// If the final expression has this cost or larger, it means that there was no
 	// plan that could satisfy the hints.
 	hugeCost memo.Cost = 1e100
+
+	// Some benchmarks showed that some geo functions were at least 10 times
+	// slower than some float functions, so this is a somewhat data-backed
+	// guess.
+	geoFnCost = cpuCostFactor * 10
 )
 
 // Init initializes a new coster structure with the given memo.
@@ -183,7 +188,7 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required
 		cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required)
 
 	case opt.GeoLookupJoinOp:
-		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr))
+		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr), required)
 
 	case opt.ZigzagJoinOp:
 		cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr))
@@ -478,22 +483,73 @@ func (c *coster) computeLookupJoinCost(
 	return cost
 }
 
-func (c *coster) computeGeoLookupJoinCost(join *memo.GeoLookupJoinExpr) memo.Cost {
+func (c *coster) computeGeoLookupJoinCost(
+	join *memo.GeoLookupJoinExpr, required *physical.Required,
+) memo.Cost {
 	lookupCount := join.Input.Relational().Stats.RowCount
 
+	// Take into account that the "internal" row count is higher, according to
+	// the selectivities of the conditions. In particular, we need to ignore
+	// the conditions that don't affect the number of rows processed.
+	// A contrived example, where gid is a SERIAL PK:
+	//   nyc_census_blocks c JOIN nyc_neighborhoods n ON
+	//   ST_Intersects(c.geom, n.geom) AND c.gid < n.gid
+	// which can become a lookup join with left-over condition c.gid <
+	// n.gid.
+	rowsProcessed, ok := c.mem.RowsProcessed(join)
+	if !ok {
+		// We shouldn't ever get here. Since we don't allow the memo
+		// to be optimized twice, the coster should never be used after
+		// logPropsBuilder.clear() is called.
+		panic(errors.AssertionFailedf("could not get rows processed for geolookup join"))
+	}
+
+	// Lookup joins can return early if enough rows have been found. An otherwise
+	// expensive lookup join might have a lower cost if its limit hint estimates
+	// that most rows will not be needed.
+	if required.LimitHint != 0 && lookupCount > 0 {
+		outputRows := join.Relational().Stats.RowCount
+		unlimitedLookupCount := lookupCount
+		lookupCount = lookupJoinInputLimitHint(unlimitedLookupCount, outputRows, required.LimitHint)
+		// We scale the number of rows processed by the same factor (we are
+		// calculating the average number of rows processed per lookup and
+		// multiplying by the new lookup count).
+		rowsProcessed = (rowsProcessed / unlimitedLookupCount) * lookupCount
+	}
+
 	// The rows in the (left) input are used to probe into the (right) table.
 	// Since the matching rows in the table may not all be in the same range, this
 	// counts as random I/O.
 	perLookupCost := memo.Cost(randIOCostFactor)
+	// Since inverted indexes can't form a key, execution will have to
+	// limit KV batches which prevents running requests to multiple nodes
+	// in parallel. An experiment on a 4 node cluster with a table with
+	// 100k rows split into 100 ranges showed that a "non-parallel" lookup
+	// join is about 5 times slower.
+	perLookupCost *= 5
 	cost := memo.Cost(lookupCount) * perLookupCost
 
-	// TODO: support GeoLookupJoinExpr in c.mem.RowsProcessed. See
-	// computeLookupJoinCost.
+	// Each lookup might retrieve many rows; add the IO cost of retrieving the
+	// rows (relevant when we expect many resulting rows per lookup) and the CPU
+	// cost of emitting the rows.
+	numLookupCols := join.Cols.Difference(join.Input.Relational().OutputCols).Len()
+	perRowCost := lookupJoinRetrieveRowCost +
+		c.rowScanCost(join.Table, join.Index, numLookupCols)
+	cost += memo.Cost(rowsProcessed) * perRowCost
 
-	cost += c.computeFiltersCost(join.On, util.FastIntMap{})
+	// We don't add the result of computeFiltersCost to perRowCost because
+	// otherwise the 1 that is added to rowsProcessed would either have
+	// to be removed or be multiplied by all of the other various costs in
+	// perRowCost. To be consistent with other joins, keep it separate.
+	cost += c.computeFiltersCost(join.On, util.FastIntMap{}) * memo.Cost(1+rowsProcessed)
 	return cost
 }
 
+// computeFiltersCost returns the per-row cost of executing a filter. Callers
+// of this function should multiply its output by the number of rows expected
+// to be filtered + 1. The + 1 accounts for a setup cost and is useful for
+// comparing costs of filters with very low row counts.
+// TODO: account for per-row costs in all callers.
 func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastIntMap) memo.Cost {
 	var cost memo.Cost
 	for i := range filters {
@@ -515,11 +571,18 @@ func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastInt
 			// them. They do not cost anything.
 			continue
 		}
+		case opt.FunctionOp:
+			if IsGeoIndexFunction(f.Condition) {
+				cost += geoFnCost
+			}
+			// TODO(mjibson): do we need to cost other functions?
 		}
 
 		// Add a constant "setup" cost per ON condition to account for the fact that
 		// the rowsProcessed estimate alone cannot effectively discriminate between
 		// plans when RowCount is too small.
+		// TODO: perhaps separate the one-time and per-row costs and
+		// return them separately.
 		cost += cpuCostFactor
 	}
 	return cost
diff --git a/pkg/sql/opt/xform/custom_funcs.go b/pkg/sql/opt/xform/custom_funcs.go
index 08c6fb21b438..7794a022158e 100644
--- a/pkg/sql/opt/xform/custom_funcs.go
+++ b/pkg/sql/opt/xform/custom_funcs.go
@@ -1651,6 +1651,12 @@ var geoRelationshipMap = map[string]geoindex.RelationshipType{
 // IsGeoIndexFunction returns true if the given function is a geospatial
 // function that can be index-accelerated.
 func (c *CustomFuncs) IsGeoIndexFunction(fn opt.ScalarExpr) bool {
+	return IsGeoIndexFunction(fn)
+}
+
+// IsGeoIndexFunction returns true if the given function is a geospatial
+// function that can be index-accelerated.
+func IsGeoIndexFunction(fn opt.ScalarExpr) bool {
 	function := fn.(*memo.FunctionExpr)
 	_, ok := geoRelationshipMap[function.Name]
 	return ok
diff --git a/pkg/sql/opt/xform/testdata/rules/join b/pkg/sql/opt/xform/testdata/rules/join
index cb6f1d6ad983..1bb622d9210e 100644
--- a/pkg/sql/opt/xform/testdata/rules/join
+++ b/pkg/sql/opt/xform/testdata/rules/join
@@ -1693,16 +1693,16 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
 ├── G1: (project G2 G3 name)
 │    └── [presentation: name:13,popn_per_sqkm:16]
 │         ├── best: (project G2 G3 name)
-│         └── cost: 5110.53
+│         └── cost: 6717.66
 ├── G2: (group-by G4 G5 cols=(13,14))
 │    └── []
 │         ├── best: (group-by G4 G5 cols=(13,14))
-│         └── cost: 5110.48
+│         └── cost: 6717.61
 ├── G3: (projections G6)
 ├── G4: (inner-join G7 G8 G9) (inner-join G8 G7 G9) (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
 │    └── []
 │         ├── best: (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
-│         └── cost: 4903.53
+│         └── cost: 6510.67
 ├── G5: (aggregations G11)
 ├── G6: (div G12 G13)
 ├── G7: (scan c,cols=(3,9,10))
@@ -1717,7 +1717,7 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
 ├── G10: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
 │    └── []
 │         ├── best: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
-│         └── cost: 147.36
+│         └── cost: 1754.40
 ├── G11: (sum G19)
 ├── G12: (variable sum)
 ├── G13: (div G20 G21)
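
For reviewers who want to play with the numbers, below is a minimal, self-contained sketch of how the new cost model composes. It is not code from this patch: the constant values (cpuCostFactor, randIOCostFactor, lookupJoinRetrieveRowCost) and the helper names (filtersCost, geoLookupJoinCost) are illustrative stand-ins rather than the definitions in coster.go, and the per-row scan cost is passed in as a plain number instead of calling rowScanCost. The shape mirrors computeGeoLookupJoinCost above: random I/O per lookup scaled by 5 (inverted-index lookups cannot issue parallel KV batches), a retrieval cost per row processed, and the ON-filter cost charged (1 + rowsProcessed) times, with an index-accelerated geo function costed at geoFnCost = cpuCostFactor * 10.

package main

import "fmt"

// Illustrative constants only; the real values live in coster.go and may differ.
const (
	cpuCostFactor             = 0.01
	randIOCostFactor          = 4.0
	lookupJoinRetrieveRowCost = 0.02
	geoFnCost                 = cpuCostFactor * 10 // geo functions measured roughly 10x slower than float functions
)

// filtersCost mimics the shape of computeFiltersCost for ON conditions that
// are index-accelerated geo functions: the geo function cost plus a constant
// per-condition setup cost.
func filtersCost(numGeoConditions int) float64 {
	var cost float64
	for i := 0; i < numGeoConditions; i++ {
		cost += geoFnCost     // opt.FunctionOp that IsGeoIndexFunction reports true for
		cost += cpuCostFactor // constant "setup" cost per ON condition
	}
	return cost
}

// geoLookupJoinCost mirrors the structure of computeGeoLookupJoinCost:
//   - random I/O per lookup, scaled by 5 because inverted-index lookups
//     cannot run KV batches against multiple nodes in parallel,
//   - a retrieval plus scan cost for every row processed,
//   - the ON-filter cost charged (1 + rowsProcessed) times.
func geoLookupJoinCost(lookupCount, rowsProcessed, perRowScanCost, onCost float64) float64 {
	perLookupCost := randIOCostFactor * 5
	cost := lookupCount * perLookupCost
	cost += rowsProcessed * (lookupJoinRetrieveRowCost + perRowScanCost)
	cost += onCost * (1 + rowsProcessed)
	return cost
}

func main() {
	// 100 input rows probing the inverted index, ~5000 rows processed before
	// the ON filter, and one geo predicate such as ST_Intersects in the ON clause.
	fmt.Printf("estimated cost: %.2f\n",
		geoLookupJoinCost(100, 5000, 0.01, filtersCost(1)))
}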