Initial commit of experimental zstd network compression

currently a bit buggy
This commit is contained in:
Kae 2024-03-14 21:41:53 +11:00
parent 8164e5ae6f
commit 662b83ff92
16 changed files with 346 additions and 109 deletions

View File

@ -443,6 +443,7 @@ find_package(PNG REQUIRED)
find_package(Freetype REQUIRED) find_package(Freetype REQUIRED)
find_package(Opus CONFIG REQUIRED) find_package(Opus CONFIG REQUIRED)
find_package(OggVorbis REQUIRED) find_package(OggVorbis REQUIRED)
find_package(zstd CONFIG REQUIRED)
include_directories(SYSTEM include_directories(SYSTEM
${FREETYPE_INCLUDE_DIRS} ${FREETYPE_INCLUDE_DIRS}
@ -453,6 +454,7 @@ set(STAR_EXT_LIBS ${STAR_EXT_LIBS}
ZLIB::ZLIB ZLIB::ZLIB
PNG::PNG PNG::PNG
$<IF:$<TARGET_EXISTS:Freetype::Freetype>,Freetype::Freetype,freetype> $<IF:$<TARGET_EXISTS:Freetype::Freetype>,Freetype::Freetype,freetype>
$<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
Opus::opus Opus::opus
${VORBISFILE_LIBRARY} ${VORBISFILE_LIBRARY}
${VORBIS_LIBRARY} ${VORBIS_LIBRARY}

View File

@ -126,6 +126,7 @@ SET (star_core_HEADERS
StarWeightedPool.hpp StarWeightedPool.hpp
StarWorkerPool.hpp StarWorkerPool.hpp
StarXXHash.hpp StarXXHash.hpp
StarZSTDCompression.hpp
) )
SET (star_core_SOURCES SET (star_core_SOURCES
@ -181,6 +182,7 @@ SET (star_core_SOURCES
StarUnicode.cpp StarUnicode.cpp
StarUuid.cpp StarUuid.cpp
StarWorkerPool.cpp StarWorkerPool.cpp
StarZSTDCompression.cpp
) )
IF (STAR_SYSTEM_FAMILY_UNIX) IF (STAR_SYSTEM_FAMILY_UNIX)

View File

@ -263,6 +263,10 @@ bool ExternalBuffer::empty() const {
return m_size == 0; return m_size == 0;
} }
ExternalBuffer::operator bool() const {
return m_size == 0;
}
void ExternalBuffer::reset(char const* externalData, size_t len) { void ExternalBuffer::reset(char const* externalData, size_t len) {
m_pos = 0; m_pos = 0;
m_bytes = externalData; m_bytes = externalData;

View File

@ -105,6 +105,8 @@ public:
// Clears buffer, moves position to 0. // Clears buffer, moves position to 0.
bool empty() const; bool empty() const;
operator bool() const;
// Reset buffer with new contents, moves position to 0. // Reset buffer with new contents, moves position to 0.
void reset(char const* externalData, size_t len); void reset(char const* externalData, size_t len);

View File

@ -15,9 +15,9 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
return; return;
const size_t BUFSIZE = 32 * 1024; const size_t BUFSIZE = 32 * 1024;
unsigned char temp_buffer[BUFSIZE]; auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE);
z_stream strm; z_stream strm{};
strm.zalloc = Z_NULL; strm.zalloc = Z_NULL;
strm.zfree = Z_NULL; strm.zfree = Z_NULL;
strm.opaque = Z_NULL; strm.opaque = Z_NULL;
@ -27,13 +27,13 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
strm.next_in = (unsigned char*)in.ptr(); strm.next_in = (unsigned char*)in.ptr();
strm.avail_in = in.size(); strm.avail_in = in.size();
strm.next_out = temp_buffer; strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE; strm.avail_out = BUFSIZE;
while (deflate_res == Z_OK) { while (deflate_res == Z_OK) {
deflate_res = deflate(&strm, Z_FINISH); deflate_res = deflate(&strm, Z_FINISH);
if (strm.avail_out == 0) { if (strm.avail_out == 0) {
out.append((char const*)temp_buffer, BUFSIZE); out.append((char const*)tempBuffer.get(), BUFSIZE);
strm.next_out = temp_buffer; strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE; strm.avail_out = BUFSIZE;
} }
} }
@ -42,7 +42,7 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
if (deflate_res != Z_STREAM_END) if (deflate_res != Z_STREAM_END)
throw IOException(strf("Internal error in uncompressData, deflate_res is {}", deflate_res)); throw IOException(strf("Internal error in uncompressData, deflate_res is {}", deflate_res));
out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out);
} }
ByteArray compressData(ByteArray const& in, CompressionLevel compression) { ByteArray compressData(ByteArray const& in, CompressionLevel compression) {
@ -51,16 +51,16 @@ ByteArray compressData(ByteArray const& in, CompressionLevel compression) {
return out; return out;
} }
void uncompressData(ByteArray const& in, ByteArray& out) { void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit) {
out.clear(); out.clear();
if (in.empty()) if (!inLen)
return; return;
const size_t BUFSIZE = 32 * 1024; const size_t BUFSIZE = 32 * 1024;
unsigned char temp_buffer[BUFSIZE]; auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE);
z_stream strm; z_stream strm{};
strm.zalloc = Z_NULL; strm.zalloc = Z_NULL;
strm.zfree = Z_NULL; strm.zfree = Z_NULL;
strm.opaque = Z_NULL; strm.opaque = Z_NULL;
@ -68,17 +68,22 @@ void uncompressData(ByteArray const& in, ByteArray& out) {
if (inflate_res != Z_OK) if (inflate_res != Z_OK)
throw IOException(strf("Failed to initialise inflate ({})", inflate_res)); throw IOException(strf("Failed to initialise inflate ({})", inflate_res));
strm.next_in = (unsigned char*)in.ptr(); strm.next_in = (unsigned char*)in;
strm.avail_in = in.size(); strm.avail_in = inLen;
strm.next_out = temp_buffer; strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE; strm.avail_out = BUFSIZE;
while (inflate_res == Z_OK || inflate_res == Z_BUF_ERROR) { while (inflate_res == Z_OK || inflate_res == Z_BUF_ERROR) {
inflate_res = inflate(&strm, Z_FINISH); inflate_res = inflate(&strm, Z_FINISH);
if (strm.avail_out == 0) { if (strm.avail_out == 0) {
out.append((char const*)temp_buffer, BUFSIZE); out.append((char const*)tempBuffer.get(), BUFSIZE);
strm.next_out = temp_buffer; strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE; strm.avail_out = BUFSIZE;
if (limit && out.size() >= limit) {
inflateEnd(&strm);
throw IOException(strf("hit uncompressData limit of {} bytes", limit));
break;
}
} else if (inflate_res == Z_BUF_ERROR) { } else if (inflate_res == Z_BUF_ERROR) {
break; break;
} }
@ -88,15 +93,23 @@ void uncompressData(ByteArray const& in, ByteArray& out) {
if (inflate_res != Z_STREAM_END) if (inflate_res != Z_STREAM_END)
throw IOException(strf("Internal error in uncompressData, inflate_res is {}", inflate_res)); throw IOException(strf("Internal error in uncompressData, inflate_res is {}", inflate_res));
out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out);
} }
ByteArray uncompressData(ByteArray const& in) { ByteArray uncompressData(const char* in, size_t inLen, size_t limit) {
ByteArray out = ByteArray::withReserve(in.size()); ByteArray out = ByteArray::withReserve(inLen);
uncompressData(in, out); uncompressData(in, inLen, out, limit);
return out; return out;
} }
void uncompressData(ByteArray const& in, ByteArray& out, size_t limit) {
uncompressData(in.ptr(), in.size(), out, limit);
}
ByteArray uncompressData(ByteArray const& in, size_t limit) {
return uncompressData(in.ptr(), in.size(), limit);
}
CompressedFilePtr CompressedFile::open(String const& filename, IOMode mode, CompressionLevel comp) { CompressedFilePtr CompressedFile::open(String const& filename, IOMode mode, CompressionLevel comp) {
CompressedFilePtr f = make_shared<CompressedFile>(filename); CompressedFilePtr f = make_shared<CompressedFile>(filename);
f->open(mode, comp); f->open(mode, comp);

View File

@ -17,8 +17,10 @@ CompressionLevel const HighCompression = 9;
void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compression = MediumCompression); void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compression = MediumCompression);
ByteArray compressData(ByteArray const& in, CompressionLevel compression = MediumCompression); ByteArray compressData(ByteArray const& in, CompressionLevel compression = MediumCompression);
void uncompressData(ByteArray const& in, ByteArray& out); void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit = 0);
ByteArray uncompressData(ByteArray const& in); ByteArray uncompressData(const char* in, size_t inLen, size_t limit = 0);
void uncompressData(ByteArray const& in, ByteArray& out, size_t limit = 0);
ByteArray uncompressData(ByteArray const& in, size_t limit = 0);
// Random access to a (potentially) compressed file. // Random access to a (potentially) compressed file.
class CompressedFile : public IODevice { class CompressedFile : public IODevice {

View File

@ -130,7 +130,9 @@ void DataStreamBuffer::writeData(char const* data, size_t len) {
m_buffer->writeFull(data, len); m_buffer->writeFull(data, len);
} }
DataStreamExternalBuffer::DataStreamExternalBuffer() {} DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {}
DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {}
DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() { DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() {
reset(externalData, len); reset(externalData, len);
@ -160,6 +162,10 @@ size_t DataStreamExternalBuffer::pos() {
return m_buffer.pos(); return m_buffer.pos();
} }
size_t DataStreamExternalBuffer::remaining() {
return m_buffer.dataSize() - m_buffer.pos();
}
void DataStreamExternalBuffer::reset(char const* externalData, size_t len) { void DataStreamExternalBuffer::reset(char const* externalData, size_t len) {
m_buffer.reset(externalData, len); m_buffer.reset(externalData, len);
} }

View File

@ -126,6 +126,8 @@ private:
class DataStreamExternalBuffer : public DataStream { class DataStreamExternalBuffer : public DataStream {
public: public:
DataStreamExternalBuffer(); DataStreamExternalBuffer();
DataStreamExternalBuffer(DataStreamBuffer const& buffer);
DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default;
DataStreamExternalBuffer(char const* externalData, size_t len); DataStreamExternalBuffer(char const* externalData, size_t len);
char const* ptr() const; char const* ptr() const;
@ -136,6 +138,7 @@ public:
void seek(size_t pos, IOSeek mode = IOSeek::Absolute); void seek(size_t pos, IOSeek mode = IOSeek::Absolute);
bool atEnd(); bool atEnd();
size_t pos(); size_t pos();
size_t remaining();
void reset(char const* externalData, size_t len); void reset(char const* externalData, size_t len);

View File

@ -0,0 +1,84 @@
#include "StarZSTDCompression.hpp"
#include <zstd.h>
namespace Star {
CompressionStream::CompressionStream() : m_cStream(ZSTD_createCStream()) {
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1);
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24);
ZSTD_initCStream(m_cStream, 2);
}
CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); }
ByteArray CompressionStream::compress(const char* in, size_t inLen) {
size_t const cInSize = ZSTD_CStreamInSize ();
size_t const cOutSize = ZSTD_CStreamOutSize();
ByteArray output(cOutSize, 0);
size_t written = 0, read = 0;
while (read < inLen) {
ZSTD_inBuffer inBuffer = {in + read, min(cInSize, inLen - read), 0};
ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
bool finished = false;
do {
size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush);
if (ZSTD_isError(ret)) {
throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret)));
break;
}
if (outBuffer.pos == outBuffer.size) {
output.resize(output.size() * 2);
outBuffer.dst = output.ptr();
outBuffer.size = output.size();
continue;
}
finished = ret == 0 && inBuffer.pos == inBuffer.size;
} while (!finished);
read += inBuffer.pos;
written += outBuffer.pos;
}
output.resize(written);
return output;
}
DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) {
ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25);
ZSTD_initDStream(m_dStream);
}
DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); }
ByteArray DecompressionStream::decompress(const char* in, size_t inLen) {
size_t const dInSize = ZSTD_DStreamInSize ();
size_t const dOutSize = ZSTD_DStreamOutSize();
ByteArray output(dOutSize, 0);
size_t written = 0, read = 0;
while (read < inLen) {
ZSTD_inBuffer inBuffer = {in + read, min(dInSize, inLen - read), 0};
ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
do {
size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer);
if (ZSTD_isError(ret)) {
throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret)));
break;
}
if (outBuffer.pos == outBuffer.size) {
output.resize(output.size() * 2);
outBuffer.dst = output.ptr();
outBuffer.size = output.size();
continue;
}
} while (inBuffer.pos < inBuffer.size);
read += inBuffer.pos;
written += outBuffer.pos;
}
output.resize(written);
return output;
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include "StarByteArray.hpp"
#include "StarDataStreamDevices.hpp"
typedef struct ZSTD_CCtx_s ZSTD_CCtx;
typedef struct ZSTD_DCtx_s ZSTD_DCtx;
typedef ZSTD_DCtx ZSTD_DStream;
typedef ZSTD_CCtx ZSTD_CStream;
namespace Star {
class CompressionStream {
public:
CompressionStream();
~CompressionStream();
ByteArray compress(const char* in, size_t inLen);
ByteArray compress(ByteArray const& in);
private:
ZSTD_CStream* m_cStream;
};
inline ByteArray CompressionStream::compress(ByteArray const& in) {
return compress(in.ptr(), in.size());
}
class DecompressionStream {
public:
DecompressionStream();
~DecompressionStream();
ByteArray decompress(const char* in, size_t inLen);
ByteArray decompress(ByteArray const& in);
private:
ZSTD_DStream* m_dStream;
};
inline ByteArray DecompressionStream::decompress(ByteArray const& in) {
return decompress(in.ptr(), in.size());
}
}

