diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml
index 0cf5d44..5b1fddb 100644
--- a/.github/workflows/MainDistributionPipeline.yml
+++ b/.github/workflows/MainDistributionPipeline.yml
@@ -15,19 +15,20 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
-  duckdb-next-build:
-    name: Build extension binaries (next)
-    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
-    with:
-      duckdb_version: main
-      ci_tools_version: main
-      extension_name: chsql
+# Temporarily disabled because main is broken
+#  duckdb-next-build:
+#    name: Build extension binaries (next)
+#    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main
+#    with:
+#      duckdb_version: 1.1.2
+#      ci_tools_version: 1.1.2
+#      extension_name: chsql
 
   duckdb-stable-build:
     name: Build extension binaries
-    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.1.3
+    uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.2.1
     with:
-      duckdb_version: v1.1.3
-      ci_tools_version: v1.1.3
+      duckdb_version: v1.2.1
+      ci_tools_version: v1.2.1
       extension_name: chsql
diff --git a/chsql/CMakeLists.txt b/chsql/CMakeLists.txt
index 5460e12..a3375c5 100644
--- a/chsql/CMakeLists.txt
+++ b/chsql/CMakeLists.txt
@@ -7,6 +7,7 @@ set(TARGET_NAME chsql)
 find_package(OpenSSL REQUIRED)
 set(EXTENSION_NAME ${TARGET_NAME}_extension)
 set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
+set(CHSQL_DUCKDB_VERSION ${DUCKDB_MAJOR_VERSION})
 project(${TARGET_NAME})
 include_directories(
@@ -21,7 +22,7 @@ include_directories(
     ../duckdb/third_party/mbedtls
     ../duckdb/third_party/mbedtls/include
     ../duckdb/third_party/brotli/include)
-set(EXTENSION_SOURCES src/chsql_extension.cpp src/duck_flock.cpp src/chsql_system.cpp)
+set(EXTENSION_SOURCES src/chsql_extension.cpp src/duck_flock.cpp src/chsql_system.cpp src/parquet_types.cpp)
 build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
 build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES})
 # Link OpenSSL in both the static library as the loadable extension
diff --git a/chsql/extension_config.cmake b/chsql/extension_config.cmake
index 776f13b..4ec1a10 100644
--- a/chsql/extension_config.cmake
+++ b/chsql/extension_config.cmake
@@ -1,5 +1,5 @@
 # This file is included by DuckDB's build system. It specifies which extension to load
-
+set(CHSQL_DUCKDB_VERSION ${DUCKDB_MAJOR_VERSION})
 include_directories(
     ./src/include
     ${CMAKE_CURRENT_SOURCE_DIR}/../duckdb/extension/parquet/include
diff --git a/chsql/src/include/chsql_parquet_types.h b/chsql/src/include/chsql_parquet_types.h
new file mode 100644
index 0000000..0e44d11
--- /dev/null
+++ b/chsql/src/include/chsql_parquet_types.h
@@ -0,0 +1,55 @@
+//
+// Created by hromozeka on 10.03.25.
+//
+
+#ifndef PARQUET_TYPES_H
+#define PARQUET_TYPES_H
+
+#include "duckdb.hpp"
+#include <parquet_types.h> // thrift-generated Parquet schema types (duckdb_parquet::*)
+
+struct ParquetType {
+    /* duckdb_parquet::ConvertedType::type, widened to int so -1 can mean "no converted type" */
+    int converted_type;
+    /* duckdb_parquet::Type::type, widened to int so -1 can mean "any physical type" */
+    int parquet_type;
+    const duckdb::LogicalType logical_type;
+    ParquetType(int converted_type, int parquet_type, const duckdb::LogicalType &logical_type)
+        : converted_type(converted_type), parquet_type(parquet_type), logical_type(logical_type) {}
+    virtual bool check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx);
+    virtual duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema);
+};
+
+struct LogicalParquetType : public ParquetType {
+    bool (*get_isset)(const duckdb_parquet::SchemaElement &el);
+
+    LogicalParquetType(bool (*get_isset)(const duckdb_parquet::SchemaElement &el),
+                       const duckdb::LogicalType &logical_type)
+        : ParquetType(-1, duckdb_parquet::Type::type::INT32, logical_type), get_isset(get_isset) {}
+    bool check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx) override;
+};
+
+struct JSONParquetType : public ParquetType {
+    JSONParquetType() : ParquetType(duckdb_parquet::ConvertedType::JSON, -1, duckdb::LogicalType::SQLNULL) {}
+    duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema) override;
+};
+
+struct DecimalParquetType : public ParquetType {
+    DecimalParquetType() : ParquetType(-1, duckdb_parquet::Type::type::INT32, duckdb::LogicalType::SQLNULL) {}
+    bool check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx) override;
+    duckdb::LogicalType get_logical_type(const duckdb_parquet::SchemaElement &schema) override;
+};
+
+class ParquetTypesManager {
+protected:
+    static ParquetTypesManager *instance;
+    static std::mutex instance_mutex;
+    ParquetTypesManager();
+    static ParquetTypesManager *get_instance();
+    duckdb::LogicalType derive_logical_type(const duckdb_parquet::SchemaElement &s_ele, bool binary_as_string);
+public:
+    static duckdb::LogicalType get_logical_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx);
+};
+
+#endif // PARQUET_TYPES_H
diff --git a/chsql/src/parquet_ordered_scan.cpp b/chsql/src/parquet_ordered_scan.cpp
index 88585c4..65c3402 100644
--- a/chsql/src/parquet_ordered_scan.cpp
+++ b/chsql/src/parquet_ordered_scan.cpp
@@ -3,24 +3,76 @@
 #include
 #include "chsql_extension.hpp"
 #include
+#include "chsql_parquet_types.h"
 
 namespace duckdb {
+    struct ReturnColumn {
+        string name;
+        LogicalType type;
+    };
+
     struct ReaderSet {
         unique_ptr<ParquetReader> reader;
-        idx_t orderByIdx;
+        vector<ReturnColumn> returnColumns;
+        int64_t orderByIdx;
         unique_ptr<DataChunk> chunk;
         unique_ptr<ParquetReaderScanState> scanState;
-        vector<idx_t> columnMap;
+        vector<int64_t> columnMap; // per output column: file column index, or -1 if absent
         idx_t result_idx;
+        bool haveAbsentColumns;
+        void populateColumnInfo(const vector<ReturnColumn> &returnCols, const string &order_by_column) {
+            this->returnColumns = returnCols;
+            columnMap.clear();
+            haveAbsentColumns = false;
+            for (auto it = returnCols.begin(); it != returnCols.end(); ++it) {
+                auto schema_column = find_if(
+                    reader->metadata->metadata->schema.begin(),
+                    reader->metadata->metadata->schema.end(),
+                    [&](const SchemaElement &column) { return column.name == it->name; });
+                if (schema_column == reader->metadata->metadata->schema.end()) {
+                    columnMap.push_back(-1);
+                    haveAbsentColumns = true;
+                    continue;
+                }
+                columnMap.push_back(schema_column - reader->metadata->metadata->schema.begin() - 1); // -1 skips the root schema element
+                reader->reader_data.column_ids.push_back(
+                    schema_column - reader->metadata->metadata->schema.begin() - 1);
+                reader->reader_data.column_mapping.push_back(
+                    it - returnCols.begin());
+            }
+            auto order_by_column_it = find_if(
+                reader->metadata->metadata->schema.begin(),
+                reader->metadata->metadata->schema.end(),
+                [&](const SchemaElement &column) { return column.name == order_by_column; });
+            if (order_by_column_it == reader->metadata->metadata->schema.end()) {
+                orderByIdx = -1; // this file has no order-by column; treat it as unordered
+            } else {
+                orderByIdx = order_by_column_it - reader->metadata->metadata->schema.begin() - 1;
+            }
+        }
+        void Scan(ClientContext &ctx) {
+            chunk->Reset();
+            reader->Scan(*scanState, *chunk);
+            if (!haveAbsentColumns || chunk->size() == 0) {
+                return;
+            }
+            for (auto it = columnMap.begin(); it != columnMap.end(); ++it) {
+                if (*it != -1) {
+                    continue;
+                }
+                chunk->data[it - columnMap.begin()].Initialize(false, chunk->size()); // absent column: fill with NULLs
+                for (idx_t j = 0; j < chunk->size(); j++) {
+                    chunk->data[it - columnMap.begin()].SetValue(j, Value());
+                }
+            }
+        }
     };
 
     struct OrderedReadFunctionData : FunctionData {
         string orderBy;
-        vector<unique_ptr<ReaderSet>> sets;
         vector<string> files;
-        vector<LogicalType> returnTypes;
-        vector<string> names;
+        vector<ReturnColumn> returnCols;
         unique_ptr<FunctionData> Copy() const override {
             throw std::runtime_error("not implemented");
         }
@@ -44,9 +96,15 @@ namespace duckdb {
         };
     };
 
+    bool lt(const Value &left, const Value &right) { // NULLS FIRST: NULL sorts before any non-NULL value
+        return left.IsNull() || (!right.IsNull() && left < right);
+    }
+    bool le(const Value &left, const Value &right) {
+        return left.IsNull() || (!right.IsNull() && left <= right);
+    }
 
-    struct OrderedReadLocalState: LocalTableFunctionState {
+    struct OrderedReadLocalState : LocalTableFunctionState {
         vector<unique_ptr<ReaderSet>> sets;
         vector<idx_t> winner_group;
         void RecalculateWinnerGroup() {
@@ -55,11 +113,17 @@ namespace duckdb {
                 return;
             }
             idx_t winner_idx = 0;
+            auto first_unordered = std::find_if(sets.begin(), sets.end(),
+                [&](const unique_ptr<ReaderSet> &s) { return s->orderByIdx == -1; });
+            if (first_unordered != sets.end()) {
+                winner_group.push_back(first_unordered - sets.begin());
+                return;
+            }
             for (idx_t i = 1; i < sets.size(); i++) {
                 const auto &s = sets[i];
                 const auto &w = sets[winner_idx];
-                if (s->chunk->GetValue(s->orderByIdx, s->result_idx) <
-                    w->chunk->GetValue(w->orderByIdx, w->result_idx)) {
+                if (lt(s->chunk->GetValue(s->orderByIdx, s->result_idx),
+                       w->chunk->GetValue(w->orderByIdx, w->result_idx))) {
                     winner_idx = i;
                 }
             }
@@ -70,7 +134,7 @@ namespace duckdb {
                 if (i == winner_idx) continue;
                 auto &s = sets[i];
                 const auto &sFirst = s->chunk->GetValue(s->orderByIdx, s->result_idx);
-                if (sFirst <= wLast) {
+                if (le(sFirst, wLast)) {
                     winner_group.push_back(i);
                 }
             }
@@ -83,6 +147,40 @@ namespace duckdb {
         }
     };
 
+    static vector<ReturnColumn> GetColumnsFromParquetSchemas(const vector<unique_ptr<ReaderSet>> &sets) {
+        vector<ReturnColumn> result;
+        for (auto &set : sets) {
+            const auto &schema = set->reader->metadata->metadata->schema;
+            for (auto it = schema.begin(); it != schema.end(); ++it) {
+                if (it->num_children > 0) {
+                    continue;
+                }
+                auto type = ParquetTypesManager::get_logical_type(schema, it - schema.begin());
+                auto existing_col = std::find_if(result.begin(), result.end(),
+                    [it](const ReturnColumn &c) { return c.name == it->name; });
+                if (existing_col == result.end()) {
+                    result.push_back(ReturnColumn{it->name, type});
+                    continue;
+                }
+                if (existing_col->type != type) {
+                    throw std::runtime_error("the files have incompatible schemas");
+                }
+            }
+        }
+        return result;
+    }
+
+    static void OpenParquetFiles(ClientContext &context, const vector<string> &fileNames,
+                                 vector<unique_ptr<ReaderSet>> &res) {
+        for (auto &file : fileNames) {
+            auto set = make_uniq<ReaderSet>();
+            ParquetOptions po;
+            po.binary_as_string = true;
+            set->reader = make_uniq<ParquetReader>(context, file, po, nullptr);
+            res.push_back(std::move(set));
+        }
+    }
 
     static unique_ptr<FunctionData> OrderedParquetScanBind(ClientContext &context, TableFunctionBindInput &input,
                                                            vector<LogicalType> &return_types, vector<string> &names) {
@@ -97,69 +195,23 @@ namespace duckdb {
         string filename;
         MultiFileListScanData it;
         fileList.InitializeScan(it);
-        vector<string> unglobbedFileList;
         while (fileList.Scan(it, filename)) {
-            unglobbedFileList.push_back(filename);
+            res->files.push_back(filename);
         }
-        if (unglobbedFileList.empty()) {
-            throw duckdb::InvalidInputException("No files matched the provided pattern.");
+        if (res->files.empty()) {
+            throw InvalidInputException("No files matched the provided pattern.");
         }
+        vector<unique_ptr<ReaderSet>> sets;
+        OpenParquetFiles(context, res->files, sets);
+
+        res->returnCols = GetColumnsFromParquetSchemas(sets);
+        std::transform(res->returnCols.begin(), res->returnCols.end(), std::back_inserter(names),
+                       [](const ReturnColumn &c) { return c.name; });
+        std::transform(res->returnCols.begin(), res->returnCols.end(), std::back_inserter(return_types),
+                       [](const ReturnColumn &c) { return c.type; });
+
         res->orderBy = input.inputs[1].GetValue<string>();
-        for (auto &file : unglobbedFileList) {
-            auto set = make_uniq<ReaderSet>();
-            res->files.push_back(file);
-            ParquetOptions po;
-            po.binary_as_string = true;
-            ParquetReader reader(context, file, po, nullptr);
-            set->columnMap = vector<idx_t>();
-            for (auto &el : reader.metadata->metadata->schema) {
-                if (el.num_children != 0) {
-                    continue;
-                }
-                auto name_it = std::find(names.begin(), names.end(), el.name);
-                auto return_type = LogicalType::ANY;
-                switch (el.type) {
-                    case Type::INT32:
-                        return_type = LogicalType::INTEGER;
-                        break;
-                    case Type::INT64:
-                        return_type = LogicalType::BIGINT;
-                        break;
-                    case Type::DOUBLE:
-                        return_type = LogicalType::DOUBLE;
-                        break;
-                    case Type::FLOAT:
-                        return_type = LogicalType::FLOAT;
-                        break;
-                    case Type::BYTE_ARRAY:
-                        return_type = LogicalType::VARCHAR;
-                    case Type::FIXED_LEN_BYTE_ARRAY:
-                        return_type = LogicalType::VARCHAR;
-                        break;
-                    case Type::BOOLEAN:
-                        return_type = LogicalType::TINYINT;
-                        break;
-                    default:
-                        break;;
-                }
-                set->columnMap.push_back(name_it - names.begin());
-                if (el.name == res->orderBy) {
-                    set->orderByIdx = name_it - names.begin();
-                }
-                if (name_it != names.end()) {
-                    if (return_types[name_it - names.begin()] != return_type) {
-                        throw std::runtime_error("incompatible schema");
-                    }
-                    continue;
-                }
-                return_types.push_back(return_type);
-                names.push_back(el.name);
-            }
-            res->sets.push_back(std::move(set));
-        }
-        res->returnTypes = return_types;
-        res->names = names;
         return std::move(res);
     }
 
@@ -167,38 +219,23 @@ namespace duckdb {
     ParquetScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input, GlobalTableFunctionState *gstate_p) {
         auto res = make_uniq<OrderedReadLocalState>();
         const auto &bindData = input.bind_data->Cast<OrderedReadFunctionData>();
-        ParquetOptions po;
-        po.binary_as_string = true;
-        for (int i = 0; i < bindData.files.size(); i++) {
-            auto set = make_uniq<ReaderSet>();
-            set->reader = make_uniq<ParquetReader>(context.client, bindData.files[i], po, nullptr);
+        OpenParquetFiles(context.client, bindData.files, res->sets);
+
+        for (auto &set : res->sets) {
+            set->populateColumnInfo(bindData.returnCols, bindData.orderBy);
             set->scanState = make_uniq<ParquetReaderScanState>();
-            int j = 0;
-            for (auto &el : set->reader->metadata->metadata->schema) {
-                if (el.num_children != 0) {
-                    continue;
-                }
-                set->reader->reader_data.column_ids.push_back(j);
-                j++;
-            }
-            set->columnMap = bindData.sets[i]->columnMap;
-            set->reader->reader_data.column_mapping = set->columnMap;
 
             vector<idx_t> rgs(set->reader->metadata->metadata->row_groups.size(), 0);
             for (idx_t i = 0; i < rgs.size(); i++) { rgs[i] = i; }
             set->reader->InitializeScan(context.client, *set->scanState, rgs);
             set->chunk = make_uniq<DataChunk>();
-
-            set->orderByIdx = bindData.sets[i]->orderByIdx;
             set->result_idx = 0;
             auto ltypes = vector<LogicalType>();
-            for (const auto idx : set->columnMap) {
-                ltypes.push_back(bindData.returnTypes[idx]);
-            }
+            std::transform(bindData.returnCols.begin(), bindData.returnCols.end(), std::back_inserter(ltypes),
+                           [](const ReturnColumn &c) { return c.type; });
             set->chunk->Initialize(context.client, ltypes);
-            set->reader->Scan(*set->scanState, *set->chunk);
-            res->sets.push_back(std::move(set));
+            set->Scan(context.client);
         }
         res->RecalculateWinnerGroup();
         return std::move(res);
@@ -207,16 +244,13 @@ namespace duckdb {
     static void ParquetOrderedScanImplementation(
         ClientContext &context, duckdb::TableFunctionInput &data_p, DataChunk &output) {
         auto &loc_state = data_p.local_state->Cast<OrderedReadLocalState>();
-        const auto &fieldNames = data_p.bind_data->Cast<OrderedReadFunctionData>().names;
-        const auto &returnTypes = data_p.bind_data->Cast<OrderedReadFunctionData>().returnTypes;
+        const auto &cols = data_p.bind_data->Cast<OrderedReadFunctionData>().returnCols;
         bool toRecalc = false;
         for (int i = loc_state.sets.size() - 1; i >= 0; i--) {
             if (loc_state.sets[i]->result_idx >= loc_state.sets[i]->chunk->size()) {
                 auto &set = loc_state.sets[i];
                 set->chunk->Reset();
-                loc_state.sets[i]->reader->Scan(
-                    *loc_state.sets[i]->scanState,
-                    *loc_state.sets[i]->chunk);
+                loc_state.sets[i]->Scan(context);
                 loc_state.sets[i]->result_idx = 0;
 
                 if (loc_state.sets[i]->chunk->size() == 0) {
@@ -247,18 +281,17 @@ namespace duckdb {
         auto winnerSet = &loc_state.sets[loc_state.winner_group[0]];
         Value winner_val = (*winnerSet)->chunk->GetValue(
             (*winnerSet)->orderByIdx,
-            (*winnerSet)->result_idx
-        );
+            (*winnerSet)->result_idx);
         for (int k = 1; k < loc_state.winner_group.size(); k++) {
             const auto i = loc_state.winner_group[k];
             const auto &set = loc_state.sets[i];
             const Value &val = set->chunk->GetValue(set->orderByIdx, set->result_idx);
-            if (val < winner_val) {
+            if (lt(val, winner_val)) {
                 winnerSet = &loc_state.sets[i];
                 winner_val = (*winnerSet)->chunk->GetValue(set->orderByIdx, set->result_idx);
             }
         }
-        for (int i = 0; i < fieldNames.size(); i++) {
+        for (int i = 0; i < cols.size(); i++) {
             const auto &val = (*winnerSet)->chunk->GetValue(i, (*winnerSet)->result_idx);
             output.SetValue(i, j, val);
         }
diff --git a/chsql/src/parquet_types.cpp b/chsql/src/parquet_types.cpp
new file mode 100644
index 0000000..efdf9ca
--- /dev/null
+++ b/chsql/src/parquet_types.cpp
@@ -0,0 +1,139 @@
+#include "chsql_parquet_types.h"
+
+bool ParquetType::check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx) {
+    auto &el = schema[idx];
+    if (parquet_type >= 0 && int(el.type) != parquet_type) {
+        return false;
+    }
+    if (converted_type == -1) {
+        return !el.__isset.converted_type;
+    }
+    return int(el.converted_type) == converted_type;
+}
+
+duckdb::LogicalType ParquetType::get_logical_type(const duckdb_parquet::SchemaElement &schema) {
+    return logical_type;
+}
+
+bool LogicalParquetType::check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx) {
+    auto &el = schema[idx];
+    return el.__isset.logicalType && this->get_isset(el);
+}
+
+duckdb::LogicalType JSONParquetType::get_logical_type(const duckdb_parquet::SchemaElement &schema) {
+    return duckdb::LogicalType::JSON();
+}
+
+bool DecimalParquetType::check_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema, idx_t idx) {
+    auto &el = schema[idx];
+    return el.__isset.converted_type && el.converted_type == duckdb_parquet::ConvertedType::DECIMAL &&
+           el.__isset.precision && el.__isset.scale &&
+           (el.precision > duckdb::DecimalType::MaxWidth() ||
+            el.type == duckdb_parquet::Type::BYTE_ARRAY ||
+            el.type == duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY ||
+            el.type == duckdb_parquet::Type::INT32 ||
+            el.type == duckdb_parquet::Type::INT64);
+}
+
+duckdb::LogicalType DecimalParquetType::get_logical_type(const duckdb_parquet::SchemaElement &el) {
+    if (el.precision > duckdb::DecimalType::MaxWidth()) {
+        return duckdb::LogicalType::DOUBLE;
+    }
+    return duckdb::LogicalType::DECIMAL(el.precision, el.scale);
+}
+
+bool isUUID(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.UUID;
+}
+
+bool isTimestampTZ(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.TIMESTAMP && el.logicalType.TIMESTAMP.isAdjustedToUTC;
+}
+
+bool isTimestampNS(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.TIMESTAMP && el.logicalType.TIMESTAMP.unit.__isset.NANOS;
+}
+
+bool isTimestamp(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.TIMESTAMP;
+}
+
+bool isTimeTZ(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.TIME && el.logicalType.TIME.isAdjustedToUTC;
+}
+
+bool isTime(const duckdb_parquet::SchemaElement &el) {
+    return el.logicalType.__isset.TIME;
+}
+
+// checked in order; the first matching entry decides the logical type
+ParquetType *_types[] = {
+    new LogicalParquetType(isUUID, duckdb::LogicalType::UUID),
+    new LogicalParquetType(isTimestampTZ, duckdb::LogicalType::TIMESTAMP_TZ),
+    new LogicalParquetType(isTimestampNS, duckdb::LogicalType::TIMESTAMP_NS),
+    new LogicalParquetType(isTimestamp, duckdb::LogicalType::TIMESTAMP),
+    new LogicalParquetType(isTimeTZ, duckdb::LogicalType::TIME),
+    new LogicalParquetType(isTime, duckdb::LogicalType::TIME),
+
+    new ParquetType(24, -1, duckdb::LogicalType::SQLNULL),
+    new ParquetType(duckdb_parquet::ConvertedType::INT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::TINYINT),
+    new ParquetType(duckdb_parquet::ConvertedType::INT_16, duckdb_parquet::Type::INT32, duckdb::LogicalType::SMALLINT),
+    new ParquetType(duckdb_parquet::ConvertedType::INT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER),
+    new ParquetType(duckdb_parquet::ConvertedType::INT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT),
+    new ParquetType(duckdb_parquet::ConvertedType::UINT_8, duckdb_parquet::Type::INT32, duckdb::LogicalType::UTINYINT),
+    new ParquetType(duckdb_parquet::ConvertedType::UINT_16, duckdb_parquet::Type::INT32,
+                    duckdb::LogicalType::USMALLINT),
+    new ParquetType(duckdb_parquet::ConvertedType::UINT_32, duckdb_parquet::Type::INT32, duckdb::LogicalType::UINTEGER),
+    new ParquetType(duckdb_parquet::ConvertedType::UINT_64, duckdb_parquet::Type::INT64, duckdb::LogicalType::UBIGINT),
+    new ParquetType(duckdb_parquet::ConvertedType::DATE, duckdb_parquet::Type::INT32, duckdb::LogicalType::DATE),
+    new ParquetType(duckdb_parquet::ConvertedType::TIME_MICROS, duckdb_parquet::Type::INT64, duckdb::LogicalType::TIME),
+    new ParquetType(duckdb_parquet::ConvertedType::TIME_MILLIS, duckdb_parquet::Type::INT32, duckdb::LogicalType::TIME),
+    new ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MILLIS, duckdb_parquet::Type::INT64,
+                    duckdb::LogicalType::TIMESTAMP),
+    new ParquetType(duckdb_parquet::ConvertedType::TIMESTAMP_MICROS, duckdb_parquet::Type::INT64,
+                    duckdb::LogicalType::TIMESTAMP),
+    new ParquetType(duckdb_parquet::ConvertedType::INTERVAL, -1, duckdb::LogicalType::INTERVAL),
+    new ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::BYTE_ARRAY,
+                    duckdb::LogicalType::VARCHAR),
+    new ParquetType(duckdb_parquet::ConvertedType::ENUM, duckdb_parquet::Type::BYTE_ARRAY,
+                    duckdb::LogicalType::VARCHAR),
+    new ParquetType(duckdb_parquet::ConvertedType::UTF8, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY,
+                    duckdb::LogicalType::VARCHAR),
+    new ParquetType(duckdb_parquet::ConvertedType::ENUM, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY,
+                    duckdb::LogicalType::VARCHAR),
+
+    new JSONParquetType(),
+    new DecimalParquetType(),
+
+    new ParquetType(-1, duckdb_parquet::Type::BOOLEAN, duckdb::LogicalType::BOOLEAN),
+    new ParquetType(-1, duckdb_parquet::Type::INT32, duckdb::LogicalType::INTEGER),
+    new ParquetType(-1, duckdb_parquet::Type::INT64, duckdb::LogicalType::BIGINT),
+    new ParquetType(-1, duckdb_parquet::Type::INT96, duckdb::LogicalType::TIMESTAMP),
+    new ParquetType(-1, duckdb_parquet::Type::FLOAT, duckdb::LogicalType::FLOAT),
+    new ParquetType(-1, duckdb_parquet::Type::DOUBLE, duckdb::LogicalType::DOUBLE),
+    new ParquetType(-1, duckdb_parquet::Type::BYTE_ARRAY, duckdb::LogicalType::BLOB),
+    new ParquetType(-1, duckdb_parquet::Type::FIXED_LEN_BYTE_ARRAY, duckdb::LogicalType::BLOB),
+};
+
+ParquetTypesManager::ParquetTypesManager() {
+}
+
+ParquetTypesManager *ParquetTypesManager::instance = nullptr;
+std::mutex ParquetTypesManager::instance_mutex;
+
+ParquetTypesManager *ParquetTypesManager::get_instance() {
+    std::lock_guard<std::mutex> lock(instance_mutex);
+    if (instance == nullptr) {
+        instance = new ParquetTypesManager();
+    }
+    return instance;
+}
+
+duckdb::LogicalType ParquetTypesManager::get_logical_type(const duckdb::vector<duckdb_parquet::SchemaElement> &schema,
+                                                          idx_t idx) {
+    for (auto &type : _types) {
+        if (type->check_type(schema, idx)) {
+            return type->get_logical_type(schema[idx]);
+        }
+    }
+    throw std::runtime_error("Unsupported Parquet type");
+}
diff --git a/duckdb b/duckdb
index 1a3d614..7c0f857 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit 1a3d614f0eec5a2198af8ba4ea06eb9adee9d5f8
+Subproject commit 7c0f8574bda9af7aa5b23166d7860d68ae3b9481
diff --git a/extension-ci-tools b/extension-ci-tools
index 916d4ef..58970c5 160000
--- a/extension-ci-tools
+++ b/extension-ci-tools
@@ -1 +1 @@
-Subproject commit 916d4ef4371068ca98a007378b52582c3e46b4e5
+Subproject commit 58970c538d35919db875096460c05806056f4de0
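Reviewer's note (not part of the patch): the core of the reworked ordered scan is a k-way merge across per-file readers whose comparators sort NULLs first (the `lt`/`le` helpers above). The sketch below reproduces just that merge logic standalone, with `std::optional<int>` standing in for `duckdb::Value` and plain vectors standing in for `ReaderSet` chunks, so the NULLS FIRST behaviour can be checked in isolation; all names are illustrative, not the extension's API.

```cpp
// Standalone sketch of the NULLS FIRST k-way merge used by parquet_ordered_scan.
// std::optional<int> stands in for duckdb::Value; std::nullopt plays SQL NULL.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using Val = std::optional<int>;

// Mirrors lt() in the patch: NULL sorts before any non-NULL value.
static bool lt(const Val &l, const Val &r) {
    return !l.has_value() || (r.has_value() && *l < *r);
}

int main() {
    // Each inner vector plays one file's sorted order-by column.
    std::vector<std::vector<Val>> readers = {{std::nullopt, 1, 4}, {2, 3}, {0, 5}};
    std::vector<size_t> pos(readers.size(), 0);
    for (;;) {
        int winner = -1;
        // Pick the reader whose current value is smallest (NULLs win).
        for (size_t i = 0; i < readers.size(); i++) {
            if (pos[i] >= readers[i].size()) {
                continue; // exhausted, like a ReaderSet whose chunk scanned empty
            }
            if (winner < 0 || lt(readers[i][pos[i]], readers[winner][pos[winner]])) {
                winner = static_cast<int>(i);
            }
        }
        if (winner < 0) {
            break; // all readers exhausted
        }
        const Val &v = readers[winner][pos[winner]++];
        std::cout << (v ? std::to_string(*v) : "NULL") << "\n";
    }
    return 0; // prints NULL, 0, 1, 2, 3, 4, 5, one per line
}
```

The patch's `RecalculateWinnerGroup` adds two refinements this sketch omits: any file lacking the order-by column (`orderByIdx == -1`) short-circuits as the sole winner and is emitted as-is, and `le` is used to batch every reader whose next value does not exceed the current winner's last chunk value, so several chunks can be drained between recalculations.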