431a9c00a5
On Linux and macOS, using Clang to compile OpenStarbound produces about 400 MB worth of warnings during the build, making the compiler output unreadable and slowing the build down considerably. 99% of the warnings were unqualified uses of std::move and std::forward, which are now all properly qualified. Fixed a few other minor warnings about non-virtual destructors and some uses of std::move preventing copy elision on temporary objects. Most remaining warnings are now unused parameters.
264 lines
8.6 KiB
C++
264 lines
8.6 KiB
C++
#include "StarUniverseConnection.hpp"
|
|
#include "StarLogging.hpp"
|
|
|
|
namespace Star {
|
|
|
|
static const int PacketSocketPollSleep = 1;
|
|
|
|
UniverseConnection::UniverseConnection(PacketSocketUPtr packetSocket)
|
|
: m_packetSocket(std::move(packetSocket)) {}
|
|
|
|
UniverseConnection::UniverseConnection(UniverseConnection&& rhs) {
|
|
operator=(std::move(rhs));
|
|
}
|
|
|
|
UniverseConnection::~UniverseConnection() {
|
|
if (m_packetSocket)
|
|
m_packetSocket->close();
|
|
}
|
|
|
|
UniverseConnection& UniverseConnection::operator=(UniverseConnection&& rhs) {
|
|
MutexLocker locker(m_mutex);
|
|
m_sendQueue = take(rhs.m_sendQueue);
|
|
m_receiveQueue = take(rhs.m_receiveQueue);
|
|
m_packetSocket = take(rhs.m_packetSocket);
|
|
return *this;
|
|
}
|
|
|
|
bool UniverseConnection::isOpen() const {
|
|
MutexLocker locker(m_mutex);
|
|
return m_packetSocket->isOpen();
|
|
}
|
|
|
|
void UniverseConnection::close() {
|
|
MutexLocker locker(m_mutex);
|
|
m_packetSocket->close();
|
|
}
|
|
|
|
void UniverseConnection::push(List<PacketPtr> packets) {
|
|
MutexLocker locker(m_mutex);
|
|
m_sendQueue.appendAll(std::move(packets));
|
|
}
|
|
|
|
void UniverseConnection::pushSingle(PacketPtr packet) {
|
|
MutexLocker locker(m_mutex);
|
|
m_sendQueue.append(std::move(packet));
|
|
}
|
|
|
|
List<PacketPtr> UniverseConnection::pull() {
|
|
MutexLocker locker(m_mutex);
|
|
return List<PacketPtr>::from(take(m_receiveQueue));
|
|
}
|
|
|
|
PacketPtr UniverseConnection::pullSingle() {
|
|
MutexLocker locker(m_mutex);
|
|
if (m_receiveQueue.empty())
|
|
return {};
|
|
return m_receiveQueue.takeFirst();
|
|
}
|
|
|
|
bool UniverseConnection::send() {
|
|
MutexLocker locker(m_mutex);
|
|
m_packetSocket->sendPackets(take(m_sendQueue));
|
|
return m_packetSocket->writeData();
|
|
}
|
|
|
|
bool UniverseConnection::sendAll(unsigned timeout) {
|
|
MutexLocker locker(m_mutex);
|
|
|
|
m_packetSocket->sendPackets(take(m_sendQueue));
|
|
|
|
auto timer = Timer::withMilliseconds(timeout);
|
|
while (true) {
|
|
m_packetSocket->writeData();
|
|
if (!m_packetSocket->sentPacketsPending())
|
|
return true;
|
|
|
|
if (timer.timeUp() || !m_packetSocket->isOpen())
|
|
return false;
|
|
|
|
Thread::sleep(PacketSocketPollSleep);
|
|
}
|
|
}
|
|
|
|
bool UniverseConnection::receive() {
|
|
MutexLocker locker(m_mutex);
|
|
bool received = m_packetSocket->readData();
|
|
m_receiveQueue.appendAll(m_packetSocket->receivePackets());
|
|
return received;
|
|
}
|
|
|
|
bool UniverseConnection::receiveAny(unsigned timeout) {
|
|
MutexLocker locker(m_mutex);
|
|
if (!m_receiveQueue.empty())
|
|
return true;
|
|
|
|
auto timer = Timer::withMilliseconds(timeout);
|
|
while (true) {
|
|
m_packetSocket->readData();
|
|
m_receiveQueue.appendAll(m_packetSocket->receivePackets());
|
|
if (!m_receiveQueue.empty())
|
|
return true;
|
|
|
|
if (timer.timeUp() || !m_packetSocket->isOpen())
|
|
return false;
|
|
|
|
Thread::sleep(PacketSocketPollSleep);
|
|
}
|
|
}
|
|
|
|
void UniverseConnection::setLegacy(bool legacy) {
|
|
m_packetSocket->setLegacy(legacy);
|
|
}
|
|
|
|
Maybe<PacketStats> UniverseConnection::incomingStats() const {
|
|
MutexLocker locker(m_mutex);
|
|
return m_packetSocket->incomingStats();
|
|
}
|
|
|
|
Maybe<PacketStats> UniverseConnection::outgoingStats() const {
|
|
MutexLocker locker(m_mutex);
|
|
return m_packetSocket->outgoingStats();
|
|
}
|
|
|
|
UniverseConnectionServer::UniverseConnectionServer(PacketReceiveCallback packetReceiver)
|
|
: m_packetReceiver(std::move(packetReceiver)), m_shutdown(false) {
|
|
m_processingLoop = Thread::invoke("UniverseConnectionServer::processingLoop", [this]() {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
try {
|
|
while (!m_shutdown) {
|
|
connectionsLocker.lock();
|
|
auto connections = m_connections.pairs();
|
|
connectionsLocker.unlock();
|
|
|
|
bool dataTransmitted = false;
|
|
for (auto& p : connections) {
|
|
MutexLocker connectionLocker(p.second->mutex);
|
|
if (!p.second->packetSocket || !p.second->packetSocket->isOpen())
|
|
continue;
|
|
|
|
p.second->packetSocket->sendPackets(take(p.second->sendQueue));
|
|
dataTransmitted |= p.second->packetSocket->writeData();
|
|
|
|
dataTransmitted |= p.second->packetSocket->readData();
|
|
List<PacketPtr> receivePackets = p.second->packetSocket->receivePackets();
|
|
if (!receivePackets.empty()) {
|
|
p.second->lastActivityTime = Time::monotonicMilliseconds();
|
|
p.second->receiveQueue.appendAll(take(receivePackets));
|
|
}
|
|
|
|
if (!p.second->receiveQueue.empty()) {
|
|
List<PacketPtr> toReceive = List<PacketPtr>::from(take(p.second->receiveQueue));
|
|
connectionLocker.unlock();
|
|
|
|
try {
|
|
m_packetReceiver(this, p.first, std::move(toReceive));
|
|
} catch (std::exception const& e) {
|
|
Logger::error("Exception caught handling incoming server packets, disconnecting client '{}' {}", p.first, outputException(e, true));
|
|
|
|
connectionLocker.lock();
|
|
p.second->packetSocket->close();
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!dataTransmitted)
|
|
Thread::sleep(PacketSocketPollSleep);
|
|
}
|
|
} catch (std::exception const& e) {
|
|
Logger::error("Exception caught in UniverseConnectionServer::remoteProcessLoop, closing all remote connections: {}", e.what());
|
|
connectionsLocker.lock();
|
|
for (auto& p : m_connections)
|
|
p.second->packetSocket->close();
|
|
}
|
|
});
|
|
}
|
|
|
|
UniverseConnectionServer::~UniverseConnectionServer() {
|
|
m_shutdown = true;
|
|
m_processingLoop.finish();
|
|
removeAllConnections();
|
|
}
|
|
|
|
bool UniverseConnectionServer::hasConnection(ConnectionId clientId) const {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
return m_connections.contains(clientId);
|
|
}
|
|
|
|
List<ConnectionId> UniverseConnectionServer::allConnections() const {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
return m_connections.keys();
|
|
}
|
|
|
|
bool UniverseConnectionServer::connectionIsOpen(ConnectionId clientId) const {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
if (auto conn = m_connections.value(clientId)) {
|
|
MutexLocker connectionLocker(conn->mutex);
|
|
return conn->packetSocket->isOpen();
|
|
}
|
|
|
|
throw UniverseConnectionException::format("No such client '{}' in UniverseConnectionServer::connectionIsOpen", clientId);
|
|
}
|
|
|
|
int64_t UniverseConnectionServer::lastActivityTime(ConnectionId clientId) const {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
if (auto conn = m_connections.value(clientId)) {
|
|
MutexLocker connectionLocker(conn->mutex);
|
|
return conn->lastActivityTime;
|
|
}
|
|
throw UniverseConnectionException::format("No such client '{}' in UniverseConnectionServer::lastRemoteActivityTime", clientId);
|
|
}
|
|
|
|
void UniverseConnectionServer::addConnection(ConnectionId clientId, UniverseConnection uc) {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
if (m_connections.contains(clientId))
|
|
throw UniverseConnectionException::format("Client '{}' already exists in UniverseConnectionServer::addConnection", clientId);
|
|
|
|
auto connection = make_shared<Connection>();
|
|
connection->packetSocket = std::move(uc.m_packetSocket);
|
|
connection->sendQueue = std::move(uc.m_sendQueue);
|
|
connection->receiveQueue = std::move(uc.m_receiveQueue);
|
|
connection->lastActivityTime = Time::monotonicMilliseconds();
|
|
m_connections.add(clientId, std::move(connection));
|
|
}
|
|
|
|
UniverseConnection UniverseConnectionServer::removeConnection(ConnectionId clientId) {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
if (!m_connections.contains(clientId))
|
|
throw UniverseConnectionException::format("Client '{}' does not exist in UniverseConnectionServer::removeConnection", clientId);
|
|
|
|
auto conn = m_connections.take(clientId);
|
|
MutexLocker connectionLocker(conn->mutex);
|
|
|
|
UniverseConnection uc;
|
|
uc.m_packetSocket = take(conn->packetSocket);
|
|
uc.m_sendQueue = std::move(conn->sendQueue);
|
|
uc.m_receiveQueue = std::move(conn->receiveQueue);
|
|
return uc;
|
|
}
|
|
|
|
List<UniverseConnection> UniverseConnectionServer::removeAllConnections() {
|
|
List<UniverseConnection> removedConnections;
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
for (auto connectionId : m_connections.keys())
|
|
removedConnections.append(removeConnection(connectionId));
|
|
return removedConnections;
|
|
}
|
|
|
|
void UniverseConnectionServer::sendPackets(ConnectionId clientId, List<PacketPtr> packets) {
|
|
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
|
|
if (auto conn = m_connections.value(clientId)) {
|
|
MutexLocker connectionLocker(conn->mutex);
|
|
conn->sendQueue.appendAll(std::move(packets));
|
|
|
|
if (conn->packetSocket->isOpen()) {
|
|
conn->packetSocket->sendPackets(take(conn->sendQueue));
|
|
conn->packetSocket->writeData();
|
|
}
|
|
} else {
|
|
throw UniverseConnectionException::format("No such client '{}' in UniverseConnectionServer::sendPackets", clientId);
|
|
}
|
|
}
|
|
|
|
}
|