diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java
index 8d90932..cd53cf2 100644
--- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java
+++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java
@@ -54,6 +54,27 @@ public void resolve(
                     }
                   })
             .transform(TransformWithClass.builder(Table2Data.class).pkField("id").build())
+            .relations(
+                List.of(
+                    Table.builder()
+                        .name("table2_child")
+                        .resolver(
+                            new TableResolver() {
+
+                              @Override
+                              public void resolve(
+                                  ClientMeta clientMeta,
+                                  Resource parent,
+                                  TableOutputStream stream) {
+                                String parentName = parent.get("name").toString();
+                                stream.write(
+                                    Table2ChildData.builder().name(parentName + "_name1").build());
+                                stream.write(
+                                    Table2ChildData.builder().name(parentName + "_name2").build());
+                              }
+                            })
+                        .transform(TransformWithClass.builder(Table2ChildData.class).build())
+                        .build()))
             .build());
   }
 
diff --git a/lib/src/main/java/io/cloudquery/memdb/Spec.java b/lib/src/main/java/io/cloudquery/memdb/Spec.java
index 316bcd2..359360e 100644
--- a/lib/src/main/java/io/cloudquery/memdb/Spec.java
+++ b/lib/src/main/java/io/cloudquery/memdb/Spec.java
@@ -13,7 +13,7 @@ public static Spec fromJSON(String json) throws JsonMappingException, JsonProces
     ObjectMapper objectMapper = new ObjectMapper();
     Spec spec = objectMapper.readValue(json, Spec.class);
     if (spec.getConcurrency() == 0) {
-      spec.setConcurrency(10000);
+      spec.setConcurrency(100);
     }
     return spec;
   }
diff --git a/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java b/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
new file mode 100644
index 0000000..4597b67
--- /dev/null
+++ b/lib/src/main/java/io/cloudquery/memdb/Table2ChildData.java
@@ -0,0 +1,10 @@
+package io.cloudquery.memdb;
+
+import lombok.Builder;
+import lombok.Getter;
+
+@Builder
+@Getter
+public class Table2ChildData {
+  private String name;
+}
diff --git a/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java b/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java
deleted file mode 100644
index 288c001..0000000
--- a/lib/src/main/java/io/cloudquery/scheduler/OnResourceResolved.java
+++ /dev/null
@@ -1,3 +0,0 @@
-package io.cloudquery.scheduler;
-
-public class OnResourceResolved {}
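Note: the MemDB hunk above is the reference usage of the new child-table support. A relation's resolver receives the already-resolved parent Resource and can read parent column values with parent.get(...). Below is a trimmed-down sketch of that wiring, assuming the same types and builder API that appear in the diff (Table, TableResolver, TableOutputStream, TransformWithClass); ChildData is a hypothetical Lombok value class analogous to Table2ChildData.

    // Sketch only: one parent table with one relation. A real parent table also
    // needs its own resolver and transform, as table2 has in MemDB.java.
    Table parentTable =
        Table.builder()
            .name("parent")
            .relations(
                List.of(
                    Table.builder()
                        .name("parent_child")
                        .resolver(
                            new TableResolver() {
                              @Override
                              public void resolve(
                                  ClientMeta clientMeta, Resource parent, TableOutputStream stream) {
                                // The parent row is fully resolved before this runs,
                                // so its columns are readable here.
                                stream.write(
                                    ChildData.builder()
                                        .name(parent.get("name").toString() + "_child")
                                        .build());
                              }
                            })
                        .transform(TransformWithClass.builder(ChildData.class).build())
                        .build()))
            .build();
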
diff --git a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
index 560ee02..ba734f7 100644
--- a/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
+++ b/lib/src/main/java/io/cloudquery/scheduler/Scheduler.java
@@ -1,11 +1,16 @@
 package io.cloudquery.scheduler;
 
+import com.google.protobuf.ByteString;
 import io.cloudquery.helper.ArrowHelper;
 import io.cloudquery.plugin.v3.Sync;
 import io.cloudquery.schema.ClientMeta;
+import io.cloudquery.schema.Resource;
 import io.cloudquery.schema.Table;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import lombok.Builder;
 import lombok.NonNull;
 import org.apache.logging.log4j.Logger;
@@ -20,8 +25,58 @@ public class Scheduler {
   private int concurrency;
   private boolean deterministicCqId;
 
-  public void sync() {
+  private void resolveTables(List<Table> tables, Resource parent, int concurrency)
+      throws InterruptedException {
+    if (tables == null || tables.isEmpty()) {
+      return;
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(Math.min(tables.size(), concurrency));
     for (Table table : tables) {
+      final int nextLevelConcurrency = Math.max(1, concurrency / 2);
+      executor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                String tableMessage =
+                    parent != null
+                        ? "table " + table.getName() + " of parent " + parent.getTable().getName()
+                        : "table " + table.getName();
+
+                logger.info("resolving {}", tableMessage);
+                if (!table.getResolver().isPresent()) {
+                  logger.error("no resolver for {}", tableMessage);
+                  return;
+                }
+
+                SchedulerTableOutputStream schedulerTableOutputStream =
+                    new SchedulerTableOutputStream(table, parent, client, logger);
+                table.getResolver().get().resolve(client, parent, schedulerTableOutputStream);
+
+                for (Resource resource : schedulerTableOutputStream.getResources()) {
+                  ByteString record = resource.encode();
+                  Sync.MessageInsert insert =
+                      Sync.MessageInsert.newBuilder().setRecord(record).build();
+                  Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
+                  syncStream.onNext(response);
+                  resolveTables(table.getRelations(), resource, nextLevelConcurrency);
+                }
+
+                logger.info("resolved {}", tableMessage);
+              } catch (Exception e) {
+                logger.error("Failed to resolve table: {}", table.getName(), e);
+                syncStream.onError(e);
+                return;
+              }
+            }
+          });
+    }
+    executor.shutdown();
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+  }
+
+  public void sync() {
+    for (Table table : Table.flattenTables(tables)) {
       try {
         logger.info("sending migrate message for table: {}", table.getName());
         Sync.MessageMigrateTable migrateTable =
@@ -34,26 +89,12 @@ public void sync() {
       }
     }
 
-    for (Table table : tables) {
-      try {
-        logger.info("resolving table: {}", table.getName());
-        if (!table.getResolver().isPresent()) {
-          logger.error("no resolver for table: {}", table.getName());
-          continue;
-        }
-        SchedulerTableOutputStream schedulerTableOutputStream =
-            SchedulerTableOutputStream.builder()
-                .table(table)
-                .client(client)
-                .logger(logger)
-                .syncStream(syncStream)
-                .build();
-        table.getResolver().get().resolve(client, null, schedulerTableOutputStream);
-        logger.info("resolved table: {}", table.getName());
-      } catch (Exception e) {
-        syncStream.onError(e);
-        return;
-      }
+    try {
+      resolveTables(this.tables, null, this.concurrency);
+    } catch (InterruptedException e) {
+      logger.error("Failed to resolve tables", e);
+      syncStream.onError(e);
+      return;
     }
 
     syncStream.onCompleted();
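Note: resolveTables gives each relation depth half of its parent's thread budget, floored at one, and caps the actual pool size at the number of sibling tables (Math.min(tables.size(), concurrency)). With the new default of 100 from Spec.fromJSON, the budget decays geometrically with depth instead of multiplying threads. A self-contained sketch of just that arithmetic (class and variable names are illustrative):

    // Prints the per-depth pool budget that resolveTables would pass down.
    public class ConcurrencyBudgetDemo {
      public static void main(String[] args) {
        int budget = 100; // the new default concurrency from Spec.fromJSON
        for (int depth = 0; depth <= 5; depth++) {
          System.out.println("depth " + depth + " -> pool budget " + budget);
          budget = Math.max(1, budget / 2); // 100, 50, 25, 12, 6, 3 ... never below 1
        }
      }
    }
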
diff --git a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
index f25ebad..0ac4f72 100644
--- a/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
+++ b/lib/src/main/java/io/cloudquery/scheduler/SchedulerTableOutputStream.java
@@ -1,52 +1,71 @@
 package io.cloudquery.scheduler;
 
-import com.google.protobuf.ByteString;
 import io.cloudquery.plugin.TableOutputStream;
-import io.cloudquery.plugin.v3.Sync;
 import io.cloudquery.schema.ClientMeta;
 import io.cloudquery.schema.Column;
 import io.cloudquery.schema.Resource;
 import io.cloudquery.schema.Table;
 import io.cloudquery.transformers.TransformerException;
-import io.grpc.stub.StreamObserver;
-import lombok.Builder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import lombok.NonNull;
 import org.apache.logging.log4j.Logger;
 
-@Builder
 public class SchedulerTableOutputStream implements TableOutputStream {
+  private static final int RESOURCE_RESOLVE_CONCURRENCY = 100;
+  private static final int RESOURCE_RESOLVE_TIMEOUT_MINUTES = 10;
+
   @NonNull private final Table table;
   private final Resource parent;
   @NonNull private final ClientMeta client;
   @NonNull private final Logger logger;
-  @NonNull private final StreamObserver<Sync.Response> syncStream;
+
+  private List<Resource> resources = new ArrayList<>();
+
+  private ExecutorService executor;
+
+  public SchedulerTableOutputStream(
+      @NonNull Table table, Resource parent, @NonNull ClientMeta client, @NonNull Logger logger) {
+    this.table = table;
+    this.parent = parent;
+    this.client = client;
+    this.logger = logger;
+    this.executor = Executors.newFixedThreadPool(RESOURCE_RESOLVE_CONCURRENCY);
+  }
 
   @Override
   public void write(Object data) {
     Resource resource = Resource.builder().table(table).parent(parent).item(data).build();
     for (Column column : table.getColumns()) {
-      try {
-        logger.info("resolving column: {}", column.getName());
-        if (column.getResolver() == null) {
-          logger.error("no resolver for column: {}", column.getName());
-          continue;
-        }
-        column.getResolver().resolve(client, resource, column);
-        logger.info("resolved column: {}", column.getName());
-      } catch (TransformerException e) {
-        logger.error("Failed to resolve column: {}", column.getName(), e);
-        return;
-      }
+      executor.submit(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                logger.debug("resolving column: {}", column.getName());
+                if (column.getResolver() == null) {
+                  logger.error("no resolver for column: {}", column.getName());
+                  return;
+                }
+                column.getResolver().resolve(client, resource, column);
+                logger.debug("resolved column: {}", column.getName());
+                return;
+              } catch (TransformerException e) {
+                logger.error("Failed to resolve column: {}", column.getName(), e);
+                return;
+              }
+            }
+          });
     }
+    resources.add(resource);
+  }
 
-    try {
-      ByteString record = resource.encode();
-      Sync.MessageInsert insert = Sync.MessageInsert.newBuilder().setRecord(record).build();
-      Sync.Response response = Sync.Response.newBuilder().setInsert(insert).build();
-      syncStream.onNext(response);
-    } catch (Exception e) {
-      logger.error("Failed to encode resource: {}", resource, e);
-      return;
-    }
+  public List<Resource> getResources() throws InterruptedException {
+    // TODO: Optimize this to not wait for all resources to complete
+    executor.shutdown();
+    executor.awaitTermination(RESOURCE_RESOLVE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+    return this.resources;
   }
 }
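Note: write() no longer blocks on column resolution. Each column resolver is queued on a per-stream pool, the resource is recorded immediately, and getResources() is the synchronization barrier (shutdown() plus awaitTermination(...)). A self-contained sketch of that submit-then-drain pattern with placeholder work (all names in it are illustrative, not part of the SDK):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SubmitThenDrainDemo {
      private final ExecutorService executor = Executors.newFixedThreadPool(4);
      private final List<String> results = new ArrayList<>();

      void write(int id) {
        results.add("resource-" + id);             // recorded right away, like resources.add(resource)
        executor.submit(() -> resolveColumns(id)); // async, like the column resolver tasks
      }

      List<String> getResults() throws InterruptedException {
        executor.shutdown();                             // stop accepting work
        executor.awaitTermination(10, TimeUnit.MINUTES); // barrier, as in getResources()
        return results;
      }

      private void resolveColumns(int id) {
        // placeholder for column.getResolver().resolve(client, resource, column)
      }

      public static void main(String[] args) throws InterruptedException {
        SubmitThenDrainDemo demo = new SubmitThenDrainDemo();
        for (int i = 0; i < 4; i++) demo.write(i);
        System.out.println(demo.getResults());
      }
    }
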
diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java
index d5ca9e6..6700784 100644
--- a/lib/src/main/java/io/cloudquery/schema/Table.java
+++ b/lib/src/main/java/io/cloudquery/schema/Table.java
@@ -9,7 +9,7 @@
 import io.cloudquery.transformers.TransformerException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -28,7 +28,7 @@ public interface Transform {
   }
 
   public static List<Table> flattenTables(List<Table> tables) {
-    Map<String, Table> flattenMap = new HashMap<>();
+    Map<String, Table> flattenMap = new LinkedHashMap<>();
     for (Table table : tables) {
       Table newTable = table.toBuilder().relations(Collections.emptyList()).build();
       flattenMap.put(newTable.name, newTable);
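Note: the HashMap-to-LinkedHashMap switch is what makes the new migrate loop in Scheduler.sync() safe. Assuming flattenTables descends into relations after inserting each table, which the surrounding code suggests, LinkedHashMap's insertion-order iteration guarantees that a parent's migrate message is sent before its children's. A quick illustration using the table names from the MemDB example:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Demonstrates why flattenTables now uses LinkedHashMap: iteration order equals
    // insertion order, so "table2" is always emitted before "table2_child".
    public class FlattenOrderDemo {
      public static void main(String[] args) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("table2", "parent");
        flat.put("table2_child", "relation of table2");
        flat.keySet().forEach(System.out::println); // table2, then table2_child
        // A plain HashMap makes no ordering guarantee, so a child's migrate
        // message could have been sent before its parent's.
      }
    }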