diff options
author | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-03-14 21:41:53 +1100 |
---|---|---|
committer | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-03-14 21:41:53 +1100 |
commit | 662b83ff92cc2316fb962ff1608359f6d705a5f0 (patch) | |
tree | d0e7d15887ed14bd252e6e61888710c8bac2200a /source/game/StarNetPacketSocket.cpp | |
parent | 8164e5ae6fa33c9ec2a14f107585a7cbe7fbf813 (diff) |
Initial commit of experimental zstd network compression
currently a bit buggy
Diffstat (limited to 'source/game/StarNetPacketSocket.cpp')
-rw-r--r-- | source/game/StarNetPacketSocket.cpp | 196 |
1 files changed, 125 insertions, 71 deletions
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp index 48b2082..2ebaadb 100644 --- a/source/game/StarNetPacketSocket.cpp +++ b/source/game/StarNetPacketSocket.cpp @@ -6,17 +6,27 @@ namespace Star { PacketStatCollector::PacketStatCollector(float calculationWindow) - : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {} + : m_calculationWindow(calculationWindow), m_stats(), m_totalBytes(0), m_lastMixTime(0) {} -void PacketStatCollector::mix(PacketType type, size_t size) { +void PacketStatCollector::mix(size_t size) { + calculate(); + m_totalBytes += size; +} + +void PacketStatCollector::mix(PacketType type, size_t size, bool addToTotal) { calculate(); m_unmixed[type] += size; + if (addToTotal) + m_totalBytes += size; } -void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) { +void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal) { calculate(); - for (auto const& p : sizes) + for (auto const& p : sizes) { + if (addToTotal) + m_totalBytes += p.second; m_unmixed[p.first] += p.second; + } } PacketStats PacketStatCollector::stats() const { @@ -31,18 +41,19 @@ void PacketStatCollector::calculate() { m_lastMixTime = currentTime; m_stats.worstPacketSize = 0; - float total = 0.0f; + if (abs(elapsedTime) - m_calculationWindow < 0.0125f) + elapsedTime = m_calculationWindow; + for (auto& pair : m_unmixed) { if (pair.second > m_stats.worstPacketSize) { m_stats.worstPacketType = pair.first; m_stats.worstPacketSize = pair.second; } - auto& bytes = m_stats.packetBytesPerSecond[pair.first]; - bytes = pair.second / elapsedTime; - total += bytes; + m_stats.packetBytesPerSecond[pair.first] = round(pair.second / elapsedTime); } - m_stats.bytesPerSecond = total; + m_stats.bytesPerSecond = round(float(m_totalBytes) / elapsedTime); + m_totalBytes = 0; m_unmixed.clear(); } } @@ -138,67 +149,81 @@ void TcpPacketSocket::close() { void TcpPacketSocket::sendPackets(List<PacketPtr> packets) { auto it = makeSMutableIterator(packets); - - while (it.hasNext()) { - PacketType currentType = it.peekNext()->type(); - PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); - - DataStreamBuffer packetBuffer; - while (it.hasNext() - && it.peekNext()->type() == currentType - && it.peekNext()->compressionMode() == currentCompressionMode) { - if (legacy()) - it.next()->writeLegacy(packetBuffer); - else - it.next()->write(packetBuffer); + if (m_useCompressionStream) { + DataStreamBuffer outBuffer; + while (it.hasNext()) { + PacketPtr& packet = it.next(); + auto packetType = packet->type(); + DataStreamBuffer packetBuffer; + packet->write(packetBuffer); + outBuffer.write(packetType); + outBuffer.writeVlqI((int)packetBuffer.size()); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(packetType, packetBuffer.size(), false); } + m_outputBuffer.append(outBuffer.ptr(), outBuffer.size()); + } else { + while (it.hasNext()) { + PacketType currentType = it.peekNext()->type(); + PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); + + DataStreamBuffer packetBuffer; + while (it.hasNext() + && it.peekNext()->type() == currentType + && it.peekNext()->compressionMode() == currentCompressionMode) { + if (legacy()) + it.next()->writeLegacy(packetBuffer); + else + it.next()->write(packetBuffer); + } - // Packets must read and write actual data, because this is used to - // determine packet count - starAssert(!packetBuffer.empty()); + // Packets must read and write actual data, because this is used to + // determine packet count + starAssert(!packetBuffer.empty()); - ByteArray compressedPackets; - bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; - bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; - if (mustCompress || perhapsCompress) - compressedPackets = compressData(packetBuffer.data()); + ByteArray compressedPackets; + bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; + bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; + if (mustCompress || perhapsCompress) + compressedPackets = compressData(packetBuffer.data()); - DataStreamBuffer outBuffer; - outBuffer.write(currentType); + DataStreamBuffer outBuffer; + outBuffer.write(currentType); - if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { - outBuffer.writeVlqI(-(int)(compressedPackets.size())); - outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); - m_outgoingStats.mix(currentType, compressedPackets.size()); - } else { - outBuffer.writeVlqI((int)(packetBuffer.size())); - outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); - m_outgoingStats.mix(currentType, packetBuffer.size()); + if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { + outBuffer.writeVlqI(-(int)(compressedPackets.size())); + outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); + m_outgoingStats.mix(currentType, compressedPackets.size()); + } else { + outBuffer.writeVlqI((int)(packetBuffer.size())); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(currentType, packetBuffer.size()); + } + m_outputBuffer.append(outBuffer.takeData()); } - m_outputBuffer.append(outBuffer.takeData()); } } List<PacketPtr> TcpPacketSocket::receivePackets() { - uint64_t const PacketSizeLimit = 64<<20; + // How large can uncompressed packets be + // this limit is now also used during decompression + uint64_t const PacketSizeLimit = 64 << 20; + // How many packets can be batched together into one compressed chunk at once + uint64_t const PacketBatchLimit = 131072; List<PacketPtr> packets; try { - while (!m_inputBuffer.empty()) { + DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size()); + bool atLeastOne = false; + while (!ds.atEnd()) { PacketType packetType; uint64_t packetSize = 0; bool packetCompressed = false; - DataStreamBuffer ds(m_inputBuffer); try { packetType = ds.read<PacketType>(); int64_t len = ds.readVlqI(); - if (len < 0) { - packetSize = -len; - packetCompressed = true; - } else { - packetSize = len; - packetCompressed = false; - } + packetCompressed = len < 0; + packetSize = packetCompressed ? -len : len; } catch (EofException const&) { // Guard against not having the entire packet header available when // trying to read. @@ -206,19 +231,28 @@ List<PacketPtr> TcpPacketSocket::receivePackets() { } if (packetSize > PacketSizeLimit) - throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize); + throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType)); if (packetSize > ds.size() - ds.pos()) break; - ByteArray packetBytes = ds.readBytes(packetSize); - if (packetCompressed) - packetBytes = uncompressData(packetBytes); + atLeastOne = true; + m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream); - m_incomingStats.mix(packetType, packetSize); + DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize); + ByteArray uncompressed; + if (packetCompressed) { + uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit); + packetStream.reset(uncompressed.ptr(), uncompressed.size()); + } + ds.seek(packetSize, IOSeek::Relative); - DataStreamBuffer packetStream(std::move(packetBytes)); + size_t count = 0; do { + if (++count > PacketBatchLimit) { + throw IOException::format("Packet batch limit {} reached while reading {}s!", PacketBatchLimit, PacketTypeNames.getRight(packetType)); + break; + } PacketPtr packet = createPacket(packetType); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled); if (legacy()) @@ -227,9 +261,9 @@ List<PacketPtr> TcpPacketSocket::receivePackets() { packet->read(packetStream); packets.append(std::move(packet)); } while (!packetStream.atEnd()); - - m_inputBuffer = ds.readBytes(ds.size() - ds.pos()); } + if (atLeastOne) + m_inputBuffer.trimLeft(ds.pos()); } catch (IOException const& e) { Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false)); m_inputBuffer.clear(); @@ -248,15 +282,25 @@ bool TcpPacketSocket::writeData() { bool dataSent = false; try { - if (m_outputBuffer.empty()) - return false; - - while (!m_outputBuffer.empty()) { - size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); - if (writtenAmount == 0) - break; - dataSent = true; - m_outputBuffer.trimLeft(writtenAmount); + if (m_useCompressionStream) { + auto compressed = m_compressionStream.compress(m_outputBuffer); + m_outputBuffer.clear(); + + m_compressedBuffer.append(compressed.ptr(), compressed.size()); + size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size()); + if (written > 0) { + dataSent = true; + m_compressedBuffer.trimLeft(written); + m_outgoingStats.mix(written); + } + } else { + while (!m_outputBuffer.empty()) { + size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); + if (written == 0) + break; + dataSent = true; + m_outputBuffer.trimLeft(written); + } } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); @@ -276,7 +320,13 @@ bool TcpPacketSocket::readData() { if (readAmount == 0) break; dataReceived = true; - m_inputBuffer.append(readBuffer, readAmount); + m_incomingStats.mix(readAmount); + if (m_useCompressionStream) { + auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount); + m_inputBuffer.append(decompressed.ptr(), decompressed.size()); + } else { + m_inputBuffer.append(readBuffer, readAmount); + } } } catch (SocketClosedException const& e) { Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false)); @@ -295,8 +345,12 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const { return m_outgoingStats.stats(); } -TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) - : m_socket(std::move(socket)) {} +void TcpPacketSocket::setLegacy(bool legacy) { + m_useCompressionStream = !legacy; + PacketSocket::setLegacy(legacy); +} + +TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(std::move(socket)) {} P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) { return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket))); |