mirror of https://github.com/mongodb/mongo
SERVER-86485 Refactor SlotBasedStageBuilder::buildWindow()
GitOrigin-RevId: 974bad19782e453707d9a7950a2656210657bfeb
This commit is contained in:
parent
a7194a501c
commit
4d46168300
|
|
@ -4637,14 +4637,9 @@ std::unique_ptr<sbe::EExpression> getDefaultValueExpr(const WindowFunctionStatem
|
|||
MONGO_UNREACHABLE;
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder::buildWindow(
|
||||
const QuerySolutionNode* root, const PlanStageReqs& reqs) {
|
||||
auto windowNode = static_cast<const WindowNode*>(root);
|
||||
|
||||
SbBuilder b(_state, windowNode->nodeId());
|
||||
|
||||
std::tuple<bool, PlanStageReqs, PlanStageReqs> computeChildReqsForWindow(
|
||||
const PlanStageReqs& reqs, const WindowNode* windowNode) {
|
||||
auto reqFields = reqs.getFields();
|
||||
bool reqFieldsHasDottedPaths = std::any_of(reqFields.begin(), reqFields.end(), [](auto&& f) {
|
||||
return f.find('.') != std::string::npos;
|
||||
|
|
@ -4675,7 +4670,6 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
return false;
|
||||
}();
|
||||
|
||||
auto child = root->children[0].get();
|
||||
auto childReqs =
|
||||
reqResult ? reqs.copyForChild().setResultObj() : reqs.copyForChild().clearResult();
|
||||
|
||||
|
|
@ -4685,17 +4679,39 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
childReqs.setFields(getTopLevelFields(windowNode->sortByRequiredFields));
|
||||
childReqs.setFields(getTopLevelFields(windowNode->outputRequiredFields));
|
||||
|
||||
auto childStageOutput = build(child, childReqs);
|
||||
auto stage = std::move(childStageOutput.first);
|
||||
auto outputs = std::move(childStageOutput.second);
|
||||
auto rootSlotOpt = outputs.getResultObjIfExists();
|
||||
return {reqResult, std::move(childReqs), std::move(forwardingReqs)};
|
||||
}
|
||||
|
||||
// Create a tuple of slots for each new slot added.
|
||||
sbe::value::SlotVector currSlots = _data->metadataSlots.getSlotVector();
|
||||
sbe::value::SlotVector boundTestingSlots = _slotIdGenerator.generateMultiple(currSlots.size());
|
||||
std::vector<sbe::value::SlotVector> windowFrameFirstSlots;
|
||||
std::vector<sbe::value::SlotVector> windowFrameLastSlots;
|
||||
auto ensureSlotInBuffer = [&](sbe::value::SlotId slot) {
|
||||
class WindowStageBuilder {
|
||||
public:
|
||||
using SlotId = sbe::value::SlotId;
|
||||
using SlotVector = sbe::value::SlotVector;
|
||||
using StringDataEExprMap = StringDataMap<std::unique_ptr<sbe::EExpression>>;
|
||||
using BuildOutput =
|
||||
std::tuple<SbStage, std::vector<std::string>, SlotVector, StringMap<SlotId>>;
|
||||
|
||||
WindowStageBuilder(StageBuilderState& state,
|
||||
const PlanStageReqs& forwardingReqs,
|
||||
const PlanStageSlots& outputs,
|
||||
const WindowNode* wn,
|
||||
const CanonicalQuery& cq,
|
||||
boost::optional<SlotId> collatorSlot,
|
||||
SlotVector sv)
|
||||
: _state(state),
|
||||
_slotIdGenerator(*state.slotIdGenerator),
|
||||
forwardingReqs(forwardingReqs),
|
||||
outputs(outputs),
|
||||
rootSlotOpt(outputs.getResultObjIfExists()),
|
||||
windowNode(wn),
|
||||
_cq(cq),
|
||||
collatorSlot(collatorSlot),
|
||||
b(state, wn->nodeId()),
|
||||
currSlots(std::move(sv)),
|
||||
boundTestingSlots(_slotIdGenerator.generateMultiple(currSlots.size())) {}
|
||||
|
||||
BuildOutput build(SbStage stage);
|
||||
|
||||
size_t ensureSlotInBuffer(SlotId slot) {
|
||||
for (size_t i = 0; i < currSlots.size(); i++) {
|
||||
if (slot == currSlots[i]) {
|
||||
return i;
|
||||
|
|
@ -4710,28 +4726,29 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
frameLastSlots.push_back(_slotIdGenerator.generate());
|
||||
}
|
||||
return currSlots.size() - 1;
|
||||
};
|
||||
auto registerFrameFirstSlots = [&]() {
|
||||
windowFrameFirstSlots.push_back(sbe::value::SlotVector());
|
||||
}
|
||||
|
||||
size_t registerFrameFirstSlots() {
|
||||
windowFrameFirstSlots.push_back(SlotVector());
|
||||
auto& frameFirstSlots = windowFrameFirstSlots.back();
|
||||
frameFirstSlots.clear();
|
||||
for (size_t i = 0; i < currSlots.size(); i++) {
|
||||
frameFirstSlots.push_back(_slotIdGenerator.generate());
|
||||
}
|
||||
return windowFrameFirstSlots.size() - 1;
|
||||
};
|
||||
auto registerFrameLastSlots = [&]() {
|
||||
windowFrameLastSlots.push_back(sbe::value::SlotVector());
|
||||
}
|
||||
|
||||
size_t registerFrameLastSlots() {
|
||||
windowFrameLastSlots.push_back(SlotVector());
|
||||
auto& frameLastSlots = windowFrameLastSlots.back();
|
||||
frameLastSlots.clear();
|
||||
for (size_t i = 0; i < currSlots.size(); i++) {
|
||||
frameLastSlots.push_back(_slotIdGenerator.generate());
|
||||
}
|
||||
return windowFrameLastSlots.size() - 1;
|
||||
};
|
||||
|
||||
auto collatorSlot = _state.getCollatorSlot();
|
||||
}
|
||||
|
||||
std::pair<SbStage, size_t> generatePartitionExpr(SbStage stage) {
|
||||
// Get stages for partition by.
|
||||
size_t partitionSlotCount = 0;
|
||||
if (windowNode->partitionBy) {
|
||||
|
|
@ -4747,24 +4764,31 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
partitionExpr = b.makeLet(
|
||||
frameId,
|
||||
SbExpr::makeSeq(b.makeFillEmptyNull(std::move(partitionExpr))),
|
||||
b.makeIf(b.makeFunction("isArray"_sd, partitionName),
|
||||
b.makeIf(
|
||||
b.makeFunction("isArray"_sd, partitionName),
|
||||
b.makeFail(
|
||||
ErrorCodes::TypeMismatch,
|
||||
"An expression used to partition cannot evaluate to value of type array"),
|
||||
partitionName));
|
||||
|
||||
stage = sbe::makeProjectStage(
|
||||
std::move(stage), root->nodeId(), partitionSlot, partitionExpr.extractExpr(_state));
|
||||
stage = sbe::makeProjectStage(std::move(stage),
|
||||
windowNode->nodeId(),
|
||||
partitionSlot,
|
||||
partitionExpr.extractExpr(_state));
|
||||
}
|
||||
|
||||
return {std::move(stage), partitionSlotCount};
|
||||
}
|
||||
|
||||
void ensureForwardSlotsInBuffer() {
|
||||
// Calculate list of forward slots.
|
||||
for (auto forwardSlot : getSlotsOrderedByName(forwardingReqs, outputs)) {
|
||||
ensureSlotInBuffer(forwardSlot);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate slot for document position based window bounds, and add corresponding stages.
|
||||
boost::optional<sbe::value::SlotId> documentBoundSlot;
|
||||
auto getDocumentBoundSlot = [&]() -> std::pair<sbe::value::SlotId, sbe::value::SlotId> {
|
||||
std::tuple<SbStage, SlotId, SlotId> getDocumentBoundSlot(SbStage stage) {
|
||||
if (!documentBoundSlot) {
|
||||
documentBoundSlot = _slotIdGenerator.generate();
|
||||
sbe::value::SlotMap<sbe::AggExprPair> aggExprPairs;
|
||||
|
|
@ -4775,12 +4799,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
std::move(stage), std::move(aggExprPairs), windowNode->nodeId());
|
||||
}
|
||||
auto documentBoundSlotIdx = ensureSlotInBuffer(*documentBoundSlot);
|
||||
return {*documentBoundSlot, boundTestingSlots[documentBoundSlotIdx]};
|
||||
};
|
||||
return {std::move(stage), *documentBoundSlot, boundTestingSlots[documentBoundSlotIdx]};
|
||||
}
|
||||
|
||||
// Calculate sort-by slot, and add corresponding stages.
|
||||
boost::optional<sbe::value::SlotId> sortBySlot;
|
||||
auto getSortBySlot = [&]() -> std::pair<sbe::value::SlotId, sbe::value::SlotId> {
|
||||
std::tuple<SbStage, SlotId, SlotId> getSortBySlot(SbStage stage) {
|
||||
if (!sortBySlot) {
|
||||
sortBySlot = _slotIdGenerator.generate();
|
||||
tassert(7914602,
|
||||
|
|
@ -4796,21 +4818,20 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
std::move(stage), windowNode->nodeId(), *sortBySlot, std::move(sortByExpr));
|
||||
}
|
||||
auto sortBySlotIdx = ensureSlotInBuffer(*sortBySlot);
|
||||
return {*sortBySlot, boundTestingSlots[sortBySlotIdx]};
|
||||
};
|
||||
return {std::move(stage), *sortBySlot, boundTestingSlots[sortBySlotIdx]};
|
||||
}
|
||||
|
||||
// Calculate slot for range and time range based window bounds
|
||||
boost::optional<sbe::value::SlotId> rangeBoundSlot;
|
||||
boost::optional<sbe::value::SlotId> timeRangeBoundSlot;
|
||||
auto getRangeBoundSlot =
|
||||
[&](boost::optional<TimeUnit> unit) -> std::pair<sbe::value::SlotId, sbe::value::SlotId> {
|
||||
std::tuple<SbStage, SlotId, SlotId> getRangeBoundSlot(SbStage stage,
|
||||
boost::optional<TimeUnit> unit) {
|
||||
auto projectRangeBoundSlot = [&](StringData typeCheckFn,
|
||||
std::unique_ptr<sbe::EExpression> failExpr) {
|
||||
auto slot = _slotIdGenerator.generate();
|
||||
auto sortBySlot = getSortBySlot().first;
|
||||
auto [outStage, sortBySlot, _] = getSortBySlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
auto checkType = makeLocalBind(
|
||||
&_frameIdGenerator,
|
||||
_state.frameIdGenerator,
|
||||
[&](sbe::EVariable input) {
|
||||
return sbe::makeE<sbe::EIf>(makeFunction(typeCheckFn, input.clone()),
|
||||
input.clone(),
|
||||
|
|
@ -4831,7 +4852,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
"Invalid range: Expected the sortBy field to be a date"));
|
||||
}
|
||||
auto timeRangeBoundSlotIdx = ensureSlotInBuffer(*timeRangeBoundSlot);
|
||||
return {*timeRangeBoundSlot, boundTestingSlots[timeRangeBoundSlotIdx]};
|
||||
return {
|
||||
std::move(stage), *timeRangeBoundSlot, boundTestingSlots[timeRangeBoundSlotIdx]};
|
||||
} else {
|
||||
if (!rangeBoundSlot) {
|
||||
rangeBoundSlot = projectRangeBoundSlot(
|
||||
|
|
@ -4841,29 +4863,21 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
"Invalid range: Expected the sortBy field to be a number"));
|
||||
}
|
||||
auto rangeBoundSlotIdx = ensureSlotInBuffer(*rangeBoundSlot);
|
||||
return {*rangeBoundSlot, boundTestingSlots[rangeBoundSlotIdx]};
|
||||
return {std::move(stage), *rangeBoundSlot, boundTestingSlots[rangeBoundSlotIdx]};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Creating window definitions, including the slots and expressions for the bounds and
|
||||
// accumulators.
|
||||
std::vector<sbe::WindowStage::Window> windows;
|
||||
std::vector<std::string> getWindowOutputPaths() {
|
||||
std::vector<std::string> windowFields;
|
||||
sbe::value::SlotVector windowFinalSlots;
|
||||
sbe::SlotExprPairVector windowFinalProjects;
|
||||
std::vector<boost::optional<size_t>> windowFrameFirstSlotIdx;
|
||||
std::vector<boost::optional<size_t>> windowFrameLastSlotIdx;
|
||||
StringMap<sbe::value::SlotId> outputPathMap;
|
||||
|
||||
// We project window function input arguments in order to avoid repeated evaluation
|
||||
// for both add and remove expressions.
|
||||
sbe::SlotExprPairVector windowArgProjects;
|
||||
|
||||
for (size_t i = 0; i < windowNode->outputFields.size(); i++) {
|
||||
sbe::WindowStage::Window window{};
|
||||
auto& outputField = windowNode->outputFields[i];
|
||||
windowFields.push_back(outputField.fieldName);
|
||||
}
|
||||
return windowFields;
|
||||
}
|
||||
|
||||
bool isWindowRemovable(const WindowBounds& windowBounds) const {
|
||||
// Check whether window is removable or not.
|
||||
auto isUnboundedBoundRemovable = [](const WindowBounds::Unbounded&) {
|
||||
return false;
|
||||
|
|
@ -4889,13 +4903,13 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
isValueBoundRemovable},
|
||||
range.lower);
|
||||
};
|
||||
const auto& windowBounds = outputField.expr->bounds();
|
||||
auto removable = visit(OverloadedVisitor{isDocumentWindowRemovable, isRangeWindowRemovable},
|
||||
bool removable = visit(OverloadedVisitor{isDocumentWindowRemovable, isRangeWindowRemovable},
|
||||
windowBounds.bounds);
|
||||
return removable;
|
||||
}
|
||||
|
||||
// Create a fake accumulation statement for non-removable window bounds.
|
||||
auto accStmt = createFakeAccumulationStatement(_state, outputField);
|
||||
|
||||
std::tuple<SbStage, StringDataEExprMap, StringDataEExprMap> generateArgs(
|
||||
SbStage stage, const WindowFunctionStatement& outputField, bool removable) {
|
||||
// Get init expression arg for relevant functions
|
||||
auto getUnitArg = [&](window_function::ExpressionWithUnit* expr) {
|
||||
auto unit = expr->unitInMillis();
|
||||
|
|
@ -4983,8 +4997,11 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
makeConstant(sbe::value::TypeTags::Null, 0));
|
||||
}
|
||||
} else if (accName == "$integral" || accName == "$derivative" || accName == "$linearFill") {
|
||||
auto [outStage, sortBySlot, _] = getSortBySlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
argExprs.emplace(AccArgs::kInput, getArgExpr(outputField.expr->input().get()));
|
||||
argExprs.emplace(AccArgs::kSortBy, makeVariable(getSortBySlot().first));
|
||||
argExprs.emplace(AccArgs::kSortBy, makeVariable(sortBySlot));
|
||||
} else if (accName == "$rank" || accName == "$denseRank") {
|
||||
auto isAscending = windowNode->sortBy->front().isAscending;
|
||||
argExprs.emplace(AccArgs::kInput, getArgExpr(outputField.expr->input().get()));
|
||||
|
|
@ -5061,6 +5078,16 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
argExprs.emplace("", getArgExpr(outputField.expr->input().get()));
|
||||
}
|
||||
|
||||
return {std::move(stage), std::move(initExprArgs), std::move(argExprs)};
|
||||
}
|
||||
|
||||
SbStage generateInitsAddsAndRemoves(SbStage stage,
|
||||
const WindowFunctionStatement& outputField,
|
||||
const AccumulationStatement& accStmt,
|
||||
bool removable,
|
||||
StringDataEExprMap initExprArgs,
|
||||
const StringDataEExprMap& argExprs,
|
||||
sbe::WindowStage::Window& window) {
|
||||
// Create init/add/remove expressions.
|
||||
auto cloneExprMap = [](const StringDataMap<std::unique_ptr<sbe::EExpression>>& exprMap) {
|
||||
StringDataMap<std::unique_ptr<sbe::EExpression>> exprMapClone;
|
||||
|
|
@ -5115,9 +5142,10 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
window.addExprs.size() == window.removeExprs.size() &&
|
||||
window.removeExprs.size() == window.windowExprSlots.size());
|
||||
|
||||
return stage;
|
||||
}
|
||||
|
||||
// Build bound expressions and create window definitions.
|
||||
|
||||
void createFrameFirstAndLastSlots(const WindowFunctionStatement& outputField, bool removable) {
|
||||
// Create frame first and last slots if the window requires.
|
||||
if (outputField.expr->getOpName() == "$derivative") {
|
||||
windowFrameFirstSlotIdx.push_back(registerFrameFirstSlots());
|
||||
|
|
@ -5135,8 +5163,13 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
windowFrameFirstSlotIdx.push_back(boost::none);
|
||||
windowFrameLastSlotIdx.push_back(boost::none);
|
||||
}
|
||||
}
|
||||
|
||||
auto makeOffsetBoundExpr = [&](sbe::value::SlotId boundSlot,
|
||||
SbStage generateBoundExprs(SbStage stage,
|
||||
const WindowFunctionStatement& outputField,
|
||||
const WindowBounds& windowBounds,
|
||||
sbe::WindowStage::Window& window) {
|
||||
auto makeOffsetBoundExpr = [&](SlotId boundSlot,
|
||||
std::pair<sbe::value::TypeTags, sbe::value::Value> offset =
|
||||
{sbe::value::TypeTags::Nothing, 0},
|
||||
boost::optional<TimeUnit> unit = boost::none) {
|
||||
|
|
@ -5164,8 +5197,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
makeConstant(offset.first, offset.second));
|
||||
}
|
||||
};
|
||||
auto makeLowBoundExpr = [&](sbe::value::SlotId boundSlot,
|
||||
sbe::value::SlotId boundTestingSlot,
|
||||
auto makeLowBoundExpr = [&](SlotId boundSlot,
|
||||
SlotId boundTestingSlot,
|
||||
std::pair<sbe::value::TypeTags, sbe::value::Value> offset =
|
||||
{sbe::value::TypeTags::Nothing, 0},
|
||||
boost::optional<TimeUnit> unit = boost::none) {
|
||||
|
|
@ -5176,8 +5209,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
makeOffsetBoundExpr(boundSlot, offset, unit)),
|
||||
makeConstant(sbe::value::TypeTags::NumberInt32, 0));
|
||||
};
|
||||
auto makeHighBoundExpr = [&](sbe::value::SlotId boundSlot,
|
||||
sbe::value::SlotId boundTestingSlot,
|
||||
auto makeHighBoundExpr = [&](SlotId boundSlot,
|
||||
SlotId boundTestingSlot,
|
||||
std::pair<sbe::value::TypeTags, sbe::value::Value> offset =
|
||||
{sbe::value::TypeTags::Nothing, 0},
|
||||
boost::optional<TimeUnit> unit = boost::none) {
|
||||
|
|
@ -5195,23 +5228,35 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
window.highBoundExpr = nullptr;
|
||||
};
|
||||
auto makeLowCurrentExpr = [&](const WindowBounds::Current&) {
|
||||
auto [lowBoundSlot, lowBoundTestingSlot] = getDocumentBoundSlot();
|
||||
auto [outStage, lowBoundSlot, lowBoundTestingSlot] =
|
||||
getDocumentBoundSlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
window.lowBoundExpr = makeLowBoundExpr(lowBoundSlot, lowBoundTestingSlot);
|
||||
};
|
||||
auto makeHighCurrentExpr = [&](const WindowBounds::Current&) {
|
||||
auto [highBoundSlot, highBoundTestingSlot] = getDocumentBoundSlot();
|
||||
auto [outStage, highBoundSlot, highBoundTestingSlot] =
|
||||
getDocumentBoundSlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
window.highBoundExpr = makeHighBoundExpr(highBoundSlot, highBoundTestingSlot);
|
||||
};
|
||||
auto documentCase = [&](const WindowBounds::DocumentBased& document) {
|
||||
auto makeLowValueExpr = [&](const int& v) {
|
||||
auto [lowBoundSlot, lowBoundTestingSlot] = getDocumentBoundSlot();
|
||||
auto [outStage, lowBoundSlot, lowBoundTestingSlot] =
|
||||
getDocumentBoundSlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
window.lowBoundExpr = makeLowBoundExpr(
|
||||
lowBoundSlot,
|
||||
lowBoundTestingSlot,
|
||||
{sbe::value::TypeTags::NumberInt32, sbe::value::bitcastFrom<int>(v)});
|
||||
};
|
||||
auto makeHighValueExpr = [&](const int& v) {
|
||||
auto [highBoundSlot, highBoundTestingSlot] = getDocumentBoundSlot();
|
||||
auto [outStage, highBoundSlot, highBoundTestingSlot] =
|
||||
getDocumentBoundSlot(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
window.highBoundExpr = makeHighBoundExpr(
|
||||
highBoundSlot,
|
||||
highBoundTestingSlot,
|
||||
|
|
@ -5223,8 +5268,13 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
document.upper);
|
||||
};
|
||||
auto rangeCase = [&](const WindowBounds::RangeBased& range) {
|
||||
auto rangeBoundSlot = getRangeBoundSlot(range.unit).first;
|
||||
auto rangeBoundTestingSlot = getRangeBoundSlot(range.unit).second;
|
||||
auto [outStage, outRbSlot, outRbTestingSlot] =
|
||||
getRangeBoundSlot(std::move(stage), range.unit);
|
||||
|
||||
stage = std::move(outStage);
|
||||
auto rangeBoundSlot = std::move(outRbSlot);
|
||||
auto rangeBoundTestingSlot = std::move(outRbTestingSlot);
|
||||
|
||||
auto makeLowValueExpr = [&](const Value& v) {
|
||||
window.lowBoundExpr = makeLowBoundExpr(
|
||||
rangeBoundSlot, rangeBoundTestingSlot, sbe::value::makeValue(v), range.unit);
|
||||
|
|
@ -5247,9 +5297,19 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
makeFunction("aggLinearFillCanAdd", makeVariable(window.windowExprSlots[0]));
|
||||
}
|
||||
|
||||
return stage;
|
||||
}
|
||||
|
||||
SlotId generateFinalExpr(const WindowFunctionStatement& outputField,
|
||||
const AccumulationStatement& accStmt,
|
||||
bool removable,
|
||||
StringDataEExprMap argExprs,
|
||||
const sbe::WindowStage::Window& window) {
|
||||
using ExpressionWithUnit = window_function::ExpressionWithUnit;
|
||||
|
||||
// Build extra arguments for finalize expressions.
|
||||
auto getModifiedExpr = [&](std::unique_ptr<sbe::EExpression> argExpr,
|
||||
sbe::value::SlotVector& newSlots) {
|
||||
SlotVector& newSlots) {
|
||||
if (auto varExpr = argExpr->as<sbe::EVariable>(); varExpr) {
|
||||
auto idx = ensureSlotInBuffer(varExpr->getSlotId());
|
||||
return makeVariable(newSlots[idx]);
|
||||
|
|
@ -5262,8 +5322,9 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
|
||||
StringDataMap<std::unique_ptr<sbe::EExpression>> finalArgExprs;
|
||||
if (outputField.expr->getOpName() == "$derivative") {
|
||||
auto unit = getUnitArg(
|
||||
dynamic_cast<window_function::ExpressionWithUnit*>(outputField.expr.get()));
|
||||
auto u = dynamic_cast<ExpressionWithUnit*>(outputField.expr.get())->unitInMillis();
|
||||
auto unit = u ? makeInt64Constant(*u) : makeNullConstant();
|
||||
|
||||
auto it = argExprs.find(AccArgs::kInput);
|
||||
tassert(7993401,
|
||||
str::stream() << "Window function expects '" << AccArgs::kInput << "' argument",
|
||||
|
|
@ -5351,7 +5412,9 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
}
|
||||
|
||||
// Deal with empty window for finalize expressions.
|
||||
auto emptyWindowExpr = [&](StringData accExprName) {
|
||||
auto emptyWindowExpr = [&] {
|
||||
StringData accExprName = outputField.expr->getOpName();
|
||||
|
||||
if (accExprName == "$sum") {
|
||||
return makeConstant(sbe::value::TypeTags::NumberInt32, 0);
|
||||
} else if (accExprName == "$push" || accExprName == AccumulatorAddToSet::kName) {
|
||||
|
|
@ -5362,7 +5425,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
} else {
|
||||
return makeConstant(sbe::value::TypeTags::Null, 0);
|
||||
}
|
||||
}(outputField.expr->getOpName());
|
||||
}();
|
||||
|
||||
if (finalExpr) {
|
||||
finalExpr = sbe::makeE<sbe::EIf>(
|
||||
makeFunction("exists", makeVariable(window.windowExprSlots[0])),
|
||||
|
|
@ -5376,6 +5440,100 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
auto finalSlot = _slotIdGenerator.generate();
|
||||
windowFinalProjects.emplace_back(finalSlot, std::move(finalExpr));
|
||||
windowFinalSlots.push_back(finalSlot);
|
||||
|
||||
return finalSlot;
|
||||
}
|
||||
|
||||
private:
|
||||
StageBuilderState& _state;
|
||||
sbe::value::SlotIdGenerator& _slotIdGenerator;
|
||||
const PlanStageReqs& forwardingReqs;
|
||||
const PlanStageSlots& outputs;
|
||||
boost::optional<TypedSlot> rootSlotOpt;
|
||||
const WindowNode* windowNode;
|
||||
const CanonicalQuery& _cq;
|
||||
boost::optional<SlotId> collatorSlot;
|
||||
SbBuilder b;
|
||||
|
||||
SlotVector currSlots;
|
||||
SlotVector boundTestingSlots;
|
||||
std::vector<SlotVector> windowFrameFirstSlots;
|
||||
std::vector<SlotVector> windowFrameLastSlots;
|
||||
|
||||
// Calculate slot for document position based window bounds, and add corresponding stages.
|
||||
boost::optional<SlotId> documentBoundSlot;
|
||||
|
||||
// Calculate sort-by slot, and add corresponding stages.
|
||||
boost::optional<SlotId> sortBySlot;
|
||||
|
||||
// Calculate slot for range and time range based window bounds
|
||||
boost::optional<SlotId> rangeBoundSlot;
|
||||
boost::optional<SlotId> timeRangeBoundSlot;
|
||||
|
||||
std::vector<boost::optional<size_t>> windowFrameFirstSlotIdx;
|
||||
std::vector<boost::optional<size_t>> windowFrameLastSlotIdx;
|
||||
|
||||
// We project window function input arguments in order to avoid repeated evaluation
|
||||
// for both add and remove expressions.
|
||||
sbe::SlotExprPairVector windowArgProjects;
|
||||
|
||||
sbe::value::SlotVector windowFinalSlots;
|
||||
sbe::SlotExprPairVector windowFinalProjects;
|
||||
};
|
||||
|
||||
WindowStageBuilder::BuildOutput WindowStageBuilder::build(SbStage stage) {
|
||||
// Get stages for partition by.
|
||||
auto [outStage, partitionSlotCount] = generatePartitionExpr(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
// Calculate list of forward slots.
|
||||
ensureForwardSlotsInBuffer();
|
||||
|
||||
// Generate list of the window output paths.
|
||||
auto windowFields = getWindowOutputPaths();
|
||||
|
||||
// Creating window definitions, including the slots and expressions for the bounds and
|
||||
// accumulators.
|
||||
std::vector<sbe::WindowStage::Window> windows;
|
||||
StringMap<SlotId> outputPathMap;
|
||||
|
||||
for (size_t i = 0; i < windowNode->outputFields.size(); i++) {
|
||||
auto& outputField = windowNode->outputFields[i];
|
||||
|
||||
WindowBounds windowBounds = outputField.expr->bounds();
|
||||
|
||||
// Check whether window is removable or not.
|
||||
bool removable = isWindowRemovable(windowBounds);
|
||||
|
||||
// Create a fake accumulation statement for non-removable window bounds.
|
||||
auto accStmt = createFakeAccumulationStatement(_state, outputField);
|
||||
|
||||
auto [outStage, initExprArgs, argExprs] =
|
||||
generateArgs(std::move(stage), outputField, removable);
|
||||
stage = std::move(outStage);
|
||||
|
||||
sbe::WindowStage::Window window{};
|
||||
|
||||
// Create init/add/remove expressions.
|
||||
stage = generateInitsAddsAndRemoves(std::move(stage),
|
||||
outputField,
|
||||
accStmt,
|
||||
removable,
|
||||
std::move(initExprArgs),
|
||||
argExprs,
|
||||
window);
|
||||
|
||||
// Create frame first and last slots if the window requires.
|
||||
createFrameFirstAndLastSlots(outputField, removable);
|
||||
|
||||
// Build bound expressions.
|
||||
stage = generateBoundExprs(std::move(stage), outputField, windowBounds, window);
|
||||
|
||||
// Build extra arguments for finalize expressions.
|
||||
auto finalSlot =
|
||||
generateFinalExpr(outputField, accStmt, removable, std::move(argExprs), window);
|
||||
|
||||
// Append the window definition to the end of the 'windows' vector.
|
||||
windows.emplace_back(std::move(window));
|
||||
|
||||
// If 'outputField' is not a dotted path, add 'outputField' and its corresponding slot
|
||||
|
|
@ -5416,8 +5574,38 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
stage = sbe::makeS<sbe::ProjectStage>(
|
||||
std::move(stage), std::move(windowFinalProjects), windowNode->nodeId());
|
||||
|
||||
// Now that we're done generating all the expressions we need to generate, we can finally
|
||||
// update the kField slots in 'outputs' to reflect the effects of this stage.
|
||||
return {std::move(stage),
|
||||
std::move(windowFields),
|
||||
std::move(windowFinalSlots),
|
||||
std::move(outputPathMap)};
|
||||
}
|
||||
} // namespace
|
||||
|
||||
std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder::buildWindow(
|
||||
const QuerySolutionNode* root, const PlanStageReqs& reqs) {
|
||||
auto windowNode = static_cast<const WindowNode*>(root);
|
||||
|
||||
auto [reqResult, childReqs, forwardingReqs] = computeChildReqsForWindow(reqs, windowNode);
|
||||
|
||||
auto child = root->children[0].get();
|
||||
auto childStageOutput = build(child, childReqs);
|
||||
auto stage = std::move(childStageOutput.first);
|
||||
auto outputs = std::move(childStageOutput.second);
|
||||
|
||||
auto collatorSlot = _state.getCollatorSlot();
|
||||
|
||||
auto sv = _data->metadataSlots.getSlotVector();
|
||||
|
||||
// Create a WindowStageBuilder and call the build() method on it. This will generate all
|
||||
// the SBE expressions and SBE stages needed to implement the window stage.
|
||||
WindowStageBuilder builder(
|
||||
_state, forwardingReqs, outputs, windowNode, _cq, collatorSlot, std::move(sv));
|
||||
|
||||
auto [outStage, windowFields, windowFinalSlots, outputPathMap] =
|
||||
builder.build(std::move(stage));
|
||||
stage = std::move(outStage);
|
||||
|
||||
// Update the kField slots in 'outputs' to reflect the effects of this stage.
|
||||
for (auto&& windowField : windowFields) {
|
||||
outputs.clearFieldAndAllPrefixes(windowField);
|
||||
}
|
||||
|
|
@ -5427,6 +5615,8 @@ std::pair<std::unique_ptr<sbe::PlanStage>, PlanStageSlots> SlotBasedStageBuilder
|
|||
|
||||
// Produce a materialized result object if needed.
|
||||
if (reqResult) {
|
||||
SbBuilder b(_state, windowNode->nodeId());
|
||||
|
||||
std::vector<ProjectNode> nodes;
|
||||
for (size_t i = 0; i < windowFields.size(); ++i) {
|
||||
nodes.emplace_back(SbExpr{windowFinalSlots[i]});
|
||||
|
|
|
|||
Loading…
Reference in New Issue