Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--source/core/StarNetImpl.hpp12
-rw-r--r--source/game/StarNetPacketSocket.cpp112
-rw-r--r--source/game/StarNetPacketSocket.hpp23
-rw-r--r--source/game/StarNetPackets.cpp23
-rw-r--r--source/game/StarNetPackets.hpp16
-rw-r--r--source/game/StarRootLoader.cpp3
-rw-r--r--source/game/StarUniverseClient.cpp24
-rw-r--r--source/game/StarUniverseConnection.cpp4
-rw-r--r--source/game/StarUniverseConnection.hpp3
-rw-r--r--source/game/StarUniverseServer.cpp17
-rw-r--r--source/game/StarWorldClient.cpp2
11 files changed, 165 insertions, 74 deletions
diff --git a/source/core/StarNetImpl.hpp b/source/core/StarNetImpl.hpp
index a84da6d..b7213e7 100644
--- a/source/core/StarNetImpl.hpp
+++ b/source/core/StarNetImpl.hpp
@@ -4,6 +4,8 @@
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
+
+#include "StarString_windows.hpp"
#else
#ifdef STAR_SYSTEM_FREEBSD
#include <sys/types.h>
@@ -42,17 +44,19 @@ static WindowsSocketInitializer g_windowsSocketInitializer;
inline String netErrorString() {
#ifdef STAR_SYSTEM_WINDOWS
- LPVOID lpMsgBuf = NULL;
+ LPWSTR lpMsgBuf = NULL;
+ int error = WSAGetLastError();
- FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
+ FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM
+ | FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_MAX_WIDTH_MASK,
NULL,
- WSAGetLastError(),
+ error,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
(LPTSTR)&lpMsgBuf,
0,
NULL);
- String result = String((char*)lpMsgBuf);
+ String result = strf("{} - {}", error, utf16ToString(lpMsgBuf));
if (lpMsgBuf != NULL)
LocalFree(lpMsgBuf);
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp
index cd5c749..436726a 100644
--- a/source/game/StarNetPacketSocket.cpp
+++ b/source/game/StarNetPacketSocket.cpp
@@ -66,6 +66,9 @@ Maybe<PacketStats> PacketSocket::outgoingStats() const {
void PacketSocket::setLegacy(bool legacy) { m_legacy = legacy; }
bool PacketSocket::legacy() const { return m_legacy; }
+void CompressedPacketSocket::setCompressionStreamEnabled(bool enabled) { m_useCompressionStream = enabled; }
+bool CompressedPacketSocket::compressionStreamEnabled() const { return m_useCompressionStream; }
+
pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() {
auto lhsIncomingPipe = make_shared<Pipe>();
auto rhsIncomingPipe = make_shared<Pipe>();
@@ -146,7 +149,7 @@ void TcpPacketSocket::close() {
void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
- if (m_useCompressionStream) {
+ if (compressionStreamEnabled()) {
DataStreamBuffer outBuffer;
while (it.hasNext()) {
PacketPtr& packet = it.next();
@@ -233,7 +236,7 @@ List<PacketPtr> TcpPacketSocket::receivePackets() {
if (packetSize > ds.remaining())
break;
- m_incomingStats.mix(packetType, packetSize, !m_useCompressionStream);
+ m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled());
DataStreamExternalBuffer packetStream(ds.ptr() + ds.pos(), packetSize);
ByteArray uncompressed;
@@ -280,19 +283,17 @@ bool TcpPacketSocket::writeData() {
bool dataSent = false;
try {
if (!m_outputBuffer.empty()) {
- if (m_useCompressionStream) {
- auto compressed = m_compressionStream.compress(m_outputBuffer);
+ if (compressionStreamEnabled()) {
+ auto compressedBuffer = m_compressionStream.compress(m_outputBuffer);
m_outputBuffer.clear();
-
- m_compressedBuffer.append(compressed.ptr(), compressed.size());
do {
- size_t written = m_socket->send(m_compressedBuffer.ptr(), m_compressedBuffer.size());
+ size_t written = m_socket->send(compressedBuffer.ptr(), compressedBuffer.size());
if (written > 0) {
dataSent = true;
- m_compressedBuffer.trimLeft(written);
+ compressedBuffer.trimLeft(written);
m_outgoingStats.mix(written);
}
- } while (!m_compressedBuffer.empty());
+ } while (!compressedBuffer.empty());
} else {
do {
size_t written = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
@@ -321,10 +322,10 @@ bool TcpPacketSocket::readData() {
if (readAmount == 0)
break;
dataReceived = true;
- if (m_useCompressionStream) {
+ if (compressionStreamEnabled()) {
m_incomingStats.mix(readAmount);
auto decompressed = m_decompressionStream.decompress(readBuffer, readAmount);
- m_inputBuffer.append(decompressed.ptr(), decompressed.size());
+ m_inputBuffer.append(decompressed);
} else {
m_inputBuffer.append(readBuffer, readAmount);
}
@@ -347,7 +348,6 @@ Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
}
void TcpPacketSocket::setLegacy(bool legacy) {
- m_useCompressionStream = !legacy;
PacketSocket::setLegacy(legacy);
}
@@ -368,43 +368,58 @@ void P2PPacketSocket::close() {
void P2PPacketSocket::sendPackets(List<PacketPtr> packets) {
auto it = makeSMutableIterator(packets);
- while (it.hasNext()) {
- PacketType currentType = it.peekNext()->type();
- PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
-
- DataStreamBuffer packetBuffer;
- while (it.hasNext()
- && it.peekNext()->type() == currentType
- && it.peekNext()->compressionMode() == currentCompressionMode) {
- if (legacy())
- it.next()->writeLegacy(packetBuffer);
- else
- it.next()->write(packetBuffer);
+ if (compressionStreamEnabled()) {
+ DataStreamBuffer outBuffer;
+ while (it.hasNext()) {
+ PacketPtr& packet = it.next();
+ auto packetType = packet->type();
+ DataStreamBuffer packetBuffer;
+ packet->write(packetBuffer);
+ outBuffer.write(packetType);
+ outBuffer.writeVlqI((int)packetBuffer.size());
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(packetType, packetBuffer.size(), false);
}
+ m_outputMessages.append(m_compressionStream.compress(outBuffer.data()));
+ } else {
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+ PacketCompressionMode currentCompressionMode = it.peekNext()->compressionMode();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext()
+ && it.peekNext()->type() == currentType
+ && it.peekNext()->compressionMode() == currentCompressionMode) {
+ if (legacy())
+ it.next()->writeLegacy(packetBuffer);
+ else
+ it.next()->write(packetBuffer);
+ }
- // Packets must read and write actual data, because this is used to
- // determine packet count
- starAssert(!packetBuffer.empty());
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
- ByteArray compressedPackets;
- bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
- bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
- if (mustCompress || perhapsCompress)
- compressedPackets = compressData(packetBuffer.data());
+ ByteArray compressedPackets;
+ bool mustCompress = currentCompressionMode == PacketCompressionMode::Enabled;
+ bool perhapsCompress = currentCompressionMode == PacketCompressionMode::Automatic && packetBuffer.size() > 64;
+ if (mustCompress || perhapsCompress)
+ compressedPackets = compressData(packetBuffer.data());
- DataStreamBuffer outBuffer;
- outBuffer.write(currentType);
-
- if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
- outBuffer.write<bool>(true);
- outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
- m_outgoingStats.mix(currentType, compressedPackets.size());
- } else {
- outBuffer.write<bool>(false);
- outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
- m_outgoingStats.mix(currentType, packetBuffer.size());
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
+
+ if (!compressedPackets.empty() && (mustCompress || compressedPackets.size() < packetBuffer.size())) {
+ outBuffer.write<bool>(true);
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.write<bool>(false);
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputMessages.append(outBuffer.takeData());
}
- m_outputMessages.append(outBuffer.takeData());
}
}
@@ -422,9 +437,9 @@ List<PacketPtr> P2PPacketSocket::receivePackets() {
if (packetCompressed)
packetBytes = uncompressData(packetBytes);
- m_incomingStats.mix(packetType, packetSize);
+ m_incomingStats.mix(packetType, packetSize, !compressionStreamEnabled());
- DataStreamBuffer packetStream(std::move(packetBytes));
+ DataStreamExternalBuffer packetStream(packetBytes);
do {
PacketPtr packet = createPacket(packetType);
packet->setCompressionMode(packetCompressed ? PacketCompressionMode::Enabled : PacketCompressionMode::Disabled);
@@ -468,7 +483,10 @@ bool P2PPacketSocket::readData() {
if (m_socket) {
while (auto message = m_socket->receiveMessage()) {
- m_inputMessages.append(message.take());
+ m_incomingStats.mix(message->size());
+ m_inputMessages.append(compressionStreamEnabled()
+ ? m_decompressionStream.decompress(*message)
+ : *message);
workDone = true;
}
}
diff --git a/source/game/StarNetPacketSocket.hpp b/source/game/StarNetPacketSocket.hpp
index dfffec6..c2c06fa 100644
--- a/source/game/StarNetPacketSocket.hpp
+++ b/source/game/StarNetPacketSocket.hpp
@@ -78,7 +78,20 @@ public:
virtual void setLegacy(bool legacy);
virtual bool legacy() const;
private:
- bool m_legacy = true;
+ bool m_legacy = false;
+};
+
+class CompressedPacketSocket : public PacketSocket {
+public:
+ virtual ~CompressedPacketSocket() = default;
+
+ virtual void setCompressionStreamEnabled(bool enabled);
+ virtual bool compressionStreamEnabled() const;
+private:
+ bool m_useCompressionStream = false;
+protected:
+ CompressionStream m_compressionStream;
+ DecompressionStream m_decompressionStream;
};
// PacketSocket for local communication.
@@ -112,7 +125,7 @@ private:
};
// Wraps a TCP socket into a PacketSocket.
-class TcpPacketSocket : public PacketSocket {
+class TcpPacketSocket : public CompressedPacketSocket {
public:
static TcpPacketSocketUPtr open(TcpSocketPtr socket);
@@ -140,14 +153,10 @@ private:
PacketStatCollector m_outgoingStats;
ByteArray m_outputBuffer;
ByteArray m_inputBuffer;
- bool m_useCompressionStream = false;
- ByteArray m_compressedBuffer;
- CompressionStream m_compressionStream;
- DecompressionStream m_decompressionStream;
};
// Wraps a P2PSocket into a PacketSocket
-class P2PPacketSocket : public PacketSocket {
+class P2PPacketSocket : public CompressedPacketSocket {
public:
static P2PPacketSocketUPtr open(P2PSocketUPtr socket);
diff --git a/source/game/StarNetPackets.cpp b/source/game/StarNetPackets.cpp
index 3cbb3ad..d0aacac 100644
--- a/source/game/StarNetPackets.cpp
+++ b/source/game/StarNetPackets.cpp
@@ -78,6 +78,11 @@ EnumMap<PacketType> const PacketTypeNames{
{PacketType::SystemObjectSpawn, "SystemObjectSpawn"}
};
+EnumMap<NetCompressionMode> const NetCompressionModeNames {
+ {NetCompressionMode::None, "None"},
+ {NetCompressionMode::Zstd, "Zstd"}
+};
+
Packet::~Packet() {}
void Packet::readLegacy(DataStream& ds) { read(ds); }
@@ -187,17 +192,29 @@ void ProtocolRequestPacket::write(DataStream& ds) const {
ds.write(requestProtocolVersion);
}
-ProtocolResponsePacket::ProtocolResponsePacket(bool allowed)
- : allowed(allowed) {}
+ProtocolResponsePacket::ProtocolResponsePacket(bool allowed, Json info)
+ : allowed(allowed), info(info) {}
void ProtocolResponsePacket::read(DataStream& ds) {
ds.read(allowed);
+ if (compressionMode() == PacketCompressionMode::Enabled) {
+ // gross hack for backwards compatibility with older OpenSB servers
+ // can be removed later
+ auto externalBuffer = as<DataStreamExternalBuffer>(&ds);
+ if (!externalBuffer || !externalBuffer->atEnd())
+ ds.read(info);
+ }
}
-void ProtocolResponsePacket::write(DataStream& ds) const {
+void ProtocolResponsePacket::writeLegacy(DataStream& ds) const {
ds.write(allowed);
}
+void ProtocolResponsePacket::write(DataStream& ds) const {
+ writeLegacy(ds);
+ ds.write(info);
+}
+
ConnectSuccessPacket::ConnectSuccessPacket() {}
ConnectSuccessPacket::ConnectSuccessPacket(
diff --git a/source/game/StarNetPackets.hpp b/source/game/StarNetPackets.hpp
index 475f57a..bfaea6e 100644
--- a/source/game/StarNetPackets.hpp
+++ b/source/game/StarNetPackets.hpp
@@ -116,16 +116,23 @@ enum class PacketType : uint8_t {
};
extern EnumMap<PacketType> const PacketTypeNames;
+enum class NetCompressionMode : uint8_t {
+ None,
+ Zstd
+};
+extern EnumMap<NetCompressionMode> const NetCompressionModeNames;
+
enum class PacketCompressionMode : uint8_t {
Disabled,
- Enabled,
- Automatic
+ Automatic,
+ Enabled
};
struct Packet {
virtual ~Packet();
virtual PacketType type() const = 0;
+ virtual String const& typeName() const = 0;
virtual void readLegacy(DataStream& ds);
virtual void read(DataStream& ds) = 0;
@@ -149,6 +156,7 @@ struct PacketBase : public Packet {
static PacketType const Type = PacketT;
PacketType type() const override { return Type; }
+ String const& typeName() const override { return PacketTypeNames.getRight(Type); }
};
struct ProtocolRequestPacket : PacketBase<PacketType::ProtocolRequest> {
@@ -162,12 +170,14 @@ struct ProtocolRequestPacket : PacketBase<PacketType::ProtocolRequest> {
};
struct ProtocolResponsePacket : PacketBase<PacketType::ProtocolResponse> {
- ProtocolResponsePacket(bool allowed = false);
+ ProtocolResponsePacket(bool allowed = false, Json info = {});
void read(DataStream& ds) override;
+ void writeLegacy(DataStream& ds) const override;
void write(DataStream& ds) const override;
bool allowed;
+ Json info;
};
struct ServerDisconnectPacket : PacketBase<PacketType::ServerDisconnect> {
diff --git a/source/game/StarRootLoader.cpp b/source/game/StarRootLoader.cpp
index bc0bce3..1643791 100644
--- a/source/game/StarRootLoader.cpp
+++ b/source/game/StarRootLoader.cpp
@@ -76,6 +76,9 @@ R"JSON(
"allowAdminCommands" : true,
"allowAdminCommandsFromAnyone" : false,
"anonymousConnectionsAreAdmin" : false,
+ "connectionSettings" : {
+ "compression" : "Zstd"
+ },
"clientP2PJoinable" : true,
"clientIPJoinable" : false,
diff --git a/source/game/StarUniverseClient.cpp b/source/game/StarUniverseClient.cpp
index 136fd7b..a5bd9df 100644
--- a/source/game/StarUniverseClient.cpp
+++ b/source/game/StarUniverseClient.cpp
@@ -81,8 +81,8 @@ Maybe<String> UniverseClient::connect(UniverseConnection connection, bool allowA
{
auto protocolRequest = make_shared<ProtocolRequestPacket>(StarProtocolVersion);
protocolRequest->setCompressionMode(PacketCompressionMode::Enabled);
- // Signal that we're OpenStarbound. Vanilla Starbound only compresses packets above 64 bytes - by forcing it we can communicate this.
- // If you know a less cursed way, please let me know.
+ // Signal that we're OpenStarbound. Vanilla Starbound only compresses
+ // packets above 64 bytes - by forcing it, we can communicate this.
connection.pushSingle(protocolRequest);
}
connection.sendAll(timeout);
@@ -94,8 +94,24 @@ Maybe<String> UniverseClient::connect(UniverseConnection connection, bool allowA
else if (!protocolResponsePacket->allowed)
return String(strf("Join failed! Server does not support connections with protocol version {}", StarProtocolVersion));
- m_legacyServer = protocolResponsePacket->compressionMode() != PacketCompressionMode::Enabled; // True if server is vanilla
- connection.setLegacy(m_legacyServer);
+ if (!(m_legacyServer = protocolResponsePacket->compressionMode() != PacketCompressionMode::Enabled)) {
+ if (auto compressedSocket = as<CompressedPacketSocket>(&connection.packetSocket())) {
+ if (protocolResponsePacket->info) {
+ auto compressionName = protocolResponsePacket->info.getString("compression", "None");
+ auto compressionMode = NetCompressionModeNames.maybeLeft(compressionName);
+ if (!compressionMode)
+ return String(strf("Join failed! Unknown net stream connection type '{}'", compressionName));
+
+ Logger::info("UniverseClient: Using '{}' network stream compression", NetCompressionModeNames.getRight(*compressionMode));
+ compressedSocket->setCompressionStreamEnabled(compressionMode == NetCompressionMode::Zstd);
+ } else if (!m_legacyServer) {
+ Logger::info("UniverseClient: Defaulting to Zstd network stream compression (older server version)");
+ compressedSocket->setCompressionStreamEnabled(true);// old OpenSB server version always expects it!
+ }
+ }
+ }
+ connection.packetSocket().setLegacy(m_legacyServer);
+
connection.pushSingle(make_shared<ClientConnectPacket>(Root::singleton().assets()->digest(), allowAssetsMismatch, m_mainPlayer->uuid(), m_mainPlayer->name(),
m_mainPlayer->species(), m_playerStorage->loadShipData(m_mainPlayer->uuid()), m_mainPlayer->shipUpgrades(),
m_mainPlayer->log()->introComplete(), account));
diff --git a/source/game/StarUniverseConnection.cpp b/source/game/StarUniverseConnection.cpp
index 18b6da9..63f7f75 100644
--- a/source/game/StarUniverseConnection.cpp
+++ b/source/game/StarUniverseConnection.cpp
@@ -107,8 +107,8 @@ bool UniverseConnection::receiveAny(unsigned timeout) {
}
}
-void UniverseConnection::setLegacy(bool legacy) {
- m_packetSocket->setLegacy(legacy);
+PacketSocket& UniverseConnection::packetSocket() {
+ return *m_packetSocket;
}
Maybe<PacketStats> UniverseConnection::incomingStats() const {
diff --git a/source/game/StarUniverseConnection.hpp b/source/game/StarUniverseConnection.hpp
index 666e2da..ed08104 100644
--- a/source/game/StarUniverseConnection.hpp
+++ b/source/game/StarUniverseConnection.hpp
@@ -48,7 +48,8 @@ public:
// false if the timeout was reached with no packets receivable.
bool receiveAny(unsigned timeout);
- void setLegacy(bool legacy);
+ // Returns a reference to the packet socket.
+ PacketSocket& packetSocket();
// Packet stats for the most recent one second window of activity incoming
// and outgoing. Will only return valid stats if the underlying PacketSocket
diff --git a/source/game/StarUniverseServer.cpp b/source/game/StarUniverseServer.cpp
index 43e67c0..56ae634 100644
--- a/source/game/StarUniverseServer.cpp
+++ b/source/game/StarUniverseServer.cpp
@@ -1540,6 +1540,7 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
int clientWaitLimit = assets->json("/universe_server.config:clientWaitLimit").toInt();
String serverAssetsMismatchMessage = assets->json("/universe_server.config:serverAssetsMismatchMessage").toString();
String clientAssetsMismatchMessage = assets->json("/universe_server.config:clientAssetsMismatchMessage").toString();
+ auto connectionSettings = configuration->get("connectionSettings");
RecursiveMutexLocker mainLocker(m_mainLock, false);
@@ -1549,8 +1550,9 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
Logger::warn("UniverseServer: client connection aborted, expected ProtocolRequestPacket");
return;
}
-
+
bool legacyClient = protocolRequest->compressionMode() != PacketCompressionMode::Enabled;
+ connection.packetSocket().setLegacy(legacyClient);
auto protocolResponse = make_shared<ProtocolResponsePacket>();
protocolResponse->setCompressionMode(PacketCompressionMode::Enabled); // Signal that we're OpenStarbound
@@ -1565,10 +1567,21 @@ void UniverseServer::acceptConnection(UniverseConnection connection, Maybe<HostA
return;
}
+ bool useCompressionStream = false;
protocolResponse->allowed = true;
+ if (!legacyClient) {
+ auto compressionName = connectionSettings.getString("compression", "None");
+ auto compressionMode = NetCompressionModeNames.maybeLeft(compressionName).value(NetCompressionMode::None);
+ useCompressionStream = compressionMode == NetCompressionMode::Zstd;
+ protocolResponse->info = JsonObject{
+ {"compression", NetCompressionModeNames.getRight(compressionMode)}
+ };
+ }
connection.pushSingle(protocolResponse);
connection.sendAll(clientWaitLimit);
- connection.setLegacy(legacyClient);
+
+ if (auto compressedSocket = as<CompressedPacketSocket>(&connection.packetSocket()))
+ compressedSocket->setCompressionStreamEnabled(useCompressionStream);
String remoteAddressString = remoteAddress ? toString(*remoteAddress) : "local";
Logger::info("UniverseServer: Awaiting connection info from {}, {} client", remoteAddressString, legacyClient ? "Starbound" : "OpenStarbound");
diff --git a/source/game/StarWorldClient.cpp b/source/game/StarWorldClient.cpp
index 9e0478a..fba94f3 100644
--- a/source/game/StarWorldClient.cpp
+++ b/source/game/StarWorldClient.cpp
@@ -751,7 +751,7 @@ void WorldClient::handleIncomingPackets(List<PacketPtr> const& packets) {
for (auto const& packet : packets) {
if (!inWorld() && !is<WorldStartPacket>(packet))
- Logger::error("WorldClient received packet type {} while not in world", PacketTypeNames.getRight(packet->type()));
+ Logger::error("WorldClient received packet type {} while not in world", packet->typeName());
if (auto worldStartPacket = as<WorldStartPacket>(packet)) {
initWorld(*worldStartPacket);