#include "StarUniverseConnection.hpp" #include "StarTcp.hpp" #include "gtest/gtest.h" using namespace Star; unsigned const PacketCount = 20; uint16_t const ServerPort = 55555; unsigned const NumLocalASyncConnections = 5; unsigned const NumRemoteASyncConnections = 5; unsigned const ASyncSleepMillis = 5; unsigned const NumLocalSyncConnections = 5; unsigned const NumRemoteSyncConnections = 5; unsigned const SyncWaitMillis = 10000; class ASyncClientThread : public Thread { public: ASyncClientThread(UniverseConnection conn) : Thread("UniverseConnectionTestClientThread"), m_connection(std::move(conn)) { start(); } virtual void run() { try { unsigned read = 0; unsigned written = 0; while (read < PacketCount || written < PacketCount) { m_connection.receive(); if (read < PacketCount) { if (auto packet = m_connection.pullSingle()) { EXPECT_TRUE(convert(packet)->requestProtocolVersion == read); ++read; } } if (written < PacketCount) { m_connection.push({make_shared(written)}); ++written; } m_connection.send(); Thread::sleep(ASyncSleepMillis); if (!m_connection.isOpen()) break; } EXPECT_EQ(PacketCount, read); EXPECT_EQ(PacketCount, written); m_connection.close(); EXPECT_TRUE(m_connection.pull().empty()); } catch (std::exception const& e) { ADD_FAILURE() << "Exception: " << outputException(e, true); } catch (...) { ADD_FAILURE(); } } private: UniverseConnection m_connection; }; class SyncClientThread : public Thread { public: SyncClientThread(UniverseConnection conn) : Thread("UniverseConnectionTestClientThread"), m_connection(std::move(conn)) { start(); } virtual void run() { try { for (unsigned i = 0; i < PacketCount; ++i) { m_connection.pushSingle(make_shared(i)); EXPECT_TRUE(m_connection.sendAll(SyncWaitMillis)); EXPECT_TRUE(m_connection.receiveAny(SyncWaitMillis)); EXPECT_EQ(convert(m_connection.pullSingle())->requestProtocolVersion, i); if (!m_connection.isOpen()) break; } m_connection.close(); EXPECT_TRUE(m_connection.pull().empty()); } catch (std::exception const& e) { ADD_FAILURE() << "Exception: " << outputException(e, true); } catch (...) { ADD_FAILURE(); } } private: UniverseConnection m_connection; }; TEST(UniverseConnections, All) { UniverseConnectionServer server([](UniverseConnectionServer* server, ConnectionId clientId, List packets) { server->sendPackets(clientId, packets); }); ConnectionId clientId = ServerConnectionId; TcpServer tcpServer(HostAddressWithPort(HostAddress::localhost(), ServerPort)); tcpServer.setAcceptCallback([&server, &clientId](TcpSocketPtr socket) { socket->setNonBlocking(true); auto conn = UniverseConnection(TcpPacketSocket::open(std::move(socket))); server.addConnection(++clientId, std::move(conn)); }); LinkedList localASyncClients; for (unsigned i = 0; i < NumLocalASyncConnections; ++i) { auto pair = LocalPacketSocket::openPair(); server.addConnection(++clientId, UniverseConnection(std::move(pair.first))); localASyncClients.emplaceAppend(UniverseConnection(std::move(pair.second))); } LinkedList localSyncClients; for (unsigned i = 0; i < NumLocalSyncConnections; ++i) { auto pair = LocalPacketSocket::openPair(); server.addConnection(++clientId, UniverseConnection(std::move(pair.first))); localSyncClients.emplaceAppend(UniverseConnection(std::move(pair.second))); } LinkedList remoteASyncClients; for (unsigned i = 0; i < NumRemoteASyncConnections; ++i) { auto socket = TcpSocket::connectTo({HostAddress::localhost(), ServerPort}); socket->setNonBlocking(true); remoteASyncClients.emplaceAppend(UniverseConnection(TcpPacketSocket::open(std::move(socket)))); } LinkedList remoteSyncClients; for (unsigned i = 0; i < NumRemoteSyncConnections; ++i) { auto socket = TcpSocket::connectTo({HostAddress::localhost(), ServerPort}); socket->setNonBlocking(true); remoteSyncClients.emplaceAppend(UniverseConnection(TcpPacketSocket::open(std::move(socket)))); } for (auto& c : localASyncClients) c.join(); for (auto& c : remoteASyncClients) c.join(); for (auto& c : localSyncClients) c.join(); for (auto& c : remoteSyncClients) c.join(); server.removeAllConnections(); }