407 lines
11 KiB
C++
407 lines
11 KiB
C++
#include "StarNetPacketSocket.hpp"
|
|
#include "StarIterator.hpp"
|
|
#include "StarCompression.hpp"
|
|
#include "StarLogging.hpp"
|
|
|
|
namespace Star {
|
|
|
|
// Creates a collector whose statistics are recomputed once at least
// `calculationWindow` seconds have passed since the previous recomputation.
PacketStatCollector::PacketStatCollector(float calculationWindow)
  : m_calculationWindow(calculationWindow),
    m_stats(),
    m_lastMixTime(0) {}
|
|
|
|
// Records `size` bytes against `type` for the current window. calculate() runs
// first so that an already elapsed window is folded into the stats before the
// new bytes are tallied.
void PacketStatCollector::mix(PacketType type, size_t size) {
  calculate();
  auto& tally = m_unmixed[type];
  tally += size;
}
|
|
|
|
// Bulk form of mix(): adds every (type, byte count) entry of `sizes` to the
// current window's tallies, after giving calculate() a chance to close out an
// elapsed window.
void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) {
  calculate();
  for (auto const& entry : sizes) {
    auto& tally = m_unmixed[entry.first];
    tally += entry.second;
  }
}
|
|
|
|
// Returns a copy of the most recently computed statistics. The method is
// const for callers, but it may still refresh the stats first, which is why
// calculate() is invoked through a const_cast.
PacketStats PacketStatCollector::stats() const {
  auto* self = const_cast<PacketStatCollector*>(this);
  self->calculate();
  return m_stats;
}
|
|
|
|
void PacketStatCollector::calculate() {
|
|
int64_t currentTime = Time::monotonicMilliseconds();
|
|
float elapsedTime = (currentTime - m_lastMixTime) / 1000.0f;
|
|
if (elapsedTime >= m_calculationWindow) {
|
|
m_lastMixTime = currentTime;
|
|
m_stats.worstPacketSize = 0;
|
|
|
|
float total = 0.0f;
|
|
for (auto& pair : m_unmixed) {
|
|
if (pair.second > m_stats.worstPacketSize) {
|
|
m_stats.worstPacketType = pair.first;
|
|
m_stats.worstPacketSize = pair.second;
|
|
}
|
|
|
|
auto& bytes = m_stats.packetBytesPerSecond[pair.first];
|
|
bytes = pair.second / elapsedTime;
|
|
total += bytes;
|
|
}
|
|
m_stats.bytesPerSecond = total;
|
|
m_unmixed.clear();
|
|
}
|
|
}
|
|
|
|
// Base implementation: reports no incoming statistics (an empty Maybe).
Maybe<PacketStats> PacketSocket::incomingStats() const {
  return Maybe<PacketStats>();
}
|
|
|
|
// Base implementation: reports no outgoing statistics (an empty Maybe).
Maybe<PacketStats> PacketSocket::outgoingStats() const {
  return Maybe<PacketStats>();
}
|
|
|
|
// Creates two connected local sockets. Each socket owns the pipe it receives
// from and holds only a weak reference to the pipe it sends into, which is the
// other socket's incoming pipe.
pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() {
  auto firstInbox = make_shared<Pipe>();
  auto secondInbox = make_shared<Pipe>();

  LocalPacketSocketUPtr first(new LocalPacketSocket(firstInbox, weak_ptr<Pipe>(secondInbox)));
  LocalPacketSocketUPtr second(new LocalPacketSocket(secondInbox, weak_ptr<Pipe>(firstInbox)));
  return {move(first), move(second)};
}
|
|
|
|
bool LocalPacketSocket::isOpen() const {
|
|
return m_incomingPipe && !m_outgoingPipe.expired();
|
|
}
|
|
|
|
// Closes this end by dropping its reference to the incoming pipe; isOpen()
// returns false afterwards.
void LocalPacketSocket::close() {
  m_incomingPipe = nullptr;
}
|
|
|
|
// Appends `packets` to the peer's incoming queue while holding that pipe's
// mutex. Does nothing when the socket is not open, when there is nothing to
// send, or when the peer's pipe can no longer be locked.
void LocalPacketSocket::sendPackets(List<PacketPtr> packets) {
  if (packets.empty() || !isOpen())
    return;

  auto peerInbox = m_outgoingPipe.lock();
  if (!peerInbox)
    return;

  MutexLocker locker(peerInbox->mutex);

#ifdef STAR_DEBUG
  // Test serialization if STAR_DEBUG is enabled: every packet is written out
  // and read back into a freshly created packet, and the re-read copy is what
  // gets delivered.
  DataStreamBuffer scratch;
  for (auto original : take(packets)) {
    scratch.clear();
    original->write(scratch);
    auto roundTripped = createPacket(original->type());
    scratch.seek(0);
    roundTripped->read(scratch);
    packets.append(roundTripped);
  }
#endif

  peerInbox->queue.appendAll(move(packets));
}
|
|
|
|
// Drains and returns every packet currently waiting in this socket's incoming
// queue, holding the pipe's mutex while doing so.
//
// Returns an empty list if this end has been closed: close() resets
// m_incomingPipe, so it must not be dereferenced afterwards.
List<PacketPtr> LocalPacketSocket::receivePackets() {
  if (!m_incomingPipe)
    return List<PacketPtr>();

  MutexLocker locker(m_incomingPipe->mutex);
  List<PacketPtr> packets;
  packets.appendAll(take(m_incomingPipe->queue));
  return packets;
}
|
|
|
|
// sendPackets() places packets directly into the peer's queue, so this socket
// never has sent packets waiting to be flushed.
bool LocalPacketSocket::sentPacketsPending() const {
  bool const pending = false;
  return pending;
}
|
|
|
|
// Local sockets keep no outgoing buffer, so there is never anything to write;
// always returns false.
bool LocalPacketSocket::writeData() {
  bool const wroteAnything = false;
  return wroteAnything;
}
|
|
|
|
// Local sockets keep no incoming byte buffer, so there is never anything to
// read; always returns false.
bool LocalPacketSocket::readData() {
  bool const readAnything = false;
  return readAnything;
}
|
|
|
|
// Stores the pipe this socket receives from (owned) and the pipe it sends
// into (weakly referenced).
LocalPacketSocket::LocalPacketSocket(shared_ptr<Pipe> incomingPipe, weak_ptr<Pipe> outgoingPipe)
  : m_incomingPipe(move(incomingPipe)),
    m_outgoingPipe(move(outgoingPipe)) {}
