Skip to content

Improved collection batch fetching #1558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/NHibernate/Async/Engine/BatchFetchQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using NHibernate.Persister.Entity;
using NHibernate.Util;
using System.Collections.Generic;
using Iesi.Collections.Generic;

namespace NHibernate.Engine
{
Expand All @@ -40,22 +41,33 @@ public async Task<object[]> GetCollectionBatchAsync(ICollectionPersister collect
int end = -1;
bool checkForEnd = false;

// this only works because collection entries are kept in a sequenced
// map by persistence context (maybe we should do like entities and
// keep a separate sequences set...)
foreach (DictionaryEntry me in context.CollectionEntries)
if (batchLoadableCollections.TryGetValue(collectionPersister.Role, out var map))
{
CollectionEntry ce = (CollectionEntry) me.Value;
IPersistentCollection collection = (IPersistentCollection) me.Key;
if (!collection.WasInitialized && ce.LoadedPersister == collectionPersister)
foreach (KeyValuePair<CollectionEntry, IPersistentCollection> me in map)
{
var ce = me.Key;
var collection = me.Value;
if (ce.LoadedKey == null)
{
// the LoadedKey of the CollectionEntry might be null as it might have been reset to null
// (see for example Collections.ProcessDereferencedCollection()
// and CollectionEntry.AfterAction())
// though we clear the queue on flush, it seems like a good idea to guard
// against potentially null LoadedKey:s
continue;
}

if (collection.WasInitialized)
{
log.Warn("Encountered initialized collection in BatchFetchQueue, this should not happen.");
continue;
}

if (checkForEnd && i == end)
{
return keys; //the first key found after the given key
}

//if ( end == -1 && count > batchSize*10 ) return keys; //try out ten batches, max

bool isEqual = collectionPersister.KeyType.IsEqual(id, ce.LoadedKey, collectionPersister.Factory);

if (isEqual)
Expand All @@ -79,6 +91,7 @@ public async Task<object[]> GetCollectionBatchAsync(ICollectionPersister collect
}
}
}

return keys; //we ran out of keys to try
}

Expand All @@ -92,7 +105,7 @@ public async Task<object[]> GetCollectionBatchAsync(ICollectionPersister collect
/// <param name="batchSize">The maximum number of keys to return</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
/// <returns>an array of identifiers, of length batchSize (possibly padded with nulls)</returns>
public async Task<object[]> GetEntityBatchAsync(IEntityPersister persister,object id,int batchSize, CancellationToken cancellationToken)
public async Task<object[]> GetEntityBatchAsync(IEntityPersister persister, object id, int batchSize, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
object[] ids = new object[batchSize];
Expand All @@ -101,9 +114,9 @@ public async Task<object[]> GetEntityBatchAsync(IEntityPersister persister,objec
int end = -1;
bool checkForEnd = false;

foreach (EntityKey key in batchLoadableEntityKeys.Keys)
if (batchLoadableEntityKeys.TryGetValue(persister.EntityName, out var set))
{
if (key.EntityName.Equals(persister.EntityName))
foreach (var key in set)
{
//TODO: this needn't exclude subclasses...
if (checkForEnd && i == end)
Expand Down
12 changes: 7 additions & 5 deletions src/NHibernate/Async/Engine/Loading/CollectionLoadContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ private async Task EndLoadingCollectionAsync(LoadingCollectionEntry lce, ICollec
{
log.Debug("ending loading collection [{0}]", lce);
}
ISessionImplementor session = LoadContext.PersistenceContext.Session;

var persistenceContext = LoadContext.PersistenceContext;
var session = persistenceContext.Session;

bool statsEnabled = session.Factory.Statistics.IsStatisticsEnabled;
var stopWath = new Stopwatch();
Expand All @@ -141,17 +143,17 @@ private async Task EndLoadingCollectionAsync(LoadingCollectionEntry lce, ICollec

if (persister.CollectionType.HasHolder())
{
LoadContext.PersistenceContext.AddCollectionHolder(lce.Collection);
persistenceContext.AddCollectionHolder(lce.Collection);
}

CollectionEntry ce = LoadContext.PersistenceContext.GetCollectionEntry(lce.Collection);
CollectionEntry ce = persistenceContext.GetCollectionEntry(lce.Collection);
if (ce == null)
{
ce = LoadContext.PersistenceContext.AddInitializedCollection(persister, lce.Collection, lce.Key);
ce = persistenceContext.AddInitializedCollection(persister, lce.Collection, lce.Key);
}
else
{
ce.PostInitialize(lce.Collection);
ce.PostInitialize(lce.Collection, persistenceContext);
}

bool addToCache = hasNoQueuedAdds && persister.HasCache &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private async Task<bool> InitializeCollectionFromCacheAsync(object id, ICollecti
CollectionCacheEntry cacheEntry = (CollectionCacheEntry)persister.CacheEntryStructure.Destructure(ce, factory);
await (cacheEntry.AssembleAsync(collection, persister, persistenceContext.GetCollectionOwner(id, persister), cancellationToken)).ConfigureAwait(false);

persistenceContext.GetCollectionEntry(collection).PostInitialize(collection);
persistenceContext.GetCollectionEntry(collection).PostInitialize(collection, persistenceContext);
return true;
}
}
Expand Down
98 changes: 77 additions & 21 deletions src/NHibernate/Engine/BatchFetchQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
using NHibernate.Persister.Entity;
using NHibernate.Util;
using System.Collections.Generic;
using Iesi.Collections.Generic;

namespace NHibernate.Engine
{
public partial class BatchFetchQueue
{
private static readonly object Marker = new object();
private static readonly INHibernateLogger log = NHibernateLogger.For(typeof(BatchFetchQueue));

/// <summary>
/// Defines a sequence of <see cref="EntityKey" /> elements that are currently
/// eligible for batch fetching.
/// Used to hold information about the entities that are currently eligible for batch-fetching. Ultimately
/// used by <see cref="GetEntityBatch" /> to build entity load batches.
/// </summary>
/// <remarks>
/// Even though this is a map, we only use the keys. A map was chosen in
/// order to utilize a <see cref="LinkedHashMap{K, V}" /> to maintain sequencing
/// as well as uniqueness.
/// A Map structure is used to segment the keys by entity type since loading can only be done for a particular entity
/// type at a time.
/// </remarks>
private readonly IDictionary<EntityKey, object> batchLoadableEntityKeys = new LinkedHashMap<EntityKey, object>(8);
private readonly IDictionary<string, LinkedHashSet<EntityKey>> batchLoadableEntityKeys = new Dictionary<string, LinkedHashSet<EntityKey>>(8);

/// <summary>
/// A map of <see cref="SubselectFetch">subselect-fetch descriptors</see>
Expand All @@ -30,6 +30,7 @@ public partial class BatchFetchQueue
/// </summary>
private readonly IDictionary<EntityKey, SubselectFetch> subselectsByEntityKey = new Dictionary<EntityKey, SubselectFetch>(8);

private readonly IDictionary<string, LinkedHashMap<CollectionEntry, IPersistentCollection>> batchLoadableCollections = new Dictionary<string, LinkedHashMap<CollectionEntry, IPersistentCollection>>(8);
/// <summary>
/// The owning persistence context.
/// </summary>
Expand All @@ -50,6 +51,7 @@ public BatchFetchQueue(IPersistenceContext context)
public void Clear()
{
batchLoadableEntityKeys.Clear();
batchLoadableCollections.Clear();
subselectsByEntityKey.Clear();
}

Expand Down Expand Up @@ -113,7 +115,12 @@ public void AddBatchLoadableEntityKey(EntityKey key)
{
if (key.IsBatchLoadable)
{
batchLoadableEntityKeys[key] = Marker;
if (!batchLoadableEntityKeys.TryGetValue(key.EntityName, out var set))
{
set = new LinkedHashSet<EntityKey>();
batchLoadableEntityKeys.Add(key.EntityName, set);
}
set.Add(key);
}
}

Expand All @@ -125,7 +132,44 @@ public void AddBatchLoadableEntityKey(EntityKey key)
public void RemoveBatchLoadableEntityKey(EntityKey key)
{
if (key.IsBatchLoadable)
batchLoadableEntityKeys.Remove(key);
{
if (batchLoadableEntityKeys.TryGetValue(key.EntityName, out var set))
{
set.Remove(key);
}
}
}

/// <summary>
/// If a CollectionEntry represents a batch loadable collection, add
/// it to the queue.
/// </summary>
/// <param name="collection"></param>
/// <param name="ce"></param>
public void AddBatchLoadableCollection(IPersistentCollection collection, CollectionEntry ce)
{
var persister = ce.LoadedPersister;

if (!batchLoadableCollections.TryGetValue(persister.Role, out var map))
{
map = new LinkedHashMap<CollectionEntry, IPersistentCollection>();
batchLoadableCollections.Add(persister.Role, map);
}
map[ce] = collection;
}

/// <summary>
/// After a collection was initialized or evicted, we don't
/// need to batch fetch it anymore, remove it from the queue
/// if necessary
/// </summary>
/// <param name="ce"></param>
public void RemoveBatchLoadableCollection(CollectionEntry ce)
{
if (batchLoadableCollections.TryGetValue(ce.LoadedPersister.Role, out var map))
{
map.Remove(ce);
}
}

/// <summary>
Expand All @@ -143,22 +187,33 @@ public object[] GetCollectionBatch(ICollectionPersister collectionPersister, obj
int end = -1;
bool checkForEnd = false;

// this only works because collection entries are kept in a sequenced
// map by persistence context (maybe we should do like entities and
// keep a separate sequences set...)
foreach (DictionaryEntry me in context.CollectionEntries)
if (batchLoadableCollections.TryGetValue(collectionPersister.Role, out var map))
{
CollectionEntry ce = (CollectionEntry) me.Value;
IPersistentCollection collection = (IPersistentCollection) me.Key;
if (!collection.WasInitialized && ce.LoadedPersister == collectionPersister)
foreach (KeyValuePair<CollectionEntry, IPersistentCollection> me in map)
{
var ce = me.Key;
var collection = me.Value;
if (ce.LoadedKey == null)
{
// the LoadedKey of the CollectionEntry might be null as it might have been reset to null
// (see for example Collections.ProcessDereferencedCollection()
// and CollectionEntry.AfterAction())
// though we clear the queue on flush, it seems like a good idea to guard
// against potentially null LoadedKey:s
continue;
}

if (collection.WasInitialized)
{
log.Warn("Encountered initialized collection in BatchFetchQueue, this should not happen.");
continue;
}

if (checkForEnd && i == end)
{
return keys; //the first key found after the given key
}

//if ( end == -1 && count > batchSize*10 ) return keys; //try out ten batches, max

bool isEqual = collectionPersister.KeyType.IsEqual(id, ce.LoadedKey, collectionPersister.Factory);

if (isEqual)
Expand All @@ -182,6 +237,7 @@ public object[] GetCollectionBatch(ICollectionPersister collectionPersister, obj
}
}
}

return keys; //we ran out of keys to try
}

Expand All @@ -194,17 +250,17 @@ public object[] GetCollectionBatch(ICollectionPersister collectionPersister, obj
/// <param name="id">The identifier of the entity currently demanding load.</param>
/// <param name="batchSize">The maximum number of keys to return</param>
/// <returns>an array of identifiers, of length batchSize (possibly padded with nulls)</returns>
public object[] GetEntityBatch(IEntityPersister persister,object id,int batchSize)
public object[] GetEntityBatch(IEntityPersister persister, object id, int batchSize)
{
object[] ids = new object[batchSize];
ids[0] = id; //first element of array is reserved for the actual instance we are loading!
int i = 1;
int end = -1;
bool checkForEnd = false;

foreach (EntityKey key in batchLoadableEntityKeys.Keys)
if (batchLoadableEntityKeys.TryGetValue(persister.EntityName, out var set))
{
if (key.EntityName.Equals(persister.EntityName))
foreach (var key in set)
{
//TODO: this needn't exclude subclasses...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comments now looks redundant.

if (checkForEnd && i == end)
Expand Down
20 changes: 20 additions & 0 deletions src/NHibernate/Engine/CollectionEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,32 @@ public void PreFlush(IPersistentCollection collection)
/// has been initialized.
/// </summary>
/// <param name="collection">The initialized <see cref="AbstractPersistentCollection"/> that this Entry is for.</param>
//Since v5.1
[Obsolete("Please use PostInitialize(collection, persistenceContext) instead.")]
public void PostInitialize(IPersistentCollection collection)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The single argument PostInitialize should be obsoleted. Or is there any reason to keep it, beside avoiding a breaking change? It seems now only called from the two arguments PostInitialize.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

snapshot = LoadedPersister.IsMutable ? collection.GetSnapshot(LoadedPersister) : null;
collection.SetSnapshot(loadedKey, role, snapshot);
}

/// <summary>
/// Updates the CollectionEntry to reflect that the <see cref="IPersistentCollection"/>
/// has been initialized.
/// </summary>
/// <param name="collection">The initialized <see cref="AbstractPersistentCollection"/> that this Entry is for.</param>
/// <param name="persistenceContext"></param>
public void PostInitialize(IPersistentCollection collection, IPersistenceContext persistenceContext)
{
#pragma warning disable 618
//6.0 TODO: Inline PostInitialize here.
PostInitialize(collection);
#pragma warning restore 618
if (LoadedPersister.GetBatchSize() > 1)
{
persistenceContext.BatchFetchQueue.RemoveBatchLoadableCollection(this);
}
}

/// <summary>
/// Updates the CollectionEntry to reflect that it is has been successfully flushed to the database.
/// </summary>
Expand Down
12 changes: 7 additions & 5 deletions src/NHibernate/Engine/Loading/CollectionLoadContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,9 @@ private void EndLoadingCollection(LoadingCollectionEntry lce, ICollectionPersist
{
log.Debug("ending loading collection [{0}]", lce);
}
ISessionImplementor session = LoadContext.PersistenceContext.Session;

var persistenceContext = LoadContext.PersistenceContext;
var session = persistenceContext.Session;

bool statsEnabled = session.Factory.Statistics.IsStatisticsEnabled;
var stopWath = new Stopwatch();
Expand All @@ -247,17 +249,17 @@ private void EndLoadingCollection(LoadingCollectionEntry lce, ICollectionPersist

if (persister.CollectionType.HasHolder())
{
LoadContext.PersistenceContext.AddCollectionHolder(lce.Collection);
persistenceContext.AddCollectionHolder(lce.Collection);
}

CollectionEntry ce = LoadContext.PersistenceContext.GetCollectionEntry(lce.Collection);
CollectionEntry ce = persistenceContext.GetCollectionEntry(lce.Collection);
if (ce == null)
{
ce = LoadContext.PersistenceContext.AddInitializedCollection(persister, lce.Collection, lce.Key);
ce = persistenceContext.AddInitializedCollection(persister, lce.Collection, lce.Key);
}
else
{
ce.PostInitialize(lce.Collection);
ce.PostInitialize(lce.Collection, persistenceContext);
}

bool addToCache = hasNoQueuedAdds && persister.HasCache &&
Expand Down
6 changes: 5 additions & 1 deletion src/NHibernate/Engine/StatefulPersistenceContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,10 @@ public void AddUninitializedCollection(ICollectionPersister persister, IPersiste
{
CollectionEntry ce = new CollectionEntry(collection, persister, id, flushing);
AddCollection(collection, ce, id);
if (persister.GetBatchSize() > 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here but for ICollectionPersister.

{
batchFetchQueue.AddBatchLoadableCollection(collection, ce);
}
}

/// <summary> add a detached uninitialized collection</summary>
Expand Down Expand Up @@ -913,7 +917,7 @@ public CollectionEntry AddInitializedCollection(ICollectionPersister persister,
object id)
{
CollectionEntry ce = new CollectionEntry(collection, persister, id, flushing);
ce.PostInitialize(collection);
ce.PostInitialize(collection, this);
AddCollection(collection, ce, id);
return ce;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private bool InitializeCollectionFromCache(object id, ICollectionPersister persi
CollectionCacheEntry cacheEntry = (CollectionCacheEntry)persister.CacheEntryStructure.Destructure(ce, factory);
cacheEntry.Assemble(collection, persister, persistenceContext.GetCollectionOwner(id, persister));

persistenceContext.GetCollectionEntry(collection).PostInitialize(collection);
persistenceContext.GetCollectionEntry(collection).PostInitialize(collection, persistenceContext);
return true;
}
}
Expand Down
Loading