diff options
author | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-07-27 14:09:12 +1000 |
---|---|---|
committer | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-07-27 14:09:12 +1000 |
commit | 951fe787c437d43982f5eda77e1f19bf83998bfd (patch) | |
tree | d2e0c9a57451be75a968fedb6b7c407911ffbef4 /source/game/StarNetPacketSocket.cpp | |
parent | 9e7a2e9bb9976a0f23fad5c0c91b151ffa2db24c (diff) |
Networking changes (needs P2P testing, requires clients to update unfortunately)
Diffstat (limited to 'source/game/StarNetPacketSocket.cpp')
-rw-r--r-- | source/game/StarNetPacketSocket.cpp | 112 |
1 files changed, 65 insertions, 47 deletions
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp index cd5c749..436726a 100644 --- a/source/game/StarNetPacketSocket.cpp +++ b/source/game/StarNetPacketSocket.cpp @@ -66,6 +66,9 @@ Maybe<PacketStats> PacketSocket::outgoingStats() const { void PacketSocket::setLegacy(bool legacy) { m_legacy = legacy; } bool PacketSocket::legacy() const { return m_legacy; } +void CompressedPacketSocket::setCompressionStreamEnabled(bool enabled) { m_useCompressionStream = enabled; } +bool CompressedPacketSocket::compressionStreamEnabled() const { return m_useCompressionStream; } + pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() { auto lhsIncomingPipe = make_shared<Pipe>(); auto rhsIncomingPipe = make_shared<Pipe>(); @@ -146,7 +149,7 @@ void TcpPacketSocket::close() { void TcpPacketSocket::sendPackets(List<PacketPtr> packets) { auto it = makeSMutableIterator(packets); - if (m_useCompressionStream) { + if (compressionStreamEnabled()) { DataStreamBuffer outBuffer; while (it.hasNext()) { PacketPtr& packet = it.next(); @@ -233,7 +236,7 @@ List<PacketPtr> TcpPacketSocket::receivePackets() { if (packetSize > ds.remaining()) break; - m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream); + m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled()); DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize); ByteArray uncompressed; @@ -280,19 +283,17 @@ bool TcpPacketSocket::writeData() { bool dataSent = false; try { if (!m_outputBuffer.empty()) { - if (m_useCompressionStream) { - auto compressed = m_compressionStream.compress(m_outputBuffer); + if (compressionStreamEnabled()) { + auto compressedBuffer = m_compressionStream.compress(m_outputBuffer); m_outputBuffer.clear(); - - m_compressedBuffer.append(compressed.ptr(), compressed.size()); do { - size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size()); + size_t written = m_socket->send(compressedBuffer.ptr(), compressedBuffer.size()); if (written > 0) { dataSent = true; - m_compressedBuffer.trimLeft(written); + compressedBuffer.trimLeft(written); m_outgoingStats.mix(written); } - } while (!m_compressedBuffer.empty()); + } while (!compressedBuffer.empty()); } else { do { size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size()); @@ -321,10 +322,10 @@ bool TcpPacketSocket::readData() { if (readAmount == 0) break; dataReceived = true; - if (m_useCompressionStream) { + if (compressionStreamEnabled()) { m_incomingStats.mix(readAmount); auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount); - m_inputBuffer.append(decompressed.ptr(), decompressed.size()); + m_inputBuffer.append(decompressed); } else { m_inputBuffer.append(readBuffer, readAmount); } @@ -347,7 +348,6 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const { } void TcpPacketSocket::setLegacy(bool legacy) { - m_useCompressionStream = !legacy; PacketSocket::setLegacy(legacy); } @@ -368,43 +368,58 @@ void P2PPacketSocket::close() { void P2PPacketSocket::sendPackets(List<PacketPtr> packets) { auto it = makeSMutableIterator(packets); - while (it.hasNext()) { - PacketType currentType = it.peekNext()->type(); - PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); - - DataStreamBuffer packetBuffer; - while (it.hasNext() - && it.peekNext()->type() == currentType - && it.peekNext()->compressionMode() == currentCompressionMode) { - if (legacy()) - it.next()->writeLegacy(packetBuffer); - else - it.next()->write(packetBuffer); + if (compressionStreamEnabled()) { + DataStreamBuffer outBuffer; + while (it.hasNext()) { + PacketPtr& packet = it.next(); + auto packetType = packet->type(); + DataStreamBuffer packetBuffer; + packet->write(packetBuffer); + outBuffer.write(packetType); + outBuffer.writeVlqI((int)packetBuffer.size()); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(packetType, packetBuffer.size(), false); } + m_outputMessages.append(m_compressionStream.compress(outBuffer.data())); + } else { + while (it.hasNext()) { + PacketType currentType = it.peekNext()->type(); + PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode(); + + DataStreamBuffer packetBuffer; + while (it.hasNext() + && it.peekNext()->type() == currentType + && it.peekNext()->compressionMode() == currentCompressionMode) { + if (legacy()) + it.next()->writeLegacy(packetBuffer); + else + it.next()->write(packetBuffer); + } - // Packets must read and write actual data, because this is used to - // determine packet count - starAssert(!packetBuffer.empty()); + // Packets must read and write actual data, because this is used to + // determine packet count + starAssert(!packetBuffer.empty()); - ByteArray compressedPackets; - bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; - bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; - if (mustCompress || perhapsCompress) - compressedPackets = compressData(packetBuffer.data()); + ByteArray compressedPackets; + bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled; + bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64; + if (mustCompress || perhapsCompress) + compressedPackets = compressData(packetBuffer.data()); - DataStreamBuffer outBuffer; - outBuffer.write(currentType); - - if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { - outBuffer.write<bool>(true); - outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); - m_outgoingStats.mix(currentType, compressedPackets.size()); - } else { - outBuffer.write<bool>(false); - outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); - m_outgoingStats.mix(currentType, packetBuffer.size()); + DataStreamBuffer outBuffer; + outBuffer.write(currentType); + + if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) { + outBuffer.write<bool>(true); + outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size()); + m_outgoingStats.mix(currentType, compressedPackets.size()); + } else { + outBuffer.write<bool>(false); + outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size()); + m_outgoingStats.mix(currentType, packetBuffer.size()); + } + m_outputMessages.append(outBuffer.takeData()); } - m_outputMessages.append(outBuffer.takeData()); } } @@ -422,9 +437,9 @@ List<PacketPtr> P2PPacketSocket::receivePackets() { if (packetCompressed) packetBytes = uncompressData(packetBytes); - m_incomingStats.mix(packetType, packetSize); + m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled()); - DataStreamBuffer packetStream(std::move(packetBytes)); + DataStreamExternalBuffer packetStream(packetBytes); do { PacketPtr packet = createPacket(packetType); packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled); @@ -468,7 +483,10 @@ bool P2PPacketSocket::readData() { if (m_socket) { while (auto message = m_socket->receiveMessage()) { - m_inputMessages.append(message.take()); + m_incomingStats.mix(message->size()); + m_inputMessages.append(compressionStreamEnabled() + ? m_decompressionStream.decompress(*message) + : *message); workDone = true; } } |