pkg/sql/opt/memo/statistics_builder.go (24 changes: 24 additions & 0 deletions)

@@ -2358,6 +2358,30 @@ func (sb *statisticsBuilder) rowsProcessed(e RelExpr) float64 {
 		withoutOn := e.Memo().MemoizeLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
 		return withoutOn.Relational().Stats.RowCount
 
+	case *GeoLookupJoinExpr:
+		var lookupJoinPrivate *GeoLookupJoinPrivate
+		switch t.JoinType {
+		case opt.SemiJoinOp, opt.SemiJoinApplyOp, opt.AntiJoinOp, opt.AntiJoinApplyOp:
+			// The number of rows processed for semi and anti joins is closer to the
+			// number of output rows for an equivalent inner join.
+			copy := t.GeoLookupJoinPrivate
+			copy.JoinType = semiAntiJoinToInnerJoin(t.JoinType)
+			lookupJoinPrivate = &copy
+
+		default:
+			if t.On.IsTrue() {
+				// If there are no additional ON filters, the number of rows processed
+				// equals the number of output rows.
+				return e.Relational().Stats.RowCount
+			}
+			lookupJoinPrivate = &t.GeoLookupJoinPrivate
+		}
+
+		// We need to determine the row count of the join before the
+		// ON conditions are applied.
+		withoutOn := e.Memo().MemoizeGeoLookupJoin(t.Input, nil /* on */, lookupJoinPrivate)
+		return withoutOn.Relational().Stats.RowCount
+
 	case *MergeJoinExpr:
 		var mergeJoinPrivate *MergeJoinPrivate
 		switch t.JoinType {
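The semi/anti case above re-costs the join as an inner join because a semi or anti join emits each input row at most once, so its output row count says nothing about how many index matches were actually processed. A minimal, self-contained sketch of that distinction, with all numbers assumed for illustration:

package main

import "fmt"

func main() {
	// Assumed workload: 100 left rows, each matching ~8 index rows on average.
	inputRows := 100.0
	matchesPerRow := 8.0

	// A semi join emits each left row at most once, so its output row count
	// is capped by the input size and hides the lookup work.
	semiOutputRows := inputRows

	// The equivalent inner join counts every match, which is the number of
	// rows the lookup join actually has to process.
	rowsProcessed := inputRows * matchesPerRow

	fmt.Printf("semi output: %.0f, rows processed: %.0f\n",
		semiOutputRows, rowsProcessed)
}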
pkg/sql/opt/xform/coster.go (73 changes: 68 additions & 5 deletions)

@@ -133,6 +133,11 @@
 	// If the final expression has this cost or larger, it means that there was no
 	// plan that could satisfy the hints.
 	hugeCost memo.Cost = 1e100
+
+	// Some benchmarks showed that certain geo functions were at least 10
+	// times slower than comparable float functions, so this is a somewhat
+	// data-backed guess.
+	geoFnCost = cpuCostFactor * 10
 )
 
 // Init initializes a new coster structure with the given memo.
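To see how geoFnCost feeds into filter costing, here is a toy composition of the constants, using an assumed value for cpuCostFactor (the real tuned value is defined elsewhere in this file) and the rows+1 contract described at computeFiltersCost below:

package main

import "fmt"

// Assumed value for illustration; coster.go defines the real cpuCostFactor.
const cpuCostFactor = 0.01

const geoFnCost = cpuCostFactor * 10

func main() {
	rowsProcessed := 1000.0
	// Per computeFiltersCost below, one geo ON condition contributes its
	// function cost plus a constant per-condition setup cost per row.
	perRowFilterCost := geoFnCost + cpuCostFactor
	fmt.Printf("geo filter cost: %.2f\n", perRowFilterCost*(1+rowsProcessed))
}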
Expand Down Expand Up @@ -183,7 +188,7 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required
 		cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required)
 
 	case opt.GeoLookupJoinOp:
-		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr))
+		cost = c.computeGeoLookupJoinCost(candidate.(*memo.GeoLookupJoinExpr), required)
 
 	case opt.ZigzagJoinOp:
 		cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr))

@@ -478,22 +483,73 @@ func (c *coster) computeLookupJoinCost(
 	return cost
 }
 
-func (c *coster) computeGeoLookupJoinCost(join *memo.GeoLookupJoinExpr) memo.Cost {
+func (c *coster) computeGeoLookupJoinCost(
+	join *memo.GeoLookupJoinExpr, required *physical.Required,
+) memo.Cost {
 	lookupCount := join.Input.Relational().Stats.RowCount
 
+	// Take into account that the "internal" row count is higher, according to
+	// the selectivities of the conditions. In particular, we need to ignore
+	// the conditions that don't affect the number of rows processed.
+	// A contrived example, where gid is a SERIAL PK:
+	//   nyc_census_blocks c JOIN nyc_neighborhoods n ON
+	//   ST_Intersects(c.geom, n.geom) AND c.gid < n.gid
+	// which can become a lookup join with left-over condition c.gid < n.gid.
+	rowsProcessed, ok := c.mem.RowsProcessed(join)
+	if !ok {
+		// We shouldn't ever get here. Since we don't allow the memo
+		// to be optimized twice, the coster should never be used after
+		// logPropsBuilder.clear() is called.
+		panic(errors.AssertionFailedf("could not get rows processed for geolookup join"))
+	}

+
+	// Lookup joins can return early if enough rows have been found. An otherwise
+	// expensive lookup join might have a lower cost if its limit hint estimates
+	// that most rows will not be needed.
+	if required.LimitHint != 0 && lookupCount > 0 {
+		outputRows := join.Relational().Stats.RowCount
+		unlimitedLookupCount := lookupCount
+		lookupCount = lookupJoinInputLimitHint(unlimitedLookupCount, outputRows, required.LimitHint)
+		// We scale the number of rows processed by the same factor (we are
+		// calculating the average number of rows processed per lookup and
+		// multiplying by the new lookup count).
+		rowsProcessed = (rowsProcessed / unlimitedLookupCount) * lookupCount
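+		// Illustrative, assumed numbers: 1,000 lookups that would process
+		// 5,000 rows, scaled down by a limit hint to 50 lookups, yield
+		// (5000/1000)*50 = 250 rows processed.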
+	}

 	// The rows in the (left) input are used to probe into the (right) table.
 	// Since the matching rows in the table may not all be in the same range, this
 	// counts as random I/O.
 	perLookupCost := memo.Cost(randIOCostFactor)
+	// Since inverted indexes can't form a key, execution will have to
+	// limit KV batches, which prevents running requests to multiple nodes
+	// in parallel. An experiment on a 4-node cluster with a table with
+	// 100k rows split into 100 ranges showed that a "non-parallel" lookup
+	// join is about 5 times slower.
+	perLookupCost *= 5
 	cost := memo.Cost(lookupCount) * perLookupCost
 
-	// TODO: support GeoLookupJoinExpr in c.mem.RowsProcessed. See
-	// computeLookupJoinCost.
+	// Each lookup might retrieve many rows; add the IO cost of retrieving the
+	// rows (relevant when we expect many resulting rows per lookup) and the CPU
+	// cost of emitting the rows.
+	numLookupCols := join.Cols.Difference(join.Input.Relational().OutputCols).Len()
+	perRowCost := lookupJoinRetrieveRowCost +
+		c.rowScanCost(join.Table, join.Index, numLookupCols)
+	cost += memo.Cost(rowsProcessed) * perRowCost

-	cost += c.computeFiltersCost(join.On, util.FastIntMap{})
+	// We don't add the result of computeFiltersCost to perRowCost because
+	// otherwise the 1 that is added to rowsProcessed would either have
+	// to be removed or be multiplied by all of the other various costs in
+	// perRowCost. To be consistent with other joins, keep it separate.
+	cost += c.computeFiltersCost(join.On, util.FastIntMap{}) * memo.Cost(1+rowsProcessed)
 	return cost
 }
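Putting the pieces together, a condensed sketch of the cost model implemented above. All constants are assumed stand-ins for illustration (the real values are tuned cost factors), and rowScanCost and computeFiltersCost are collapsed into flat per-row numbers here:

package main

import "fmt"

// Assumed stand-in constants, not the tuned values from coster.go.
const (
	randIOCostFactor          = 4.0
	lookupJoinRetrieveRowCost = 0.2
	perRowScanCost            = 0.1  // stand-in for c.rowScanCost(...)
	perRowFilterCost          = 0.11 // stand-in for computeFiltersCost(join.On, ...)
)

// geoLookupJoinCost mirrors the structure of computeGeoLookupJoinCost:
// random I/O per lookup (times 5 for non-parallel KV batches), retrieval
// and scan cost per row processed, and the ON-filter cost over rows+1.
func geoLookupJoinCost(lookupCount, rowsProcessed float64) float64 {
	perLookupCost := randIOCostFactor * 5
	cost := lookupCount * perLookupCost
	cost += rowsProcessed * (lookupJoinRetrieveRowCost + perRowScanCost)
	cost += perRowFilterCost * (1 + rowsProcessed)
	return cost
}

func main() {
	fmt.Printf("cost: %.2f\n", geoLookupJoinCost(100, 800))
}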

+// computeFiltersCost returns the per-row cost of executing a filter. Callers
+// of this function should multiply its output by the number of rows expected
+// to be filtered, plus one. The added one accounts for a setup cost and is
+// useful for comparing costs of filters with very low row counts.
+// TODO: account for per-row costs in all callers.
 func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastIntMap) memo.Cost {
 	var cost memo.Cost
 	for i := range filters {
@@ -515,11 +571,18 @@ func (c *coster) computeFiltersCost(filters memo.FiltersExpr, eqMap util.FastInt
 				// them. They do not cost anything.
 				continue
 			}
+		case opt.FunctionOp:
+			if IsGeoIndexFunction(f.Condition) {
+				cost += geoFnCost
+			}
+			// TODO(mjibson): do we need to cost other functions?
 		}
 
 		// Add a constant "setup" cost per ON condition to account for the fact that
 		// the rowsProcessed estimate alone cannot effectively discriminate between
 		// plans when RowCount is too small.
+		// TODO: perhaps separate the one-time and per-row costs and
+		// return them separately.
 		cost += cpuCostFactor
 	}
 	return cost
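The setup constant and the callers' plus-one matter when row counts are tiny: without them, a plan that processes almost no rows would get a filter cost near zero regardless of how many ON conditions it evaluates. A small sketch under assumed constants:

package main

import "fmt"

const cpuCostFactor = 0.01 // assumed for illustration

// filtersCost mimics the shape of computeFiltersCost: each condition adds
// a per-row cost plus the constant setup cost.
func filtersCost(numConditions int, perConditionCost float64) float64 {
	var cost float64
	for i := 0; i < numConditions; i++ {
		cost += perConditionCost + cpuCostFactor
	}
	return cost
}

func main() {
	rowsProcessed := 0.0 // e.g. a plan over an empty or tiny table
	// With rowsProcessed == 0, the caller's (1 + rowsProcessed) factor keeps
	// the filter cost nonzero, so plans are still distinguished by the
	// number and cost of their ON conditions.
	for _, n := range []int{1, 2} {
		fmt.Printf("%d condition(s): %.4f\n", n,
			filtersCost(n, 0.1)*(1+rowsProcessed))
	}
}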
pkg/sql/opt/xform/custom_funcs.go (6 changes: 6 additions & 0 deletions)

@@ -1651,6 +1651,12 @@ var geoRelationshipMap = map[string]geoindex.RelationshipType{
 // IsGeoIndexFunction returns true if the given function is a geospatial
 // function that can be index-accelerated.
 func (c *CustomFuncs) IsGeoIndexFunction(fn opt.ScalarExpr) bool {
+	return IsGeoIndexFunction(fn)
+}
+
+// IsGeoIndexFunction returns true if the given function is a geospatial
+// function that can be index-accelerated.
+func IsGeoIndexFunction(fn opt.ScalarExpr) bool {
 	function := fn.(*memo.FunctionExpr)
 	_, ok := geoRelationshipMap[function.Name]
 	return ok
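The exported free function lets the coster reuse the same membership check the exploration rules use. A self-contained analogue, with illustrative map contents (the real geoRelationshipMap maps function names to geoindex relationship types):

package main

import "fmt"

// Illustrative stand-in for geoRelationshipMap; the real map in
// custom_funcs.go associates names with geoindex.RelationshipType values.
var geoRelationshipMap = map[string]bool{
	"st_intersects": true,
	"st_covers":     true,
	"st_coveredby":  true,
}

// isGeoIndexFunction mirrors IsGeoIndexFunction: a function is
// index-accelerable exactly when its name appears in the map.
func isGeoIndexFunction(name string) bool {
	return geoRelationshipMap[name]
}

func main() {
	fmt.Println(isGeoIndexFunction("st_intersects")) // true
	fmt.Println(isGeoIndexFunction("st_area"))       // false
}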
pkg/sql/opt/xform/testdata/rules/join (8 changes: 4 additions & 4 deletions)

@@ -1693,16 +1693,16 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
 ├── G1: (project G2 G3 name)
 │    └── [presentation: name:13,popn_per_sqkm:16]
 │         ├── best: (project G2 G3 name)
-│         └── cost: 5110.53
+│         └── cost: 6717.66
 ├── G2: (group-by G4 G5 cols=(13,14))
 │    └── []
 │         ├── best: (group-by G4 G5 cols=(13,14))
-│         └── cost: 5110.48
+│         └── cost: 6717.61
 ├── G3: (projections G6)
 ├── G4: (inner-join G7 G8 G9) (inner-join G8 G7 G9) (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
 │    └── []
 │         ├── best: (lookup-join G10 G9 nyc_census_blocks,keyCols=[1],outCols=(3,9,10,12-14))
-│         └── cost: 4903.53
+│         └── cost: 6510.67
 ├── G5: (aggregations G11)
 ├── G6: (div G12 G13)
 ├── G7: (scan c,cols=(3,9,10))

@@ -1717,7 +1717,7 @@ memo (optimized, ~23KB, required=[presentation: name:13,popn_per_sqkm:16])
 ├── G10: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
 │    └── []
 │         ├── best: (geo-lookup-join G8 G18 nyc_census_blocks@nyc_census_blocks_geo_idx)
-│         └── cost: 147.36
+│         └── cost: 1754.40
 ├── G11: (sum G19)
 ├── G12: (variable sum)
 ├── G13: (div G20 G21)