Skip to content

Upsert dbs support #1805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.dialect;

import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.DialectDelegateWrapper;
import org.hibernate.dialect.DmlTargetColumnQualifierSupport;
import org.hibernate.dialect.OracleSqlAstTranslator;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.persister.entity.mutation.EntityTableMapping;
import org.hibernate.reactive.sql.model.ReactiveDeleteOrUpsertOperation;
import org.hibernate.sql.ast.Clause;
import org.hibernate.sql.ast.tree.Statement;
import org.hibernate.sql.exec.spi.JdbcOperation;
import org.hibernate.sql.model.MutationOperation;
import org.hibernate.sql.model.internal.OptionalTableUpdate;
import org.hibernate.sql.model.jdbc.UpsertOperation;

public class ReactiveOracleSqlAstTranslator<T extends JdbcOperation> extends OracleSqlAstTranslator<T> {
public ReactiveOracleSqlAstTranslator(
SessionFactoryImplementor sessionFactory,
Statement statement) {
super( sessionFactory, statement );
}

@Override
public MutationOperation createMergeOperation(OptionalTableUpdate optionalTableUpdate) {
renderUpsertStatement( optionalTableUpdate );

final UpsertOperation upsertOperation = new UpsertOperation(
optionalTableUpdate.getMutatingTable().getTableMapping(),
optionalTableUpdate.getMutationTarget(),
getSql(),
getParameterBinders()
);

return new ReactiveDeleteOrUpsertOperation(
optionalTableUpdate.getMutationTarget(),
(EntityTableMapping) optionalTableUpdate.getMutatingTable().getTableMapping(),
upsertOperation,
optionalTableUpdate
);
}

// FIXME: Copy and paste from ORM
private void renderUpsertStatement(OptionalTableUpdate optionalTableUpdate) {
// template:
//
// merge into [table] as t
// using values([bindings]) as s ([column-names])
// on t.[key] = s.[key]
// when not matched
// then insert ...
// when matched
// then update ...

renderMergeInto( optionalTableUpdate );
appendSql( " " );
renderMergeUsing( optionalTableUpdate );
appendSql( " " );
renderMergeOn( optionalTableUpdate );
appendSql( " " );
renderMergeInsert( optionalTableUpdate );
appendSql( " " );
renderMergeUpdate( optionalTableUpdate );
}

@Override
protected boolean rendersTableReferenceAlias(Clause clause) {
// todo (6.0) : For now we just skip the alias rendering in the delete and update clauses
// We need some dialect support if we want to support joins in delete and update statements
switch ( clause ) {
case DELETE:
case UPDATE: {
final Dialect realDialect = DialectDelegateWrapper.extractRealDialect( getDialect() );
return realDialect.getDmlTargetColumnQualifierSupport() == DmlTargetColumnQualifierSupport.TABLE_ALIAS;
}
default:
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@
import org.hibernate.dialect.CockroachDialect;
import org.hibernate.dialect.Database;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.DialectDelegateWrapper;
import org.hibernate.engine.jdbc.dialect.spi.DialectResolutionInfo;
import org.hibernate.engine.jdbc.dialect.spi.DialectResolver;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.persister.entity.mutation.EntityMutationTarget;
import org.hibernate.reactive.dialect.ReactiveOracleSqlAstTranslator;
import org.hibernate.sql.ast.SqlAstTranslator;
import org.hibernate.sql.ast.SqlAstTranslatorFactory;
import org.hibernate.sql.ast.spi.StandardSqlAstTranslatorFactory;
import org.hibernate.sql.ast.tree.Statement;
import org.hibernate.sql.exec.spi.JdbcOperation;
import org.hibernate.sql.model.MutationOperation;
import org.hibernate.sql.model.internal.OptionalTableUpdate;

import static org.hibernate.dialect.CockroachDialect.parseVersion;

Expand All @@ -25,7 +36,30 @@ public Dialect resolveDialect(DialectResolutionInfo info) {

for ( Database database : Database.values() ) {
if ( database.matchesResolutionInfo( info ) ) {
return database.createDialect( info );
Dialect dialect = database.createDialect( info );
if ( info.getDatabaseName().toUpperCase().startsWith( "ORACLE" ) ) {
return new DialectDelegateWrapper( dialect ) {
@Override
public MutationOperation createOptionalTableUpdateOperation(
EntityMutationTarget mutationTarget,
OptionalTableUpdate optionalTableUpdate,
SessionFactoryImplementor factory) {
return new ReactiveOracleSqlAstTranslator<>( factory, optionalTableUpdate )
.createMergeOperation( optionalTableUpdate );
}

@Override
public SqlAstTranslatorFactory getSqlAstTranslatorFactory() {
return new StandardSqlAstTranslatorFactory() {
@Override
protected <T extends JdbcOperation> SqlAstTranslator<T> buildTranslator(SessionFactoryImplementor sessionFactory, Statement statement) {
return new ReactiveOracleSqlAstTranslator<>( sessionFactory, statement );
}
};
}
};
}
return dialect;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,53 @@
*/
package org.hibernate.reactive.engine.jdbc.mutation.internal;


import java.util.concurrent.CompletionStage;

import org.hibernate.engine.jdbc.mutation.TableInclusionChecker;
import org.hibernate.engine.jdbc.mutation.internal.MutationExecutorSingleSelfExecuting;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
import org.hibernate.reactive.logging.impl.Log;
import org.hibernate.reactive.sql.model.ReactiveSelfExecutingUpdateOperation;
import org.hibernate.sql.model.SelfExecutingUpdateOperation;
import org.hibernate.sql.model.ValuesAnalysis;

import static java.lang.invoke.MethodHandles.lookup;
import static org.hibernate.reactive.logging.impl.LoggerFactory.make;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveMutationExecutorSingleSelfExecuting extends MutationExecutorSingleSelfExecuting
implements ReactiveMutationExecutor {

public ReactiveMutationExecutorSingleSelfExecuting(SelfExecutingUpdateOperation operation, SharedSessionContractImplementor session) {
private static final Log LOG = make( Log.class, lookup() );

private final SelfExecutingUpdateOperation operation;

public ReactiveMutationExecutorSingleSelfExecuting(
SelfExecutingUpdateOperation operation,
SharedSessionContractImplementor session) {
super( operation, session );
this.operation = operation;
}

@Override
protected void performSelfExecutingOperations(
ValuesAnalysis valuesAnalysis,
TableInclusionChecker inclusionChecker,
SharedSessionContractImplementor session) {
throw LOG.nonReactiveMethodCall( "performReactiveSelfExecutingOperation" );
}

@Override
public CompletionStage<Void> performReactiveSelfExecutingOperations(
ValuesAnalysis valuesAnalysis,
TableInclusionChecker inclusionChecker,
SharedSessionContractImplementor session) {
if ( inclusionChecker.include( operation.getTableDetails() ) && operation instanceof ReactiveSelfExecutingUpdateOperation ) {
return ( (ReactiveSelfExecutingUpdateOperation) operation )
.performReactiveMutation( getJdbcValueBindings(), valuesAnalysis, session );
}
return voidFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.hibernate.dialect.CockroachDialect;
import org.hibernate.dialect.DB2Dialect;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.DialectDelegateWrapper;
import org.hibernate.dialect.MySQLDialect;
import org.hibernate.dialect.OracleDialect;
import org.hibernate.dialect.SQLServerDialect;
Expand Down Expand Up @@ -69,15 +70,16 @@ && getPersister().getFactory().getJdbcServices().getDialect() instanceof Cockroa

private static String createInsert(PreparedStatementDetails insertStatementDetails, String identifierColumnName, Dialect dialect) {
final String sqlEnd = " returning " + identifierColumnName;
if ( dialect instanceof MySQLDialect ) {
Dialect realDialect = DialectDelegateWrapper.extractRealDialect( dialect );
if ( realDialect instanceof MySQLDialect ) {
// For some reasons ORM generates a query with an invalid syntax
String sql = insertStatementDetails.getSqlString();
int index = sql.lastIndexOf( sqlEnd );
return index > -1
? sql.substring( 0, index )
: sql;
}
if ( dialect instanceof SQLServerDialect ) {
if ( realDialect instanceof SQLServerDialect ) {
String sql = insertStatementDetails.getSqlString();
int index = sql.lastIndexOf( sqlEnd );
// FIXME: this is a hack for HHH-16365
Expand All @@ -94,12 +96,12 @@ private static String createInsert(PreparedStatementDetails insertStatementDetai
}
return sql;
}
if ( dialect instanceof DB2Dialect ) {
if ( realDialect instanceof DB2Dialect ) {
// ORM query: select id from new table ( insert into IntegerTypeEntity values ( ))
// Correct : select id from new table ( insert into LongTypeEntity (id) values (default))
return insertStatementDetails.getSqlString().replace( " values ( ))", " (" + identifierColumnName + ") values (default))" );
}
if ( dialect instanceof OracleDialect ) {
if ( realDialect instanceof OracleDialect ) {
final String valuesStr = " values ( )";
String sql = insertStatementDetails.getSqlString();
int index = sql.lastIndexOf( sqlEnd );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,32 @@
*/
package org.hibernate.reactive.persister.entity.impl;


import org.hibernate.dialect.OracleDialect;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.persister.entity.AbstractEntityPersister;
import org.hibernate.persister.entity.mutation.MergeCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveMergeCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveScopedUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
import org.hibernate.reactive.sql.model.ReactiveOptionalTableUpdateOperation;
import org.hibernate.sql.model.ModelMutationLogging;
import org.hibernate.sql.model.MutationOperation;
import org.hibernate.sql.model.MutationOperationGroup;
import org.hibernate.sql.model.ValuesAnalysis;
import org.hibernate.sql.model.ast.MutationGroup;
import org.hibernate.sql.model.ast.TableMutation;
import org.hibernate.sql.model.internal.MutationOperationGroupFactory;
import org.hibernate.sql.model.internal.OptionalTableUpdate;
import org.hibernate.sql.model.jdbc.DeleteOrUpsertOperation;
import org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation;

public class ReactiveMergeCoordinatorStandardScopeFactory extends MergeCoordinator
implements ReactiveUpdateCoordinator {

public ReactiveMergeCoordinatorStandardScopeFactory(AbstractEntityPersister entityPersister, SessionFactoryImplementor factory) {
public ReactiveMergeCoordinatorStandardScopeFactory(
AbstractEntityPersister entityPersister,
SessionFactoryImplementor factory) {
super( entityPersister, factory );
}

Expand All @@ -24,10 +39,77 @@ public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
return new ReactiveMergeCoordinator(
entityPersister(),
factory(),
this.getStaticUpdateGroup(),
this.getBatchKey(),
this.getVersionUpdateGroup(),
this.getVersionUpdateBatchkey()
getStaticUpdateGroup(),
getBatchKey(),
getVersionUpdateGroup(),
getVersionUpdateBatchkey()
);
}

@Override
protected MutationOperationGroup createOperationGroup(ValuesAnalysis valuesAnalysis, MutationGroup mutationGroup) {
final int numberOfTableMutations = mutationGroup.getNumberOfTableMutations();
switch ( numberOfTableMutations ) {
case 0:
return MutationOperationGroupFactory.noOperations( mutationGroup );
case 1: {
MutationOperation operation = createOperation( valuesAnalysis, mutationGroup.getSingleTableMutation() );
return operation == null
? MutationOperationGroupFactory.noOperations( mutationGroup )
: MutationOperationGroupFactory.singleOperation( mutationGroup, operation );
}
default: {
MutationOperation[] operations = new MutationOperation[numberOfTableMutations];
int outputIndex = 0;
int skipped = 0;
for ( int i = 0; i < mutationGroup.getNumberOfTableMutations(); i++ ) {
final TableMutation<?> tableMutation = mutationGroup.getTableMutation( i );
MutationOperation operation = createOperation( valuesAnalysis, tableMutation );
if ( operation != null ) {
operations[outputIndex++] = operation;
}
else {
skipped++;
ModelMutationLogging.MODEL_MUTATION_LOGGER.debugf(
"Skipping table update - %s",
tableMutation.getTableName()
);
}
}
if ( skipped != 0 ) {
final MutationOperation[] trimmed = new MutationOperation[outputIndex];
System.arraycopy( operations, 0, trimmed, 0, outputIndex );
operations = trimmed;
}
return MutationOperationGroupFactory.manyOperations(
mutationGroup.getMutationType(),
entityPersister,
operations
);
}
}
}

// FIXME: We could add this method in ORM and override only this code
protected MutationOperation createOperation(ValuesAnalysis valuesAnalysis, TableMutation<?> singleTableMutation) {
MutationOperation operation = singleTableMutation.createMutationOperation( valuesAnalysis, factory() );
if ( operation instanceof OptionalTableUpdateOperation ) {
// We need to plug in our own reactive operation
return new ReactiveOptionalTableUpdateOperation(
operation.getMutationTarget(),
(OptionalTableUpdate) singleTableMutation,
factory()
);
}
if ( operation instanceof DeleteOrUpsertOperation
&& factory().getJdbcServices().getDialect() instanceof OracleDialect ) {
OracleDialect dialect = ( (OracleDialect) factory().getJdbcServices().getDialect() );
return dialect.createOptionalTableUpdateOperation(
( (OptionalTableUpdate) operation ).getMutationTarget(),
(OptionalTableUpdate) operation,
factory()
);
}
return operation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hibernate.dialect.CockroachDialect;
import org.hibernate.dialect.DB2Dialect;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.DialectDelegateWrapper;
import org.hibernate.dialect.MySQLDialect;
import org.hibernate.dialect.OracleDialect;
import org.hibernate.dialect.PostgreSQLDialect;
Expand Down Expand Up @@ -99,7 +100,7 @@ private void registerReactiveChanges(TypeContributions typeContributions, Servic
}

private Dialect dialect(ServiceRegistry serviceRegistry) {
return serviceRegistry.getService( JdbcEnvironment.class ).getDialect();
return DialectDelegateWrapper.extractRealDialect( serviceRegistry.getService( JdbcEnvironment.class ).getDialect() );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.function.Function;

import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.DialectDelegateWrapper;
import org.hibernate.engine.jdbc.spi.JdbcServices;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.query.spi.QueryOptions;
Expand Down Expand Up @@ -127,7 +128,7 @@ private static String finalSql(
.getSessionFactoryOptions()
.isCommentsEnabled()
);
final Dialect dialect = executionContext.getSession().getJdbcServices().getDialect();
final Dialect dialect = DialectDelegateWrapper.extractRealDialect( executionContext.getSession().getJdbcServices().getDialect() );
return Parameters.instance( dialect ).process( sql );
}
}
Loading