Skip to content

Commit

Permalink
disable pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Hugoberry committed Apr 16, 2024
1 parent e298328 commit 8dfe988
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 613 deletions.
1 change: 1 addition & 0 deletions .github/workflows/MainDistributionPipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
push:
pull_request:
workflow_dispatch:
if: false

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.head_ref || '' }}-${{ github.base_ref || '' }}-${{ github.ref != 'refs/heads/main' || github.sha }}
Expand Down
11 changes: 2 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ set(TARGET_NAME pbix)

set(EXTENSION_NAME ${TARGET_NAME}_extension)
set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
set(STRING_ENCODING_TYPE "ICONV" CACHE STRING "Set the way strings have to be encoded (ICONV|WIN32API|NONE|...)")
add_definitions(-DKS_STR_ENCODING_ICONV)


project(${TARGET_NAME})
include_directories(src/include)
include_directories(src/abf)
include_directories(third_party/kaitai)
include_directories(third_party/miniz)
include_directories(third_party/sqlite)
include_directories(third_party/tinyxml2)
Expand All @@ -36,9 +34,6 @@ set(XPRESS9_SOURCES
third_party/xpress9/Xpress9Misc.c
third_party/xpress9/Xpress9Wrapper.cpp)

set(KAITAI_SOURCES
third_party/kaitai/kaitaistream.cpp)

SET(ABF_SOURCES
src/abf/AbfParser.cpp
src/abf/BackupFile.cpp
Expand All @@ -51,8 +46,7 @@ SET(ABF_SOURCES
src/abf/FileList.cpp
src/abf/Languages.cpp
src/abf/LogBackupFile.cpp
src/abf/VirtualDirectory.cpp
src/abf/pbix.cpp)
src/abf/VirtualDirectory.cpp)

