Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
path: root/source/game/StarNetPacketSocket.cpp
diff options
context:
space:
mode:
authorKae <80987908+Novaenia@users.noreply.github.com>2024-07-27 14:09:12 +1000
committerKae <80987908+Novaenia@users.noreply.github.com>2024-07-27 14:09:12 +1000
commit951fe787c437d43982f5eda77e1f19bf83998bfd (patch)
treed2e0c9a57451be75a968fedb6b7c407911ffbef4 /source/game/StarNetPacketSocket.cpp
parent9e7a2e9bb9976a0f23fad5c0c91b151ffa2db24c (diff)
Networking changes (needs P2P testing, requires clients to update unfortunately)
Diffstat (limited to 'source/game/StarNetPacketSocket.cpp')
-rw-r--r--source/game/StarNetPacketSocket.cpp112
1 files changed, 65 insertions, 47 deletions
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp
index cd5c749..436726a 100644
--- a/source/game/StarNetPacketSocket.cpp
+++ b/source/game/StarNetPacketSocket.cpp
@@ -66,6 +66,9 @@ Maybe<PacketStats> PacketSocket::outgoingStats() const {
void PacketSocket::setLegacy(bool legacy) { m_legacy = legacy; }
bool PacketSocket::legacy() const { return m_legacy; }
+void CompressedPacketSocket::setCompressionStreamEnabled(bool enabled) { m_useCompressionStream = enabled; }
+bool CompressedPacketSocket::compressionStreamEnabled() const { return m_useCompressionStream; }
+
pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() {
auto lhsIncomingPipe = make_shared<Pipe>();
auto rhsIncomingPipe = make_shared<Pipe>();
@@ -146,7 +149,7 @@ void TcpPacketSocket::close() {
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
- if (m_useCompressionStream) {
+ if (compressionStreamEnabled()) {
DataStreamBuffer outBuffer;
while (it.hasNext()) {
PacketPtr& packet = it.next();
@@ -233,7 +236,7 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
if (packetSize > ds.remaining())
break;
- m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
+ m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled());
DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
ByteArray uncompressed;
@@ -280,19 +283,17 @@ bool TcpPacketSocket::writeData() {
bool dataSent = false;
try {
if (!m_outputBuffer.empty()) {
- if (m_useCompressionStream) {
- auto compressed = m_compressionStream.compress(m_outputBuffer);
+ if (compressionStreamEnabled()) {
+ auto compressedBuffer = m_compressionStream.compress(m_outputBuffer);
m_outputBuffer.clear();
-
- m_compressedBuffer.append(compressed.ptr(), compressed.size());
do {
- size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size());
+ size_t written = m_socket->send(compressedBuffer.ptr(), compressedBuffer.size());
if (written > 0) {
dataSent = true;
- m_compressedBuffer.trimLeft(written);
+ compressedBuffer.trimLeft(written);
m_outgoingStats.mix(written);
}
- } while (!m_compressedBuffer.empty());
+ } while (!compressedBuffer.empty());
} else {
do {
size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
@@ -321,10 +322,10 @@ bool TcpPacketSocket::readData() {
if (readAmount == 0)
break;
dataReceived = true;
- if (m_useCompressionStream) {
+ if (compressionStreamEnabled()) {
m_incomingStats.mix(readAmount);
auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
- m_inputBuffer.append(decompressed.ptr(), decompressed.size());
+ m_inputBuffer.append(decompressed);
} else {
m_inputBuffer.append(readBuffer, readAmount);
}
@@ -347,7 +348,6 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
}
void TcpPacketSocket::setLegacy(bool legacy) {
- m_useCompressionStream = !legacy;
PacketSocket::setLegacy(legacy);
}
@@ -368,43 +368,58 @@ void P2PPacketSocket::close() {
void P2PPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
- while (it.hasNext()) {
- PacketType currentType = it.peekNext()->type();
- PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
-
- DataStreamBuffer packetBuffer;
- while (it.hasNext()
- && it.peekNext()->type() == currentType
- && it.peekNext()->compressionMode() == currentCompressionMode) {
- if (legacy())
- it.next()->writeLegacy(packetBuffer);
- else
- it.next()->write(packetBuffer);
+ if (compressionStreamEnabled()) {
+ DataStreamBuffer outBuffer;
+ while (it.hasNext()) {
+ PacketPtr& packet = it.next();
+ auto packetType = packet->type();
+ DataStreamBuffer packetBuffer;
+ packet->write(packetBuffer);
+ outBuffer.write(packetType);
+ outBuffer.writeVlqI((int)packetBuffer.size());
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(packetType, packetBuffer.size(), false);
}
+ m_outputMessages.append(m_compressionStream.compress(outBuffer.data()));
+ } else {
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+ PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext()
+ && it.peekNext()->type() == currentType
+ && it.peekNext()->compressionMode() == currentCompressionMode) {
+ if (legacy())
+ it.next()->writeLegacy(packetBuffer);
+ else
+ it.next()->write(packetBuffer);
+ }
- // Packets must read and write actual data, because this is used to
- // determine packet count
- starAssert(!packetBuffer.empty());
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
- ByteArray compressedPackets;
- bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
- bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
- if (mustCompress || perhapsCompress)
- compressedPackets = compressData(packetBuffer.data());
+ ByteArray compressedPackets;
+ bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
+ bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
+ if (mustCompress || perhapsCompress)
+ compressedPackets = compressData(packetBuffer.data());
- DataStreamBuffer outBuffer;
- outBuffer.write(currentType);
-
- if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
- outBuffer.write<bool>(true);
- outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
- m_outgoingStats.mix(currentType, compressedPackets.size());
- } else {
- outBuffer.write<bool>(false);
- outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
- m_outgoingStats.mix(currentType, packetBuffer.size());
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
+
+ if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
+ outBuffer.write<bool>(true);
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.write<bool>(false);
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputMessages.append(outBuffer.takeData());
}
- m_outputMessages.append(outBuffer.takeData());
}
}
@@ -422,9 +437,9 @@ List<PacketPtr> P2PPacketSocket::receivePackets() {
if (packetCompressed)
packetBytes = uncompressData(packetBytes);
- m_incomingStats.mix(packetType, packetSize);
+ m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled());
- DataStreamBuffer packetStream(std::move(packetBytes));
+ DataStreamExternalBuffer packetStream(packetBytes);
do {
PacketPtr packet = createPacket(packetType);
packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
@@ -468,7 +483,10 @@ bool P2PPacketSocket::readData() {
if (m_socket) {
while (auto message = m_socket->receiveMessage()) {
- m_inputMessages.append(message.take());
+ m_incomingStats.mix(message->size());
+ m_inputMessages.append(compressionStreamEnabled()
+ ? m_decompressionStream.decompress(*message)
+ : *message);
workDone = true;
}
}