diff options
author | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-03-14 21:41:53 +1100 |
---|---|---|
committer | Kae <80987908+Novaenia@users.noreply.github.com> | 2024-03-14 21:41:53 +1100 |
commit | 662b83ff92cc2316fb962ff1608359f6d705a5f0 (patch) | |
tree | d0e7d15887ed14bd252e6e61888710c8bac2200a /source/core | |
parent | 8164e5ae6fa33c9ec2a14f107585a7cbe7fbf813 (diff) |
Initial commit of experimental zstd network compression
currently a bit buggy
Diffstat (limited to 'source/core')
-rw-r--r-- | source/core/CMakeLists.txt | 2 | ||||
-rw-r--r-- | source/core/StarBuffer.cpp | 4 | ||||
-rw-r--r-- | source/core/StarBuffer.hpp | 2 | ||||
-rw-r--r-- | source/core/StarCompression.cpp | 51 | ||||
-rw-r--r-- | source/core/StarCompression.hpp | 6 | ||||
-rw-r--r-- | source/core/StarDataStreamDevices.cpp | 8 | ||||
-rw-r--r-- | source/core/StarDataStreamDevices.hpp | 3 | ||||
-rw-r--r-- | source/core/StarZSTDCompression.cpp | 84 | ||||
-rw-r--r-- | source/core/StarZSTDCompression.hpp | 44 |
9 files changed, 182 insertions, 22 deletions
diff --git a/source/core/CMakeLists.txt b/source/core/CMakeLists.txt index 687b9c5..4723686 100644 --- a/source/core/CMakeLists.txt +++ b/source/core/CMakeLists.txt @@ -126,6 +126,7 @@ SET (star_core_HEADERS StarWeightedPool.hpp StarWorkerPool.hpp StarXXHash.hpp + StarZSTDCompression.hpp ) SET (star_core_SOURCES @@ -181,6 +182,7 @@ SET (star_core_SOURCES StarUnicode.cpp StarUuid.cpp StarWorkerPool.cpp + StarZSTDCompression.cpp ) IF (STAR_SYSTEM_FAMILY_UNIX) diff --git a/source/core/StarBuffer.cpp b/source/core/StarBuffer.cpp index 0d998af..5b5b2e4 100644 --- a/source/core/StarBuffer.cpp +++ b/source/core/StarBuffer.cpp @@ -263,6 +263,10 @@ bool ExternalBuffer::empty() const { return m_size == 0; } +ExternalBuffer::operator bool() const { + return m_size == 0; +} + void ExternalBuffer::reset(char const* externalData, size_t len) { m_pos = 0; m_bytes = externalData; diff --git a/source/core/StarBuffer.hpp b/source/core/StarBuffer.hpp index 0e1213e..0f9864a 100644 --- a/source/core/StarBuffer.hpp +++ b/source/core/StarBuffer.hpp @@ -105,6 +105,8 @@ public: // Clears buffer, moves position to 0. bool empty() const; + operator bool() const; + // Reset buffer with new contents, moves position to 0. void reset(char const* externalData, size_t len); diff --git a/source/core/StarCompression.cpp b/source/core/StarCompression.cpp index 2aa59b2..58b43ef 100644 --- a/source/core/StarCompression.cpp +++ b/source/core/StarCompression.cpp @@ -15,9 +15,9 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress return; const size_t BUFSIZE = 32 * 1024; - unsigned char temp_buffer[BUFSIZE]; + auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE); - z_stream strm; + z_stream strm{}; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; @@ -27,13 +27,13 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress strm.next_in = (unsigned char*)in.ptr(); strm.avail_in = in.size(); - strm.next_out = temp_buffer; + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; while (deflate_res == Z_OK) { deflate_res = deflate(&strm, Z_FINISH); if (strm.avail_out == 0) { - out.append((char const*)temp_buffer, BUFSIZE); - strm.next_out = temp_buffer; + out.append((char const*)tempBuffer.get(), BUFSIZE); + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; } } @@ -42,7 +42,7 @@ void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compress if (deflate_res != Z_STREAM_END) throw IOException(strf("Internal error in uncompressData, deflate_res is {}", deflate_res)); - out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); + out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out); } ByteArray compressData(ByteArray const& in, CompressionLevel compression) { @@ -51,16 +51,16 @@ ByteArray compressData(ByteArray const& in, CompressionLevel compression) { return out; } -void uncompressData(ByteArray const& in, ByteArray& out) { +void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit) { out.clear(); - if (in.empty()) + if (!inLen) return; const size_t BUFSIZE = 32 * 1024; - unsigned char temp_buffer[BUFSIZE]; + auto tempBuffer = std::make_unique<unsigned char[]>(BUFSIZE); - z_stream strm; + z_stream strm{}; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; strm.opaque = Z_NULL; @@ -68,17 +68,22 @@ void uncompressData(ByteArray const& in, ByteArray& out) { if (inflate_res != Z_OK) throw IOException(strf("Failed to initialise inflate ({})", inflate_res)); - strm.next_in = (unsigned char*)in.ptr(); - strm.avail_in = in.size(); - strm.next_out = temp_buffer; + strm.next_in = (unsigned char*)in; + strm.avail_in = inLen; + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; while (inflate_res == Z_OK || inflate_res == Z_BUF_ERROR) { inflate_res = inflate(&strm, Z_FINISH); if (strm.avail_out == 0) { - out.append((char const*)temp_buffer, BUFSIZE); - strm.next_out = temp_buffer; + out.append((char const*)tempBuffer.get(), BUFSIZE); + strm.next_out = tempBuffer.get(); strm.avail_out = BUFSIZE; + if (limit && out.size() >= limit) { + inflateEnd(&strm); + throw IOException(strf("hit uncompressData limit of {} bytes", limit)); + break; + } } else if (inflate_res == Z_BUF_ERROR) { break; } @@ -88,15 +93,23 @@ void uncompressData(ByteArray const& in, ByteArray& out) { if (inflate_res != Z_STREAM_END) throw IOException(strf("Internal error in uncompressData, inflate_res is {}", inflate_res)); - out.append((char const*)temp_buffer, BUFSIZE - strm.avail_out); + out.append((char const*)tempBuffer.get(), BUFSIZE - strm.avail_out); } -ByteArray uncompressData(ByteArray const& in) { - ByteArray out = ByteArray::withReserve(in.size()); - uncompressData(in, out); +ByteArray uncompressData(const char* in, size_t inLen, size_t limit) { + ByteArray out = ByteArray::withReserve(inLen); + uncompressData(in, inLen, out, limit); return out; } +void uncompressData(ByteArray const& in, ByteArray& out, size_t limit) { + uncompressData(in.ptr(), in.size(), out, limit); +} + +ByteArray uncompressData(ByteArray const& in, size_t limit) { + return uncompressData(in.ptr(), in.size(), limit); +} + CompressedFilePtr CompressedFile::open(String const& filename, IOMode mode, CompressionLevel comp) { CompressedFilePtr f = make_shared<CompressedFile>(filename); f->open(mode, comp); diff --git a/source/core/StarCompression.hpp b/source/core/StarCompression.hpp index 56dc774..3322662 100644 --- a/source/core/StarCompression.hpp +++ b/source/core/StarCompression.hpp @@ -17,8 +17,10 @@ CompressionLevel const HighCompression = 9; void compressData(ByteArray const& in, ByteArray& out, CompressionLevel compression = MediumCompression); ByteArray compressData(ByteArray const& in, CompressionLevel compression = MediumCompression); -void uncompressData(ByteArray const& in, ByteArray& out); -ByteArray uncompressData(ByteArray const& in); +void uncompressData(const char* in, size_t inLen, ByteArray& out, size_t limit = 0); +ByteArray uncompressData(const char* in, size_t inLen, size_t limit = 0); +void uncompressData(ByteArray const& in, ByteArray& out, size_t limit = 0); +ByteArray uncompressData(ByteArray const& in, size_t limit = 0); // Random access to a (potentially) compressed file. class CompressedFile : public IODevice { diff --git a/source/core/StarDataStreamDevices.cpp b/source/core/StarDataStreamDevices.cpp index 58ec920..b769167 100644 --- a/source/core/StarDataStreamDevices.cpp +++ b/source/core/StarDataStreamDevices.cpp @@ -130,7 +130,9 @@ void DataStreamBuffer::writeData(char const* data, size_t len) { m_buffer->writeFull(data, len); } -DataStreamExternalBuffer::DataStreamExternalBuffer() {} +DataStreamExternalBuffer::DataStreamExternalBuffer() : m_buffer() {} + +DataStreamExternalBuffer::DataStreamExternalBuffer(DataStreamBuffer const& buffer) : DataStreamExternalBuffer(buffer.ptr(), buffer.size()) {} DataStreamExternalBuffer::DataStreamExternalBuffer(char const* externalData, size_t len) : DataStreamExternalBuffer() { reset(externalData, len); @@ -160,6 +162,10 @@ size_t DataStreamExternalBuffer::pos() { return m_buffer.pos(); } +size_t DataStreamExternalBuffer::remaining() { + return m_buffer.dataSize() - m_buffer.pos(); +} + void DataStreamExternalBuffer::reset(char const* externalData, size_t len) { m_buffer.reset(externalData, len); } diff --git a/source/core/StarDataStreamDevices.hpp b/source/core/StarDataStreamDevices.hpp index dce7dcf..5d404ab 100644 --- a/source/core/StarDataStreamDevices.hpp +++ b/source/core/StarDataStreamDevices.hpp @@ -126,6 +126,8 @@ private: class DataStreamExternalBuffer : public DataStream { public: DataStreamExternalBuffer(); + DataStreamExternalBuffer(DataStreamBuffer const& buffer); + DataStreamExternalBuffer(DataStreamExternalBuffer const& buffer) = default; DataStreamExternalBuffer(char const* externalData, size_t len); char const* ptr() const; @@ -136,6 +138,7 @@ public: void seek(size_t pos, IOSeek mode = IOSeek::Absolute); bool atEnd(); size_t pos(); + size_t remaining(); void reset(char const* externalData, size_t len); diff --git a/source/core/StarZSTDCompression.cpp b/source/core/StarZSTDCompression.cpp new file mode 100644 index 0000000..733b182 --- /dev/null +++ b/source/core/StarZSTDCompression.cpp @@ -0,0 +1,84 @@ +#include "StarZSTDCompression.hpp" +#include <zstd.h> + +namespace Star { + +CompressionStream::CompressionStream() : m_cStream(ZSTD_createCStream()) { + ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_enableLongDistanceMatching, 1); + ZSTD_CCtx_setParameter(m_cStream, ZSTD_c_windowLog, 24); + ZSTD_initCStream(m_cStream, 2); +} + +CompressionStream::~CompressionStream() { ZSTD_freeCStream(m_cStream); } + +ByteArray CompressionStream::compress(const char* in, size_t inLen) { + size_t const cInSize = ZSTD_CStreamInSize (); + size_t const cOutSize = ZSTD_CStreamOutSize(); + ByteArray output(cOutSize, 0); + size_t written = 0, read = 0; + while (read < inLen) { + ZSTD_inBuffer inBuffer = {in + read, min(cInSize, inLen - read), 0}; + ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0}; + bool finished = false; + do { + size_t ret = ZSTD_compressStream2(m_cStream, &outBuffer, &inBuffer, ZSTD_e_flush); + if (ZSTD_isError(ret)) { + throw IOException(strf("ZSTD compression error {}", ZSTD_getErrorName(ret))); + break; + } + + if (outBuffer.pos == outBuffer.size) { + output.resize(output.size() * 2); + outBuffer.dst = output.ptr(); + outBuffer.size = output.size(); + continue; + } + + finished = ret == 0 && inBuffer.pos == inBuffer.size; + } while (!finished); + + read += inBuffer.pos; + written += outBuffer.pos; + } + output.resize(written); + return output; +} + +DecompressionStream::DecompressionStream() : m_dStream(ZSTD_createDStream()) { + ZSTD_DCtx_setParameter(m_dStream, ZSTD_d_windowLogMax, 25); + ZSTD_initDStream(m_dStream); +} + +DecompressionStream::~DecompressionStream() { ZSTD_freeDStream(m_dStream); } + +ByteArray DecompressionStream::decompress(const char* in, size_t inLen) { + size_t const dInSize = ZSTD_DStreamInSize (); + size_t const dOutSize = ZSTD_DStreamOutSize(); + ByteArray output(dOutSize, 0); + size_t written = 0, read = 0; + while (read < inLen) { + ZSTD_inBuffer inBuffer = {in + read, min(dInSize, inLen - read), 0}; + ZSTD_outBuffer outBuffer = {output.ptr() + written, output.size() - written, 0}; + do { + size_t ret = ZSTD_decompressStream(m_dStream, &outBuffer, &inBuffer); + if (ZSTD_isError(ret)) { + throw IOException(strf("ZSTD decompression error {}", ZSTD_getErrorName(ret))); + break; + } + + if (outBuffer.pos == outBuffer.size) { + output.resize(output.size() * 2); + outBuffer.dst = output.ptr(); + outBuffer.size = output.size(); + continue; + } + } while (inBuffer.pos < inBuffer.size); + + read += inBuffer.pos; + written += outBuffer.pos; + } + output.resize(written); + return output; +} + +}
\ No newline at end of file diff --git a/source/core/StarZSTDCompression.hpp b/source/core/StarZSTDCompression.hpp new file mode 100644 index 0000000..77719bf --- /dev/null +++ b/source/core/StarZSTDCompression.hpp @@ -0,0 +1,44 @@ +#pragma once +#include "StarByteArray.hpp" +#include "StarDataStreamDevices.hpp" + +typedef struct ZSTD_CCtx_s ZSTD_CCtx; +typedef struct ZSTD_DCtx_s ZSTD_DCtx; +typedef ZSTD_DCtx ZSTD_DStream; +typedef ZSTD_CCtx ZSTD_CStream; + +namespace Star { + +class CompressionStream { +public: + CompressionStream(); + ~CompressionStream(); + + ByteArray compress(const char* in, size_t inLen); + ByteArray compress(ByteArray const& in); + +private: + ZSTD_CStream* m_cStream; +}; + +inline ByteArray CompressionStream::compress(ByteArray const& in) { + return compress(in.ptr(), in.size()); +} + +class DecompressionStream { +public: + DecompressionStream(); + ~DecompressionStream(); + + ByteArray decompress(const char* in, size_t inLen); + ByteArray decompress(ByteArray const& in); + +private: + ZSTD_DStream* m_dStream; +}; + +inline ByteArray DecompressionStream::decompress(ByteArray const& in) { + return decompress(in.ptr(), in.size()); +} + +}
\ No newline at end of file |