From f20ef1352a2ec321a42a75e9f0944eba105bac5e Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 10 Mar 2025 18:17:15 +0200 Subject: [PATCH 1/7] WIP/init --- chsql/CMakeLists.txt | 2 +- chsql/src/include/chsql_parquet_types.h | 34 ++++ chsql/src/parquet_ordered_scan.cpp | 32 +--- chsql/src/parquet_types.cpp | 242 ++++++++++++++++++++++++ 4 files changed, 283 insertions(+), 27 deletions(-) create mode 100644 chsql/src/include/chsql_parquet_types.h create mode 100644 chsql/src/parquet_types.cpp diff --git a/chsql/CMakeLists.txt b/chsql/CMakeLists.txt index 5460e12..93ce21b 100644 --- a/chsql/CMakeLists.txt +++ b/chsql/CMakeLists.txt @@ -21,7 +21,7 @@ include_directories( ../duckdb/third_party/mbedtls ../duckdb/third_party/mbedtls/include ../duckdb/third_party/brotli/include) -set(EXTENSION_SOURCES src/chsql_extension.cpp src/duck_flock.cpp src/chsql_system.cpp) +set(EXTENSION_SOURCES src/chsql_extension.cpp src/duck_flock.cpp src/chsql_system.cpp src/parquet_types.cpp) build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES}) build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES}) # Link OpenSSL in both the static library as the loadable extension diff --git a/chsql/src/include/chsql_parquet_types.h b/chsql/src/include/chsql_parquet_types.h new file mode 100644 index 0000000..179a9f0 --- /dev/null +++ b/chsql/src/include/chsql_parquet_types.h @@ -0,0 +1,34 @@ +// +// Created by hromozeka on 10.03.25. +// + +#ifndef PARQUET_TYPES_H +#define PARQUET_TYPES_H + +#include "duckdb.hpp" +#include + +struct ParquetType { + /*duckdb_parquet::ConvertedType::type -> replaced to int to support -1 nodata value*/ + int converted_type; + duckdb_parquet::Type::type parquet_type; + const duckdb::LogicalType &logical_type; + ParquetType(int converted_type, duckdb_parquet::Type::type parquet_type, + const duckdb::LogicalType &logical_type) + : converted_type(converted_type), parquet_type(parquet_type), logical_type(logical_type) {} + bool check_type(const duckdb::vector &schema, idx_t idx); +}; + +class ParquetTypesManager { + protected: + std::vector types; + static ParquetTypesManager *instance; + static std::mutex instance_mutex; + ParquetTypesManager(); + static ParquetTypesManager* get_instance(); + duckdb::LogicalType derive_logical_type(const duckdb_parquet::SchemaElement &s_ele, bool binary_as_string); + public: + static duckdb::LogicalType get_logical_type(const duckdb::vector &schema, idx_t idx); +}; + +#endif //PARQUET_TYPES_H diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp index 88585c4..f1a4e11 100644 --- a/chsql/src/parquet_ordered_scan.cpp +++ b/chsql/src/parquet_ordered_scan.cpp @@ -3,6 +3,7 @@ #include #include "chsql_extension.hpp" #include +#include "chsql_parquet_types.h" namespace duckdb { @@ -113,36 +114,15 @@ namespace duckdb { po.binary_as_string = true; ParquetReader reader(context, file, po, nullptr); set->columnMap = vector(); - for (auto &el : reader.metadata->metadata->schema) { + for (auto it = reader.metadata->metadata->schema.begin(); + it!= reader.metadata->metadata->schema.end(); ++it) { + auto &el = *it; if (el.num_children != 0) { continue; } auto name_it = std::find(names.begin(), names.end(), el.name); - auto return_type = LogicalType::ANY; - switch (el.type) { - case Type::INT32: - return_type = LogicalType::INTEGER; - break; - case Type::INT64: - return_type = LogicalType::BIGINT; - break; - case Type::DOUBLE: - return_type = LogicalType::DOUBLE; - break; - case Type::FLOAT: - return_type = LogicalType::FLOAT; - break; - 
case Type::BYTE_ARRAY: - return_type = LogicalType::VARCHAR; - case Type::FIXED_LEN_BYTE_ARRAY: - return_type = LogicalType::VARCHAR; - break; - case Type::BOOLEAN: - return_type = LogicalType::TINYINT; - break; - default: - break;; - } + auto return_type = ParquetTypesManager::get_logical_type(reader.metadata->metadata->schema, + it - reader.metadata->metadata->schema.begin()); set->columnMap.push_back(name_it - names.begin()); if (el.name == res->orderBy) { set->orderByIdx = name_it - names.begin(); diff --git a/chsql/src/parquet_types.cpp b/chsql/src/parquet_types.cpp new file mode 100644 index 0000000..6464adf --- /dev/null +++ b/chsql/src/parquet_types.cpp @@ -0,0 +1,242 @@ +#include "chsql_parquet_types.h" + +bool ParquetType::check_type(const duckdb::vector &schema, idx_t idx) { + auto &el = schema[idx]; + if (el.type != parquet_type) { + return false; + } + if (converted_type == -1) { + return !el.__isset.converted_type; + } + return int(el.converted_type) == converted_type; +}; + +ParquetTypesManager::ParquetTypesManager() { + /*UTF8 = 0, + MAP = 1, -> no support + MAP_KEY_VALUE = 2, -> no support + LIST = 3, -> no support + ENUM = 4, -> no support + DECIMAL = 5, -> no support + DATE = 6, + TIME_MILLIS = 7 + TIME_MICROS = 8 + TIMESTAMP_MILLIS = 9, + TIMESTAMP_MICROS = 10, + UINT_8 = 11, + UINT_16 = 12, + UINT_32 = 13, + UINT_64 = 14, + INT_8 = 15, + INT_16 = 16, + INT_32 = 17, + INT_64 = 18, + JSON = 19, -> no support + BSON = 20, -> no support + INTERVAL = 21 -> no support + */ + types.push_back(ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::BYTE_ARRAY, duckdb::LogicalType::VARCHAR)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::DATE, duckdb_parquet::Type::INT32, duckdb::LogicalType::DATE)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIME_MILLIS, duckdb_parquet::Type::INT32, duckdb::LogicalType::TIME)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIME_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIME)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIMESTAMP)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIMESTAMP)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::UTINYINT)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::USMALLINT)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::UINTEGER)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::UBIGINT)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::TINYINT)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::SMALLINT)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER)); + types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT)); +}; + +ParquetTypesManager *ParquetTypesManager::instance = nullptr; +std::mutex ParquetTypesManager::instance_mutex; + +ParquetTypesManager *ParquetTypesManager::get_instance() { + 
std::lock_guard lock(instance_mutex); + if (instance == nullptr) { + instance = new ParquetTypesManager(); + } + return instance; +} + +duckdb::LogicalType ParquetTypesManager::get_logical_type(const duckdb::vector &schema, idx_t idx) { + auto inst = get_instance(); + return inst->derive_logical_type(schema[idx], false); + /*for (auto it = inst->types.begin(); it != inst->types.end(); ++it) { + if (it->check_type(schema, idx)) { + return it->logical_type; + } + } + throw std::runtime_error("Unsupported Parquet type");*/ +} + +duckdb::LogicalType ParquetTypesManager::derive_logical_type(const duckdb_parquet::SchemaElement &s_ele, bool binary_as_string) { + // inner node + if (s_ele.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY && !s_ele.__isset.type_length) { + throw duckdb::IOException("FIXED_LEN_BYTE_ARRAY requires length to be set"); + } + if (s_ele.__isset.logicalType) { + if (s_ele.logicalType.__isset.UUID) { + if (s_ele.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY) { + return duckdb::LogicalType::UUID; + } + } else if (s_ele.logicalType.__isset.TIMESTAMP) { + if (s_ele.logicalType.TIMESTAMP.isAdjustedToUTC) { + return duckdb::LogicalType::TIMESTAMP_TZ; + } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) { + return duckdb::LogicalType::TIMESTAMP_NS; + } + return duckdb::LogicalType::TIMESTAMP; + } else if (s_ele.logicalType.__isset.TIME) { + if (s_ele.logicalType.TIME.isAdjustedToUTC) { + return duckdb::LogicalType::TIME_TZ; + } + return duckdb::LogicalType::TIME; + } + } + if (s_ele.__isset.converted_type) { + // Legacy NULL type, does no longer exist, but files are still around of course + if (static_cast(s_ele.converted_type) == 24) { + return duckdb::LogicalTypeId::SQLNULL; + } + switch (s_ele.converted_type) { + case duckdb_parquet::ConvertedType::INT_8: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::TINYINT; + } else { + throw duckdb::IOException("INT8 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::INT_16: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::SMALLINT; + } else { + throw duckdb::IOException("INT16 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::INT_32: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::INTEGER; + } else { + throw duckdb::IOException("INT32 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::INT_64: + if (s_ele.type == duckdb_parquet::Type::INT64) { + return duckdb::LogicalType::BIGINT; + } else { + throw duckdb::IOException("INT64 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::UINT_8: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::UTINYINT; + } else { + throw duckdb::IOException("UINT8 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::UINT_16: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::USMALLINT; + } else { + throw duckdb::IOException("UINT16 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::UINT_32: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::UINTEGER; + } else { + throw duckdb::IOException("UINT32 converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::UINT_64: + if (s_ele.type 
== duckdb_parquet::Type::INT64) { + return duckdb::LogicalType::UBIGINT; + } else { + throw duckdb::IOException("UINT64 converted type can only be set for value of Type::INT64"); + } + case duckdb_parquet::ConvertedType::DATE: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::DATE; + } else { + throw duckdb::IOException("DATE converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::TIMESTAMP_MICROS: + case duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS: + if (s_ele.type == duckdb_parquet::Type::INT64) { + return duckdb::LogicalType::TIMESTAMP; + } else { + throw duckdb::IOException("TIMESTAMP converted type can only be set for value of Type::INT64"); + } + case duckdb_parquet::ConvertedType::DECIMAL: + if (!s_ele.__isset.precision || !s_ele.__isset.scale) { + throw duckdb::IOException("DECIMAL requires a length and scale specifier!"); + } + if (s_ele.precision > duckdb::DecimalType::MaxWidth()) { + return duckdb::LogicalType::DOUBLE; + } + switch (s_ele.type) { + case duckdb_parquet::Type::BYTE_ARRAY: + case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: + case duckdb_parquet::Type::INT32: + case duckdb_parquet::Type::INT64: + return duckdb::LogicalType::DECIMAL(s_ele.precision, s_ele.scale); + default: + throw duckdb::IOException( + "DECIMAL converted type can only be set for value of Type::(FIXED_LEN_)BYTE_ARRAY/INT32/INT64"); + } + case duckdb_parquet::ConvertedType::UTF8: + case duckdb_parquet::ConvertedType::ENUM: + switch (s_ele.type) { + case duckdb_parquet::Type::BYTE_ARRAY: + case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: + return duckdb::LogicalType::VARCHAR; + default: + throw duckdb::IOException("UTF8 converted type can only be set for Type::(FIXED_LEN_)BYTE_ARRAY"); + } + case duckdb_parquet::ConvertedType::TIME_MILLIS: + if (s_ele.type == duckdb_parquet::Type::INT32) { + return duckdb::LogicalType::TIME; + } else { + throw duckdb::IOException("TIME_MILLIS converted type can only be set for value of Type::INT32"); + } + case duckdb_parquet::ConvertedType::TIME_MICROS: + if (s_ele.type == duckdb_parquet::Type::INT64) { + return duckdb::LogicalType::TIME; + } else { + throw duckdb::IOException("TIME_MICROS converted type can only be set for value of Type::INT64"); + } + case duckdb_parquet::ConvertedType::INTERVAL: + return duckdb::LogicalType::INTERVAL; + case duckdb_parquet::ConvertedType::JSON: + return duckdb::LogicalType::JSON(); + case duckdb_parquet::ConvertedType::MAP: + case duckdb_parquet::ConvertedType::MAP_KEY_VALUE: + case duckdb_parquet::ConvertedType::LIST: + case duckdb_parquet::ConvertedType::BSON: + default: + throw duckdb::IOException("Unsupported converted type (%d)", (int32_t)s_ele.converted_type); + } + } else { + // no converted type set + // use default type for each physical type + switch (s_ele.type) { + case duckdb_parquet::Type::BOOLEAN: + return duckdb::LogicalType::BOOLEAN; + case duckdb_parquet::Type::INT32: + return duckdb::LogicalType::INTEGER; + case duckdb_parquet::Type::INT64: + return duckdb::LogicalType::BIGINT; + case duckdb_parquet::Type::INT96: // always a timestamp it would seem + return duckdb::LogicalType::TIMESTAMP; + case duckdb_parquet::Type::FLOAT: + return duckdb::LogicalType::FLOAT; + case duckdb_parquet::Type::DOUBLE: + return duckdb::LogicalType::DOUBLE; + case duckdb_parquet::Type::BYTE_ARRAY: + case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: + if (binary_as_string) { + return duckdb::LogicalType::VARCHAR; + } + return duckdb::LogicalType::BLOB; + 
default: + return duckdb::LogicalType::INVALID; + } + } +} From abe9e4b2b4954c796144237a4595432df5949083 Mon Sep 17 00:00:00 2001 From: akvlad Date: Fri, 14 Mar 2025 18:11:09 +0200 Subject: [PATCH 2/7] WIP --- chsql/src/include/chsql_parquet_types.h | 32 ++- chsql/src/parquet_ordered_scan.cpp | 205 ++++++++------ chsql/src/parquet_types.cpp | 347 +++++++++--------------- 3 files changed, 276 insertions(+), 308 deletions(-) diff --git a/chsql/src/include/chsql_parquet_types.h b/chsql/src/include/chsql_parquet_types.h index 179a9f0..870d1ba 100644 --- a/chsql/src/include/chsql_parquet_types.h +++ b/chsql/src/include/chsql_parquet_types.h @@ -11,17 +11,37 @@ struct ParquetType { /*duckdb_parquet::ConvertedType::type -> replaced to int to support -1 nodata value*/ int converted_type; - duckdb_parquet::Type::type parquet_type; - const duckdb::LogicalType &logical_type; - ParquetType(int converted_type, duckdb_parquet::Type::type parquet_type, - const duckdb::LogicalType &logical_type) + /* duckdb_parquet::Type::type -> replaced to int to support -1 for no matter value */ + int parquet_type; + const duckdb::LogicalType logical_type; + ParquetType(int converted_type, int parquet_type, const duckdb::LogicalType &logical_type) : converted_type(converted_type), parquet_type(parquet_type), logical_type(logical_type) {} - bool check_type(const duckdb::vector &schema, idx_t idx); + virtual bool check_type(const duckdb::vector &schema, idx_t idx); + virtual duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema); +}; + +struct LogicalParquetType : public ParquetType { + bool (*get_isset)(const duckdb_parquet::SchemaElement& el); + + LogicalParquetType(bool (*get_isset) (const duckdb_parquet::SchemaElement& el), + const duckdb::LogicalType& logical_type) + : ParquetType(-1, duckdb_parquet::Type::type::INT32, logical_type), get_isset(get_isset) {} + bool check_type(const duckdb::vector &schema, idx_t idx) override; +}; + +struct JSONParquetType : public ParquetType { + JSONParquetType(): ParquetType(duckdb_parquet::ConvertedType::JSON, -1, duckdb::LogicalType::SQLNULL) {} + duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema) override; +}; + +struct DecimalParquetType : public ParquetType { + DecimalParquetType(): ParquetType(-1, duckdb_parquet::Type::type::INT32, duckdb::LogicalType::SQLNULL) {} + bool check_type(const duckdb::vector &schema, idx_t idx) override; + duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema) override; }; class ParquetTypesManager { protected: - std::vector types; static ParquetTypesManager *instance; static std::mutex instance_mutex; ParquetTypesManager(); diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp index f1a4e11..a9715f7 100644 --- a/chsql/src/parquet_ordered_scan.cpp +++ b/chsql/src/parquet_ordered_scan.cpp @@ -7,21 +7,70 @@ namespace duckdb { + struct ReturnColumn { + string name; + LogicalType type; + }; + struct ReaderSet { unique_ptr reader; - idx_t orderByIdx; + vector returnColumns; + int64_t orderByIdx; unique_ptr chunk; unique_ptr scanState; - vector columnMap; + vector columnMap; idx_t result_idx; + bool haveAbsentColumns; + void populateColumnInfo(const vector& returnCols, const string& order_by_column) { + this->returnColumns = returnCols; + columnMap.clear(); + haveAbsentColumns = false; + for (auto it = returnCols.begin(); it!= returnCols.end(); ++it) { + auto schema_column = find_if( + reader->metadata->metadata->schema.begin(), + 
reader->metadata->metadata->schema.end(), + [&](const SchemaElement& column) { return column.name == it->name; }); + if (schema_column == reader->metadata->metadata->schema.end()) { + columnMap.push_back(-1); + haveAbsentColumns = true; + continue; + } + columnMap.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); + reader->reader_data.column_ids.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); + reader->reader_data.column_mapping.push_back(it - returnCols.begin()); + } + auto order_by_column_it = find_if( + reader->metadata->metadata->schema.begin(), + reader->metadata->metadata->schema.end(), + [&](const SchemaElement& column) { return column.name == order_by_column; }); + if (order_by_column_it == reader->metadata->metadata->schema.end()) { + orderByIdx = -1; + } else { + orderByIdx = order_by_column_it - reader->metadata->metadata->schema.begin() - 1; + } + } + void Scan() { + chunk->Reset(); + reader->Scan(*scanState, *chunk); + if (!haveAbsentColumns || chunk->size() == 0) { + return; + } + for (auto it = columnMap.begin(); it!=columnMap.end(); ++it) { + if (*it != -1) { + continue; + } + chunk->data[it - columnMap.begin()].Initialize(false, chunk->size()); + for (idx_t j = 0; j < chunk->size(); j++) { + chunk->data[it - columnMap.begin()].SetValue(j, Value()); + } + } + } }; struct OrderedReadFunctionData : FunctionData { string orderBy; - vector> sets; vector files; - vector returnTypes; - vector names; + vector returnCols; unique_ptr Copy() const override { throw std::runtime_error("not implemented"); } @@ -45,9 +94,15 @@ namespace duckdb { }; }; + bool lt(const Value &left, const Value &right) { + return left.IsNull() || (!right.IsNull() && left < right); + } + bool le(const Value &left, const Value &right) { + return left.IsNull() || (!right.IsNull() && left <= right); + } - struct OrderedReadLocalState: LocalTableFunctionState { + struct OrderedReadLocalState: LocalTableFunctionState { vector> sets; vector winner_group; void RecalculateWinnerGroup() { @@ -56,11 +111,17 @@ namespace duckdb { return; } idx_t winner_idx = 0; + auto first_unordered = std::find_if(sets.begin(), sets.end(), + [&](const unique_ptr &s) { return s->orderByIdx == -1; }); + if (first_unordered != sets.end()) { + winner_group.push_back(first_unordered - sets.begin()); + return; + } for (idx_t i = 1; i < sets.size(); i++) { const auto &s = sets[i]; const auto &w = sets[winner_idx]; - if (s->chunk->GetValue(s->orderByIdx, s->result_idx) < - w->chunk->GetValue(w->orderByIdx, w->result_idx)) { + if (lt(s->chunk->GetValue(s->orderByIdx, s->result_idx), + w->chunk->GetValue(w->orderByIdx, w->result_idx))) { winner_idx = i; } } @@ -71,7 +132,7 @@ namespace duckdb { if (i == winner_idx) continue; auto &s = sets[i]; const auto &sFirst = s->chunk->GetValue(s->orderByIdx, s->result_idx); - if (sFirst <= wLast) { + if (le(sFirst, wLast)) { winner_group.push_back(i); } } @@ -84,6 +145,40 @@ namespace duckdb { } }; + static vector GetColumnsFromParquetSchemas(const vector>& sets) { + vector result; + for (auto &set : sets) { + const auto &schema = set->reader->metadata->metadata->schema; + for (auto it = schema.begin(); it != schema.end(); ++it) { + if (it->num_children > 0) { + continue; + } + auto type = ParquetTypesManager::get_logical_type(schema, it - schema.begin()); + auto existing_col = std::find_if(result.begin(), result.end(), + [it](const ReturnColumn &c) { return c.name == it->name; }); + if (existing_col == result.end()) { + 
result.push_back(ReturnColumn{it->name, type}); + continue; + } + if (existing_col->type != type) { + throw std::runtime_error("the files have incompatible schema"); + } + } + } + return result; + } + + + static void OpenParquetFiles(ClientContext &context, const vector& fileNames, + vector>& res) { + for (auto & file : fileNames) { + auto set = make_uniq(); + ParquetOptions po; + po.binary_as_string = true; + set->reader = make_uniq(context, file, po, nullptr); + res.push_back(move(set)); + } + } static unique_ptr OrderedParquetScanBind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, vector &names) { @@ -98,48 +193,23 @@ namespace duckdb { string filename; MultiFileListScanData it; fileList.InitializeScan(it); - vector unglobbedFileList; while (fileList.Scan(it, filename)) { - unglobbedFileList.push_back(filename); + res->files.push_back(filename); } - if (unglobbedFileList.empty()) { - throw duckdb::InvalidInputException("No files matched the provided pattern."); + if (res->files.empty()) { + throw InvalidInputException("No files matched the provided pattern."); } + vector> sets; + OpenParquetFiles(context, res->files, sets); + + res->returnCols = GetColumnsFromParquetSchemas(sets); + std::transform(res->returnCols.begin(), res->returnCols.end(), std::back_inserter(names), + [](const ReturnColumn &c) { return c.name; }); + std::transform(res->returnCols.begin(), res->returnCols.end(), std::back_inserter(return_types), + [](const ReturnColumn &c) { return c.type; }); + res->orderBy = input.inputs[1].GetValue(); - for (auto & file : unglobbedFileList) { - auto set = make_uniq(); - res->files.push_back(file); - ParquetOptions po; - po.binary_as_string = true; - ParquetReader reader(context, file, po, nullptr); - set->columnMap = vector(); - for (auto it = reader.metadata->metadata->schema.begin(); - it!= reader.metadata->metadata->schema.end(); ++it) { - auto &el = *it; - if (el.num_children != 0) { - continue; - } - auto name_it = std::find(names.begin(), names.end(), el.name); - auto return_type = ParquetTypesManager::get_logical_type(reader.metadata->metadata->schema, - it - reader.metadata->metadata->schema.begin()); - set->columnMap.push_back(name_it - names.begin()); - if (el.name == res->orderBy) { - set->orderByIdx = name_it - names.begin(); - } - if (name_it != names.end()) { - if (return_types[name_it - names.begin()] != return_type) { - throw std::runtime_error("incompatible schema"); - } - continue; - } - return_types.push_back(return_type); - names.push_back(el.name); - } - res->sets.push_back(std::move(set)); - } - res->returnTypes = return_types; - res->names = names; return std::move(res); } @@ -147,38 +217,23 @@ namespace duckdb { ParquetScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input, GlobalTableFunctionState *gstate_p) { auto res = make_uniq(); const auto &bindData = input.bind_data->Cast(); - ParquetOptions po; - po.binary_as_string = true; - for (int i = 0; i < bindData.files.size(); i++) { - auto set = make_uniq(); - set->reader = make_uniq(context.client, bindData.files[i], po, nullptr); + OpenParquetFiles(context.client, bindData.files, res->sets); + + for (auto &set : res->sets) { + set->populateColumnInfo(bindData.returnCols, bindData.orderBy); set->scanState = make_uniq(); - int j = 0; - for (auto &el : set->reader->metadata->metadata->schema) { - if (el.num_children != 0) { - continue; - } - set->reader->reader_data.column_ids.push_back(j); - j++; - } - set->columnMap = bindData.sets[i]->columnMap; - 
set->reader->reader_data.column_mapping = set->columnMap; vector rgs(set->reader->metadata->metadata->row_groups.size(), 0); for (idx_t i = 0; i < rgs.size(); i++) { rgs[i] = i; } set->reader->InitializeScan(context.client, *set->scanState, rgs); set->chunk = make_uniq(); - - set->orderByIdx = bindData.sets[i]->orderByIdx; set->result_idx = 0; auto ltypes = vector(); - for (const auto idx : set->columnMap) { - ltypes.push_back(bindData.returnTypes[idx]); - } + std::transform(bindData.returnCols.begin(), bindData.returnCols.end(), std::back_inserter(ltypes), + [](const ReturnColumn &c) { return c.type; }); set->chunk->Initialize(context.client, ltypes); - set->reader->Scan(*set->scanState, *set->chunk); - res->sets.push_back(std::move(set)); + set->Scan(); } res->RecalculateWinnerGroup(); return std::move(res); @@ -187,16 +242,13 @@ namespace duckdb { static void ParquetOrderedScanImplementation( ClientContext &context, duckdb::TableFunctionInput &data_p,DataChunk &output) { auto &loc_state = data_p.local_state->Cast(); - const auto &fieldNames = data_p.bind_data->Cast().names; - const auto &returnTypes = data_p.bind_data->Cast().returnTypes; + const auto &cols = data_p.bind_data->Cast().returnCols; bool toRecalc = false; for (int i = loc_state.sets.size() - 1; i >= 0 ; i--) { if (loc_state.sets[i]->result_idx >= loc_state.sets[i]->chunk->size()) { auto &set = loc_state.sets[i]; set->chunk->Reset(); - loc_state.sets[i]->reader->Scan( - *loc_state.sets[i]->scanState, - *loc_state.sets[i]->chunk); + loc_state.sets[i]->Scan(); loc_state.sets[i]->result_idx = 0; if (loc_state.sets[i]->chunk->size() == 0) { @@ -227,18 +279,17 @@ namespace duckdb { auto winnerSet = &loc_state.sets[loc_state.winner_group[0]]; Value winner_val = (*winnerSet)->chunk->GetValue( (*winnerSet)->orderByIdx, - (*winnerSet)->result_idx - ); + (*winnerSet)->result_idx); for (int k = 1; k < loc_state.winner_group.size(); k++) { const auto i = loc_state.winner_group[k]; const auto &set = loc_state.sets[i]; const Value &val = set->chunk->GetValue(set->orderByIdx, set->result_idx); - if (val < winner_val) { + if (lt(val, winner_val)) { winnerSet = &loc_state.sets[i]; winner_val = (*winnerSet)->chunk->GetValue(set->orderByIdx, set->result_idx); } } - for (int i = 0; i < fieldNames.size(); i++) { + for (int i = 0; i < cols.size(); i++) { const auto &val = (*winnerSet)->chunk->GetValue(i,(*winnerSet)->result_idx); output.SetValue(i, j, val); } diff --git a/chsql/src/parquet_types.cpp b/chsql/src/parquet_types.cpp index 6464adf..efdf9ca 100644 --- a/chsql/src/parquet_types.cpp +++ b/chsql/src/parquet_types.cpp @@ -1,242 +1,139 @@ #include "chsql_parquet_types.h" bool ParquetType::check_type(const duckdb::vector &schema, idx_t idx) { - auto &el = schema[idx]; - if (el.type != parquet_type) { - return false; - } - if (converted_type == -1) { - return !el.__isset.converted_type; - } - return int(el.converted_type) == converted_type; + auto &el = schema[idx]; + if (parquet_type >= 0 && int(el.type) != parquet_type) { + return false; + } + if (converted_type == -1) { + return !el.__isset.converted_type; + } + return int(el.converted_type) == converted_type; +}; + +duckdb::LogicalType ParquetType::get_logical_type(const duckdb_parquet::SchemaElement &schema) { + return logical_type; +} + +bool LogicalParquetType::check_type(const duckdb::vector &schema, idx_t idx) { + auto &el = schema[idx]; + return el.__isset.logicalType && this->get_isset(el); +} + +duckdb::LogicalType JSONParquetType::get_logical_type(const 
duckdb_parquet::SchemaElement &schema) { + return duckdb::LogicalType::JSON(); +} + +bool DecimalParquetType::check_type(const duckdb::vector &schema, idx_t idx) { + auto &el = schema[idx]; + return el.__isset.converted_type && el.converted_type == duckdb_parquet::ConvertedType::DECIMAL && + el.__isset.precision && el.__isset.scale && (el.precision > duckdb::DecimalType::MaxWidth() || + el.type == duckdb_parquet::Type::BYTE_ARRAY || + el.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY || + el.type == duckdb_parquet::Type::INT32 || + el.type == duckdb_parquet::Type::INT64); +} + +duckdb::LogicalType DecimalParquetType::get_logical_type(const duckdb_parquet::SchemaElement &el) { + if (el.precision > duckdb::DecimalType::MaxWidth()) { + return duckdb::LogicalType::DOUBLE; + } + return duckdb::LogicalType::DECIMAL(el.precision, el.scale); +} + +bool isUUID(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.UUID; +} + +bool isTimestampTZ(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.TIMESTAMP && el.logicalType.TIMESTAMP.isAdjustedToUTC; +} + +bool isTimestampNS(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.TIMESTAMP && el.logicalType.TIMESTAMP.unit.__isset.NANOS; +} + +bool isTimestamp(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.TIMESTAMP; +} + +bool isTimeTZ(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.TIME && el.logicalType.TIME.isAdjustedToUTC; +} + +bool isTime(const duckdb_parquet::SchemaElement &el) { + return el.logicalType.__isset.TIME; +} + +ParquetType *_types[] = { + new LogicalParquetType(isUUID, duckdb::LogicalType::UUID), + new LogicalParquetType(isTimestampTZ, duckdb::LogicalType::TIMESTAMP_TZ), + new LogicalParquetType(isTimestampNS, duckdb::LogicalType::TIMESTAMP_NS), + new LogicalParquetType(isTimestamp, duckdb::LogicalType::TIMESTAMP), + new LogicalParquetType(isTimeTZ, duckdb::LogicalType::TIME), + new LogicalParquetType(isTime, duckdb::LogicalType::TIME), + + new ParquetType(24, -1, duckdb::LogicalType::SQLNULL), + new ParquetType(duckdb_parquet::ConvertedType::INT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::TINYINT), + new ParquetType(duckdb_parquet::ConvertedType::INT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::SMALLINT), + new ParquetType(duckdb_parquet::ConvertedType::INT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER), + new ParquetType(duckdb_parquet::ConvertedType::INT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT), + new ParquetType(duckdb_parquet::ConvertedType::UINT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::UTINYINT), + new ParquetType(duckdb_parquet::ConvertedType::UINT_16, duckdb_parquet::Type::INT32, + duckdb::LogicalType::USMALLINT), + new ParquetType(duckdb_parquet::ConvertedType::UINT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::UINTEGER), + new ParquetType(duckdb_parquet::ConvertedType::UINT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::UBIGINT), + new ParquetType(duckdb_parquet::ConvertedType::DATE, duckdb_parquet::Type::INT32, duckdb::LogicalType::DATE), + new ParquetType(duckdb_parquet::ConvertedType::TIME_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIME), + new ParquetType(duckdb_parquet::ConvertedType::TIME_MILLIS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIME), + new ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS, duckdb_parquet::Type::INT32, + duckdb::LogicalType::TIME), + new 
ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MICROS, duckdb_parquet::Type::INT64, + duckdb::LogicalType::TIME), + new ParquetType(duckdb_parquet::ConvertedType::INTERVAL, -1, duckdb::LogicalType::INTERVAL), + new ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::BYTE_ARRAY, + duckdb::LogicalType::VARCHAR), + new ParquetType(duckdb_parquet::ConvertedType::ENUM, duckdb_parquet::Type::BYTE_ARRAY, + duckdb::LogicalType::VARCHAR), + new ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY, + duckdb::LogicalType::VARCHAR), + new ParquetType(duckdb_parquet::ConvertedType::ENUM, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY, + duckdb::LogicalType::VARCHAR), + + new JSONParquetType(), + new DecimalParquetType(), + + new ParquetType(-1, duckdb_parquet::Type::BOOLEAN, duckdb::LogicalType::BOOLEAN), + new ParquetType(-1, duckdb_parquet::Type::BOOLEAN, duckdb::LogicalType::BOOLEAN), + new ParquetType(-1, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER), + new ParquetType(-1, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT), + new ParquetType(-1, duckdb_parquet::Type::INT96, duckdb::LogicalType::TIMESTAMP), + new ParquetType(-1, duckdb_parquet::Type::FLOAT, duckdb::LogicalType::FLOAT), + new ParquetType(-1, duckdb_parquet::Type::DOUBLE, duckdb::LogicalType::DOUBLE), + new ParquetType(-1, duckdb_parquet::Type::BYTE_ARRAY, duckdb::LogicalType::BLOB), + new ParquetType(-1, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY, duckdb::LogicalType::BLOB), }; ParquetTypesManager::ParquetTypesManager() { - /*UTF8 = 0, - MAP = 1, -> no support - MAP_KEY_VALUE = 2, -> no support - LIST = 3, -> no support - ENUM = 4, -> no support - DECIMAL = 5, -> no support - DATE = 6, - TIME_MILLIS = 7 - TIME_MICROS = 8 - TIMESTAMP_MILLIS = 9, - TIMESTAMP_MICROS = 10, - UINT_8 = 11, - UINT_16 = 12, - UINT_32 = 13, - UINT_64 = 14, - INT_8 = 15, - INT_16 = 16, - INT_32 = 17, - INT_64 = 18, - JSON = 19, -> no support - BSON = 20, -> no support - INTERVAL = 21 -> no support - */ - types.push_back(ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::BYTE_ARRAY, duckdb::LogicalType::VARCHAR)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::DATE, duckdb_parquet::Type::INT32, duckdb::LogicalType::DATE)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIME_MILLIS, duckdb_parquet::Type::INT32, duckdb::LogicalType::TIME)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIME_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIME)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIMESTAMP)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIMESTAMP)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::UTINYINT)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::USMALLINT)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::UINTEGER)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::UINT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::UBIGINT)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::TINYINT)); - 
types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::SMALLINT)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER)); - types.push_back(ParquetType(duckdb_parquet::ConvertedType::INT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT)); }; ParquetTypesManager *ParquetTypesManager::instance = nullptr; std::mutex ParquetTypesManager::instance_mutex; ParquetTypesManager *ParquetTypesManager::get_instance() { - std::lock_guard lock(instance_mutex); - if (instance == nullptr) { - instance = new ParquetTypesManager(); - } - return instance; -} - -duckdb::LogicalType ParquetTypesManager::get_logical_type(const duckdb::vector &schema, idx_t idx) { - auto inst = get_instance(); - return inst->derive_logical_type(schema[idx], false); - /*for (auto it = inst->types.begin(); it != inst->types.end(); ++it) { - if (it->check_type(schema, idx)) { - return it->logical_type; - } - } - throw std::runtime_error("Unsupported Parquet type");*/ + std::lock_guard lock(instance_mutex); + if (instance == nullptr) { + instance = new ParquetTypesManager(); + } + return instance; } -duckdb::LogicalType ParquetTypesManager::derive_logical_type(const duckdb_parquet::SchemaElement &s_ele, bool binary_as_string) { - // inner node - if (s_ele.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY && !s_ele.__isset.type_length) { - throw duckdb::IOException("FIXED_LEN_BYTE_ARRAY requires length to be set"); - } - if (s_ele.__isset.logicalType) { - if (s_ele.logicalType.__isset.UUID) { - if (s_ele.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY) { - return duckdb::LogicalType::UUID; - } - } else if (s_ele.logicalType.__isset.TIMESTAMP) { - if (s_ele.logicalType.TIMESTAMP.isAdjustedToUTC) { - return duckdb::LogicalType::TIMESTAMP_TZ; - } else if (s_ele.logicalType.TIMESTAMP.unit.__isset.NANOS) { - return duckdb::LogicalType::TIMESTAMP_NS; - } - return duckdb::LogicalType::TIMESTAMP; - } else if (s_ele.logicalType.__isset.TIME) { - if (s_ele.logicalType.TIME.isAdjustedToUTC) { - return duckdb::LogicalType::TIME_TZ; - } - return duckdb::LogicalType::TIME; - } - } - if (s_ele.__isset.converted_type) { - // Legacy NULL type, does no longer exist, but files are still around of course - if (static_cast(s_ele.converted_type) == 24) { - return duckdb::LogicalTypeId::SQLNULL; - } - switch (s_ele.converted_type) { - case duckdb_parquet::ConvertedType::INT_8: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::TINYINT; - } else { - throw duckdb::IOException("INT8 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::INT_16: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::SMALLINT; - } else { - throw duckdb::IOException("INT16 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::INT_32: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::INTEGER; - } else { - throw duckdb::IOException("INT32 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::INT_64: - if (s_ele.type == duckdb_parquet::Type::INT64) { - return duckdb::LogicalType::BIGINT; - } else { - throw duckdb::IOException("INT64 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::UINT_8: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return 
duckdb::LogicalType::UTINYINT; - } else { - throw duckdb::IOException("UINT8 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::UINT_16: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::USMALLINT; - } else { - throw duckdb::IOException("UINT16 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::UINT_32: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::UINTEGER; - } else { - throw duckdb::IOException("UINT32 converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::UINT_64: - if (s_ele.type == duckdb_parquet::Type::INT64) { - return duckdb::LogicalType::UBIGINT; - } else { - throw duckdb::IOException("UINT64 converted type can only be set for value of Type::INT64"); - } - case duckdb_parquet::ConvertedType::DATE: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::DATE; - } else { - throw duckdb::IOException("DATE converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::TIMESTAMP_MICROS: - case duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS: - if (s_ele.type == duckdb_parquet::Type::INT64) { - return duckdb::LogicalType::TIMESTAMP; - } else { - throw duckdb::IOException("TIMESTAMP converted type can only be set for value of Type::INT64"); - } - case duckdb_parquet::ConvertedType::DECIMAL: - if (!s_ele.__isset.precision || !s_ele.__isset.scale) { - throw duckdb::IOException("DECIMAL requires a length and scale specifier!"); - } - if (s_ele.precision > duckdb::DecimalType::MaxWidth()) { - return duckdb::LogicalType::DOUBLE; - } - switch (s_ele.type) { - case duckdb_parquet::Type::BYTE_ARRAY: - case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: - case duckdb_parquet::Type::INT32: - case duckdb_parquet::Type::INT64: - return duckdb::LogicalType::DECIMAL(s_ele.precision, s_ele.scale); - default: - throw duckdb::IOException( - "DECIMAL converted type can only be set for value of Type::(FIXED_LEN_)BYTE_ARRAY/INT32/INT64"); - } - case duckdb_parquet::ConvertedType::UTF8: - case duckdb_parquet::ConvertedType::ENUM: - switch (s_ele.type) { - case duckdb_parquet::Type::BYTE_ARRAY: - case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: - return duckdb::LogicalType::VARCHAR; - default: - throw duckdb::IOException("UTF8 converted type can only be set for Type::(FIXED_LEN_)BYTE_ARRAY"); - } - case duckdb_parquet::ConvertedType::TIME_MILLIS: - if (s_ele.type == duckdb_parquet::Type::INT32) { - return duckdb::LogicalType::TIME; - } else { - throw duckdb::IOException("TIME_MILLIS converted type can only be set for value of Type::INT32"); - } - case duckdb_parquet::ConvertedType::TIME_MICROS: - if (s_ele.type == duckdb_parquet::Type::INT64) { - return duckdb::LogicalType::TIME; - } else { - throw duckdb::IOException("TIME_MICROS converted type can only be set for value of Type::INT64"); - } - case duckdb_parquet::ConvertedType::INTERVAL: - return duckdb::LogicalType::INTERVAL; - case duckdb_parquet::ConvertedType::JSON: - return duckdb::LogicalType::JSON(); - case duckdb_parquet::ConvertedType::MAP: - case duckdb_parquet::ConvertedType::MAP_KEY_VALUE: - case duckdb_parquet::ConvertedType::LIST: - case duckdb_parquet::ConvertedType::BSON: - default: - throw duckdb::IOException("Unsupported converted type (%d)", (int32_t)s_ele.converted_type); - } - } else { - // no converted type set - // use default type for each physical type - switch 
(s_ele.type) { - case duckdb_parquet::Type::BOOLEAN: - return duckdb::LogicalType::BOOLEAN; - case duckdb_parquet::Type::INT32: - return duckdb::LogicalType::INTEGER; - case duckdb_parquet::Type::INT64: - return duckdb::LogicalType::BIGINT; - case duckdb_parquet::Type::INT96: // always a timestamp it would seem - return duckdb::LogicalType::TIMESTAMP; - case duckdb_parquet::Type::FLOAT: - return duckdb::LogicalType::FLOAT; - case duckdb_parquet::Type::DOUBLE: - return duckdb::LogicalType::DOUBLE; - case duckdb_parquet::Type::BYTE_ARRAY: - case duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY: - if (binary_as_string) { - return duckdb::LogicalType::VARCHAR; - } - return duckdb::LogicalType::BLOB; - default: - return duckdb::LogicalType::INVALID; +duckdb::LogicalType ParquetTypesManager::get_logical_type(const duckdb::vector &schema, + idx_t idx) { + for (auto &type: _types) { + if (type->check_type(schema, idx)) { + return type->get_logical_type(schema[idx]); } } + throw std::runtime_error("Unsupported Parquet type"); } From 7466a3531b7b6855e2a346cb4543504e530a8c5e Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 17 Mar 2025 18:17:17 +0200 Subject: [PATCH 3/7] update deps --- duckdb | 2 +- extension-ci-tools | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/duckdb b/duckdb index 1a3d614..8e52ec4 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 1a3d614f0eec5a2198af8ba4ea06eb9adee9d5f8 +Subproject commit 8e52ec43959ab363643d63cb78ee214577111da4 diff --git a/extension-ci-tools b/extension-ci-tools index 916d4ef..58970c5 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 916d4ef4371068ca98a007378b52582c3e46b4e5 +Subproject commit 58970c538d35919db875096460c05806056f4de0 From b845ff4faf5590ea84295b5d362857604b73ccf9 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 17 Mar 2025 18:44:47 +0200 Subject: [PATCH 4/7] debug --- chsql/src/parquet_ordered_scan.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp index a9715f7..3b720eb 100644 --- a/chsql/src/parquet_ordered_scan.cpp +++ b/chsql/src/parquet_ordered_scan.cpp @@ -36,8 +36,10 @@ namespace duckdb { continue; } columnMap.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); - reader->reader_data.column_ids.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); - reader->reader_data.column_mapping.push_back(it - returnCols.begin()); + reader->reader_data.column_ids.push_back( + MultiFileLocalColumnId(schema_column - reader->metadata->metadata->schema.begin() - 1)); + reader->reader_data.column_mapping.push_back( + MultiFileGlobalIndex(it - returnCols.begin())); } auto order_by_column_it = find_if( reader->metadata->metadata->schema.begin(), @@ -49,9 +51,9 @@ namespace duckdb { orderByIdx = order_by_column_it - reader->metadata->metadata->schema.begin() - 1; } } - void Scan() { + void Scan(ClientContext& ctx) { chunk->Reset(); - reader->Scan(*scanState, *chunk); + reader->Scan(ctx, *scanState, *chunk); if (!haveAbsentColumns || chunk->size() == 0) { return; } @@ -233,7 +235,7 @@ namespace duckdb { std::transform(bindData.returnCols.begin(), bindData.returnCols.end(), std::back_inserter(ltypes), [](const ReturnColumn &c) { return c.type; }); set->chunk->Initialize(context.client, ltypes); - set->Scan(); + set->Scan(context.client); } res->RecalculateWinnerGroup(); return std::move(res); @@ -248,7 +250,7 @@ namespace duckdb { if 
(loc_state.sets[i]->result_idx >= loc_state.sets[i]->chunk->size()) { auto &set = loc_state.sets[i]; set->chunk->Reset(); - loc_state.sets[i]->Scan(); + loc_state.sets[i]->Scan(context); loc_state.sets[i]->result_idx = 0; if (loc_state.sets[i]->chunk->size() == 0) { From f32274efc7bff7507560e87a90853e41cd16ed58 Mon Sep 17 00:00:00 2001 From: lmangani <> Date: Mon, 17 Mar 2025 17:31:20 +0000 Subject: [PATCH 5/7] resync submodules --- duckdb | 2 +- extension-ci-tools | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/duckdb b/duckdb index 8e52ec4..7c0f857 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 8e52ec43959ab363643d63cb78ee214577111da4 +Subproject commit 7c0f8574bda9af7aa5b23166d7860d68ae3b9481 diff --git a/extension-ci-tools b/extension-ci-tools index 58970c5..00e6af0 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 58970c538d35919db875096460c05806056f4de0 +Subproject commit 00e6af068429bf776a54f67cb1cd1ff5370a8dd7 From 2c33917468b282cb3013e61b693218c73cff23d9 Mon Sep 17 00:00:00 2001 From: lmangani <> Date: Mon, 17 Mar 2025 17:48:41 +0000 Subject: [PATCH 6/7] update action --- .github/workflows/MainDistributionPipeline.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index 0cf5d44..a6c77d9 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -25,9 +25,9 @@ jobs: duckdb-stable-build: name: Build extension binaries - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.3 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.2.1 with: - duckdb_version: v1.1.3 - ci_tools_version: v1.1.3 + duckdb_version: v1.2.1 + ci_tools_version: v1.2.1 extension_name: chsql From ef1779b86f1ad53042345169a2f6c5f4bc01bbd3 Mon Sep 17 00:00:00 2001 From: akvlad Date: Mon, 17 Mar 2025 20:55:28 +0200 Subject: [PATCH 7/7] fix main build --- .github/workflows/MainDistributionPipeline.yml | 15 ++++++++------- chsql/CMakeLists.txt | 1 + chsql/extension_config.cmake | 2 +- chsql/src/include/chsql_parquet_types.h | 1 + chsql/src/parquet_ordered_scan.cpp | 6 +++--- extension-ci-tools | 2 +- 6 files changed, 15 insertions(+), 12 deletions(-) diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index a6c77d9..5b1fddb 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -15,13 +15,14 @@ concurrency: cancel-in-progress: true jobs: - duckdb-next-build: - name: Build extension binaries (next) - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main - with: - duckdb_version: main - ci_tools_version: main - extension_name: chsql +# Temporarily disabled because main is broken +# duckdb-next-build: +# name: Build extension binaries (next) +# uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main +# with: +# duckdb_version: 1.1.2 +# ci_tools_version: 1.1.2 +# extension_name: chsql duckdb-stable-build: name: Build extension binaries diff --git a/chsql/CMakeLists.txt b/chsql/CMakeLists.txt index 93ce21b..a3375c5 100644 --- a/chsql/CMakeLists.txt +++ b/chsql/CMakeLists.txt @@ -7,6 +7,7 @@ set(TARGET_NAME chsql) find_package(OpenSSL REQUIRED) set(EXTENSION_NAME ${TARGET_NAME}_extension) set(LOADABLE_EXTENSION_NAME 
${TARGET_NAME}_loadable_extension) +set(CHSQL_DUCKDB_VERSION ${DUCKDB_MAJOR_VERSION}) project(${TARGET_NAME}) include_directories( diff --git a/chsql/extension_config.cmake b/chsql/extension_config.cmake index 776f13b..4ec1a10 100644 --- a/chsql/extension_config.cmake +++ b/chsql/extension_config.cmake @@ -1,5 +1,5 @@ # This file is included by DuckDB's build system. It specifies which extension to load - +set(CHSQL_DUCKDB_VERSION ${DUCKDB_MAJOR_VERSION}) include_directories( ./src/include ${CMAKE_CURRENT_SOURCE_DIR}/../duckdb/extension/parquet/include diff --git a/chsql/src/include/chsql_parquet_types.h b/chsql/src/include/chsql_parquet_types.h index 870d1ba..0e44d11 100644 --- a/chsql/src/include/chsql_parquet_types.h +++ b/chsql/src/include/chsql_parquet_types.h @@ -5,6 +5,7 @@ #ifndef PARQUET_TYPES_H #define PARQUET_TYPES_H + #include "duckdb.hpp" #include diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp index 3b720eb..65c3402 100644 --- a/chsql/src/parquet_ordered_scan.cpp +++ b/chsql/src/parquet_ordered_scan.cpp @@ -37,9 +37,9 @@ namespace duckdb { } columnMap.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); reader->reader_data.column_ids.push_back( - MultiFileLocalColumnId(schema_column - reader->metadata->metadata->schema.begin() - 1)); + schema_column - reader->metadata->metadata->schema.begin() - 1); reader->reader_data.column_mapping.push_back( - MultiFileGlobalIndex(it - returnCols.begin())); + it - returnCols.begin()); } auto order_by_column_it = find_if( reader->metadata->metadata->schema.begin(), @@ -53,7 +53,7 @@ namespace duckdb { } void Scan(ClientContext& ctx) { chunk->Reset(); - reader->Scan(ctx, *scanState, *chunk); + reader->Scan(*scanState, *chunk); if (!haveAbsentColumns || chunk->size() == 0) { return; } diff --git a/extension-ci-tools b/extension-ci-tools index 00e6af0..58970c5 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 00e6af068429bf776a54f67cb1cd1ff5370a8dd7 +Subproject commit 58970c538d35919db875096460c05806056f4de0
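
Note (not part of the commits above): the ordered scan in patch 2 merges already-sorted Parquet files by repeatedly picking the reader whose current order-by value is smallest, with NULLs sorting first — that is what the lt()/le() helpers and RecalculateWinnerGroup implement. Below is a minimal standalone C++ sketch of that NULL-first k-way merge rule, using std::optional in place of duckdb::Value and plain vectors in place of DataChunks; it is an illustration of the comparison semantics only, not the extension's implementation.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using Value = std::optional<int64_t>;

// Mirrors lt() from the patch: a NULL (empty optional) sorts before everything else.
static bool lt(const Value &left, const Value &right) {
	return !left.has_value() || (right.has_value() && *left < *right);
}

// Merge k individually sorted runs by always emitting the smallest current head,
// the same "winner" selection the ordered scan performs across its parquet readers.
static std::vector<Value> MergeSortedRuns(const std::vector<std::vector<Value>> &runs) {
	std::vector<size_t> pos(runs.size(), 0);
	std::vector<Value> out;
	while (true) {
		int winner = -1;
		for (size_t i = 0; i < runs.size(); i++) {
			if (pos[i] >= runs[i].size()) {
				continue; // this run is exhausted
			}
			if (winner < 0 || lt(runs[i][pos[i]], runs[winner][pos[winner]])) {
				winner = static_cast<int>(i);
			}
		}
		if (winner < 0) {
			break; // all runs exhausted
		}
		out.push_back(runs[winner][pos[winner]++]);
	}
	return out;
}

int main() {
	// Two "files": a NULL order-by value comes first, as with lt()/le() in the patch.
	std::vector<std::vector<Value>> runs = {{std::nullopt, 1, 4}, {2, 3, 5}};
	for (const auto &v : MergeSortedRuns(runs)) {
		std::cout << (v ? std::to_string(*v) : "NULL") << " ";
	}
	std::cout << "\n"; // prints: NULL 1 2 3 4 5
	return 0;
}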