diff --git a/src/recovery/log_manager.cpp b/src/recovery/log_manager.cpp index 75b2bcafca097bd99c59fa3bdebcee52a308b3e9..df50caca460164482f4f6db98a59d1cfe3864bf2 100644 --- a/src/recovery/log_manager.cpp +++ b/src/recovery/log_manager.cpp @@ -16,21 +16,16 @@ See the Mulan PSL v2 for more details. */ * @param {LogRecord*} log_record è¦å†™å…¥ç¼“冲区的日志记录 * @return {lsn_t} è¿”å›žè¯¥æ—¥å¿—çš„æ—¥å¿—è®°å½•å· */ + lsn_t LogManager::add_log_to_buffer(LogRecord* log_record) { std::scoped_lock lock{ latch_ }; log_record->lsn_ = global_lsn_++; - if (log_buffer_.is_full(log_record->log_tot_len_)) { - flush_log_to_disk(); - } - char* dest = new char[log_record->log_tot_len_]; - log_record->serialize(dest); - memcpy(log_buffer_.buffer_ + log_buffer_.offset_, dest, log_record->log_tot_len_); - log_buffer_.offset_ += log_record->log_tot_len_; - delete[] dest; + char* src = new char[log_record->log_tot_len_]; + log_record->serialize(src); + bool write = log_buffer_.write_log_buf(src, log_record->log_tot_len_, disk_manager_); + if (write) persist_lsn_ = global_lsn_ - 1; + delete[] src; - // test, 强制flush - // if (log_buffer_.offset_ != 0) flush_log_to_disk(); - return log_record->lsn_; } @@ -38,9 +33,8 @@ lsn_t LogManager::add_log_to_buffer(LogRecord* log_record) { * @description: 把日志缓冲区的内容刷到ç£ç›˜ä¸ï¼Œç”±äºŽç›®å‰åªè®¾ç½®äº†ä¸€ä¸ªç¼“å†²åŒºï¼Œå› æ¤éœ€è¦é˜»å¡žå…¶ä»–日志æ“作 */ void LogManager::flush_log_to_disk() { - // std::scoped_lock lock{ latch_ }; - disk_manager_->write_log(log_buffer_.buffer_, log_buffer_.offset_); - memset(log_buffer_.buffer_, 0, LOG_BUFFER_SIZE + 1); - log_buffer_.offset_ = 0; - persist_lsn_ = global_lsn_ - 1; // global_lsn_始终维æŒä¸ºä¸‹ä¸€é¡¹æ“作的lsn -} + std::scoped_lock lock{ latch_ }; + // æ¤å¤„åªæœ‰é€€å‡ºå¼ºåˆ¶åˆ·ç›˜æ‰ä¼šè°ƒç”¨ + log_buffer_.force_flush(disk_manager_); + persist_lsn_ = global_lsn_ - 1; +} \ No newline at end of file diff --git a/src/recovery/log_manager.h b/src/recovery/log_manager.h index 7a84ef23da840807f3ebc33b562bec0198ea8ec1..3db18c6059ab4716f28696f463c2b8fd4a64ce49 100644 --- a/src/recovery/log_manager.h +++ b/src/recovery/log_manager.h @@ -13,6 +13,8 @@ See the Mulan PSL v2 for more details. */ #include <mutex> #include <vector> #include <iostream> +#include <thread> + #include "log_defs.h" #include "common/config.h" #include "record/rm_defs.h" @@ -403,18 +405,74 @@ public: class LogBuffer { public: LogBuffer() { - offset_ = 0; - memset(buffer_, 0, sizeof(buffer_)); + offset_ = 0; + ready = false; + memset(buffer_0, 0, sizeof(buffer_0)); + memset(buffer_1, 0, sizeof(buffer_1)); + } + + void force_flush(DiskManager* disk_manager) { + // std::cout << "force flush\n"; + if (ready) { + disk_manager->write_log(buffer_1, offset_); + ready = false; + } else { + disk_manager->write_log(buffer_0, offset_); + ready = true; + } + offset_ = 0; + } + + // 返回是å¦å†™å…¥ + bool write_log_buf(char* src, int append_size, DiskManager* disk_manager) { + bool write = false; + char* buf = get_buffer(append_size, disk_manager, write); + memcpy(buf + offset_, src, append_size); + offset_ += append_size; + // if (write) std::cout << "write_to_disk, offset = " << offset_ << "\n"; + // else std::cout << "not write_to_disk, offset = " << offset_ << "\n"; + return write; + } + + // 用于recovery的读 + char* get_buffer() { + // åªç”¨buffer_0, 䏿”¹åЍoffsetå³å¯ + return buffer_0; } - bool is_full(int append_size) { - if(offset_ + append_size > LOG_BUFFER_SIZE) +private: + char buffer_0[LOG_BUFFER_SIZE/2+1]; + char buffer_1[LOG_BUFFER_SIZE/2+1]; + std::thread write_buf; + bool ready; // 表示哪一个bufferæ£åœ¨æŽ¥å—写入(false为0, true为1) + int offset_; // 写入logçš„offset + + // 返回是å¦å†™å…¥ + bool full_write(int append_size, DiskManager* disk_manager) { + if(offset_ + append_size > LOG_BUFFER_SIZE/2) { + if (write_buf.joinable()) { + write_buf.join(); + } + if (ready) { + write_buf = std::thread(&DiskManager::write_log, disk_manager, buffer_1, offset_); + ready = false; + } else { + write_buf = std::thread(&DiskManager::write_log, disk_manager, buffer_0, offset_); + ready = true; + } + offset_ = 0; return true; + } return false; } - char buffer_[LOG_BUFFER_SIZE+1]; - int offset_; // 写入logçš„offset + // 返回å¯å†™å…¥ç¼“冲指针,指å‘å¯å†™ç¼“冲区头 + // write返回是å¦å†™å…¥ + char* get_buffer(int append_size, DiskManager* disk_manager, bool& write) { + write = full_write(append_size, disk_manager); + if (ready) return buffer_1; + else return buffer_0; + } }; /* 日志管ç†å™¨ï¼Œè´Ÿè´£æŠŠæ—¥å¿—å†™å…¥æ—¥å¿—ç¼“å†²åŒºï¼Œä»¥åŠæŠŠæ—¥å¿—ç¼“å†²åŒºä¸çš„内容写入ç£ç›˜ä¸ */ diff --git a/src/recovery/log_manager_old.cpp b/src/recovery/log_manager_old.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d11a8335ba3655eec3ce4c88291c1430434925aa --- /dev/null +++ b/src/recovery/log_manager_old.cpp @@ -0,0 +1,46 @@ +/* Copyright (c) 2023 Renmin University of China +RMDB is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +#include <cstring> +#include "log_manager.h" + +/** + * @description: æ·»åŠ æ—¥å¿—è®°å½•åˆ°æ—¥å¿—ç¼“å†²åŒºä¸ï¼Œå¹¶è¿”å›žæ—¥å¿—è®°å½•å· + * @param {LogRecord*} log_record è¦å†™å…¥ç¼“冲区的日志记录 + * @return {lsn_t} è¿”å›žè¯¥æ—¥å¿—çš„æ—¥å¿—è®°å½•å· + */ +lsn_t LogManager::add_log_to_buffer(LogRecord* log_record) { + std::scoped_lock lock{ latch_ }; + log_record->lsn_ = global_lsn_++; + if (log_buffer_.is_full(log_record->log_tot_len_)) { + flush_log_to_disk(); + } + char* dest = new char[log_record->log_tot_len_]; + log_record->serialize(dest); + memcpy(log_buffer_.buffer_ + log_buffer_.offset_, dest, log_record->log_tot_len_); + log_buffer_.offset_ += log_record->log_tot_len_; + delete[] dest; + + // test, 强制flush + // if (log_buffer_.offset_ != 0) flush_log_to_disk(); + + return log_record->lsn_; +} + +/** + * @description: 把日志缓冲区的内容刷到ç£ç›˜ä¸ï¼Œç”±äºŽç›®å‰åªè®¾ç½®äº†ä¸€ä¸ªç¼“å†²åŒºï¼Œå› æ¤éœ€è¦é˜»å¡žå…¶ä»–日志æ“作 + */ +void LogManager::flush_log_to_disk() { + // std::scoped_lock lock{ latch_ }; + disk_manager_->write_log(log_buffer_.buffer_, log_buffer_.offset_); + memset(log_buffer_.buffer_, 0, LOG_BUFFER_SIZE + 1); + log_buffer_.offset_ = 0; + persist_lsn_ = global_lsn_ - 1; // global_lsn_始终维æŒä¸ºä¸‹ä¸€é¡¹æ“作的lsn +} \ No newline at end of file diff --git a/src/recovery/log_manager_old.h b/src/recovery/log_manager_old.h new file mode 100644 index 0000000000000000000000000000000000000000..56d7a20efa686fa3ed1776850952d17b8df6a368 --- /dev/null +++ b/src/recovery/log_manager_old.h @@ -0,0 +1,443 @@ +/* Copyright (c) 2023 Renmin University of China +RMDB is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +#pragma once + +#include <mutex> +#include <vector> +#include <iostream> +#include <thread> + +#include "log_defs.h" +#include "common/config.h" +#include "record/rm_defs.h" + +/* 日志记录对应æ“作的类型 */ +enum LogType: int { + UPDATE = 0, + INSERT, + DELETE, + begin, + commit, + ABORT, + CHECK_POINT +}; +static std::string LogTypeStr[] = { + "UPDATE", + "INSERT", + "DELETE", + "BEGIN", + "COMMIT", + "ABORT", + "CHECK_POINT" +}; + +class LogRecord { +public: + LogType log_type_; /* 日志对应æ“作的类型 */ + lsn_t lsn_; /* 当剿—¥å¿—çš„lsn */ + uint32_t log_tot_len_; /* 整个日志记录的长度 */ + txn_id_t log_tid_; /* åˆ›å»ºå½“å‰æ—¥å¿—的事务ID */ + lsn_t prev_lsn_; /* 事务创建的å‰ä¸€æ¡æ—¥å¿—记录的lsn,用于undo */ + + // 把日志记录åºåˆ—化到destä¸ + virtual void serialize (char* dest) const { + memcpy(dest + OFFSET_LOG_TYPE, &log_type_, sizeof(LogType)); + memcpy(dest + OFFSET_LSN, &lsn_, sizeof(lsn_t)); + memcpy(dest + OFFSET_LOG_TOT_LEN, &log_tot_len_, sizeof(uint32_t)); + memcpy(dest + OFFSET_LOG_TID, &log_tid_, sizeof(txn_id_t)); + memcpy(dest + OFFSET_PREV_LSN, &prev_lsn_, sizeof(lsn_t)); + } + // 从srcä¸ååºåˆ—åŒ–å‡ºä¸€æ¡æ—¥å¿—记录 + virtual void deserialize(const char* src) { + log_type_ = *reinterpret_cast<const LogType*>(src); + lsn_ = *reinterpret_cast<const lsn_t*>(src + OFFSET_LSN); + log_tot_len_ = *reinterpret_cast<const uint32_t*>(src + OFFSET_LOG_TOT_LEN); + log_tid_ = *reinterpret_cast<const txn_id_t*>(src + OFFSET_LOG_TID); + prev_lsn_ = *reinterpret_cast<const lsn_t*>(src + OFFSET_PREV_LSN); + } + // used for debug + virtual void format_print() { + std::cout << "log type in father_function: " << LogTypeStr[log_type_] << "\n"; + printf("Print Log Record:\n"); + printf("log_type_: %s\n", LogTypeStr[log_type_].c_str()); + printf("lsn: %d\n", lsn_); + printf("log_tot_len: %d\n", log_tot_len_); + printf("log_tid: %d\n", log_tid_); + printf("prev_lsn: %d\n\n", prev_lsn_); + } +}; + +class BeginLogRecord: public LogRecord { +public: + BeginLogRecord() { + log_type_ = LogType::begin; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + } + BeginLogRecord(txn_id_t txn_id, lsn_t prev_lsn) : BeginLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + } + // åºåˆ—化Begin日志记录到destä¸ + void serialize(char* dest) const override { + LogRecord::serialize(dest); + } + // 从srcä¸ååºåˆ—化出一æ¡Begin日志记录 + void deserialize(const char* src) override { + LogRecord::deserialize(src); + } + void format_print() override { + std::cout << "log type in son_function: " << LogTypeStr[log_type_] << "\n"; + LogRecord::format_print(); + } +}; + +class CheckPointLogRecord: public LogRecord { +public: + CheckPointLogRecord() { + log_type_ = LogType::CHECK_POINT; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + } + // åºåˆ—化Begin日志记录到destä¸ + void serialize(char* dest) const override { + LogRecord::serialize(dest); + } + // 从srcä¸ååºåˆ—化出一æ¡Begin日志记录 + void deserialize(const char* src) override { + LogRecord::deserialize(src); + } + void format_print() override { + std::cout << "log type in son_function: " << LogTypeStr[log_type_] << "\n"; + LogRecord::format_print(); + } +}; + +/** + * TODO: commitæ“作的日志记录 +*/ +class CommitLogRecord: public LogRecord { +public: + CommitLogRecord() { + log_type_ = LogType::commit; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + } + CommitLogRecord(txn_id_t txn_id, lsn_t prev_lsn) : CommitLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + } + void serialize(char* dest) const override { + LogRecord::serialize(dest); + } + void deserialize(const char* src) override { + LogRecord::deserialize(src); + } + void format_print() override { + std::cout << "log type in son_function: " << LogTypeStr[log_type_] << "\n"; + LogRecord::format_print(); + } +}; + +/** + * TODO: abortæ“作的日志记录 +*/ +class AbortLogRecord: public LogRecord { +public: + AbortLogRecord() { + log_type_ = LogType::ABORT; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + } + AbortLogRecord(txn_id_t txn_id, lsn_t prev_lsn) : AbortLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + } + void serialize(char* dest) const override { + LogRecord::serialize(dest); + } + void deserialize(const char* src) override { + LogRecord::deserialize(src); + } + void format_print() override { + std::cout << "log type in son_function: " << LogTypeStr[log_type_] << "\n"; + LogRecord::format_print(); + } +}; + +class InsertLogRecord: public LogRecord { +public: + InsertLogRecord() { + log_type_ = LogType::INSERT; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + table_name_ = nullptr; + } + InsertLogRecord(txn_id_t txn_id, lsn_t prev_lsn, RmRecord& insert_value, Rid& rid, std::string table_name) + : InsertLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + insert_value_ = insert_value; + rid_ = rid; + log_tot_len_ += sizeof(int); + log_tot_len_ += insert_value_.size; + log_tot_len_ += sizeof(Rid); + table_name_size_ = table_name.length(); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, table_name.c_str(), table_name_size_); + log_tot_len_ += sizeof(size_t) + table_name_size_; + } + ~InsertLogRecord() { + delete[] table_name_; + } + // 把insert日志记录åºåˆ—化到destä¸ + void serialize(char* dest) const override { + LogRecord::serialize(dest); + int offset = OFFSET_LOG_DATA; + memcpy(dest + offset, &insert_value_.size, sizeof(int)); + offset += sizeof(int); + memcpy(dest + offset, insert_value_.data, insert_value_.size); + offset += insert_value_.size; + memcpy(dest + offset, &rid_, sizeof(Rid)); + offset += sizeof(Rid); + memcpy(dest + offset, &table_name_size_, sizeof(size_t)); + offset += sizeof(size_t); + memcpy(dest + offset, table_name_, table_name_size_); + } + // 从srcä¸ååºåˆ—化出一æ¡Insert日志记录 + void deserialize(const char* src) override { + LogRecord::deserialize(src); + insert_value_.Deserialize(src + OFFSET_LOG_DATA); + int offset = OFFSET_LOG_DATA + insert_value_.size + sizeof(int); + rid_ = *reinterpret_cast<const Rid*>(src + offset); + offset += sizeof(Rid); + table_name_size_ = *reinterpret_cast<const size_t*>(src + offset); + offset += sizeof(size_t); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, src + offset, table_name_size_); + } + // æ ¼å¼åŒ–输出 + void format_print() override { + printf("insert record\n"); + LogRecord::format_print(); + printf("insert_value: %s\n", insert_value_.data); + printf("insert rid: %d, %d\n", rid_.page_no, rid_.slot_no); + printf("table name: %s\n", table_name_); + } + + RmRecord insert_value_; // æ’入的记录 + Rid rid_; // 记录æ’入的ä½ç½® + char* table_name_; // æ’入记录的表åç§° + size_t table_name_size_; // 表åç§°çš„å¤§å° +}; + +/** + * TODO: deleteæ“作的日志记录 +*/ +class DeleteLogRecord: public LogRecord { +public: + DeleteLogRecord() { + log_type_ = LogType::DELETE; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + table_name_ = nullptr; + } + DeleteLogRecord(txn_id_t txn_id, lsn_t prev_lsn, RmRecord& delete_value, Rid& rid, std::string table_name) + : DeleteLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + delete_value_ = delete_value; + rid_ = rid; + log_tot_len_ += sizeof(int); + log_tot_len_ += delete_value_.size; + log_tot_len_ += sizeof(Rid); + table_name_size_ = table_name.length(); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, table_name.c_str(), table_name_size_); + log_tot_len_ += sizeof(size_t) + table_name_size_; + } + ~DeleteLogRecord() { + delete[] table_name_; + } + // 把delete日志记录åºåˆ—化到destä¸ + void serialize(char* dest) const override { + LogRecord::serialize(dest); + int offset = OFFSET_LOG_DATA; + memcpy(dest + offset, &delete_value_.size, sizeof(int)); + offset += sizeof(int); + memcpy(dest + offset, delete_value_.data, delete_value_.size); + offset += delete_value_.size; + memcpy(dest + offset, &rid_, sizeof(Rid)); + offset += sizeof(Rid); + memcpy(dest + offset, &table_name_size_, sizeof(size_t)); + offset += sizeof(size_t); + memcpy(dest + offset, table_name_, table_name_size_); + } + // 从srcä¸ååºåˆ—化出一æ¡delete日志记录 + void deserialize(const char* src) override { + LogRecord::deserialize(src); + delete_value_.Deserialize(src + OFFSET_LOG_DATA); + int offset = OFFSET_LOG_DATA + delete_value_.size + sizeof(int); + rid_ = *reinterpret_cast<const Rid*>(src + offset); + offset += sizeof(Rid); + table_name_size_ = *reinterpret_cast<const size_t*>(src + offset); + offset += sizeof(size_t); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, src + offset, table_name_size_); + } + void format_print() override { + printf("delete record\n"); + LogRecord::format_print(); + printf("delete_value: %s\n", delete_value_.data); + printf("delete rid: %d, %d\n", rid_.page_no, rid_.slot_no); + printf("table name: %s\n", table_name_); + } + + RmRecord delete_value_; + Rid rid_; + char* table_name_; + size_t table_name_size_; +}; + +/** + * TODO: updateæ“作的日志记录 +*/ +class UpdateLogRecord: public LogRecord { +public: + UpdateLogRecord() { + log_type_ = LogType::UPDATE; + lsn_ = INVALID_LSN; + log_tot_len_ = LOG_HEADER_SIZE; + log_tid_ = INVALID_TXN_ID; + prev_lsn_ = INVALID_LSN; + table_name_ = nullptr; + } + UpdateLogRecord(txn_id_t txn_id, lsn_t prev_lsn, RmRecord& raw_value, RmRecord& update_value, Rid& rid, std::string table_name) + : UpdateLogRecord() { + log_tid_ = txn_id; + prev_lsn_ = prev_lsn; + raw_value_ = raw_value; + update_value_ = update_value; + rid_ = rid; + log_tot_len_ += sizeof(int); + log_tot_len_ += raw_value_.size; + log_tot_len_ += sizeof(int); + log_tot_len_ += update_value_.size; + log_tot_len_ += sizeof(Rid); + table_name_size_ = table_name.length(); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, table_name.c_str(), table_name_size_); + log_tot_len_ += sizeof(size_t) + table_name_size_; + } + ~UpdateLogRecord() { + delete[] table_name_; + } + // 把update日志记录åºåˆ—化到destä¸ + void serialize(char* dest) const override { + LogRecord::serialize(dest); + int offset = OFFSET_LOG_DATA; + memcpy(dest + offset, &raw_value_.size, sizeof(int)); + offset += sizeof(int); + memcpy(dest + offset, raw_value_.data, raw_value_.size); + offset += raw_value_.size; + memcpy(dest + offset, &update_value_.size, sizeof(int)); + offset += sizeof(int); + memcpy(dest + offset, update_value_.data, update_value_.size); + offset += update_value_.size; + memcpy(dest + offset, &rid_, sizeof(Rid)); + offset += sizeof(Rid); + memcpy(dest + offset, &table_name_size_, sizeof(size_t)); + offset += sizeof(size_t); + memcpy(dest + offset, table_name_, table_name_size_); + } + // 从srcä¸ååºåˆ—化出一æ¡update日志记录 + void deserialize(const char* src) override { + LogRecord::deserialize(src); + int offset = OFFSET_LOG_DATA; + raw_value_.Deserialize(src + offset); + offset += raw_value_.size + sizeof(int); + update_value_.Deserialize(src + offset); + offset += update_value_.size + sizeof(int); + rid_ = *reinterpret_cast<const Rid*>(src + offset); + offset += sizeof(Rid); + table_name_size_ = *reinterpret_cast<const size_t*>(src + offset); + offset += sizeof(size_t); + table_name_ = new char[table_name_size_]; + memcpy(table_name_, src + offset, table_name_size_); + } + void format_print() override { + printf("update record\n"); + LogRecord::format_print(); + printf("update_value: %s\n", update_value_.data); + printf("update rid: %d, %d\n", rid_.page_no, rid_.slot_no); + printf("table name: %s\n", table_name_); + } + + RmRecord raw_value_; // 更新之å‰çš„值 + RmRecord update_value_; // 更新之å‰çš„值 + Rid rid_; + char* table_name_; + size_t table_name_size_; +}; + +/* æ—¥å¿—ç¼“å†²åŒºï¼Œåªæœ‰ä¸€ä¸ªbufferï¼Œå› æ¤éœ€è¦é˜»å¡žåœ°åŽ»æŠŠæ—¥å¿—å†™å…¥ç¼“å†²åŒºä¸ */ + +class LogBuffer { +public: + LogBuffer() { + offset_ = 0; + memset(buffer_, 0, sizeof(buffer_)); + } + + bool is_full(int append_size) { + if(offset_ + append_size > LOG_BUFFER_SIZE) + return true; + return false; + } + + char buffer_[LOG_BUFFER_SIZE+1]; + int offset_; // 写入logçš„offset +}; + + + +/* 日志管ç†å™¨ï¼Œè´Ÿè´£æŠŠæ—¥å¿—å†™å…¥æ—¥å¿—ç¼“å†²åŒºï¼Œä»¥åŠæŠŠæ—¥å¿—ç¼“å†²åŒºä¸çš„内容写入ç£ç›˜ä¸ */ +class LogManager { +friend class RecoveryManager; +public: + LogManager(DiskManager* disk_manager) { disk_manager_ = disk_manager; } + + lsn_t add_log_to_buffer(LogRecord* log_record); + void flush_log_to_disk(); + + LogBuffer* get_log_buffer() { return &log_buffer_; } + + lsn_t get_global_lsn() const { return global_lsn_; } + +private: + std::atomic<lsn_t> global_lsn_{0}; // 全局lsnï¼Œé€’å¢žï¼Œç”¨äºŽä¸ºæ¯æ¡è®°å½•分å‘lsn + std::mutex latch_; // 用于对log_buffer_的互斥访问 + LogBuffer log_buffer_; // 日志缓冲区 + lsn_t persist_lsn_; // è®°å½•å·²ç»æŒä¹…化到ç£ç›˜ä¸çš„æœ€åŽä¸€æ¡æ—¥å¿—çš„æ—¥å¿—å· + DiskManager* disk_manager_; +}; diff --git a/src/recovery/log_recovery.cpp b/src/recovery/log_recovery.cpp index 32507ac655796a081878ca146af3bb9eaa5b8137..310aa109cb2f0a4509e93457485babc3d219dbe5 100644 --- a/src/recovery/log_recovery.cpp +++ b/src/recovery/log_recovery.cpp @@ -35,15 +35,17 @@ void RecoveryManager::analyze() { sm_manager_->drop_index(index.first, index.second, nullptr); } + char* buffer = buffer_.get_buffer(); + while(1) { - int size = disk_manager_->read_log(buffer_.buffer_, LOG_BUFFER_SIZE, outer_offset); + int size = disk_manager_->read_log(buffer, LOG_BUFFER_SIZE / 2, outer_offset); if (LOG_HEADER_SIZE > size) break; int inner_offset = 0; while (inner_offset + LOG_HEADER_SIZE <= size) { LogRecord log_head; - log_head.deserialize(buffer_.buffer_ + inner_offset); + log_head.deserialize(buffer + inner_offset); // std::cout << LogTypeStr[log_head.log_type_] << ", txn_id = " << log_head.log_tid_ << ", lsn = " << log_head.lsn_ << "\n"; @@ -93,6 +95,8 @@ void RecoveryManager::analyze() { void RecoveryManager::redo() { if (txn_lsns_.size() == 0) return; // std::cout << "-------------------------- redo --------------------------\n"; + + char* buffer = buffer_.get_buffer(); for (auto& [key, value] : txn_lsns_) { std::vector<lsn_t>& redo_vec = value; int redo_num = redo_vec.size(); @@ -103,22 +107,22 @@ void RecoveryManager::redo() { for (int i = 0; i < redo_num; i++) { LogRecord log_head; - int size = disk_manager_->read_log(buffer_.buffer_, LOG_HEADER_SIZE, log_offset_[redo_vec[i]]); + int size = disk_manager_->read_log(buffer, LOG_HEADER_SIZE, log_offset_[redo_vec[i]]); if (size < LOG_HEADER_SIZE) break; - log_head.deserialize(buffer_.buffer_); + log_head.deserialize(buffer); // std::cout << LogTypeStr[log_head.log_type_] << ", txn_id = " << log_head.log_tid_ << ", lsn = " << log_head.lsn_ << "\n"; - size = disk_manager_->read_log(buffer_.buffer_, log_head.log_tot_len_, log_offset_[redo_vec[i]]); + size = disk_manager_->read_log(buffer, log_head.log_tot_len_, log_offset_[redo_vec[i]]); if (size < log_head.log_tot_len_) break; if(log_head.log_type_ == LogType::UPDATE) { UpdateLogRecord up_log; - up_log.deserialize(buffer_.buffer_); + up_log.deserialize(buffer); std::string tab_name = std::string(up_log.table_name_, up_log.table_name_size_); // 表的åå—就是文件å PageId page_id; @@ -126,15 +130,15 @@ void RecoveryManager::redo() { page_id.page_no = up_log.rid_.page_no; Page* page = buffer_pool_manager_->fetch_page(page_id); - // std::cout << "get page lsn: " << page->get_page_lsn() << " uplog lsn: " << up_log.lsn_ << std::endl; + // // std::cout << "get page lsn: " << page->get_page_lsn() << " uplog lsn: " << up_log.lsn_ << std::endl; if (page->get_page_lsn() >= up_log.lsn_) { - // std::cout << "need not redo update " << std::endl; + // // std::cout << "need not redo update " << std::endl; buffer_pool_manager_->unpin_page(page_id, false); } else { - // std::cout << "redo update, page_lsn = " << page->get_page_lsn() << "\n"; + // // std::cout << "redo update, page_lsn = " << page->get_page_lsn() << "\n"; RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); // char* data = new char[up_log.update_value_.size]; @@ -142,7 +146,7 @@ void RecoveryManager::redo() { memcpy(data, up_log.update_value_.data, up_log.update_value_.size); page->set_page_lsn(up_log.lsn_); - // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + // // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; buffer_pool_manager_->unpin_page(page_id, true); } @@ -150,7 +154,7 @@ void RecoveryManager::redo() { else if(log_head.log_type_ == LogType::INSERT) { InsertLogRecord ins_log; - ins_log.deserialize(buffer_.buffer_); + ins_log.deserialize(buffer); std::string tab_name = std::string(ins_log.table_name_, ins_log.table_name_size_); PageId page_id; @@ -158,14 +162,14 @@ void RecoveryManager::redo() { page_id.page_no = ins_log.rid_.page_no; Page* page = buffer_pool_manager_->fetch_page(page_id); - // std::cout << "get page lsn: " << page->get_page_lsn() << " inslog lsn: " << ins_log.lsn_ << std::endl; + // // std::cout << "get page lsn: " << page->get_page_lsn() << " inslog lsn: " << ins_log.lsn_ << std::endl; if (page->get_page_lsn() >= ins_log.lsn_) { - // std::cout << "need not redo insert " << std::endl; + // // std::cout << "need not redo insert " << std::endl; buffer_pool_manager_->unpin_page(page_id, false); } else { - // std::cout << "redo insert, page_lsn = " << page->get_page_lsn() << "\n"; + // // std::cout << "redo insert, page_lsn = " << page->get_page_lsn() << "\n"; RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); page_handle->page_hdr->num_records++; @@ -175,14 +179,14 @@ void RecoveryManager::redo() { Bitmap::set(page_handle->bitmap, ins_log.rid_.slot_no); page->set_page_lsn(ins_log.lsn_); - // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + // // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; buffer_pool_manager_->unpin_page(page_id, true); } } else if(log_head.log_type_ == LogType::DELETE) { DeleteLogRecord del_log; - del_log.deserialize(buffer_.buffer_); + del_log.deserialize(buffer); std::string tab_name = std::string(del_log.table_name_, del_log.table_name_size_); PageId page_id; @@ -190,22 +194,22 @@ void RecoveryManager::redo() { page_id.page_no = del_log.rid_.page_no; Page* page = buffer_pool_manager_->fetch_page(page_id); - // std::cout << "get page lsn: " << page->get_page_lsn() << " dellog lsn: " << del_log.lsn_ << std::endl; + // // std::cout << "get page lsn: " << page->get_page_lsn() << " dellog lsn: " << del_log.lsn_ << std::endl; if (page->get_page_lsn() >= del_log.lsn_) { - // std::cout << "need not redo delete " << std::endl; + // // std::cout << "need not redo delete " << std::endl; buffer_pool_manager_->unpin_page(page_id, false); } else { - // std::cout << "redo delete, page_lsn = " << page->get_page_lsn() << "\n"; + // // std::cout << "redo delete, page_lsn = " << page->get_page_lsn() << "\n"; RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); page_handle->page_hdr->num_records--; Bitmap::reset(page_handle->bitmap, del_log.rid_.slot_no); page->set_page_lsn(del_log.lsn_); - // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + // // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; buffer_pool_manager_->unpin_page(page_id, true); } } @@ -213,14 +217,14 @@ void RecoveryManager::redo() { } // é‡å»ºç´¢å¼• - // for (const auto& index : indexes) { - // int cols_num = index.second.size(); - // std::vector<std::string> index_cols(cols_num); - // for (int i = 0; i < cols_num; i++) { - // index_cols[i] = index.second[i].name; - // } - // sm_manager_->create_index(index.first, index_cols, nullptr); - // } + for (const auto& index : indexes) { + int cols_num = index.second.size(); + std::vector<std::string> index_cols(cols_num); + for (int i = 0; i < cols_num; i++) { + index_cols[i] = index.second[i].name; + } + sm_manager_->create_index(index.first, index_cols, nullptr); + } // std::cout << "------------------------ end redo ------------------------\n\n"; } @@ -231,15 +235,16 @@ void RecoveryManager::redo() { void RecoveryManager::undo() { if (active_txns_.size() == 0) return; // std::cout << "-------------------------- undo --------------------------\n"; + char* buffer = buffer_.get_buffer(); // é历所有系统崩溃时还在活跃的事务,并undo for (auto& [key, value] : active_txns_) { lsn_t undo_lsn = value; LogRecord log_head; txn_lsns_.erase(key); while (undo_lsn != INVALID_LSN) { - int size = disk_manager_->read_log(buffer_.buffer_, LOG_BUFFER_SIZE, log_offset_[undo_lsn]); + int size = disk_manager_->read_log(buffer, LOG_BUFFER_SIZE / 2, log_offset_[undo_lsn]); if (size <= 0) break; - log_head.deserialize(buffer_.buffer_); + log_head.deserialize(buffer); undo_lsn = log_head.prev_lsn_; UpdateLogRecord up_log; @@ -256,7 +261,7 @@ void RecoveryManager::undo() { switch (log_head.log_type_) { case UPDATE: - up_log.deserialize(buffer_.buffer_); + up_log.deserialize(buffer); tab_name = std::string(up_log.table_name_, up_log.table_name_size_); // 表的åå—就是文件å page_id.fd = disk_manager_->get_file_fd(tab_name); @@ -279,7 +284,7 @@ void RecoveryManager::undo() { buffer_pool_manager_->unpin_page(page_id, true); break; case INSERT: - ins_log.deserialize(buffer_.buffer_); + ins_log.deserialize(buffer); tab_name = std::string(ins_log.table_name_, ins_log.table_name_size_); page_id.fd = disk_manager_->get_file_fd(tab_name); @@ -301,7 +306,7 @@ void RecoveryManager::undo() { buffer_pool_manager_->unpin_page(page_id, true); break; case DELETE: - del_log.deserialize(buffer_.buffer_); + del_log.deserialize(buffer); tab_name = std::string(del_log.table_name_, del_log.table_name_size_); page_id.fd = disk_manager_->get_file_fd(tab_name); diff --git a/src/recovery/log_recovery_old.cpp b/src/recovery/log_recovery_old.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d35054aeb6b7335d901cf2e0e3a27b15f4b580b7 --- /dev/null +++ b/src/recovery/log_recovery_old.cpp @@ -0,0 +1,350 @@ +/* Copyright (c) 2023 Renmin University of China +RMDB is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +#include "log_recovery.h" + + +/** + * @description: analyze阶段,需è¦èŽ·å¾—è„页表(DPT)和未完æˆçš„事务列表(ATT) + */ +void RecoveryManager::analyze() { + std::cout << "------------------------ analyze ------------------------\n"; + int outer_offset = 0; + lsn_t max_lsn = 0; + txn_id_t max_txn_id = 0; + // é‡å»ºæ‰€æœ‰æ–‡ä»¶çš„空闲页链表 + for (auto& [key,value] : sm_manager_->fhs_) { + value->recovery_page_list(); + } + + // èŽ·å–æ‰€æœ‰ç´¢å¼• + for (const auto& [key, value] : sm_manager_->db_.tabs_) { + for (const auto& index_meta: value.indexes) { + indexes.emplace_back(index_meta.tab_name, index_meta.cols); + } + } + // 销æ¯ç´¢å¼• + for (const auto& index: indexes) { + sm_manager_->drop_index(index.first, index.second, nullptr); + } + + while(1) { + int size = disk_manager_->read_log(buffer_.buffer_, LOG_BUFFER_SIZE, outer_offset); + + if (LOG_HEADER_SIZE > size) break; + int inner_offset = 0; + + while (inner_offset + LOG_HEADER_SIZE <= size) { + LogRecord log_head; + log_head.deserialize(buffer_.buffer_ + inner_offset); + + std::cout << LogTypeStr[log_head.log_type_] << ", txn_id = " << log_head.log_tid_ << ", lsn = " << log_head.lsn_ << "\n"; + + if (log_head.log_type_ == CHECK_POINT) { + check_point_lsn = log_head.lsn_; + inner_offset += log_head.log_tot_len_; + continue; + } + + max_lsn = std::max(max_lsn, log_head.lsn_); + max_txn_id = std::max(max_txn_id, log_head.log_tid_); + + log_offset_[log_head.lsn_] = outer_offset + inner_offset; // æŠŠæ¯æ¡æ—¥å¿—å®šä½ + txn_lsns_[log_head.log_tid_].push_back(log_head.lsn_); + + if (log_head.log_tot_len_ < 20 || log_head.log_type_ > 6) break; + + inner_offset += log_head.log_tot_len_; + + switch (log_head.log_type_) + { + case UPDATE: + case INSERT: + case DELETE: + case begin: + active_txns_[log_head.log_tid_] = log_head.lsn_; + break; + case commit: + active_txns_.erase(log_head.log_tid_); + break; + case ABORT: + active_txns_.erase(log_head.log_tid_); + break; + } + } + outer_offset += inner_offset; + } + // 设置最大lsn + log_manager_->global_lsn_ = max_lsn + 1; + txn_manager_->set_txn_id(max_txn_id + 1); + std::cout << "---------------------- end analyze ----------------------\n\n"; +} + +/** + * @description: é‡åšæ‰€æœ‰æœªè½ç›˜çš„æ“ä½œ + */ +void RecoveryManager::redo() { + if (txn_lsns_.size() == 0) return; + std::cout << "-------------------------- redo --------------------------\n"; + for (auto& [key, value] : txn_lsns_) { + std::vector<lsn_t>& redo_vec = value; + int redo_num = redo_vec.size(); + + if (redo_vec[redo_num-1] < check_point_lsn) { + continue; + } + + for (int i = 0; i < redo_num; i++) { + LogRecord log_head; + int size = disk_manager_->read_log(buffer_.buffer_, LOG_HEADER_SIZE, log_offset_[redo_vec[i]]); + + if (size < LOG_HEADER_SIZE) break; + + log_head.deserialize(buffer_.buffer_); + + std::cout << LogTypeStr[log_head.log_type_] << ", txn_id = " << log_head.log_tid_ << ", lsn = " << log_head.lsn_ << "\n"; + + size = disk_manager_->read_log(buffer_.buffer_, log_head.log_tot_len_, log_offset_[redo_vec[i]]); + + if (size < log_head.log_tot_len_) break; + + if(log_head.log_type_ == LogType::UPDATE) + { + UpdateLogRecord up_log; + up_log.deserialize(buffer_.buffer_); + std::string tab_name = std::string(up_log.table_name_, up_log.table_name_size_); + // 表的åå—就是文件å + PageId page_id; + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = up_log.rid_.page_no; + Page* page = buffer_pool_manager_->fetch_page(page_id); + + // std::cout << "get page lsn: " << page->get_page_lsn() << " uplog lsn: " << up_log.lsn_ << std::endl; + if (page->get_page_lsn() >= up_log.lsn_) + { + // std::cout << "need not redo update " << std::endl; + buffer_pool_manager_->unpin_page(page_id, false); + } + else + { + // std::cout << "redo update, page_lsn = " << page->get_page_lsn() << "\n"; + RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); + // char* data = new char[up_log.update_value_.size]; + char* data = page_handle->get_slot(up_log.rid_.slot_no); + memcpy(data, up_log.update_value_.data, up_log.update_value_.size); + + page->set_page_lsn(up_log.lsn_); + // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + buffer_pool_manager_->unpin_page(page_id, true); + } + + } + else if(log_head.log_type_ == LogType::INSERT) + { + InsertLogRecord ins_log; + ins_log.deserialize(buffer_.buffer_); + std::string tab_name = std::string(ins_log.table_name_, ins_log.table_name_size_); + + PageId page_id; + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = ins_log.rid_.page_no; + Page* page = buffer_pool_manager_->fetch_page(page_id); + + // std::cout << "get page lsn: " << page->get_page_lsn() << " inslog lsn: " << ins_log.lsn_ << std::endl; + if (page->get_page_lsn() >= ins_log.lsn_) { + // std::cout << "need not redo insert " << std::endl; + buffer_pool_manager_->unpin_page(page_id, false); + } + else + { + // std::cout << "redo insert, page_lsn = " << page->get_page_lsn() << "\n"; + RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); + page_handle->page_hdr->num_records++; + + char* data = page_handle->get_slot(ins_log.rid_.slot_no); + memcpy(data, ins_log.insert_value_.data, ins_log.insert_value_.size); + Bitmap::set(page_handle->bitmap, ins_log.rid_.slot_no); + + page->set_page_lsn(ins_log.lsn_); + // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + buffer_pool_manager_->unpin_page(page_id, true); + } + } + else if(log_head.log_type_ == LogType::DELETE) + { + DeleteLogRecord del_log; + del_log.deserialize(buffer_.buffer_); + std::string tab_name = std::string(del_log.table_name_, del_log.table_name_size_); + + PageId page_id; + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = del_log.rid_.page_no; + Page* page = buffer_pool_manager_->fetch_page(page_id); + + // std::cout << "get page lsn: " << page->get_page_lsn() << " dellog lsn: " << del_log.lsn_ << std::endl; + if (page->get_page_lsn() >= del_log.lsn_) + { + // std::cout << "need not redo delete " << std::endl; + buffer_pool_manager_->unpin_page(page_id, false); + } + else + { + // std::cout << "redo delete, page_lsn = " << page->get_page_lsn() << "\n"; + RmFileHdr file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + RmPageHandle* page_handle = new RmPageHandle(&file_hdr, page); + page_handle->page_hdr->num_records--; + + Bitmap::reset(page_handle->bitmap, del_log.rid_.slot_no); + page->set_page_lsn(del_log.lsn_); + // std::cout << "set page lsn: " << page->get_page_lsn() << std::endl; + buffer_pool_manager_->unpin_page(page_id, true); + } + } + } + } + + // é‡å»ºç´¢å¼• + // for (const auto& index : indexes) { + // int cols_num = index.second.size(); + // std::vector<std::string> index_cols(cols_num); + // for (int i = 0; i < cols_num; i++) { + // index_cols[i] = index.second[i].name; + // } + // sm_manager_->create_index(index.first, index_cols, nullptr); + // } + + std::cout << "------------------------ end redo ------------------------\n\n"; +} + +/** + * @description: 回滚未完æˆçš„事务 + */ +void RecoveryManager::undo() { + if (active_txns_.size() == 0) return; + std::cout << "-------------------------- undo --------------------------\n"; + // é历所有系统崩溃时还在活跃的事务,并undo + for (auto& [key, value] : active_txns_) { + lsn_t undo_lsn = value; + LogRecord log_head; + txn_lsns_.erase(key); + while (undo_lsn != INVALID_LSN) { + int size = disk_manager_->read_log(buffer_.buffer_, LOG_BUFFER_SIZE, log_offset_[undo_lsn]); + if (size <= 0) break; + log_head.deserialize(buffer_.buffer_); + undo_lsn = log_head.prev_lsn_; + + UpdateLogRecord up_log; + InsertLogRecord ins_log; + DeleteLogRecord del_log; + RmFileHdr file_hdr; + RmPageHandle* page_handle; + PageId page_id; + Page* page; + char* data = nullptr; + std::string tab_name; + + // 直接对底层åšä¿®æ”¹ + switch (log_head.log_type_) + { + case UPDATE: + up_log.deserialize(buffer_.buffer_); + tab_name = std::string(up_log.table_name_, up_log.table_name_size_); + // 表的åå—就是文件å + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = up_log.rid_.page_no; + page = buffer_pool_manager_->fetch_page(page_id); + + if (page->get_page_lsn() < up_log.lsn_) { + // 还没有æŒä¹…化到ç£ç›˜ä¸ï¼Œæ‰€ä»¥ä¸ç”¨undo + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, false); + break; + } + + file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + page_handle = new RmPageHandle(&file_hdr, page); + data = page_handle->get_slot(up_log.rid_.slot_no); + memcpy(data, up_log.raw_value_.data, up_log.raw_value_.size); + // 结åˆaborté¿å…冿¬¡undo + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, true); + break; + case INSERT: + ins_log.deserialize(buffer_.buffer_); + tab_name = std::string(ins_log.table_name_, ins_log.table_name_size_); + + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = ins_log.rid_.page_no; + page = buffer_pool_manager_->fetch_page(page_id); + + if (page->get_page_lsn() < ins_log.lsn_) { + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, false); + break; + } + + file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + page_handle = new RmPageHandle(&file_hdr, page); + page_handle->page_hdr->num_records--; + + Bitmap::reset(page_handle->bitmap, ins_log.rid_.slot_no); + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, true); + break; + case DELETE: + del_log.deserialize(buffer_.buffer_); + tab_name = std::string(del_log.table_name_, del_log.table_name_size_); + + page_id.fd = disk_manager_->get_file_fd(tab_name); + page_id.page_no = del_log.rid_.page_no; + page = buffer_pool_manager_->fetch_page(page_id); + + if (page->get_page_lsn() < del_log.lsn_) { + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, false); + break; + } + + file_hdr = sm_manager_->fhs_.at(tab_name).get()->get_file_hdr(); + page_handle = new RmPageHandle(&file_hdr, page); + data = page_handle->get_slot(del_log.rid_.slot_no); + page_handle->page_hdr->num_records++; + memcpy(data, del_log.delete_value_.data, del_log.delete_value_.size); + Bitmap::set(page_handle->bitmap, del_log.rid_.slot_no); + page->set_page_lsn(log_manager_->get_global_lsn()); + buffer_pool_manager_->unpin_page(page_id, true); + break; + } + + std::cout << LogTypeStr[log_head.log_type_] << ", txn_id = " << log_head.log_tid_ << ", lsn = " << log_head.lsn_ << "\n"; + + } + } + + // 使用abort使其ä¸å†undo + for (auto& [key, value] : active_txns_) { + AbortLogRecord abort_log(key, log_manager_->global_lsn_); + lsn_t prev_lsn = log_manager_->add_log_to_buffer(&abort_log); + } + + std::cout << "------------------------ end undo ------------------------\n\n"; +} + +void RecoveryManager::create_checkpoint() { + CheckPointLogRecord check_log; + log_manager_->add_log_to_buffer(&check_log); + log_manager_->flush_log_to_disk(); + + for (const auto& [key, file] : sm_manager_->fhs_) { + buffer_pool_manager_->flush_all_pages(file->GetFd()); + } +} \ No newline at end of file diff --git a/src/rmdb.cpp b/src/rmdb.cpp index ee3bf85c4ab804301a99b039db7991677ceb4578..5123095c5dbce7177b50ab2943719e0ec3d78c1b 100644 --- a/src/rmdb.cpp +++ b/src/rmdb.cpp @@ -98,7 +98,7 @@ void *client_handler(void *sock_fd) { std::string output = "establish client connection, sockfd: " + std::to_string(fd) + "\n"; std::cout << output; - // auto start = std::chrono::high_resolution_clock::now(); + auto start = std::chrono::high_resolution_clock::now(); while (true) { // std::cout << "Waiting for request..." << std::endl; @@ -246,6 +246,10 @@ void *client_handler(void *sock_fd) { // Clear std::cout << "Terminating current client_connection..." << std::endl; + auto end = std::chrono::high_resolution_clock::now(); + std::chrono::duration<double> duration = end - start; + std::cout << "total time: " << duration.count() << "\n"; + close(fd); // close a file descriptor. pthread_exit(NULL); // terminate calling thread! } diff --git a/src/transaction/transaction_manager.cpp b/src/transaction/transaction_manager.cpp index 5463807752082d616cff42c54504a64fabf0276b..1cbc9a977eced54d944566b9bb317708067566f9 100644 --- a/src/transaction/transaction_manager.cpp +++ b/src/transaction/transaction_manager.cpp @@ -106,7 +106,9 @@ void TransactionManager::commit(Transaction* txn, LogManager* log_manager) { lock_manager_->unlock(txn, lock_id); lock_set.clear(); // 3. 写日志 - log_manager->flush_log_to_disk(); // 强行刷盘 + + // yzx 7.28 modify + // log_manager->flush_log_to_disk(); // 强行刷盘 txn->set_state(TransactionState::COMMITTED); } @@ -237,7 +239,9 @@ void TransactionManager::abort(Transaction * txn, LogManager *log_manager) { AbortLogRecord abort_log(txn->get_transaction_id(), txn->get_prev_lsn()); lsn_t prev_lsn = log_manager->add_log_to_buffer(&abort_log); txn->set_prev_lsn(prev_lsn); - log_manager->flush_log_to_disk(); // 强行刷盘 + + // yzx 7.28 modify + // log_manager->flush_log_to_disk(); // 强行刷盘 // 5. æ›´æ–°äº‹åŠ¡çŠ¶æ€ txn->set_state(TransactionState::ABORTED);