Skip to content

Add support for COPY operations #1316

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
20 changes: 20 additions & 0 deletions vertx-pg-client/src/main/java/examples/PgClientExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,26 @@ public void batchReturning(SqlClient client) {
});
}

public void importDataToDb(Vertx vertx, PgConnection client) {
vertx.fileSystem().readFile("path/to/file")
.flatMap(bufferAsyncResult -> {
return client.copyFromBytes(
"COPY my_table FROM STDIN (FORMAT csv, HEADER)",
bufferAsyncResult
).execute();
}).onSuccess(result -> {
Long rowsWritten = result.iterator().next().getLong("rowsWritten");
System.out.println("rows written: " + rowsWritten);
});
}

public void exportDataFromDb(Vertx vertx, PgConnection client) {
String path = "path/to/file";
client.copyToBytes("COPY my_table TO STDOUT (FORMAT csv, HEADER)")
.flatMap(result -> vertx.fileSystem().writeFile("path/to/file.csv", result.value()))
.onSuccess(res -> System.out.println("Data exported to " + path));
}

public void pgBouncer(PgConnectOptions connectOptions) {
connectOptions.setUseLayer7Proxy(true);
}
Expand Down
50 changes: 50 additions & 0 deletions vertx-pg-client/src/main/java/io/vertx/pgclient/PgConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package io.vertx.pgclient;

import io.vertx.core.buffer.Buffer;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.pgclient.impl.PgConnectionImpl;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.SqlResult;

import java.util.List;

/**
* A connection to Postgres.
Expand All @@ -34,6 +41,7 @@
* <ul>
* <li>Notification</li>
* <li>Request Cancellation</li>
* <li>Copy from STDIN / to STDOUT</li>
* </ul>
* </P>
*
Expand Down Expand Up @@ -92,6 +100,48 @@ static Future<PgConnection> connect(Vertx vertx, String connectionUri) {
@Fluent
PgConnection noticeHandler(Handler<PgNotice> handler);

/**
* Imports data into a database.
*
* <p>Use this method when importing opaque bytes, e.g. from a CSV file.
*
* <p>If you need bulk inserts of POJOs, use {@link io.vertx.sqlclient.PreparedQuery#executeBatch(List)} instead.
*
* @param sql COPY command (example {@code COPY my_table FROM STDIN (FORMAT csv, HEADER)})
* @param from byte stream data will be fetched from
* @return result set with single field {@code rowsWritten}
*/
Query<RowSet<Row>> copyFromBytes(String sql, Buffer from);

/**
* Exports data from a database with decoding.
*
* {@code FORMAT} can only be {@code binary}.
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT binary)})
* @return decoded records
*/
Query<RowSet<Row>> copyToRows(String sql);

/**
* Exports data from a database as-is, without decoding.
*
* <p>Use this method when exporting opaque bytes, e.g. to a CSV file.
*
* @param sql COPY command (example {@code COPY my_table TO STDOUT (FORMAT csv)})
* @return async result of bytes container data will be written to
*
* - vertx.core.stream - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
* - future of read stream.
* - when we do query operation
* - we should not use query result builder
* - what about SELECT 1;SELECT 1 or COPY ....;COPY ... ?
* - we need a new interface Future<ReadStream<Buffer>>
* - https://vertx.io/docs/apidocs/io/vertx/core/streams/ReadStream.html
* - PgSocketConnection - we will apply backpressure in SocketInternal
*/
Future<SqlResult<Buffer>> copyToBytes(String sql);

