diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java index 75287e5be..6bacd56db 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java @@ -12,6 +12,7 @@ import jakarta.persistence.PersistenceException; import org.hibernate.HibernateException; +import org.hibernate.JDBCException; import org.hibernate.LazyInitializationException; import org.hibernate.cache.CacheException; import org.hibernate.dialect.Dialect; @@ -24,6 +25,7 @@ import org.jboss.logging.annotations.Message; import org.jboss.logging.annotations.MessageLogger; +import static org.jboss.logging.Logger.Level.DEBUG; import static org.jboss.logging.Logger.Level.ERROR; import static org.jboss.logging.Logger.Level.INFO; import static org.jboss.logging.Logger.Level.WARN; @@ -315,4 +317,12 @@ public interface Log extends BasicLogger { @LogMessage(level = WARN) @Message( id= 494, value = "Attempt to merge an uninitialized collection with queued operations; queued operations will be ignored: %s") void ignoreQueuedOperationsOnMerge(String collectionInfoString); + + // Same method in ORM + @LogMessage(level = DEBUG) + @Message(value = "JDBCException was thrown for a transaction marked for rollback. " + + " This is probably due to an operation failing fast due to the transaction being marked for rollback.", + id = 520) + void jdbcExceptionThrownWithTransactionRolledBack(@Cause JDBCException e); + } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java index f53ed4fca..88cfe3ec6 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java @@ -20,6 +20,7 @@ import org.hibernate.engine.spi.EntityEntry; import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.event.spi.DeleteContext; +import org.hibernate.event.spi.LoadEventListener; import org.hibernate.event.spi.MergeContext; import org.hibernate.event.spi.PersistContext; import org.hibernate.event.spi.RefreshContext; @@ -66,6 +67,8 @@ public interface ReactiveSession extends ReactiveQueryProducer, ReactiveSharedSe CompletionStage reactiveMerge(Object object, MergeContext copiedAlready); + CompletionStage reactiveLoad(LoadEventListener.LoadType loadType, Object id, String entityName, LockOptions lockOptions, Boolean readOnly); + CompletionStage reactiveFlush(); CompletionStage reactiveAutoflush(); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java index f3cefc7dc..02b392a27 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java @@ -5,16 +5,6 @@ */ package org.hibernate.reactive.session.impl; -import jakarta.persistence.TypedQueryReference; -import jakarta.persistence.criteria.CommonAbstractCriteria; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.function.Supplier; - import org.hibernate.CacheMode; import org.hibernate.FlushMode; import org.hibernate.HibernateException; @@ -66,6 +56,7 @@ import org.hibernate.jpa.spi.NativeQueryTupleTransformer; import org.hibernate.loader.LoaderLogging; import org.hibernate.loader.ast.spi.MultiIdLoadOptions; +import org.hibernate.loader.internal.IdentifierLoadAccessImpl; import org.hibernate.loader.internal.LoadAccessContext; import org.hibernate.metamodel.mapping.EntityMappingType; import org.hibernate.metamodel.mapping.NaturalIdMapping; @@ -127,10 +118,19 @@ import jakarta.persistence.EntityGraph; import jakarta.persistence.EntityNotFoundException; import jakarta.persistence.Tuple; +import jakarta.persistence.TypedQueryReference; +import jakarta.persistence.criteria.CommonAbstractCriteria; import jakarta.persistence.criteria.CriteriaDelete; import jakarta.persistence.criteria.CriteriaQuery; import jakarta.persistence.criteria.CriteriaUpdate; import jakarta.persistence.metamodel.Attribute; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; import static java.lang.Boolean.TRUE; import static org.hibernate.engine.internal.ManagedTypeHelper.asPersistentAttributeInterceptable; @@ -143,10 +143,12 @@ import static org.hibernate.reactive.persister.entity.impl.ReactiveEntityPersister.forceInitialize; import static org.hibernate.reactive.session.impl.SessionUtil.checkEntityFound; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; import static org.hibernate.reactive.util.impl.CompletionStages.returnNullorRethrow; import static org.hibernate.reactive.util.impl.CompletionStages.returnOrRethrow; +import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; /** @@ -217,8 +219,7 @@ public Object immediateLoad(String entityName, Object id) throws HibernateExcept public CompletionStage reactiveImmediateLoad(String entityName, Object id) throws HibernateException { if ( LOG.isDebugEnabled() ) { - final EntityPersister persister = getFactory().getMappingMetamodel() - .getEntityDescriptor( entityName ); + final EntityPersister persister = requireEntityPersister( entityName ); LOG.debugf( "Initializing proxy: %s", MessageHelper.infoString( persister, id, getFactory() ) ); } threadCheck(); @@ -251,10 +252,7 @@ public CompletionStage reactiveInternalLoad(String entityName, Object id } threadCheck(); - final LoadEvent event = new LoadEvent( - id, entityName, true, this, - getReadOnlyFromLoadQueryInfluencers() - ); + final LoadEvent event = makeLoadEvent( entityName, id, getReadOnlyFromLoadQueryInfluencers(), true ); return fireLoadNoChecks( event, type ) .thenApply( v -> { final Object result = event.getResult(); @@ -270,6 +268,43 @@ public CompletionStage reactiveInternalLoad(String entityName, Object id } ); } + @Override + public Object load(LoadEventListener.LoadType loadType, Object id, String entityName, LockOptions lockOptions, Boolean readOnly) { + // When the user needs a reference to the entity, we are not supposed to touche the database, and we don't return + // a CompletionStage. So it's fine to delegate to ORM. + // Everywhere else, reactiveLoad should be used. + return super.load( loadType, id, entityName, lockOptions, readOnly ); + } + + /** + * @see SessionImpl#load(LoadEventListener.LoadType, Object, String, LockOptions, Boolean) + */ + public CompletionStage reactiveLoad(LoadEventListener.LoadType loadType, Object id, String entityName, LockOptions lockOptions, Boolean readOnly) { + if ( lockOptions != null ) { + // (from ORM) TODO: I doubt that this branch is necessary, and it's probably even wrong + final LoadEvent event = makeLoadEvent( entityName, id, readOnly, lockOptions ); + return fireLoad( event, loadType ) + .thenApply( v -> { + final Object result = event.getResult(); + releaseLoadEvent( event ); + return result; + } ); + } + else { + final LoadEvent event = makeLoadEvent( entityName, id, readOnly, false ); + return supplyStage( () -> fireLoad( event, loadType ) + .thenApply( v -> { + final Object result = event.getResult(); + releaseLoadEvent( event ); + if ( !loadType.isAllowNulls() && result == null ) { + getSession().getFactory().getEntityNotFoundDelegate().handleEntityNotFound( entityName, id ); + } + return result; + } ) + ).whenComplete( (o, throwable) -> afterOperation( throwable != null ) ); + } + } + @Override //Note: when making changes to this method, please also consider // the similar code in Mutiny.fetch() and Stage.fetch() @@ -1066,7 +1101,7 @@ public CompletionStage reactiveForceFlush(EntityEntry entry) { } if ( getPersistenceContextInternal().getCascadeLevel() > 0 ) { - return CompletionStages.failedFuture( new ObjectDeletedException( + return failedFuture( new ObjectDeletedException( "deleted object would be re-saved by cascade (remove deleted object from associations)", entry.getId(), entry.getPersister().getEntityName() @@ -1194,10 +1229,16 @@ private CompletionStage fireLock(LockEvent event) { } @Override - public CompletionStage reactiveGet( - Class entityClass, - Object id) { - return new ReactiveIdentifierLoadAccessImpl<>( entityClass ).load( id ); + public CompletionStage reactiveGet(Class entityClass, Object id) { + return reactiveById( entityClass ).load( id ); + } + + private ReactiveIdentifierLoadAccessImpl reactiveById(Class entityClass) { + return new ReactiveIdentifierLoadAccessImpl<>( this, requireEntityPersister( entityClass ) ); + } + + private ReactiveIdentifierLoadAccessImpl reactiveById(String entityName) { + return new ReactiveIdentifierLoadAccessImpl<>( this, requireEntityPersister( entityName ) ); } @Override @@ -1207,60 +1248,77 @@ public CompletionStage reactiveFind( LockOptions lockOptions, EntityGraph fetchGraph) { checkOpen(); + return supplyStage( () -> { + if ( fetchGraph != null ) { + getLoadQueryInfluencers() + .getEffectiveEntityGraph() + .applyGraph( (RootGraphImplementor) fetchGraph, GraphSemantic.FETCH ); + } + getLoadQueryInfluencers().setReadOnly( readOnlyHint( null ) ); + + return reactiveById( entityClass ) + .with( determineAppropriateLocalCacheMode( null ) ) + .with( lockOptions ) + .load( id ); + } ).handle( CompletionStages::handle ) + .thenCompose( handler -> handleReactiveFindException( entityClass, id, lockOptions, handler ) ) + .whenComplete( (v, e) -> { + getLoadQueryInfluencers().getEffectiveEntityGraph().clear(); + getLoadQueryInfluencers().setReadOnly( null ); + } ); + } - if ( fetchGraph != null ) { - getLoadQueryInfluencers() - .getEffectiveEntityGraph() - .applyGraph( (RootGraphImplementor) fetchGraph, GraphSemantic.FETCH ); - } - -// Boolean readOnly = properties == null ? null : (Boolean) properties.get( QueryHints.HINT_READONLY ); -// getLoadQueryInfluencers().setReadOnly( readOnly ); - - final ReactiveIdentifierLoadAccessImpl loadAccess = - new ReactiveIdentifierLoadAccessImpl<>( entityClass ) - .with( determineAppropriateLocalCacheMode( null ) ) - .with( lockOptions ); - - return loadAccess.load( id ) - .handle( (result, e) -> { - if ( e instanceof EntityNotFoundException ) { - // DefaultLoadEventListener.returnNarrowedProxy may throw ENFE (see HHH-7861 for details), - // which find() should not throw. Find() should return null if the entity was not found. - // if ( log.isDebugEnabled() ) { - // String entityName = entityClass != null ? entityClass.getName(): null; - // String identifierValue = id != null ? id.toString() : null ; - // log.ignoringEntityNotFound( entityName, identifierValue ); - // } - throw new UnsupportedOperationException(); - } - if ( e instanceof ObjectDeletedException ) { - //the spec is silent about people doing remove() find() on the same PC - throw new UnsupportedOperationException(); - } - if ( e instanceof ObjectNotFoundException ) { - //should not happen on the entity itself with get - throw new IllegalArgumentException( e.getMessage(), e ); - } - if ( e instanceof MappingException - || e instanceof TypeMismatchException - || e instanceof ClassCastException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); - } - if ( e instanceof JDBCException ) { -// if ( accessTransaction().getRollbackOnly() ) { -// // assume this is the similar to the WildFly / IronJacamar "feature" described under HHH-12472 -// throw new UnsupportedOperationException(); -// } - throw getExceptionConverter().convert( (JDBCException) e, lockOptions ); - } - if ( e instanceof RuntimeException ) { - throw getExceptionConverter().convert( (RuntimeException) e, lockOptions ); - } + private CompletionStage handleReactiveFindException( + Class entityClass, + Object primaryKey, + LockOptions lockOptions, + CompletionStages.CompletionStageHandler handler) { + if ( !handler.hasFailed() ) { + return handler.getResultAsCompletionStage(); + } + final Throwable e = handler.getThrowable(); + if ( e instanceof EntityNotFoundException ) { + // We swallow other sorts of EntityNotFoundException and return null + // For example, DefaultLoadEventListener.proxyImplementation() throws + // EntityNotFoundException if there's an existing proxy in the session, + // but the underlying database row has been deleted (see HHH-7861) + logIgnoringEntityNotFound( entityClass, primaryKey ); + return nullFuture(); + } + if ( e instanceof ObjectDeletedException ) { + // the spec is silent about people doing remove() find() on the same PC + return null; + } + if ( e instanceof ObjectNotFoundException ) { + // should not happen on the entity itself with get + // TODO: in fact this will occur instead of EntityNotFoundException + // when using StandardEntityNotFoundDelegate, so probably we + // should return null here, as we do above + return failedFuture( new IllegalArgumentException( e.getMessage(), e ) ); + } + if ( e instanceof MappingException || e instanceof TypeMismatchException || e instanceof ClassCastException ) { + return failedFuture( getExceptionConverter().convert( new IllegalArgumentException( + e.getMessage(), + e + ) ) ); + } + if ( e instanceof JDBCException ) { + // I don't think this is ever going to happen in Hibernate Reactive + if ( accessTransaction().isActive() && accessTransaction().getRollbackOnly() ) { + // Assume situation HHH-12472 running on WildFly + // Just log the exception and return null + LOG.jdbcExceptionThrownWithTransactionRolledBack( (JDBCException) e ); + return nullFuture(); + } + else { + return failedFuture( getExceptionConverter().convert( (JDBCException) e, lockOptions ) ); + } + } + if ( e instanceof RuntimeException ) { + return failedFuture( getExceptionConverter().convert( (RuntimeException) e, lockOptions ) ); + } - return result; - } ) - .whenComplete( (v, e) -> getLoadQueryInfluencers().getEffectiveEntityGraph().clear() ); + return handler.getResultAsCompletionStage(); } @Override @@ -1281,13 +1339,18 @@ private ReactiveEntityPersister entityPersister(Class entityClass) { return (ReactiveEntityPersister) getFactory().getMappingMetamodel().getEntityDescriptor( entityClass ); } - private CompletionStage fireReactiveLoad(LoadEvent event, LoadEventListener.LoadType loadType) { + private CompletionStage fireLoad(LoadEvent event, LoadEventListener.LoadType loadType) { checkOpenOrWaitingForAutoClose(); - return fireLoadNoChecks( event, loadType ) - .whenComplete( (v, e) -> delayedAfterCompletion() ); + .thenAccept( v -> delayedAfterCompletion() ); } + /** + * This version of {@link #load} is for use by internal methods only. + * It skips the session open check, transaction sync checks, and so on, + * which have been shown to be expensive (apparently they prevent these + * hot methods from being inlined). + */ private CompletionStage fireLoadNoChecks(LoadEvent event, LoadEventListener.LoadType loadType) { pulseTransactionCoordinator(); @@ -1310,76 +1373,38 @@ public void checkTransactionNeededForUpdateOperation(String exceptionMessage) { //no-op because we don't support transactions } - private class ReactiveIdentifierLoadAccessImpl { - - private final EntityPersister entityPersister; - - private LockOptions lockOptions; - private CacheMode cacheMode; - - //Note that entity graphs aren't supported at all - //because we're not using the EntityLoader from - //the plan package, so this stuff is useless - private RootGraphImplementor rootGraph; - private GraphSemantic graphSemantic; - - public ReactiveIdentifierLoadAccessImpl(EntityPersister entityPersister) { - this.entityPersister = entityPersister; - } - - public ReactiveIdentifierLoadAccessImpl(String entityName) { - this( getFactory().getMappingMetamodel().getEntityDescriptor( entityName ) ); - } - - public ReactiveIdentifierLoadAccessImpl(Class entityClass) { - this( getFactory().getMappingMetamodel().getEntityDescriptor( entityClass ) ); - } - - public final ReactiveIdentifierLoadAccessImpl with(LockOptions lockOptions) { - this.lockOptions = lockOptions; - return this; - } - - public ReactiveIdentifierLoadAccessImpl with(CacheMode cacheMode) { - this.cacheMode = cacheMode; - return this; - } + private class ReactiveIdentifierLoadAccessImpl extends IdentifierLoadAccessImpl> { - public ReactiveIdentifierLoadAccessImpl with(RootGraph graph, GraphSemantic semantic) { - rootGraph = (RootGraphImplementor) graph; - graphSemantic = semantic; - return this; - } - - public final CompletionStage getReference(Object id) { - return perform( () -> doGetReference( id ) ); + public ReactiveIdentifierLoadAccessImpl(LoadAccessContext context, EntityPersister entityPersister) { + super(context, entityPersister); } + @Override protected CompletionStage perform(Supplier> executor) { - if ( graphSemantic != null ) { - if ( rootGraph == null ) { + if ( getGraphSemantic() != null ) { + if ( getRootGraph() == null ) { throw new IllegalArgumentException( "Graph semantic specified, but no RootGraph was supplied" ); } } CacheMode sessionCacheMode = getCacheMode(); boolean cacheModeChanged = false; - if ( cacheMode != null ) { + if ( getCacheMode() != null ) { // naive check for now... // todo : account for "conceptually equal" - if ( cacheMode != sessionCacheMode ) { - setCacheMode( cacheMode ); + if ( getCacheMode() != sessionCacheMode ) { + setCacheMode( getCacheMode() ); cacheModeChanged = true; } } - if ( graphSemantic != null ) { - getLoadQueryInfluencers().getEffectiveEntityGraph().applyGraph( rootGraph, graphSemantic ); + if ( getGraphSemantic() != null ) { + getLoadQueryInfluencers().getEffectiveEntityGraph().applyGraph( getRootGraph(), getGraphSemantic() ); } boolean finalCacheModeChanged = cacheModeChanged; return executor.get() .whenComplete( (v, x) -> { - if ( graphSemantic != null ) { + if ( getGraphSemantic() != null ) { getLoadQueryInfluencers().getEffectiveEntityGraph().clear(); } if ( finalCacheModeChanged ) { @@ -1389,71 +1414,39 @@ protected CompletionStage perform(Supplier> executor) { } ); } - @SuppressWarnings("unchecked") + @Override protected CompletionStage doGetReference(Object id) { - if ( lockOptions != null ) { - LoadEvent event = new LoadEvent( - id, - entityPersister.getEntityName(), - lockOptions, - ReactiveSessionImpl.this, - getReadOnlyFromLoadQueryInfluencers() - ); - return fireReactiveLoad( event, LoadEventListener.LOAD ).thenApply( v -> (T) event.getResult() ); - } - - LoadEvent event = new LoadEvent( - id, - entityPersister.getEntityName(), - false, - ReactiveSessionImpl.this, - getReadOnlyFromLoadQueryInfluencers() - ); - return fireReactiveLoad( event, LoadEventListener.LOAD ) - .thenApply( v -> { - if ( event.getResult() == null ) { - getFactory().getEntityNotFoundDelegate().handleEntityNotFound( - entityPersister.getEntityName(), - id - ); - } - return (T) event.getResult(); - } ).whenComplete( (v, x) -> afterOperation( x != null ) ); - } - - public final CompletionStage load(Object id) { - return perform( () -> doLoad( id, LoadEventListener.GET ) ); + // getReference si supposed to return T, not CompletionStage + // I can't think of a way to change the super class so that it is mapped correctly. + // So, for now, I will throw an exception and make sure that it doesn't really get called + // (the one in SessionImpl should be used) + throw new UnsupportedOperationException(); } - // public final CompletionStage fetch(Object id) { -// return perform( () -> doLoad( id, LoadEventListener.IMMEDIATE_LOAD) ); -// } -// + @Override @SuppressWarnings("unchecked") - protected final CompletionStage doLoad(Object id, LoadEventListener.LoadType loadType) { + protected CompletionStage doLoad(Object id) { if ( id == null ) { + // This is needed to make the tests with NaturalIds pass. + // It's was already part of Hibernate Reactive. + // I'm not sure why though, it doesn't seem like Hibernate ORM does it. return nullFuture(); } - if ( lockOptions != null ) { - LoadEvent event = new LoadEvent( - id, - entityPersister.getEntityName(), - lockOptions, - ReactiveSessionImpl.this, - getReadOnlyFromLoadQueryInfluencers() - ); - return fireReactiveLoad( event, loadType ).thenApply( v -> (T) event.getResult() ); - } - LoadEvent event = new LoadEvent( - id, - entityPersister.getEntityName(), - false, - ReactiveSessionImpl.this, - getReadOnlyFromLoadQueryInfluencers() - ); - return fireReactiveLoad( event, loadType ) - .whenComplete( (v, t) -> afterOperation( t != null ) ) - .thenApply( v -> (T) event.getResult() ); + final ReactiveSession session = (ReactiveSession) getContext().getSession(); + return supplyStage( () -> session + .reactiveLoad( LoadEventListener.GET, coerceId( id, session.getFactory() ), getEntityPersister().getEntityName(), getLockOptions(), isReadOnly( getContext().getSession() ) ) + ) + .handle( CompletionStages::handle ) + .thenCompose( handler -> handler.getThrowable() instanceof ObjectNotFoundException + ? nullFuture() + : handler.getResultAsCompletionStage() + ) + .thenApply( result -> { + // ORM calls + // initializeIfNecessary( result ); + // But, Hibernate Reactive doesn't support lazy initializations + return (T) result; + } ); } } @@ -1617,10 +1610,6 @@ public NaturalIdLoadAccessImpl with(LockOptions lockOptions) { return this; } - protected void synchronizationEnabled(boolean synchronizationEnabled) { - this.synchronizationEnabled = synchronizationEnabled; - } - /** * @see org.hibernate.loader.internal.BaseNaturalIdLoadAccessImpl#doGetReference(Object) */ @@ -1700,14 +1689,6 @@ protected void performAnyNeededCrossReferenceSynchronizations() { } } - protected final ReactiveIdentifierLoadAccessImpl getIdentifierLoadAccess() { - final ReactiveIdentifierLoadAccessImpl identifierLoadAccess = new ReactiveIdentifierLoadAccessImpl<>( entityPersister ); - if ( this.lockOptions != null ) { - identifierLoadAccess.with( lockOptions ); - } - return identifierLoadAccess; - } - protected ReactiveEntityPersister entityPersister() { return entityPersister; } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java index 5079f0315..84483afb3 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java @@ -7,22 +7,18 @@ import org.hibernate.HibernateException; import org.hibernate.LockMode; -import org.hibernate.LockOptions; -import org.hibernate.TransientObjectException; import org.hibernate.UnknownEntityTypeException; import org.hibernate.UnresolvableObjectException; import org.hibernate.bytecode.enhance.spi.interceptor.EnhancementAsProxyLazinessInterceptor; -import org.hibernate.bytecode.spi.BytecodeEnhancementMetadata; import org.hibernate.cache.spi.access.EntityDataAccess; import org.hibernate.collection.spi.PersistentCollection; import org.hibernate.dialect.Dialect; import org.hibernate.engine.internal.ReactivePersistenceContextAdapter; -import org.hibernate.engine.spi.EntityKey; import org.hibernate.engine.spi.LoadQueryInfluencers; import org.hibernate.engine.spi.PersistenceContext; -import org.hibernate.engine.spi.PersistentAttributeInterceptable; -import org.hibernate.engine.spi.PersistentAttributeInterceptor; import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.event.monitor.spi.DiagnosticEvent; +import org.hibernate.event.monitor.spi.EventMonitor; import org.hibernate.generator.BeforeExecutionGenerator; import org.hibernate.generator.Generator; import org.hibernate.graph.GraphSemantic; @@ -74,7 +70,7 @@ import org.hibernate.reactive.query.sqm.internal.ReactiveSqmSelectionQueryImpl; import org.hibernate.reactive.session.ReactiveSqmQueryImplementor; import org.hibernate.reactive.session.ReactiveStatelessSession; -import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.reactive.util.impl.CompletionStages.Completable; import org.hibernate.stat.spi.StatisticsImplementor; import jakarta.persistence.EntityGraph; @@ -87,19 +83,24 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiConsumer; import java.util.function.Supplier; import static java.lang.Boolean.TRUE; import static java.lang.invoke.MethodHandles.lookup; +import static java.util.function.Function.identity; import static org.hibernate.engine.internal.ManagedTypeHelper.asPersistentAttributeInterceptable; import static org.hibernate.engine.internal.ManagedTypeHelper.isPersistentAttributeInterceptable; import static org.hibernate.engine.internal.Versioning.incrementVersion; import static org.hibernate.engine.internal.Versioning.seedVersion; import static org.hibernate.engine.internal.Versioning.setVersion; +import static org.hibernate.event.internal.DefaultInitializeCollectionEventListener.handlePotentiallyEmptyCollection; import static org.hibernate.generator.EventType.INSERT; +import static org.hibernate.internal.util.NullnessUtil.castNonNull; import static org.hibernate.internal.util.StringHelper.isEmpty; import static org.hibernate.internal.util.StringHelper.isNotEmpty; import static org.hibernate.loader.ast.spi.CascadingFetchProfile.REFRESH; +import static org.hibernate.loader.internal.CacheLoadHelper.initializeCollectionFromCache; import static org.hibernate.pretty.MessageHelper.infoString; import static org.hibernate.proxy.HibernateProxy.extractLazyInitializer; import static org.hibernate.reactive.id.impl.IdentifierGeneration.castToIdentifierType; @@ -110,6 +111,7 @@ import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.loop; import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; /** @@ -123,11 +125,8 @@ public class ReactiveStatelessSessionImpl extends StatelessSessionImpl implement private static final Log LOG = make( Log.class, lookup() ); private final LoadQueryInfluencers influencers; - private final ReactiveConnection reactiveConnection; - private final ReactiveStatelessSessionImpl batchingHelperSession; - private final PersistenceContext persistenceContext; public ReactiveStatelessSessionImpl(SessionFactoryImpl factory, SessionCreationOptions options, ReactiveConnection connection) { @@ -215,7 +214,7 @@ public CompletionStage> reactiveGet(Class entityClass, Object... checkOpen(); for (Object id : ids) { if ( id == null ) { - throw new IllegalArgumentException("Null id"); + return failedFuture( new IllegalArgumentException( "Null id" ) ); } } @@ -224,12 +223,12 @@ public CompletionStage> reactiveGet(Class entityClass, Object... return getEntityPersister( entityClass.getName() ) .reactiveMultiLoad( sids, this, StatelessSessionImpl.MULTI_ID_LOAD_OPTIONS ) - .whenComplete( (v, e) -> { + .whenComplete( (list, e) -> { if ( getPersistenceContext().isLoadFinished() ) { getPersistenceContext().clear(); } } ) - .thenApply( objects -> (List) objects ); + .thenApply( list -> (List) list ); } @Override @@ -258,7 +257,15 @@ public CompletionStage reactiveGet(String entityName, Object id, LockMode .applyGraph( (RootGraphImplementor) fetchGraph, GraphSemantic.FETCH ); } - return getEntityPersister( entityName ) + ReactiveEntityPersister persister = getEntityPersister( entityName ); + if ( persister.canReadFromCache() ) { + final Object cachedEntity = loadFromSecondLevelCache( persister, generateEntityKey( id, persister ), null, lockMode ); + if ( cachedEntity != null ) { + getPersistenceContext().clear(); + return completedFuture( (T) cachedEntity ); + } + } + return persister .reactiveLoad( id, null, getNullSafeLockMode( lockMode ), this ) .whenComplete( (v, e) -> { if ( getPersistenceContext().isLoadFinished() ) { @@ -283,8 +290,11 @@ public ReactiveEntityPersister getEntityPersister(String entityName, Object obje public CompletionStage reactiveInsert(Object entity) { checkOpen(); final ReactiveEntityPersister persister = getEntityPersister( null, entity ); - return reactiveInsert( entity, persister ) - .thenAccept( v -> { + final Object[] state = persister.getValues( entity ); + return reactiveInsert( entity, state, persister ) + .thenCompose( id -> recreateCollections( entity, id, persister ) ) + .thenAccept( id -> { + firePostInsert( entity, id, state, persister ); final StatisticsImplementor statistics = getFactory().getStatistics(); if ( statistics.isStatisticsEnabled() ) { statistics.insertEntity( persister.getEntityName() ); @@ -292,71 +302,145 @@ public CompletionStage reactiveInsert(Object entity) { } ); } - private CompletionStage reactiveInsert(Object entity, ReactiveEntityPersister persister) { - final Object[] state = persister.getValues( entity ); - final Generator generator = persister.getGenerator(); - if ( !generator.generatedOnExecution() ) { - return generateId( persister, entity, generator ) - .thenCompose( generatedId -> { - final Object id = castToIdentifierType( generatedId, persister ); - if ( persister.isVersioned() ) { - if ( seedVersion( entity, state, persister, this ) ) { - persister.setValues( entity, state ); - } - } - if ( firePreInsert( entity, id, state, persister ) ) { - return voidFuture(); - } - getInterceptor() - .onInsert( - entity, - id, - state, - persister.getPropertyNames(), - persister.getPropertyTypes() - ); - return persister - .insertReactive( id, state, entity, this ) - .thenAccept( ignore -> { - persister.setIdentifier( entity, id, this ); - firePostInsert( entity, id, state, persister ); - } ); - } ); + private static class Loop { + private CompletionStage loop = voidFuture(); + + public void then(Supplier> step) { + loop = loop.thenCompose( v -> step.get() ); } - else { - if ( firePreInsert( entity, null, state, persister ) ) { - return voidFuture(); - } - getInterceptor() - .onInsert( entity, null, state, persister.getPropertyNames(), persister.getPropertyTypes() ); - return persister - .insertReactive( state, entity, this ) - .thenAccept( id -> { - persister.setIdentifier( entity, id, this ); - firePostInsert( entity, id, state, persister ); - } ); + + public void whenComplete(BiConsumer consumer) { + loop = loop.whenComplete( consumer ); } } - private CompletionStage generateId(EntityPersister persister, Object entity, Generator generator) { - if ( generator.generatesOnInsert() ) { - if ( generator instanceof ReactiveIdentifierGenerator reactiveGenerator ) { - return reactiveGenerator.generate(this, this); - } - else if ( generator instanceof BeforeExecutionGenerator beforeExecutionGenerator ) { - return completedFuture( beforeExecutionGenerator.generate(this, entity, null, INSERT) ); - } - else { - throw new IllegalArgumentException( "Unsupported generator type: " + generator.getClass().getName() ); - } - } - else { - final Object id = persister.getIdentifier( entity, this ); - if ( id == null ) { - throw new IdentifierGenerationException( "Identifier of entity '" + persister.getEntityName() + "' must be manually assigned before calling 'insert()'" ); + private CompletionStage recreateCollections(Object entity, Object id, EntityPersister persister) { + final Completable stage = new Completable<>(); + final Loop loop = new Loop(); + forEachOwnedCollection( + entity, id, persister, (descriptor, collection) -> { + firePreRecreate( collection, descriptor ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginCollectionRecreateEvent(); + loop.then( () -> supplyStage( () -> ( (ReactiveCollectionPersister) descriptor ) + .reactiveRecreate( collection, id, this ) ) + .whenComplete( (t, throwable) -> eventMonitor + .completeCollectionRecreateEvent( event, id, descriptor.getRole(), throwable != null, this ) + ) + .thenAccept( unused -> { + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.recreateCollection( descriptor.getRole() ); + } + firePostRecreate( collection, descriptor ); + } ) + ); + } + ); + loop.whenComplete( stage::complete ); + return stage.getStage(); + } + + private CompletionStage reactiveInsert(Object entity, Object[] state, ReactiveEntityPersister persister) { + if ( persister.isVersioned() ) { + if ( seedVersion( entity, state, persister, this ) ) { + persister.setValues( entity, state ); } + } + final Generator generator = persister.getGenerator(); + if ( generator.generatedBeforeExecution( entity, this ) ) { + return generatedIdBeforeInsert( entity, persister, generator, state ); + } + if ( generator.generatedOnExecution( entity, this ) ) { + return generateIdOnInsert( entity, persister, generator, state ); + } + return applyAssignedIdentifierInsert( entity, persister, state ); + } + + private CompletionStage applyAssignedIdentifierInsert(Object entity, ReactiveEntityPersister persister, Object[] state) { + Object id = persister.getIdentifier( entity, this ); + if ( id == null ) { + return failedFuture( new IdentifierGenerationException( "Identifier of entity '" + persister.getEntityName() + "' must be manually assigned before calling 'insert()'" ) ); + } + if ( firePreInsert( entity, id, state, persister ) ) { return completedFuture( id ); } + getInterceptor().onInsert( entity, id, state, persister.getPropertyNames(), persister.getPropertyTypes() ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityInsertEvent(); + // try-block + return supplyStage( () -> persister.insertReactive( id, state, entity, this ) ) + // finally: catches error in case insertReactive fails before returning a CompletionStage + .whenComplete( (generatedValues, throwable) -> eventMonitor + .completeEntityInsertEvent( event, id, persister.getEntityName(), throwable != null, this ) + ); + } + + private CompletionStage generateIdOnInsert( + Object entity, + ReactiveEntityPersister persister, + Generator generator, + Object[] state) { + if ( !generator.generatesOnInsert() ) { + throw new IdentifierGenerationException( "Identifier generator must generate on insert" ); + } + if ( firePreInsert( entity, null, state, persister ) ) { + return nullFuture(); + } + getInterceptor().onInsert( entity, null, state, persister.getPropertyNames(), persister.getPropertyTypes() ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityInsertEvent(); + // try-block + return supplyStage( () -> persister + .insertReactive( state, entity, this ) + .thenApply( generatedValues -> castNonNull( generatedValues ) + .getGeneratedValue( persister.getIdentifierMapping() ) + ) ) + // finally-block: catch the exceptions from insertReactive and getGeneratedValues + .whenComplete( (id, throwable) -> eventMonitor + .completeEntityInsertEvent( event, id, persister.getEntityName(), throwable != null, this ) + ) + .thenApply( id -> { + persister.setIdentifier( entity, id, this ); + return id; + } ); + } + + private CompletionStage generatedIdBeforeInsert( + Object entity, + ReactiveEntityPersister persister, + Generator generator, + Object[] state) { + if ( !generator.generatesOnInsert() ) { + return failedFuture( new IdentifierGenerationException( "Identifier generator must generate on insert" ) ); + } + return generateIdForInsert( entity, generator, persister ) + .thenCompose( id -> { + persister.setIdentifier( entity, id, this ); + if ( firePreInsert( entity, id, state, persister ) ) { + return completedFuture( id ); + } + getInterceptor().onInsert( entity, id, state, persister.getPropertyNames(), persister.getPropertyTypes() ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityInsertEvent(); + // try-block + return supplyStage( () -> persister.insertReactive( id, state, entity, this ) ) + // finally: catches error in case insertReactive fails before returning a CompletionStage + .whenComplete( (generatedValues, throwable) -> eventMonitor + .completeEntityInsertEvent( event, id, persister.getEntityName(), throwable != null, this ) + ) + .thenApply( identity() ); + } ); + } + + private CompletionStage generateIdForInsert(Object entity, Generator generator, ReactiveEntityPersister persister) { + if ( generator instanceof ReactiveIdentifierGenerator reactiveGenerator ) { + return reactiveGenerator.generate( this, this ) + .thenApply( id -> castToIdentifierType( id, persister ) ); + } + + final Object currentValue = generator.allowAssignedIdentifiers() ? persister.getIdentifier( entity ) : null; + return completedFuture( ( (BeforeExecutionGenerator) generator ).generate( this, entity, currentValue, INSERT ) ); } @Override @@ -365,7 +449,57 @@ public CompletionStage reactiveDelete(Object entity) { final ReactiveEntityPersister persister = getEntityPersister( null, entity ); final Object id = persister.getIdentifier( entity, this ); final Object version = persister.getVersion( entity ); - return persister.deleteReactive( id, version, entity, this ); + if ( firePreDelete( entity, id, persister ) ) { + return voidFuture(); + } + + getInterceptor().onDelete( entity, id, persister.getPropertyNames(), persister.getPropertyTypes() ); + return removeCollections( entity, id, persister ) + .thenCompose( v -> { + final Object ck = lockCacheItem( id, version, persister ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityDeleteEvent(); + // try-block + return supplyStage( () -> persister.deleteReactive( id, version, entity, this ) ) + // finally-block + .whenComplete( (unused, throwable) -> eventMonitor + .completeEntityDeleteEvent( event, id, persister.getEntityName(), throwable != null, this ) + ) + .thenAccept( unused -> removeCacheItem( ck, persister ) ); + } ) + .thenAccept( v -> { + firePostDelete( entity, id, persister ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.deleteEntity( persister.getEntityName() ); + } + } ); + } + + private CompletionStage removeCollections(Object entity, Object id, EntityPersister persister) { + final Completable stage = new Completable<>(); + final Loop loop = new Loop(); + forEachOwnedCollection( entity, id, persister, + (descriptor, collection) -> { + firePreRemove( collection, entity, descriptor ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginCollectionRemoveEvent(); + loop.then( () -> supplyStage( () -> ( (ReactiveCollectionPersister) descriptor ) + .reactiveRemove( id, this ) ) + .whenComplete( (unused, throwable) -> eventMonitor + .completeCollectionRemoveEvent( event, id, descriptor.getRole(), throwable != null, this ) + ) + .thenAccept( v -> { + firePostRemove( collection, entity, descriptor ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.removeCollection( descriptor.getRole() ); + } + } ) + ); + } ); + loop.whenComplete( stage::complete ); + return stage.getStage(); } @Override @@ -390,19 +524,65 @@ private CompletionStage executeReactiveUpdate(Object entity) { final ReactiveEntityPersister persister = getEntityPersister( null, entity ); final Object id = persister.getIdentifier( entity, this ); final Object[] state = persister.getValues( entity ); - final Object oldVersion; + final Object oldVersion = persister.isVersioned() ? persister.getVersion( entity ) : null; if ( persister.isVersioned() ) { - oldVersion = persister.getVersion( entity ); final Object newVersion = incrementVersion( entity, oldVersion, persister, this ); setVersion( state, newVersion, persister ); persister.setValues( entity, state ); } - else { - oldVersion = null; + + if ( firePreUpdate(entity, id, state, persister) ) { + return voidFuture(); } - return persister - .updateReactive( id, state, null, false, null, oldVersion, entity, null, this ) - .thenCompose( CompletionStages::voidFuture ); + + getInterceptor().onUpdate( entity, id, state, persister.getPropertyNames(), persister.getPropertyTypes() ); + final Object ck = lockCacheItem( id, oldVersion, persister ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityUpdateEvent(); + // try-block + return supplyStage( () -> persister + .updateReactive( id, state, null, false, null, oldVersion, entity, null, this ) ) + // finally-block + .whenComplete( (generatedValues, throwable) -> eventMonitor + .completeEntityUpdateEvent( event, id, persister.getEntityName(), throwable != null, this ) + ) + .thenAccept( generatedValues -> removeCacheItem( ck, persister ) ) + .thenCompose( v -> removeAndRecreateCollections( entity, id, persister ) ) + .thenAccept( v -> { + firePostUpdate( entity, id, state, persister ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.updateEntity( persister.getEntityName() ); + } + } ); + } + + private CompletionStage removeAndRecreateCollections(Object entity, Object id, EntityPersister persister) { + final Completable stage = new Completable<>(); + final Loop loop = new Loop(); + forEachOwnedCollection( entity, id, persister, + (descriptor, collection) -> { + firePreUpdate( collection, descriptor ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginCollectionRemoveEvent(); + ReactiveCollectionPersister reactivePersister = (ReactiveCollectionPersister) persister; + loop.then( () -> supplyStage( () -> reactivePersister + .reactiveRemove( id, this ) + .thenCompose( v -> reactivePersister.reactiveRecreate( collection, id, this ) ) ) + .whenComplete( (unused, throwable) -> eventMonitor + .completeCollectionRemoveEvent( event, id, descriptor.getRole(), throwable != null, this ) + ) + .thenAccept( v -> { + firePostUpdate( collection, descriptor ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.updateCollection( descriptor.getRole() ); + } + } ) + ); + } ); + loop.whenComplete( stage::complete ); + return stage.getStage(); } @Override @@ -422,6 +602,7 @@ public CompletionStage reactiveRefresh(Object entity, LockMode lockMode) { @Override public CompletionStage reactiveRefresh(String entityName, Object entity, LockMode lockMode) { + checkOpen(); final ReactiveEntityPersister persister = getEntityPersister( entityName, entity ); final Object id = persister.getIdentifier( entity, this ); @@ -432,31 +613,23 @@ public CompletionStage reactiveRefresh(String entityName, Object entity, L if ( persister.canWriteToCache() ) { final EntityDataAccess cacheAccess = persister.getCacheAccessStrategy(); if ( cacheAccess != null ) { - final Object ck = cacheAccess.generateCacheKey( - id, - persister, - getFactory(), - getTenantIdentifier() - ); + final Object ck = cacheAccess.generateCacheKey( id, persister, getFactory(), getTenantIdentifier() ); cacheAccess.evict( ck ); } } return fromInternalFetchProfile( REFRESH, () -> persister.reactiveLoad( id, entity, getNullSafeLockMode( lockMode ), this ) ) .thenAccept( result -> { + UnresolvableObjectException.throwIfNull( result, id, persister.getEntityName() ); if ( getPersistenceContext().isLoadFinished() ) { getPersistenceContext().clear(); } - UnresolvableObjectException.throwIfNull( result, id, persister.getEntityName() ); } ); } - private CompletionStage fromInternalFetchProfile( - CascadingFetchProfile cascadingFetchProfile, - Supplier> supplier) { + private CompletionStage fromInternalFetchProfile(CascadingFetchProfile cascadingFetchProfile, Supplier> supplier) { CascadingFetchProfile previous = getLoadQueryInfluencers().getEnabledCascadingFetchProfile(); - return voidFuture() - .thenCompose( v -> { + return supplyStage( () -> { getLoadQueryInfluencers().setEnabledCascadingFetchProfile( cascadingFetchProfile ); return supplier.get(); } ) @@ -479,38 +652,30 @@ public CompletionStage reactiveUpsert(Object entity) { public CompletionStage reactiveUpsert(String entityName, Object entity) { checkOpen(); final ReactiveEntityPersister persister = getEntityPersister( entityName, entity ); - Object id = persister.getIdentifier( entity, this ); - Boolean knownTransient = persister.isTransient( entity, this ); - if ( knownTransient != null && knownTransient ) { - throw new TransientObjectException( - "Object passed to upsert() has a null identifier: " - + persister.getEntityName() ); -// final Generator generator = persister.getGenerator(); -// if ( !generator.generatedOnExecution() ) { -// id = ( (BeforeExecutionGenerator) generator).generate( this, entity, null, INSERT ); -// } - } + final Object id = idToUpsert( entity, persister ); final Object[] state = persister.getValues( entity ); - final Object oldVersion; - if ( persister.isVersioned() ) { - oldVersion = persister.getVersion( entity ); - if ( oldVersion == null ) { - if ( seedVersion( entity, state, persister, this ) ) { - persister.setValues( entity, state ); - } - } - else { - final Object newVersion = incrementVersion( entity, oldVersion, persister, this ); - setVersion( state, newVersion, persister ); - persister.setValues( entity, state ); - } + if ( firePreUpsert( entity, id, state, persister ) ) { + return voidFuture(); } - else { - oldVersion = null; - } - - return persister - .mergeReactive( id, state, null, false, null, oldVersion, entity, null, this ); + getInterceptor().onUpsert( entity, id, state, persister.getPropertyNames(), persister.getPropertyTypes() ); + final Object oldVersion = versionToUpsert( entity, persister, state ); + final Object ck = lockCacheItem( id, oldVersion, persister ); + final EventMonitor eventMonitor = getEventMonitor(); + final DiagnosticEvent event = eventMonitor.beginEntityUpsertEvent(); + return supplyStage( () -> persister + .mergeReactive( id, state, null, false, null, oldVersion, entity, null, this ) ) + .whenComplete( (v, throwable) -> eventMonitor + .completeEntityUpsertEvent( event, id, persister.getEntityName(), throwable != null, this ) + ) + .thenAccept( v -> { + removeCacheItem( ck, persister ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.upsertEntity( persister.getEntityName() ); + } + } ) + .thenCompose( v -> removeAndRecreateCollections( entity, id, persister ) ) + .thenAccept( v -> firePostUpsert( entity, id, state, persister ) ); } @Override @@ -570,7 +735,6 @@ public CompletionStage reactiveDeleteAll(int batchSize, Object... entities .whenComplete( (v, throwable) -> batchingHelperSession.setJdbcBatchSize( jdbcBatchSize ) ); } - @Override public CompletionStage reactiveRefreshAll(Object... entities) { return loop( entities, batchingHelperSession::reactiveRefresh ) @@ -592,96 +756,26 @@ private ReactiveConnection batchingConnection(int batchSize) { .withBatchSize( batchSize ); } - private Object createProxy(EntityKey entityKey) { - final Object proxy = entityKey.getPersister().createProxy( entityKey.getIdentifier(), this ); - getPersistenceContext().addProxy( entityKey, proxy ); - return proxy; + @Override + public CompletionStage reactiveInternalLoad(String entityName, Object id, boolean eager, boolean nullable) { + Object object = super.internalLoad( entityName, id, eager, nullable ); + return object instanceof CompletionStage + ? (CompletionStage) object + : completedFuture( object ); } @Override - public CompletionStage reactiveInternalLoad( - String entityName, - Object id, - boolean eager, - boolean nullable) { - checkOpen(); - - final EntityPersister persister = getEntityPersister( entityName ); - final EntityKey entityKey = generateEntityKey( id, persister ); - - // first, try to load it from the temp PC associated to this SS - final PersistenceContext persistenceContext = getPersistenceContext(); - final Object loaded = persistenceContext.getEntity( entityKey ); - if ( loaded != null ) { - // we found it in the temp PC. Should indicate we are in the midst of processing a result set - // containing eager fetches via join fetch - return completedFuture( loaded ); - } - - if ( !eager ) { - // caller did not request forceful eager loading, see if we can create - // some form of proxy - - // first, check to see if we can use "bytecode proxies" - - final BytecodeEnhancementMetadata enhancementMetadata = persister.getBytecodeEnhancementMetadata(); - if ( enhancementMetadata.isEnhancedForLazyLoading() ) { - - // if the entity defines a HibernateProxy factory, see if there is an - // existing proxy associated with the PC - and if so, use it - if ( persister.getRepresentationStrategy().getProxyFactory() != null ) { - final Object proxy = persistenceContext.getProxy( entityKey ); - - if ( proxy != null ) { - if ( LOG.isTraceEnabled() ) { - LOG.trace( "Entity proxy found in session cache" ); - } - if ( LOG.isDebugEnabled() && ( (HibernateProxy) proxy ).getHibernateLazyInitializer().isUnwrap() ) { - LOG.debug( "Ignoring NO_PROXY to honor laziness" ); - } - - return completedFuture( persistenceContext.narrowProxy( proxy, persister, entityKey, null ) ); - } - - // specialized handling for entities with subclasses with a HibernateProxy factory - if ( persister.hasSubclasses() ) { - // entities with subclasses that define a ProxyFactory can create - // a HibernateProxy. -// LOG.debugf( "Creating a HibernateProxy for to-one association with subclasses to honor laziness" ); - return completedFuture( createProxy( entityKey ) ); - } - return completedFuture( enhancementMetadata.createEnhancedProxy( entityKey, false, this ) ); - } - else if ( !persister.hasSubclasses() ) { - return completedFuture( enhancementMetadata.createEnhancedProxy( entityKey, false, this ) ); - } - // If we get here, then the entity class has subclasses and there is no HibernateProxy factory. - // The entity will get loaded below. - } - else { - if ( persister.hasProxy() ) { - final Object existingProxy = persistenceContext.getProxy( entityKey ); - if ( existingProxy != null ) { - return completedFuture( persistenceContext.narrowProxy( existingProxy, persister, entityKey, null ) ); - } - else { - return completedFuture( createProxy( entityKey ) ); - } - } - } - } - + protected Object internalLoadGet(String entityName, Object id, PersistenceContext persistenceContext) { // otherwise immediately materialize it // IMPLEMENTATION NOTE: increment/decrement the load count before/after getting the value // to ensure that #get does not clear the PersistenceContext. persistenceContext.beforeLoad(); - return this.reactiveGet( persister.getEntityName(), id ) + return this.reactiveGet( entityName, id ) .whenComplete( (r, e) -> persistenceContext.afterLoad() ); } @Override - @SuppressWarnings("unchecked") public CompletionStage reactiveFetch(T association, boolean unproxy) { checkOpen(); if ( association == null ) { @@ -691,84 +785,92 @@ public CompletionStage reactiveFetch(T association, boolean unproxy) { final PersistenceContext persistenceContext = getPersistenceContext(); final LazyInitializer initializer = extractLazyInitializer( association ); if ( initializer != null ) { - if ( initializer.isUninitialized() ) { - final String entityName = initializer.getEntityName(); - final Object id = initializer.getIdentifier(); - initializer.setSession( this ); - persistenceContext.beforeLoad(); - - final ReactiveEntityPersister persister = getEntityPersister( entityName ); - - // This is hard to test because it happens on slower machines like the ones we use on CI. - // See AbstractLazyInitializer#initialize, it happens when the object is not initialized and we need to - // call session.immediateLoad - final CompletionStage stage = initializer.getImplementation() instanceof CompletionStage - ? (CompletionStage) initializer.getImplementation() - : completedFuture( initializer.getImplementation() ); - - return stage.thenCompose( implementation -> persister.reactiveLoad( id, implementation, LockOptions.NONE, this ) ) - .thenApply( entity -> { - checkEntityFound( this, entityName, id, entity ); - initializer.setImplementation( entity ); - return unproxy ? (T) entity : association; - } ) - .whenComplete( (v, e) -> { - initializer.unsetSession(); - persistenceContext.afterLoad(); - if ( persistenceContext.isLoadFinished() ) { - persistenceContext.clear(); - } - } ); - } - else { - // Initialized - return completedFuture( unproxy ? (T) initializer.getImplementation() : association ); - } - } - else if ( association instanceof PersistentCollection collection ) { - if ( collection.wasInitialized() ) { - return completedFuture( association ); - } - else { - final ReactiveCollectionPersister collectionDescriptor = - (ReactiveCollectionPersister) getFactory().getMappingMetamodel() - .getCollectionDescriptor( collection.getRole() ); - - final Object key = collection.getKey(); - persistenceContext.addUninitializedCollection( collectionDescriptor, collection, key ); - collection.setCurrentSession( this ); - return collectionDescriptor.reactiveInitialize( key, this ) - .whenComplete( (v, e) -> { - collection.unsetSession( this ); - if ( persistenceContext.isLoadFinished() ) { - persistenceContext.clear(); - } - } ) - .thenApply( v -> association ); - } + return initializer.isUninitialized() + ? fetchUninitialized( association, unproxy, initializer, persistenceContext ) + : completedFuture( unproxy ? (T) initializer.getImplementation() : association ); } else if ( isPersistentAttributeInterceptable( association ) ) { - final PersistentAttributeInterceptable interceptable = asPersistentAttributeInterceptable( association ); - final PersistentAttributeInterceptor interceptor = interceptable.$$_hibernate_getInterceptor(); - if ( interceptor instanceof EnhancementAsProxyLazinessInterceptor lazinessInterceptor ) { - lazinessInterceptor.setSession( this ); - return forceInitialize( association, null, lazinessInterceptor.getIdentifier(), lazinessInterceptor.getEntityName(), this ) + if ( asPersistentAttributeInterceptable( association ).$$_hibernate_getInterceptor() + instanceof EnhancementAsProxyLazinessInterceptor proxyInterceptor ) { + proxyInterceptor.setSession( this ); + return forceInitialize( association, null, proxyInterceptor.getIdentifier(), proxyInterceptor.getEntityName(), this ) .whenComplete( (i,e) -> { - lazinessInterceptor.unsetSession(); + proxyInterceptor.unsetSession(); if ( persistenceContext.isLoadFinished() ) { persistenceContext.clear(); } } ) .thenApply( i -> association ); - - } - else { - return completedFuture( association ); } } - else { - return completedFuture( association ); + else if ( association instanceof PersistentCollection collection && !collection.wasInitialized() ) { + final ReactiveCollectionPersister collectionDescriptor = (ReactiveCollectionPersister) getFactory() + .getMappingMetamodel().getCollectionDescriptor( collection.getRole() ); + + final Object key = collection.getKey(); + persistenceContext.addUninitializedCollection( collectionDescriptor, collection, key ); + collection.setCurrentSession( this ); + return supplyStage( () -> { + if ( initializeCollectionFromCache( key, collectionDescriptor, collection, this ) ) { + LOG.trace( "Collection fetched from cache" ); + return completedFuture( association ); + } + else { + return collectionDescriptor + .reactiveInitialize( key, this ) + .thenApply( v -> { + handlePotentiallyEmptyCollection( collection, getPersistenceContextInternal(), key, collectionDescriptor ); + LOG.trace( "Collection fetched" ); + final StatisticsImplementor statistics = getFactory().getStatistics(); + if ( statistics.isStatisticsEnabled() ) { + statistics.fetchCollection( collectionDescriptor.getRole() ); + } + return association; + } ); + } + } ).whenComplete( (t, throwable) -> { + collection.$$_hibernate_setInstanceId( 0 ); + collection.unsetSession( this ); + if ( persistenceContext.isLoadFinished() ) { + persistenceContext.clear(); + } + } ); } + + return completedFuture( association ); + } + + private CompletionStage fetchUninitialized( + T association, + boolean unproxy, + LazyInitializer initializer, + PersistenceContext persistenceContext) { + final String entityName = initializer.getEntityName(); + final Object id = initializer.getIdentifier(); + initializer.setSession( this ); + persistenceContext.beforeLoad(); + return reactiveImplementation( initializer ) + .thenApply( entity -> { + checkEntityFound( this, entityName, id, entity ); + initializer.setImplementation( entity ); + return unproxy ? (T) entity : association; + } ) + .whenComplete( (v, e) -> { + initializer.unsetSession(); + persistenceContext.afterLoad(); + if ( persistenceContext.isLoadFinished() ) { + persistenceContext.clear(); + } + } ); + } + + private static CompletionStage reactiveImplementation(LazyInitializer initializer) { + // This is hard to test because it happens on slower machines like the ones we use on CI. + // See AbstractLazyInitializer#initialize, it happens when the object is not initialized, and we need to + // call session.immediateLoad + return initializer.getImplementation() instanceof CompletionStage + ? (CompletionStage) initializer.getImplementation() + : completedFuture( initializer.getImplementation() ); } @Override diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/util/impl/CompletionStages.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/util/impl/CompletionStages.java index eb311aba6..397ca0e9e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/util/impl/CompletionStages.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/util/impl/CompletionStages.java @@ -90,6 +90,26 @@ public static CompletionStage completedFuture(T value) { return CompletableFuture.completedFuture( value ); } + /** + * Useful for implementing try-finally blocks. + * For example: + *
{@code
+	 * return supplyStage( () -> {
+	 *     // Any error here will get caught
+	 *     ...
+	 *     })
+	 *     .whenComplete( (r,t) -> {
+	 *         // finally block
+	 *         ...
+	 *     })
+	 * }
+ */ + public static CompletionStage supplyStage(Supplier> supplier) { + // Using the voidFuture() is the simplest way I found to make sure that everything run in the correct executor + return voidFuture() + .thenCompose( v -> supplier.get() ); + } + public static CompletionStage failedFuture(Throwable t) { CompletableFuture ret = new CompletableFuture<>(); ret.completeExceptionally( t );