Веб-сайт самохостера Lotigara

summaryrefslogtreecommitdiff
path: root/source/game/StarNetPacketSocket.cpp
diff options
context:
space:
mode:
authorKae <80987908+Novaenia@users.noreply.github.com>2023-06-20 14:33:09 +1000
committerKae <80987908+Novaenia@users.noreply.github.com>2023-06-20 14:33:09 +1000
commit6352e8e3196f78388b6c771073f9e03eaa612673 (patch)
treee23772f79a7fbc41bc9108951e9e136857484bf4 /source/game/StarNetPacketSocket.cpp
parent6741a057e5639280d85d0f88ba26f000baa58f61 (diff)
everything everywhere
all at once
Diffstat (limited to 'source/game/StarNetPacketSocket.cpp')
-rw-r--r--source/game/StarNetPacketSocket.cpp406
1 files changed, 406 insertions, 0 deletions
diff --git a/source/game/StarNetPacketSocket.cpp b/source/game/StarNetPacketSocket.cpp
new file mode 100644
index 0000000..392f2d9
--- /dev/null
+++ b/source/game/StarNetPacketSocket.cpp
@@ -0,0 +1,406 @@
+#include "StarNetPacketSocket.hpp"
+#include "StarIterator.hpp"
+#include "StarCompression.hpp"
+#include "StarLogging.hpp"
+
+namespace Star {
+
+PacketStatCollector::PacketStatCollector(float calculationWindow)
+ : m_calculationWindow(calculationWindow), m_stats(), m_lastMixTime(0) {}
+
+void PacketStatCollector::mix(PacketType type, size_t size) {
+ calculate();
+ m_unmixed[type] += size;
+}
+
+void PacketStatCollector::mix(HashMap<PacketType, size_t> const& sizes) {
+ calculate();
+ for (auto const& p : sizes)
+ m_unmixed[p.first] += p.second;
+}
+
+PacketStats PacketStatCollector::stats() const {
+ const_cast<PacketStatCollector*>(this)->calculate();
+ return m_stats;
+}
+
+void PacketStatCollector::calculate() {
+ int64_t currentTime = Time::monotonicMilliseconds();
+ float elapsedTime = (currentTime - m_lastMixTime) / 1000.0f;
+ if (elapsedTime >= m_calculationWindow) {
+ m_lastMixTime = currentTime;
+ m_stats.worstPacketSize = 0;
+
+ float total = 0.0f;
+ for (auto& pair : m_unmixed) {
+ if (pair.second > m_stats.worstPacketSize) {
+ m_stats.worstPacketType = pair.first;
+ m_stats.worstPacketSize = pair.second;
+ }
+
+ auto& bytes = m_stats.packetBytesPerSecond[pair.first];
+ bytes = pair.second / elapsedTime;
+ total += bytes;
+ }
+ m_stats.bytesPerSecond = total;
+ m_unmixed.clear();
+ }
+}
+
+Maybe<PacketStats> PacketSocket::incomingStats() const {
+ return {};
+}
+
+Maybe<PacketStats> PacketSocket::outgoingStats() const {
+ return {};
+}
+
+pair<LocalPacketSocketUPtr, LocalPacketSocketUPtr> LocalPacketSocket::openPair() {
+ auto lhsIncomingPipe = make_shared<Pipe>();
+ auto rhsIncomingPipe = make_shared<Pipe>();
+
+ return {
+ LocalPacketSocketUPtr(new LocalPacketSocket(lhsIncomingPipe, weak_ptr<Pipe>(rhsIncomingPipe))),
+ LocalPacketSocketUPtr(new LocalPacketSocket(rhsIncomingPipe, weak_ptr<Pipe>(lhsIncomingPipe)))
+ };
+}
+
+bool LocalPacketSocket::isOpen() const {
+ return m_incomingPipe && !m_outgoingPipe.expired();
+}
+
+void LocalPacketSocket::close() {
+ m_incomingPipe.reset();
+}
+
+void LocalPacketSocket::sendPackets(List<PacketPtr> packets) {
+ if (!isOpen() || packets.empty())
+ return;
+
+ if (auto outgoingPipe = m_outgoingPipe.lock()) {
+ MutexLocker locker(outgoingPipe->mutex);
+
+#ifdef STAR_DEBUG
+ // Test serialization if STAR_DEBUG is enabled
+ DataStreamBuffer buffer;
+ for (auto inPacket : take(packets)) {
+ buffer.clear();
+ inPacket->write(buffer);
+ auto outPacket = createPacket(inPacket->type());
+ buffer.seek(0);
+ outPacket->read(buffer);
+ packets.append(outPacket);
+ }
+#endif
+
+ outgoingPipe->queue.appendAll(move(packets));
+ }
+}
+
+List<PacketPtr> LocalPacketSocket::receivePackets() {
+ MutexLocker locker(m_incomingPipe->mutex);
+ List<PacketPtr> packets;
+ packets.appendAll(take(m_incomingPipe->queue));
+ return packets;
+}
+
+bool LocalPacketSocket::sentPacketsPending() const {
+ return false;
+}
+
+bool LocalPacketSocket::writeData() {
+ return false;
+}
+
+bool LocalPacketSocket::readData() {
+ return false;
+}
+
+LocalPacketSocket::LocalPacketSocket(shared_ptr<Pipe> incomingPipe, weak_ptr<Pipe> outgoingPipe)
+ : m_incomingPipe(move(incomingPipe)), m_outgoingPipe(move(outgoingPipe)) {}
+
+TcpPacketSocketUPtr TcpPacketSocket::open(TcpSocketPtr socket) {
+ socket->setNoDelay(true);
+ socket->setNonBlocking(true);
+ return TcpPacketSocketUPtr(new TcpPacketSocket(move(socket)));
+}
+
+bool TcpPacketSocket::isOpen() const {
+ return m_socket->isActive();
+}
+
+void TcpPacketSocket::close() {
+ m_socket->close();
+}
+
+void TcpPacketSocket::sendPackets(List<PacketPtr> packets) {
+ auto it = makeSMutableIterator(packets);
+
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext() && it.peekNext()->type() == currentType)
+ it.next()->write(packetBuffer);
+
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
+
+ ByteArray compressedPackets;
+ if (packetBuffer.size() > 64)
+ compressedPackets = compressData(packetBuffer.data());
+
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
+
+ if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) {
+ outBuffer.writeVlqI(-(int)(compressedPackets.size()));
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.writeVlqI((int)(packetBuffer.size()));
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputBuffer.append(outBuffer.takeData());
+ }
+}
+
+List<PacketPtr> TcpPacketSocket::receivePackets() {
+ uint64_t const PacketSizeLimit = 64<<20;
+ List<PacketPtr> packets;
+ try {
+ while (!m_inputBuffer.empty()) {
+ PacketType packetType;
+ uint64_t packetSize = 0;
+ bool packetCompressed = false;
+
+ DataStreamBuffer ds(m_inputBuffer);
+ try {
+ packetType = ds.read<PacketType>();
+ int64_t len = ds.readVlqI();
+ if (len < 0) {
+ packetSize = -len;
+ packetCompressed = true;
+ } else {
+ packetSize = len;
+ packetCompressed = false;
+ }
+ } catch (EofException const&) {
+ // Guard against not having the entire packet header available when
+ // trying to read.
+ break;
+ }
+
+ if (packetSize > PacketSizeLimit)
+ throw IOException::format("Packet size %s exceeds maximum allowed packet size!", packetSize);
+
+ if (packetSize > ds.size() - ds.pos())
+ break;
+
+ ByteArray packetBytes = ds.readBytes(packetSize);
+ if (packetCompressed)
+ packetBytes = uncompressData(packetBytes);
+
+ m_incomingStats.mix(packetType, packetSize);
+
+ DataStreamBuffer packetStream(move(packetBytes));
+ do {
+ PacketPtr packet = createPacket(packetType);
+ packet->read(packetStream);
+ packets.append(move(packet));
+ } while (!packetStream.atEnd());
+
+ m_inputBuffer = ds.readBytes(ds.size() - ds.pos());
+ }
+ } catch (IOException const& e) {
+ Logger::warn("I/O error in TcpPacketSocket::readPackets, closing: %s", outputException(e, false));
+ m_inputBuffer.clear();
+ m_socket->shutdown();
+ }
+ return packets;
+}
+
+bool TcpPacketSocket::sentPacketsPending() const {
+ return !m_outputBuffer.empty();
+}
+
+bool TcpPacketSocket::writeData() {
+ if (!isOpen())
+ return false;
+
+ bool dataSent = false;
+ try {
+ if (m_outputBuffer.empty())
+ return false;
+
+ while (!m_outputBuffer.empty()) {
+ size_t writtenAmount = m_socket->send(m_outputBuffer.ptr(), m_outputBuffer.size());
+ if (writtenAmount == 0)
+ break;
+ dataSent = true;
+ m_outputBuffer.trimLeft(writtenAmount);
+ }
+ } catch (SocketClosedException const& e) {
+ Logger::debug("TcpPacketSocket socket closed: %s", outputException(e, false));
+ } catch (IOException const& e) {
+ Logger::warn("I/O error in TcpPacketSocket::sendData: %s", outputException(e, false));
+ m_socket->shutdown();
+ }
+ return dataSent;
+}
+
+bool TcpPacketSocket::readData() {
+ bool dataReceived = false;
+ try {
+ char readBuffer[1024];
+ while (true) {
+ size_t readAmount = m_socket->receive(readBuffer, 1024);
+ if (readAmount == 0)
+ break;
+ dataReceived = true;
+ m_inputBuffer.append(readBuffer, readAmount);
+ }
+ } catch (SocketClosedException const& e) {
+ Logger::debug("TcpPacketSocket socket closed: %s", outputException(e, false));
+ } catch (IOException const& e) {
+ Logger::warn("I/O error in TcpPacketSocket::receiveData: %s", outputException(e, false));
+ m_socket->shutdown();
+ }
+ return dataReceived;
+}
+
+Maybe<PacketStats> TcpPacketSocket::incomingStats() const {
+ return m_incomingStats.stats();
+}
+
+Maybe<PacketStats> TcpPacketSocket::outgoingStats() const {
+ return m_outgoingStats.stats();
+}
+
+TcpPacketSocket::TcpPacketSocket(TcpSocketPtr socket)
+ : m_socket(move(socket)) {}
+
+P2PPacketSocketUPtr P2PPacketSocket::open(P2PSocketUPtr socket) {
+ return P2PPacketSocketUPtr(new P2PPacketSocket(move(socket)));
+}
+
+bool P2PPacketSocket::isOpen() const {
+ return m_socket && m_socket->isOpen();
+}
+
+void P2PPacketSocket::close() {
+ m_socket.reset();
+}
+
+void P2PPacketSocket::sendPackets(List<PacketPtr> packets) {
+ auto it = makeSMutableIterator(packets);
+
+ while (it.hasNext()) {
+ PacketType currentType = it.peekNext()->type();
+
+ DataStreamBuffer packetBuffer;
+ while (it.hasNext() && it.peekNext()->type() == currentType)
+ it.next()->write(packetBuffer);
+
+ // Packets must read and write actual data, because this is used to
+ // determine packet count
+ starAssert(!packetBuffer.empty());
+
+ ByteArray compressedPackets;
+ if (packetBuffer.size() > 64)
+ compressedPackets = compressData(packetBuffer.data());
+
+ DataStreamBuffer outBuffer;
+ outBuffer.write(currentType);
+
+ if (!compressedPackets.empty() && compressedPackets.size() < packetBuffer.size()) {
+ outBuffer.write<bool>(true);
+ outBuffer.writeData(compressedPackets.ptr(), compressedPackets.size());
+ m_outgoingStats.mix(currentType, compressedPackets.size());
+ } else {
+ outBuffer.write<bool>(false);
+ outBuffer.writeData(packetBuffer.ptr(), packetBuffer.size());
+ m_outgoingStats.mix(currentType, packetBuffer.size());
+ }
+ m_outputMessages.append(outBuffer.takeData());
+ }
+}
+
+List<PacketPtr> P2PPacketSocket::receivePackets() {
+ List<PacketPtr> packets;
+ try {
+ for (auto& inputMessage : take(m_inputMessages)) {
+ DataStreamBuffer ds(move(inputMessage));
+
+ PacketType packetType = ds.read<PacketType>();
+ bool packetCompressed = ds.read<bool>();
+ size_t packetSize = ds.size() - ds.pos();
+
+ ByteArray packetBytes = ds.readBytes(packetSize);
+ if (packetCompressed)
+ packetBytes = uncompressData(packetBytes);
+
+ m_incomingStats.mix(packetType, packetSize);
+
+ DataStreamBuffer packetStream(move(packetBytes));
+ do {
+ PacketPtr packet = createPacket(packetType);
+ packet->read(packetStream);
+ packets.append(move(packet));
+ } while (!packetStream.atEnd());
+ }
+ } catch (IOException const& e) {
+ Logger::warn("I/O error in P2PPacketSocket::readPackets, closing: %s", outputException(e, false));
+ m_socket.reset();
+ }
+ return packets;
+}
+
+bool P2PPacketSocket::sentPacketsPending() const {
+ return !m_outputMessages.empty();
+}
+
+bool P2PPacketSocket::writeData() {
+ bool workDone = false;
+
+ if (m_socket) {
+ while (!m_outputMessages.empty()) {
+ if (m_socket->sendMessage(m_outputMessages.first())) {
+ m_outputMessages.removeFirst();
+ workDone = true;
+ } else {
+ break;
+ }
+ }
+ }
+
+ return workDone;
+}
+
+bool P2PPacketSocket::readData() {
+ bool workDone = false;
+
+ if (m_socket) {
+ while (auto message = m_socket->receiveMessage()) {
+ m_inputMessages.append(message.take());
+ workDone = true;
+ }
+ }
+
+ return workDone;
+}
+
+Maybe<PacketStats> P2PPacketSocket::incomingStats() const {
+ return m_incomingStats.stats();
+}
+
+Maybe<PacketStats> P2PPacketSocket::outgoingStats() const {
+ return m_outgoingStats.stats();
+}
+
+P2PPacketSocket::P2PPacketSocket(P2PSocketPtr socket)
+ : m_socket(move(socket)) {}
+
+}