View File

@ -6,17 +6,27 @@
namespace Star { namespace Star {
PacketStatCollector::PacketStatCollector(float calculationWindow) PacketStatCollector::PacketStatCollector(float calculationWindow)
: m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {} : m_calculationWindow(calculationWindow), m_stats(), m_totalBytes(0), m_lastMixTime(0) {}
void PacketStatCollector::mix(PacketType type, size_t size) { void PacketStatCollector::mix(size_t size) {
calculate(); calculate();
m_unmixed[type] += size; m_totalBytes += size;
} }
void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) { void PacketStatCollector::mix(PacketType type, size_t size, bool addToTotal) {
calculate(); calculate();
for (auto const& p : sizes) m_unmixed[type] += size;
if (addToTotal)
m_totalBytes += size;
}
void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal) {
calculate();
for (auto const& p : sizes) {
if (addToTotal)
m_totalBytes += p.second;
m_unmixed[p.first] += p.second; m_unmixed[p.first] += p.second;
}
} }
PacketStats PacketStatCollector::stats() const { PacketStats PacketStatCollector::stats() const {
@ -31,18 +41,19 @@ void PacketStatCollector::calculate() {
m_lastMixTime = currentTime; m_lastMixTime = currentTime;
m_stats.worstPacketSize = 0; m_stats.worstPacketSize = 0;
float total = 0.0f; if (abs(elapsedTime) - m_calculationWindow < 0.0125f)
elapsedTime = m_calculationWindow;
for (auto& pair : m_unmixed) { for (auto& pair : m_unmixed) {
if (pair.second > m_stats.worstPacketSize) { if (pair.second > m_stats.worstPacketSize) {
m_stats.worstPacketType = pair.first; m_stats.worstPacketType = pair.first;
m_stats.worstPacketSize = pair.second; m_stats.worstPacketSize = pair.second;
} }
auto& bytes = m_stats.packetBytesPerSecond[pair.first]; m_stats.packetBytesPerSecond[pair.first] = round(pair.second / elapsedTime);
bytes = pair.second / elapsedTime;
total += bytes;
} }
m_stats.bytesPerSecond = total; m_stats.bytesPerSecond = round(float(m_totalBytes) / elapsedTime);
m_totalBytes = 0;
m_unmixed.clear(); m_unmixed.clear();
} }
} }
@ -138,67 +149,81 @@ void TcpPacketSocket::close() {
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) { void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets); auto it = makeSMutableIterator(packets);
if (m_useCompressionStream) {
while (it.hasNext()) {
PacketType currentType = it.peekNext()->type();
PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
DataStreamBuffer packetBuffer;
while (it.hasNext()
&& it.peekNext()->type() == currentType
&& it.peekNext()->compressionMode() == currentCompressionMode) {
if (legacy())
it.next()->writeLegacy(packetBuffer);
else
it.next()->write(packetBuffer);
}
// Packets must read and write actual data, because this is used to
// determine packet count
starAssert(!packetBuffer.empty());
ByteArray compressedPackets;
bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
if (mustCompress || perhapsCompress)
compressedPackets = compressData(packetBuffer.data());
DataStreamBuffer outBuffer; DataStreamBuffer outBuffer;
outBuffer.write(currentType); while (it.hasNext()) {
PacketPtr& packet = it.next();
if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { auto packetType = packet->type();
outBuffer.writeVlqI(-(int)(compressedPackets.size())); DataStreamBuffer packetBuffer;
outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); packet->write(packetBuffer);
m_outgoingStats.mix(currentType, compressedPackets.size()); outBuffer.write(packetType);
} else { outBuffer.writeVlqI((int)packetBuffer.size());
outBuffer.writeVlqI((int)(packetBuffer.size()));
outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
m_outgoingStats.mix(currentType, packetBuffer.size()); m_outgoingStats.mix(packetType, packetBuffer.size(), false);
}
m_outputBuffer.append(outBuffer.ptr(), outBuffer.size());
} else {
while (it.hasNext()) {
PacketType currentType = it.peekNext()->type();
PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
DataStreamBuffer packetBuffer;
while (it.hasNext()
&& it.peekNext()->type() == currentType
&& it.peekNext()->compressionMode() == currentCompressionMode) {
if (legacy())
it.next()->writeLegacy(packetBuffer);
else
it.next()->write(packetBuffer);
}
// Packets must read and write actual data, because this is used to
// determine packet count
starAssert(!packetBuffer.empty());
ByteArray compressedPackets;
bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
if (mustCompress || perhapsCompress)
compressedPackets = compressData(packetBuffer.data());
DataStreamBuffer outBuffer;
outBuffer.write(currentType);
if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
outBuffer.writeVlqI(-(int)(compressedPackets.size()));
outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
m_outgoingStats.mix(currentType, compressedPackets.size());
} else {
outBuffer.writeVlqI((int)(packetBuffer.size()));
outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
m_outgoingStats.mix(currentType, packetBuffer.size());
}
m_outputBuffer.append(outBuffer.takeData());
} }
m_outputBuffer.append(outBuffer.takeData());
} }
} }
List<PacketPtr> TcpPacketSocket::receivePackets() { List<PacketPtr> TcpPacketSocket::receivePackets() {
uint64_t const PacketSizeLimit = 64<<20; // How large can uncompressed packets be
// this limit is now also used during decompression
uint64_t const PacketSizeLimit = 64 << 20;
// How many packets can be batched together into one compressed chunk at once
uint64_t const PacketBatchLimit = 131072;
List<PacketPtr> packets; List<PacketPtr> packets;
try { try {
while (!m_inputBuffer.empty()) { DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size());
bool atLeastOne = false;
while (!ds.atEnd()) {
PacketType packetType; PacketType packetType;
uint64_t packetSize = 0; uint64_t packetSize = 0;
bool packetCompressed = false; bool packetCompressed = false;
DataStreamBuffer ds(m_inputBuffer);
try { try {
packetType = ds.read<PacketType>(); packetType = ds.read<PacketType>();
int64_t len = ds.readVlqI(); int64_t len = ds.readVlqI();
if (len < 0) { packetCompressed = len < 0;
packetSize = -len; packetSize = packetCompressed ? -len : len;
packetCompressed = true;
} else {
packetSize = len;
packetCompressed = false;
}
} catch (EofException const&) { } catch (EofException const&) {
// Guard against not having the entire packet header available when // Guard against not having the entire packet header available when
// trying to read. // trying to read.
@ -206,19 +231,28 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
} }
if (packetSize > PacketSizeLimit) if (packetSize > PacketSizeLimit)
throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize); throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType));
if (packetSize > ds.size() - ds.pos()) if (packetSize > ds.size() - ds.pos())
break; break;
ByteArray packetBytes = ds.readBytes(packetSize); atLeastOne = true;
if (packetCompressed) m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
packetBytes = uncompressData(packetBytes);
m_incomingStats.mix(packetType, packetSize); DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
ByteArray uncompressed;
if (packetCompressed) {
uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit);
packetStream.reset(uncompressed.ptr(), uncompressed.size());
}
ds.seek(packetSize, IOSeek::Relative);
DataStreamBuffer packetStream(std::move(packetBytes)); size_t count = 0;
do { do {
if (++count > PacketBatchLimit) {
throw IOException::format("Packet batch limit {} reached while reading {}s!", PacketBatchLimit, PacketTypeNames.getRight(packetType));
break;
}
PacketPtr packet = createPacket(packetType); PacketPtr packet = createPacket(packetType);
packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
if (legacy()) if (legacy())
@ -227,9 +261,9 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
packet->read(packetStream); packet->read(packetStream);
packets.append(std::move(packet)); packets.append(std::move(packet));
} while (!packetStream.atEnd()); } while (!packetStream.atEnd());
m_inputBuffer = ds.readBytes(ds.size() - ds.pos());
} }
if (atLeastOne)
m_inputBuffer.trimLeft(ds.pos());
} catch (IOException const& e) { } catch (IOException const& e) {
Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false)); Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false));
m_inputBuffer.clear(); m_inputBuffer.clear();
@ -248,15 +282,25 @@ bool TcpPacketSocket::writeData() {
bool dataSent = false; bool dataSent = false;
try { try {
if (m_outputBuffer.empty()) if (m_useCompressionStream) {
return false; auto compressed = m_compressionStream.compress(m_outputBuffer);
m_outputBuffer.clear();
while (!m_outputBuffer.empty()) { m_compressedBuffer.append(compressed.ptr(), compressed.size());
size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size());
if (writtenAmount == 0) if (written > 0) {
break; dataSent = true;
dataSent = true; m_compressedBuffer.trimLeft(written);
m_outputBuffer.trimLeft(writtenAmount); m_outgoingStats.mix(written);
}
} else {
while (!m_outputBuffer.empty()) {
size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
if (written == 0)
break;
dataSent = true;
m_outputBuffer.trimLeft(written);
}
} }
} catch (SocketClosedException const& e) { } catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@ -276,7 +320,13 @@ bool TcpPacketSocket::readData() {
if (readAmount == 0) if (readAmount == 0)
break; break;
dataReceived = true; dataReceived = true;
m_inputBuffer.append(readBuffer, readAmount); m_incomingStats.mix(readAmount);
if (m_useCompressionStream) {
auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
m_inputBuffer.append(decompressed.ptr(), decompressed.size());
} else {
m_inputBuffer.append(readBuffer, readAmount);
}
} }
} catch (SocketClosedException const& e) { } catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@ -295,8 +345,12 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
return m_outgoingStats.stats(); return m_outgoingStats.stats();
} }
TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) void TcpPacketSocket::setLegacy(bool legacy) {
: m_socket(std::move(socket)) {} m_useCompressionStream = !legacy;
PacketSocket::setLegacy(legacy);
}
TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(std::move(socket)) {}
P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) { P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) {
return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket))); return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket)));

