osb/source/game/StarUniverseConnection.cpp

260 lines
8.5 KiB
C++
Raw Normal View History

2023-06-20 04:33:09 +00:00
#include "StarUniverseConnection.hpp"
#include "StarLogging.hpp"
namespace Star {
static const int PacketSocketPollSleep = 1;
UniverseConnection::UniverseConnection(PacketSocketUPtr packetSocket)
: m_packetSocket(move(packetSocket)) {}
UniverseConnection::UniverseConnection(UniverseConnection&& rhs) {
operator=(move(rhs));
}
UniverseConnection::~UniverseConnection() {
if (m_packetSocket)
m_packetSocket->close();
}
UniverseConnection& UniverseConnection::operator=(UniverseConnection&& rhs) {
MutexLocker locker(m_mutex);
m_sendQueue = take(rhs.m_sendQueue);
m_receiveQueue = take(rhs.m_receiveQueue);
m_packetSocket = take(rhs.m_packetSocket);
return *this;
}
bool UniverseConnection::isOpen() const {
MutexLocker locker(m_mutex);
return m_packetSocket->isOpen();
}
void UniverseConnection::close() {
MutexLocker locker(m_mutex);
m_packetSocket->close();
}
void UniverseConnection::push(List<PacketPtr> packets) {
MutexLocker locker(m_mutex);
m_sendQueue.appendAll(move(packets));
}
void UniverseConnection::pushSingle(PacketPtr packet) {
MutexLocker locker(m_mutex);
m_sendQueue.append(move(packet));
}
List<PacketPtr> UniverseConnection::pull() {
MutexLocker locker(m_mutex);
return List<PacketPtr>::from(take(m_receiveQueue));
}
PacketPtr UniverseConnection::pullSingle() {
MutexLocker locker(m_mutex);
if (m_receiveQueue.empty())
return {};
return m_receiveQueue.takeFirst();
}
bool UniverseConnection::send() {
MutexLocker locker(m_mutex);
m_packetSocket->sendPackets(take(m_sendQueue));
return m_packetSocket->writeData();
}
bool UniverseConnection::sendAll(unsigned timeout) {
MutexLocker locker(m_mutex);
m_packetSocket->sendPackets(take(m_sendQueue));
auto timer = Timer::withMilliseconds(timeout);
while (true) {
m_packetSocket->writeData();
if (!m_packetSocket->sentPacketsPending())
return true;
if (timer.timeUp() || !m_packetSocket->isOpen())
return false;
Thread::sleep(PacketSocketPollSleep);
}
}
bool UniverseConnection::receive() {
MutexLocker locker(m_mutex);
bool received = m_packetSocket->readData();
m_receiveQueue.appendAll(m_packetSocket->receivePackets());
return received;
}
bool UniverseConnection::receiveAny(unsigned timeout) {
MutexLocker locker(m_mutex);
if (!m_receiveQueue.empty())
return true;
auto timer = Timer::withMilliseconds(timeout);
while (true) {
m_packetSocket->readData();
m_receiveQueue.appendAll(m_packetSocket->receivePackets());
if (!m_receiveQueue.empty())
return true;
if (timer.timeUp() || !m_packetSocket->isOpen())
return false;
Thread::sleep(PacketSocketPollSleep);
}
}
Maybe<PacketStats> UniverseConnection::incomingStats() const {
MutexLocker locker(m_mutex);
return m_packetSocket->incomingStats();
}
Maybe<PacketStats> UniverseConnection::outgoingStats() const {
MutexLocker locker(m_mutex);
return m_packetSocket->outgoingStats();
}
UniverseConnectionServer::UniverseConnectionServer(PacketReceiveCallback packetReceiver)
: m_packetReceiver(move(packetReceiver)), m_shutdown(false) {
m_processingLoop = Thread::invoke("UniverseConnectionServer::processingLoop", [this]() {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
try {
while (!m_shutdown) {
connectionsLocker.lock();
auto connections = m_connections.pairs();
connectionsLocker.unlock();
bool dataTransmitted = false;
for (auto& p : connections) {
MutexLocker connectionLocker(p.second->mutex);
if (!p.second->packetSocket || !p.second->packetSocket->isOpen())
continue;
p.second->packetSocket->sendPackets(take(p.second->sendQueue));
dataTransmitted |= p.second->packetSocket->writeData();
dataTransmitted |= p.second->packetSocket->readData();
List<PacketPtr> receivePackets = p.second->packetSocket->receivePackets();
if (!receivePackets.empty()) {
p.second->lastActivityTime = Time::monotonicMilliseconds();
p.second->receiveQueue.appendAll(take(receivePackets));
}
if (!p.second->receiveQueue.empty()) {
List<PacketPtr> toReceive = List<PacketPtr>::from(take(p.second->receiveQueue));
connectionLocker.unlock();
try {
m_packetReceiver(this, p.first, move(toReceive));
} catch (std::exception const& e) {
Logger::error("Exception caught handling incoming server packets, disconnecting client '%s' %s", p.first, outputException(e, true));
connectionLocker.lock();
p.second->packetSocket->close();
}
}
}
if (!dataTransmitted)
Thread::sleep(PacketSocketPollSleep);
}
} catch (std::exception const& e) {
Logger::error("Exception caught in UniverseConnectionServer::remoteProcessLoop, closing all remote connections: %s", e.what());
connectionsLocker.lock();
for (auto& p : m_connections)
p.second->packetSocket->close();
}
});
}
UniverseConnectionServer::~UniverseConnectionServer() {
m_shutdown = true;
m_processingLoop.finish();
removeAllConnections();
}
bool UniverseConnectionServer::hasConnection(ConnectionId clientId) const {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
return m_connections.contains(clientId);
}
List<ConnectionId> UniverseConnectionServer::allConnections() const {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
return m_connections.keys();
}
bool UniverseConnectionServer::connectionIsOpen(ConnectionId clientId) const {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
if (auto conn = m_connections.value(clientId)) {
MutexLocker connectionLocker(conn->mutex);
return conn->packetSocket->isOpen();
}
throw UniverseConnectionException::format("No such client '%s' in UniverseConnectionServer::connectionIsOpen", clientId);
}
int64_t UniverseConnectionServer::lastActivityTime(ConnectionId clientId) const {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
if (auto conn = m_connections.value(clientId)) {
MutexLocker connectionLocker(conn->mutex);
return conn->lastActivityTime;
}
throw UniverseConnectionException::format("No such client '%s' in UniverseConnectionServer::lastRemoteActivityTime", clientId);
}
void UniverseConnectionServer::addConnection(ConnectionId clientId, UniverseConnection uc) {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
if (m_connections.contains(clientId))
throw UniverseConnectionException::format("Client '%s' already exists in UniverseConnectionServer::addConnection", clientId);
auto connection = make_shared<Connection>();
connection->packetSocket = move(uc.m_packetSocket);
connection->sendQueue = move(uc.m_sendQueue);
connection->receiveQueue = move(uc.m_receiveQueue);
connection->lastActivityTime = Time::monotonicMilliseconds();
m_connections.add(clientId, move(connection));
}
UniverseConnection UniverseConnectionServer::removeConnection(ConnectionId clientId) {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
if (!m_connections.contains(clientId))
throw UniverseConnectionException::format("Client '%s' does not exist in UniverseConnectionServer::removeConnection", clientId);
auto conn = m_connections.take(clientId);
MutexLocker connectionLocker(conn->mutex);
UniverseConnection uc;
uc.m_packetSocket = take(conn->packetSocket);
uc.m_sendQueue = move(conn->sendQueue);
uc.m_receiveQueue = move(conn->receiveQueue);
return uc;
}
List<UniverseConnection> UniverseConnectionServer::removeAllConnections() {
List<UniverseConnection> removedConnections;
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
for (auto connectionId : m_connections.keys())
removedConnections.append(removeConnection(connectionId));
return removedConnections;
}
void UniverseConnectionServer::sendPackets(ConnectionId clientId, List<PacketPtr> packets) {
RecursiveMutexLocker connectionsLocker(m_connectionsMutex);
if (auto conn = m_connections.value(clientId)) {
MutexLocker connectionLocker(conn->mutex);
conn->sendQueue.appendAll(move(packets));
if (conn->packetSocket->isOpen()) {
conn->packetSocket->sendPackets(take(conn->sendQueue));
conn->packetSocket->writeData();
}
} else {
throw UniverseConnectionException::format("No such client '%s' in UniverseConnectionServer::sendPackets", clientId);
}
}
}