Skip to content

Commit 70f3bf8

Browse files
committed
feat: Encode resources with data
1 parent 2b82e04 commit 70f3bf8

File tree

22 files changed

+297
-90
lines changed

22 files changed

+297
-90
lines changed

lib/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ dependencies {
6464
test {
6565
useJUnitPlatform()
6666
testLogging {
67-
events "passed", "skipped", "failed"
67+
events "skipped", "failed"
6868
}
6969
jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED")
7070
}
@@ -100,6 +100,7 @@ task runMemDBServe(type: JavaExec) {
100100
classpath = sourceSets.main.runtimeClasspath
101101
main = javaMainClass
102102
args = ["serve"]
103+
jvmArgs = ["--add-opens=java.base/java.nio=ALL-UNNAMED"]
103104
}
104105

105106
spotless {

lib/src/main/java/io/cloudquery/helper/ArrowHelper.java

Lines changed: 165 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
import com.google.protobuf.ByteString;
66
import io.cloudquery.schema.Column;
7+
import io.cloudquery.schema.Resource;
78
import io.cloudquery.schema.Table;
89
import io.cloudquery.schema.Table.TableBuilder;
10+
import io.cloudquery.types.JSONType.JSONVector;
11+
import io.cloudquery.types.UUIDType.UUIDVector;
912
import java.io.ByteArrayOutputStream;
1013
import java.io.IOException;
1114
import java.nio.channels.Channels;
@@ -15,29 +18,137 @@
1518
import java.util.Map;
1619
import org.apache.arrow.memory.BufferAllocator;
1720
import org.apache.arrow.memory.RootAllocator;
21+
import org.apache.arrow.vector.BigIntVector;
22+
import org.apache.arrow.vector.BitVector;
23+
import org.apache.arrow.vector.FieldVector;
24+
import org.apache.arrow.vector.FixedSizeBinaryVector;
25+
import org.apache.arrow.vector.Float4Vector;
26+
import org.apache.arrow.vector.Float8Vector;
27+
import org.apache.arrow.vector.IntVector;
28+
import org.apache.arrow.vector.LargeVarBinaryVector;
29+
import org.apache.arrow.vector.LargeVarCharVector;
30+
import org.apache.arrow.vector.SmallIntVector;
31+
import org.apache.arrow.vector.TimeStampVector;
32+
import org.apache.arrow.vector.TinyIntVector;
33+
import org.apache.arrow.vector.UInt1Vector;
34+
import org.apache.arrow.vector.UInt2Vector;
35+
import org.apache.arrow.vector.UInt4Vector;
36+
import org.apache.arrow.vector.UInt8Vector;
37+
import org.apache.arrow.vector.VarBinaryVector;
38+
import org.apache.arrow.vector.VarCharVector;
1839
import org.apache.arrow.vector.VectorSchemaRoot;
1940
import org.apache.arrow.vector.ipc.ArrowReader;
2041
import org.apache.arrow.vector.ipc.ArrowStreamReader;
2142
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
2243
import org.apache.arrow.vector.types.pojo.Field;
44+
import org.apache.arrow.vector.types.pojo.FieldType;
2345
import org.apache.arrow.vector.types.pojo.Schema;
46+
import org.apache.arrow.vector.util.Text;
2447

2548
public class ArrowHelper {
49+
public static final String CQ_EXTENSION_INCREMENTAL = "cq:extension:incremental";
50+
public static final String CQ_EXTENSION_CONSTRAINT_NAME = "cq:extension:constraint_name";
51+
public static final String CQ_EXTENSION_PRIMARY_KEY = "cq:extension:primary_key";
52+
public static final String CQ_EXTENSION_UNIQUE = "cq:extension:unique";
2653
public static final String CQ_TABLE_NAME = "cq:table_name";
2754
public static final String CQ_TABLE_TITLE = "cq:table_title";
2855
public static final String CQ_TABLE_DESCRIPTION = "cq:table_description";
2956
public static final String CQ_TABLE_DEPENDS_ON = "cq:table_depends_on";
3057

58+
private static void setVectorData(FieldVector vector, Object data) {
59+
vector.allocateNew();
60+
vector.setValueCount(1);
61+
if (vector instanceof BigIntVector) {
62+
((BigIntVector) vector).set(0, (long) data);
63+
return;
64+
}
65+
if (vector instanceof BitVector) {
66+
((BitVector) vector).set(0, (int) data);
67+
return;
68+
}
69+
if (vector instanceof FixedSizeBinaryVector) {
70+
((FixedSizeBinaryVector) vector).set(0, (byte[]) data);
71+
return;
72+
}
73+
if (vector instanceof Float4Vector) {
74+
((Float4Vector) vector).set(0, (float) data);
75+
return;
76+
}
77+
if (vector instanceof Float8Vector) {
78+
((Float8Vector) vector).set(0, (double) data);
79+
return;
80+
}
81+
if (vector instanceof IntVector) {
82+
((IntVector) vector).set(0, (int) data);
83+
return;
84+
}
85+
if (vector instanceof LargeVarBinaryVector) {
86+
((LargeVarBinaryVector) vector).set(0, (byte[]) data);
87+
return;
88+
}
89+
if (vector instanceof LargeVarCharVector) {
90+
((LargeVarCharVector) vector).set(0, (Text) data);
91+
return;
92+
}
93+
if (vector instanceof SmallIntVector) {
94+
((SmallIntVector) vector).set(0, (short) data);
95+
return;
96+
}
97+
if (vector instanceof TimeStampVector) {
98+
((TimeStampVector) vector).set(0, (long) data);
99+
return;
100+
}
101+
if (vector instanceof TinyIntVector) {
102+
((TinyIntVector) vector).set(0, (byte) data);
103+
return;
104+
}
105+
if (vector instanceof UInt1Vector) {
106+
((UInt1Vector) vector).set(0, (byte) data);
107+
return;
108+
}
109+
if (vector instanceof UInt2Vector) {
110+
((UInt2Vector) vector).set(0, (short) data);
111+
return;
112+
}
113+
if (vector instanceof UInt4Vector) {
114+
((UInt4Vector) vector).set(0, (int) data);
115+
return;
116+
}
117+
if (vector instanceof UInt8Vector) {
118+
((UInt8Vector) vector).set(0, (long) data);
119+
return;
120+
}
121+
if (vector instanceof VarBinaryVector) {
122+
((VarBinaryVector) vector).set(0, (byte[]) data);
123+
return;
124+
}
125+
if (vector instanceof VarCharVector) {
126+
((VarCharVector) vector).set(0, (Text) data);
127+
return;
128+
}
129+
if (vector instanceof UUIDVector) {
130+
((UUIDVector) vector).set(0, (java.util.UUID) data);
131+
return;
132+
}
133+
if (vector instanceof JSONVector) {
134+
((JSONVector) vector).setSafe(0, (byte[]) data);
135+
return;
136+
}
137+
138+
throw new IllegalArgumentException("Unsupported vector type: " + vector.getClass());
139+
}
140+
31141
public static ByteString encode(Table table) throws IOException {
32142
try (BufferAllocator bufferAllocator = new RootAllocator()) {
33143
Schema schema = toArrowSchema(table);
34-
VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator);
35-
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
36-
try (ArrowStreamWriter writer =
37-
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
38-
writer.start();
39-
writer.end();
40-
return ByteString.copyFrom(out.toByteArray());
144+
try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
145+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
146+
try (ArrowStreamWriter writer =
147+
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
148+
writer.start();
149+
writer.end();
150+
return ByteString.copyFrom(out.toByteArray());
151+
}
41152
}
42153
}
43154
}
@@ -57,7 +168,18 @@ public static Schema toArrowSchema(Table table) {
57168
Field[] fields = new Field[columns.size()];
58169
for (int i = 0; i < columns.size(); i++) {
59170
Column column = columns.get(i);
60-
Field field = Field.nullable(column.getName(), column.getType());
171+
Map<String, String> metadata = new HashMap<>();
172+
metadata.put(CQ_EXTENSION_UNIQUE, column.isUnique() ? "true" : "false");
173+
metadata.put(CQ_EXTENSION_PRIMARY_KEY, column.isPrimaryKey() ? "true" : "false");
174+
if (column.getConstraintName() != null) {
175+
metadata.put(CQ_EXTENSION_CONSTRAINT_NAME, column.getConstraintName());
176+
}
177+
metadata.put(CQ_EXTENSION_INCREMENTAL, column.isIncrementalKey() ? "true" : "false");
178+
Field field =
179+
new Field(
180+
column.getName(),
181+
new FieldType(!column.isNotNull(), column.getType(), null, metadata),
182+
null);
61183
fields[i] = field;
62184
}
63185
Map<String, String> metadata = new HashMap<>();
@@ -77,7 +199,20 @@ public static Schema toArrowSchema(Table table) {
77199
public static Table fromArrowSchema(Schema schema) {
78200
List<Column> columns = new ArrayList<>();
79201
for (Field field : schema.getFields()) {
80-
columns.add(Column.builder().name(field.getName()).type(field.getType()).build());
202+
boolean isUnique = field.getMetadata().get(CQ_EXTENSION_UNIQUE) == "true";
203+
boolean isPrimaryKey = field.getMetadata().get(CQ_EXTENSION_PRIMARY_KEY) == "true";
204+
String constraintName = field.getMetadata().get(CQ_EXTENSION_CONSTRAINT_NAME);
205+
boolean isIncrementalKey = field.getMetadata().get(CQ_EXTENSION_INCREMENTAL) == "true";
206+
207+
columns.add(
208+
Column.builder()
209+
.name(field.getName())
210+
.unique(isUnique)
211+
.primaryKey(isPrimaryKey)
212+
.incrementalKey(isIncrementalKey)
213+
.constraintName(constraintName)
214+
.type(field.getType())
215+
.build());
81216
}
82217

83218
Map<String, String> metaData = schema.getCustomMetadata();
@@ -99,4 +234,25 @@ public static Table fromArrowSchema(Schema schema) {
99234

100235
return tableBuilder.build();
101236
}
237+
238+
public static ByteString encode(Resource resource) throws IOException {
239+
try (BufferAllocator bufferAllocator = new RootAllocator()) {
240+
Table table = resource.getTable();
241+
Schema schema = toArrowSchema(table);
242+
try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator)) {
243+
for (int i = 0; i < table.getColumns().size(); i++) {
244+
setVectorData(schemaRoot.getVector(i), resource.getData().get(i).get());
245+
}
246+
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
247+
try (ArrowStreamWriter writer =
248+
new ArrowStreamWriter(schemaRoot, null, Channels.newChannel(out))) {
249+
writer.start();
250+
writer.writeBatch();
251+
writer.end();
252+
return ByteString.copyFrom(out.toByteArray());
253+
}
254+
}
255+
}
256+
}
257+
}
102258
}

lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void init(
5353
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
5454
responseObserver.onCompleted();
5555
} catch (Exception e) {
56+
plugin.getLogger().error("Error initializing plugin", e);
5657
responseObserver.onError(e);
5758
}
5859
}
@@ -77,6 +78,7 @@ public void getTables(
7778
.build());
7879
responseObserver.onCompleted();
7980
} catch (Exception e) {
81+
plugin.getLogger().error("Error getting tables", e);
8082
responseObserver.onError(e);
8183
}
8284
}
@@ -95,6 +97,7 @@ public void sync(
9597
request.getBackend().getTableName(), request.getBackend().getConnection()),
9698
responseObserver);
9799
} catch (Exception e) {
100+
plugin.getLogger().error("Error syncing tables", e);
98101
responseObserver.onError(e);
99102
}
100103
}