set(EXTENSION_SOURCES
src/pbix_extension.cpp
Expand All @@ -64,7 +58,6 @@ set(EXTENSION_SOURCES
${SQLITE_SOURCES}
${TINYXML2_SOURCES}
${XPRESS9_SOURCES}
${KAITAI_SOURCES}
${ABF_SOURCES})

# build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
Expand Down
174 changes: 145 additions & 29 deletions src/abf/AbfParser.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#include "AbfParser.h"
// #include "duckdb/common/file_system.hpp"


using namespace tinyxml2;
using namespace duckdb;

std::vector<uint8_t> AbfParser::read_buffer_bytes(const std::vector<uint8_t> &buffer, uint64_t offset, int size)
{
Expand Down Expand Up @@ -110,28 +112,49 @@ std::pair<uint64_t, uint64_t> AbfParser::initialize_zip_and_locate_datamodel(con
return {file_stat.m_local_header_ofs, file_stat.m_comp_size};
}

std::pair<uint64_t, uint64_t> AbfParser::locate_datamodel(duckdb::ClientContext &context,const std::string &path) {
auto &fs = duckdb::FileSystem::GetFileSystem(context);
auto file_handle = fs.OpenFile(path, FILE_READ);
duckdb::FileHandleStream file_stream(file_handle.get());

kaitai::kstream ks(&file_stream);
std::pair<uint64_t, uint64_t> AbfParser::locate_datamodel(duckdb::FileHandle &file_handle_p, const std::string &path) {
constexpr auto DataModelFileName = "DataModel";
mz_zip_archive zip_archive;
memset(&zip_archive, 0, sizeof(zip_archive));

pbix_t pbix(&ks);

for(auto &section : *pbix.sections()) {
if (section->section_type() == 513) { //Central Directory
auto central_directory = static_cast<pbix_t::central_dir_entry_t*>(section->body());
//check if the file is DataModel
if(central_directory->file_name() == "DataModel") {
return {central_directory->ofs_local_header(), central_directory->len_body_compressed()};
}
}
// Setup the custom IO operations
zip_archive.m_pIO_opaque = &file_handle_p;
zip_archive.m_pRead = [](void *opaque, mz_uint64 file_offset, void *buffer, size_t n) {
auto handle = static_cast<duckdb::FileHandle *>(opaque);
handle->Seek(file_offset);
return static_cast<size_t>(handle->Read(buffer, n));
};

// Initialize the zip archive for reading using the custom IO
if (!mz_zip_reader_init(&zip_archive, file_handle_p.GetFileSize(), MZ_ZIP_FLAG_COMPRESSED_DATA)) { // Note: MZ_ZIP_FLAG_DO_NOT_SORT_CENTRAL_DIRECTORY might be needed depending on use case
throw std::runtime_error("Could not initialize zip reader");
}
throw std::runtime_error("DataModel not found in the zip file.");

// Locate the DataModel file within the zip
int file_index = mz_zip_reader_locate_file(&zip_archive, DataModelFileName, nullptr, 0);
if (file_index < 0) {
mz_zip_reader_end(&zip_archive); // Clean up before throwing
throw std::runtime_error("DataModel not found in the zip file.");
}

// Retrieve information about the DataModel file
mz_zip_archive_file_stat file_stat;
if (!mz_zip_reader_file_stat(&zip_archive, file_index, &file_stat)) {
mz_zip_reader_end(&zip_archive); // Clean up before throwing
throw std::runtime_error("Could not retrieve information about DataModel.");
}

// // Clean up the zip reader as it's no longer needed after getting the info
mz_zip_reader_end(&zip_archive);
// file_handle = file_handle_p.release();

// Return the offset and compressed size of the DataModel file
return {file_stat.m_local_header_ofs, file_stat.m_comp_size};
}


void AbfParser::read_compressed_datamodel_header(std::ifstream &entryStream, uint64_t &datamodel_ofs) {
// Read compressed DataModel header to adjust offset
entryStream.seekg(datamodel_ofs+ZIP_LOCAL_FILE_HEADER_FIXED);
Expand All @@ -145,7 +168,7 @@ void AbfParser::read_compressed_datamodel_header(std::ifstream &entryStream, uin

std::vector<uint8_t> AbfParser::decompress_initial_block(std::ifstream &entryStream, uint64_t datamodel_ofs, XPress9Wrapper &xpress9_wrapper) {
// Seek to the start of the DataModel compressed data
entryStream.seekg(datamodel_ofs + ABF_XPRESS9_SIGNATRUE, std::ios::beg);
entryStream.seekg(datamodel_ofs + ABF_XPRESS9_SIGNATURE, std::ios::beg);

uint32_t uncompressed_size;
uint32_t compressed_size;
Expand All @@ -167,10 +190,39 @@ std::vector<uint8_t> AbfParser::decompress_initial_block(std::ifstream &entryStr
}
return decompressed_buffer;
}
/// Reads and decompresses the first XPress9 block of the DataModel stream.
///
/// Reading starts at the current position of `file_handle_p`; this function does
/// not seek. It consumes, in order: ABF_XPRESS9_SIGNATURE signature bytes (discarded),
/// a uint32 uncompressed size, a uint32 compressed size, and the compressed payload.
///
/// @param file_handle_p   Open file handle to read from.
/// @param bytes_read      Running byte counter; advanced by every byte consumed here.
/// @param xpress9_wrapper Decompressor used for the payload.
/// @return The decompressed contents of the first block.
/// @throws std::runtime_error if the file ends before a field is fully read, or if
///         the decompressed size differs from the size recorded in the block header.
std::vector<uint8_t> AbfParser::decompress_initial_block(duckdb::FileHandle &file_handle_p, uint64_t &bytes_read,XPress9Wrapper &xpress9_wrapper) {
    // A single Read() call may return fewer bytes than requested, so keep reading
    // until the request is satisfied and fail loudly on a truncated file instead
    // of continuing with partially filled buffers.
    const auto read_exact = [&file_handle_p, &bytes_read](void *dest, uint64_t count) {
        auto *cursor = static_cast<uint8_t *>(dest);
        uint64_t remaining = count;
        while (remaining > 0) {
            const auto got = file_handle_p.Read(cursor, remaining);
            if (got <= 0) {
                throw std::runtime_error("Unexpected end of file while reading first DataModel block.");
            }
            cursor += got;
            remaining -= static_cast<uint64_t>(got);
        }
        bytes_read += count;
    };

    // The signature is only skipped over; its contents are not inspected.
    std::vector<uint8_t> signature(ABF_XPRESS9_SIGNATURE);
    read_exact(signature.data(), signature.size());

    // Block header: uncompressed size followed by compressed size.
    // NOTE(review): both are read as raw host-order uint32_t; presumably the file
    // stores them little-endian - confirm before targeting big-endian hosts.
    uint32_t uncompressed_size = 0;
    uint32_t compressed_size = 0;
    read_exact(&uncompressed_size, sizeof(uncompressed_size));
    read_exact(&compressed_size, sizeof(compressed_size));

    // Allocate buffers for compressed and decompressed data
    std::vector<uint8_t> decompressed_buffer(uncompressed_size);
    std::vector<uint8_t> compressed_buffer(compressed_size);
    read_exact(compressed_buffer.data(), compressed_size);

    // Decompress the whole block in one call
    const uint32_t decompressed_size = xpress9_wrapper.Decompress(compressed_buffer.data(), compressed_size, decompressed_buffer.data(), decompressed_buffer.size());
    // Verify that the decompressed size matches the size announced in the header
    if (decompressed_size != uncompressed_size) {
        throw std::runtime_error("Mismatch in decompressed block size in first block.");
    }
    return decompressed_buffer;
}

std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(std::ifstream &entryStream, uint64_t datamodel_ofs, uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset) {
// Calculate the total number of blocks
constexpr uint32_t BLOCK_SIZE = 0x200000;
auto total_blocks = (virtual_directory_size + virtual_directory_offset) / BLOCK_SIZE;

std::vector<uint8_t> all_decompressed_data;
Expand Down Expand Up @@ -218,6 +270,61 @@ std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(std::ifstream &ent
return all_decompressed_data;
}

/// Walks the remaining XPress9 blocks of the DataModel and decompresses the trailing ones.
///
/// Reading continues from the current position of `file_handle_p` until `bytes_read`
/// reaches `datamodel_size`. Each block is a uint32 uncompressed size, a uint32
/// compressed size and the compressed payload. Blocks whose 1-based index is below
/// `total_blocks - trailing_blocks` are skipped without being decompressed; their
/// uncompressed sizes are accumulated into `skip_offset`.
///
/// @param file_handle_p            Open file handle to read from.
/// @param bytes_read               Bytes of the DataModel consumed so far; updated in place.
/// @param datamodel_ofs            Absolute file offset that `bytes_read` is relative to
///                                 (used to seek past skipped blocks).
/// @param datamodel_size           Total number of DataModel bytes to consume.
/// @param xpress9_wrapper          Decompressor used for each kept block.
/// @param virtual_directory_offset Used with `virtual_directory_size` to estimate the block count.
/// @param virtual_directory_size   See `virtual_directory_offset`.
/// @param trailing_blocks          Number of blocks at the end that must be decompressed.
/// @param skip_offset              Incremented by the uncompressed size of every skipped block.
/// @return Concatenated decompressed data of all blocks that were not skipped.
/// @throws std::runtime_error on a truncated file, on a block that claims to extend past
///         the DataModel, or when a block decompresses to an unexpected size.
std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(duckdb::FileHandle &file_handle_p, uint64_t &bytes_read, uint64_t datamodel_ofs,uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset) {
    // A single Read() call may return fewer bytes than requested; loop until the
    // request is satisfied and fail on a truncated file.
    const auto read_exact = [&file_handle_p](void *dest, uint64_t count) {
        auto *cursor = static_cast<uint8_t *>(dest);
        uint64_t remaining = count;
        while (remaining > 0) {
            const auto got = file_handle_p.Read(cursor, remaining);
            if (got <= 0) {
                throw std::runtime_error("Unexpected end of file while reading DataModel block.");
            }
            cursor += got;
            remaining -= static_cast<uint64_t>(got);
        }
    };

    // The casts spell out the int -> uint64_t conversions the arithmetic performed
    // implicitly before; the resulting values are unchanged.
    const uint64_t total_blocks = (static_cast<uint64_t>(virtual_directory_size) + virtual_directory_offset) / BLOCK_SIZE;
    const uint64_t kept_blocks = static_cast<uint64_t>(trailing_blocks);
    // Blocks with an index below this threshold are skipped; 0 disables skipping.
    const uint64_t first_kept_block = total_blocks > kept_blocks ? total_blocks - kept_blocks : 0;

    std::vector<uint8_t> all_decompressed_data;
    // Scratch buffers are reused across iterations to avoid reallocating per block.
    std::vector<uint8_t> compressed_buffer;
    std::vector<uint8_t> decompressed_buffer;
    uint32_t block_index = 0;
    uint32_t block_index_iterator = 0;

    // Iterate through each block in the DataModel
    while (bytes_read < datamodel_size) {
        block_index++;

        // Block header: uncompressed size followed by compressed size.
        // NOTE(review): read as raw host-order uint32_t; presumably little-endian on disk - confirm.
        uint32_t uncompressed_size = 0;
        uint32_t compressed_size = 0;
        read_exact(&uncompressed_size, sizeof(uncompressed_size));
        read_exact(&compressed_size, sizeof(compressed_size));
        bytes_read += sizeof(uncompressed_size) + sizeof(compressed_size);

        // The sizes come straight from the file: refuse a block that would run past
        // the end of the DataModel rather than reading or seeking into unrelated data.
        if (bytes_read > datamodel_size || compressed_size > datamodel_size - bytes_read) {
            throw std::runtime_error("Compressed block extends past the end of the DataModel.");
        }

        // Skip blocks that are not within the last `trailing_blocks`
        if (block_index < first_kept_block) {
            skip_offset += uncompressed_size;
            bytes_read += compressed_size;
            file_handle_p.Seek(datamodel_ofs + bytes_read); // Jump over the payload of this block
            continue;
        }

        // Read the compressed block
        compressed_buffer.resize(compressed_size);
        decompressed_buffer.resize(uncompressed_size);
        read_exact(compressed_buffer.data(), compressed_size);
        bytes_read += compressed_size;

        // Patch the header of the compressed block before handing it to the decompressor
        patch_header_of_compressed_buffer(compressed_buffer, block_index_iterator);

        // Decompress the block
        const uint32_t decompressed_size = xpress9_wrapper.Decompress(compressed_buffer.data(), compressed_size, decompressed_buffer.data(), decompressed_buffer.size());

        // Verify decompression success
        if (decompressed_size != uncompressed_size) {
            throw std::runtime_error("Decompression failed or resulted in unexpected size.");
        }

        // Add decompressed data to the overall buffer
        all_decompressed_data.insert(all_decompressed_data.end(), decompressed_buffer.begin(), decompressed_buffer.end());
    }

    return all_decompressed_data;
}

std::vector<uint8_t> AbfParser::get_sqlite(const std::string &path, const int trailing_blocks=15)
{
// Initialize zip and locate DataModel
Expand Down Expand Up @@ -260,33 +367,42 @@ std::vector<uint8_t> AbfParser::get_sqlite(const std::string &path, const int tr
}
std::vector<uint8_t> AbfParser::get_sqlite_v2(duckdb::ClientContext &context, const std::string &path, const int trailing_blocks=15)
{
// Initialize zip and locate DataModel
auto [datamodel_ofs, datamodel_size] = locate_datamodel(context,path);

// Open file stream
std::ifstream entryStream(path, std::ios::binary);
if (!entryStream.is_open()) {
throw std::runtime_error("Could not open pbix file for reading compressed DataModel header.");
auto &fs = duckdb::FileSystem::GetFileSystem(context);
// Open the file using FileSystem
auto file_handle = fs.OpenFile(path, FILE_READ);
if (!file_handle) {
throw std::runtime_error("Could not open zip file");
}

// Read compressed DataModel header to adjust offset
read_compressed_datamodel_header(entryStream, datamodel_ofs);

auto [datamodel_ofs, datamodel_size] = locate_datamodel(*file_handle, path);
uint64_t bytes_read = 0;
uint16_t zip_pointer = 0;

// Read compressed DataModel header to adjust offset
file_handle->Seek(datamodel_ofs+ZIP_LOCAL_FILE_HEADER_FIXED);
uint16_t filename_len = 0;
uint16_t extra_len = 0;
file_handle->Read(reinterpret_cast<char *>(&filename_len), sizeof(filename_len));
file_handle->Read(reinterpret_cast<char *>(&extra_len), sizeof(extra_len));
datamodel_ofs += ZIP_LOCAL_FILE_HEADER + filename_len + extra_len;

file_handle->Seek(datamodel_ofs);

XPress9Wrapper xpress9_wrapper;
if (!xpress9_wrapper.Initialize())
{
throw std::runtime_error("Failed to initialize XPress9Wrapper");
}

// Decompress initial block to get the virtual directory info
auto initial_decompressed_buffer = decompress_initial_block(entryStream, datamodel_ofs, xpress9_wrapper);
auto initial_decompressed_buffer = decompress_initial_block(*file_handle, bytes_read,xpress9_wrapper);

// Process backup log header to get virtual directory offset and size
auto [virtual_directory_offset, virtual_directory_size] = process_backup_log_header(initial_decompressed_buffer);

uint64_t skip_offset = 0; //optimization for skipping blocks
// Iterate through the remaining blocks and decompress them
auto all_decompressed_buffer = iterate_and_decompress_blocks(entryStream, datamodel_ofs, datamodel_size, xpress9_wrapper, virtual_directory_offset, virtual_directory_size, trailing_blocks, skip_offset);
auto all_decompressed_buffer = iterate_and_decompress_blocks(*file_handle, bytes_read, datamodel_ofs, datamodel_size, xpress9_wrapper, virtual_directory_offset, virtual_directory_size, trailing_blocks, skip_offset);

// Prefix all_decompressed_buffer with initial_decompressed_buffer in case we have only one block
all_decompressed_buffer.insert(all_decompressed_buffer.begin(), initial_decompressed_buffer.begin(), initial_decompressed_buffer.end());
Expand Down
15 changes: 8 additions & 7 deletions src/abf/AbfParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
#include "Xpress9Wrapper.h"
#include "Crc32.h"
#include "duckdb.hpp"
#include "duckdb/common/file_open_flags.hpp"
#include "pbix.h"
#include "FileHandleStream.h"
#include <sstream>
#include "kaitai/kaitaistream.h"



// Constants related to ZIP file parsing
constexpr unsigned char ZIP_LOCAL_FILE_HEADER_FIXED = 26;
constexpr unsigned char ZIP_LOCAL_FILE_HEADER = 30;
constexpr unsigned char ABF_XPRESS9_SIGNATRUE = 102;
constexpr unsigned char ABF_XPRESS9_SIGNATURE = 102;
constexpr unsigned char ABF_BACKUP_LOG_HEADER_OFFSET = 72;
constexpr uint32_t BLOCK_SIZE = 0x200000;
constexpr unsigned short ABF_BACKUP_LOG_HEADER_SIZE = 0x1000 - ABF_BACKUP_LOG_HEADER_OFFSET;
static constexpr idx_t FILE_READ = idx_t(1 << 0);

Expand All @@ -38,16 +35,20 @@ class AbfParser {
static std::vector<uint8_t> get_sqlite(const std::string &path, const int trailing_chunks);
static std::vector<uint8_t> get_sqlite_v2(duckdb::ClientContext &context,const std::string &path, const int trailing_chunks);
private:
// duckdb::FileHandle *file_handle;
// mz_zip_archive zip_archive;
static void patch_header_of_compressed_buffer(std::vector<uint8_t> &compressed_buffer, uint32_t& block_index_iterator);
static std::vector<uint8_t> read_buffer_bytes(const std::vector<uint8_t>& buffer, uint64_t offset, int size);
static std::vector<uint8_t> trim_buffer(const std::vector<uint8_t>& buffer);
static std::tuple<uint64_t,int> process_backup_log_header(const std::vector<uint8_t> &buffer);
static std::vector<uint8_t> extract_sqlite_buffer(const std::vector<uint8_t> &buffer, uint64_t skip_offset, uint64_t virtual_directory_offset, int virtual_directory_size);
static std::pair<uint64_t, uint64_t> initialize_zip_and_locate_datamodel(const std::string &path);
static std::pair<uint64_t, uint64_t> locate_datamodel(duckdb::ClientContext &context, const std::string &path);
static std::pair<uint64_t, uint64_t> locate_datamodel(duckdb::FileHandle &file_handle, const std::string &path);
static void read_compressed_datamodel_header(std::ifstream &entryStream, uint64_t &datamodel_ofs);
static std::vector<uint8_t> decompress_initial_block(std::ifstream &entryStream, uint64_t datamodel_ofs, XPress9Wrapper &xpress9_wrapper);
static std::vector<uint8_t> decompress_initial_block(duckdb::FileHandle &file_handle, uint64_t &bytes_read, XPress9Wrapper &xpress9_wrapper);
static std::vector<uint8_t> iterate_and_decompress_blocks(std::ifstream &entryStream, uint64_t datamodel_ofs, uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset);
static std::vector<uint8_t> iterate_and_decompress_blocks(duckdb::FileHandle &file_handle, uint64_t &bytes_read, uint64_t datamodel_ofs, uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset);
};

class Header {
Expand Down
54 changes: 0 additions & 54 deletions src/abf/FileHandleStream.cpp

This file was deleted.

Loading

0 comments on commit 8dfe988

Please sign in to comment.