/**
* Send a request cancellation message to tell the server to cancel processing request in this connection.
* <br>Note: Use this with caution because the cancellation signal may or may not have any effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,36 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotice;
import io.vertx.pgclient.PgNotification;
import io.vertx.pgclient.impl.codec.CopyOutCommand;
import io.vertx.pgclient.impl.codec.NoticeResponse;
import io.vertx.pgclient.impl.codec.TxFailedEvent;
import io.vertx.pgclient.spi.PgDriver;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.Notification;
import io.vertx.sqlclient.impl.QueryExecutor;
import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.QueryResultHandler;
import io.vertx.sqlclient.impl.SocketConnectionBase;
import io.vertx.sqlclient.impl.SqlConnectionBase;
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.QueryCommandBase;
import io.vertx.sqlclient.impl.command.SimpleQueryCommand;

import java.util.function.Function;
import java.util.stream.Collector;

public class PgConnectionImpl extends SqlConnectionBase<PgConnectionImpl> implements PgConnection {

Expand Down Expand Up @@ -107,6 +125,32 @@ public PgConnection noticeHandler(Handler<PgNotice> handler) {
return this;
}

@Override
public Query<RowSet<Row>> copyFromBytes(String sql, Buffer from) {
return null;
}

@Override
public Query<RowSet<Row>> copyToRows(String sql) {
return null;
}

@Override
public Future<SqlResult<Buffer>> copyToBytes(String sql) {
Function<Buffer, SqlResultImpl<Buffer>> factory = (buffer) -> new SqlResultImpl<>(buffer);
PromiseInternal<SqlResult<Buffer>> promise = context.promise();

// currently, this loads entire content into Buffer
// it should stream bytes out instead
// TODO: signal completion as soon as the database replied CopyOutResponse 'H' ?
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler =
new QueryResultBuilder<>(factory, promise);

CopyOutCommand cmd = new CopyOutCommand(sql, resultHandler);
this.schedule(promise.context(), cmd).onComplete(resultHandler);
return promise.future();
}

@Override
public int processId() {
return conn.getProcessId();
Expand All @@ -126,4 +170,5 @@ public Future<Void> cancelRequest() {
});
return promise.future();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.QueryResultBuilder;
import io.vertx.sqlclient.impl.SqlResultImpl;
import io.vertx.sqlclient.impl.command.CommandBase;

import java.util.stream.Collector;

public class CopyOutCommand extends CommandBase<Boolean> {
private final String sql;
private final Collector<ByteBuf, Buffer, Buffer> collector;
private final QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler;

public CopyOutCommand(
String sql,
QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler
) {
this.sql = sql;
this.resultHandler = resultHandler;
this.collector = Collector.of(
Buffer::buffer,
// TODO: this might be unnecessary slow - think of alternatives
(v, chunk) -> v.appendBuffer(Buffer.buffer(chunk)),
(v1, v2) -> null,
(finalResult) -> finalResult
);
}

QueryResultBuilder<Buffer, SqlResultImpl<Buffer>, SqlResult<Buffer>> resultHandler() {
return resultHandler;
}

String sql() {
return sql;
}

Collector<ByteBuf, Buffer, Buffer> collector() {
return collector;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.pgclient.impl.codec;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.impl.SqlResultImpl;

class CopyOutCommandCodec extends PgCommandCodec<Boolean, CopyOutCommand> {
CopyOutDataDecoder decoder;

CopyOutCommandCodec(CopyOutCommand cmd) {
super(cmd);
decoder = new CopyOutDataDecoder(cmd.collector());
}

@Override
public void handleCommandComplete(int updated) {
this.result = false;
Buffer result;
Throwable failure;
int size;
if (decoder != null) {
failure = decoder.complete();
result = decoder.result();
size = decoder.size();
decoder.reset();
} else {
failure = null;
result = new BufferImpl();
size = 0;
}
cmd.resultHandler().handleResult(updated, size, null, result, failure);
}

@Override
public void handleErrorResponse(ErrorResponse errorResponse) {
failure = errorResponse.toException();
}

void encode(PgEncoder encoder) {
encoder.writeQuery(new Query(cmd.sql()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.vertx.pgclient.impl.codec;

import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import java.util.function.BiConsumer;
import java.util.stream.Collector;

public class CopyOutDataDecoder {

private final Collector<ByteBuf, Buffer, Buffer> collector;
private BiConsumer<Buffer, ByteBuf> accumulator;

private int size;
private Buffer container;
private Throwable failure;
private Buffer result;

protected CopyOutDataDecoder(Collector<ByteBuf, Buffer, Buffer> collector) {
this.collector = collector;
reset();
}

public int size() {
return size;
}

public void handleChunk(ByteBuf in) {
if (failure != null) {
return;
}
if (accumulator == null) {
try {
accumulator = collector.accumulator();
} catch (Exception e) {
failure = e;
return;
}
}
try {
accumulator.accept(container, in);
} catch (Exception e) {
failure = e;
return;
}
size++;
}

public Buffer result() {
return result;
}

public Throwable complete() {
try {
result = collector.finisher().apply(container);
} catch (Exception e) {
failure = e;
}
return failure;
}

public void reset() {
size = 0;
failure = null;
result = null;
try {
this.container = collector.supplier().get();
} catch (Exception e) {
failure = e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,20 @@ private void decodeMessage(ChannelHandlerContext ctx, byte id, ByteBuf in) {
decodeNotificationResponse(ctx, in);
break;
}
// TODO: check if these handlers need to be at this level of loop
// TODO: check if COPY needs a separate loop
case PgProtocolConstants.MESSAGE_TYPE_COPY_OUT_RESPONSE: {
decodeCopyOutResponse(ctx, in);
break;
}
case PgProtocolConstants.MESSAGE_TYPE_COPY_DATA: {
decodeCopyData(ctx, in);
break;
}
case PgProtocolConstants.MESSAGE_TYPE_COPY_COMPLETION: {
decodeCopyCompletion(ctx, in);
break;
}
default: {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -455,4 +469,14 @@ private void decodeBackendKeyData(ByteBuf in) {
private void decodeNotificationResponse(ChannelHandlerContext ctx, ByteBuf in) {
ctx.fireChannelRead(new Notification(in.readInt(), Util.readCStringUTF8(in), Util.readCStringUTF8(in)));
}

private void decodeCopyOutResponse(ChannelHandlerContext ctx, ByteBuf in) {}

private void decodeCopyData(ChannelHandlerContext ctx, ByteBuf in) {
PgCommandCodec<?, ?> codec = inflight.peek();
CopyOutCommandCodec cmdCodec = (CopyOutCommandCodec) codec;
cmdCodec.decoder.handleChunk(in);
}

private void decodeCopyCompletion(ChannelHandlerContext ctx, ByteBuf in) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ void write(CommandBase<?> cmd) {
return new ClosePortalCommandCodec((CloseCursorCommand) cmd);
} else if (cmd instanceof CloseStatementCommand) {
return new CloseStatementCommandCodec((CloseStatementCommand) cmd);
} else if (cmd instanceof CopyOutCommand) {
return new CopyOutCommandCodec((CopyOutCommand) cmd);
}
throw new AssertionError();
}
Expand Down
Loading