diff --git a/lib/build.gradle b/lib/build.gradle index 9fb4ca4..075db72 100644 --- a/lib/build.gradle +++ b/lib/build.gradle @@ -4,6 +4,10 @@ plugins { id "maven-publish" } +ext { + javaMainClass = "io.cloudquery.MainClass" +} + group 'io.cloudquery' // x-release-please-start-version version = '0.0.1' @@ -34,6 +38,7 @@ dependencies { implementation "io.grpc:grpc-services:1.57.1" implementation "io.grpc:grpc-testing:1.57.1" implementation "io.cloudquery:plugin-pb-java:0.0.5" + implementation "org.apache.arrow:arrow-memory-core:12.0.1" implementation "org.apache.arrow:arrow-vector:12.0.1" implementation 'org.apache.logging.log4j:log4j-api:2.20.0' @@ -44,11 +49,12 @@ dependencies { testImplementation('org.junit.jupiter:junit-jupiter-api:5.10.0') testImplementation('org.mockito:mockito-core:5.4.0') testImplementation('org.mockito:mockito-junit-jupiter:5.4.0') - testImplementation('org.apache.arrow:arrow-memory-netty:12.0.1') testImplementation('nl.jqno.equalsverifier:equalsverifier:3.15') testRuntimeOnly('org.junit.jupiter:junit-jupiter-engine:5.10.0') testImplementation 'org.assertj:assertj-core:3.24.2' + + runtimeOnly "org.apache.arrow:arrow-memory-netty:12.0.1" } test { @@ -83,3 +89,11 @@ publishing { } } } + +task runMemDBServe(type: JavaExec) { + group = "Execution" + description = "Start the MemDB plugin server" + classpath = sourceSets.main.runtimeClasspath + main = javaMainClass + args = ["serve"] +} \ No newline at end of file diff --git a/lib/src/main/java/io/cloudquery/Main.java b/lib/src/main/java/io/cloudquery/MainClass.java similarity index 92% rename from lib/src/main/java/io/cloudquery/Main.java rename to lib/src/main/java/io/cloudquery/MainClass.java index c4dfbac..13a9e19 100644 --- a/lib/src/main/java/io/cloudquery/Main.java +++ b/lib/src/main/java/io/cloudquery/MainClass.java @@ -3,7 +3,7 @@ import io.cloudquery.memdb.MemDB; import io.cloudquery.server.PluginServe; -public class Main { +public class MainClass { public static void main(String[] args) { PluginServe serve = PluginServe.builder().plugin(new MemDB()).args(args).build(); int exitCode = serve.Serve(); diff --git a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java index 78faa56..c7d800b 100644 --- a/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java +++ b/lib/src/main/java/io/cloudquery/internal/servers/plugin/v3/PluginServer.java @@ -1,6 +1,22 @@ package io.cloudquery.internal.servers.plugin.v3; import io.cloudquery.plugin.v3.PluginGrpc.PluginImplBase; +import io.cloudquery.schema.Table; +import io.cloudquery.plugin.v3.Write; +import io.grpc.stub.StreamObserver; + +import java.io.ByteArrayOutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamWriter; +import org.apache.arrow.vector.types.pojo.Schema; +import com.google.protobuf.ByteString; + import io.cloudquery.plugin.Plugin; public class PluginServer extends PluginImplBase { @@ -9,4 +25,100 @@ public class PluginServer extends PluginImplBase { public PluginServer(Plugin plugin) { this.plugin = plugin; } + + @Override + public void getName(io.cloudquery.plugin.v3.GetName.Request request, + StreamObserver responseObserver) { + responseObserver + .onNext(io.cloudquery.plugin.v3.GetName.Response.newBuilder().setName(plugin.getName()).build()); + responseObserver.onCompleted(); + } + + @Override + public void getVersion(io.cloudquery.plugin.v3.GetVersion.Request request, + StreamObserver responseObserver) { + responseObserver.onNext( + io.cloudquery.plugin.v3.GetVersion.Response.newBuilder().setVersion(plugin.getVersion()).build()); + responseObserver.onCompleted(); + } + + @Override + public void init(io.cloudquery.plugin.v3.Init.Request request, + StreamObserver responseObserver) { + plugin.init(); + responseObserver.onNext(io.cloudquery.plugin.v3.Init.Response.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public void getTables(io.cloudquery.plugin.v3.GetTables.Request request, + StreamObserver responseObserver) { + try { + List tables = plugin.tables(); + List byteStrings = new ArrayList<>(); + for (Table table : tables) { + try (BufferAllocator bufferAllocator = new RootAllocator()) { + Schema schema = table.toArrowSchema(); + VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(schema, bufferAllocator); + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + try (ArrowStreamWriter writer = new ArrowStreamWriter(schemaRoot, null, + Channels.newChannel(out))) { + writer.start(); + writer.end(); + byteStrings.add(ByteString.copyFrom(out.toByteArray())); + } + } + } + } + responseObserver + .onNext(io.cloudquery.plugin.v3.GetTables.Response.newBuilder().addAllTables(byteStrings).build()); + responseObserver.onCompleted(); + } catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sync(io.cloudquery.plugin.v3.Sync.Request request, + StreamObserver responseObserver) { + plugin.sync(); + responseObserver.onNext(io.cloudquery.plugin.v3.Sync.Response.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public void read(io.cloudquery.plugin.v3.Read.Request request, + StreamObserver responseObserver) { + plugin.read(); + responseObserver.onNext(io.cloudquery.plugin.v3.Read.Response.newBuilder().build()); + responseObserver.onCompleted(); + } + + @Override + public StreamObserver write(StreamObserver responseObserver) { + plugin.write(); + return new StreamObserver<>() { + @Override + public void onNext(Write.Request request) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + responseObserver.onNext(Write.Response.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } + + @Override + public void close(io.cloudquery.plugin.v3.Close.Request request, + StreamObserver responseObserver) { + plugin.close(); + responseObserver.onNext(io.cloudquery.plugin.v3.Close.Response.newBuilder().build()); + responseObserver.onCompleted(); + } } diff --git a/lib/src/main/java/io/cloudquery/memdb/MemDB.java b/lib/src/main/java/io/cloudquery/memdb/MemDB.java index a0c8f92..0cfdaeb 100644 --- a/lib/src/main/java/io/cloudquery/memdb/MemDB.java +++ b/lib/src/main/java/io/cloudquery/memdb/MemDB.java @@ -1,9 +1,53 @@ package io.cloudquery.memdb; +import java.util.List; + import io.cloudquery.plugin.Plugin; +import io.cloudquery.schema.Table; +import io.cloudquery.schema.Column; +import io.cloudquery.schema.SchemaException; + +import org.apache.arrow.vector.types.pojo.ArrowType.Utf8; public class MemDB extends Plugin { + private List
allTables = List.of( + Table.builder().name("table1").columns(List.of(Column.builder().name("name1").type(new Utf8()).build())) + .build(), + Table.builder().name("table2").columns(List.of(Column.builder().name("name1").type(new Utf8()).build())) + .build()); + public MemDB() { super("memdb", "0.0.1"); } + + @Override + public void init() { + // do nothing + } + + @Override + public List
tables() throws SchemaException { + return Table.filterDFS(allTables, List.of("*"), List.of(), false); + } + + @Override + public void sync() { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'Sync'"); + } + + @Override + public void read() { + throw new UnsupportedOperationException("Unimplemented method 'Read'"); + } + + @Override + public void write() { + throw new UnsupportedOperationException("Unimplemented method 'Write'"); + } + + @Override + public void close() { + // do nothing + } } diff --git a/lib/src/main/java/io/cloudquery/plugin/Plugin.java b/lib/src/main/java/io/cloudquery/plugin/Plugin.java index 918981e..57a968d 100644 --- a/lib/src/main/java/io/cloudquery/plugin/Plugin.java +++ b/lib/src/main/java/io/cloudquery/plugin/Plugin.java @@ -1,14 +1,36 @@ package io.cloudquery.plugin; -import lombok.AllArgsConstructor; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +import io.cloudquery.schema.SchemaException; +import io.cloudquery.schema.Table; import lombok.Getter; import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +@RequiredArgsConstructor @Getter -@AllArgsConstructor public abstract class Plugin { @NonNull protected final String name; @NonNull protected final String version; + @Setter + protected Logger logger; + + public abstract void init(); + + public abstract List
tables() throws SchemaException; + + public abstract void sync(); + + public abstract void read(); + + public abstract void write(); + + public abstract void close(); + } diff --git a/lib/src/main/java/io/cloudquery/schema/Resource.java b/lib/src/main/java/io/cloudquery/schema/Resource.java index 3959ac0..47c562a 100644 --- a/lib/src/main/java/io/cloudquery/schema/Resource.java +++ b/lib/src/main/java/io/cloudquery/schema/Resource.java @@ -4,6 +4,7 @@ import io.cloudquery.scalar.ValidationException; import lombok.Builder; import lombok.Getter; +import lombok.NonNull; import java.util.ArrayList; import java.util.List; @@ -17,10 +18,10 @@ public class Resource { private final List> data; @Builder(toBuilder = true) - public Resource(Table table, Resource parent, Object item) { + public Resource(@NonNull Table table, Resource parent, Object item) { this.item = item; this.parent = parent; - this.table = table != null ? table : Table.builder().build(); + this.table = table; this.data = new ArrayList<>(); for (Column column : this.table.getColumns()) { diff --git a/lib/src/main/java/io/cloudquery/schema/Table.java b/lib/src/main/java/io/cloudquery/schema/Table.java index a590616..0fe0a43 100644 --- a/lib/src/main/java/io/cloudquery/schema/Table.java +++ b/lib/src/main/java/io/cloudquery/schema/Table.java @@ -5,6 +5,7 @@ import io.cloudquery.transformers.TransformerException; import lombok.Builder; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import java.util.ArrayList; @@ -15,6 +16,11 @@ import java.util.Optional; import java.util.function.Predicate; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import static java.util.Arrays.asList; + @Builder(toBuilder = true) @Getter public class Table { @@ -34,7 +40,8 @@ public static List
flattenTables(List
tables) { return flattenMap.values().stream().toList(); } - public static List
filterDFS(List
tables, List includeConfiguration, List skipConfiguration, boolean skipDependentTables) throws SchemaException { + public static List
filterDFS(List
tables, List includeConfiguration, + List skipConfiguration, boolean skipDependentTables) throws SchemaException { List
flattenedTables = flattenTables(tables); for (String includePattern : includeConfiguration) { boolean includeMatch = false; @@ -45,7 +52,8 @@ public static List
filterDFS(List
tables, List includeConf } } if (!includeMatch) { - throw new SchemaException("table configuration includes a pattern \"" + includePattern + "\" with no matches"); + throw new SchemaException( + "table configuration includes a pattern \"" + includePattern + "\" with no matches"); } } for (String excludePattern : skipConfiguration) { @@ -57,7 +65,8 @@ public static List
filterDFS(List
tables, List includeConf } } if (!excludeMatch) { - throw new SchemaException("skip configuration includes a pattern \"" + excludePattern + "\" with no matches"); + throw new SchemaException( + "skip configuration includes a pattern \"" + excludePattern + "\" with no matches"); } } @@ -82,11 +91,13 @@ public static List
filterDFS(List
tables, List includeConf return filterDFSFunc(tables, include, exclude, skipDependentTables); } - private static List
filterDFSFunc(List
tables, Predicate
include, Predicate
exclude, boolean skipDependentTables) { + private static List
filterDFSFunc(List
tables, Predicate
include, Predicate
exclude, + boolean skipDependentTables) { List
filteredTables = new ArrayList<>(); for (Table table : tables) { Table filteredTable = table.toBuilder().parent(null).build(); - Optional
optionalFilteredTable = filteredTable.filterDfs(false, include, exclude, skipDependentTables); + Optional
optionalFilteredTable = filteredTable.filterDfs(false, include, exclude, + skipDependentTables); optionalFilteredTable.ifPresent(filteredTables::add); } return filteredTables; @@ -106,7 +117,10 @@ public static int maxDepth(List
tables) { return depth; } + @NonNull private String name; + private String title; + private String description; @Setter private Table parent; @Builder.Default @@ -150,13 +164,11 @@ public int indexOfColumn(String columnName) { } public List primaryKeys() { - return columns.stream(). - filter(Column::isPrimaryKey). - map(Column::getName). - toList(); + return columns.stream().filter(Column::isPrimaryKey).map(Column::getName).toList(); } - private Optional
filterDfs(boolean parentMatched, Predicate
include, Predicate
exclude, boolean skipDependentTables) { + private Optional
filterDfs(boolean parentMatched, Predicate
include, Predicate
exclude, + boolean skipDependentTables) { if (exclude.test(this)) { return Optional.empty(); } @@ -187,4 +199,26 @@ public Optional getColumn(String name) { } return Optional.empty(); } + + public Schema toArrowSchema() { + Field[] fields = new Field[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + Field field = Field.nullable(column.getName(), column.getType()); + fields[i] = field; + } + Map metadata = new HashMap<>(); + metadata.put("cq:table_name", name); + if (title != null) { + metadata.put("cq:table_title", title); + } + if (description != null) { + metadata.put("cq:table_description", description); + } + if (parent != null) { + metadata.put("cq:table_depends_on", parent.getName()); + } + Schema schema = new Schema(asList(fields), metadata); + return schema; + } } diff --git a/lib/src/main/java/io/cloudquery/server/ServeCommand.java b/lib/src/main/java/io/cloudquery/server/ServeCommand.java index b4eaccb..b67cb82 100644 --- a/lib/src/main/java/io/cloudquery/server/ServeCommand.java +++ b/lib/src/main/java/io/cloudquery/server/ServeCommand.java @@ -78,19 +78,11 @@ private LoggerContext initLogger() { @Override public Integer call() { - LoggerContext context = this.initLogger(); - - try { - // Configure open telemetry - // Configure test listener - // Configure gRPC server + try (LoggerContext context = this.initLogger()) { Server server = Grpc.newServerBuilderForPort(address.port(), InsecureServerCredentials.create()) .addService(new DiscoverServer(DISCOVERY_VERSIONS)).addService(new PluginServer(plugin)) .addService(ProtoReflectionService.newInstance()).executor(Executors.newFixedThreadPool(10)) .build(); - // Configure sentry - // Log we are listening on address and port - // Run gRPC server and block server.start(); logger.info("Started server on {}:{}", address.host(), address.port()); server.awaitTermination(); @@ -98,8 +90,6 @@ public Integer call() { } catch (IOException | InterruptedException e) { logger.error("Failed to start server", e); return 1; - } finally { - context.close(); } } diff --git a/lib/src/test/java/io/cloudquery/schema/ResourceTest.java b/lib/src/test/java/io/cloudquery/schema/ResourceTest.java index 930ab37..c32ad18 100644 --- a/lib/src/test/java/io/cloudquery/schema/ResourceTest.java +++ b/lib/src/test/java/io/cloudquery/schema/ResourceTest.java @@ -17,19 +17,13 @@ public class ResourceTest { @Test public void shouldBuildWithNoErrors() { - assertDoesNotThrow(() -> Resource.builder().build()); + assertDoesNotThrow(() -> Resource.builder().table(Table.builder().name("").build()).build()); } @Test public void shouldCreateScalarData() { - Column column1 = Column.builder(). - name("test_column1"). - type(new UUIDType()). - build(); - Column column2 = Column.builder(). - name("test_column2"). - type(ArrowType.Utf8.INSTANCE). - build(); + Column column1 = Column.builder().name("test_column1").type(new UUIDType()).build(); + Column column2 = Column.builder().name("test_column2").type(ArrowType.Utf8.INSTANCE).build(); Table table = Table.builder().name("test").columns(List.of(column1, column2)).build(); Resource resource = Resource.builder().table(table).build(); @@ -40,10 +34,7 @@ public void shouldCreateScalarData() { @Test public void shouldSetAndGetDataTypes() throws ValidationException { - Column column1 = Column.builder(). - name("test_column1"). - type(new UUIDType()). - build(); + Column column1 = Column.builder().name("test_column1").type(new UUIDType()).build(); Table table = Table.builder().name("test").columns(List.of(column1)).build(); Resource resource = Resource.builder().table(table).build();