diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/dialect/ReactiveOracleSqlAstTranslator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/dialect/ReactiveOracleSqlAstTranslator.java new file mode 100644 index 000000000..9418a07dd --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/dialect/ReactiveOracleSqlAstTranslator.java @@ -0,0 +1,85 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.dialect; + +import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; +import org.hibernate.dialect.DmlTargetColumnQualifierSupport; +import org.hibernate.dialect.OracleSqlAstTranslator; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.persister.entity.mutation.EntityTableMapping; +import org.hibernate.reactive.sql.model.ReactiveDeleteOrUpsertOperation; +import org.hibernate.sql.ast.Clause; +import org.hibernate.sql.ast.tree.Statement; +import org.hibernate.sql.exec.spi.JdbcOperation; +import org.hibernate.sql.model.MutationOperation; +import org.hibernate.sql.model.internal.OptionalTableUpdate; +import org.hibernate.sql.model.jdbc.UpsertOperation; + +public class ReactiveOracleSqlAstTranslator extends OracleSqlAstTranslator { + public ReactiveOracleSqlAstTranslator( + SessionFactoryImplementor sessionFactory, + Statement statement) { + super( sessionFactory, statement ); + } + + @Override + public MutationOperation createMergeOperation(OptionalTableUpdate optionalTableUpdate) { + renderUpsertStatement( optionalTableUpdate ); + + final UpsertOperation upsertOperation = new UpsertOperation( + optionalTableUpdate.getMutatingTable().getTableMapping(), + optionalTableUpdate.getMutationTarget(), + getSql(), + getParameterBinders() + ); + + return new ReactiveDeleteOrUpsertOperation( + optionalTableUpdate.getMutationTarget(), + (EntityTableMapping) optionalTableUpdate.getMutatingTable().getTableMapping(), + upsertOperation, + optionalTableUpdate + ); + } + + // FIXME: Copy and paste from ORM + private void renderUpsertStatement(OptionalTableUpdate optionalTableUpdate) { + // template: + // + // merge into [table] as t + // using values([bindings]) as s ([column-names]) + // on t.[key] = s.[key] + // when not matched + // then insert ... + // when matched + // then update ... + + renderMergeInto( optionalTableUpdate ); + appendSql( " " ); + renderMergeUsing( optionalTableUpdate ); + appendSql( " " ); + renderMergeOn( optionalTableUpdate ); + appendSql( " " ); + renderMergeInsert( optionalTableUpdate ); + appendSql( " " ); + renderMergeUpdate( optionalTableUpdate ); + } + + @Override + protected boolean rendersTableReferenceAlias(Clause clause) { + // todo (6.0) : For now we just skip the alias rendering in the delete and update clauses + // We need some dialect support if we want to support joins in delete and update statements + switch ( clause ) { + case DELETE: + case UPDATE: { + final Dialect realDialect = DialectDelegateWrapper.extractRealDialect( getDialect() ); + return realDialect.getDmlTargetColumnQualifierSupport() == DmlTargetColumnQualifierSupport.TABLE_ALIAS; + } + default: + return true; + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/dialect/internal/ReactiveStandardDialectResolver.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/dialect/internal/ReactiveStandardDialectResolver.java index 1b24e567d..9964ffe93 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/dialect/internal/ReactiveStandardDialectResolver.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/dialect/internal/ReactiveStandardDialectResolver.java @@ -8,8 +8,19 @@ import org.hibernate.dialect.CockroachDialect; import org.hibernate.dialect.Database; import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; import org.hibernate.engine.jdbc.dialect.spi.DialectResolutionInfo; import org.hibernate.engine.jdbc.dialect.spi.DialectResolver; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.persister.entity.mutation.EntityMutationTarget; +import org.hibernate.reactive.dialect.ReactiveOracleSqlAstTranslator; +import org.hibernate.sql.ast.SqlAstTranslator; +import org.hibernate.sql.ast.SqlAstTranslatorFactory; +import org.hibernate.sql.ast.spi.StandardSqlAstTranslatorFactory; +import org.hibernate.sql.ast.tree.Statement; +import org.hibernate.sql.exec.spi.JdbcOperation; +import org.hibernate.sql.model.MutationOperation; +import org.hibernate.sql.model.internal.OptionalTableUpdate; import static org.hibernate.dialect.CockroachDialect.parseVersion; @@ -25,7 +36,30 @@ public Dialect resolveDialect(DialectResolutionInfo info) { for ( Database database : Database.values() ) { if ( database.matchesResolutionInfo( info ) ) { - return database.createDialect( info ); + Dialect dialect = database.createDialect( info ); + if ( info.getDatabaseName().toUpperCase().startsWith( "ORACLE" ) ) { + return new DialectDelegateWrapper( dialect ) { + @Override + public MutationOperation createOptionalTableUpdateOperation( + EntityMutationTarget mutationTarget, + OptionalTableUpdate optionalTableUpdate, + SessionFactoryImplementor factory) { + return new ReactiveOracleSqlAstTranslator<>( factory, optionalTableUpdate ) + .createMergeOperation( optionalTableUpdate ); + } + + @Override + public SqlAstTranslatorFactory getSqlAstTranslatorFactory() { + return new StandardSqlAstTranslatorFactory() { + @Override + protected SqlAstTranslator buildTranslator(SessionFactoryImplementor sessionFactory, Statement statement) { + return new ReactiveOracleSqlAstTranslator<>( sessionFactory, statement ); + } + }; + } + }; + } + return dialect; } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorSingleSelfExecuting.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorSingleSelfExecuting.java index ea39d8a6e..2382059e3 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorSingleSelfExecuting.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/engine/jdbc/mutation/internal/ReactiveMutationExecutorSingleSelfExecuting.java @@ -5,15 +5,53 @@ */ package org.hibernate.reactive.engine.jdbc.mutation.internal; + +import java.util.concurrent.CompletionStage; + +import org.hibernate.engine.jdbc.mutation.TableInclusionChecker; import org.hibernate.engine.jdbc.mutation.internal.MutationExecutorSingleSelfExecuting; import org.hibernate.engine.spi.SharedSessionContractImplementor; import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.sql.model.ReactiveSelfExecutingUpdateOperation; import org.hibernate.sql.model.SelfExecutingUpdateOperation; +import org.hibernate.sql.model.ValuesAnalysis; + +import static java.lang.invoke.MethodHandles.lookup; +import static org.hibernate.reactive.logging.impl.LoggerFactory.make; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; public class ReactiveMutationExecutorSingleSelfExecuting extends MutationExecutorSingleSelfExecuting implements ReactiveMutationExecutor { - public ReactiveMutationExecutorSingleSelfExecuting(SelfExecutingUpdateOperation operation, SharedSessionContractImplementor session) { + private static final Log LOG = make( Log.class, lookup() ); + + private final SelfExecutingUpdateOperation operation; + + public ReactiveMutationExecutorSingleSelfExecuting( + SelfExecutingUpdateOperation operation, + SharedSessionContractImplementor session) { super( operation, session ); + this.operation = operation; + } + + @Override + protected void performSelfExecutingOperations( + ValuesAnalysis valuesAnalysis, + TableInclusionChecker inclusionChecker, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "performReactiveSelfExecutingOperation" ); + } + + @Override + public CompletionStage performReactiveSelfExecutingOperations( + ValuesAnalysis valuesAnalysis, + TableInclusionChecker inclusionChecker, + SharedSessionContractImplementor session) { + if ( inclusionChecker.include( operation.getTableDetails() ) && operation instanceof ReactiveSelfExecutingUpdateOperation ) { + return ( (ReactiveSelfExecutingUpdateOperation) operation ) + .performReactiveMutation( getJdbcValueBindings(), valuesAnalysis, session ); + } + return voidFuture(); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/insert/ReactiveAbstractReturningDelegate.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/insert/ReactiveAbstractReturningDelegate.java index ba0677a7e..e2794b218 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/insert/ReactiveAbstractReturningDelegate.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/insert/ReactiveAbstractReturningDelegate.java @@ -11,6 +11,7 @@ import org.hibernate.dialect.CockroachDialect; import org.hibernate.dialect.DB2Dialect; import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; import org.hibernate.dialect.MySQLDialect; import org.hibernate.dialect.OracleDialect; import org.hibernate.dialect.SQLServerDialect; @@ -69,7 +70,8 @@ && getPersister().getFactory().getJdbcServices().getDialect() instanceof Cockroa private static String createInsert(PreparedStatementDetails insertStatementDetails, String identifierColumnName, Dialect dialect) { final String sqlEnd = " returning " + identifierColumnName; - if ( dialect instanceof MySQLDialect ) { + Dialect realDialect = DialectDelegateWrapper.extractRealDialect( dialect ); + if ( realDialect instanceof MySQLDialect ) { // For some reasons ORM generates a query with an invalid syntax String sql = insertStatementDetails.getSqlString(); int index = sql.lastIndexOf( sqlEnd ); @@ -77,7 +79,7 @@ private static String createInsert(PreparedStatementDetails insertStatementDetai ? sql.substring( 0, index ) : sql; } - if ( dialect instanceof SQLServerDialect ) { + if ( realDialect instanceof SQLServerDialect ) { String sql = insertStatementDetails.getSqlString(); int index = sql.lastIndexOf( sqlEnd ); // FIXME: this is a hack for HHH-16365 @@ -94,12 +96,12 @@ private static String createInsert(PreparedStatementDetails insertStatementDetai } return sql; } - if ( dialect instanceof DB2Dialect ) { + if ( realDialect instanceof DB2Dialect ) { // ORM query: select id from new table ( insert into IntegerTypeEntity values ( )) // Correct : select id from new table ( insert into LongTypeEntity (id) values (default)) return insertStatementDetails.getSqlString().replace( " values ( ))", " (" + identifierColumnName + ") values (default))" ); } - if ( dialect instanceof OracleDialect ) { + if ( realDialect instanceof OracleDialect ) { final String valuesStr = " values ( )"; String sql = insertStatementDetails.getSqlString(); int index = sql.lastIndexOf( sqlEnd ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveMergeCoordinatorStandardScopeFactory.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveMergeCoordinatorStandardScopeFactory.java index 290e0efc8..0b9bf87a0 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveMergeCoordinatorStandardScopeFactory.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveMergeCoordinatorStandardScopeFactory.java @@ -5,17 +5,32 @@ */ package org.hibernate.reactive.persister.entity.impl; + +import org.hibernate.dialect.OracleDialect; import org.hibernate.engine.spi.SessionFactoryImplementor; import org.hibernate.persister.entity.AbstractEntityPersister; import org.hibernate.persister.entity.mutation.MergeCoordinator; import org.hibernate.reactive.persister.entity.mutation.ReactiveMergeCoordinator; import org.hibernate.reactive.persister.entity.mutation.ReactiveScopedUpdateCoordinator; import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator; +import org.hibernate.reactive.sql.model.ReactiveOptionalTableUpdateOperation; +import org.hibernate.sql.model.ModelMutationLogging; +import org.hibernate.sql.model.MutationOperation; +import org.hibernate.sql.model.MutationOperationGroup; +import org.hibernate.sql.model.ValuesAnalysis; +import org.hibernate.sql.model.ast.MutationGroup; +import org.hibernate.sql.model.ast.TableMutation; +import org.hibernate.sql.model.internal.MutationOperationGroupFactory; +import org.hibernate.sql.model.internal.OptionalTableUpdate; +import org.hibernate.sql.model.jdbc.DeleteOrUpsertOperation; +import org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation; public class ReactiveMergeCoordinatorStandardScopeFactory extends MergeCoordinator implements ReactiveUpdateCoordinator { - public ReactiveMergeCoordinatorStandardScopeFactory(AbstractEntityPersister entityPersister, SessionFactoryImplementor factory) { + public ReactiveMergeCoordinatorStandardScopeFactory( + AbstractEntityPersister entityPersister, + SessionFactoryImplementor factory) { super( entityPersister, factory ); } @@ -24,10 +39,77 @@ public ReactiveScopedUpdateCoordinator makeScopedCoordinator() { return new ReactiveMergeCoordinator( entityPersister(), factory(), - this.getStaticUpdateGroup(), - this.getBatchKey(), - this.getVersionUpdateGroup(), - this.getVersionUpdateBatchkey() + getStaticUpdateGroup(), + getBatchKey(), + getVersionUpdateGroup(), + getVersionUpdateBatchkey() ); } + + @Override + protected MutationOperationGroup createOperationGroup(ValuesAnalysis valuesAnalysis, MutationGroup mutationGroup) { + final int numberOfTableMutations = mutationGroup.getNumberOfTableMutations(); + switch ( numberOfTableMutations ) { + case 0: + return MutationOperationGroupFactory.noOperations( mutationGroup ); + case 1: { + MutationOperation operation = createOperation( valuesAnalysis, mutationGroup.getSingleTableMutation() ); + return operation == null + ? MutationOperationGroupFactory.noOperations( mutationGroup ) + : MutationOperationGroupFactory.singleOperation( mutationGroup, operation ); + } + default: { + MutationOperation[] operations = new MutationOperation[numberOfTableMutations]; + int outputIndex = 0; + int skipped = 0; + for ( int i = 0; i < mutationGroup.getNumberOfTableMutations(); i++ ) { + final TableMutation tableMutation = mutationGroup.getTableMutation( i ); + MutationOperation operation = createOperation( valuesAnalysis, tableMutation ); + if ( operation != null ) { + operations[outputIndex++] = operation; + } + else { + skipped++; + ModelMutationLogging.MODEL_MUTATION_LOGGER.debugf( + "Skipping table update - %s", + tableMutation.getTableName() + ); + } + } + if ( skipped != 0 ) { + final MutationOperation[] trimmed = new MutationOperation[outputIndex]; + System.arraycopy( operations, 0, trimmed, 0, outputIndex ); + operations = trimmed; + } + return MutationOperationGroupFactory.manyOperations( + mutationGroup.getMutationType(), + entityPersister, + operations + ); + } + } + } + + // FIXME: We could add this method in ORM and override only this code + protected MutationOperation createOperation(ValuesAnalysis valuesAnalysis, TableMutation singleTableMutation) { + MutationOperation operation = singleTableMutation.createMutationOperation( valuesAnalysis, factory() ); + if ( operation instanceof OptionalTableUpdateOperation ) { + // We need to plug in our own reactive operation + return new ReactiveOptionalTableUpdateOperation( + operation.getMutationTarget(), + (OptionalTableUpdate) singleTableMutation, + factory() + ); + } + if ( operation instanceof DeleteOrUpsertOperation + && factory().getJdbcServices().getDialect() instanceof OracleDialect ) { + OracleDialect dialect = ( (OracleDialect) factory().getJdbcServices().getDialect() ); + return dialect.createOptionalTableUpdateOperation( + ( (OptionalTableUpdate) operation ).getMutationTarget(), + (OptionalTableUpdate) operation, + factory() + ); + } + return operation; + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveTypeContributor.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveTypeContributor.java index 7a111e43b..0724718c9 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveTypeContributor.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/impl/ReactiveTypeContributor.java @@ -21,6 +21,7 @@ import org.hibernate.dialect.CockroachDialect; import org.hibernate.dialect.DB2Dialect; import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; import org.hibernate.dialect.MySQLDialect; import org.hibernate.dialect.OracleDialect; import org.hibernate.dialect.PostgreSQLDialect; @@ -99,7 +100,7 @@ private void registerReactiveChanges(TypeContributions typeContributions, Servic } private Dialect dialect(ServiceRegistry serviceRegistry) { - return serviceRegistry.getService( JdbcEnvironment.class ).getDialect(); + return DialectDelegateWrapper.extractRealDialect( serviceRegistry.getService( JdbcEnvironment.class ).getDialect() ); } /** diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveJdbcMutationExecutor.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveJdbcMutationExecutor.java index de81e9891..ea272765a 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveJdbcMutationExecutor.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveJdbcMutationExecutor.java @@ -13,6 +13,7 @@ import java.util.function.Function; import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; import org.hibernate.engine.jdbc.spi.JdbcServices; import org.hibernate.engine.spi.SharedSessionContractImplementor; import org.hibernate.query.spi.QueryOptions; @@ -127,7 +128,7 @@ private static String finalSql( .getSessionFactoryOptions() .isCommentsEnabled() ); - final Dialect dialect = executionContext.getSession().getJdbcServices().getDialect(); + final Dialect dialect = DialectDelegateWrapper.extractRealDialect( executionContext.getSession().getJdbcServices().getDialect() ); return Parameters.instance( dialect ).process( sql ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveDeleteOrUpsertOperation.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveDeleteOrUpsertOperation.java new file mode 100644 index 000000000..d837af865 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveDeleteOrUpsertOperation.java @@ -0,0 +1,156 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.model; + +import java.util.concurrent.CompletionStage; + +import org.hibernate.engine.jdbc.mutation.JdbcValueBindings; +import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails; +import org.hibernate.engine.jdbc.mutation.internal.MutationQueryOptions; +import org.hibernate.engine.jdbc.mutation.internal.PreparedStatementGroupSingleTable; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.persister.entity.mutation.EntityMutationTarget; +import org.hibernate.persister.entity.mutation.EntityTableMapping; +import org.hibernate.persister.entity.mutation.UpdateValuesAnalysis; +import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor; +import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.sql.ast.SqlAstTranslator; +import org.hibernate.sql.model.TableMapping; +import org.hibernate.sql.model.ValuesAnalysis; +import org.hibernate.sql.model.internal.OptionalTableUpdate; +import org.hibernate.sql.model.internal.TableDeleteStandard; +import org.hibernate.sql.model.jdbc.DeleteOrUpsertOperation; +import org.hibernate.sql.model.jdbc.JdbcDeleteMutation; +import org.hibernate.sql.model.jdbc.UpsertOperation; + +import static java.lang.invoke.MethodHandles.lookup; +import static java.util.Collections.emptyList; +import static org.hibernate.reactive.logging.impl.LoggerFactory.make; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; +import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER; + +public class ReactiveDeleteOrUpsertOperation extends DeleteOrUpsertOperation + implements ReactiveSelfExecutingUpdateOperation { + private static final Log LOG = make( Log.class, lookup() ); + private final OptionalTableUpdate upsert; + private final UpsertOperation upsertOperation; + + public ReactiveDeleteOrUpsertOperation( + EntityMutationTarget mutationTarget, + EntityTableMapping tableMapping, + UpsertOperation upsertOperation, + OptionalTableUpdate optionalTableUpdate) { + super( mutationTarget, tableMapping, upsertOperation, optionalTableUpdate ); + this.upsert = optionalTableUpdate; + this.upsertOperation = upsertOperation; + } + + @Override + public void performMutation( + JdbcValueBindings jdbcValueBindings, + ValuesAnalysis valuesAnalysis, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "performReactiveMutation" ); + } + + @Override + public CompletionStage performReactiveMutation( + JdbcValueBindings jdbcValueBindings, + ValuesAnalysis incomingValuesAnalysis, + SharedSessionContractImplementor session) { + final UpdateValuesAnalysis valuesAnalysis = (UpdateValuesAnalysis) incomingValuesAnalysis; + if ( !valuesAnalysis.getTablesNeedingUpdate().contains( getTableDetails() ) ) { + return voidFuture(); + } + + return doReactiveMutation( getTableDetails(), jdbcValueBindings, valuesAnalysis, session ); + } + + private CompletionStage doReactiveMutation( + TableMapping tableMapping, + JdbcValueBindings jdbcValueBindings, + UpdateValuesAnalysis valuesAnalysis, + SharedSessionContractImplementor session) { + + return voidFuture() + .thenCompose( v -> !valuesAnalysis.getTablesWithNonNullValues().contains( tableMapping ) + ? performReactiveDelete( jdbcValueBindings, session ) + : performReactiveUpsert( jdbcValueBindings, session ) + ) + .whenComplete( (o, throwable) -> jdbcValueBindings.afterStatement( tableMapping ) ); + } + + private CompletionStage performReactiveUpsert( + JdbcValueBindings jdbcValueBindings, + SharedSessionContractImplementor session) { + final String tableName = getTableDetails().getTableName(); + MODEL_MUTATION_LOGGER.tracef( "#performReactiveUpsert(%s)", tableName ); + + final PreparedStatementGroupSingleTable statementGroup = new PreparedStatementGroupSingleTable( + upsertOperation, + session + ); + final PreparedStatementDetails statementDetails = statementGroup.resolvePreparedStatementDetails( tableName ); + + session.getJdbcServices().getSqlStatementLogger().logStatement( statementDetails.getSqlString() ); + Object[] params = PreparedStatementAdaptor.bind( statement -> { + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( + statementDetails, + statement, + session.getJdbcServices() + ); + jdbcValueBindings.beforeStatement( details ); + } ); + + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + return reactiveConnection + .update( statementDetails.getSqlString(), params ) + .thenAccept( rowCount -> MODEL_MUTATION_LOGGER + .tracef( "`%s` rows upserted into `%s`", rowCount, getTableDetails().getTableName() ) + ); + } + + private CompletionStage performReactiveDelete( + JdbcValueBindings jdbcValueBindings, + SharedSessionContractImplementor session) { + final String tableName = getTableDetails().getTableName(); + MODEL_MUTATION_LOGGER.tracef( "#performReactiveDelete(%s)", tableName ); + + final TableDeleteStandard upsertDeleteAst = new TableDeleteStandard( + upsert.getMutatingTable(), + getMutationTarget(), + "upsert delete", + upsert.getKeyBindings(), + emptyList(), + emptyList() + ); + + final SqlAstTranslator translator = session + .getJdbcServices() + .getJdbcEnvironment() + .getSqlAstTranslatorFactory() + .buildModelMutationTranslator( upsertDeleteAst, session.getFactory() ); + final JdbcDeleteMutation upsertDelete = translator.translate( null, MutationQueryOptions.INSTANCE ); + + final PreparedStatementGroupSingleTable statementGroup = new PreparedStatementGroupSingleTable( upsertDelete, session ); + final PreparedStatementDetails statementDetails = statementGroup.resolvePreparedStatementDetails( tableName ); + + session.getJdbcServices().getSqlStatementLogger().logStatement( statementDetails.getSqlString() ); + Object[] params = PreparedStatementAdaptor.bind( statement -> { + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( statementDetails, statement, session.getJdbcServices() ); + jdbcValueBindings.beforeStatement( details ); + } ); + + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + String sqlString = statementDetails.getSqlString(); + return reactiveConnection + .update( sqlString, params ) + .thenAccept( rowCount -> MODEL_MUTATION_LOGGER.tracef( "`%s` rows upsert-deleted from `%s`", rowCount, tableName ) ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveOptionalTableUpdateOperation.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveOptionalTableUpdateOperation.java new file mode 100644 index 000000000..6fea783a3 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveOptionalTableUpdateOperation.java @@ -0,0 +1,333 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.model; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.concurrent.CompletionStage; + +import org.hibernate.engine.jdbc.mutation.JdbcValueBindings; +import org.hibernate.engine.jdbc.mutation.group.PreparedStatementDetails; +import org.hibernate.engine.jdbc.mutation.internal.MutationQueryOptions; +import org.hibernate.engine.jdbc.mutation.internal.PreparedStatementGroupSingleTable; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.internal.util.collections.CollectionHelper; +import org.hibernate.persister.entity.mutation.UpdateValuesAnalysis; +import org.hibernate.reactive.adaptor.impl.PrepareStatementDetailsAdaptor; +import org.hibernate.reactive.adaptor.impl.PreparedStatementAdaptor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.sql.ast.SqlAstTranslator; +import org.hibernate.sql.ast.SqlAstTranslatorFactory; +import org.hibernate.sql.model.MutationTarget; +import org.hibernate.sql.model.TableMapping; +import org.hibernate.sql.model.ValuesAnalysis; +import org.hibernate.sql.model.ast.MutatingTableReference; +import org.hibernate.sql.model.ast.TableDelete; +import org.hibernate.sql.model.ast.TableInsert; +import org.hibernate.sql.model.ast.TableUpdate; +import org.hibernate.sql.model.internal.OptionalTableUpdate; +import org.hibernate.sql.model.internal.TableDeleteCustomSql; +import org.hibernate.sql.model.internal.TableDeleteStandard; +import org.hibernate.sql.model.internal.TableInsertCustomSql; +import org.hibernate.sql.model.internal.TableInsertStandard; +import org.hibernate.sql.model.internal.TableUpdateCustomSql; +import org.hibernate.sql.model.internal.TableUpdateStandard; +import org.hibernate.sql.model.jdbc.JdbcDeleteMutation; +import org.hibernate.sql.model.jdbc.JdbcInsertMutation; +import org.hibernate.sql.model.jdbc.JdbcMutationOperation; +import org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation; + +import static java.lang.invoke.MethodHandles.lookup; +import static org.hibernate.reactive.logging.impl.LoggerFactory.make; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; +import static org.hibernate.sql.model.ModelMutationLogging.MODEL_MUTATION_LOGGER; + +public class ReactiveOptionalTableUpdateOperation extends OptionalTableUpdateOperation + implements ReactiveSelfExecutingUpdateOperation { + private static final Log LOG = make( Log.class, lookup() ); + private final OptionalTableUpdate upsert; + + public ReactiveOptionalTableUpdateOperation( + MutationTarget mutationTarget, + OptionalTableUpdate upsert, + SessionFactoryImplementor factory) { + super( mutationTarget, upsert, factory ); + this.upsert = upsert; + } + + @Override + public void performMutation( + JdbcValueBindings jdbcValueBindings, + ValuesAnalysis valuesAnalysis, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "performReactiveMutation" ); + } + + @Override + public CompletionStage performReactiveMutation( + JdbcValueBindings jdbcValueBindings, + ValuesAnalysis incomingValuesAnalysis, + SharedSessionContractImplementor session) { + final UpdateValuesAnalysis valuesAnalysis = (UpdateValuesAnalysis) incomingValuesAnalysis; + if ( !valuesAnalysis.getTablesNeedingUpdate().contains( getTableDetails() ) ) { + return voidFuture(); + } + + return doReactiveMutation( getTableDetails(), jdbcValueBindings, valuesAnalysis, session ); + } + + private CompletionStage doReactiveMutation( + TableMapping tableMapping, + JdbcValueBindings jdbcValueBindings, + UpdateValuesAnalysis valuesAnalysis, + SharedSessionContractImplementor session) { + + return voidFuture() + .thenCompose( v -> { + if ( shouldDelete( valuesAnalysis, tableMapping ) ) { + return performReactiveDelete( jdbcValueBindings, session ); + } + else { + return performReactiveUpdate( jdbcValueBindings, session ) + .thenCompose( wasUpdated -> { + if ( !wasUpdated ) { + MODEL_MUTATION_LOGGER.debugf( "Upsert update altered no rows - inserting : %s", tableMapping.getTableName() ); + return performReactiveInsert( jdbcValueBindings, session ); + } + return voidFuture(); + } ); + } + } ) + .whenComplete( (o, throwable) -> jdbcValueBindings.afterStatement( tableMapping ) ); + } + + private boolean shouldDelete(UpdateValuesAnalysis valuesAnalysis, TableMapping tableMapping) { + return !valuesAnalysis.getTablesWithNonNullValues().contains( tableMapping ) + // all the new values for this table were null - possibly delete the row + && valuesAnalysis.getTablesWithPreviousNonNullValues().contains( tableMapping ); + } + + /** + * @see org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation#performDelete(JdbcValueBindings, SharedSessionContractImplementor) + */ + private CompletionStage performReactiveDelete( + JdbcValueBindings jdbcValueBindings, + SharedSessionContractImplementor session) { + final JdbcDeleteMutation jdbcDelete = createJdbcDelete( session ); + + final PreparedStatementGroupSingleTable statementGroup = new PreparedStatementGroupSingleTable( + jdbcDelete, + session + ); + final PreparedStatementDetails statementDetails = statementGroup.resolvePreparedStatementDetails( + getTableDetails().getTableName() ); + + session.getJdbcServices().getSqlStatementLogger().logStatement( jdbcDelete.getSqlString() ); + + Object[] params = PreparedStatementAdaptor.bind( statement -> { + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( + statementDetails, + statement, + session.getJdbcServices() + ); + jdbcValueBindings.beforeStatement( details ); + } ); + + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + return reactiveConnection.update( statementDetails.getSqlString(), params ).thenCompose( CompletionStages::voidFuture); + } + + /** + * @see org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation#performUpdate(JdbcValueBindings, SharedSessionContractImplementor) + */ + private CompletionStage performReactiveUpdate( + JdbcValueBindings jdbcValueBindings, + SharedSessionContractImplementor session) { + MODEL_MUTATION_LOGGER.tracef( "#performReactiveUpdate(%s)", getTableDetails().getTableName() ); + + final JdbcMutationOperation jdbcUpdate = createJdbcUpdate( session ); + final PreparedStatementGroupSingleTable statementGroup = new PreparedStatementGroupSingleTable( jdbcUpdate, session ); + final PreparedStatementDetails statementDetails = statementGroup + .resolvePreparedStatementDetails( getTableDetails().getTableName() ); + + // If we get here the statement is needed - make sure it is resolved + Object[] params = PreparedStatementAdaptor.bind( statement -> { + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( + statementDetails, + statement, + session.getJdbcServices() + ); + jdbcValueBindings.beforeStatement( details ); + } ); + + session.getJdbcServices().getSqlStatementLogger().logStatement( statementDetails.getSqlString() ); + + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + return reactiveConnection + .update( statementDetails.getSqlString(), params ) + .thenApply( rowCount -> { + if ( rowCount == 0 ) { + return false; + } + try { + upsert.getExpectation() + .verifyOutcome( + rowCount, + statementDetails.getStatement(), + -1, + statementDetails.getSqlString() + ); + return true; + } + catch (SQLException e) { + throw session.getJdbcServices().getSqlExceptionHelper().convert( + e, + "Unable to execute mutation PreparedStatement against table `" + getTableDetails().getTableName() + "`", + statementDetails.getSqlString() + ); + } + } ); + } + + // FIXME: Adding this to ORM will save us some duplicated code (similar to createJdbcInsert and createJdbcDelete) + private JdbcMutationOperation createJdbcUpdate(SharedSessionContractImplementor session) { + MutationTarget mutationTarget = super.getMutationTarget(); + TableUpdate tableUpdate; + if ( getTableDetails().getUpdateDetails() != null + && getTableDetails().getUpdateDetails().getCustomSql() != null ) { + tableUpdate = new TableUpdateCustomSql( + new MutatingTableReference( getTableDetails() ), + mutationTarget, + "upsert update for " + mutationTarget.getRolePath(), + upsert.getValueBindings(), + upsert.getKeyBindings(), + upsert.getOptimisticLockBindings(), + upsert.getParameters() + ); + } + else { + tableUpdate = new TableUpdateStandard( + new MutatingTableReference( getTableDetails() ), + mutationTarget, + "upsert update for " + mutationTarget.getRolePath(), + upsert.getValueBindings(), + upsert.getKeyBindings(), + upsert.getOptimisticLockBindings(), + upsert.getParameters() + ); + } + + final SqlAstTranslator translator = session + .getJdbcServices() + .getJdbcEnvironment() + .getSqlAstTranslatorFactory() + .buildModelMutationTranslator( tableUpdate, session.getFactory() ); + + return translator.translate( null, MutationQueryOptions.INSTANCE ); + } + + private CompletionStage performReactiveInsert( + JdbcValueBindings jdbcValueBindings, + SharedSessionContractImplementor session) { + final JdbcInsertMutation jdbcInsert = createJdbcInsert( session ); + + final PreparedStatementGroupSingleTable statementGroup = new PreparedStatementGroupSingleTable( jdbcInsert, session ); + final PreparedStatementDetails statementDetails = statementGroup.resolvePreparedStatementDetails( getTableDetails().getTableName() ); + // If we get here the statement is needed - make sure it is resolved + Object[] params = PreparedStatementAdaptor.bind( statement -> { + PreparedStatementDetails details = new PrepareStatementDetailsAdaptor( statementDetails, statement, session.getJdbcServices() ); + jdbcValueBindings.beforeStatement( details ); + } ); + + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + return reactiveConnection.update( statementDetails.getSqlString(), params ).thenCompose(CompletionStages::voidFuture); + } + + /** + * @see org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation#createJdbcInsert(SharedSessionContractImplementor) + */ + // FIXME: change visibility to protected in ORM and remove this method + private JdbcInsertMutation createJdbcInsert(SharedSessionContractImplementor session) { + final TableInsert tableInsert; + if ( getTableDetails().getInsertDetails() != null + && getTableDetails().getInsertDetails().getCustomSql() != null ) { + tableInsert = new TableInsertCustomSql( + new MutatingTableReference( getTableDetails() ), + getMutationTarget(), + CollectionHelper.combine( upsert.getValueBindings(), upsert.getKeyBindings() ), + upsert.getParameters() + ); + } + else { + tableInsert = new TableInsertStandard( + new MutatingTableReference( getTableDetails() ), + getMutationTarget(), + CollectionHelper.combine( upsert.getValueBindings(), upsert.getKeyBindings() ), + Collections.emptyList(), + upsert.getParameters() + ); + } + + final SessionFactoryImplementor factory = session.getSessionFactory(); + final SqlAstTranslatorFactory sqlAstTranslatorFactory = factory + .getJdbcServices() + .getJdbcEnvironment() + .getSqlAstTranslatorFactory(); + + final SqlAstTranslator translator = sqlAstTranslatorFactory.buildModelMutationTranslator( + tableInsert, + factory + ); + + return translator.translate( null, MutationQueryOptions.INSTANCE ); + } + + /** + * @see org.hibernate.sql.model.jdbc.OptionalTableUpdateOperation#createJdbcDelete(SharedSessionContractImplementor) + */ + // FIXME: change visibility to protected in ORM and remove this method + private JdbcDeleteMutation createJdbcDelete(SharedSessionContractImplementor session) { + final TableDelete tableDelete; + if ( getTableDetails().getDeleteDetails() != null + && getTableDetails().getDeleteDetails().getCustomSql() != null ) { + tableDelete = new TableDeleteCustomSql( + new MutatingTableReference( getTableDetails() ), + getMutationTarget(), + "upsert delete for " + upsert.getMutationTarget().getRolePath(), + upsert.getKeyBindings(), + upsert.getOptimisticLockBindings(), + upsert.getParameters() + ); + } + else { + tableDelete = new TableDeleteStandard( + new MutatingTableReference( getTableDetails() ), + getMutationTarget(), + "upsert delete for " + getMutationTarget().getRolePath(), + upsert.getKeyBindings(), + upsert.getOptimisticLockBindings(), + upsert.getParameters() + ); + } + + final SessionFactoryImplementor factory = session.getSessionFactory(); + final SqlAstTranslatorFactory sqlAstTranslatorFactory = factory + .getJdbcServices() + .getJdbcEnvironment() + .getSqlAstTranslatorFactory(); + + final SqlAstTranslator translator = sqlAstTranslatorFactory.buildModelMutationTranslator( + tableDelete, + factory + ); + + return translator.translate( null, MutationQueryOptions.INSTANCE ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveSelfExecutingUpdateOperation.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveSelfExecutingUpdateOperation.java new file mode 100644 index 000000000..98bb99a0d --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/model/ReactiveSelfExecutingUpdateOperation.java @@ -0,0 +1,24 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.model; + +import java.util.concurrent.CompletionStage; + +import org.hibernate.engine.jdbc.mutation.JdbcValueBindings; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.sql.model.SelfExecutingUpdateOperation; +import org.hibernate.sql.model.ValuesAnalysis; + +/** + * @see org.hibernate.sql.model.SelfExecutingUpdateOperation + */ +public interface ReactiveSelfExecutingUpdateOperation extends SelfExecutingUpdateOperation { + + CompletionStage performReactiveMutation( + JdbcValueBindings jdbcValueBindings, + ValuesAnalysis valuesAnalysis, + SharedSessionContractImplementor session); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java index 4308b7722..09a2afaa2 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/results/internal/ReactiveDeferredResultSetAccess.java @@ -16,6 +16,7 @@ import org.hibernate.HibernateException; import org.hibernate.dialect.Dialect; +import org.hibernate.dialect.DialectDelegateWrapper; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.engine.spi.SessionEventListenerManager; import org.hibernate.engine.spi.SessionFactoryImplementor; @@ -150,7 +151,7 @@ private CompletionStage executeQuery() { .thenCompose( lg -> { LOG.tracef( "Executing query to retrieve ResultSet : %s", getFinalSql() ); - Dialect dialect = executionContext.getSession().getJdbcServices().getDialect(); + Dialect dialect = DialectDelegateWrapper.extractRealDialect( executionContext.getSession().getJdbcServices().getDialect() ); // I'm not sure calling Parameters here is necessary, the query should already have the right parameters final String sql = Parameters.instance( dialect ).process( getFinalSql() ); Object[] parameters = PreparedStatementAdaptor.bind( super::bindParameters ); diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/UpsertTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/UpsertTest.java index 3ba93cbac..52dc52a72 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/UpsertTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/UpsertTest.java @@ -9,41 +9,62 @@ import java.util.List; import java.util.Objects; -import org.hibernate.reactive.testing.DBSelectionExtension; +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.testing.SqlStatementTracker; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import io.vertx.junit5.Timeout; import io.vertx.junit5.VertxTestContext; import jakarta.persistence.Entity; import jakarta.persistence.Id; import jakarta.persistence.Table; +import org.assertj.core.api.Condition; import static java.util.concurrent.TimeUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThat; -import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.COCKROACHDB; -import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.DB2; -import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.MARIA; -import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.MYSQL; -import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.ORACLE; -import static org.hibernate.reactive.testing.DBSelectionExtension.skipTestsFor; +import static org.hibernate.reactive.containers.DatabaseConfiguration.dbType; /** * Same as Hibernate ORM org.hibernate.orm.test.stateless.UpsertTest *

- * These tests are in a separate class because we need to skip the execution on some databases, - * but once this has been resolved, they could be in {@link ReactiveStatelessSessionTest}. + * These tests are in a separate class because we need to skip the execution on some databases, + * but once this has been resolved, they could be in {@link ReactiveStatelessSessionTest}. *

*/ @Timeout(value = 10, timeUnit = MINUTES) public class UpsertTest extends BaseReactiveTest { - /** - * Something is missing in HR to make it work for these databases. - */ - @RegisterExtension - public DBSelectionExtension dbSelection = skipTestsFor( COCKROACHDB, DB2, MARIA, MYSQL, ORACLE ); + private static SqlStatementTracker sqlTracker; + + // A condition to check that entities are persisted using a merge operator when the database actually supports it. + private final static Condition IS_USING_MERGE = new Condition<>( + s -> s.toLowerCase().startsWith( "merge into" ), + "insertions or updates without using the merge operator" + ); + + @Override + protected Configuration constructConfiguration() { + Configuration configuration = super.constructConfiguration(); + sqlTracker = new SqlStatementTracker( UpsertTest::filter, configuration.getProperties() ); + return configuration; + } + + @Override + protected void addServices(StandardServiceRegistryBuilder builder) { + sqlTracker.registerService( builder ); + } + + private static boolean filter(String s) { + String[] accepted = {"insert ", "update ", "merge "}; + for ( String valid : accepted ) { + if ( s.toLowerCase().startsWith( valid ) ) { + return true; + } + } + return false; + } @Override protected Collection> annotatedEntities() { @@ -55,19 +76,26 @@ public void testMutinyUpsert(VertxTestContext context) { test( context, getMutinySessionFactory().withStatelessTransaction( ss -> ss .upsert( new Record( 123L, "hello earth" ) ) .call( () -> ss.upsert( new Record( 456L, "hello mars" ) ) ) + .invoke( this::assertQueries ) ) .call( v -> getMutinySessionFactory().withStatelessTransaction( ss -> ss - .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) - .invoke( results -> assertThat( results ).containsExactly( - new Record( 123L, "hello earth" ), - new Record( 456L, "hello mars" ) - ) ) + .createSelectionQuery( "from Record order by id", Record.class ) + .getResultList() ) + .invoke( results -> { + assertThat( results ).containsExactly( + new Record( 123L, "hello earth" ), + new Record( 456L, "hello mars" ) + ); + } ) ) .call( () -> getMutinySessionFactory().withStatelessTransaction( ss -> ss .upsert( new Record( 123L, "goodbye earth" ) ) ) ) - .call( v -> getMutinySessionFactory().withStatelessTransaction( ss -> ss - .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) + .invoke( this::assertQueries ) + .call( v -> getMutinySessionFactory() + .withStatelessTransaction( ss -> ss + .createSelectionQuery( "from Record order by id", Record.class ) + .getResultList() ) .invoke( results -> assertThat( results ).containsExactly( new Record( 123L, "goodbye earth" ), new Record( 456L, "hello mars" ) @@ -81,6 +109,7 @@ public void testMutinyUpsertWithEntityName(VertxTestContext context) { test( context, getMutinySessionFactory().withStatelessTransaction( ss -> ss .upsert( Record.class.getName(), new Record( 123L, "hello earth" ) ) .call( () -> ss.upsert( Record.class.getName(), new Record( 456L, "hello mars" ) ) ) + .invoke( this::assertQueries ) ) .call( v -> getMutinySessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) @@ -92,6 +121,7 @@ public void testMutinyUpsertWithEntityName(VertxTestContext context) { .call( () -> getMutinySessionFactory().withStatelessTransaction( ss -> ss .upsert( Record.class.getName(), new Record( 123L, "goodbye earth" ) ) ) ) + .invoke( this::assertQueries ) .call( v -> getMutinySessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) .invoke( results -> assertThat( results ).containsExactly( @@ -108,6 +138,7 @@ public void testStageUpsert(VertxTestContext context) { .upsert( new Record( 123L, "hello earth" ) ) .thenCompose( v -> ss.upsert( new Record( 456L, "hello mars" ) ) ) ) + .thenAccept( v -> this.assertQueries() ) .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) .thenAccept( results -> assertThat( results ).containsExactly( @@ -118,6 +149,7 @@ public void testStageUpsert(VertxTestContext context) { .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .upsert( new Record( 123L, "goodbye earth" ) ) ) ) + .thenAccept( v -> this.assertQueries() ) .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) .thenAccept( results -> assertThat( results ).containsExactly( @@ -134,6 +166,7 @@ public void testStageUpsertWithEntityName(VertxTestContext context) { .upsert( Record.class.getName(), new Record( 123L, "hello earth" ) ) .thenCompose( v -> ss.upsert( Record.class.getName(), new Record( 456L, "hello mars" ) ) ) ) + .thenAccept( v -> this.assertQueries() ) .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) .thenAccept( results -> assertThat( results ).containsExactly( @@ -144,6 +177,7 @@ public void testStageUpsertWithEntityName(VertxTestContext context) { .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .upsert( Record.class.getName(), new Record( 123L, "goodbye earth" ) ) ) ) + .thenAccept( v -> this.assertQueries() ) .thenCompose( v -> getSessionFactory().withStatelessTransaction( ss -> ss .createSelectionQuery( "from Record order by id", Record.class ).getResultList() ) .thenAccept( results -> assertThat( results ).containsExactly( @@ -154,6 +188,28 @@ public void testStageUpsertWithEntityName(VertxTestContext context) { ); } + private void assertQueries() { + if ( hasMergeOperator() ) { + assertThat( sqlTracker.getLoggedQueries() ).have( IS_USING_MERGE ); + } + else { + // This might be overkill, but it's still helpful in case more databases are going to support + // the merge operator, and we need to update the documentation or warn people about it. + assertThat( sqlTracker.getLoggedQueries() ).doNotHave( IS_USING_MERGE ); + } + } + + private boolean hasMergeOperator() { + switch ( dbType() ) { + case SQLSERVER: + case ORACLE: + case POSTGRESQL: + return true; + default: + return false; + } + } + @Entity(name = "Record") @Table(name = "Record") public static class Record {