From 662b83ff92cc2316fb962ff1608359f6d705a5f0 Mon Sep 17 00:00:00 2001 From: Kae <80987908+Novaenia@users.noreply.github.com> Date: Thu, 14 Mar 2024 21:41:53 +1100 Subject: [PATCH] Initial commit of experimental zstd network compression currently a bit buggy --- source/CMakeLists.txt | 2 + source/core/CMakeLists.txt | 2 + source/core/StarBuffer.cpp | 4 + source/core/StarBuffer.hpp | 2 + source/core/StarCompression.cpp | 51 ++++--- source/core/StarCompression.hpp | 6 +- source/core/StarDataStreamDevices.cpp | 8 +- source/core/StarDataStreamDevices.hpp | 3 + source/core/StarZSTDCompression.cpp | 84 +++++++++++ source/core/StarZSTDCompression.hpp | 44 ++++++ source/game/StarNetPacketSocket.cpp | 202 ++++++++++++++++---------- source/game/StarNetPacketSocket.hpp | 20 ++- source/game/StarNetPackets.cpp | 14 +- source/game/StarNetPackets.hpp | 8 +- source/game/StarUniverseServer.cpp | 2 +- source/vcpkg.json | 3 +- 16 files changed, 346 insertions(+), 109 deletions(-) create mode 100644 source/core/StarZSTDCompression.cpp create mode 100644 source/core/StarZSTDCompression.hpp diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 6ed5bb5..526d956 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -443,6 +443,7 @@ find_package(PNG REQUIRED) find_package(Freetype REQUIRED) find_package(Opus CONFIG REQUIRED) find_package(OggVorbis REQUIRED) +find_package(zstd CONFIG REQUIRED) include_directories(SYSTEM ${FREETYPE_INCLUDE_DIRS} @@ -453,6 +454,7 @@ set(STAR_EXT_LIBS ${STAR_EXT_LIBS} ZLIB::ZLIB PNG::PNG $,Freetype::Freetype,freetype> + $,zstd::libzstd_shared,zstd::libzstd_static> Opus::opus ${VORBISFILE_LIBRARY} ${VORBIS_LIBRARY} diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt index 687b9c5..4723686 100644 --- a/source/core/CMakeLists.txt +++ b/source/core/CMakeLists.txt @@ -126,6 +126,7 @@ SET (star_core_HEADERS StarWeightedPool.hpp StarWorkerPool.hpp StarXXHash.hpp + StarZSTDCompression.hpp ) SET (star_core_SOURCES @@ -181,6 +182,7 @@ SET (star_core_SOURCES StarUnicode.cpp StarUuid.cpp StarWorkerPool.cpp + StarZSTDCompression.cpp ) IF (STAR_SYSTEM_FAMILY_UNIX) diff --git a/source/core/StarBuffer.cpp b/source/core/StarBuffer.cpp index 0d998af..5b5b2e4 100644 --- a/source/core/StarBuffer.cpp +++ b/source/core/StarBuffer.cpp @@ -263,6 +263,10 @@ bool ExternalBuffer::empty() const { return m_size == 0; } +ExternalBuffer::operator bool() const { + return m_size == 0; +} + void ExternalBuffer::reset(char const* externalData, size_t len) { m_pos = 0; m_bytes = externalData; diff --git a/source/core/StarBuffer.hpp b/source/core/StarBuffer.hpp index 0e1213e..0f9864a 100644 --- a/source/core/StarBuffer.hpp +++ b/source/core/StarBuffer.hpp @@ -105,6 +105,8 @@ public: // Clears buffer, moves position to 0. bool empty() const; + operator bool() const; + // Reset buffer with new contents, moves position to 0. void reset(char const* externalData, size_t len); diff --git a/source/core/StarCompression.cpp b/source/core/StarCompression.cpp index 2aa59b2..58b43ef 100644 --- a/source/core/StarCompression.cpp +++ b/source/core/StarCompression.cpp @@ -15,9 +15,9 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress return; const size_t BUFSIZE = 32 * 1024; - unsigned char temp_buffer[BUFSIZE]; + auto tempBuffer = std::make_unique(BUFSIZE); - z_stream strm; + z_stream strm{}; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; @@ -27,13 +27,13 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress strm.next_in = (unsigned char*)in.ptr(); strm.avail_in = in.size(); - strm.next_out = temp_buffer; + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; while (deflate_res == Z_OK) { deflate_res = deflate(&strm, Z_FINISH); if (strm.avail_out == 0) { - out.append((char const*)temp_buffer, BUFSIZE); - strm.next_out = temp_buffer; + out.append((char const*)tempBuffer.get(), BUFSIZE); + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; } } @@ -42,7 +42,7 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress if (deflate_res != Z_STREAM_END) throw IOException(strf("Internal error in uncompressData, deflate_res is {}", deflate_res)); - out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); + out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out); } ByteArray compressData(ByteArray const& in, CompressionLevel compression) { @@ -51,16 +51,16 @@ ByteArray compressData(ByteArray const& in, CompressionLevel compression) { return out; } -void uncompressData(ByteArray const& in, ByteArray& out) { +void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit) { out.clear(); - if (in.empty()) + if (!inLen) return; const size_t BUFSIZE = 32 * 1024; - unsigned char temp_buffer[BUFSIZE]; + auto tempBuffer = std::make_unique(BUFSIZE); - z_stream strm; + z_stream strm{}; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; @@ -68,17 +68,22 @@ void uncompressData(ByteArray const& in, ByteArray& out) { if (inflate_res != Z_OK) throw IOException(strf("Failed to initialise inflate ({})", inflate_res)); - strm.next_in = (unsigned char*)in.ptr(); - strm.avail_in = in.size(); - strm.next_out = temp_buffer; + strm.next_in = (unsigned char*)in; + strm.avail_in = inLen; + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; while (inflate_res == Z_OK || inflate_res == Z_BUF_ERROR) { inflate_res = inflate(&strm, Z_FINISH); if (strm.avail_out == 0) { - out.append((char const*)temp_buffer, BUFSIZE); - strm.next_out = temp_buffer; + out.append((char const*)tempBuffer.get(), BUFSIZE); + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; + if (limit && out.size() >= limit) { + inflateEnd(&strm); + throw IOException(strf("hit uncompressData limit of {} bytes", limit)); + break; + } } else if (inflate_res == Z_BUF_ERROR) { break; } @@ -88,15 +93,23 @@ void uncompressData(ByteArray const& in, ByteArray& out) { if (inflate_res != Z_STREAM_END) throw IOException(strf("Internal error in uncompressData, inflate_res is {}", inflate_res)); - out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); + out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out); } -ByteArray uncompressData(ByteArray const& in) { - ByteArray out = ByteArray::withReserve(in.size()); - uncompressData(in, out); +ByteArray uncompressData(const char* in, size_t inLen, size_t limit) { + ByteArray out = ByteArray::withReserve(inLen); + uncompressData(in, inLen, out, limit); return out; } +void uncompressData(ByteArray const& in, ByteArray& out, size_t limit) { + uncompressData(in.ptr(), in.size(), out, limit); +} + +ByteArray uncompressData(ByteArray const& in, size_t limit) { + return uncompressData(in.ptr(), in.size(), limit); +} + CompressedFilePtr CompressedFile::open(String const& filename, IOMode mode, CompressionLevel comp) { CompressedFilePtr f = make_shared(filename); f->open(mode, comp); diff --git a/source/core/StarCompression.hpp b/source/core/StarCompression.hpp index 56dc774..3322662 100644 --- a/source/core/StarCompression.hpp +++ b/source/core/StarCompression.hpp @@ -17,8 +17,10 @@ CompressionLevel const HighCompression = 9; void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compression = MediumCompression); ByteArray compressData(ByteArray const& in, CompressionLevel compression = MediumCompression); -void uncompressData(ByteArray const& in, ByteArray& out); -ByteArray uncompressData(ByteArray const& in); +void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit = 0); +ByteArray uncompressData(const char* in, size_t inLen, size_t limit = 0); +void uncompressData(ByteArray const& in, ByteArray& out, size_t limit = 0); +ByteArray uncompressData(ByteArray const& in, size_t limit = 0); // Random access to a (potentially) compressed file. class CompressedFile : public IODevice { diff --git a/source/core/StarDataStreamDevices.cpp b/source/core/StarDataStreamDevices.cpp index 58ec920..b769167 100644 --- a/source/core/StarDataStreamDevices.cpp +++ b/source/core/StarDataStreamDevices.cpp @@ -130,7 +130,9 @@ void DataStreamBuffer::writeData(char const* data, size_t len) { m_buffer->writeFull(data, len); } -DataStreamExternalBuffer::DataStreamExternalBuffer() {} +DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {} + +DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {} DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() { reset(externalData, len); @@ -160,6 +162,10 @@ size_t DataStreamExternalBuffer::pos() { return m_buffer.pos(); } +size_t DataStreamExternalBuffer::remaining() { + return m_buffer.dataSize() - m_buffer.pos(); +} + void DataStreamExternalBuffer::reset(char const* externalData, size_t len) { m_buffer.reset(externalData, len); } diff --git a/source/core/StarDataStreamDevices.hpp b/source/core/StarDataStreamDevices.hpp index dce7dcf..5d404ab 100644 --- a/source/core/StarDataStreamDevices.hpp +++ b/source/core/StarDataStreamDevices.hpp @@ -126,6 +126,8 @@ private: class DataStreamExternalBuffer : public DataStream { public: DataStreamExternalBuffer(); + DataStreamExternalBuffer(DataStreamBuffer const& buffer); + DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default; DataStreamExternalBuffer(char const* externalData, size_t len); char const* ptr() const; @@ -136,6 +138,7 @@ public: void seek(size_t pos, IOSeek mode = IOSeek::Absolute); bool atEnd(); size_t pos(); + size_t remaining(); void reset(char const* externalData, size_t len); diff --git a/source/core/StarZSTDCompression.cpp b/source/core/StarZSTDCompression.cpp new file mode 100644 index 0000000..733b182 --- /dev/null +++ b/source/core/StarZSTDCompression.cpp @@ -0,0 +1,84 @@ +#include "StarZSTDCompression.hpp" +#include + +namespace Star { + +CompressionStream::CompressionStream() : m_cStream(ZSTD_createCStream()) { + ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1); + ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24); + ZSTD_initCStream(m_cStream, 2); +} + +CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); } + +ByteArray CompressionStream::compress(const char* in, size_t inLen) { + size_t const cInSize = ZSTD_CStreamInSize (); + size_t const cOutSize = ZSTD_CStreamOutSize(); + ByteArray output(cOutSize, 0); + size_t written = 0, read = 0; + while (read < inLen) { + ZSTD_inBuffer inBuffer = {in + read, min(cInSize, inLen - read), 0}; + ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0}; + bool finished = false; + do { + size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush); + if (ZSTD_isError(ret)) { + throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret))); + break; + } + + if (outBuffer.pos == outBuffer.size) { + output.resize(output.size() * 2); + outBuffer.dst = output.ptr(); + outBuffer.size = output.size(); + continue; + } + + finished = ret == 0 && inBuffer.pos == inBuffer.size; + } while (!finished); + + read += inBuffer.pos; + written += outBuffer.pos; + } + output.resize(written); + return output; +} + +DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) { + ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25); + ZSTD_initDStream(m_dStream); +} + +DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); } + +ByteArray DecompressionStream::decompress(const char* in, size_t inLen) { + size_t const dInSize = ZSTD_DStreamInSize (); + size_t const dOutSize = ZSTD_DStreamOutSize(); + ByteArray output(dOutSize, 0); + size_t written = 0, read = 0; + while (read < inLen) { + ZSTD_inBuffer inBuffer = {in + read, min(dInSize, inLen - read), 0}; + ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0}; + do { + size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer); + if (ZSTD_isError(ret)) { + throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret))); + break; + } + + if (outBuffer.pos == outBuffer.size) { + output.resize(output.size() * 2); + outBuffer.dst = output.ptr(); + outBuffer.size = output.size(); + continue; + } + } while (inBuffer.pos < inBuffer.size); + + read += inBuffer.pos; + written += outBuffer.pos; + } + output.resize(written); + return output; +} + +} \ No newline at end of file diff --git a/source/core/StarZSTDCompression.hpp b/source/core/StarZSTDCompression.hpp new file mode 100644 index 0000000..77719bf --- /dev/null +++ b/source/core/StarZSTDCompression.hpp @@ -0,0 +1,44 @@ +#pragma once +#include "StarByteArray.hpp" +#include "StarDataStreamDevices.hpp" + +typedef struct ZSTD_CCtx_s ZSTD_CCtx; +typedef struct ZSTD_DCtx_s ZSTD_DCtx; +typedef ZSTD_DCtx ZSTD_DStream; +typedef ZSTD_CCtx ZSTD_CStream; + +namespace Star { + +class CompressionStream { +public: + CompressionStream(); + ~CompressionStream(); + + ByteArray compress(const char* in, size_t inLen); + ByteArray compress(ByteArray const& in); + +private: + ZSTD_CStream* m_cStream; +}; + +inline ByteArray CompressionStream::compress(ByteArray const& in) { + return compress(in.ptr(), in.size()); +} + +class DecompressionStream { +public: + DecompressionStream(); + ~DecompressionStream(); + + ByteArray decompress(const char* in, size_t inLen); + ByteArray decompress(ByteArray const& in); + +private: + ZSTD_DStream* m_dStream; +}; + +inline ByteArray DecompressionStream::decompress(ByteArray const& in) { + return decompress(in.ptr(), in.size()); +} + +} \ No newline at end of file diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp index 48b2082..2ebaadb 100644 --- a/source/game/StarNetPacketSocket.cpp +++ b/source/game/StarNetPacketSocket.cpp @@ -6,17 +6,27 @@ namespace Star { PacketStatCollector::PacketStatCollector(float calculationWindow) - : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {} + : m_calculationWindow(calculationWindow), m_stats(), m_totalBytes(0), m_lastMixTime(0) {} -void PacketStatCollector::mix(PacketType type, size_t size) { +void PacketStatCollector::mix(size_t size) { calculate(); - m_unmixed[type] += size; + m_totalBytes += size; } -void PacketStatCollector::mix(HashMap const& sizes) { +void PacketStatCollector::mix(PacketType type, size_t size, bool addToTotal) { calculate(); - for (auto const& p : sizes) + m_unmixed[type] += size; + if (addToTotal) + m_totalBytes += size; +} + +void PacketStatCollector::mix(HashMap const& sizes, bool addToTotal) { + calculate(); + for (auto const& p : sizes) { + if (addToTotal) + m_totalBytes += p.second; m_unmixed[p.first] += p.second; + } } PacketStats PacketStatCollector::stats() const { @@ -31,18 +41,19 @@ void PacketStatCollector::calculate() { m_lastMixTime = currentTime; m_stats.worstPacketSize = 0; - float total = 0.0f; + if (abs(elapsedTime) - m_calculationWindow < 0.0125f) + elapsedTime = m_calculationWindow; + for (auto& pair : m_unmixed) { if (pair.second > m_stats.worstPacketSize) { m_stats.worstPacketType = pair.first; m_stats.worstPacketSize = pair.second; } - auto& bytes = m_stats.packetBytesPerSecond[pair.first]; - bytes = pair.second / elapsedTime; - total += bytes; + m_stats.packetBytesPerSecond[pair.first] = round(pair.second / elapsedTime); } - m_stats.bytesPerSecond = total; + m_stats.bytesPerSecond = round(float(m_totalBytes) / elapsedTime); + m_totalBytes = 0; m_unmixed.clear(); } } @@ -138,67 +149,81 @@ void TcpPacketSocket::close() { void TcpPacketSocket::sendPackets(List packets) { auto it = makeSMutableIterator(packets); - - while (it.hasNext()) { - PacketType currentType = it.peekNext()->type(); - PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); - - DataStreamBuffer packetBuffer; - while (it.hasNext() - && it.peekNext()->type() == currentType - && it.peekNext()->compressionMode() == currentCompressionMode) { - if (legacy()) - it.next()->writeLegacy(packetBuffer); - else - it.next()->write(packetBuffer); - } - - // Packets must read and write actual data, because this is used to - // determine packet count - starAssert(!packetBuffer.empty()); - - ByteArray compressedPackets; - bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; - bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; - if (mustCompress || perhapsCompress) - compressedPackets = compressData(packetBuffer.data()); - + if (m_useCompressionStream) { DataStreamBuffer outBuffer; - outBuffer.write(currentType); - - if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { - outBuffer.writeVlqI(-(int)(compressedPackets.size())); - outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); - m_outgoingStats.mix(currentType, compressedPackets.size()); - } else { - outBuffer.writeVlqI((int)(packetBuffer.size())); + while (it.hasNext()) { + PacketPtr& packet = it.next(); + auto packetType = packet->type(); + DataStreamBuffer packetBuffer; + packet->write(packetBuffer); + outBuffer.write(packetType); + outBuffer.writeVlqI((int)packetBuffer.size()); outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); - m_outgoingStats.mix(currentType, packetBuffer.size()); + m_outgoingStats.mix(packetType, packetBuffer.size(), false); + } + m_outputBuffer.append(outBuffer.ptr(), outBuffer.size()); + } else { + while (it.hasNext()) { + PacketType currentType = it.peekNext()->type(); + PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); + + DataStreamBuffer packetBuffer; + while (it.hasNext() + && it.peekNext()->type() == currentType + && it.peekNext()->compressionMode() == currentCompressionMode) { + if (legacy()) + it.next()->writeLegacy(packetBuffer); + else + it.next()->write(packetBuffer); + } + + // Packets must read and write actual data, because this is used to + // determine packet count + starAssert(!packetBuffer.empty()); + + ByteArray compressedPackets; + bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; + bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; + if (mustCompress || perhapsCompress) + compressedPackets = compressData(packetBuffer.data()); + + DataStreamBuffer outBuffer; + outBuffer.write(currentType); + + if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { + outBuffer.writeVlqI(-(int)(compressedPackets.size())); + outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); + m_outgoingStats.mix(currentType, compressedPackets.size()); + } else { + outBuffer.writeVlqI((int)(packetBuffer.size())); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(currentType, packetBuffer.size()); + } + m_outputBuffer.append(outBuffer.takeData()); } - m_outputBuffer.append(outBuffer.takeData()); } } List TcpPacketSocket::receivePackets() { - uint64_t const PacketSizeLimit = 64<<20; + // How large can uncompressed packets be + // this limit is now also used during decompression + uint64_t const PacketSizeLimit = 64 << 20; + // How many packets can be batched together into one compressed chunk at once + uint64_t const PacketBatchLimit = 131072; List packets; try { - while (!m_inputBuffer.empty()) { + DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size()); + bool atLeastOne = false; + while (!ds.atEnd()) { PacketType packetType; uint64_t packetSize = 0; bool packetCompressed = false; - DataStreamBuffer ds(m_inputBuffer); try { packetType = ds.read(); int64_t len = ds.readVlqI(); - if (len < 0) { - packetSize = -len; - packetCompressed = true; - } else { - packetSize = len; - packetCompressed = false; - } + packetCompressed = len < 0; + packetSize = packetCompressed ? -len : len; } catch (EofException const&) { // Guard against not having the entire packet header available when // trying to read. @@ -206,19 +231,28 @@ List TcpPacketSocket::receivePackets() { } if (packetSize > PacketSizeLimit) - throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize); + throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType)); if (packetSize > ds.size() - ds.pos()) break; - ByteArray packetBytes = ds.readBytes(packetSize); - if (packetCompressed) - packetBytes = uncompressData(packetBytes); + atLeastOne = true; + m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream); - m_incomingStats.mix(packetType, packetSize); + DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize); + ByteArray uncompressed; + if (packetCompressed) { + uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit); + packetStream.reset(uncompressed.ptr(), uncompressed.size()); + } + ds.seek(packetSize, IOSeek::Relative); - DataStreamBuffer packetStream(std::move(packetBytes)); + size_t count = 0; do { + if (++count > PacketBatchLimit) { + throw IOException::format("Packet batch limit {} reached while reading {}s!", PacketBatchLimit, PacketTypeNames.getRight(packetType)); + break; + } PacketPtr packet = createPacket(packetType); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled); if (legacy()) @@ -227,9 +261,9 @@ List TcpPacketSocket::receivePackets() { packet->read(packetStream); packets.append(std::move(packet)); } while (!packetStream.atEnd()); - - m_inputBuffer = ds.readBytes(ds.size() - ds.pos()); } + if (atLeastOne) + m_inputBuffer.trimLeft(ds.pos()); } catch (IOException const& e) { Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false)); m_inputBuffer.clear(); @@ -248,15 +282,25 @@ bool TcpPacketSocket::writeData() { bool dataSent = false; try { - if (m_outputBuffer.empty()) - return false; + if (m_useCompressionStream) { + auto compressed = m_compressionStream.compress(m_outputBuffer); + m_outputBuffer.clear(); - while (!m_outputBuffer.empty()) { - size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); - if (writtenAmount == 0) - break; - dataSent = true; - m_outputBuffer.trimLeft(writtenAmount); + m_compressedBuffer.append(compressed.ptr(), compressed.size()); + size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size()); + if (written > 0) { + dataSent = true; + m_compressedBuffer.trimLeft(written); + m_outgoingStats.mix(written); + } + } else { + while (!m_outputBuffer.empty()) { + size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); + if (written == 0) + break; + dataSent = true; + m_outputBuffer.trimLeft(written); + } } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); @@ -276,7 +320,13 @@ bool TcpPacketSocket::readData() { if (readAmount == 0) break; dataReceived = true; - m_inputBuffer.append(readBuffer, readAmount); + m_incomingStats.mix(readAmount); + if (m_useCompressionStream) { + auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount); + m_inputBuffer.append(decompressed.ptr(), decompressed.size()); + } else { + m_inputBuffer.append(readBuffer, readAmount); + } } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); @@ -295,8 +345,12 @@ Maybe TcpPacketSocket::outgoingStats() const { return m_outgoingStats.stats(); } -TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) - : m_socket(std::move(socket)) {} +void TcpPacketSocket::setLegacy(bool legacy) { + m_useCompressionStream = !legacy; + PacketSocket::setLegacy(legacy); +} + +TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(std::move(socket)) {} P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) { return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket))); diff --git a/source/game/StarNetPacketSocket.hpp b/source/game/StarNetPacketSocket.hpp index 3530310..dfffec6 100644 --- a/source/game/StarNetPacketSocket.hpp +++ b/source/game/StarNetPacketSocket.hpp @@ -4,6 +4,7 @@ #include "StarAtomicSharedPtr.hpp" #include "StarP2PNetworkingService.hpp" #include "StarNetPackets.hpp" +#include "StarZSTDCompression.hpp" namespace Star { @@ -24,10 +25,11 @@ class PacketStatCollector { public: PacketStatCollector(float calculationWindow = 1.0f); - void mix(PacketType type, size_t size); - void mix(HashMap const& sizes); + void mix(size_t size); + void mix(PacketType type, size_t size, bool addToTotal = true); + void mix(HashMap const& sizes, bool addToTotal = true); - // Should always return packet staticstics for the most recent completed + // Should always return packet statistics for the most recent completed // window of time PacketStats stats() const; @@ -37,6 +39,7 @@ private: float m_calculationWindow; PacketStats m_stats; Map m_unmixed; + size_t m_totalBytes; int64_t m_lastMixTime; }; @@ -72,10 +75,10 @@ public: virtual Maybe incomingStats() const; virtual Maybe outgoingStats() const; - void setLegacy(bool legacy); - bool legacy() const; + virtual void setLegacy(bool legacy); + virtual bool legacy() const; private: - bool m_legacy = false; + bool m_legacy = true; }; // PacketSocket for local communication. @@ -127,6 +130,7 @@ public: Maybe incomingStats() const override; Maybe outgoingStats() const override; + void setLegacy(bool legacy) override; private: TcpPacketSocket(TcpSocketPtr socket); @@ -136,6 +140,10 @@ private: PacketStatCollector m_outgoingStats; ByteArray m_outputBuffer; ByteArray m_inputBuffer; + bool m_useCompressionStream = false; + ByteArray m_compressedBuffer; + CompressionStream m_compressionStream; + DecompressionStream m_decompressionStream; }; // Wraps a P2PSocket into a PacketSocket diff --git a/source/game/StarNetPackets.cpp b/source/game/StarNetPackets.cpp index f787b1c..2f03283 100644 --- a/source/game/StarNetPackets.cpp +++ b/source/game/StarNetPackets.cpp @@ -341,7 +341,7 @@ ClientConnectPacket::ClientConnectPacket(ByteArray assetsDigest, bool allowAsset playerName(std::move(playerName)), playerSpecies(std::move(playerSpecies)), shipChunks(std::move(shipChunks)), shipUpgrades(std::move(shipUpgrades)), introComplete(std::move(introComplete)), account(std::move(account)) {} -void ClientConnectPacket::read(DataStream& ds) { +void ClientConnectPacket::readLegacy(DataStream& ds) { ds.read(assetsDigest); ds.read(allowAssetsMismatch); ds.read(playerUuid); @@ -353,7 +353,12 @@ void ClientConnectPacket::read(DataStream& ds) { ds.read(account); } -void ClientConnectPacket::write(DataStream& ds) const { +void ClientConnectPacket::read(DataStream& ds) { + readLegacy(ds); + ds.read(info); +} + +void ClientConnectPacket::writeLegacy(DataStream& ds) const { ds.write(assetsDigest); ds.write(allowAssetsMismatch); ds.write(playerUuid); @@ -365,6 +370,11 @@ void ClientConnectPacket::write(DataStream& ds) const { ds.write(account); } +void ClientConnectPacket::write(DataStream& ds) const { + writeLegacy(ds); + ds.write(info); +} + ClientDisconnectRequestPacket::ClientDisconnectRequestPacket() {} void ClientDisconnectRequestPacket::read(DataStream& ds) { diff --git a/source/game/StarNetPackets.hpp b/source/game/StarNetPackets.hpp index 98455e2..dd2acfa 100644 --- a/source/game/StarNetPackets.hpp +++ b/source/game/StarNetPackets.hpp @@ -127,11 +127,10 @@ struct Packet { virtual PacketType type() const = 0; - virtual void read(DataStream& ds) = 0; - virtual void write(DataStream& ds) const = 0; - virtual void readLegacy(DataStream& ds); + virtual void read(DataStream& ds) = 0; virtual void writeLegacy(DataStream& ds) const; + virtual void write(DataStream& ds) const = 0; PacketCompressionMode compressionMode() const; void setCompressionMode(PacketCompressionMode compressionMode); @@ -288,7 +287,9 @@ struct ClientConnectPacket : PacketBase { String playerSpecies, WorldChunks shipChunks, ShipUpgrades shipUpgrades, bool introComplete, String account); + void readLegacy(DataStream& ds) override; void read(DataStream& ds) override; + void writeLegacy(DataStream& ds) const override; void write(DataStream& ds) const override; ByteArray assetsDigest; @@ -300,6 +301,7 @@ struct ClientConnectPacket : PacketBase { ShipUpgrades shipUpgrades; bool introComplete; String account; + Json info; }; struct ClientDisconnectRequestPacket : PacketBase { diff --git a/source/game/StarUniverseServer.cpp b/source/game/StarUniverseServer.cpp index 783bde2..eafa844 100644 --- a/source/game/StarUniverseServer.cpp +++ b/source/game/StarUniverseServer.cpp @@ -1522,7 +1522,6 @@ void UniverseServer::acceptConnection(UniverseConnection connection, MaybecompressionMode() != PacketCompressionMode::Enabled; - connection.setLegacy(legacyClient); auto protocolResponse = make_shared(); protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound @@ -1540,6 +1539,7 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybeallowed = true; connection.pushSingle(protocolResponse); connection.sendAll(clientWaitLimit); + connection.setLegacy(legacyClient); String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local"; Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound"); diff --git a/source/vcpkg.json b/source/vcpkg.json index 1e0942c..6dcae93 100644 --- a/source/vcpkg.json +++ b/source/vcpkg.json @@ -7,6 +7,7 @@ "zlib", "freetype", "libpng", - "opus" + "opus", + "zstd" ] } \ No newline at end of file