Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
path: root/source/game/StarNetPacketSocket.cpp
diff options
context:
space:
mode:
authorKae <80987908+Novaenia@users.noreply.github.com>2024-03-14 21:41:53 +1100
committerKae <80987908+Novaenia@users.noreply.github.com>2024-03-14 21:41:53 +1100
commit662b83ff92cc2316fb962ff1608359f6d705a5f0 (patch)
treed0e7d15887ed14bd252e6e61888710c8bac2200a /source/game/StarNetPacketSocket.cpp
parent8164e5ae6fa33c9ec2a14f107585a7cbe7fbf813 (diff)
Initial commit of experimental zstd network compression
currently a bit buggy
Diffstat (limited to 'source/game/StarNetPacketSocket.cpp')
-rw-r--r--source/game/StarNetPacketSocket.cpp196
1 files changed, 125 insertions, 71 deletions
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp
index 48b2082..2ebaadb 100644
--- a/source/game/StarNetPacketSocket.cpp
+++ b/source/game/StarNetPacketSocket.cpp
@@ -6,17 +6,27 @@
namespace Star {
PacketStatCollector::PacketStatCollector(float calculationWindow)
- : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {}
+ : m_calculationWindow(calculationWindow), m_stats(), m_totalBytes(0), m_lastMixTime(0) {}
-void PacketStatCollector::mix(PacketType type, size_t size) {
+void PacketStatCollector::mix(size_t size) {
+ calculate();
+ m_totalBytes += size;
+}
+
+void PacketStatCollector::mix(PacketType type, size_t size, bool addToTotal) {
calculate();
m_unmixed[type] += size;
+ if (addToTotal)
+ m_totalBytes += size;
}
-void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) {
+void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal) {
calculate();
- for (auto const& p : sizes)
+ for (auto const& p : sizes) {
+ if (addToTotal)
+ m_totalBytes += p.second;
m_unmixed[p.first] += p.second;
+ }
}
PacketStats PacketStatCollector::stats() const {
@@ -31,18 +41,19 @@ void PacketStatCollector::calculate() {
m_lastMixTime = currentTime;
m_stats.worstPacketSize = 0;
- float total = 0.0f;
+ if (abs(elapsedTime) - m_calculationWindow < 0.0125f)
+ elapsedTime = m_calculationWindow;
+
for (auto& pair : m_unmixed) {
if (pair.second > m_stats.worstPacketSize) {
m_stats.worstPacketType = pair.first;
m_stats.worstPacketSize = pair.second;
}
- auto& bytes = m_stats.packetBytesPerSecond[pair.first];
- bytes = pair.second / elapsedTime;
- total += bytes;
+ m_stats.packetBytesPerSecond[pair.first] = round(pair.second / elapsedTime);
}
- m_stats.bytesPerSecond = total;
+ m_stats.bytesPerSecond = round(float(m_totalBytes) / elapsedTime);
+ m_totalBytes = 0;
m_unmixed.clear();
}
}
@@ -138,67 +149,81 @@ void TcpPacketSocket::close() {
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
-
- while (it.hasNext()) {
- PacketType currentType = it.peekNext()->type();
- PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
-
- DataStreamBuffer packetBuffer;
- while (it.hasNext()
- && it.peekNext()->type() == currentType
- && it.peekNext()->compressionMode() == currentCompressionMode) {
- if (legacy())
- it.next()->writeLegacy(packetBuffer);
- else
- it.next()->write(packetBuffer);
+ if (m_useCompressionStream) {
+ DataStreamBuffer outBuffer;
+ while (it.hasNext()) {
+ PacketPtr& packet = it.next();
+ auto packetType = packet->type();
+ DataStreamBuffer packetBuffer;
+ packet->write(packetBuffer);
+ outBuffer.write(packetType);
+ outBuffer.writeVlqI((int)packetBuffer.size());
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(packetType, packetBuffer.size(), false);
}
+ m_outputBuffer.append(outBuffer.ptr(), outBuffer.size());
+ } else {
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+ PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext()
+ && it.peekNext()->type() == currentType
+ && it.peekNext()->compressionMode() == currentCompressionMode) {
+ if (legacy())
+ it.next()->writeLegacy(packetBuffer);
+ else
+ it.next()->write(packetBuffer);
+ }
- // Packets must read and write actual data, because this is used to
- // determine packet count
- starAssert(!packetBuffer.empty());
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
- ByteArray compressedPackets;
- bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
- bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
- if (mustCompress || perhapsCompress)
- compressedPackets = compressData(packetBuffer.data());
+ ByteArray compressedPackets;
+ bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
+ bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
+ if (mustCompress || perhapsCompress)
+ compressedPackets = compressData(packetBuffer.data());
- DataStreamBuffer outBuffer;
- outBuffer.write(currentType);
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
- if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
- outBuffer.writeVlqI(-(int)(compressedPackets.size()));
- outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
- m_outgoingStats.mix(currentType, compressedPackets.size());
- } else {
- outBuffer.writeVlqI((int)(packetBuffer.size()));
- outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
- m_outgoingStats.mix(currentType, packetBuffer.size());
+ if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
+ outBuffer.writeVlqI(-(int)(compressedPackets.size()));
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.writeVlqI((int)(packetBuffer.size()));
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputBuffer.append(outBuffer.takeData());
}
- m_outputBuffer.append(outBuffer.takeData());
}
}
List<PacketPtr> TcpPacketSocket::receivePackets() {
- uint64_t const PacketSizeLimit = 64<<20;
+ // How large can uncompressed packets be
+ // this limit is now also used during decompression
+ uint64_t const PacketSizeLimit = 64 << 20;
+ // How many packets can be batched together into one compressed chunk at once
+ uint64_t const PacketBatchLimit = 131072;
List<PacketPtr> packets;
try {
- while (!m_inputBuffer.empty()) {
+ DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size());
+ bool atLeastOne = false;
+ while (!ds.atEnd()) {
PacketType packetType;
uint64_t packetSize = 0;
bool packetCompressed = false;
- DataStreamBuffer ds(m_inputBuffer);
try {
packetType = ds.read<PacketType>();
int64_t len = ds.readVlqI();
- if (len < 0) {
- packetSize = -len;
- packetCompressed = true;
- } else {
- packetSize = len;
- packetCompressed = false;
- }
+ packetCompressed = len < 0;
+ packetSize = packetCompressed ? -len : len;
} catch (EofException const&) {
// Guard against not having the entire packet header available when
// trying to read.
@@ -206,19 +231,28 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
}
if (packetSize > PacketSizeLimit)
- throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize);
+ throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType));
if (packetSize > ds.size() - ds.pos())
break;
- ByteArray packetBytes = ds.readBytes(packetSize);
- if (packetCompressed)
- packetBytes = uncompressData(packetBytes);
+ atLeastOne = true;
+ m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
- m_incomingStats.mix(packetType, packetSize);
+ DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
+ ByteArray uncompressed;
+ if (packetCompressed) {
+ uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit);
+ packetStream.reset(uncompressed.ptr(), uncompressed.size());
+ }
+ ds.seek(packetSize, IOSeek::Relative);
- DataStreamBuffer packetStream(std::move(packetBytes));
+ size_t count = 0;
do {
+ if (++count > PacketBatchLimit) {
+ throw IOException::format("Packet batch limit {} reached while reading {}s!", PacketBatchLimit, PacketTypeNames.getRight(packetType));
+ break;
+ }
PacketPtr packet = createPacket(packetType);
packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
if (legacy())
@@ -227,9 +261,9 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
packet->read(packetStream);
packets.append(std::move(packet));
} while (!packetStream.atEnd());
-
- m_inputBuffer = ds.readBytes(ds.size() - ds.pos());
}
+ if (atLeastOne)
+ m_inputBuffer.trimLeft(ds.pos());
} catch (IOException const& e) {
Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false));
m_inputBuffer.clear();
@@ -248,15 +282,25 @@ bool TcpPacketSocket::writeData() {
bool dataSent = false;
try {
- if (m_outputBuffer.empty())
- return false;
-
- while (!m_outputBuffer.empty()) {
- size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
- if (writtenAmount == 0)
- break;
- dataSent = true;
- m_outputBuffer.trimLeft(writtenAmount);
+ if (m_useCompressionStream) {
+ auto compressed = m_compressionStream.compress(m_outputBuffer);
+ m_outputBuffer.clear();
+
+ m_compressedBuffer.append(compressed.ptr(), compressed.size());
+ size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size());
+ if (written > 0) {
+ dataSent = true;
+ m_compressedBuffer.trimLeft(written);
+ m_outgoingStats.mix(written);
+ }
+ } else {
+ while (!m_outputBuffer.empty()) {
+ size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
+ if (written == 0)
+ break;
+ dataSent = true;
+ m_outputBuffer.trimLeft(written);
+ }
}
} catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@@ -276,7 +320,13 @@ bool TcpPacketSocket::readData() {
if (readAmount == 0)
break;
dataReceived = true;
- m_inputBuffer.append(readBuffer, readAmount);
+ m_incomingStats.mix(readAmount);
+ if (m_useCompressionStream) {
+ auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
+ m_inputBuffer.append(decompressed.ptr(), decompressed.size());
+ } else {
+ m_inputBuffer.append(readBuffer, readAmount);
+ }
}
} catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@@ -295,8 +345,12 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
return m_outgoingStats.stats();
}
-TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket)
- : m_socket(std::move(socket)) {}
+void TcpPacketSocket::setLegacy(bool legacy) {
+ m_useCompressionStream = !legacy;
+ PacketSocket::setLegacy(legacy);
+}
+
+TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(std::move(socket)) {}
P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) {
return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket)));