diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java
index 8d90932..cd53cf2 100644
--- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java
+++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java
@@ -54,6 +54,27 @@ public void resolve(
}
})
.transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
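+ // Register a child relation that derives two rows from each parent row's "name" column.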
+ .relations(
+ List.of(
+ Table.builder()
+ .name("table2_child")
+ .resolver(
+ new TableResolver() {
+
+ @Override
+ public void resolve(
+ ClientMeta clientMeta,
+ Resource parent,
+ TableOutputStream stream) {
+ String parentName = parent.get("name").toString();
+ stream.write(
+ Table2ChildData.builder().name(parentName + "_name1").build());
+ stream.write(
+ Table2ChildData.builder().name(parentName + "_name2").build());
+ }
+ })
+ .transform(TransformWithClass.builder(Table2ChildData.class).build())
+ .build()))
.build());
}
diff --git a/lib/src/main/java/io/cloudquery/memdb/Spec.java b/lib/src/main/java/io/cloudquery/memdb/Spec.java
index 316bcd2..359360e 100644
--- a/lib/src/main/java/io/cloudquery/memdb/Spec.java
+++ b/lib/src/main/java/io/cloudquery/memdb/Spec.java
@@ -13,7 +13,7 @@ public static Spec fromJSON(String json) throws JsonMappingException, JsonProces
ObjectMapper objectMapper = new ObjectMapper();
Spec spec = objectMapper.readValue(json, Spec.class);
if (spec.getConcurrency() == 0) {
- spec.setConcurrency(10000);
+ spec.setConcurrency(100);
}
return spec;
}
diff --git a/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java b/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
new file mode 100644
index 0000000..4597b67
--- /dev/null
+++ b/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
@@ -0,0 +1,10 @@
+package io.cloudquery.memdb;
+
+import lombok.Builder;
+import lombok.Getter;
+
+@Builder
+@Getter
+public class Table2ChildData {
+ private String name;
+}
diff --git a/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java b/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java
deleted file mode 100644
index 288c001..0000000
--- a/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java
+++ /dev/null
@@ -1,3 +0,0 @@
-package io.cloudquery.scheduler;
-
-public class OnResourceResolved {}
diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
index 560ee02..ba734f7 100644
--- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
+++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
@@ -1,11 +1,16 @@
package io.cloudquery.scheduler;
+import com.google.protobuf.ByteString;
import io.cloudquery.helper.ArrowHelper;
import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
+import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.grpc.stub.StreamObserver;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;
@@ -20,8 +25,58 @@ public class Scheduler {
private int concurrency;
private boolean deterministicCqId;
- public void sync() {
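+ /**
+  * Recursively resolves the given tables on a bounded thread pool. Each resolved resource is
+  * streamed to the destination before its relations are resolved, and every level of relations
+  * runs with half of its parent's concurrency (but never less than one thread).
+  */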
+ private void resolveTables(List<Table> tables, Resource parent, int concurrency)
+ throws InterruptedException {
+ if (tables == null || tables.isEmpty()) {
+ return;
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(Math.min(tables.size(), concurrency));
for (Table table : tables) {
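+ // Halve the concurrency for the next level of relations so deep nesting cannot exhaust threads.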
+ final int nextLevelConcurrency = Math.max(1, concurrency / 2);
+ executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String tableMessage =
+ parent != null
+ ? "table " + table.getName() + " of parent" + parent.getTable().getName()
+ : "table " + table.getName();
+
+ logger.info("resolving {}", tableMessage);
+ if (!table.getResolver().isPresent()) {
+ logger.error("no resolver for {}", tableMessage);
+ return;
+ }
+
+ SchedulerTableOutputStream schedulerTableOutputStream =
+ new SchedulerTableOutputStream(table, parent, client, logger);
+ table.getResolver().get().resolve(client, parent, schedulerTableOutputStream);
+
+ for (Resource resource : schedulerTableOutputStream.getResources()) {
+ ByteString record = resource.encode();
+ Sync.MessageInsert insert =
+ Sync.MessageInsert.newBuilder().setRecord(record).build();
+ Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
+ syncStream.onNext(response);
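+ // Recurse into the table's relations, with the resource just emitted as their parent.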
+ resolveTables(table.getRelations(), resource, nextLevelConcurrency);
+ }
+
+ logger.info("resolved {}", tableMessage);
+ } catch (Exception e) {
+ logger.error("Failed to resolve table: {}", table.getName(), e);
+ syncStream.onError(e);
+ }
+ }
+ });
+ }
+ executor.shutdown();
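+ // Block until every table at this level (and, recursively, its relations) is resolved.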
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+ }
+
+ public void sync() {
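+ // Migrate the whole table tree first; flattenTables includes nested relations.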
+ for (Table table : Table.flattenTables(tables)) {
try {
logger.info("sending migrate message for table: {}", table.getName());
Sync.MessageMigrateTable migrateTable =
@@ -34,26 +89,12 @@ public void sync() {
}
}
- for (Table table : tables) {
- try {
- logger.info("resolving table: {}", table.getName());
- if (!table.getResolver().isPresent()) {
- logger.error("no resolver for table: {}", table.getName());
- continue;
- }
- SchedulerTableOutputStream schedulerTableOutputStream =
- SchedulerTableOutputStream.builder()
- .table(table)
- .client(client)
- .logger(logger)
- .syncStream(syncStream)
- .build();
- table.getResolver().get().resolve(client, null, schedulerTableOutputStream);
- logger.info("resolved table: {}", table.getName());
- } catch (Exception e) {
- syncStream.onError(e);
- return;
- }
+ try {
+ resolveTables(this.tables, null, this.concurrency);
+ } catch (InterruptedException e) {
+ logger.error("Failed to resolve tables", e);
+ syncStream.onError(e);
+ return;
}
syncStream.onCompleted();
diff --git a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
index f25ebad..0ac4f72 100644
--- a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
+++ b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
@@ -1,52 +1,71 @@
package io.cloudquery.scheduler;
-import com.google.protobuf.ByteString;
import io.cloudquery.plugin.TableOutputStream;
-import io.cloudquery.plugin.v3.Sync;
import io.cloudquery.schema.ClientMeta;
import io.cloudquery.schema.Column;
import io.cloudquery.schema.Resource;
import io.cloudquery.schema.Table;
import io.cloudquery.transformers.TransformerException;
-import io.grpc.stub.StreamObserver;
-import lombok.Builder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.logging.log4j.Logger;
-@Builder
public class SchedulerTableOutputStream implements TableOutputStream {
+ private static final int RESOURCE_RESOLVE_CONCURRENCY = 100;
+ private static final int RESOURCE_RESOLVE_TIMEOUT_MINUTES = 10;
@NonNull private final Table table;
private final Resource parent;
@NonNull private final ClientMeta client;
@NonNull private final Logger logger;
- @NonNull private final StreamObserver<Sync.Response> syncStream;
+
+ private List<Resource> resources = new ArrayList<>();
+
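+ // Resolves each written resource's columns in parallel; drained in getResources().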
+ private ExecutorService executor;
+
+ public SchedulerTableOutputStream(
+ @NonNull Table table, Resource parent, @NonNull ClientMeta client, @NonNull Logger logger) {
+ this.table = table;
+ this.parent = parent;
+ this.client = client;
+ this.logger = logger;
+ this.executor = Executors.newFixedThreadPool(RESOURCE_RESOLVE_CONCURRENCY);
+ }
@Override
public void write(Object data) {
Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
for (Column column : table.getColumns()) {
- try {
- logger.info("resolving column: {}", column.getName());
- if (column.getResolver() == null) {
- logger.error("no resolver for column: {}", column.getName());
- continue;
- }
- column.getResolver().resolve(client, resource, column);
- logger.info("resolved column: {}", column.getName());
- } catch (TransformerException e) {
- logger.error("Failed to resolve column: {}", column.getName(), e);
- return;
- }
+ executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ logger.debug("resolving column: {}", column.getName());
+ if (column.getResolver() == null) {
+ logger.error("no resolver for column: {}", column.getName());
+ return;
+ }
+ column.getResolver().resolve(client, resource, column);
+ logger.debug("resolved column: {}", column.getName());
+ } catch (TransformerException e) {
+ logger.error("Failed to resolve column: {}", column.getName(), e);
+ return;
+ }
+ }
+ });
}
+ resources.add(resource);
+ }
- try {
- ByteString record = resource.encode();
- Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build();
- Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
- syncStream.onNext(response);
- } catch (Exception e) {
- logger.error("Failed to encode resource: {}", resource, e);
- return;
- }
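+ /**
+  * Shuts down the column-resolver pool, waits (up to the timeout) for pending column resolvers
+  * to finish, and then returns every resource written to this stream.
+  */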
+ public List<Resource> getResources() throws InterruptedException {
+ // TODO: Optimize this to not wait for all resources to complete
+ executor.shutdown();
+ executor.awaitTermination(RESOURCE_RESOLVE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+ return this.resources;
}
}
diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java
index d5ca9e6..6700784 100644
--- a/lib/src/main/java/io/cloudquery/schema/Table.java
+++ b/lib/src/main/java/io/cloudquery/schema/Table.java
@@ -9,7 +9,7 @@
import io.cloudquery.transformers.TransformerException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -28,7 +28,7 @@ public interface Transform {
}
public static List<Table> flattenTables(List<Table> tables) {
- Map<String, Table> flattenMap = new HashMap<>();
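+ // LinkedHashMap preserves insertion order, keeping the flattened table list (and migrate order) deterministic.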
+ Map<String, Table> flattenMap = new LinkedHashMap<>();
for (Table table : tables) {
Table newTable = table.toBuilder().relations(Collections.emptyList()).build();
flattenMap.put(newTable.name, newTable);