lib/src/main/java/io/cloudquery/memdb/MemDB.java

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,44 +8,56 @@
88
import io.cloudquery.plugin.TableOutputStream;
99
import io.cloudquery.scheduler.Scheduler;
1010
import io.cloudquery.schema.ClientMeta;
11-
import io.cloudquery.schema.Column;
1211
import io.cloudquery.schema.Resource;
1312
import io.cloudquery.schema.SchemaException;
1413
import io.cloudquery.schema.Table;
1514
import io.cloudquery.schema.TableResolver;
15+
import io.cloudquery.transformers.Tables;
16+
import io.cloudquery.transformers.TransformWithClass;
1617
import io.grpc.stub.StreamObserver;
1718
import java.util.List;
18-
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
19+
import java.util.UUID;
1920

2021
public class MemDB extends Plugin {
21-
private List<Table> allTables =
22-
List.of(
23-
Table.builder()
24-
.name("table1")
25-
.resolver(
26-
new TableResolver() {
27-
@Override
28-
public void resolve(
29-
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
30-
stream.write(Table1Data.builder().name("name1").build());
31-
stream.write(Table1Data.builder().name("name2").build());
32-
}
33-
})
34-
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
35-
.build(),
36-
Table.builder()
37-
.name("table2")
38-
.resolver(
39-
new TableResolver() {
40-
@Override
41-
public void resolve(
42-
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
43-
stream.write(Table2Data.builder().id("id1").build());
44-
stream.write(Table2Data.builder().id("id2").build());
45-
}
46-
})
47-
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
48-
.build());
22+
private static List<Table> getTables() {
23+
return List.of(
24+
Table.builder()
25+
.name("table1")
26+
.resolver(
27+
new TableResolver() {
28+
@Override
29+
public void resolve(
30+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
31+
stream.write(
32+
Table1Data.builder()
33+
.id(UUID.fromString("46b2b6e6-8f3e-4340-a721-4aa0786b1cc0"))
34+
.name("name1")
35+
.build());
36+
stream.write(
37+
Table1Data.builder()
38+
.id(UUID.fromString("e89f95df-a389-4f1b-9ba6-1fab565523d6"))
39+
.name("name2")
40+
.build());
41+
}
42+
})
43+
.transform(TransformWithClass.builder(Table1Data.class).pkField("id").build())
44+
.build(),
45+
Table.builder()
46+
.name("table2")
47+
.resolver(
48+
new TableResolver() {
49+
@Override
50+
public void resolve(
51+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
52+
stream.write(Table2Data.builder().id(1).name("name1").build());
53+
stream.write(Table2Data.builder().id(2).name("name2").build());
54+
}
55+
})
56+
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
57+
.build());
58+
}
59+
60+
private List<Table> allTables;
4961

5062
private Spec spec;
5163

@@ -107,10 +119,9 @@ public void close() {
107119

108120
@Override
109121
public ClientMeta newClient(String spec, NewClientOptions options) throws Exception {
110-
if (options.isNoConnection()) {
111-
return null;
112-
}
113122
this.spec = Spec.fromJSON(spec);
123+
this.allTables = getTables();
124+
Tables.transformTables(allTables);
114125
return new MemDBClient();
115126
}
116127
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.cloudquery.memdb;
22

3+
import java.util.UUID;
34
import lombok.Builder;
45
import lombok.Getter;
56

67
@Builder
78
@Getter
89
public class Table1Data {
10+
private UUID id;
911
private String name;
1012
}

lib/src/main/java/io/cloudquery/memdb/Table2Data.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66
@Builder
77
@Getter
88
public class Table2Data {
9-
private String id;
9+
private int id;
10+
private String name;
1011
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.cloudquery.plugin;
2+
3+
import io.grpc.stub.StreamObserver;
4+
5+
public interface SyncStream extends StreamObserver<io.cloudquery.plugin.v3.Sync.Response> {}

0 commit comments

Comments
 (0)