Skip to content

Commit 9d9f37d

Browse files
committed
feat(sync): Initial insert message support
1 parent 8c9872a commit 9d9f37d

17 files changed

+238
-18
lines changed

lib/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ dependencies {
4545
implementation 'org.apache.logging.log4j:log4j-api:2.20.0'
4646
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
4747

48+
implementation 'com.google.code.gson:gson:2.10.1'
49+
4850
testImplementation(platform('org.junit:junit-bom:5.10.0'))
4951
testImplementation('org.junit.jupiter:junit-jupiter:5.10.0')
5052
testImplementation('org.junit.jupiter:junit-jupiter-api:5.10.0')

lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.protobuf.ByteString;
44
import io.cloudquery.plugin.BackendOptions;
5+
import io.cloudquery.plugin.NewClientOptions;
56
import io.cloudquery.plugin.Plugin;
67
import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase;
78
import io.cloudquery.plugin.v3.Write;
@@ -41,7 +42,9 @@ public void getVersion(
4142
public void init(
4243
io.cloudquery.plugin.v3.Init.Request request,
4344
StreamObserver<io.cloudquery.plugin.v3.Init.Response> responseObserver) {
44-
plugin.init();
45+
plugin.init(
46+
request.getSpec().toStringUtf8(),
47+
NewClientOptions.builder().noConnection(request.getNoConnection()).build());
4548
responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build());
4649
responseObserver.onCompleted();
4750
}

lib/src/main/java/io/cloudquery/memdb/MemDB.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package io.cloudquery.memdb;
22

33
import io.cloudquery.plugin.BackendOptions;
4+
import io.cloudquery.plugin.ClientNotInitializedException;
5+
import io.cloudquery.plugin.NewClientOptions;
46
import io.cloudquery.plugin.Plugin;
7+
import io.cloudquery.plugin.TableOutputStream;
58
import io.cloudquery.scheduler.Scheduler;
9+
import io.cloudquery.schema.ClientMeta;
610
import io.cloudquery.schema.Column;
11+
import io.cloudquery.schema.Resource;
712
import io.cloudquery.schema.SchemaException;
813
import io.cloudquery.schema.Table;
14+
import io.cloudquery.schema.TableResolver;
915
import io.grpc.stub.StreamObserver;
1016
import java.util.List;
1117
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
@@ -15,26 +21,44 @@ public class MemDB extends Plugin {
1521
List.of(
1622
Table.builder()
1723
.name("table1")
18-
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
24+
.resolver(
25+
new TableResolver() {
26+
@Override
27+
public void resolve(
28+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
29+
stream.write(Table1Data.builder().name("name1").build());
30+
stream.write(Table1Data.builder().name("name2").build());
31+
}
32+
})
33+
.columns(List.of(Column.builder().name("name").type(new Utf8()).build()))
1934
.build(),
2035
Table.builder()
2136
.name("table2")
22-
.columns(List.of(Column.builder().name("name1").type(new Utf8()).build()))
37+
.resolver(
38+
new TableResolver() {
39+
@Override
40+
public void resolve(
41+
ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
42+
stream.write(Table2Data.builder().id("id1").build());
43+
stream.write(Table2Data.builder().id("id2").build());
44+
}
45+
})
46+
.columns(List.of(Column.builder().name("id").type(new Utf8()).build()))
2347
.build());
2448

49+
private Spec spec;
50+
2551
public MemDB() {
2652
super("memdb", "0.0.1");
2753
}
2854

29-
@Override
30-
public void init() {
31-
// do nothing
32-
}
33-
3455
@Override
3556
public List<Table> tables(
3657
List<String> includeList, List<String> skipList, boolean skipDependentTables)
37-
throws SchemaException {
58+
throws SchemaException, ClientNotInitializedException {
59+
if (this.client == null) {
60+
throw new ClientNotInitializedException();
61+
}
3862
return Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
3963
}
4064

@@ -46,13 +70,19 @@ public void sync(
4670
boolean deterministicCqId,
4771
BackendOptions backendOptions,
4872
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
49-
throws SchemaException {
73+
throws SchemaException, ClientNotInitializedException {
74+
if (this.client == null) {
75+
throw new ClientNotInitializedException();
76+
}
77+
5078
List<Table> filtered = Table.filterDFS(allTables, includeList, skipList, skipDependentTables);
5179
Scheduler.builder()
80+
.client(client)
5281
.tables(filtered)
5382
.syncStream(syncStream)
5483
.deterministicCqId(deterministicCqId)
5584
.logger(getLogger())
85+
.concurrency(this.spec.getConcurrency())
5686
.build()
5787
.sync();
5888
}
@@ -69,6 +99,17 @@ public void write() {
6999

70100
@Override
71101
public void close() {
72-
// do nothing
102+
if (this.client != null) {
103+
((MemDBClient) this.client).close();
104+
}
105+
}
106+
107+
@Override
108+
public ClientMeta newClient(String spec, NewClientOptions options) {
109+
if (options.isNoConnection()) {
110+
return null;
111+
}
112+
this.spec = Spec.fromJSON(spec);
113+
return new MemDBClient();
73114
}
74115
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.cloudquery.memdb;
2+
3+
import io.cloudquery.schema.ClientMeta;
4+
5+
public class MemDBClient implements ClientMeta {
6+
private static final String id = "memdb";
7+
8+
public MemDBClient() {}
9+
10+
@Override
11+
public String getId() {
12+
return id;
13+
}
14+
15+
public void close() {
16+
// do nothing
17+
}
18+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.cloudquery.memdb;
2+
3+
import com.google.gson.Gson;
4+
import lombok.Getter;
5+
import lombok.Setter;
6+
7+
@Getter
8+
@Setter
9+
public class Spec {
10+
public static Spec fromJSON(String json) {
11+
Spec spec = new Gson().fromJson(json, Spec.class);
12+
if (spec.getConcurrency() == 0) {
13+
spec.setConcurrency(10000);
14+
}
15+
return spec;
16+
}
17+
18+
private int concurrency;
19+
20+
public Spec() {}
21+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.memdb;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class Table1Data {
9+
private String name;
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.memdb;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class Table2Data {
9+
private String id;
10+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.cloudquery.plugin;
2+
3+
public class ClientNotInitializedException extends Exception {
4+
5+
public ClientNotInitializedException() {}
6+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.cloudquery.plugin;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Builder
7+
@Getter
8+
public class NewClientOptions {
9+
private final boolean noConnection;
10+
}

lib/src/main/java/io/cloudquery/plugin/Plugin.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.cloudquery.plugin;
22

3+
import io.cloudquery.schema.ClientMeta;
34
import io.cloudquery.schema.SchemaException;
45
import io.cloudquery.schema.Table;
56
import io.grpc.stub.StreamObserver;
@@ -16,12 +17,17 @@ public abstract class Plugin {
1617
@NonNull protected final String name;
1718
@NonNull protected final String version;
1819
@Setter protected Logger logger;
20+
protected ClientMeta client;
1921

20-
public abstract void init();
22+
public void init(String spec, NewClientOptions options) {
23+
client = newClient(spec, options);
24+
}
25+
26+
public abstract ClientMeta newClient(String spec, NewClientOptions options);
2127

2228
public abstract List<Table> tables(
2329
List<String> includeList, List<String> skipList, boolean skipDependentTables)
24-
throws SchemaException;
30+
throws SchemaException, ClientNotInitializedException;
2531

2632
public abstract void sync(
2733
List<String> includeList,
@@ -30,7 +36,7 @@ public abstract void sync(
3036
boolean deterministicCqId,
3137
BackendOptions backendOptions,
3238
StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream)
33-
throws SchemaException;
39+
throws SchemaException, ClientNotInitializedException;
3440

3541
public abstract void read();
3642

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.cloudquery.plugin;
2+
3+
public interface TableOutputStream {
4+
public void write(Object data);
5+
}

lib/src/main/java/io/cloudquery/scheduler/Scheduler.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.cloudquery.scheduler;
22

33
import io.cloudquery.plugin.v3.Sync;
4+
import io.cloudquery.schema.ClientMeta;
45
import io.cloudquery.schema.Table;
56
import io.grpc.stub.StreamObserver;
67
import java.io.IOException;
@@ -14,7 +15,9 @@ public class Scheduler {
1415
@NonNull private final List<Table> tables;
1516
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
1617
@NonNull private final Logger logger;
18+
@NonNull private final ClientMeta client;
1719

20+
private int concurrency;
1821
private boolean deterministicCqId;
1922

2023
public void sync() {
@@ -30,6 +33,25 @@ public void sync() {
3033
return;
3134
}
3235
}
36+
37+
for (Table table : tables) {
38+
try {
39+
logger.info("resolving table: {}", table.getName());
40+
SchedulerTableOutputStream schedulerTableOutputStream =
41+
SchedulerTableOutputStream.builder()
42+
.table(table)
43+
.client(client)
44+
.logger(logger)
45+
.syncStream(syncStream)
46+
.build();
47+
table.getResolver().resolve(client, null, schedulerTableOutputStream);
48+
logger.info("resolved table: {}", table.getName());
49+
} catch (Exception e) {
50+
syncStream.onError(e);
51+
return;
52+
}
53+
}
54+
3355
syncStream.onCompleted();
3456
}
3557
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.cloudquery.scheduler;
2+
3+
import io.cloudquery.plugin.TableOutputStream;
4+
import io.cloudquery.plugin.v3.Sync;
5+
import io.cloudquery.schema.ClientMeta;
6+
import io.cloudquery.schema.Column;
7+
import io.cloudquery.schema.Resource;
8+
import io.cloudquery.schema.Table;
9+
import io.cloudquery.transformers.TransformerException;
10+
import io.grpc.stub.StreamObserver;
11+
import java.io.IOException;
12+
import lombok.Builder;
13+
import lombok.NonNull;
14+
import org.apache.logging.log4j.Logger;
15+
16+
@Builder
17+
public class SchedulerTableOutputStream implements TableOutputStream {
18+
@NonNull private final Table table;
19+
private final Resource parent;
20+
@NonNull private final ClientMeta client;
21+
@NonNull private final Logger logger;
22+
@NonNull private final StreamObserver<io.cloudquery.plugin.v3.Sync.Response> syncStream;
23+
24+
@Override
25+
public void write(Object data) {
26+
Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
27+
for (Column column : table.getColumns()) {
28+
try {
29+
logger.info("resolving column: {}", column.getName());
30+
if (column.getResolver() == null) {
31+
// TODO: Fall back to path resolver
32+
continue;
33+
}
34+
column.getResolver().resolve(client, resource, column);
35+
logger.info("resolved column: {}", column.getName());
36+
} catch (TransformerException e) {
37+
logger.error("Failed to resolve column: {}", column.getName(), e);
38+
return;
39+
}
40+
}
41+
42+
try {
43+
Sync.MessageInsert insert =
44+
Sync.MessageInsert.newBuilder().setRecord(resource.encode()).build();
45+
Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
46+
syncStream.onNext(response);
47+
} catch (IOException e) {
48+
logger.error("Failed to encode resource: {}", resource, e);
49+
return;
50+
}
51+
}
52+
}
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.cloudquery.schema;
22

3-
import lombok.Builder;
4-
5-
@Builder
6-
public class ClientMeta {}
3+
public interface ClientMeta {
4+
String getId();
5+
}

lib/src/main/java/io/cloudquery/schema/Resource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.cloudquery.schema;
22

3+
import com.google.protobuf.ByteString;
34
import io.cloudquery.scalar.Scalar;
45
import io.cloudquery.scalar.ValidationException;
6+
import java.io.IOException;
57
import java.util.ArrayList;
68
import java.util.List;
79
import lombok.Builder;
@@ -37,4 +39,9 @@ public Scalar<?> get(String columnName) {
3739
int index = table.indexOfColumn(columnName);
3840
return this.data.get(index);
3941
}
42+
43+
public ByteString encode() throws IOException {
44+
// TODO: Encode data and not only schema
45+
return table.encode();
46+
}
4047
}

lib/src/main/java/io/cloudquery/schema/Table.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public static int maxDepth(List<Table> tables) {
133133
}
134134

135135
@NonNull private String name;
136+
private TableResolver resolver;
136137
private String title;
137138
private String description;
138139
@Setter private Table parent;

0 commit comments

Comments
 (0)