Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKae <80987908+Novaenia@users.noreply.github.com>2024-03-14 21:41:53 +1100
committerKae <80987908+Novaenia@users.noreply.github.com>2024-03-14 21:41:53 +1100
commit662b83ff92cc2316fb962ff1608359f6d705a5f0 (patch)
treed0e7d15887ed14bd252e6e61888710c8bac2200a
parent8164e5ae6fa33c9ec2a14f107585a7cbe7fbf813 (diff)
Initial commit of experimental zstd network compression
currently a bit buggy
-rw-r--r--source/CMakeLists.txt2
-rw-r--r--source/core/CMakeLists.txt2
-rw-r--r--source/core/StarBuffer.cpp4
-rw-r--r--source/core/StarBuffer.hpp2
-rw-r--r--source/core/StarCompression.cpp51
-rw-r--r--source/core/StarCompression.hpp6
-rw-r--r--source/core/StarDataStreamDevices.cpp8
-rw-r--r--source/core/StarDataStreamDevices.hpp3
-rw-r--r--source/core/StarZSTDCompression.cpp84
-rw-r--r--source/core/StarZSTDCompression.hpp44
-rw-r--r--source/game/StarNetPacketSocket.cpp196
-rw-r--r--source/game/StarNetPacketSocket.hpp20
-rw-r--r--source/game/StarNetPackets.cpp14
-rw-r--r--source/game/StarNetPackets.hpp8
-rw-r--r--source/game/StarUniverseServer.cpp2
-rw-r--r--source/vcpkg.json3
16 files changed, 343 insertions, 106 deletions
diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt
index 6ed5bb5..526d956 100644
--- a/source/CMakeLists.txt
+++ b/source/CMakeLists.txt
@@ -443,6 +443,7 @@ find_package(PNG REQUIRED)
find_package(Freetype REQUIRED)
find_package(Opus CONFIG REQUIRED)
find_package(OggVorbis REQUIRED)
+find_package(zstd CONFIG REQUIRED)
include_directories(SYSTEM
${FREETYPE_INCLUDE_DIRS}
@@ -453,6 +454,7 @@ set(STAR_EXT_LIBS ${STAR_EXT_LIBS}
ZLIB::ZLIB
PNG::PNG
$<IF:$<TARGET_EXISTS:Freetype::Freetype>,Freetype::Freetype,freetype>
+ $<IF:$<TARGET_EXISTS:zstd::libzstd_shared>,zstd::libzstd_shared,zstd::libzstd_static>
Opus::opus
${VORBISFILE_LIBRARY}
${VORBIS_LIBRARY}
diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt
index 687b9c5..4723686 100644
--- a/source/core/CMakeLists.txt
+++ b/source/core/CMakeLists.txt
@@ -126,6 +126,7 @@ SET (star_core_HEADERS
StarWeightedPool.hpp
StarWorkerPool.hpp
StarXXHash.hpp
+ StarZSTDCompression.hpp
)
SET (star_core_SOURCES
@@ -181,6 +182,7 @@ SET (star_core_SOURCES
StarUnicode.cpp
StarUuid.cpp
StarWorkerPool.cpp
+ StarZSTDCompression.cpp
)
IF (STAR_SYSTEM_FAMILY_UNIX)
diff --git a/source/core/StarBuffer.cpp b/source/core/StarBuffer.cpp
index 0d998af..5b5b2e4 100644
--- a/source/core/StarBuffer.cpp
+++ b/source/core/StarBuffer.cpp
@@ -263,6 +263,10 @@ bool ExternalBuffer::empty() const {
return m_size == 0;
}
+ExternalBuffer::operator bool() const {
+ return m_size == 0;
+}
+
void ExternalBuffer::reset(char const* externalData, size_t len) {
m_pos = 0;
m_bytes = externalData;
diff --git a/source/core/StarBuffer.hpp b/source/core/StarBuffer.hpp
index 0e1213e..0f9864a 100644
--- a/source/core/StarBuffer.hpp
+++ b/source/core/StarBuffer.hpp
@@ -105,6 +105,8 @@ public:
// Clears buffer, moves position to 0.
bool empty() const;
+ operator bool() const;
+
// Reset buffer with new contents, moves position to 0.
void reset(char const* externalData, size_t len);
diff --git a/source/core/StarCompression.cpp b/source/core/StarCompression.cpp
index 2aa59b2..58b43ef 100644
--- a/source/core/StarCompression.cpp
+++ b/source/core/StarCompression.cpp
@@ -15,9 +15,9 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
return;
const size_t BUFSIZE = 32 * 1024;
- unsigned char temp_buffer[BUFSIZE];
+ auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE);
- z_stream strm;
+ z_stream strm{};
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
@@ -27,13 +27,13 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
strm.next_in = (unsigned char*)in.ptr();
strm.avail_in = in.size();
- strm.next_out = temp_buffer;
+ strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE;
while (deflate_res == Z_OK) {
deflate_res = deflate(&strm, Z_FINISH);
if (strm.avail_out == 0) {
- out.append((char const*)temp_buffer, BUFSIZE);
- strm.next_out = temp_buffer;
+ out.append((char const*)tempBuffer.get(), BUFSIZE);
+ strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE;
}
}
@@ -42,7 +42,7 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress
if (deflate_res != Z_STREAM_END)
throw IOException(strf("Internal error in uncompressData, deflate_res is {}", deflate_res));
- out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out);
+ out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out);
}
ByteArray compressData(ByteArray const& in, CompressionLevel compression) {
@@ -51,16 +51,16 @@ ByteArray compressData(ByteArray const& in, CompressionLevel compression) {
return out;
}
-void uncompressData(ByteArray const& in, ByteArray& out) {
+void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit) {
out.clear();
- if (in.empty())
+ if (!inLen)
return;
const size_t BUFSIZE = 32 * 1024;
- unsigned char temp_buffer[BUFSIZE];
+ auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE);
- z_stream strm;
+ z_stream strm{};
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
@@ -68,17 +68,22 @@ void uncompressData(ByteArray const& in, ByteArray& out) {
if (inflate_res != Z_OK)
throw IOException(strf("Failed to initialise inflate ({})", inflate_res));
- strm.next_in = (unsigned char*)in.ptr();
- strm.avail_in = in.size();
- strm.next_out = temp_buffer;
+ strm.next_in = (unsigned char*)in;
+ strm.avail_in = inLen;
+ strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE;
while (inflate_res == Z_OK || inflate_res == Z_BUF_ERROR) {
inflate_res = inflate(&strm, Z_FINISH);
if (strm.avail_out == 0) {
- out.append((char const*)temp_buffer, BUFSIZE);
- strm.next_out = temp_buffer;
+ out.append((char const*)tempBuffer.get(), BUFSIZE);
+ strm.next_out = tempBuffer.get();
strm.avail_out = BUFSIZE;
+ if (limit && out.size() >= limit) {
+ inflateEnd(&strm);
+ throw IOException(strf("hit uncompressData limit of {} bytes", limit));
+ break;
+ }
} else if (inflate_res == Z_BUF_ERROR) {
break;
}
@@ -88,15 +93,23 @@ void uncompressData(ByteArray const& in, ByteArray& out) {
if (inflate_res != Z_STREAM_END)
throw IOException(strf("Internal error in uncompressData, inflate_res is {}", inflate_res));
- out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out);
+ out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out);
}
-ByteArray uncompressData(ByteArray const& in) {
- ByteArray out = ByteArray::withReserve(in.size());
- uncompressData(in, out);
+ByteArray uncompressData(const char* in, size_t inLen, size_t limit) {
+ ByteArray out = ByteArray::withReserve(inLen);
+ uncompressData(in, inLen, out, limit);
return out;
}
+void uncompressData(ByteArray const& in, ByteArray& out, size_t limit) {
+ uncompressData(in.ptr(), in.size(), out, limit);
+}
+
+ByteArray uncompressData(ByteArray const& in, size_t limit) {
+ return uncompressData(in.ptr(), in.size(), limit);
+}
+
CompressedFilePtr CompressedFile::open(String const& filename, IOMode mode, CompressionLevel comp) {
CompressedFilePtr f = make_shared<CompressedFile>(filename);
f->open(mode, comp);
diff --git a/source/core/StarCompression.hpp b/source/core/StarCompression.hpp
index 56dc774..3322662 100644
--- a/source/core/StarCompression.hpp
+++ b/source/core/StarCompression.hpp
@@ -17,8 +17,10 @@ CompressionLevel const HighCompression = 9;
void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compression = MediumCompression);
ByteArray compressData(ByteArray const& in, CompressionLevel compression = MediumCompression);
-void uncompressData(ByteArray const& in, ByteArray& out);
-ByteArray uncompressData(ByteArray const& in);
+void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit = 0);
+ByteArray uncompressData(const char* in, size_t inLen, size_t limit = 0);
+void uncompressData(ByteArray const& in, ByteArray& out, size_t limit = 0);
+ByteArray uncompressData(ByteArray const& in, size_t limit = 0);
// Random access to a (potentially) compressed file.
class CompressedFile : public IODevice {
diff --git a/source/core/StarDataStreamDevices.cpp b/source/core/StarDataStreamDevices.cpp
index 58ec920..b769167 100644
--- a/source/core/StarDataStreamDevices.cpp
+++ b/source/core/StarDataStreamDevices.cpp
@@ -130,7 +130,9 @@ void DataStreamBuffer::writeData(char const* data, size_t len) {
m_buffer->writeFull(data, len);
}
-DataStreamExternalBuffer::DataStreamExternalBuffer() {}
+DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {}
+
+DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {}
DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() {
reset(externalData, len);
@@ -160,6 +162,10 @@ size_t DataStreamExternalBuffer::pos() {
return m_buffer.pos();
}
+size_t DataStreamExternalBuffer::remaining() {
+ return m_buffer.dataSize() - m_buffer.pos();
+}
+
void DataStreamExternalBuffer::reset(char const* externalData, size_t len) {
m_buffer.reset(externalData, len);
}
diff --git a/source/core/StarDataStreamDevices.hpp b/source/core/StarDataStreamDevices.hpp
index dce7dcf..5d404ab 100644
--- a/source/core/StarDataStreamDevices.hpp
+++ b/source/core/StarDataStreamDevices.hpp
@@ -126,6 +126,8 @@ private:
class DataStreamExternalBuffer : public DataStream {
public:
DataStreamExternalBuffer();
+ DataStreamExternalBuffer(DataStreamBuffer const& buffer);
+ DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default;
DataStreamExternalBuffer(char const* externalData, size_t len);
char const* ptr() const;
@@ -136,6 +138,7 @@ public:
void seek(size_t pos, IOSeek mode = IOSeek::Absolute);
bool atEnd();
size_t pos();
+ size_t remaining();
void reset(char const* externalData, size_t len);
diff --git a/source/core/StarZSTDCompression.cpp b/source/core/StarZSTDCompression.cpp
new file mode 100644
index 0000000..733b182
--- /dev/null
+++ b/source/core/StarZSTDCompression.cpp
@@ -0,0 +1,84 @@
+#include "StarZSTDCompression.hpp"
+#include <zstd.h>
+
+namespace Star {
+
+CompressionStream::CompressionStream() : m_cStream(ZSTD_createCStream()) {
+ ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1);
+ ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24);
+ ZSTD_initCStream(m_cStream, 2);
+}
+
+CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); }
+
+ByteArray CompressionStream::compress(const char* in, size_t inLen) {
+ size_t const cInSize = ZSTD_CStreamInSize ();
+ size_t const cOutSize = ZSTD_CStreamOutSize();
+ ByteArray output(cOutSize, 0);
+ size_t written = 0, read = 0;
+ while (read < inLen) {
+ ZSTD_inBuffer inBuffer = {in + read, min(cInSize, inLen - read), 0};
+ ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
+ bool finished = false;
+ do {
+ size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush);
+ if (ZSTD_isError(ret)) {
+ throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret)));
+ break;
+ }
+
+ if (outBuffer.pos == outBuffer.size) {
+ output.resize(output.size() * 2);
+ outBuffer.dst = output.ptr();
+ outBuffer.size = output.size();
+ continue;
+ }
+
+ finished = ret == 0 && inBuffer.pos == inBuffer.size;
+ } while (!finished);
+
+ read += inBuffer.pos;
+ written += outBuffer.pos;
+ }
+ output.resize(written);
+ return output;
+}
+
+DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) {
+ ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25);
+ ZSTD_initDStream(m_dStream);
+}
+
+DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); }
+
+ByteArray DecompressionStream::decompress(const char* in, size_t inLen) {
+ size_t const dInSize = ZSTD_DStreamInSize ();
+ size_t const dOutSize = ZSTD_DStreamOutSize();
+ ByteArray output(dOutSize, 0);
+ size_t written = 0, read = 0;
+ while (read < inLen) {
+ ZSTD_inBuffer inBuffer = {in + read, min(dInSize, inLen - read), 0};
+ ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0};
+ do {
+ size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer);
+ if (ZSTD_isError(ret)) {
+ throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret)));
+ break;
+ }
+
+ if (outBuffer.pos == outBuffer.size) {
+ output.resize(output.size() * 2);
+ outBuffer.dst = output.ptr();
+ outBuffer.size = output.size();
+ continue;
+ }
+ } while (inBuffer.pos < inBuffer.size);
+
+ read += inBuffer.pos;
+ written += outBuffer.pos;
+ }
+ output.resize(written);
+ return output;
+}
+
+} \ No newline at end of file
diff --git a/source/core/StarZSTDCompression.hpp b/source/core/StarZSTDCompression.hpp
new file mode 100644
index 0000000..77719bf
--- /dev/null
+++ b/source/core/StarZSTDCompression.hpp
@@ -0,0 +1,44 @@
+#pragma once
+#include "StarByteArray.hpp"
+#include "StarDataStreamDevices.hpp"
+
+typedef struct ZSTD_CCtx_s ZSTD_CCtx;
+typedef struct ZSTD_DCtx_s ZSTD_DCtx;
+typedef ZSTD_DCtx ZSTD_DStream;
+typedef ZSTD_CCtx ZSTD_CStream;
+
+namespace Star {
+
+class CompressionStream {
+public:
+ CompressionStream();
+ ~CompressionStream();
+
+ ByteArray compress(const char* in, size_t inLen);
+ ByteArray compress(ByteArray const& in);
+
+private:
+ ZSTD_CStream* m_cStream;
+};
+
+inline ByteArray CompressionStream::compress(ByteArray const& in) {
+ return compress(in.ptr(), in.size());
+}
+
+class DecompressionStream {
+public:
+ DecompressionStream();
+ ~DecompressionStream();
+
+ ByteArray decompress(const char* in, size_t inLen);
+ ByteArray decompress(ByteArray const& in);
+
+private:
+ ZSTD_DStream* m_dStream;
+};
+
+inline ByteArray DecompressionStream::decompress(ByteArray const& in) {
+ return decompress(in.ptr(), in.size());
+}
+
+} \ No newline at end of file
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp
index 48b2082..2ebaadb 100644
--- a/source/game/StarNetPacketSocket.cpp
+++ b/source/game/StarNetPacketSocket.cpp
@@ -6,17 +6,27 @@
namespace Star {
PacketStatCollector::PacketStatCollector(float calculationWindow)
- : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {}
+ : m_calculationWindow(calculationWindow), m_stats(), m_totalBytes(0), m_lastMixTime(0) {}
-void PacketStatCollector::mix(PacketType type, size_t size) {
+void PacketStatCollector::mix(size_t size) {
+ calculate();
+ m_totalBytes += size;
+}
+
+void PacketStatCollector::mix(PacketType type, size_t size, bool addToTotal) {
calculate();
m_unmixed[type] += size;
+ if (addToTotal)
+ m_totalBytes += size;
}
-void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) {
+void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal) {
calculate();
- for (auto const& p : sizes)
+ for (auto const& p : sizes) {
+ if (addToTotal)
+ m_totalBytes += p.second;
m_unmixed[p.first] += p.second;
+ }
}
PacketStats PacketStatCollector::stats() const {
@@ -31,18 +41,19 @@ void PacketStatCollector::calculate() {
m_lastMixTime = currentTime;
m_stats.worstPacketSize = 0;
- float total = 0.0f;
+ if (abs(elapsedTime) - m_calculationWindow < 0.0125f)
+ elapsedTime = m_calculationWindow;
+
for (auto& pair : m_unmixed) {
if (pair.second > m_stats.worstPacketSize) {
m_stats.worstPacketType = pair.first;
m_stats.worstPacketSize = pair.second;
}
- auto& bytes = m_stats.packetBytesPerSecond[pair.first];
- bytes = pair.second / elapsedTime;
- total += bytes;
+ m_stats.packetBytesPerSecond[pair.first] = round(pair.second / elapsedTime);
}
- m_stats.bytesPerSecond = total;
+ m_stats.bytesPerSecond = round(float(m_totalBytes) / elapsedTime);
+ m_totalBytes = 0;
m_unmixed.clear();
}
}
@@ -138,67 +149,81 @@ void TcpPacketSocket::close() {
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
-
- while (it.hasNext()) {
- PacketType currentType = it.peekNext()->type();
- PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
-
- DataStreamBuffer packetBuffer;
- while (it.hasNext()
- && it.peekNext()->type() == currentType
- && it.peekNext()->compressionMode() == currentCompressionMode) {
- if (legacy())
- it.next()->writeLegacy(packetBuffer);
- else
- it.next()->write(packetBuffer);
+ if (m_useCompressionStream) {
+ DataStreamBuffer outBuffer;
+ while (it.hasNext()) {
+ PacketPtr& packet = it.next();
+ auto packetType = packet->type();
+ DataStreamBuffer packetBuffer;
+ packet->write(packetBuffer);
+ outBuffer.write(packetType);
+ outBuffer.writeVlqI((int)packetBuffer.size());
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(packetType, packetBuffer.size(), false);
}
+ m_outputBuffer.append(outBuffer.ptr(), outBuffer.size());
+ } else {
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+ PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext()
+ && it.peekNext()->type() == currentType
+ && it.peekNext()->compressionMode() == currentCompressionMode) {
+ if (legacy())
+ it.next()->writeLegacy(packetBuffer);
+ else
+ it.next()->write(packetBuffer);
+ }
- // Packets must read and write actual data, because this is used to
- // determine packet count
- starAssert(!packetBuffer.empty());
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
- ByteArray compressedPackets;
- bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
- bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
- if (mustCompress || perhapsCompress)
- compressedPackets = compressData(packetBuffer.data());
+ ByteArray compressedPackets;
+ bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
+ bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
+ if (mustCompress || perhapsCompress)
+ compressedPackets = compressData(packetBuffer.data());
- DataStreamBuffer outBuffer;
- outBuffer.write(currentType);
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
- if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
- outBuffer.writeVlqI(-(int)(compressedPackets.size()));
- outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
- m_outgoingStats.mix(currentType, compressedPackets.size());
- } else {
- outBuffer.writeVlqI((int)(packetBuffer.size()));
- outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
- m_outgoingStats.mix(currentType, packetBuffer.size());
+ if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
+ outBuffer.writeVlqI(-(int)(compressedPackets.size()));
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.writeVlqI((int)(packetBuffer.size()));
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputBuffer.append(outBuffer.takeData());
}
- m_outputBuffer.append(outBuffer.takeData());
}
}
List<PacketPtr> TcpPacketSocket::receivePackets() {
- uint64_t const PacketSizeLimit = 64<<20;
+ // How large can uncompressed packets be
+ // this limit is now also used during decompression
+ uint64_t const PacketSizeLimit = 64 << 20;
+ // How many packets can be batched together into one compressed chunk at once
+ uint64_t const PacketBatchLimit = 131072;
List<PacketPtr> packets;
try {
- while (!m_inputBuffer.empty()) {
+ DataStreamExternalBuffer ds(m_inputBuffer.ptr(), m_inputBuffer.size());
+ bool atLeastOne = false;
+ while (!ds.atEnd()) {
PacketType packetType;
uint64_t packetSize = 0;
bool packetCompressed = false;
- DataStreamBuffer ds(m_inputBuffer);
try {
packetType = ds.read<PacketType>();
int64_t len = ds.readVlqI();
- if (len < 0) {
- packetSize = -len;
- packetCompressed = true;
- } else {
- packetSize = len;
- packetCompressed = false;
- }
+ packetCompressed = len < 0;
+ packetSize = packetCompressed ? -len : len;
} catch (EofException const&) {
// Guard against not having the entire packet header available when
// trying to read.
@@ -206,19 +231,28 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
}
if (packetSize > PacketSizeLimit)
- throw IOException::format("Packet size {} exceeds maximum allowed packet size!", packetSize);
+ throw IOException::format("{} bytes large {} exceeds max size!", packetSize, PacketTypeNames.getRight(packetType));
if (packetSize > ds.size() - ds.pos())
break;
- ByteArray packetBytes = ds.readBytes(packetSize);
- if (packetCompressed)
- packetBytes = uncompressData(packetBytes);
+ atLeastOne = true;
+ m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
- m_incomingStats.mix(packetType, packetSize);
+ DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
+ ByteArray uncompressed;
+ if (packetCompressed) {
+ uncompressed = uncompressData(packetStream.ptr() + packetStream.pos(), packetSize, PacketSizeLimit);
+ packetStream.reset(uncompressed.ptr(), uncompressed.size());
+ }
+ ds.seek(packetSize, IOSeek::Relative);
- DataStreamBuffer packetStream(std::move(packetBytes));
+ size_t count = 0;
do {
+ if (++count > PacketBatchLimit) {
+ throw IOException::format("Packet batch limit {} reached while reading {}s!", PacketBatchLimit, PacketTypeNames.getRight(packetType));
+ break;
+ }
PacketPtr packet = createPacket(packetType);
packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
if (legacy())
@@ -227,9 +261,9 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
packet->read(packetStream);
packets.append(std::move(packet));
} while (!packetStream.atEnd());
-
- m_inputBuffer = ds.readBytes(ds.size() - ds.pos());
}
+ if (atLeastOne)
+ m_inputBuffer.trimLeft(ds.pos());
} catch (IOException const& e) {
Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: {}", outputException(e, false));
m_inputBuffer.clear();
@@ -248,15 +282,25 @@ bool TcpPacketSocket::writeData() {
bool dataSent = false;
try {
- if (m_outputBuffer.empty())
- return false;
-
- while (!m_outputBuffer.empty()) {
- size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
- if (writtenAmount == 0)
- break;
- dataSent = true;
- m_outputBuffer.trimLeft(writtenAmount);
+ if (m_useCompressionStream) {
+ auto compressed = m_compressionStream.compress(m_outputBuffer);
+ m_outputBuffer.clear();
+
+ m_compressedBuffer.append(compressed.ptr(), compressed.size());
+ size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size());
+ if (written > 0) {
+ dataSent = true;
+ m_compressedBuffer.trimLeft(written);
+ m_outgoingStats.mix(written);
+ }
+ } else {
+ while (!m_outputBuffer.empty()) {
+ size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
+ if (written == 0)
+ break;
+ dataSent = true;
+ m_outputBuffer.trimLeft(written);
+ }
}
} catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@@ -276,7 +320,13 @@ bool TcpPacketSocket::readData() {
if (readAmount == 0)
break;
dataReceived = true;
- m_inputBuffer.append(readBuffer, readAmount);
+ m_incomingStats.mix(readAmount);
+ if (m_useCompressionStream) {
+ auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
+ m_inputBuffer.append(decompressed.ptr(), decompressed.size());
+ } else {
+ m_inputBuffer.append(readBuffer, readAmount);
+ }
}
} catch (SocketClosedException const& e) {
Logger::debug("TcpPacketSocket socket closed: {}", outputException(e, false));
@@ -295,8 +345,12 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
return m_outgoingStats.stats();
}
-TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket)
- : m_socket(std::move(socket)) {}
+void TcpPacketSocket::setLegacy(bool legacy) {
+ m_useCompressionStream = !legacy;
+ PacketSocket::setLegacy(legacy);
+}
+
+TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket) : m_socket(std::move(socket)) {}
P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) {
return P2PPacketSocketUPtr(new P2PPacketSocket(std::move(socket)));
diff --git a/source/game/StarNetPacketSocket.hpp b/source/game/StarNetPacketSocket.hpp
index 3530310..dfffec6 100644
--- a/source/game/StarNetPacketSocket.hpp
+++ b/source/game/StarNetPacketSocket.hpp
@@ -4,6 +4,7 @@
#include "StarAtomicSharedPtr.hpp"
#include "StarP2PNetworkingService.hpp"
#include "StarNetPackets.hpp"
+#include "StarZSTDCompression.hpp"
namespace Star {
@@ -24,10 +25,11 @@ class PacketStatCollector {
public:
PacketStatCollector(float calculationWindow = 1.0f);
- void mix(PacketType type, size_t size);
- void mix(HashMap<PacketType, size_t> const& sizes);
+ void mix(size_t size);
+ void mix(PacketType type, size_t size, bool addToTotal = true);
+ void mix(HashMap<PacketType, size_t> const& sizes, bool addToTotal = true);
- // Should always return packet staticstics for the most recent completed
+ // Should always return packet statistics for the most recent completed
// window of time
PacketStats stats() const;
@@ -37,6 +39,7 @@ private:
float m_calculationWindow;
PacketStats m_stats;
Map<PacketType, float> m_unmixed;
+ size_t m_totalBytes;
int64_t m_lastMixTime;
};
@@ -72,10 +75,10 @@ public:
virtual Maybe<PacketStats> incomingStats() const;
virtual Maybe<PacketStats> outgoingStats() const;
- void setLegacy(bool legacy);
- bool legacy() const;
+ virtual void setLegacy(bool legacy);
+ virtual bool legacy() const;
private:
- bool m_legacy = false;
+ bool m_legacy = true;
};
// PacketSocket for local communication.
@@ -127,6 +130,7 @@ public:
Maybe<PacketStats> incomingStats() const override;
Maybe<PacketStats> outgoingStats() const override;
+ void setLegacy(bool legacy) override;
private:
TcpPacketSocket(TcpSocketPtr socket);
@@ -136,6 +140,10 @@ private:
PacketStatCollector m_outgoingStats;
ByteArray m_outputBuffer;
ByteArray m_inputBuffer;
+ bool m_useCompressionStream = false;
+ ByteArray m_compressedBuffer;
+ CompressionStream m_compressionStream;
+ DecompressionStream m_decompressionStream;
};
// Wraps a P2PSocket into a PacketSocket
diff --git a/source/game/StarNetPackets.cpp b/source/game/StarNetPackets.cpp
index f787b1c..2f03283 100644
--- a/source/game/StarNetPackets.cpp
+++ b/source/game/StarNetPackets.cpp
@@ -341,7 +341,7 @@ ClientConnectPacket::ClientConnectPacket(ByteArray assetsDigest, bool allowAsset
playerName(std::move(playerName)), playerSpecies(std::move(playerSpecies)), shipChunks(std::move(shipChunks)),
shipUpgrades(std::move(shipUpgrades)), introComplete(std::move(introComplete)), account(std::move(account)) {}
-void ClientConnectPacket::read(DataStream& ds) {
+void ClientConnectPacket::readLegacy(DataStream& ds) {
ds.read(assetsDigest);
ds.read(allowAssetsMismatch);
ds.read(playerUuid);
@@ -353,7 +353,12 @@ void ClientConnectPacket::read(DataStream& ds) {
ds.read(account);
}
-void ClientConnectPacket::write(DataStream& ds) const {
+void ClientConnectPacket::read(DataStream& ds) {
+ readLegacy(ds);
+ ds.read(info);
+}
+
+void ClientConnectPacket::writeLegacy(DataStream& ds) const {
ds.write(assetsDigest);
ds.write(allowAssetsMismatch);
ds.write(playerUuid);
@@ -365,6 +370,11 @@ void ClientConnectPacket::write(DataStream& ds) const {
ds.write(account);
}
+void ClientConnectPacket::write(DataStream& ds) const {
+ writeLegacy(ds);
+ ds.write(info);
+}
+
ClientDisconnectRequestPacket::ClientDisconnectRequestPacket() {}
void ClientDisconnectRequestPacket::read(DataStream& ds) {
diff --git a/source/game/StarNetPackets.hpp b/source/game/StarNetPackets.hpp
index 98455e2..dd2acfa 100644
--- a/source/game/StarNetPackets.hpp
+++ b/source/game/StarNetPackets.hpp
@@ -127,11 +127,10 @@ struct Packet {
virtual PacketType type() const = 0;
- virtual void read(DataStream& ds) = 0;
- virtual void write(DataStream& ds) const = 0;
-
virtual void readLegacy(DataStream& ds);
+ virtual void read(DataStream& ds) = 0;
virtual void writeLegacy(DataStream& ds) const;
+ virtual void write(DataStream& ds) const = 0;
PacketCompressionMode compressionMode() const;
void setCompressionMode(PacketCompressionMode compressionMode);
@@ -288,7 +287,9 @@ struct ClientConnectPacket : PacketBase<PacketType::ClientConnect> {
String playerSpecies, WorldChunks shipChunks, ShipUpgrades shipUpgrades, bool introComplete,
String account);
+ void readLegacy(DataStream& ds) override;
void read(DataStream& ds) override;
+ void writeLegacy(DataStream& ds) const override;
void write(DataStream& ds) const override;
ByteArray assetsDigest;
@@ -300,6 +301,7 @@ struct ClientConnectPacket : PacketBase<PacketType::ClientConnect> {
ShipUpgrades shipUpgrades;
bool introComplete;
String account;
+ Json info;
};
struct ClientDisconnectRequestPacket : PacketBase<PacketType::ClientDisconnectRequest> {
diff --git a/source/game/StarUniverseServer.cpp b/source/game/StarUniverseServer.cpp
index 783bde2..eafa844 100644
--- a/source/game/StarUniverseServer.cpp
+++ b/source/game/StarUniverseServer.cpp
@@ -1522,7 +1522,6 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
}
bool legacyClient = protocolRequest->compressionMode() != PacketCompressionMode::Enabled;
- connection.setLegacy(legacyClient);
auto protocolResponse = make_shared<ProtocolResponsePacket>();
protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound
@@ -1540,6 +1539,7 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
protocolResponse->allowed = true;
connection.pushSingle(protocolResponse);
connection.sendAll(clientWaitLimit);
+ connection.setLegacy(legacyClient);
String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local";
Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound");
diff --git a/source/vcpkg.json b/source/vcpkg.json
index 1e0942c..6dcae93 100644
--- a/source/vcpkg.json
+++ b/source/vcpkg.json
@@ -7,6 +7,7 @@
"zlib",
"freetype",
"libpng",
- "opus"
+ "opus",
+ "zstd"
]
} \ No newline at end of file