Commit e7ae6a35 authored by RuntimeTerror's avatar RuntimeTerror
Browse files

2

Showing with 36 additions and 87 deletions
+36 -87
......@@ -25,64 +25,34 @@ See the Mulan PSL v2 for more details. */
#include "executor_abstract.h"
#include "system/sm_meta.h"
#include "execution/executor_utils.hpp"
class BlockNestedLoopJoinExecutor : public AbstractExecutor {
private:
std::unique_ptr<AbstractExecutor> left_;
std::unique_ptr<AbstractExecutor> right_;
size_t len_, left_len, right_len;
std::vector<ColMeta> cols_; // join后获得的记录的字段
int64_t current_buffer_pos; // pos in the vector -1 means not exist
static constexpr int MAX_BUFFER_SIZE = 2048; // block join buffer size
std::vector<std::unique_ptr<RmRecord>> buffer_vec_;// block join buffer
std::vector<ColMeta> cols_;
std::map<
std::pair<std::string, std::string>,
std::pair<bool, std::vector<ColMeta>::const_iterator>
> table_col_table;
static constexpr int MAX_BUFFER_SIZE = 2048;
bool left_isend = false;
size_t left_ptr = 0, left_size = 0;
std::array<std::unique_ptr<RmRecord>, MAX_BUFFER_SIZE> left_buffer;
bool is_join;
std::vector<Condition> fed_conds_;
std::unique_ptr<RmRecord> current_;
std::pair<bool, std::vector<ColMeta>::const_iterator> findByColMeta(const ColMeta &meta) {
return table_col_table[{meta.tab_name, meta.name}];
}
void getNextBlockBuffer() {
buffer_vec_.clear();
for (size_t i = 0; i < MAX_BUFFER_SIZE && !left_->is_end(); i++, left_->nextTuple()) {
buffer_vec_.emplace_back(left_->Next());
}
}
bool checkCondition(const std::unique_ptr<RmRecord> &left, const std::unique_ptr<RmRecord> &right) {
for (auto &cond: fed_conds_) {
auto lcol = get_col(cols_, cond.lhs_col);
Value lval;
lval.type = lcol->type;
lval.load_raw(lcol->len, left->data + lcol->offset);
Value rval;
if (cond.is_rhs_val) {
rval = cond.rhs_val;
} else {
auto [_, rcol] = findByColMeta({cond.rhs_col.tab_name, cond.rhs_col.col_name});
rval.type = rcol->type;
rval.load_raw(rcol->len, right->data + rcol->offset);
}
if (!binop(cond.op, lval, rval))
return false;
void load_left_buffer() {
left_ptr = 0;
left_isend = left_->is_end();
for (left_size = 0; !left_->is_end() && left_size < MAX_BUFFER_SIZE; left_->nextTuple()) {
left_buffer[left_size++] = std::move(left_->Next());
}
return true;
}
public:
BlockNestedLoopJoinExecutor(std::unique_ptr<AbstractExecutor> left, std::unique_ptr<AbstractExecutor> right,
std::vector<Condition> conds) {
BlockNestedLoopJoinExecutor(std::unique_ptr<AbstractExecutor> left, std::unique_ptr<AbstractExecutor> right, std::vector<Condition> conds) {
left_ = std::move(left);
right_ = std::move(right);
......@@ -98,17 +68,6 @@ public:
cols_.insert(cols_.end(), right_cols.begin(), right_cols.end());
fed_conds_ = std::move(conds);
is_join = (fed_conds_.size() > 0);
for (size_t i = 0; i < left_->cols().size(); i++) {
const auto &vec = left_->cols();
table_col_table[{vec[i].tab_name, vec[i].name}] = {true, left_->cols().begin() + i};
}
for (size_t i = 0; i < right_->cols().size(); i++) {
const auto &vec = right_->cols();
table_col_table[{vec[i].tab_name, vec[i].name}] = {false, right_->cols().begin() + i};
}
current_buffer_pos = -2;
}
void beginTuple() override {
......@@ -118,58 +77,48 @@ public:
right_->beginTuple();
if (right_->is_end())
return;
getNextBlockBuffer();
load_left_buffer();
nextTuple();
}
void nextTuple() override {
while (!(left_->is_end() && right_->is_end())) {
bool next = 0;
if (current_buffer_pos < 0) {
if (current_buffer_pos == -1) {
right_->nextTuple();
if (right_->is_end() && left_->is_end()) {
return;
}
}
while (!left_isend || !right_->is_end()) {
if (left_ptr == left_size) {
right_->nextTuple();
if (right_->is_end() && left_->is_end())
return;
if (right_->is_end()) {
right_->beginTuple();
getNextBlockBuffer();
load_left_buffer();
}
current_buffer_pos = 0;
}
for (; current_buffer_pos < buffer_vec_.size(); current_buffer_pos++) {
auto &left_rec = buffer_vec_[current_buffer_pos];
auto right_rec = right_->Next();
if (checkCondition(left_rec, right_rec)) {
current_ = std::make_unique<RmRecord>(len_);
for (auto col: cols_) {
auto [is_left, it] = table_col_table[{col.tab_name, col.name}];
if (is_left) {
memcpy(current_->data + col.offset, left_rec->data + it->offset, it->len);
} else {
memcpy(current_->data + col.offset, right_rec->data + it->offset, it->len);
}
}
next = true;
break;
}
bool next = false;
for (; left_ptr < left_size; left_ptr += 1) {
if (!executor_utils::checkConds(this->Next(), fed_conds_, cols_))
continue;
next = true;
break;
}
if (next) {
current_buffer_pos++;
left_ptr += 1;
break;
} else {
current_buffer_pos = -1;
}
}
}
virtual std::unique_ptr<RmRecord> Next() override {
auto current_record = std::make_unique<RmRecord>(len_);
std::copy_n(left_buffer[left_ptr]->data, left_len, current_record->data);
auto right_record = right_->Next();
std::copy_n(right_record->data, right_len, current_record->data + left_len);
return current_record;
}
bool is_end() const override { return left_->is_end() && right_->is_end(); }
const std::vector<ColMeta> &cols() const override { return cols_; }
std::unique_ptr<RmRecord> Next() override { return std::make_unique<RmRecord>(*current_); }
size_t tupleLen() const override { return len_; }
std::string getType() override { return "NestedLoopJoinExecutor"; };
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment