diff --git a/src/create_next_fn.js b/src/create_next_fn.js index 15327a1..5a79486 100644 --- a/src/create_next_fn.js +++ b/src/create_next_fn.js @@ -2,7 +2,8 @@ const merge = require('deepmerge'); const { hashify, getIDBError } = require('./util.js'), filter = require('./filter.js'), - sort = require('./sort.js'); + sort = require('./sort.js'), + limit = require('./limit.js'); const { build, @@ -74,30 +75,6 @@ const buildPredicates = (pipeline) => { return new_pipeline; }; -const initPredAndSortSpec = (config) => { - const { pipeline } = config, - preds = [], - sort_specs = []; - - let i = 0; - - for (let [fn, arg] of pipeline) { - if (fn === sort) { sort_specs.push(arg); } - else if (fn === filter) { preds.push(arg); } - else { break; } - - i++; - } - - pipeline.splice(0, i); - - config.pred = joinPredicates(preds); - - if (sort_specs.length) { - config.sort_spec = sort_specs.reduce(merge, {}); - } -}; - const getClauses = (col, pred) => { if (!pred) { return []; } @@ -161,7 +138,15 @@ const initSort = (config) => { if (new_clauses.length) { config.clauses = new_clauses; } else { - pipeline.push([sort, spec]); + pipeline.unshift([sort, spec]); + } +}; + +const initLimit = (config) => { + const { limit_num, pipeline } = config; + + if (config.hasOwnProperty('limit_num')) { + pipeline.push([limit, limit_num]); } }; @@ -282,7 +267,7 @@ const createParallelNextFn = (config) => { }; const spec = config.sort_spec; - if (spec) { config.pipeline.push([sort, spec]); } + if (spec) { config.pipeline.unshift([sort, spec]); } return next; }; @@ -315,7 +300,32 @@ module.exports = (cur) => { pipeline }; - initPredAndSortSpec(config); + const preds = [], + sort_specs = [], + limits = []; + + let i = 0; + + for (let [fn, arg] of pipeline) { + if (fn === sort) { sort_specs.push(arg); } + else if (fn === filter) { preds.push(arg); } + else if (fn === limit) { limits.push(arg); } + else { break; } + + i++; + } + + pipeline.splice(0, i); + + config.pred = joinPredicates(preds); + + if (sort_specs.length) { + config.sort_spec = sort_specs.reduce(merge, {}); + } + + if (limits.length) { + config.limit_num = limits.reduce((a, b) => a + b); + } let next; @@ -325,9 +335,10 @@ module.exports = (cur) => { initClauses(config); initHint(config); initSort(config); + initLimit(config); next = createNextFn(config); } - + return addPipelineStages(config, next); };