#include "StarNetPacketSocket.hpp" #include "StarIterator.hpp" #include "StarCompression.hpp" #include "StarLogging.hpp" namespace Star { PacketStatCollector::PacketStatCollector(float calculationWindow) : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {} void PacketStatCollector::mix(PacketType type, size_t size) { calculate(); m_unmixed[type] += size; } void PacketStatCollector::mix(HashMap const& sizes) { calculate(); for (auto const& p : sizes) m_unmixed[p.first] += p.second; } PacketStats PacketStatCollector::stats() const { const_cast(this)->calculate(); return m_stats; } void PacketStatCollector::calculate() { int64_t currentTime = Time::monotonicMilliseconds(); float elapsedTime = (currentTime - m_lastMixTime) / 1000.0f; if (elapsedTime >= m_calculationWindow) { m_lastMixTime = currentTime; m_stats.worstPacketSize = 0; float total = 0.0f; for (auto& pair : m_unmixed) { if (pair.second > m_stats.worstPacketSize) { m_stats.worstPacketType = pair.first; m_stats.worstPacketSize = pair.second; } auto& bytes = m_stats.packetBytesPerSecond[pair.first]; bytes = pair.second / elapsedTime; total += bytes; } m_stats.bytesPerSecond = total; m_unmixed.clear(); } } Maybe PacketSocket::incomingStats() const { return {}; } Maybe PacketSocket::outgoingStats() const { return {}; } pair LocalPacketSocket::openPair() { auto lhsIncomingPipe = make_shared(); auto rhsIncomingPipe = make_shared(); return { LocalPacketSocketUPtr(new LocalPacketSocket(lhsIncomingPipe, weak_ptr(rhsIncomingPipe))), LocalPacketSocketUPtr(new LocalPacketSocket(rhsIncomingPipe, weak_ptr(lhsIncomingPipe))) }; } bool LocalPacketSocket::isOpen() const { return m_incomingPipe && !m_outgoingPipe.expired(); } void LocalPacketSocket::close() { m_incomingPipe.reset(); } void LocalPacketSocket::sendPackets(List packets) { if (!isOpen() || packets.empty()) return; if (auto outgoingPipe = m_outgoingPipe.lock()) { MutexLocker locker(outgoingPipe->mutex); #ifdef STAR_DEBUG // Test serialization if STAR_DEBUG is enabled DataStreamBuffer buffer; for (auto inPacket : take(packets)) { buffer.clear(); inPacket->write(buffer); auto outPacket = createPacket(inPacket->type()); buffer.seek(0); outPacket->read(buffer); packets.append(outPacket); } #endif outgoingPipe->queue.appendAll(move(packets)); } } List LocalPacketSocket::receivePackets() { MutexLocker locker(m_incomingPipe->mutex); List packets; packets.appendAll(take(m_incomingPipe->queue)); return packets; } bool LocalPacketSocket::sentPacketsPending() const { return false; } bool LocalPacketSocket::writeData() { return false; } bool LocalPacketSocket::readData() { return false; } LocalPacketSocket::LocalPacketSocket(shared_ptr incomingPipe, weak_ptr outgoingPipe) : m_incomingPipe(move(incomingPipe)), m_outgoingPipe(move(outgoingPipe)) {} TcpPacketSocketUPtr TcpPacketSocket::open(TcpSocketPtr socket) { socket->setNoDelay(true); socket->setNonBlocking(true); return TcpPacketSocketUPtr(new TcpPacketSocket(move(socket))); } bool TcpPacketSocket::isOpen() const { return m_socket->isActive(); } void TcpPacketSocket::close() { m_socket->close(); } void TcpPacketSocket::sendPackets(List packets) { auto it = makeSMutableIterator(packets); while (it.hasNext()) { PacketType currentType = it.peekNext()->type(); DataStreamBuffer packetBuffer; while (it.hasNext() && it.peekNext()->type() == currentType) it.next()->write(packetBuffer); // Packets must read and write actual data, because this is used to // determine packet count starAssert(!packetBuffer.empty()); ByteArray compressedPackets; if (packetBuffer.size() > 64) compressedPackets = compressData(packetBuffer.data()); DataStreamBuffer outBuffer; outBuffer.write(currentType); if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) { outBuffer.writeVlqI(-(int)(compressedPackets.size())); outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); m_outgoingStats.mix(currentType, compressedPackets.size()); } else { outBuffer.writeVlqI((int)(packetBuffer.size())); outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); m_outgoingStats.mix(currentType, packetBuffer.size()); } m_outputBuffer.append(outBuffer.takeData()); } } List TcpPacketSocket::receivePackets() { uint64_t const PacketSizeLimit = 64<<20; List packets; try { while (!m_inputBuffer.empty()) { PacketType packetType; uint64_t packetSize = 0; bool packetCompressed = false; DataStreamBuffer ds(m_inputBuffer); try { packetType = ds.read(); int64_t len = ds.readVlqI(); if (len < 0) { packetSize = -len; packetCompressed = true; } else { packetSize = len; packetCompressed = false; } } catch (EofException const&) { // Guard against not having the entire packet header available when // trying to read. break; } if (packetSize > PacketSizeLimit) throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize); if (packetSize > ds.size() - ds.pos()) break; ByteArray packetBytes = ds.readBytes(packetSize); if (packetCompressed) packetBytes = uncompressData(packetBytes); m_incomingStats.mix(packetType, packetSize); DataStreamBuffer packetStream(move(packetBytes)); do { PacketPtr packet = createPacket(packetType); packet->read(packetStream); packets.append(move(packet)); } while (!packetStream.atEnd()); m_inputBuffer = ds.readBytes(ds.size() - ds.pos()); } } catch (IOException const& e) { Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false)); m_inputBuffer.clear(); m_socket->shutdown(); } return packets; } bool TcpPacketSocket::sentPacketsPending() const { return !m_outputBuffer.empty(); } bool TcpPacketSocket::writeData() { if (!isOpen()) return false; bool dataSent = false; try { if (m_outputBuffer.empty()) return false; while (!m_outputBuffer.empty()) { size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); if (writtenAmount == 0) break; dataSent = true; m_outputBuffer.trimLeft(writtenAmount); } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); } catch (IOException const& e) { Logger::warn("I/O error in TcpPacketSocket::sendData: {}", outputException(e, false)); m_socket->shutdown(); } return dataSent; } bool TcpPacketSocket::readData() { bool dataReceived = false; try { char readBuffer[1024]; while (true) { size_t readAmount = m_socket->receive(readBuffer, 1024); if (readAmount == 0) break; dataReceived = true; m_inputBuffer.append(readBuffer, readAmount); } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); } catch (IOException const& e) { Logger::warn("I/O error in TcpPacketSocket::receiveData: {}", outputException(e, false)); m_socket->shutdown(); } return dataReceived; } Maybe TcpPacketSocket::incomingStats() const { return m_incomingStats.stats(); } Maybe TcpPacketSocket::outgoingStats() const { return m_outgoingStats.stats(); } TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(move(socket)) {} P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) { return P2PPacketSocketUPtr(new P2PPacketSocket(move(socket))); } bool P2PPacketSocket::isOpen() const { return m_socket && m_socket->isOpen(); } void P2PPacketSocket::close() { m_socket.reset(); } void P2PPacketSocket::sendPackets(List packets) { auto it = makeSMutableIterator(packets); while (it.hasNext()) { PacketType currentType = it.peekNext()->type(); DataStreamBuffer packetBuffer; while (it.hasNext() && it.peekNext()->type() == currentType) it.next()->write(packetBuffer); // Packets must read and write actual data, because this is used to // determine packet count starAssert(!packetBuffer.empty()); ByteArray compressedPackets; if (packetBuffer.size() > 64) compressedPackets = compressData(packetBuffer.data()); DataStreamBuffer outBuffer; outBuffer.write(currentType); if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) { outBuffer.write(true); outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); m_outgoingStats.mix(currentType, compressedPackets.size()); } else { outBuffer.write(false); outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); m_outgoingStats.mix(currentType, packetBuffer.size()); } m_outputMessages.append(outBuffer.takeData()); } } List P2PPacketSocket::receivePackets() { List packets; try { for (auto& inputMessage : take(m_inputMessages)) { DataStreamBuffer ds(move(inputMessage)); PacketType packetType = ds.read(); bool packetCompressed = ds.read(); size_t packetSize = ds.size() - ds.pos(); ByteArray packetBytes = ds.readBytes(packetSize); if (packetCompressed) packetBytes = uncompressData(packetBytes); m_incomingStats.mix(packetType, packetSize); DataStreamBuffer packetStream(move(packetBytes)); do { PacketPtr packet = createPacket(packetType); packet->read(packetStream); packets.append(move(packet)); } while (!packetStream.atEnd()); } } catch (IOException const& e) { Logger::warn("I/O error in P2PPacketSocket::readPackets, closing: {}", outputException(e, false)); m_socket.reset(); } return packets; } bool P2PPacketSocket::sentPacketsPending() const { return !m_outputMessages.empty(); } bool P2PPacketSocket::writeData() { bool workDone = false; if (m_socket) { while (!m_outputMessages.empty()) { if (m_socket->sendMessage(m_outputMessages.first())) { m_outputMessages.removeFirst(); workDone = true; } else { break; } } } return workDone; } bool P2PPacketSocket::readData() { bool workDone = false; if (m_socket) { while (auto message = m_socket->receiveMessage()) { m_inputMessages.append(message.take()); workDone = true; } } return workDone; } Maybe P2PPacketSocket::incomingStats() const { return m_incomingStats.stats(); } Maybe P2PPacketSocket::outgoingStats() const { return m_outgoingStats.stats(); } P2PPacketSocket::P2PPacketSocket(P2PSocketPtr socket) : m_socket(move(socket)) {} }