Changes to support modifying networking while maintaining legacy support

This commit is contained in:
Kae 2023-07-31 20:22:09 +10:00
parent 2dc10fa5ad
commit 0ef8807539
9 changed files with 100 additions and 18 deletions

View File

@ -55,6 +55,9 @@ Maybe<PacketStats> PacketSocket::outgoingStats() const {
return {}; return {};
} }
void PacketSocket::setLegacy(bool legacy) { m_legacy = legacy; }
bool PacketSocket::legacy() const { return m_legacy; }
pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() { pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() {
auto lhsIncomingPipe = make_shared<Pipe>(); auto lhsIncomingPipe = make_shared<Pipe>();
auto rhsIncomingPipe = make_shared<Pipe>(); auto rhsIncomingPipe = make_shared<Pipe>();
@ -138,23 +141,32 @@ void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
while (it.hasNext()) { while (it.hasNext()) {
PacketType currentType = it.peekNext()->type(); PacketType currentType = it.peekNext()->type();
PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
DataStreamBuffer packetBuffer; DataStreamBuffer packetBuffer;
while (it.hasNext() && it.peekNext()->type() == currentType) while (it.hasNext()
it.next()->write(packetBuffer); && it.peekNext()->type() == currentType
&& it.peekNext()->compressionMode() == currentCompressionMode) {
if (legacy())
it.next()->writeLegacy(packetBuffer);
else
it.next()->write(packetBuffer);
}
// Packets must read and write actual data, because this is used to // Packets must read and write actual data, because this is used to
// determine packet count // determine packet count
starAssert(!packetBuffer.empty()); starAssert(!packetBuffer.empty());
ByteArray compressedPackets; ByteArray compressedPackets;
if (packetBuffer.size() > 64) bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
if (mustCompress || perhapsCompress)
compressedPackets = compressData(packetBuffer.data()); compressedPackets = compressData(packetBuffer.data());
DataStreamBuffer outBuffer; DataStreamBuffer outBuffer;
outBuffer.write(currentType); outBuffer.write(currentType);
if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) { if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
outBuffer.writeVlqI(-(int)(compressedPackets.size())); outBuffer.writeVlqI(-(int)(compressedPackets.size()));
outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
m_outgoingStats.mix(currentType, compressedPackets.size()); m_outgoingStats.mix(currentType, compressedPackets.size());
@ -208,7 +220,11 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
DataStreamBuffer packetStream(move(packetBytes)); DataStreamBuffer packetStream(move(packetBytes));
do { do {
PacketPtr packet = createPacket(packetType); PacketPtr packet = createPacket(packetType);
packet->read(packetStream); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
if (legacy())
packet->readLegacy(packetStream);
else
packet->read(packetStream);
packets.append(move(packet)); packets.append(move(packet));
} while (!packetStream.atEnd()); } while (!packetStream.atEnd());
@ -299,23 +315,32 @@ void P2PPacketSocket::sendPackets(List<PacketPtr> packets) {
while (it.hasNext()) { while (it.hasNext()) {
PacketType currentType = it.peekNext()->type(); PacketType currentType = it.peekNext()->type();
PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
DataStreamBuffer packetBuffer; DataStreamBuffer packetBuffer;
while (it.hasNext() && it.peekNext()->type() == currentType) while (it.hasNext()
it.next()->write(packetBuffer); && it.peekNext()->type() == currentType
&& it.peekNext()->compressionMode() == currentCompressionMode) {
if (legacy())
it.next()->writeLegacy(packetBuffer);
else
it.next()->write(packetBuffer);
}
// Packets must read and write actual data, because this is used to // Packets must read and write actual data, because this is used to
// determine packet count // determine packet count
starAssert(!packetBuffer.empty()); starAssert(!packetBuffer.empty());
ByteArray compressedPackets; ByteArray compressedPackets;
if (packetBuffer.size() > 64) bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
if (mustCompress || perhapsCompress)
compressedPackets = compressData(packetBuffer.data()); compressedPackets = compressData(packetBuffer.data());
DataStreamBuffer outBuffer; DataStreamBuffer outBuffer;
outBuffer.write(currentType); outBuffer.write(currentType);
if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) { if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
outBuffer.write<bool>(true); outBuffer.write<bool>(true);
outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
m_outgoingStats.mix(currentType, compressedPackets.size()); m_outgoingStats.mix(currentType, compressedPackets.size());
@ -347,7 +372,11 @@ List<PacketPtr> P2PPacketSocket::receivePackets() {
DataStreamBuffer packetStream(move(packetBytes)); DataStreamBuffer packetStream(move(packetBytes));
do { do {
PacketPtr packet = createPacket(packetType); PacketPtr packet = createPacket(packetType);
packet->read(packetStream); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
if (legacy())
packet->readLegacy(packetStream);
else
packet->read(packetStream);
packets.append(move(packet)); packets.append(move(packet));
} while (!packetStream.atEnd()); } while (!packetStream.atEnd());
} }

View File

@ -72,6 +72,11 @@ public:
// Default implementations return nothing. // Default implementations return nothing.
virtual Maybe<PacketStats> incomingStats() const; virtual Maybe<PacketStats> incomingStats() const;
virtual Maybe<PacketStats> outgoingStats() const; virtual Maybe<PacketStats> outgoingStats() const;
void setLegacy(bool legacy);
bool legacy() const;
private:
bool m_legacy = false;
}; };
// PacketSocket for local communication. // PacketSocket for local communication.

View File

@ -79,6 +79,18 @@ EnumMap<PacketType> const PacketTypeNames{
Packet::~Packet() {} Packet::~Packet() {}
void Packet::readLegacy(DataStream& ds) {
read(ds);
}
void Packet::writeLegacy(DataStream& ds) const {
write(ds);
}
PacketCompressionMode Packet::compressionMode() const
{ return m_compressionMode; }
void Packet::setCompressionMode(PacketCompressionMode compressionMode)
{ m_compressionMode = compressionMode; }
PacketPtr createPacket(PacketType type) { PacketPtr createPacket(PacketType type) {
switch (type) { switch (type) {
case PacketType::ProtocolRequest: return make_shared<ProtocolRequestPacket>(); case PacketType::ProtocolRequest: return make_shared<ProtocolRequestPacket>();

View File

@ -117,6 +117,12 @@ enum class PacketType : uint8_t {
}; };
extern EnumMap<PacketType> const PacketTypeNames; extern EnumMap<PacketType> const PacketTypeNames;
enum class PacketCompressionMode : uint8_t {
Disabled,
Enabled,
Automatic
};
struct Packet { struct Packet {
virtual ~Packet(); virtual ~Packet();
@ -124,6 +130,14 @@ struct Packet {
virtual void read(DataStream& ds) = 0; virtual void read(DataStream& ds) = 0;
virtual void write(DataStream& ds) const = 0; virtual void write(DataStream& ds) const = 0;
virtual void readLegacy(DataStream& ds);
virtual void writeLegacy(DataStream& ds) const;
PacketCompressionMode compressionMode() const;
void setCompressionMode(PacketCompressionMode compressionMode);
PacketCompressionMode m_compressionMode = PacketCompressionMode::Automatic;
}; };
PacketPtr createPacket(PacketType type); PacketPtr createPacket(PacketType type);
@ -132,9 +146,7 @@ template <PacketType PacketT>
struct PacketBase : public Packet { struct PacketBase : public Packet {
static PacketType const Type = PacketT; static PacketType const Type = PacketT;
PacketType type() const override { PacketType type() const override { return Type; }
return Type;
}
}; };
struct ProtocolRequestPacket : PacketBase<PacketType::ProtocolRequest> { struct ProtocolRequestPacket : PacketBase<PacketType::ProtocolRequest> {

View File

@ -77,7 +77,13 @@ Maybe<String> UniverseClient::connect(UniverseConnection connection, bool allowA
unsigned timeout = assets->json("/client.config:serverConnectTimeout").toUInt(); unsigned timeout = assets->json("/client.config:serverConnectTimeout").toUInt();
connection.pushSingle(make_shared<ProtocolRequestPacket>(StarProtocolVersion)); {
auto protocolRequest = make_shared<ProtocolRequestPacket>(StarProtocolVersion);
protocolRequest->setCompressionMode(PacketCompressionMode::Enabled);
// Signal that we're OpenStarbound. Vanilla Starbound only compresses packets above 64 bytes - by forcing it we can communicate this.
// If you know a less cursed way, please let me know.
connection.pushSingle(protocolRequest);
}
connection.sendAll(timeout); connection.sendAll(timeout);
connection.receiveAny(timeout); connection.receiveAny(timeout);
@ -87,6 +93,8 @@ Maybe<String> UniverseClient::connect(UniverseConnection connection, bool allowA
else if (!protocolResponsePacket->allowed) else if (!protocolResponsePacket->allowed)
return String(strf("Join failed! Server does not support connections with protocol version {}", StarProtocolVersion)); return String(strf("Join failed! Server does not support connections with protocol version {}", StarProtocolVersion));
m_legacyServer = protocolResponsePacket->compressionMode() != PacketCompressionMode::Enabled; // True if server is vanilla
connection.setLegacy(m_legacyServer);
connection.pushSingle(make_shared<ClientConnectPacket>(Root::singleton().assets()->digest(), allowAssetsMismatch, m_mainPlayer->uuid(), m_mainPlayer->name(), connection.pushSingle(make_shared<ClientConnectPacket>(Root::singleton().assets()->digest(), allowAssetsMismatch, m_mainPlayer->uuid(), m_mainPlayer->name(),
m_mainPlayer->species(), m_playerStorage->loadShipData(m_mainPlayer->uuid()), m_mainPlayer->shipUpgrades(), m_mainPlayer->species(), m_playerStorage->loadShipData(m_mainPlayer->uuid()), m_mainPlayer->shipUpgrades(),
m_mainPlayer->log()->introComplete(), account)); m_mainPlayer->log()->introComplete(), account));
@ -121,7 +129,7 @@ Maybe<String> UniverseClient::connect(UniverseConnection connection, bool allowA
m_celestialDatabase = make_shared<CelestialSlaveDatabase>(move(success->celestialInformation)); m_celestialDatabase = make_shared<CelestialSlaveDatabase>(move(success->celestialInformation));
m_systemWorldClient = make_shared<SystemWorldClient>(m_universeClock, m_celestialDatabase, m_mainPlayer->universeMap()); m_systemWorldClient = make_shared<SystemWorldClient>(m_universeClock, m_celestialDatabase, m_mainPlayer->universeMap());
Logger::info("UniverseClient: Joined server as client {}", success->clientId); Logger::info("UniverseClient: Joined {} server as client {}", m_legacyServer ? "Starbound" : "OpenStarbound", success->clientId);
return {}; return {};
} else if (auto failure = as<ConnectFailurePacket>(packet)) { } else if (auto failure = as<ConnectFailurePacket>(packet)) {
Logger::error("UniverseClient: Join failed: {}", failure->reason); Logger::error("UniverseClient: Join failed: {}", failure->reason);

View File

@ -127,6 +127,7 @@ private:
StatisticsPtr m_statistics; StatisticsPtr m_statistics;
PlayerPtr m_mainPlayer; PlayerPtr m_mainPlayer;
bool m_legacyServer;
bool m_pause; bool m_pause;
ClockPtr m_universeClock; ClockPtr m_universeClock;
WorldClientPtr m_worldClient; WorldClientPtr m_worldClient;

View File

@ -107,6 +107,10 @@ bool UniverseConnection::receiveAny(unsigned timeout) {
} }
} }
void UniverseConnection::setLegacy(bool legacy) {
m_packetSocket->setLegacy(legacy);
}
Maybe<PacketStats> UniverseConnection::incomingStats() const { Maybe<PacketStats> UniverseConnection::incomingStats() const {
MutexLocker locker(m_mutex); MutexLocker locker(m_mutex);
return m_packetSocket->incomingStats(); return m_packetSocket->incomingStats();

View File

@ -49,6 +49,8 @@ public:
// false if the timeout was reached with no packets receivable. // false if the timeout was reached with no packets receivable.
bool receiveAny(unsigned timeout); bool receiveAny(unsigned timeout);
void setLegacy(bool legacy);
// Packet stats for the most recent one second window of activity incoming // Packet stats for the most recent one second window of activity incoming
// and outgoing. Will only return valid stats if the underlying PacketSocket // and outgoing. Will only return valid stats if the underlying PacketSocket
// implements stat collection. // implements stat collection.

View File

@ -1520,20 +1520,30 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
Logger::warn("UniverseServer: client connection aborted, expected ProtocolRequestPacket"); Logger::warn("UniverseServer: client connection aborted, expected ProtocolRequestPacket");
return; return;
} }
bool legacyClient = protocolRequest->compressionMode() != PacketCompressionMode::Enabled;
connection.setLegacy(legacyClient);
auto protocolResponse = make_shared<ProtocolResponsePacket>();
protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound
if (protocolRequest->requestProtocolVersion != StarProtocolVersion) { if (protocolRequest->requestProtocolVersion != StarProtocolVersion) {
Logger::warn("UniverseServer: client connection aborted, unsupported protocol version {}, supported version {}", Logger::warn("UniverseServer: client connection aborted, unsupported protocol version {}, supported version {}",
protocolRequest->requestProtocolVersion, StarProtocolVersion); protocolRequest->requestProtocolVersion, StarProtocolVersion);
connection.pushSingle(make_shared<ProtocolResponsePacket>(false)); protocolResponse->allowed = false;
connection.pushSingle(protocolResponse);
connection.sendAll(clientWaitLimit); connection.sendAll(clientWaitLimit);
mainLocker.lock(); mainLocker.lock();
m_deadConnections.append({move(connection), Time::monotonicMilliseconds()}); m_deadConnections.append({move(connection), Time::monotonicMilliseconds()});
return; return;
} }
connection.pushSingle(make_shared<ProtocolResponsePacket>(true)); protocolResponse->allowed = true;
connection.pushSingle(protocolResponse);
connection.sendAll(clientWaitLimit); connection.sendAll(clientWaitLimit);
String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local";
Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound");
connection.receiveAny(clientWaitLimit); connection.receiveAny(clientWaitLimit);
auto clientConnect = as<ClientConnectPacket>(connection.pullSingle()); auto clientConnect = as<ClientConnectPacket>(connection.pullSingle());
if (!clientConnect) { if (!clientConnect) {
@ -1547,7 +1557,6 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
bool administrator = false; bool administrator = false;
String accountString = !clientConnect->account.empty() ? strf("'{}'", clientConnect->account) : "<anonymous>"; String accountString = !clientConnect->account.empty() ? strf("'{}'", clientConnect->account) : "<anonymous>";
String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local";
auto connectionFail = [&](String message) { auto connectionFail = [&](String message) {
Logger::warn("UniverseServer: Login attempt failed with account '{}' as player '{}' from address {}, error: {}", Logger::warn("UniverseServer: Login attempt failed with account '{}' as player '{}' from address {}, error: {}",