diff --git a/source/core/StarNetImpl.hpp b/source/core/StarNetImpl.hpp index a84da6d..b7213e7 100644 --- a/source/core/StarNetImpl.hpp +++ b/source/core/StarNetImpl.hpp @@ -4,6 +4,8 @@ #include #include #include + +#include "StarString_windows.hpp" #else #ifdef STAR_SYSTEM_FREEBSD #include @@ -42,17 +44,19 @@ static WindowsSocketInitializer g_windowsSocketInitializer; inline String netErrorString() { #ifdef STAR_SYSTEM_WINDOWS - LPVOID lpMsgBuf = NULL; + LPWSTR lpMsgBuf = NULL; + int error = WSAGetLastError(); - FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM + | FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, - WSAGetLastError(), + error, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language (LPTSTR)&lpMsgBuf, 0, NULL); - String result = String((char*)lpMsgBuf); + String result = strf("{} - {}", error, utf16ToString(lpMsgBuf)); if (lpMsgBuf != NULL) LocalFree(lpMsgBuf); diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp index cd5c749..436726a 100644 --- a/source/game/StarNetPacketSocket.cpp +++ b/source/game/StarNetPacketSocket.cpp @@ -66,6 +66,9 @@ Maybe PacketSocket::outgoingStats() const { void PacketSocket::setLegacy(bool legacy) { m_legacy = legacy; } bool PacketSocket::legacy() const { return m_legacy; } +void CompressedPacketSocket::setCompressionStreamEnabled(bool enabled) { m_useCompressionStream = enabled; } +bool CompressedPacketSocket::compressionStreamEnabled() const { return m_useCompressionStream; } + pair LocalPacketSocket::openPair() { auto lhsIncomingPipe = make_shared(); auto rhsIncomingPipe = make_shared(); @@ -146,7 +149,7 @@ void TcpPacketSocket::close() { void TcpPacketSocket::sendPackets(List packets) { auto it = makeSMutableIterator(packets); - if (m_useCompressionStream) { + if (compressionStreamEnabled()) { DataStreamBuffer outBuffer; while (it.hasNext()) { PacketPtr& packet = it.next(); @@ -233,7 +236,7 @@ List TcpPacketSocket::receivePackets() { if (packetSize > ds.remaining()) break; - m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream); + m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled()); DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize); ByteArray uncompressed; @@ -280,19 +283,17 @@ bool TcpPacketSocket::writeData() { bool dataSent = false; try { if (!m_outputBuffer.empty()) { - if (m_useCompressionStream) { - auto compressed = m_compressionStream.compress(m_outputBuffer); + if (compressionStreamEnabled()) { + auto compressedBuffer = m_compressionStream.compress(m_outputBuffer); m_outputBuffer.clear(); - - m_compressedBuffer.append(compressed.ptr(), compressed.size()); do { - size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size()); + size_t written = m_socket->send(compressedBuffer.ptr(), compressedBuffer.size()); if (written > 0) { dataSent = true; - m_compressedBuffer.trimLeft(written); + compressedBuffer.trimLeft(written); m_outgoingStats.mix(written); } - } while (!m_compressedBuffer.empty()); + } while (!compressedBuffer.empty()); } else { do { size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); @@ -321,10 +322,10 @@ bool TcpPacketSocket::readData() { if (readAmount == 0) break; dataReceived = true; - if (m_useCompressionStream) { + if (compressionStreamEnabled()) { m_incomingStats.mix(readAmount); auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount); - m_inputBuffer.append(decompressed.ptr(), decompressed.size()); + m_inputBuffer.append(decompressed); } else { m_inputBuffer.append(readBuffer, readAmount); } @@ -347,7 +348,6 @@ Maybe TcpPacketSocket::outgoingStats() const { } void TcpPacketSocket::setLegacy(bool legacy) { - m_useCompressionStream = !legacy; PacketSocket::setLegacy(legacy); } @@ -368,43 +368,58 @@ void P2PPacketSocket::close() { void P2PPacketSocket::sendPackets(List packets) { auto it = makeSMutableIterator(packets); - while (it.hasNext()) { - PacketType currentType = it.peekNext()->type(); - PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); - - DataStreamBuffer packetBuffer; - while (it.hasNext() - && it.peekNext()->type() == currentType - && it.peekNext()->compressionMode() == currentCompressionMode) { - if (legacy()) - it.next()->writeLegacy(packetBuffer); - else - it.next()->write(packetBuffer); - } - - // Packets must read and write actual data, because this is used to - // determine packet count - starAssert(!packetBuffer.empty()); - - ByteArray compressedPackets; - bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; - bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; - if (mustCompress || perhapsCompress) - compressedPackets = compressData(packetBuffer.data()); - + if (compressionStreamEnabled()) { DataStreamBuffer outBuffer; - outBuffer.write(currentType); - - if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { - outBuffer.write(true); - outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); - m_outgoingStats.mix(currentType, compressedPackets.size()); - } else { - outBuffer.write(false); + while (it.hasNext()) { + PacketPtr& packet = it.next(); + auto packetType = packet->type(); + DataStreamBuffer packetBuffer; + packet->write(packetBuffer); + outBuffer.write(packetType); + outBuffer.writeVlqI((int)packetBuffer.size()); outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); - m_outgoingStats.mix(currentType, packetBuffer.size()); + m_outgoingStats.mix(packetType, packetBuffer.size(), false); + } + m_outputMessages.append(m_compressionStream.compress(outBuffer.data())); + } else { + while (it.hasNext()) { + PacketType currentType = it.peekNext()->type(); + PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); + + DataStreamBuffer packetBuffer; + while (it.hasNext() + && it.peekNext()->type() == currentType + && it.peekNext()->compressionMode() == currentCompressionMode) { + if (legacy()) + it.next()->writeLegacy(packetBuffer); + else + it.next()->write(packetBuffer); + } + + // Packets must read and write actual data, because this is used to + // determine packet count + starAssert(!packetBuffer.empty()); + + ByteArray compressedPackets; + bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; + bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; + if (mustCompress || perhapsCompress) + compressedPackets = compressData(packetBuffer.data()); + + DataStreamBuffer outBuffer; + outBuffer.write(currentType); + + if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { + outBuffer.write(true); + outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); + m_outgoingStats.mix(currentType, compressedPackets.size()); + } else { + outBuffer.write(false); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(currentType, packetBuffer.size()); + } + m_outputMessages.append(outBuffer.takeData()); } - m_outputMessages.append(outBuffer.takeData()); } } @@ -422,9 +437,9 @@ List P2PPacketSocket::receivePackets() { if (packetCompressed) packetBytes = uncompressData(packetBytes); - m_incomingStats.mix(packetType, packetSize); + m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled()); - DataStreamBuffer packetStream(std::move(packetBytes)); + DataStreamExternalBuffer packetStream(packetBytes); do { PacketPtr packet = createPacket(packetType); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled); @@ -468,7 +483,10 @@ bool P2PPacketSocket::readData() { if (m_socket) { while (auto message = m_socket->receiveMessage()) { - m_inputMessages.append(message.take()); + m_incomingStats.mix(message->size()); + m_inputMessages.append(compressionStreamEnabled() + ? m_decompressionStream.decompress(*message) + : *message); workDone = true; } } diff --git a/source/game/StarNetPacketSocket.hpp b/source/game/StarNetPacketSocket.hpp index dfffec6..c2c06fa 100644 --- a/source/game/StarNetPacketSocket.hpp +++ b/source/game/StarNetPacketSocket.hpp @@ -78,7 +78,20 @@ public: virtual void setLegacy(bool legacy); virtual bool legacy() const; private: - bool m_legacy = true; + bool m_legacy = false; +}; + +class CompressedPacketSocket : public PacketSocket { +public: + virtual ~CompressedPacketSocket() = default; + + virtual void setCompressionStreamEnabled(bool enabled); + virtual bool compressionStreamEnabled() const; +private: + bool m_useCompressionStream = false; +protected: + CompressionStream m_compressionStream; + DecompressionStream m_decompressionStream; }; // PacketSocket for local communication. @@ -112,7 +125,7 @@ private: }; // Wraps a TCP socket into a PacketSocket. -class TcpPacketSocket : public PacketSocket { +class TcpPacketSocket : public CompressedPacketSocket { public: static TcpPacketSocketUPtr open(TcpSocketPtr socket); @@ -140,14 +153,10 @@ private: PacketStatCollector m_outgoingStats; ByteArray m_outputBuffer; ByteArray m_inputBuffer; - bool m_useCompressionStream = false; - ByteArray m_compressedBuffer; - CompressionStream m_compressionStream; - DecompressionStream m_decompressionStream; }; // Wraps a P2PSocket into a PacketSocket -class P2PPacketSocket : public PacketSocket { +class P2PPacketSocket : public CompressedPacketSocket { public: static P2PPacketSocketUPtr open(P2PSocketUPtr socket); diff --git a/source/game/StarNetPackets.cpp b/source/game/StarNetPackets.cpp index 3cbb3ad..d0aacac 100644 --- a/source/game/StarNetPackets.cpp +++ b/source/game/StarNetPackets.cpp @@ -78,6 +78,11 @@ EnumMap const PacketTypeNames{ {PacketType::SystemObjectSpawn, "SystemObjectSpawn"} }; +EnumMap const NetCompressionModeNames { + {NetCompressionMode::None, "None"}, + {NetCompressionMode::Zstd, "Zstd"} +}; + Packet::~Packet() {} void Packet::readLegacy(DataStream& ds) { read(ds); } @@ -187,15 +192,27 @@ void ProtocolRequestPacket::write(DataStream& ds) const { ds.write(requestProtocolVersion); } -ProtocolResponsePacket::ProtocolResponsePacket(bool allowed) - : allowed(allowed) {} +ProtocolResponsePacket::ProtocolResponsePacket(bool allowed, Json info) + : allowed(allowed), info(info) {} void ProtocolResponsePacket::read(DataStream& ds) { ds.read(allowed); + if (compressionMode() == PacketCompressionMode::Enabled) { + // gross hack for backwards compatibility with older OpenSB servers + // can be removed later + auto externalBuffer = as(&ds); + if (!externalBuffer || !externalBuffer->atEnd()) + ds.read(info); + } +} + +void ProtocolResponsePacket::writeLegacy(DataStream& ds) const { + ds.write(allowed); } void ProtocolResponsePacket::write(DataStream& ds) const { - ds.write(allowed); + writeLegacy(ds); + ds.write(info); } ConnectSuccessPacket::ConnectSuccessPacket() {} diff --git a/source/game/StarNetPackets.hpp b/source/game/StarNetPackets.hpp index 475f57a..bfaea6e 100644 --- a/source/game/StarNetPackets.hpp +++ b/source/game/StarNetPackets.hpp @@ -116,16 +116,23 @@ enum class PacketType : uint8_t { }; extern EnumMap const PacketTypeNames; +enum class NetCompressionMode : uint8_t { + None, + Zstd +}; +extern EnumMap const NetCompressionModeNames; + enum class PacketCompressionMode : uint8_t { Disabled, - Enabled, - Automatic + Automatic, + Enabled }; struct Packet { virtual ~Packet(); virtual PacketType type() const = 0; + virtual String const& typeName() const = 0; virtual void readLegacy(DataStream& ds); virtual void read(DataStream& ds) = 0; @@ -149,6 +156,7 @@ struct PacketBase : public Packet { static PacketType const Type = PacketT; PacketType type() const override { return Type; } + String const& typeName() const override { return PacketTypeNames.getRight(Type); } }; struct ProtocolRequestPacket : PacketBase { @@ -162,12 +170,14 @@ struct ProtocolRequestPacket : PacketBase { }; struct ProtocolResponsePacket : PacketBase { - ProtocolResponsePacket(bool allowed = false); + ProtocolResponsePacket(bool allowed = false, Json info = {}); void read(DataStream& ds) override; + void writeLegacy(DataStream& ds) const override; void write(DataStream& ds) const override; bool allowed; + Json info; }; struct ServerDisconnectPacket : PacketBase { diff --git a/source/game/StarRootLoader.cpp b/source/game/StarRootLoader.cpp index bc0bce3..1643791 100644 --- a/source/game/StarRootLoader.cpp +++ b/source/game/StarRootLoader.cpp @@ -76,6 +76,9 @@ R"JSON( "allowAdminCommands" : true, "allowAdminCommandsFromAnyone" : false, "anonymousConnectionsAreAdmin" : false, + "connectionSettings" : { + "compression" : "Zstd" + }, "clientP2PJoinable" : true, "clientIPJoinable" : false, diff --git a/source/game/StarUniverseClient.cpp b/source/game/StarUniverseClient.cpp index 136fd7b..a5bd9df 100644 --- a/source/game/StarUniverseClient.cpp +++ b/source/game/StarUniverseClient.cpp @@ -81,8 +81,8 @@ Maybe UniverseClient::connect(UniverseConnection connection, bool allowA { auto protocolRequest = make_shared(StarProtocolVersion); protocolRequest->setCompressionMode(PacketCompressionMode::Enabled); - // Signal that we're OpenStarbound. Vanilla Starbound only compresses packets above 64 bytes - by forcing it we can communicate this. - // If you know a less cursed way, please let me know. + // Signal that we're OpenStarbound. Vanilla Starbound only compresses + // packets above 64 bytes - by forcing it, we can communicate this. connection.pushSingle(protocolRequest); } connection.sendAll(timeout); @@ -94,8 +94,24 @@ Maybe UniverseClient::connect(UniverseConnection connection, bool allowA else if (!protocolResponsePacket->allowed) return String(strf("Join failed! Server does not support connections with protocol version {}", StarProtocolVersion)); - m_legacyServer = protocolResponsePacket->compressionMode() != PacketCompressionMode::Enabled; // True if server is vanilla - connection.setLegacy(m_legacyServer); + if (!(m_legacyServer = protocolResponsePacket->compressionMode() != PacketCompressionMode::Enabled)) { + if (auto compressedSocket = as(&connection.packetSocket())) { + if (protocolResponsePacket->info) { + auto compressionName = protocolResponsePacket->info.getString("compression", "None"); + auto compressionMode = NetCompressionModeNames.maybeLeft(compressionName); + if (!compressionMode) + return String(strf("Join failed! Unknown net stream connection type '{}'", compressionName)); + + Logger::info("UniverseClient: Using '{}' network stream compression", NetCompressionModeNames.getRight(*compressionMode)); + compressedSocket->setCompressionStreamEnabled(compressionMode == NetCompressionMode::Zstd); + } else if (!m_legacyServer) { + Logger::info("UniverseClient: Defaulting to Zstd network stream compression (older server version)"); + compressedSocket->setCompressionStreamEnabled(true);// old OpenSB server version always expects it! + } + } + } + connection.packetSocket().setLegacy(m_legacyServer); + connection.pushSingle(make_shared(Root::singleton().assets()->digest(), allowAssetsMismatch, m_mainPlayer->uuid(), m_mainPlayer->name(), m_mainPlayer->species(), m_playerStorage->loadShipData(m_mainPlayer->uuid()), m_mainPlayer->shipUpgrades(), m_mainPlayer->log()->introComplete(), account)); diff --git a/source/game/StarUniverseConnection.cpp b/source/game/StarUniverseConnection.cpp index 18b6da9..63f7f75 100644 --- a/source/game/StarUniverseConnection.cpp +++ b/source/game/StarUniverseConnection.cpp @@ -107,8 +107,8 @@ bool UniverseConnection::receiveAny(unsigned timeout) { } } -void UniverseConnection::setLegacy(bool legacy) { - m_packetSocket->setLegacy(legacy); +PacketSocket& UniverseConnection::packetSocket() { + return *m_packetSocket; } Maybe UniverseConnection::incomingStats() const { diff --git a/source/game/StarUniverseConnection.hpp b/source/game/StarUniverseConnection.hpp index 666e2da..ed08104 100644 --- a/source/game/StarUniverseConnection.hpp +++ b/source/game/StarUniverseConnection.hpp @@ -48,7 +48,8 @@ public: // false if the timeout was reached with no packets receivable. bool receiveAny(unsigned timeout); - void setLegacy(bool legacy); + // Returns a reference to the packet socket. + PacketSocket& packetSocket(); // Packet stats for the most recent one second window of activity incoming // and outgoing. Will only return valid stats if the underlying PacketSocket diff --git a/source/game/StarUniverseServer.cpp b/source/game/StarUniverseServer.cpp index 43e67c0..56ae634 100644 --- a/source/game/StarUniverseServer.cpp +++ b/source/game/StarUniverseServer.cpp @@ -1540,6 +1540,7 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybejson("/universe_server.config:clientWaitLimit").toInt(); String serverAssetsMismatchMessage = assets->json("/universe_server.config:serverAssetsMismatchMessage").toString(); String clientAssetsMismatchMessage = assets->json("/universe_server.config:clientAssetsMismatchMessage").toString(); + auto connectionSettings = configuration->get("connectionSettings"); RecursiveMutexLocker mainLocker(m_mainLock, false); @@ -1549,8 +1550,9 @@ void UniverseServer::acceptConnection(UniverseConnection connection, MaybecompressionMode() != PacketCompressionMode::Enabled; + connection.packetSocket().setLegacy(legacyClient); auto protocolResponse = make_shared(); protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound @@ -1565,10 +1567,21 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybeallowed = true; + if (!legacyClient) { + auto compressionName = connectionSettings.getString("compression", "None"); + auto compressionMode = NetCompressionModeNames.maybeLeft(compressionName).value(NetCompressionMode::None); + useCompressionStream = compressionMode == NetCompressionMode::Zstd; + protocolResponse->info = JsonObject{ + {"compression", NetCompressionModeNames.getRight(compressionMode)} + }; + } connection.pushSingle(protocolResponse); connection.sendAll(clientWaitLimit); - connection.setLegacy(legacyClient); + + if (auto compressedSocket = as(&connection.packetSocket())) + compressedSocket->setCompressionStreamEnabled(useCompressionStream); String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local"; Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound"); diff --git a/source/game/StarWorldClient.cpp b/source/game/StarWorldClient.cpp index 9e0478a..fba94f3 100644 --- a/source/game/StarWorldClient.cpp +++ b/source/game/StarWorldClient.cpp @@ -751,7 +751,7 @@ void WorldClient::handleIncomingPackets(List const& packets) { for (auto const& packet : packets) { if (!inWorld() && !is(packet)) - Logger::error("WorldClient received packet type {} while not in world", PacketTypeNames.getRight(packet->type())); + Logger::error("WorldClient received packet type {} while not in world", packet->typeName()); if (auto worldStartPacket = as(packet)) { initWorld(*worldStartPacket);