Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/libs/executor/src/virtualtablescanoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ int32_t doVirtualTableMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
}

VTS_ERR_RET(copyDataBlock(pDataBlock, p));
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
qDebug("%s %s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), __func__, pDataBlock->info.id.groupId,
pDataBlock->info.rows);

*pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
Expand Down
47 changes: 34 additions & 13 deletions source/libs/parser/src/parTranslater.c
Original file line number Diff line number Diff line change
Expand Up @@ -6867,20 +6867,28 @@ static int32_t getTimeRange(SNode** pPrimaryKeyCond, STimeWindow* pTimeRange, bo
typedef struct SConditionOnlyPhAndConstContext {
bool onlyPhAndConst;
bool hasPhOrConst;
bool onlyConst;
} SConditionOnlyPhAndConstContext;

static EDealRes conditionOnlyPhAndConstImpl(SNode* pNode, void* pContext) {
SConditionOnlyPhAndConstContext* pCxt = (SConditionOnlyPhAndConstContext*)pContext;
if (nodeType(pNode) == QUERY_NODE_VALUE) {
pCxt->onlyPhAndConst &= true;
pCxt->hasPhOrConst = true;
pCxt->onlyConst &= true;
} else if (nodeType(pNode) == QUERY_NODE_FUNCTION) {
SFunctionNode *pFunc = (SFunctionNode*)pNode;
if (fmIsPlaceHolderFunc(pFunc->funcId) || pFunc->funcType == FUNCTION_TYPE_NOW || pFunc->funcType == FUNCTION_TYPE_TODAY || fmIsScalarFunc(pFunc->funcId)) {
if (fmIsPlaceHolderFunc(pFunc->funcId)) {
pCxt->onlyPhAndConst &= true;
pCxt->hasPhOrConst = true;
pCxt->onlyConst = false;
} else if (pFunc->funcType == FUNCTION_TYPE_NOW || pFunc->funcType == FUNCTION_TYPE_TODAY || fmIsScalarFunc(pFunc->funcId)) {
pCxt->onlyPhAndConst &= true;
pCxt->hasPhOrConst = true;
pCxt->onlyConst &= true;
} else {
pCxt->onlyPhAndConst = false;
pCxt->onlyConst = false;
}
}
return DEAL_RES_CONTINUE;
Expand All @@ -6889,18 +6897,20 @@ static EDealRes conditionOnlyPhAndConstImpl(SNode* pNode, void* pContext) {
typedef struct SFilterExtractTsContext {
SNodeList* pStart;
SNodeList* pEnd;
bool onlyTsConst;
} SFilterExtractTsContext;

static bool filterExtractTsNeedCollect(SNode* pLeft, SNode* pRight) {
static bool filterExtractTsNeedCollect(SNode* pLeft, SNode* pRight, bool* pOnlyTsConst) {
if (nodeType(pLeft) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)pLeft;
if (pCol->colType != COLUMN_TYPE_COLUMN || pCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
return false;
}

SConditionOnlyPhAndConstContext cxt = {true, false};
SConditionOnlyPhAndConstContext cxt = {true, false, true};
nodesWalkExpr(pRight, conditionOnlyPhAndConstImpl, &cxt);
if (cxt.onlyPhAndConst && cxt.hasPhOrConst) {
*pOnlyTsConst &= cxt.onlyConst;
if (cxt.onlyPhAndConst && cxt.hasPhOrConst && (((SExprNode*)pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP)) {
return true;
} else {
return false;
Expand All @@ -6915,11 +6925,12 @@ EDealRes filterExtractTsCondImpl(SNode** pNode, void* pContext) {
switch(nodeType(*pNode)) {
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOperator = (SOperatorNode*)*pNode;
PAR_ERR_JRET(scalarConvertOpValueNodeTs(pOperator));
if (pOperator->opType == OP_TYPE_LOWER_EQUAL ||
pOperator->opType == OP_TYPE_LOWER_THAN ||
pOperator->opType == OP_TYPE_GREATER_EQUAL ||
pOperator->opType == OP_TYPE_GREATER_THAN) {
if (filterExtractTsNeedCollect(pOperator->pLeft, pOperator->pRight)) {
if (filterExtractTsNeedCollect(pOperator->pLeft, pOperator->pRight, &pCxt->onlyTsConst)) {
SValueNode *pVal = NULL;
if (pOperator->opType == OP_TYPE_LOWER_EQUAL || pOperator->opType == OP_TYPE_LOWER_THAN) {
PAR_ERR_JRET(nodesListMakeAppend(&pCxt->pEnd, *pNode));
Expand All @@ -6929,7 +6940,7 @@ EDealRes filterExtractTsCondImpl(SNode** pNode, void* pContext) {
PAR_ERR_JRET(nodesMakeValueNodeFromBool(true, &pVal));
*pNode = (SNode*)pVal;
return DEAL_RES_IGNORE_CHILD;
} else if (filterExtractTsNeedCollect(pOperator->pRight, pOperator->pLeft)) {
} else if (filterExtractTsNeedCollect(pOperator->pRight, pOperator->pLeft, &pCxt->onlyTsConst)) {
SValueNode *pVal = NULL;
if (pOperator->opType == OP_TYPE_LOWER_EQUAL || pOperator->opType == OP_TYPE_LOWER_THAN) {
TSWAP(pOperator->pLeft, pOperator->pRight);
Expand All @@ -6947,7 +6958,7 @@ EDealRes filterExtractTsCondImpl(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
} else if (pOperator->opType == OP_TYPE_EQUAL) {
if (filterExtractTsNeedCollect(pOperator->pLeft, pOperator->pRight)) {
if (filterExtractTsNeedCollect(pOperator->pLeft, pOperator->pRight, &pCxt->onlyTsConst)) {
SNode* startNode = NULL;
SNode* endNode = NULL;
SValueNode* pVal = NULL;
Expand All @@ -6960,7 +6971,7 @@ EDealRes filterExtractTsCondImpl(SNode** pNode, void* pContext) {
PAR_ERR_JRET(nodesMakeValueNodeFromBool(true, &pVal));
*pNode = (SNode*)pVal;
return DEAL_RES_IGNORE_CHILD;
} else if (filterExtractTsNeedCollect(pOperator->pRight, pOperator->pLeft)) {
} else if (filterExtractTsNeedCollect(pOperator->pRight, pOperator->pLeft, &pCxt->onlyTsConst)) {
SNode* startNode = NULL;
SNode* endNode = NULL;
SValueNode* pVal = NULL;
Expand Down Expand Up @@ -7042,12 +7053,13 @@ static bool tsRangeSameToWindowRange(SNode* pCond, bool start, bool equal) {
}
}

static int32_t filterExtractTsCond(SNode** pCond, SNode** pTimeRangeExpr, bool leftEq, bool rightEq) {
static int32_t filterExtractTsCond(SNode** pCond, SNode** pTimeRangeExpr, bool leftEq, bool rightEq, bool *onlyTsConst) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNew = NULL;
SFilterExtractTsContext pCxt = {0};
SFilterExtractTsContext pCxt = {.onlyTsConst = true};

nodesRewriteExpr(pCond, filterExtractTsCondImpl, &pCxt);
*onlyTsConst = pCxt.onlyTsConst;
if (!pCxt.pStart && !pCxt.pEnd) {
return TSDB_CODE_SUCCESS;
}
Expand Down Expand Up @@ -7093,11 +7105,20 @@ static int32_t getQueryTimeRange(STranslateContext* pCxt, SNode** pWhere, STimeW
extractJoinCond = false;
}
}
bool onlyTsConst = false;
if (inStreamCalcClause(pCxt) && nodeType(pFromTable) != QUERY_NODE_TEMP_TABLE && extractJoinCond) {
PAR_ERR_JRET(filterExtractTsCond(&pCond, pTimeRangeExpr, pCxt->streamInfo.extLeftEq, pCxt->streamInfo.extRightEq));
PAR_ERR_JRET(filterExtractTsCond(&pCond, pTimeRangeExpr, pCxt->streamInfo.extLeftEq, pCxt->streamInfo.extRightEq, &onlyTsConst));
// some node may be replaced
TSWAP(*pWhere, pCond);
goto _return;
if (onlyTsConst) {
nodesDestroyNode(pCond);
PAR_ERR_JRET(nodesCloneNode(*pWhere, &pCond));
nodesDestroyNode(*pTimeRangeExpr);
*pTimeRangeExpr = NULL;
// we can extract time range to pTimerange but not time range expr
} else {
TSWAP(*pWhere, pCond);
goto _return;
}
}

PAR_ERR_JRET(filterPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL));
Expand Down
87 changes: 55 additions & 32 deletions source/libs/planner/src/planLogicCreater.c
Original file line number Diff line number Diff line change
Expand Up @@ -2156,60 +2156,83 @@ static bool placeHolderCanMakeExternalWindow(int32_t startType, int32_t endType)
}
}

static bool filterHasPlaceHolderRange(SOperatorNode *pStart, SOperatorNode *pEnd) {
SNode* pStartLeft = pStart->pLeft;
SNode* pStartRight = pStart->pRight;
SNode* pEndLeft = pEnd->pLeft;
SNode* pEndRight = pEnd->pRight;
static bool filterHasPlaceHolderRange(SOperatorNode *pOperator) {
SNode* pOpLeft = pOperator->pLeft;
SNode* pOpRight = pOperator->pRight;

if (pStartLeft == NULL || pStartRight == NULL || pEndLeft == NULL || pEndRight == NULL) {
if (pOpLeft == NULL || pOpRight == NULL) {
return false;
}

if (nodeType(pStartLeft) == QUERY_NODE_COLUMN && nodeType(pEndLeft) == QUERY_NODE_COLUMN) {
SColumnNode* pLeftCol = (SColumnNode*)pStartLeft;
SColumnNode* pRightCol = (SColumnNode*)pEndLeft;
if (pLeftCol->colType != COLUMN_TYPE_COLUMN || pLeftCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID ||
pRightCol->colType != COLUMN_TYPE_COLUMN || pRightCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
if (nodeType(pOpLeft) == QUERY_NODE_COLUMN) {
SColumnNode* pTsCol = (SColumnNode*)pOpLeft;
if (pTsCol->colType != COLUMN_TYPE_COLUMN || pTsCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
return false;
}
} else {
return false;
}

SConditionCheckContext startCxt = {.hasNotBasicOp = false,
.hasNegativeConst = false,
.hasOtherFunc = false,
.placeholderAtRight = false,
.placeholderType = 0};
SConditionCheckContext opCxt = {.hasNotBasicOp = false,
.hasNegativeConst = false,
.hasOtherFunc = false,
.placeholderAtRight = false,
.placeholderType = 0};

SConditionCheckContext endCxt = {.hasNotBasicOp = false,
.hasNegativeConst = false,
.hasOtherFunc = false,
.placeholderAtRight = false,
.placeholderType = 0};
nodesWalkExpr(pOpRight, conditionOnlyPhAndConstImpl, &opCxt);
if (opCxt.hasNotBasicOp || opCxt.hasNegativeConst || opCxt.hasOtherFunc || opCxt.placeholderAtRight) {
return false;
}
return true;
}

nodesWalkExpr(pStartRight, conditionOnlyPhAndConstImpl, &startCxt);
nodesWalkExpr(pEndRight, conditionOnlyPhAndConstImpl, &endCxt);
if (startCxt.hasNotBasicOp || startCxt.hasNegativeConst || startCxt.hasOtherFunc || startCxt.placeholderAtRight ||
endCxt.hasNotBasicOp || endCxt.hasNegativeConst || endCxt.hasOtherFunc || endCxt.placeholderAtRight ||
!placeHolderCanMakeExternalWindow(startCxt.placeholderType, endCxt.placeholderType)) {
static bool logicConditionSatisfyExternalWindow(SLogicConditionNode *pLogicCond) {
if (pLogicCond->condType != LOGIC_COND_TYPE_AND || LIST_LENGTH(pLogicCond->pParameterList) == 0) {
return false;
}
SNode *pOperator = NULL;
FOREACH(pOperator, pLogicCond->pParameterList) {
if (nodeType(pOperator) != QUERY_NODE_OPERATOR) {
return false;
}
if (!filterHasPlaceHolderRange((SOperatorNode*)pOperator)) {
return false;
}
}
return true;
}


static bool timeRangeSatisfyExternalWindow(STimeRangeNode* pTimeRange) {
if (!pTimeRange || !pTimeRange->pStart || !pTimeRange->pEnd ||
nodeType(pTimeRange->pStart) != QUERY_NODE_OPERATOR ||
nodeType(pTimeRange->pEnd) != QUERY_NODE_OPERATOR) {
if (!pTimeRange || !pTimeRange->pStart || !pTimeRange->pEnd) {
return false;
}

if (nodeType(pTimeRange->pStart) == QUERY_NODE_OPERATOR) {
if (!filterHasPlaceHolderRange((SOperatorNode*)pTimeRange->pStart)) {
return false;
}
} else if (nodeType(pTimeRange->pStart) == QUERY_NODE_LOGIC_CONDITION) {
if (!logicConditionSatisfyExternalWindow((SLogicConditionNode*)pTimeRange->pStart)) {
return false;
}
} else {
return false;
}

SOperatorNode *pStart = (SOperatorNode *)(pTimeRange->pStart);
SOperatorNode *pEnd = (SOperatorNode *)(pTimeRange->pEnd);
if (nodeType(pTimeRange->pEnd) == QUERY_NODE_OPERATOR) {
if (!filterHasPlaceHolderRange((SOperatorNode*)pTimeRange->pEnd)) {
return false;
}
} else if (nodeType(pTimeRange->pEnd) == QUERY_NODE_LOGIC_CONDITION) {
if (!logicConditionSatisfyExternalWindow((SLogicConditionNode*)pTimeRange->pEnd)) {
return false;
}
} else {
return false;
}

return filterHasPlaceHolderRange(pStart, pEnd);
return true;
}

static int32_t conditionHasPlaceHolder(SNode* pNode, bool* pHasPlaceHolder) {
Expand Down
10 changes: 6 additions & 4 deletions source/libs/scalar/src/scalar.c
Original file line number Diff line number Diff line change
Expand Up @@ -1282,12 +1282,12 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
if (1 == ctx->stream.extWinType) {
if (node->opType == OP_TYPE_GREATER_THAN) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.skey = ((int64_t*)res->columnData->pData)[i] + 1;
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i] + 1);
ctx->stream.pWins[i].winOutIdx = -1;
}
} else if (node->opType == OP_TYPE_GREATER_EQUAL) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.skey = ((int64_t*)res->columnData->pData)[i];
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i]);
ctx->stream.pWins[i].winOutIdx = -1;
}
} else {
Expand All @@ -1301,11 +1301,13 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
// consider triggerType and keep the ekey exclude
if (node->opType == OP_TYPE_LOWER_THAN) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.ekey = ((int64_t*)res->columnData->pData)[i];
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i]);
ctx->stream.pWins[i].winOutIdx = -2;
}
} else if (node->opType == OP_TYPE_LOWER_EQUAL) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.ekey = ((int64_t*)res->columnData->pData)[i] + 1;
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i] + 1);
ctx->stream.pWins[i].winOutIdx = -2;
}
} else {
qError("invalid op type:%d in ext win range end expr", node->opType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,14 @@ def createStreams(self):
)
self.streams.append(stream)

stream = StreamItem(
id=133,
stream="create stream rdb.s133 count_window(2) from tdb.t1 into rdb.r133 as select * from tdb.t1 where _c0 >= _twend and _c0 >= _twstart and _c0 <= _twend;",
res_query="select * from rdb.r133",
exp_query="select cols(last(ts), *) from tdb.t1 count_window(2) limit 4;",
)
self.streams.append(stream)

tdLog.info(f"create total:{len(self.streams)} streams")
for stream in self.streams:
stream.createStream()
Expand Down
Loading