View File

@ -4,6 +4,7 @@
#include "StarAtomicSharedPtr.hpp" #include "StarAtomicSharedPtr.hpp"
#include "StarP2PNetworkingService.hpp" #include "StarP2PNetworkingService.hpp"
#include "StarNetPackets.hpp" #include "StarNetPackets.hpp"
#include "StarZSTDCompression.hpp"
namespace Star { namespace Star {
@ -24,10 +25,11 @@ class PacketStatCollector {
public: public:
PacketStatCollector(float calculationWindow = 1.0f); PacketStatCollector(float calculationWindow = 1.0f);
void mix(PacketType type, size_t size); void mix(size_t size);
void mix(HashMap<PacketType, size_t> const& sizes); void mix(PacketType type, size_t size, bool addToTotal = true);
void mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal = true);
// Should always return packet staticstics for the most recent completed // Should always return packet statistics for the most recent completed
// window of time // window of time
PacketStats stats() const; PacketStats stats() const;
@ -37,6 +39,7 @@ private:
float m_calculationWindow; float m_calculationWindow;
PacketStats m_stats; PacketStats m_stats;
Map<PacketType, float> m_unmixed; Map<PacketType, float> m_unmixed;
size_t m_totalBytes;
int64_t m_lastMixTime; int64_t m_lastMixTime;
}; };
@ -72,10 +75,10 @@ public:
virtual Maybe<PacketStats> incomingStats() const; virtual Maybe<PacketStats> incomingStats() const;
virtual Maybe<PacketStats> outgoingStats() const; virtual Maybe<PacketStats> outgoingStats() const;
void setLegacy(bool legacy); virtual void setLegacy(bool legacy);
bool legacy() const; virtual bool legacy() const;
private: private:
bool m_legacy = false; bool m_legacy = true;
}; };
// PacketSocket for local communication. // PacketSocket for local communication.
@ -127,6 +130,7 @@ public:
Maybe<PacketStats> incomingStats() const override; Maybe<PacketStats> incomingStats() const override;
Maybe<PacketStats> outgoingStats() const override; Maybe<PacketStats> outgoingStats() const override;
void setLegacy(bool legacy) override;
private: private:
TcpPacketSocket(TcpSocketPtr socket); TcpPacketSocket(TcpSocketPtr socket);
@ -136,6 +140,10 @@ private:
PacketStatCollector m_outgoingStats; PacketStatCollector m_outgoingStats;
ByteArray m_outputBuffer; ByteArray m_outputBuffer;
ByteArray m_inputBuffer; ByteArray m_inputBuffer;
bool m_useCompressionStream = false;
ByteArray m_compressedBuffer;
CompressionStream m_compressionStream;
DecompressionStream m_decompressionStream;
}; };
// Wraps a P2PSocket into a PacketSocket // Wraps a P2PSocket into a PacketSocket

