diff --git a/lib/build.gradle b/lib/build.gradle index 69debe4..4cd2306 100644 --- a/lib/build.gradle +++ b/lib/build.gradle @@ -64,7 +64,7 @@ dependencies { test { useJUnitPlatform() testLogging { - events "passed", "skipped", "failed" + events "skipped", "failed" } jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED") } @@ -100,6 +100,7 @@ task runMemDBServe(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath main = javaMainClass args = ["serve"] + jvmArgs = ["--add-opens=java.base/java.nio=ALL-UNNAMED"] } spotless { diff --git a/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java b/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java index f68aec2..cd4b7c9 100644 --- a/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java +++ b/lib/src/main/java/io/cloudquery/helper/ArrowHelper.java @@ -4,8 +4,11 @@ import com.google.protobuf.ByteString; import io.cloudquery.schema.Column; +import io.cloudquery.schema.Resource; import io.cloudquery.schema.Table; import io.cloudquery.schema.Table.TableBuilder; +import io.cloudquery.types.JSONType.JSONVector; +import io.cloudquery.types.UUIDType.UUIDVector; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.channels.Channels; @@ -15,29 +18,136 @@ import java.util.Map; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.LargeVarBinaryVector; +import org.apache.arrow.vector.LargeVarCharVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt1Vector; +import 
org.apache.arrow.vector.UInt2Vector; +import org.apache.arrow.vector.UInt4Vector; +import org.apache.arrow.vector.UInt8Vector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; public class ArrowHelper { + public static final String CQ_EXTENSION_INCREMENTAL = "cq:extension:incremental"; + public static final String CQ_EXTENSION_CONSTRAINT_NAME = "cq:extension:constraint_name"; + public static final String CQ_EXTENSION_PRIMARY_KEY = "cq:extension:primary_key"; + public static final String CQ_EXTENSION_UNIQUE = "cq:extension:unique"; public static final String CQ_TABLE_NAME = "cq:table_name"; public static final String CQ_TABLE_TITLE = "cq:table_title"; public static final String CQ_TABLE_DESCRIPTION = "cq:table_description"; public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on"; + private static void setVectorData(FieldVector vector, Object data) { + vector.allocateNew(); + if (vector instanceof BigIntVector) { + ((BigIntVector) vector).set(0, (long) data); + return; + } + if (vector instanceof BitVector) { + ((BitVector) vector).set(0, (int) data); + return; + } + if (vector instanceof FixedSizeBinaryVector) { + ((FixedSizeBinaryVector) vector).set(0, (byte[]) data); + return; + } + if (vector instanceof Float4Vector) { + ((Float4Vector) vector).set(0, (float) data); + return; + } + if (vector instanceof Float8Vector) { + ((Float8Vector) vector).set(0, (double) data); + return; + } + if (vector instanceof IntVector) { + ((IntVector) vector).set(0, (int) data); + return; + } + if (vector instanceof 
LargeVarBinaryVector) { + ((LargeVarBinaryVector) vector).set(0, (byte[]) data); + return; + } + if (vector instanceof LargeVarCharVector) { + ((LargeVarCharVector) vector).set(0, (Text) data); + return; + } + if (vector instanceof SmallIntVector) { + ((SmallIntVector) vector).set(0, (short) data); + return; + } + if (vector instanceof TimeStampVector) { + ((TimeStampVector) vector).set(0, (long) data); + return; + } + if (vector instanceof TinyIntVector) { + ((TinyIntVector) vector).set(0, (byte) data); + return; + } + if (vector instanceof UInt1Vector) { + ((UInt1Vector) vector).set(0, (byte) data); + return; + } + if (vector instanceof UInt2Vector) { + ((UInt2Vector) vector).set(0, (short) data); + return; + } + if (vector instanceof UInt4Vector) { + ((UInt4Vector) vector).set(0, (int) data); + return; + } + if (vector instanceof UInt8Vector) { + ((UInt8Vector) vector).set(0, (long) data); + return; + } + if (vector instanceof VarBinaryVector) { + ((VarBinaryVector) vector).set(0, (byte[]) data); + return; + } + if (vector instanceof VarCharVector) { + ((VarCharVector) vector).set(0, (Text) data); + return; + } + if (vector instanceof UUIDVector) { + ((UUIDVector) vector).set(0, (java.util.UUID) data); + return; + } + if (vector instanceof JSONVector) { + ((JSONVector) vector).setSafe(0, (byte[]) data); + return; + } + + throw new IllegalArgumentException("Unsupported vector type: " + vector.getClass()); + } + public static ByteString encode(Table table) throws IOException { try (BufferAllocator bufferAllocator = new RootAllocator()) { Schema schema = toArrowSchema(table); - VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - try (ArrowStreamWriter writer = - new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) { - writer.start(); - writer.end(); - return ByteString.copyFrom(out.toByteArray()); + try (VectorSchemaRoot schemaRoot = 
VectorSchemaRoot.create(schema, bufferAllocator)) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (ArrowStreamWriter writer = + new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) { + writer.start(); + writer.end(); + return ByteString.copyFrom(out.toByteArray()); + } } } } @@ -57,7 +167,15 @@ public static Schema toArrowSchema(Table table) { Field[] fields = new Field[columns.size()]; for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); - Field field = Field.nullable(column.getName(), column.getType()); + Map metadata = new HashMap<>(); + metadata.put(CQ_EXTENSION_UNIQUE, column.isUnique() ? "true" : "false"); + metadata.put(CQ_EXTENSION_PRIMARY_KEY, column.isPrimaryKey() ? "true" : "false"); + metadata.put(CQ_EXTENSION_INCREMENTAL, column.isIncrementalKey() ? "true" : "false"); + Field field = + new Field( + column.getName(), + new FieldType(!column.isNotNull(), column.getType(), null, metadata), + null); fields[i] = field; } Map metadata = new HashMap<>(); @@ -71,13 +189,25 @@ public static Schema toArrowSchema(Table table) { if (table.getParent() != null) { metadata.put(CQ_TABLE_DEPENDS_ON, table.getParent().getName()); } + metadata.put(CQ_EXTENSION_CONSTRAINT_NAME, table.getConstraintName()); return new Schema(asList(fields), metadata); } public static Table fromArrowSchema(Schema schema) { List columns = new ArrayList<>(); for (Field field : schema.getFields()) { - columns.add(Column.builder().name(field.getName()).type(field.getType()).build()); + boolean isUnique = "true".equals(field.getMetadata().get(CQ_EXTENSION_UNIQUE)); + boolean isPrimaryKey = "true".equals(field.getMetadata().get(CQ_EXTENSION_PRIMARY_KEY)); + boolean isIncrementalKey = "true".equals(field.getMetadata().get(CQ_EXTENSION_INCREMENTAL)); + + columns.add( + Column.builder() + .name(field.getName()) + .unique(isUnique) + .primaryKey(isPrimaryKey) + .incrementalKey(isIncrementalKey) + .type(field.getType()) + .build()); } Map metaData = 
schema.getCustomMetadata(); @@ -85,8 +215,11 @@ public static Table fromArrowSchema(Schema schema) { String title = metaData.get(CQ_TABLE_TITLE); String description = metaData.get(CQ_TABLE_DESCRIPTION); String parent = metaData.get(CQ_TABLE_DEPENDS_ON); + String constraintName = metaData.getOrDefault(CQ_EXTENSION_CONSTRAINT_NAME, ""); + + TableBuilder tableBuilder = + Table.builder().name(name).constraintName(constraintName).columns(columns); - TableBuilder tableBuilder = Table.builder().name(name).columns(columns); if (title != null) { tableBuilder.title(title); } @@ -99,4 +232,38 @@ public static Table fromArrowSchema(Schema schema) { return tableBuilder.build(); } + + /** + * Encodes one resource as an Arrow IPC stream holding a single one-row record batch. + * Columns whose scalar yields {@code null} are written as null values. + */ + public static ByteString encode(Resource resource) throws IOException { + try (BufferAllocator bufferAllocator = new RootAllocator()) { + Table table = resource.getTable(); + Schema schema = toArrowSchema(table); + try (VectorSchemaRoot vectorRoot = VectorSchemaRoot.create(schema, bufferAllocator)) { + for (int i = 0; i < table.getColumns().size(); i++) { + FieldVector vector = vectorRoot.getVector(i); + Object data = resource.getData().get(i).get(); + if (data == null) { + // Invalid scalar: allocate only, so the zeroed validity bit leaves row 0 null. + vector.allocateNew(); + continue; + } + setVectorData(vector, data); + } + // TODO: Support encoding multiple resources + vectorRoot.setRowCount(1); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (ArrowStreamWriter writer = + new ArrowStreamWriter(vectorRoot, null, Channels.newChannel(out))) { + writer.start(); + writer.writeBatch(); + writer.end(); + return ByteString.copyFrom(out.toByteArray()); + } + } + } + } + } } diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 0439fb5..43cb1a7 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -53,6 +53,7 @@ public void init( 
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build()); responseObserver.onCompleted(); } catch (Exception e) { + plugin.getLogger().error("Error initializing plugin", e); responseObserver.onError(e); } } @@ -77,6 +78,7 @@ public void getTables( .build()); responseObserver.onCompleted(); } catch (Exception e) { + plugin.getLogger().error("Error getting tables", e); responseObserver.onError(e); } } @@ -95,6 +97,7 @@ public void sync( request.getBackend().getTableName(), request.getBackend().getConnection()), responseObserver); } catch (Exception e) { + plugin.getLogger().error("Error syncing tables", e); responseObserver.onError(e); } } diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java index 593cf83..8d90932 100644 --- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java +++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java @@ -8,44 +8,56 @@ import io.cloudquery.plugin.TableOutputStream; import io.cloudquery.scheduler.Scheduler; import io.cloudquery.schema.ClientMeta; -import io.cloudquery.schema.Column; import io.cloudquery.schema.Resource; import io.cloudquery.schema.SchemaException; import io.cloudquery.schema.Table; import io.cloudquery.schema.TableResolver; +import io.cloudquery.transformers.Tables; +import io.cloudquery.transformers.TransformWithClass; import io.grpc.stub.StreamObserver; import java.util.List; -import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; +import java.util.UUID; public class MemDB extends Plugin { - private List allTables = - List.of( - Table.builder() - .name("table1") - .resolver( - new TableResolver() { - @Override - public void resolve( - ClientMeta clientMeta, Resource parent, TableOutputStream stream) { - stream.write(Table1Data.builder().name("name1").build()); - stream.write(Table1Data.builder().name("name2").build()); - } - }) - .columns(List.of(Column.builder().name("name").type(new Utf8()).build())) - .build(), - Table.builder() - 
.name("table2") - .resolver( - new TableResolver() { - @Override - public void resolve( - ClientMeta clientMeta, Resource parent, TableOutputStream stream) { - stream.write(Table2Data.builder().id("id1").build()); - stream.write(Table2Data.builder().id("id2").build()); - } - }) - .columns(List.of(Column.builder().name("id").type(new Utf8()).build())) - .build()); + private static List
getTables() { + return List.of( + Table.builder() + .name("table1") + .resolver( + new TableResolver() { + @Override + public void resolve( + ClientMeta clientMeta, Resource parent, TableOutputStream stream) { + stream.write( + Table1Data.builder() + .id(UUID.fromString("46b2b6e6-8f3e-4340-a721-4aa0786b1cc0")) + .name("name1") + .build()); + stream.write( + Table1Data.builder() + .id(UUID.fromString("e89f95df-a389-4f1b-9ba6-1fab565523d6")) + .name("name2") + .build()); + } + }) + .transform(TransformWithClass.builder(Table1Data.class).pkField("id").build()) + .build(), + Table.builder() + .name("table2") + .resolver( + new TableResolver() { + @Override + public void resolve( + ClientMeta clientMeta, Resource parent, TableOutputStream stream) { + stream.write(Table2Data.builder().id(1).name("name1").build()); + stream.write(Table2Data.builder().id(2).name("name2").build()); + } + }) + .transform(TransformWithClass.builder(Table2Data.class).pkField("id").build()) + .build()); + } + + private List
allTables; private Spec spec; @@ -107,10 +119,9 @@ public void close() { @Override public ClientMeta newClient(String spec, NewClientOptions options) throws Exception { - if (options.isNoConnection()) { - return null; - } this.spec = Spec.fromJSON(spec); + this.allTables = getTables(); + Tables.transformTables(allTables); return new MemDBClient(); } } diff --git a/lib/src/main/java/io/cloudquery/memdb/Table1Data.java b/lib/src/main/java/io/cloudquery/memdb/Table1Data.java index 7eee118..f4ba5d0 100644 --- a/lib/src/main/java/io/cloudquery/memdb/Table1Data.java +++ b/lib/src/main/java/io/cloudquery/memdb/Table1Data.java @@ -1,10 +1,12 @@ package io.cloudquery.memdb; +import java.util.UUID; import lombok.Builder; import lombok.Getter; @Builder @Getter public class Table1Data { + private UUID id; private String name; } diff --git a/lib/src/main/java/io/cloudquery/memdb/Table2Data.java b/lib/src/main/java/io/cloudquery/memdb/Table2Data.java index 7715300..494b331 100644 --- a/lib/src/main/java/io/cloudquery/memdb/Table2Data.java +++ b/lib/src/main/java/io/cloudquery/memdb/Table2Data.java @@ -6,5 +6,6 @@ @Builder @Getter public class Table2Data { - private String id; + private int id; + private String name; } diff --git a/lib/src/main/java/io/cloudquery/plugin/SyncStream.java b/lib/src/main/java/io/cloudquery/plugin/SyncStream.java new file mode 100644 index 0000000..684e82d --- /dev/null +++ b/lib/src/main/java/io/cloudquery/plugin/SyncStream.java @@ -0,0 +1,5 @@ +package io.cloudquery.plugin; + +import io.grpc.stub.StreamObserver; + +public interface SyncStream extends StreamObserver {} diff --git a/lib/src/main/java/io/cloudquery/scalar/Scalar.java b/lib/src/main/java/io/cloudquery/scalar/Scalar.java index 232540e..c996079 100644 --- a/lib/src/main/java/io/cloudquery/scalar/Scalar.java +++ b/lib/src/main/java/io/cloudquery/scalar/Scalar.java @@ -48,7 +48,10 @@ public void set(Object value) throws ValidationException { } public T get() { - return this.value; + if 
(this.isValid()) { + return this.value; + } + return null; } public boolean equals(Object other) { diff --git a/lib/src/main/java/io/cloudquery/scalar/String.java b/lib/src/main/java/io/cloudquery/scalar/String.java index 23d0d53..cb02aab 100644 --- a/lib/src/main/java/io/cloudquery/scalar/String.java +++ b/lib/src/main/java/io/cloudquery/scalar/String.java @@ -1,9 +1,9 @@ package io.cloudquery.scalar; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.util.Text; -public class String extends Scalar { - +public class String extends Scalar { public String() { super(); } @@ -19,6 +19,6 @@ public ArrowType dataType() { @Override public void setValue(Object value) throws ValidationException { - this.value = value.toString(); + this.value = new Text(value.toString()); } } diff --git a/lib/src/main/java/io/cloudquery/scalar/Timestamp.java b/lib/src/main/java/io/cloudquery/scalar/Timestamp.java index 0c28e4e..37d00f0 100644 --- a/lib/src/main/java/io/cloudquery/scalar/Timestamp.java +++ b/lib/src/main/java/io/cloudquery/scalar/Timestamp.java @@ -4,12 +4,11 @@ import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; -public class Timestamp extends Scalar { +public class Timestamp extends Scalar { public static final ZoneId zoneID = ZoneOffset.UTC; // TODO: add more units support later - private static final ArrowType dt = - new ArrowType.Timestamp(TimeUnit.MILLISECOND, zoneID.toString()); + private static final ArrowType dt = new ArrowType.Timestamp(TimeUnit.SECOND, zoneID.toString()); public Timestamp() { super(); @@ -27,36 +26,47 @@ public ArrowType dataType() { @Override public void setValue(Object value) throws ValidationException { if (value instanceof ZonedDateTime timestamp) { - this.value = timestamp.withZoneSameInstant(zoneID); + this.value = timestamp.withZoneSameInstant(zoneID).toEpochSecond(); return; } if (value instanceof LocalDate date) { - this.value = date.atStartOfDay(zoneID); + 
this.value = date.atStartOfDay(zoneID).toEpochSecond(); return; } if (value instanceof LocalDateTime date) { - this.value = date.atZone(zoneID); + this.value = date.atZone(zoneID).toEpochSecond(); return; } if (value instanceof Integer integer) { - this.value = ZonedDateTime.ofInstant(Instant.ofEpochMilli(integer), ZoneOffset.UTC); + this.value = + ZonedDateTime.ofInstant(Instant.ofEpochMilli(integer), ZoneOffset.UTC).toEpochSecond(); return; } if (value instanceof Long longValue) { - this.value = ZonedDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneOffset.UTC); + this.value = + ZonedDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneOffset.UTC).toEpochSecond(); return; } if (value instanceof CharSequence sequence) { - this.value = ZonedDateTime.parse(sequence); + this.value = ZonedDateTime.parse(sequence).toEpochSecond(); return; } throw new ValidationException( ValidationException.NO_CONVERSION_AVAILABLE, this.dataType(), value); } + + @Override + public java.lang.String toString() { + if (this.value != null) { + return ZonedDateTime.ofInstant(Instant.ofEpochSecond((Long) this.value), zoneID).toString(); + } + + return NULL_VALUE_STRING; + } } diff --git a/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java b/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java new file mode 100644 index 0000000..288c001 --- /dev/null +++ b/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java @@ -0,0 +1,3 @@ +package io.cloudquery.scheduler; + +public class OnResourceResolved {} diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java index 4b4049d..897d0b9 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java +++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java @@ -5,7 +5,6 @@ import io.cloudquery.schema.ClientMeta; import io.cloudquery.schema.Table; import io.grpc.stub.StreamObserver; -import java.io.IOException; import 
java.util.List; import lombok.Builder; import lombok.NonNull; @@ -29,7 +28,7 @@ public void sync() { Sync.MessageMigrateTable.newBuilder().setTable(ArrowHelper.encode(table)).build(); Sync.Response response = Sync.Response.newBuilder().setMigrateTable(migrateTable).build(); syncStream.onNext(response); - } catch (IOException e) { + } catch (Exception e) { syncStream.onError(e); return; } @@ -38,6 +37,10 @@ public void sync() { for (Table table : tables) { try { logger.info("resolving table: {}", table.getName()); + if (table.getResolver() == null) { + logger.error("no resolver for table: {}", table.getName()); + continue; + } SchedulerTableOutputStream schedulerTableOutputStream = SchedulerTableOutputStream.builder() .table(table) diff --git a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java index a13bf2f..f25ebad 100644 --- a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java +++ b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java @@ -1,5 +1,6 @@ package io.cloudquery.scheduler; +import com.google.protobuf.ByteString; import io.cloudquery.plugin.TableOutputStream; import io.cloudquery.plugin.v3.Sync; import io.cloudquery.schema.ClientMeta; @@ -8,7 +9,6 @@ import io.cloudquery.schema.Table; import io.cloudquery.transformers.TransformerException; import io.grpc.stub.StreamObserver; -import java.io.IOException; import lombok.Builder; import lombok.NonNull; import org.apache.logging.log4j.Logger; @@ -28,7 +28,7 @@ public void write(Object data) { try { logger.info("resolving column: {}", column.getName()); if (column.getResolver() == null) { - // TODO: Fall back to path resolver + logger.error("no resolver for column: {}", column.getName()); continue; } column.getResolver().resolve(client, resource, column); @@ -40,11 +40,11 @@ public void write(Object data) { } try { - Sync.MessageInsert insert = - 
Sync.MessageInsert.newBuilder().setRecord(resource.encode()).build(); + ByteString record = resource.encode(); + Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build(); Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build(); syncStream.onNext(response); - } catch (IOException e) { + } catch (Exception e) { logger.error("Failed to encode resource: {}", resource, e); return; } diff --git a/lib/src/main/java/io/cloudquery/schema/Resource.java b/lib/src/main/java/io/cloudquery/schema/Resource.java index 33b2141..e2b6c2a 100644 --- a/lib/src/main/java/io/cloudquery/schema/Resource.java +++ b/lib/src/main/java/io/cloudquery/schema/Resource.java @@ -42,7 +42,6 @@ public Scalar get(String columnName) { } public ByteString encode() throws IOException { - // TODO: Encode data and not only schema - return ArrowHelper.encode(table); + return ArrowHelper.encode(this); } } diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java index 8f91c09..9ac307b 100644 --- a/lib/src/main/java/io/cloudquery/schema/Table.java +++ b/lib/src/main/java/io/cloudquery/schema/Table.java @@ -126,6 +126,7 @@ public static int maxDepth(List
tables) { } @NonNull private String name; + @Builder.Default private String constraintName = ""; private TableResolver resolver; private String title; private String description; diff --git a/lib/src/main/java/io/cloudquery/transformers/Tables.java b/lib/src/main/java/io/cloudquery/transformers/Tables.java index f038c86..b0b276f 100644 --- a/lib/src/main/java/io/cloudquery/transformers/Tables.java +++ b/lib/src/main/java/io/cloudquery/transformers/Tables.java @@ -5,7 +5,7 @@ import io.cloudquery.schema.Table; import java.util.List; -class Tables { +public class Tables { public static void setParents(List
tables, Table parent) { for (Table table : tables) { table.setParent(parent); diff --git a/lib/src/main/java/io/cloudquery/transformers/TypeTransformer.java b/lib/src/main/java/io/cloudquery/transformers/TypeTransformer.java index b6ec63c..b1cc640 100644 --- a/lib/src/main/java/io/cloudquery/transformers/TypeTransformer.java +++ b/lib/src/main/java/io/cloudquery/transformers/TypeTransformer.java @@ -3,6 +3,7 @@ import io.cloudquery.types.InetType; import io.cloudquery.types.JSONType; import io.cloudquery.types.ListType; +import io.cloudquery.types.UUIDType; import java.lang.reflect.Field; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; @@ -42,6 +43,9 @@ private static ArrowType transformArrowType(String name, Class type) case "java.time.LocalDateTime" -> { return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); } + case "java.util.UUID" -> { + return new UUIDType(); + } default -> { if (type.isArray()) { Class componentType = type.getComponentType(); diff --git a/lib/src/main/java/io/cloudquery/types/JSONType.java b/lib/src/main/java/io/cloudquery/types/JSONType.java index 16e7852..2484d05 100644 --- a/lib/src/main/java/io/cloudquery/types/JSONType.java +++ b/lib/src/main/java/io/cloudquery/types/JSONType.java @@ -80,12 +80,12 @@ public int hashCode(int index, ArrowBufHasher hasher) { return getUnderlyingVector().hashCode(index, hasher); } - public String get(int index) { - return new String((byte[]) getObject(index)); + public byte[] get(int index) { + return (byte[]) getObject(index); } - public void set(int index, String value) { - getUnderlyingVector().setSafe(index, value.getBytes(), 0, value.getBytes().length); + public void setSafe(int index, byte[] bytes) { + getUnderlyingVector().setSafe(index, bytes); } } } diff --git a/lib/src/main/java/io/cloudquery/types/UUIDType.java b/lib/src/main/java/io/cloudquery/types/UUIDType.java index 4ae6ddf..6f7350d 100644 --- 
a/lib/src/main/java/io/cloudquery/types/UUIDType.java +++ b/lib/src/main/java/io/cloudquery/types/UUIDType.java @@ -10,6 +10,7 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType; import org.apache.arrow.vector.types.pojo.FieldType; public class UUIDType extends ExtensionType { diff --git a/lib/src/test/java/io/cloudquery/helper/ArrowHelperTest.java b/lib/src/test/java/io/cloudquery/helper/ArrowHelperTest.java index cdb041f..ff5dc46 100644 --- a/lib/src/test/java/io/cloudquery/helper/ArrowHelperTest.java +++ b/lib/src/test/java/io/cloudquery/helper/ArrowHelperTest.java @@ -1,5 +1,9 @@ package io.cloudquery.helper; +import static io.cloudquery.helper.ArrowHelper.CQ_EXTENSION_CONSTRAINT_NAME; +import static io.cloudquery.helper.ArrowHelper.CQ_EXTENSION_INCREMENTAL; +import static io.cloudquery.helper.ArrowHelper.CQ_EXTENSION_PRIMARY_KEY; +import static io.cloudquery.helper.ArrowHelper.CQ_EXTENSION_UNIQUE; import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_DEPENDS_ON; import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_DESCRIPTION; import static io.cloudquery.helper.ArrowHelper.CQ_TABLE_NAME; @@ -27,7 +31,13 @@ public class ArrowHelperTest { .parent(Table.builder().name("parent").build()) .columns( List.of( - Column.builder().name("column1").type(ArrowType.Utf8.INSTANCE).build(), + Column.builder() + .name("column1") + .type(ArrowType.Utf8.INSTANCE) + .unique(true) + .incrementalKey(true) + .primaryKey(true) + .build(), Column.builder().name("column2").type(ArrowType.Utf8.INSTANCE).build())) .build(); @@ -36,7 +46,25 @@ public void testToArrowSchema() { Schema arrowSchema = ArrowHelper.toArrowSchema(TEST_TABLE); assertEquals(arrowSchema.getFields().get(0).getName(), "column1"); + assertEquals( + arrowSchema.getFields().get(0).getMetadata(), + Map.of( + CQ_EXTENSION_UNIQUE, + "true", + 
CQ_EXTENSION_INCREMENTAL, + "true", + CQ_EXTENSION_PRIMARY_KEY, + "true")); assertEquals(arrowSchema.getFields().get(1).getName(), "column2"); + assertEquals( + arrowSchema.getFields().get(1).getMetadata(), + Map.of( + CQ_EXTENSION_UNIQUE, + "false", + CQ_EXTENSION_INCREMENTAL, + "false", + CQ_EXTENSION_PRIMARY_KEY, + "false")); assertEquals( arrowSchema.getCustomMetadata(), @@ -44,7 +72,8 @@ public void testToArrowSchema() { CQ_TABLE_NAME, "table1", CQ_TABLE_DESCRIPTION, "A simple test table", CQ_TABLE_TITLE, "Test table title", - CQ_TABLE_DEPENDS_ON, "parent")); + CQ_TABLE_DEPENDS_ON, "parent", + CQ_EXTENSION_CONSTRAINT_NAME, "")); } @Test diff --git a/lib/src/test/java/io/cloudquery/scalar/StringTest.java b/lib/src/test/java/io/cloudquery/scalar/StringTest.java index 7d6e76b..5266657 100644 --- a/lib/src/test/java/io/cloudquery/scalar/StringTest.java +++ b/lib/src/test/java/io/cloudquery/scalar/StringTest.java @@ -90,14 +90,14 @@ public void testGet() { string.set(-1L); }); assertTrue(string.isValid()); - assertEquals("-1", string.get()); + assertEquals("-1", string.get().toString()); assertDoesNotThrow( () -> { string.set(""); }); assertTrue(string.isValid()); - assertEquals("", string.get()); + assertEquals("", string.get().toString()); } @Test diff --git a/lib/src/test/java/io/cloudquery/scalar/TimestampTest.java b/lib/src/test/java/io/cloudquery/scalar/TimestampTest.java index bfcd0ce..8f74f1d 100644 --- a/lib/src/test/java/io/cloudquery/scalar/TimestampTest.java +++ b/lib/src/test/java/io/cloudquery/scalar/TimestampTest.java @@ -49,9 +49,13 @@ public void testToString() { () -> { timestamp.set(1); }); - assertEquals("1970-01-01T00:00:00.001Z", timestamp.toString()); + assertEquals("1970-01-01T00:00Z", timestamp.toString()); - java.lang.String ts = ZonedDateTime.now(ZoneOffset.UTC).toString(); + java.lang.String ts = + ZonedDateTime.ofInstant( + Instant.ofEpochSecond(ZonedDateTime.now(ZoneOffset.UTC).toEpochSecond()), + ZoneOffset.UTC) + .toString(); 
assertDoesNotThrow( () -> { timestamp.set(ts); @@ -62,7 +66,7 @@ public void testToString() { @Test public void testDataType() { Timestamp timestamp = new Timestamp(); - assertEquals(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "Z"), timestamp.dataType()); + assertEquals(new ArrowType.Timestamp(TimeUnit.SECOND, "Z"), timestamp.dataType()); } @Test @@ -114,14 +118,14 @@ public void testGet() { timestamp.set(ts); }); assertTrue(timestamp.isValid()); - assertEquals(ts, timestamp.get()); + assertEquals(ts.toEpochSecond(), timestamp.get()); assertDoesNotThrow( () -> { timestamp.set(0); }); assertTrue(timestamp.isValid()); - assertEquals(Instant.EPOCH.atZone(ZoneOffset.UTC), timestamp.get()); + assertEquals(Instant.EPOCH.atZone(ZoneOffset.UTC).toEpochSecond(), timestamp.get()); } @Test diff --git a/lib/src/test/java/io/cloudquery/types/JSONTypeTest.java b/lib/src/test/java/io/cloudquery/types/JSONTypeTest.java index 996dcda..a1acdea 100644 --- a/lib/src/test/java/io/cloudquery/types/JSONTypeTest.java +++ b/lib/src/test/java/io/cloudquery/types/JSONTypeTest.java @@ -1,5 +1,6 @@ package io.cloudquery.types; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -8,7 +9,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; @@ -45,7 +45,7 @@ public static class Person { } private File file; - private List jsonData; + private List jsonData; @BeforeAll public static void setUpTest() { @@ -72,16 +72,16 @@ public void shouldSetJSONOnJSONVector() throws IOException { try (BufferAllocator allocator = new RootAllocator()) { ArrowType.ExtensionType jsonType = ExtensionTypeRegistry.lookup("json"); try (JSONVector vector = (JSONVector) jsonType.getNewVector("vector", null, 
allocator)) { - vector.set(0, jsonData.get(0)); + vector.setValueCount(4); + vector.setSafe(0, jsonData.get(0)); vector.setNull(1); - vector.set(2, jsonData.get(1)); + vector.setSafe(2, jsonData.get(1)); vector.setNull(3); - vector.setValueCount(4); // Assert that the values were set correctly - assertEquals(jsonData.get(0), vector.get(0), "JSON should match"); + assertArrayEquals(jsonData.get(0), vector.get(0), "JSON should match"); assertTrue(vector.isNull(1), "Should be null"); - assertEquals(jsonData.get(1), vector.get(2), "JSON should match"); + assertArrayEquals(jsonData.get(1), vector.get(2), "JSON should match"); assertTrue(vector.isNull(3), "Should be null"); // Assert that the value count and null count are correct @@ -110,7 +110,7 @@ public void roundTripJSON() throws IOException { JSONVector fieldVector = (JSONVector) reader.getVectorSchemaRoot().getVector(FIELD_NAME); assertEquals(jsonData.size(), fieldVector.getValueCount(), "Value count should match"); for (int i = 0; i < jsonData.size(); i++) { - assertEquals(jsonData.get(i), fieldVector.get(i), "JSON should match"); + assertArrayEquals(jsonData.get(i), fieldVector.get(i), "JSON should match"); } } } @@ -128,7 +128,7 @@ private void generateDataAndWriteToFile(VectorSchemaRoot root) throws IOExceptio // Generate some JSON data vector.setValueCount(jsonData.size()); for (int i = 0; i < jsonData.size(); i++) { - vector.set(i, jsonData.get(i)); + vector.setSafe(i, jsonData.get(i)); } root.setRowCount(jsonData.size()); @@ -142,10 +142,10 @@ private void generateDataAndWriteToFile(VectorSchemaRoot root) throws IOExceptio } } - private static String toJSON(Object object) throws IOException { - try (OutputStream outputStream = new ByteArrayOutputStream()) { + private static byte[] toJSON(Object object) throws IOException { + try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { OBJECT_MAPPER.writeValue(outputStream, object); - return outputStream.toString(); + return 
outputStream.toByteArray(); } } }