|
|
|
|
// Wraps `socket` in a TcpPacketSocket after switching it to no-delay,
// non-blocking mode.
TcpPacketSocketUPtr TcpPacketSocket::open(TcpSocketPtr socket) {
  socket->setNoDelay(true);
  socket->setNonBlocking(true);

  TcpPacketSocketUPtr packetSocket(new TcpPacketSocket(move(socket)));
  return packetSocket;
}
|
|
|
|
bool TcpPacketSocket::isOpen() const {
|
|
return m_socket->isActive();
|
|
}
|
|
|
|
// Closes the underlying TCP socket. Buffered input and output bytes are left
// untouched.
void TcpPacketSocket::close() {
  m_socket->close();
}
|
|
|
|
// Serializes `packets` into m_outputBuffer; nothing is sent until writeData().
//
// Consecutive packets of the same type are written back to back into one
// frame. A frame is: packet type, a signed VLQ length, then the payload. When
// the batch is larger than 64 bytes and compressing it actually makes it
// smaller, the compressed bytes are sent and the length is written negated to
// flag that. The number of bytes put on the wire is recorded in
// m_outgoingStats.
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
  auto cursor = makeSMutableIterator(packets);

  while (cursor.hasNext()) {
    PacketType batchType = cursor.peekNext()->type();

    // Gather the run of packets sharing this type.
    DataStreamBuffer rawBatch;
    while (cursor.hasNext() && cursor.peekNext()->type() == batchType)
      cursor.next()->write(rawBatch);

    // Packets must read and write actual data, because this is used to
    // determine packet count
    starAssert(!rawBatch.empty());

    ByteArray deflated;
    if (rawBatch.size() > 64)
      deflated = compressData(rawBatch.data());

    bool const useCompressed = !deflated.empty() && deflated.size() < rawBatch.size();

    DataStreamBuffer frame;
    frame.write(batchType);

    if (useCompressed) {
      frame.writeVlqI(-static_cast<int>(deflated.size()));
      frame.writeData(deflated.ptr(), deflated.size());
      m_outgoingStats.mix(batchType, deflated.size());
    } else {
      frame.writeVlqI(static_cast<int>(rawBatch.size()));
      frame.writeData(rawBatch.ptr(), rawBatch.size());
      m_outgoingStats.mix(batchType, rawBatch.size());
    }
    m_outputBuffer.append(frame.takeData());
  }
}
|
|
|
|
// Decodes as many complete frames as m_inputBuffer currently holds and returns
// the packets they contain.
//
// A frame is a packet type, a signed VLQ length and that many payload bytes;
// a negative length means the payload is compressed. One frame carries one or
// more packets of the same type back to back. If the header or the payload of
// the next frame is not fully buffered yet, parsing stops and the remaining
// bytes stay in m_inputBuffer for a later call.
//
// On an IOException (including an over-long declared frame size) the input
// buffer is cleared and the socket is shut down; packets decoded before the
// error are still returned.
List<PacketPtr> TcpPacketSocket::receivePackets() {
  uint64_t const PacketSizeLimit = 64<<20;
  List<PacketPtr> packets;
  try {
    while (!m_inputBuffer.empty()) {
      PacketType packetType;
      uint64_t packetSize = 0;
      bool packetCompressed = false;

      DataStreamBuffer ds(m_inputBuffer);
      try {
        packetType = ds.read<PacketType>();
        int64_t len = ds.readVlqI();
        if (len < 0) {
          // The length comes from the peer. Plain `-len` would be signed
          // overflow (undefined behavior) for the most negative int64_t, so
          // compute the magnitude without ever overflowing.
          packetSize = static_cast<uint64_t>(-(len + 1)) + 1;
          packetCompressed = true;
        } else {
          packetSize = static_cast<uint64_t>(len);
          packetCompressed = false;
        }
      } catch (EofException const&) {
        // Guard against not having the entire packet header available when
        // trying to read.
        break;
      }

      if (packetSize > PacketSizeLimit)
        throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize);

      // Payload not fully received yet; try again once more data has arrived.
      if (packetSize > ds.size() - ds.pos())
        break;

      ByteArray packetBytes = ds.readBytes(packetSize);
      if (packetCompressed)
        packetBytes = uncompressData(packetBytes);

      // Stats count the on-wire (possibly compressed) payload size.
      m_incomingStats.mix(packetType, packetSize);

      DataStreamBuffer packetStream(move(packetBytes));
      do {
        PacketPtr packet = createPacket(packetType);
        packet->read(packetStream);
        packets.append(move(packet));
      } while (!packetStream.atEnd());

      // Keep only the bytes that follow the frame just consumed.
      m_inputBuffer = ds.readBytes(ds.size() - ds.pos());
    }
  } catch (IOException const& e) {
    Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false));
    m_inputBuffer.clear();
    m_socket->shutdown();
  }
  return packets;
}
|
|
|
|
bool TcpPacketSocket::sentPacketsPending() const {
|
|
return !m_outputBuffer.empty();
|
|
}
|
|
|
|
bool TcpPacketSocket::writeData() {
|
|
if (!isOpen())
|
|
return false;
|
|
|
|
bool dataSent = false;
|
|
try {
|
|
if (m_outputBuffer.empty())
|
|
return false;
|
|
|
|
while (!m_outputBuffer.empty()) {
|
|
size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
|
|
if (writtenAmount == 0)
|
|
break;
|
|
dataSent = true;
|
|
m_outputBuffer.trimLeft(writtenAmount);
|
|
}
|
|
} catch (SocketClosedException const& e) {
|
|
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
|
|
} catch (IOException const& e) {
|
|
Logger::warn("I/O error in TcpPacketSocket::sendData: {}", outputException(e, false));
|
|
m_socket->shutdown();
|
|
}
|
|
return dataSent;
|
|
}
|
|
|
|
bool TcpPacketSocket::readData() {
|
|
bool dataReceived = false;
|
|
try {
|
|
char readBuffer[1024];
|
|
while (true) {
|
|
size_t readAmount = m_socket->receive(readBuffer, 1024);
|
|
if (readAmount == 0)
|
|
break;
|
|
dataReceived = true;
|
|
m_inputBuffer.append(readBuffer, readAmount);
|
|
}
|
|
} catch (SocketClosedException const& e) {
|
|
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
|
|
} catch (IOException const& e) {
|
|
Logger::warn("I/O error in TcpPacketSocket::receiveData: {}", outputException(e, false));
|
|
m_socket->shutdown();
|
|
}
|
|
return dataReceived;
|
|
}
|
|
|
|
// Current statistics for frames decoded by receivePackets().
Maybe<PacketStats> TcpPacketSocket::incomingStats() const {
  return Maybe<PacketStats>(m_incomingStats.stats());
}
|
|
|
|
// Current statistics for frames queued by sendPackets().
Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
  return Maybe<PacketStats>(m_outgoingStats.stats());
}
|
|
|
|
// Takes over the given TCP socket; socket options are configured by open(),
// not here.
TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket)
  : m_socket(move(socket)) {
}
|
|
|
|
// Wraps `socket` in a newly allocated P2PPacketSocket.
P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) {
  P2PPacketSocketUPtr packetSocket(new P2PPacketSocket(move(socket)));
  return packetSocket;
}
|
|
|
|
bool P2PPacketSocket::isOpen() const {
|
|
return m_socket && m_socket->isOpen();
|
|
}
|
|
|
|
// Closes by releasing this object's reference to the underlying socket;
// isOpen(), writeData() and readData() treat a null socket as closed.
void P2PPacketSocket::close() {
  m_socket.reset();
}
|
|
|
|
// Serializes `packets` into whole messages queued on m_outputMessages; nothing
// is sent until writeData().
//
// Consecutive packets of the same type share one message. A message is: packet
// type, a bool "compressed" flag, then the payload, with no length field. The
// batch is sent compressed only when it is larger than 64 bytes and
// compression actually shrinks it. The payload size is recorded in
// m_outgoingStats.
void P2PPacketSocket::sendPackets(List<PacketPtr> packets) {
  auto cursor = makeSMutableIterator(packets);

  while (cursor.hasNext()) {
    PacketType batchType = cursor.peekNext()->type();

    // Gather the run of packets sharing this type.
    DataStreamBuffer rawBatch;
    while (cursor.hasNext() && cursor.peekNext()->type() == batchType)
      cursor.next()->write(rawBatch);

    // Packets must read and write actual data, because this is used to
    // determine packet count
    starAssert(!rawBatch.empty());

    ByteArray deflated;
    if (rawBatch.size() > 64)
      deflated = compressData(rawBatch.data());

    bool const useCompressed = !deflated.empty() && deflated.size() < rawBatch.size();

    DataStreamBuffer message;
    message.write(batchType);

    if (useCompressed) {
      message.write<bool>(true);
      message.writeData(deflated.ptr(), deflated.size());
      m_outgoingStats.mix(batchType, deflated.size());
    } else {
      message.write<bool>(false);
      message.writeData(rawBatch.ptr(), rawBatch.size());
      m_outgoingStats.mix(batchType, rawBatch.size());
    }
    m_outputMessages.append(message.takeData());
  }
}
|
|
|
|
// Decodes every message collected by readData() and returns the packets they
// contain.
//
// Each message is a packet type, a bool "compressed" flag, and a payload that
// runs to the end of the message; the payload holds one or more packets of
// that type back to back. On an IOException the socket is released. All queued
// messages were already taken out of m_inputMessages, so any not yet decoded
// are dropped, while packets decoded before the error are still returned.
List<PacketPtr> P2PPacketSocket::receivePackets() {
  List<PacketPtr> packets;
  try {
    auto pendingMessages = take(m_inputMessages);
    for (auto& rawMessage : pendingMessages) {
      DataStreamBuffer ds(move(rawMessage));

      PacketType packetType = ds.read<PacketType>();
      bool packetCompressed = ds.read<bool>();

      // Everything after the two header fields is payload.
      size_t payloadSize = ds.size() - ds.pos();
      ByteArray payload = ds.readBytes(payloadSize);
      if (packetCompressed)
        payload = uncompressData(payload);

      // Stats count the on-wire (possibly compressed) payload size.
      m_incomingStats.mix(packetType, payloadSize);

      DataStreamBuffer packetStream(move(payload));
      do {
        PacketPtr packet = createPacket(packetType);
        packet->read(packetStream);
        packets.append(move(packet));
      } while (!packetStream.atEnd());
    }
  } catch (IOException const& e) {
    Logger::warn("I/O error in P2PPacketSocket::readPackets, closing: {}", outputException(e, false));
    m_socket.reset();
  }
  return packets;
}
|
|
|
|
bool P2PPacketSocket::sentPacketsPending() const {
|
|
return !m_outputMessages.empty();
|
|
}
|
|
|
|
bool P2PPacketSocket::writeData() {
|
|
bool workDone = false;
|
|
|
|
if (m_socket) {
|
|
while (!m_outputMessages.empty()) {
|
|
if (m_socket->sendMessage(m_outputMessages.first())) {
|
|
m_outputMessages.removeFirst();
|
|
workDone = true;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return workDone;
|
|
}
|
|
|
|
bool P2PPacketSocket::readData() {
|
|
bool workDone = false;
|
|
|
|
if (m_socket) {
|
|
while (auto message = m_socket->receiveMessage()) {
|
|
m_inputMessages.append(message.take());
|
|
workDone = true;
|
|
}
|
|
}
|
|
|
|
return workDone;
|
|
}
|
|
|
|
// Current statistics for messages decoded by receivePackets().
Maybe<PacketStats> P2PPacketSocket::incomingStats() const {
  return Maybe<PacketStats>(m_incomingStats.stats());
}
|
|
|
|
// Current statistics for messages queued by sendPackets().
Maybe<PacketStats> P2PPacketSocket::outgoingStats() const {
  return Maybe<PacketStats>(m_outgoingStats.stats());
}
|
|
|
|
// Takes over the given P2P socket as the transport for this packet socket.
P2PPacketSocket::P2PPacketSocket(P2PSocketPtr socket)
  : m_socket(move(socket)) {
}
|
|
|
|
}
|