View File

@ -341,7 +341,7 @@ ClientConnectPacket::ClientConnectPacket(ByteArray assetsDigest, bool allowAsset
playerName(std::move(playerName)), playerSpecies(std::move(playerSpecies)), shipChunks(std::move(shipChunks)), playerName(std::move(playerName)), playerSpecies(std::move(playerSpecies)), shipChunks(std::move(shipChunks)),
shipUpgrades(std::move(shipUpgrades)), introComplete(std::move(introComplete)), account(std::move(account)) {} shipUpgrades(std::move(shipUpgrades)), introComplete(std::move(introComplete)), account(std::move(account)) {}
void ClientConnectPacket::read(DataStream& ds) { void ClientConnectPacket::readLegacy(DataStream& ds) {
ds.read(assetsDigest); ds.read(assetsDigest);
ds.read(allowAssetsMismatch); ds.read(allowAssetsMismatch);
ds.read(playerUuid); ds.read(playerUuid);
@ -353,7 +353,12 @@ void ClientConnectPacket::read(DataStream& ds) {
ds.read(account); ds.read(account);
} }
void ClientConnectPacket::write(DataStream& ds) const { void ClientConnectPacket::read(DataStream& ds) {
readLegacy(ds);
ds.read(info);
}
void ClientConnectPacket::writeLegacy(DataStream& ds) const {
ds.write(assetsDigest); ds.write(assetsDigest);
ds.write(allowAssetsMismatch); ds.write(allowAssetsMismatch);
ds.write(playerUuid); ds.write(playerUuid);
@ -365,6 +370,11 @@ void ClientConnectPacket::write(DataStream& ds) const {
ds.write(account); ds.write(account);
} }
void ClientConnectPacket::write(DataStream& ds) const {
writeLegacy(ds);
ds.write(info);
}
ClientDisconnectRequestPacket::ClientDisconnectRequestPacket() {} ClientDisconnectRequestPacket::ClientDisconnectRequestPacket() {}
void ClientDisconnectRequestPacket::read(DataStream& ds) { void ClientDisconnectRequestPacket::read(DataStream& ds) {

View File

@ -127,11 +127,10 @@ struct Packet {
virtual PacketType type() const = 0; virtual PacketType type() const = 0;
virtual void read(DataStream& ds) = 0;
virtual void write(DataStream& ds) const = 0;
virtual void readLegacy(DataStream& ds); virtual void readLegacy(DataStream& ds);
virtual void read(DataStream& ds) = 0;
virtual void writeLegacy(DataStream& ds) const; virtual void writeLegacy(DataStream& ds) const;
virtual void write(DataStream& ds) const = 0;
PacketCompressionMode compressionMode() const; PacketCompressionMode compressionMode() const;
void setCompressionMode(PacketCompressionMode compressionMode); void setCompressionMode(PacketCompressionMode compressionMode);
@ -288,7 +287,9 @@ struct ClientConnectPacket : PacketBase<PacketType::ClientConnect> {
String playerSpecies, WorldChunks shipChunks, ShipUpgrades shipUpgrades, bool introComplete, String playerSpecies, WorldChunks shipChunks, ShipUpgrades shipUpgrades, bool introComplete,
String account); String account);
void readLegacy(DataStream& ds) override;
void read(DataStream& ds) override; void read(DataStream& ds) override;
void writeLegacy(DataStream& ds) const override;
void write(DataStream& ds) const override; void write(DataStream& ds) const override;
ByteArray assetsDigest; ByteArray assetsDigest;
@ -300,6 +301,7 @@ struct ClientConnectPacket : PacketBase<PacketType::ClientConnect> {
ShipUpgrades shipUpgrades; ShipUpgrades shipUpgrades;
bool introComplete; bool introComplete;
String account; String account;
Json info;
}; };
struct ClientDisconnectRequestPacket : PacketBase<PacketType::ClientDisconnectRequest> { struct ClientDisconnectRequestPacket : PacketBase<PacketType::ClientDisconnectRequest> {

View File

@ -1522,7 +1522,6 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
} }
bool legacyClient = protocolRequest->compressionMode() != PacketCompressionMode::Enabled; bool legacyClient = protocolRequest->compressionMode() != PacketCompressionMode::Enabled;
connection.setLegacy(legacyClient);
auto protocolResponse = make_shared<ProtocolResponsePacket>(); auto protocolResponse = make_shared<ProtocolResponsePacket>();
protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound
@ -1540,6 +1539,7 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
protocolResponse->allowed = true; protocolResponse->allowed = true;
connection.pushSingle(protocolResponse); connection.pushSingle(protocolResponse);
connection.sendAll(clientWaitLimit); connection.sendAll(clientWaitLimit);
connection.setLegacy(legacyClient);
String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local"; String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local";
Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound"); Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound");

View File

@ -7,6 +7,7 @@
"zlib", "zlib",
"freetype", "freetype",
"libpng", "libpng",
"opus" "opus",
"zstd"
] ]
} }