Experimental BTree changes

Kae 2024-09-11 15:24:01 +10:00
parent a6b20df3f0
commit 4c78b7365a
2 changed files with 173 additions and 60 deletions

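The StarBTreeDatabase.cpp changes below replace the old available/pending-free split with a staged-write path: rawWriteBlock now snapshots the target block into m_uncommittedWrites and patches it in memory, rawReadBlock prefers that staged copy, commitWrites flushes and syncs everything on commit, and rollback simply drops the buffers. A minimal sketch of that staging pattern, assuming plain STL stand-ins (FakeDevice, StagedBlocks and the 256-byte block size are illustrative, not types from the commit):

// Illustrative sketch only: the commit's staged-write idea with STL types
// standing in for Star's IODevice, Map and ByteArray.
#include <cstdint>
#include <cstring>
#include <map>
#include <vector>

struct FakeDevice {
  std::vector<char> bytes;  // backing "file"
  void read(size_t off, char* out, size_t n) const { std::memcpy(out, bytes.data() + off, n); }
  void write(size_t off, char const* in, size_t n) { std::memcpy(bytes.data() + off, in, n); }
};

struct StagedBlocks {
  static constexpr size_t BlockSize = 256;
  FakeDevice* device = nullptr;
  std::map<uint32_t, std::vector<char>> uncommitted;  // blockIndex -> staged block image

  // Reads come from the staged copy when one exists, otherwise from the device.
  void readBlock(uint32_t block, size_t offset, char* out, size_t n) const {
    auto it = uncommitted.find(block);
    if (it != uncommitted.end())
      std::memcpy(out, it->second.data() + offset, n);
    else
      device->read(block * BlockSize + offset, out, n);
  }

  // Writes always land in the staged copy; the first write to a block
  // snapshots the current on-device contents so partial writes stay valid.
  void writeBlock(uint32_t block, size_t offset, char const* in, size_t n) {
    auto it = uncommitted.find(block);
    if (it == uncommitted.end()) {
      std::vector<char> image(BlockSize);
      device->read(block * BlockSize, image.data(), BlockSize);
      it = uncommitted.emplace(block, std::move(image)).first;
    }
    std::memcpy(it->second.data() + offset, in, n);
  }

  // Commit flushes every staged block; rollback simply forgets them.
  void commit() {
    for (auto& [block, image] : uncommitted)
      device->write(block * BlockSize, image.data(), BlockSize);
    uncommitted.clear();
  }
  void rollback() { uncommitted.clear(); }
};

int main() {
  FakeDevice device{std::vector<char>(1024, 0)};      // four 256-byte blocks
  StagedBlocks staged{&device};
  char payload[4] = {'d', 'a', 't', 'a'};
  staged.writeBlock(2, 16, payload, sizeof(payload)); // staged only
  staged.commit();                                    // now on the "device"
}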
View File

@@ -1,6 +1,7 @@
#include "StarBTreeDatabase.hpp"
#include "StarSha256.hpp"
#include "StarVlqEncoding.hpp"
#include "StarLogging.hpp"
namespace Star {
@@ -243,7 +244,7 @@ uint32_t BTreeDatabase::freeBlockCount() {
indexBlockIndex = indexBlock.nextFreeBlock;
}
count += m_availableBlocks.size() + m_pendingFree.size();
count += m_availableBlocks.size();
// Include untracked blocks at the end of the file in the free count.
count += (m_device->size() - m_deviceSize) / m_blockSize;
@@ -272,7 +273,7 @@ uint32_t BTreeDatabase::leafBlockCount() {
return true;
}
BTreeDatabase* parent;
BTreeDatabase* parent = nullptr;
BlockIndex leafBlockCount = 0;
};
@@ -293,8 +294,8 @@ void BTreeDatabase::rollback() {
m_availableBlocks.clear();
m_indexCache.clear();
m_uncommittedWrites.clear();
m_uncommitted.clear();
m_pendingFree.clear();
readRoot();
@@ -305,7 +306,8 @@ void BTreeDatabase::rollback() {
void BTreeDatabase::close(bool closeDevice) {
WriteLocker writeLocker(m_lock);
if (m_open) {
doCommit();
if (!tryFlatten())
doCommit();
m_indexCache.clear();
@@ -536,7 +538,7 @@ auto BTreeDatabase::BTreeImpl::loadIndex(Pointer pointer) -> Index {
index->pointers.resize(s);
for (uint32_t i = 0; i < s; ++i) {
auto& e = index->pointers[i];
e.key =buffer.readBytes(parent->m_keySize);
e.key = buffer.readBytes(parent->m_keySize);
e.pointer = buffer.read<BlockIndex>();
}
@@ -896,17 +898,25 @@ void BTreeDatabase::rawReadBlock(BlockIndex blockIndex, size_t blockOffset, char
if (size <= 0)
return;
m_device->readFullAbsolute(HeaderSize + blockIndex * (StreamOffset)m_blockSize + blockOffset, block, size);
if (auto buffer = m_uncommittedWrites.ptr(blockIndex))
buffer->copyTo(block, blockOffset, size);
else
m_device->readFullAbsolute(HeaderSize + blockIndex * (StreamOffset)m_blockSize + blockOffset, block, size);
}
void BTreeDatabase::rawWriteBlock(BlockIndex blockIndex, size_t blockOffset, char const* block, size_t size) const {
void BTreeDatabase::rawWriteBlock(BlockIndex blockIndex, size_t blockOffset, char const* block, size_t size) {
if (blockOffset > m_blockSize || size > m_blockSize - blockOffset)
throw DBException::format("Write past end of block, offset: {} size {}", blockOffset, size);
if (size <= 0)
return;
m_device->writeFullAbsolute(HeaderSize + blockIndex * (StreamOffset)m_blockSize + blockOffset, block, size);
StreamOffset blockStart = HeaderSize + blockIndex * (StreamOffset)m_blockSize;
auto buffer = m_uncommittedWrites.find(blockIndex);
if (buffer == m_uncommittedWrites.end())
buffer = m_uncommittedWrites.emplace(blockIndex, m_device->readBytesAbsolute(blockStart, m_blockSize)).first;
buffer->second.writeFrom(block, blockOffset, size);
}
auto BTreeDatabase::readFreeIndexBlock(BlockIndex blockIndex) -> FreeIndexBlock {
@@ -991,12 +1001,12 @@ auto BTreeDatabase::leafTailBlocks(BlockIndex leafPointer) -> List<BlockIndex> {
}
void BTreeDatabase::freeBlock(BlockIndex b) {
if (m_uncommitted.contains(b)) {
if (m_uncommitted.contains(b))
m_uncommitted.remove(b);
m_availableBlocks.add(b);
} else {
m_pendingFree.append(b);
}
if (m_uncommittedWrites.contains(b))
m_uncommittedWrites.remove(b);
m_availableBlocks.add(b);
}
auto BTreeDatabase::reserveBlock() -> BlockIndex {
@@ -1007,10 +1017,7 @@ auto BTreeDatabase::reserveBlock() -> BlockIndex {
FreeIndexBlock indexBlock = readFreeIndexBlock(m_headFreeIndexBlock);
for (auto const& b : indexBlock.freeBlocks)
m_availableBlocks.add(b);
// We cannot make available the block itself, because we must maintain
// atomic consistency. We will need to free this block later and commit
// the new free index block chain.
m_pendingFree.append(m_headFreeIndexBlock);
m_availableBlocks.add(m_headFreeIndexBlock);
m_headFreeIndexBlock = indexBlock.nextFreeBlock;
}
@@ -1068,65 +1075,168 @@ void BTreeDatabase::readRoot() {
}
void BTreeDatabase::doCommit() {
if (m_availableBlocks.empty() && m_pendingFree.empty() && m_uncommitted.empty())
if (m_availableBlocks.empty() && m_uncommitted.empty())
return;
if (!m_availableBlocks.empty() || !m_pendingFree.empty()) {
if (!m_availableBlocks.empty()) {
// First, read the existing head FreeIndexBlock, if it exists
FreeIndexBlock indexBlock = FreeIndexBlock{InvalidBlockIndex, {}};
if (m_headFreeIndexBlock != InvalidBlockIndex) {
auto newBlock = [&]() -> BlockIndex {
if (!m_availableBlocks.empty())
return m_availableBlocks.takeFirst();
else
return makeEndBlock();
};
if (m_headFreeIndexBlock != InvalidBlockIndex)
indexBlock = readFreeIndexBlock(m_headFreeIndexBlock);
if (indexBlock.freeBlocks.size() >= maxFreeIndexLength()) {
// If the existing head free index block is full, then we should start a
// new one and leave it alone
indexBlock.nextFreeBlock = m_headFreeIndexBlock;
indexBlock.freeBlocks.clear();
} else {
// If we are copying an existing free index block, the old free index
// block will be a newly freed block
indexBlock.freeBlocks.append(m_headFreeIndexBlock);
}
}
else
m_headFreeIndexBlock = newBlock();
// Then, we need to write all the available blocks, which are safe to write
// to, and the pending free blocks, which are NOT safe to write to, to the
// FreeIndexBlock chain.
// Then, we need to write all the available blocks to the FreeIndexBlock chain.
while (true) {
if (indexBlock.freeBlocks.size() < maxFreeIndexLength() && (!m_availableBlocks.empty() || !m_pendingFree.empty())) {
// If we have room on our current FreeIndexblock, just add a block to
// it. Prioritize the pending free blocks, because we cannot use those
// to write to.
BlockIndex toAdd;
if (m_pendingFree.empty())
toAdd = m_availableBlocks.takeFirst();
else
toAdd = m_pendingFree.takeFirst();
// If we have room on our current FreeIndexBlock, just add a block to it.
if (!m_availableBlocks.empty() && indexBlock.freeBlocks.size() < maxFreeIndexLength()) {
BlockIndex toAdd = m_availableBlocks.takeFirst();
indexBlock.freeBlocks.append(toAdd);
} else {
// If our index block is full OR we are out of blocks to free, then
// need to write a new head free index block.
if (m_availableBlocks.empty())
m_headFreeIndexBlock = makeEndBlock();
else
m_headFreeIndexBlock = m_availableBlocks.takeFirst();
// Update the current head free index block.
writeFreeIndexBlock(m_headFreeIndexBlock, indexBlock);
// If we're out of blocks to free, then we're done
if (m_availableBlocks.empty() && m_pendingFree.empty())
if (m_availableBlocks.empty())
break;
indexBlock.nextFreeBlock = m_headFreeIndexBlock;
indexBlock.freeBlocks.clear();
// If our head free index block is full, then we
// need to write a new head free index block.
if (indexBlock.freeBlocks.size() >= maxFreeIndexLength()) {
indexBlock.nextFreeBlock = m_headFreeIndexBlock;
indexBlock.freeBlocks.clear();
m_headFreeIndexBlock = newBlock();
writeFreeIndexBlock(m_headFreeIndexBlock, indexBlock);
}
}
}
}
commitWrites();
writeRoot();
m_uncommitted.clear();
}
void BTreeDatabase::commitWrites() {
for (auto& write : m_uncommittedWrites)
m_device->writeFullAbsolute(HeaderSize + write.first * (StreamOffset)m_blockSize, write.second.ptr(), m_blockSize);
m_device->sync();
m_uncommittedWrites.clear();
}
bool BTreeDatabase::tryFlatten() {
if (m_headFreeIndexBlock == InvalidBlockIndex || m_rootIsLeaf || !m_device->isWritable())
return false;
BlockIndex freeBlockCount = 0;
BlockIndex indexBlockIndex = m_headFreeIndexBlock;
while (indexBlockIndex != InvalidBlockIndex) {
FreeIndexBlock indexBlock = readFreeIndexBlock(indexBlockIndex);
freeBlockCount += 1 + indexBlock.freeBlocks.size();
indexBlockIndex = indexBlock.nextFreeBlock;
}
BlockIndex expectedBlockCount = (m_deviceSize - HeaderSize) / m_blockSize;
float free = float(freeBlockCount) / float(expectedBlockCount);
if (free < 0.05f)
return false;
Logger::info("[BTreeDatabase] File '{}' is {:.2f}% free space, flattening", m_device->deviceName(), free * 100.f);
indexBlockIndex = m_headFreeIndexBlock;
{
List<BlockIndex> availableBlocksList;
do {
FreeIndexBlock indexBlock = readFreeIndexBlock(indexBlockIndex);
availableBlocksList.appendAll(indexBlock.freeBlocks);
availableBlocksList.append(indexBlockIndex);
indexBlockIndex = indexBlock.nextFreeBlock;
} while (indexBlockIndex != InvalidBlockIndex);
m_headFreeIndexBlock = InvalidBlockIndex;
sort(availableBlocksList);
for (auto& availableBlock : availableBlocksList)
m_availableBlocks.insert(m_availableBlocks.end(), availableBlock);
}
BlockIndex count = 1; // 1 to include root index
double start = Time::monotonicTime();
auto index = m_impl.loadIndex(m_impl.rootPointer());
if (flattenVisitor(index, count)) {
m_impl.deleteIndex(index);
index->self = InvalidBlockIndex;
m_root = m_impl.storeIndex(index);
}
m_availableBlocks.clear();
m_device->resize(m_deviceSize = HeaderSize + (StreamOffset)m_blockSize * count);
m_indexCache.clear();
commitWrites();
writeRoot();
m_uncommitted.clear();
Logger::info("[BTreeDatabase] Finished flattening '{}' in {:.2f} milliseconds", m_device->deviceName(), (Time::monotonicTime() - start) * 1000.f);
return true;
}
bool BTreeDatabase::flattenVisitor(BTreeImpl::Index& index, BlockIndex& count) {
auto pointerCount = index->pointerCount();
count += pointerCount;
bool canStore = !m_availableBlocks.empty();
bool needsStore = false;
if (m_impl.indexLevel(index) == 0) {
for (size_t i = 0; i != pointerCount; ++i) {
auto indexPointer = index->pointer(i);
auto tailBlocks = leafTailBlocks(indexPointer);
if (canStore) {
bool leafNeedsStore = m_availableBlocks.first() < indexPointer;
if (!leafNeedsStore)
for (size_t i = 0; !leafNeedsStore && i != tailBlocks.size(); ++i)
if (m_availableBlocks.first() < tailBlocks[i])
leafNeedsStore = true;
if (leafNeedsStore) {
auto leaf = m_impl.loadLeaf(indexPointer);
m_impl.deleteLeaf(leaf);
leaf->self = InvalidBlockIndex;
index->updatePointer(i, m_impl.storeLeaf(leaf));
tailBlocks = leafTailBlocks(leaf->self);
needsStore = true;
}
canStore = !m_availableBlocks.empty();
}
count += tailBlocks.size();
}
} else {
for (size_t i = 0; i != pointerCount; ++i) {
auto childIndex = m_impl.loadIndex(index->pointer(i));
if (canStore && flattenVisitor(childIndex, count)) {
m_impl.deleteIndex(childIndex);
childIndex->self = InvalidBlockIndex;
index->updatePointer(i, m_impl.storeIndex(childIndex));
canStore = !m_availableBlocks.empty();
needsStore = true;
}
}
}
return needsStore || (canStore && m_availableBlocks.first() < index->self);
}
void BTreeDatabase::checkIfOpen(char const* methodName, bool shouldBeOpen) const {
if (shouldBeOpen && !m_open)
throw DBException::format("BTreeDatabase method '{}' called when not open, must be open.", methodName);
@@ -1146,7 +1256,7 @@ void BTreeDatabase::checkKeySize(ByteArray const& k) const {
}
uint32_t BTreeDatabase::maxFreeIndexLength() const {
return (m_blockSize - 2 - sizeof(BlockIndex) - 4) / sizeof(BlockIndex);
return (m_blockSize / sizeof(BlockIndex)) - 2 - sizeof(BlockIndex) - 4;
}
BTreeSha256Database::BTreeSha256Database() {

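The tryFlatten added above only rewrites the file when enough of it is free space: it walks the free index chain, counts each index block plus the blocks it lists, and skips flattening unless free blocks make up at least 5% of the block count implied by the recorded device size. A rough standalone restatement of that threshold check (the function name and sample figures are illustrative, not from the commit):

#include <cstdint>
#include <cstdio>

// Returns true when the file is "worth" flattening under the 5% rule,
// given a walked free-block count and the block count implied by the header.
bool worthFlattening(uint64_t freeBlocks, uint64_t expectedBlocks) {
  if (expectedBlocks == 0)
    return false;
  float freeRatio = float(freeBlocks) / float(expectedBlocks);
  return freeRatio >= 0.05f;  // flatten once at least 5% of blocks are free
}

int main() {
  // Example: 600 free blocks out of 10000 -> 6% free, so flattening runs.
  std::printf("%s\n", worthFlattening(600, 10000) ? "flatten" : "skip");
}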
View File

@@ -230,7 +230,7 @@ private:
void updateBlock(BlockIndex blockIndex, ByteArray const& block);
void rawReadBlock(BlockIndex blockIndex, size_t blockOffset, char* block, size_t size) const;
void rawWriteBlock(BlockIndex blockIndex, size_t blockOffset, char const* block, size_t size) const;
void rawWriteBlock(BlockIndex blockIndex, size_t blockOffset, char const* block, size_t size);
void updateHeadFreeIndexBlock(BlockIndex newHead);
@@ -251,6 +251,9 @@ private:
void writeRoot();
void readRoot();
void doCommit();
void commitWrites();
bool tryFlatten();
bool flattenVisitor(BTreeImpl::Index& index, BlockIndex& count);
void checkIfOpen(char const* methodName, bool shouldBeOpen) const;
void checkBlockIndex(size_t blockIndex) const;
@@ -285,14 +288,14 @@ private:
bool m_dirty;
// Blocks that can be freely allocated and written to without violating
// atomic consistency
// atomic consistency.
Set<BlockIndex> m_availableBlocks;
// Blocks to be freed on next commit.
Deque<BlockIndex> m_pendingFree;
// Blocks that have been written in uncommitted portions of the tree.
Set<BlockIndex> m_uncommitted;
// Temporarily holds written data so that it can be rolled back.
mutable Map<BlockIndex, ByteArray> m_uncommittedWrites;
};
// Version of BTreeDatabase that hashes keys with SHA-256 to produce a unique
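Both freeBlockCount and tryFlatten in the .cpp changes lean on the same on-disk free-list traversal: start at the head free index block, collect the blocks it lists plus the index block itself, then follow nextFreeBlock until the invalid sentinel. A self-contained sketch of that walk, with a caller-supplied reader standing in for readFreeIndexBlock (the STL types and the example chain in main are assumptions, not from the commit):

#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

using BlockIndex = uint32_t;
constexpr BlockIndex InvalidBlockIndex = BlockIndex(-1);

struct FreeIndexBlock {
  BlockIndex nextFreeBlock = InvalidBlockIndex;
  std::vector<BlockIndex> freeBlocks;
};

// Walks the chain starting at 'head', returning every reusable block:
// the blocks each index lists, plus the index blocks themselves.
std::vector<BlockIndex> collectFreeBlocks(BlockIndex head,
    std::function<FreeIndexBlock(BlockIndex)> const& readFreeIndexBlock) {
  std::vector<BlockIndex> result;
  for (BlockIndex i = head; i != InvalidBlockIndex;) {
    FreeIndexBlock block = readFreeIndexBlock(i);
    result.insert(result.end(), block.freeBlocks.begin(), block.freeBlocks.end());
    result.push_back(i);
    i = block.nextFreeBlock;
  }
  return result;
}

int main() {
  // Hypothetical two-element chain: block 7 lists {3, 5} and points at block 9, which lists {2}.
  auto reader = [](BlockIndex i) {
    if (i == 7)
      return FreeIndexBlock{9, {3, 5}};
    return FreeIndexBlock{InvalidBlockIndex, {2}};
  };
  auto blocks = collectFreeBlocks(7, reader);
  std::printf("%zu free blocks\n", blocks.size());  // 5 blocks: {3, 5, 7, 2, 9}
}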