diff --git a/pkg/sql/opt/memo/statistics_builder.go b/pkg/sql/opt/memo/statistics_builder.go
index 83668d8e93fc..b7827462923e 100644
--- a/pkg/sql/opt/memo/statistics_builder.go
+++ b/pkg/sql/opt/memo/statistics_builder.go
@@ -2358,6 +2358,30 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 {
 		withoutOn := e.Memo().MemoizeLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
 		return withoutOn.Relational().Stats.RowCount
 
+	case *GeoLookupJoinExpr:
+		var lookupJoinPrivate *GeoLookupJoinPrivate
+		switch t.JoinType {
+		case opt.SemiJoinOp, opt.SemiJoinApplyOp, opt.AntiJoinOp, opt.AntiJoinApplyOp:
+			// The number of rows processed for semi and anti joins is closer to the
+			// number of output rows for an equivalent inner join.
+			copy := t.GeoLookupJoinPrivate
+			copy.JoinType = semiAntiJoinToInnerJoin(t.JoinType)
+			lookupJoinPrivate = &copy
+
+		default:
+			if t.On.IsTrue() {
+				// If there are no additional ON filters, the number of rows processed
+				// equals the number of output rows.
+				return e.Relational().Stats.RowCount
+			}
+			lookupJoinPrivate = &t.GeoLookupJoinPrivate
+		}
+
+		// We need to determine the row count of the join before the
+		// ON conditions are applied.
+		withoutOn := e.Memo().MemoizeGeoLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
+		return withoutOn.Relational().Stats.RowCount
+
 	case *MergeJoinExpr:
 		var mergeJoinPrivate *MergeJoinPrivate
 		switch t.JoinType {
diff --git a/pkg/sql/opt/xform/coster.go b/pkg/sql/opt/xform/coster.go
index 0687a954b46b..634e81d184d4 100644
--- a/pkg/sql/opt/xform/coster.go
+++ b/pkg/sql/opt/xform/coster.go
@@ -133,6 +133,11 @@ const (
	// If the final expression has this cost or larger, it means that there was no
	// plan that could satisfy the hints.
	hugeCost memo.Cost = 1e100
+
+	// Some benchmarks showed that some geo functions were at least 10 times
+	// slower than some float functions, so this is a somewhat data-backed
+	// guess.
+	geoFnCost = cpuCostFactor * 10
 )
 
 // Init initializes a new coster structure with the given memo.
@@ -183,7 +188,7 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required
 		cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required)
 
 	case opt.GeoLookupJoinOp:
-		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr))
+		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr), required)
 
 	case opt.ZigzagJoinOp:
 		cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr))
@@ -478,22 +483,73 @@ func (c *coster) computeLookupJoinCost(
 	return cost
 }
 
-func (c *coster) computeGeoLookupJoinCost(join *memo.GeoLookupJoinExpr) memo.Cost {
+func (c *coster) computeGeoLookupJoinCost(
+	join *memo.GeoLookupJoinExpr, required *physical.Required,
+) memo.Cost {
 	lookupCount := join.Input.Relational().Stats.RowCount
 
+	// Take into account that the "internal" row count is higher, according to
+	// the selectivities of the conditions. In particular, we need to ignore
+	// the conditions that don't affect the number of rows processed.
+	// A contrived example, where gid is a SERIAL PK:
+	//   nyc_census_blocks c JOIN nyc_neighborhoods n ON
+	//   ST_Intersects(c.geom, n.geom) AND c.gid < n.gid
+	// which can become a lookup join with left-over condition c.gid <
+	// n.gid.
+	rowsProcessed, ok := c.mem.RowsProcessed(join)
+	if !ok {
+		// We shouldn't ever get here. Since we don't allow the memo
+		// to be optimized twice, the coster should never be used after
+		// logPropsBuilder.clear() is called.
+		panic(errors.AssertionFailedf("could not get rows processed for geolookup join"))
+	}
+
+	// Lookup joins can return early if enough rows have been found. An otherwise
+	// expensive lookup join might have a lower cost if its limit hint estimates
+	// that most rows will not be needed.
+	if required.LimitHint != 0 && lookupCount > 0 {
+		outputRows := join.Relational().Stats.RowCount
+		unlimitedLookupCount := lookupCount
+		lookupCount = lookupJoinInputLimitHint(unlimitedLookupCount, outputRows, required.LimitHint)
+		// We scale the number of rows processed by the same factor (we are
+		// calculating the average number of rows processed per lookup and
+		// multiplying by the new lookup count).
+		rowsProcessed = (rowsProcessed / unlimitedLookupCount) * lookupCount
+	}
+
 	// The rows in the (left) input are used to probe into the (right) table.
 	// Since the matching rows in the table may not all be in the same range, this
 	// counts as random I/O.
 	perLookupCost := memo.Cost(randIOCostFactor)
+	// Since inverted indexes can't form a key, execution will have to
+	// limit KV batches, which prevents running requests to multiple nodes
+	// in parallel. An experiment on a 4-node cluster with a table with
+	// 100k rows split into 100 ranges showed that a "non-parallel" lookup
+	// join is about 5 times slower.
+	perLookupCost *= 5
 	cost := memo.Cost(lookupCount) * perLookupCost
 
-	// TODO: support GeoLookupJoinExpr in c.mem.RowsProcessed. See
-	// computeLookupJoinCost.
+	// Each lookup might retrieve many rows; add the IO cost of retrieving the
+	// rows (relevant when we expect many resulting rows per lookup) and the CPU
+	// cost of emitting the rows.
+	numLookupCols := join.Cols.Difference(join.Input.Relational().OutputCols).Len()
+	perRowCost := lookupJoinRetrieveRowCost +
+		c.rowScanCost(join.Table, join.Index, numLookupCols)
+	cost += memo.Cost(rowsProcessed) * perRowCost
 
-	cost += c.computeFiltersCost(join.On, util.FastIntMap{})
+	// We don't add the result of computeFiltersCost to perRowCost because
+	// otherwise the 1 that is added to rowsProcessed would either have
+	// to be removed or be multiplied by all of the other various costs in
+	// perRowCost. To be consistent with other joins, keep it separate.
+	cost += c.computeFiltersCost(join.On, util.FastIntMap{}) * memo.Cost(1+rowsProcessed)
 	return cost
 }
 
+// computeFiltersCost returns the per-row cost of executing a filter. Callers
+// of this function should multiply its output by the number of rows expected
+// to be filtered, plus 1. The extra 1 accounts for a setup cost and is useful
+// for comparing costs of filters with very low row counts.
+// TODO: account for per-row costs in all callers.
 func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastIntMap) memo.Cost {
 	var cost memo.Cost
 	for i := range filters {
@@ -515,11 +571,18 @@ func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastInt
 				// them. They do not cost anything.
 				continue
 			}
+		case opt.FunctionOp:
+			if IsGeoIndexFunction(f.Condition) {
+				cost += geoFnCost
+			}
+			// TODO(mjibson): do we need to cost other functions?
 		}
 
 		// Add a constant "setup" cost per ON condition to account for the fact that
 		// the rowsProcessed estimate alone cannot effectively discriminate between
 		// plans when RowCount is too small.
+		// TODO: perhaps separate the one-time and per-row costs and
+		// return them separately.
 		cost += cpuCostFactor
 	}
 	return cost
diff --git a/pkg/sql/opt/xform/custom_funcs.go b/pkg/sql/opt/xform/custom_funcs.go
index 08c6fb21b438..7794a022158e 100644
--- a/pkg/sql/opt/xform/custom_funcs.go
+++ b/pkg/sql/opt/xform/custom_funcs.go
@@ -1651,6 +1651,12 @@ var geoRelationshipMap = map[string]geoindex.RelationshipType{
 // IsGeoIndexFunction returns true if the given function is a geospatial
 // function that can be index-accelerated.
 func (c *CustomFuncs) IsGeoIndexFunction(fn opt.ScalarExpr) bool {
+	return IsGeoIndexFunction(fn)
+}
+
+// IsGeoIndexFunction returns true if the given function is a geospatial
+// function that can be index-accelerated.
+func IsGeoIndexFunction(fn opt.ScalarExpr) bool {
 	function := fn.(*memo.FunctionExpr)
 	_, ok := geoRelationshipMap[function.Name]
 	return ok
diff --git a/pkg/sql/opt/xform/testdata/rules/join b/pkg/sql/opt/xform/testdata/rules/join
index cb6f1d6ad983..1bb622d9210e 100644
--- a/pkg/sql/opt/xform/testdata/rules/join
+++ b/pkg/sql/opt/xform/testdata/rules/join
@@ -1693,16 +1693,16 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
  ├── G1: (project G2 G3 name)
  │    └── [presentation: name:13,popn_per_sqkm:16]
  │         ├── best: (project G2 G3 name)
- │         └── cost: 5110.53
+ │         └── cost: 6717.66
  ├── G2: (group-by G4 G5 cols=(13,14))
  │    └── []
  │         ├── best: (group-by G4 G5 cols=(13,14))
- │         └── cost: 5110.48
+ │         └── cost: 6717.61
  ├── G3: (projections G6)
  ├── G4: (inner-join G7 G8 G9) (inner-join G8 G7 G9) (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
  │    └── []
  │         ├── best: (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
- │         └── cost: 4903.53
+ │         └── cost: 6510.67
  ├── G5: (aggregations G11)
  ├── G6: (div G12 G13)
  ├── G7: (scan c,cols=(3,9,10))
@@ -1717,7 +1717,7 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
  ├── G10: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
  │    └── []
  │         ├── best: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
- │         └── cost: 147.36
+ │         └── cost: 1754.40
  ├── G11: (sum G19)
  ├── G12: (variable sum)
  ├── G13: (div G20 G21)
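
Reviewer note (not part of the patch): the standalone sketch below shows how
the new cost terms combine. The constants and the limitHintScale parameter are
stand-ins — the real cpuCostFactor and randIOCostFactor live in coster.go, and
lookupJoinInputLimitHint's formula is not shown in this diff — so only the
shape of the arithmetic follows the change above.

    package main

    import (
    	"fmt"
    	"math"
    )

    // Stand-ins for the coster's constants; the real values may differ.
    const (
    	cpuCostFactor    = 0.01
    	randIOCostFactor = 4 * cpuCostFactor
    	geoFnCost        = cpuCostFactor * 10
    )

    // geoLookupJoinCost mirrors the shape of computeGeoLookupJoinCost: every
    // lookup pays 5x random I/O (inverted-index lookups can't be parallelized
    // across nodes), every processed row pays a retrieval cost, and the ON
    // filter pays its per-row cost (1 + rowsProcessed) times. limitHintScale
    // is a hypothetical stand-in for lookupJoinInputLimitHint: the fraction
    // of lookups expected before the limit is satisfied.
    func geoLookupJoinCost(
    	lookupCount, rowsProcessed, limitHintScale, perRowCost, filterCost float64,
    ) float64 {
    	if limitHintScale < 1 {
    		// Scale both counts by the same factor, as the patch does.
    		unlimited := lookupCount
    		lookupCount = math.Max(1, unlimited*limitHintScale)
    		rowsProcessed = (rowsProcessed / unlimited) * lookupCount
    	}
    	perLookupCost := randIOCostFactor * 5
    	cost := lookupCount * perLookupCost
    	cost += rowsProcessed * perRowCost
    	// The +1 is the filter "setup" cost described in the
    	// computeFiltersCost comment in the diff.
    	cost += filterCost * (1 + rowsProcessed)
    	return cost
    }

    func main() {
    	// 100 probe rows, 1000 index rows examined, a geo function plus a
    	// cpuCostFactor setup per examined row; no limit hint vs. a hint
    	// that only a tenth of the lookups will be needed.
    	fmt.Println(geoLookupJoinCost(100, 1000, 1, 0.05, geoFnCost+cpuCostFactor))
    	fmt.Println(geoLookupJoinCost(100, 1000, 0.1, 0.05, geoFnCost+cpuCostFactor))
    }

With these example numbers the limit-hinted variant costs roughly a tenth of
the unlimited one, which is the behavior the new required.LimitHint branch is
meant to capture.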