Fixes
This commit is contained in:
parent
662b83ff92
commit
6208636d69
@ -132,6 +132,8 @@ void DataStreamBuffer::writeData(char const* data, size_t len) {
|
|||||||
|
|
||||||
DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {}
|
DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {}
|
||||||
|
|
||||||
|
DataStreamExternalBuffer::DataStreamExternalBuffer(ByteArray const& byteArray) : DataStreamExternalBuffer(byteArray.ptr(), byteArray.size()) {}
|
||||||
|
|
||||||
DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {}
|
DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {}
|
||||||
|
|
||||||
DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() {
|
DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() {
|
||||||
|
@ -126,7 +126,9 @@ private:
|
|||||||
class DataStreamExternalBuffer : public DataStream {
|
class DataStreamExternalBuffer : public DataStream {
|
||||||
public:
|
public:
|
||||||
DataStreamExternalBuffer();
|
DataStreamExternalBuffer();
|
||||||
DataStreamExternalBuffer(DataStreamBuffer const& buffer);
|
explicit DataStreamExternalBuffer(ByteArray const& byteArray);
|
||||||
|
explicit DataStreamExternalBuffer(DataStreamBuffer const& buffer);
|
||||||
|
|
||||||
DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default;
|
DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default;
|
||||||
DataStreamExternalBuffer(char const* externalData, size_t len);
|
DataStreamExternalBuffer(char const* externalData, size_t len);
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ CompressionStream::CompressionStream() : m_cStream(ZSTD_createCStream()) {
|
|||||||
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1);
|
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1);
|
||||||
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24);
|
ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24);
|
||||||
ZSTD_initCStream(m_cStream, 2);
|
ZSTD_initCStream(m_cStream, 2);
|
||||||
|
m_output.resize(ZSTD_CStreamOutSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); }
|
CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); }
|
||||||
@ -14,39 +15,33 @@ CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); }
|
|||||||
ByteArray CompressionStream::compress(const char* in, size_t inLen) {
|
ByteArray CompressionStream::compress(const char* in, size_t inLen) {
|
||||||
size_t const cInSize = ZSTD_CStreamInSize ();
|
size_t const cInSize = ZSTD_CStreamInSize ();
|
||||||
size_t const cOutSize = ZSTD_CStreamOutSize();
|
size_t const cOutSize = ZSTD_CStreamOutSize();
|
||||||
ByteArray output(cOutSize, 0);
|
ZSTD_inBuffer inBuffer = {in, inLen, 0};
|
||||||
size_t written = 0, read = 0;
|
size_t written = 0;
|
||||||
while (read < inLen) {
|
|
||||||
ZSTD_inBuffer inBuffer = {in + read, min(cInSize, inLen - read), 0};
|
|
||||||
ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
|
|
||||||
bool finished = false;
|
bool finished = false;
|
||||||
do {
|
do {
|
||||||
|
ZSTD_outBuffer outBuffer = {m_output.ptr() + written, min(cOutSize, m_output.size() - written), 0};
|
||||||
size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush);
|
size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush);
|
||||||
if (ZSTD_isError(ret)) {
|
if (ZSTD_isError(ret)) {
|
||||||
throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret)));
|
throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret)));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written += outBuffer.pos;
|
||||||
if (outBuffer.pos == outBuffer.size) {
|
if (outBuffer.pos == outBuffer.size) {
|
||||||
output.resize(output.size() * 2);
|
if (written >= m_output.size())
|
||||||
outBuffer.dst = output.ptr();
|
m_output.resize(m_output.size() * 2);
|
||||||
outBuffer.size = output.size();
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
finished = ret == 0 && inBuffer.pos == inBuffer.size;
|
finished = ret == 0 && inBuffer.pos == inBuffer.size;
|
||||||
} while (!finished);
|
} while (!finished);
|
||||||
|
return ByteArray(m_output.ptr(), written);
|
||||||
read += inBuffer.pos;
|
|
||||||
written += outBuffer.pos;
|
|
||||||
}
|
|
||||||
output.resize(written);
|
|
||||||
return output;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) {
|
DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) {
|
||||||
ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25);
|
ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25);
|
||||||
ZSTD_initDStream(m_dStream);
|
ZSTD_initDStream(m_dStream);
|
||||||
|
m_output.resize(ZSTD_DStreamOutSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); }
|
DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); }
|
||||||
@ -54,31 +49,26 @@ DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); }
|
|||||||
ByteArray DecompressionStream::decompress(const char* in, size_t inLen) {
|
ByteArray DecompressionStream::decompress(const char* in, size_t inLen) {
|
||||||
size_t const dInSize = ZSTD_DStreamInSize ();
|
size_t const dInSize = ZSTD_DStreamInSize ();
|
||||||
size_t const dOutSize = ZSTD_DStreamOutSize();
|
size_t const dOutSize = ZSTD_DStreamOutSize();
|
||||||
ByteArray output(dOutSize, 0);
|
ZSTD_inBuffer inBuffer = {in, inLen, 0};
|
||||||
size_t written = 0, read = 0;
|
size_t written = 0;
|
||||||
while (read < inLen) {
|
bool finished = false;
|
||||||
ZSTD_inBuffer inBuffer = {in + read, min(dInSize, inLen - read), 0};
|
|
||||||
ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
|
|
||||||
do {
|
do {
|
||||||
|
ZSTD_outBuffer outBuffer = {m_output.ptr() + written, min(dOutSize, m_output.size() - written), 0};
|
||||||
size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer);
|
size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer);
|
||||||
if (ZSTD_isError(ret)) {
|
if (ZSTD_isError(ret)) {
|
||||||
throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret)));
|
throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret)));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written += outBuffer.pos;
|
||||||
if (outBuffer.pos == outBuffer.size) {
|
if (outBuffer.pos == outBuffer.size) {
|
||||||
output.resize(output.size() * 2);
|
if (written >= m_output.size())
|
||||||
outBuffer.dst = output.ptr();
|
m_output.resize(m_output.size() * 2);
|
||||||
outBuffer.size = output.size();
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} while (inBuffer.pos < inBuffer.size);
|
finished = inBuffer.pos == inBuffer.size;
|
||||||
|
} while (!finished);
|
||||||
read += inBuffer.pos;
|
return ByteArray(m_output.ptr(), written);
|
||||||
written += outBuffer.pos;
|
|
||||||
}
|
|
||||||
output.resize(written);
|
|
||||||
return output;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -19,6 +19,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
ZSTD_CStream* m_cStream;
|
ZSTD_CStream* m_cStream;
|
||||||
|
ByteArray m_output;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline ByteArray CompressionStream::compress(ByteArray const& in) {
|
inline ByteArray CompressionStream::compress(ByteArray const& in) {
|
||||||
@ -35,6 +36,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
ZSTD_DStream* m_dStream;
|
ZSTD_DStream* m_dStream;
|
||||||
|
ByteArray m_output;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline ByteArray DecompressionStream::decompress(ByteArray const& in) {
|
inline ByteArray DecompressionStream::decompress(ByteArray const& in) {
|
||||||
|
@ -212,8 +212,8 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
|
|||||||
uint64_t const PacketBatchLimit = 131072;
|
uint64_t const PacketBatchLimit = 131072;
|
||||||
List<PacketPtr> packets;
|
List<PacketPtr> packets;
|
||||||
try {
|
try {
|
||||||
DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size());
|
DataStreamExternalBuffer ds(m_inputBuffer);
|
||||||
bool atLeastOne = false;
|
size_t trimPos = 0;
|
||||||
while (!ds.atEnd()) {
|
while (!ds.atEnd()) {
|
||||||
PacketType packetType;
|
PacketType packetType;
|
||||||
uint64_t packetSize = 0;
|
uint64_t packetSize = 0;
|
||||||
@ -233,19 +233,19 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
|
|||||||
if (packetSize > PacketSizeLimit)
|
if (packetSize > PacketSizeLimit)
|
||||||
throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType));
|
throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType));
|
||||||
|
|
||||||
if (packetSize > ds.size() - ds.pos())
|
if (packetSize > ds.remaining())
|
||||||
break;
|
break;
|
||||||
|
|
||||||
atLeastOne = true;
|
|
||||||
m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
|
m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
|
||||||
|
|
||||||
DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
|
DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
|
||||||
ByteArray uncompressed;
|
ByteArray uncompressed;
|
||||||
if (packetCompressed) {
|
if (packetCompressed) {
|
||||||
uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit);
|
uncompressed = uncompressData(packetStream.ptr(), packetSize, PacketSizeLimit);
|
||||||
packetStream.reset(uncompressed.ptr(), uncompressed.size());
|
packetStream.reset(uncompressed.ptr(), uncompressed.size());
|
||||||
}
|
}
|
||||||
ds.seek(packetSize, IOSeek::Relative);
|
ds.seek(packetSize, IOSeek::Relative);
|
||||||
|
trimPos = ds.pos();
|
||||||
|
|
||||||
size_t count = 0;
|
size_t count = 0;
|
||||||
do {
|
do {
|
||||||
@ -262,10 +262,10 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
|
|||||||
packets.append(std::move(packet));
|
packets.append(std::move(packet));
|
||||||
} while (!packetStream.atEnd());
|
} while (!packetStream.atEnd());
|
||||||
}
|
}
|
||||||
if (atLeastOne)
|
if (trimPos)
|
||||||
m_inputBuffer.trimLeft(ds.pos());
|
m_inputBuffer.trimLeft(trimPos);
|
||||||
} catch (IOException const& e) {
|
} catch (IOException const& e) {
|
||||||
Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false));
|
Logger::warn("I/O error in TcpPacketSocket::receivePackets, closing: {}", outputException(e, false));
|
||||||
m_inputBuffer.clear();
|
m_inputBuffer.clear();
|
||||||
m_socket->shutdown();
|
m_socket->shutdown();
|
||||||
}
|
}
|
||||||
@ -282,6 +282,7 @@ bool TcpPacketSocket::writeData() {
|
|||||||
|
|
||||||
bool dataSent = false;
|
bool dataSent = false;
|
||||||
try {
|
try {
|
||||||
|
if (!m_outputBuffer.empty()) {
|
||||||
if (m_useCompressionStream) {
|
if (m_useCompressionStream) {
|
||||||
auto compressed = m_compressionStream.compress(m_outputBuffer);
|
auto compressed = m_compressionStream.compress(m_outputBuffer);
|
||||||
m_outputBuffer.clear();
|
m_outputBuffer.clear();
|
||||||
@ -294,18 +295,19 @@ bool TcpPacketSocket::writeData() {
|
|||||||
m_outgoingStats.mix(written);
|
m_outgoingStats.mix(written);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
while (!m_outputBuffer.empty()) {
|
do {
|
||||||
size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
|
size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
|
||||||
if (written == 0)
|
if (written == 0)
|
||||||
break;
|
break;
|
||||||
dataSent = true;
|
dataSent = true;
|
||||||
m_outputBuffer.trimLeft(written);
|
m_outputBuffer.trimLeft(written);
|
||||||
|
} while (!m_outputBuffer.empty());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SocketClosedException const& e) {
|
} catch (SocketClosedException const& e) {
|
||||||
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
|
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
|
||||||
} catch (IOException const& e) {
|
} catch (IOException const& e) {
|
||||||
Logger::warn("I/O error in TcpPacketSocket::sendData: {}", outputException(e, false));
|
Logger::warn("I/O error in TcpPacketSocket::writeData: {}", outputException(e, false));
|
||||||
m_socket->shutdown();
|
m_socket->shutdown();
|
||||||
}
|
}
|
||||||
return dataSent;
|
return dataSent;
|
||||||
@ -320,8 +322,8 @@ bool TcpPacketSocket::readData() {
|
|||||||
if (readAmount == 0)
|
if (readAmount == 0)
|
||||||
break;
|
break;
|
||||||
dataReceived = true;
|
dataReceived = true;
|
||||||
m_incomingStats.mix(readAmount);
|
|
||||||
if (m_useCompressionStream) {
|
if (m_useCompressionStream) {
|
||||||
|
m_incomingStats.mix(readAmount);
|
||||||
auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
|
auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
|
||||||
m_inputBuffer.append(decompressed.ptr(), decompressed.size());
|
m_inputBuffer.append(decompressed.ptr(), decompressed.size());
|
||||||
} else {
|
} else {
|
||||||
@ -435,7 +437,7 @@ List<PacketPtr> P2PPacketSocket::receivePackets() {
|
|||||||
} while (!packetStream.atEnd());
|
} while (!packetStream.atEnd());
|
||||||
}
|
}
|
||||||
} catch (IOException const& e) {
|
} catch (IOException const& e) {
|
||||||
Logger::warn("I/O error in P2PPacketSocket::readPackets, closing: {}", outputException(e, false));
|
Logger::warn("I/O error in P2PPacketSocket::receivePackets, closing: {}", outputException(e, false));
|
||||||
m_socket.reset();
|
m_socket.reset();
|
||||||
}
|
}
|
||||||
return packets;
|
return packets;
|
||||||
|
Loading…
Reference in New Issue
Block a user