Skip to content

Reduce cache put batches fragmentation #1798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/NHibernate.Test/Async/CacheTest/BatchableCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,33 +450,32 @@ public async Task MultiplePutReadWriteTestAsync()
ids.AddRange(items.OrderBy(o => o.Id).Select(o => o.Id));
await (tx.CommitAsync());
}
Assert.That(cache.PutCalls, Has.Count.EqualTo(0));
Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(2));
Assert.That(cache.PutCalls, Has.Count.EqualTo(0), "Cache put");
Assert.That(cache.PutMultipleCalls, Has.Count.EqualTo(1), "Cache put many");
// Lock get
Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(1), "Cache get many");

AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.PutMultipleCalls
);
AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.LockMultipleCalls
);
AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.UnlockMultipleCalls
);
Expand Down
15 changes: 7 additions & 8 deletions src/NHibernate.Test/CacheTest/BatchableCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,33 +438,32 @@ public void MultiplePutReadWriteTest()
ids.AddRange(items.OrderBy(o => o.Id).Select(o => o.Id));
tx.Commit();
}
Assert.That(cache.PutCalls, Has.Count.EqualTo(0));
Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(2));
Assert.That(cache.PutCalls, Has.Count.EqualTo(0), "Cache put");
Assert.That(cache.PutMultipleCalls, Has.Count.EqualTo(1), "Cache put many");
// Lock get
Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(1), "Cache get many");

AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.PutMultipleCalls
);
AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.LockMultipleCalls
);
AssertEquivalent(
ids,
new[]
{
new[] {0, 1, 2},
new[] {3, 4, 5}
new[] {0, 1, 2, 3, 4, 5}
},
cache.UnlockMultipleCalls
);
Expand Down
64 changes: 14 additions & 50 deletions src/NHibernate/Async/Cache/CacheBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
//------------------------------------------------------------------------------


using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using NHibernate.Engine;
using NHibernate.Persister.Collection;
using NHibernate.Persister.Entity;
Expand All @@ -21,59 +23,13 @@ public sealed partial class CacheBatcher
{

/// <summary>
/// Adds a put operation to the batch. If the batch size reached the persister batch
/// size, the batch will be executed.
/// </summary>
/// <param name="persister">The entity persister.</param>
/// <param name="data">The data to put in the cache.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
internal async Task AddToBatchAsync(IEntityPersister persister, CachePutData data, CancellationToken cancellationToken)
{
	cancellationToken.ThrowIfCancellationRequested();

	// A persister switch or a full batch means the pending batch has to be flushed
	// before this put can be queued under the new persister.
	if (ShouldExecuteBatch(persister, _putBatch))
	{
		await (ExecuteBatchAsync(cancellationToken)).ConfigureAwait(false);
		_currentPersister = persister;
		_putBatch = new CachePutBatch(_session, persister.Cache);
		_currentBatch = _putBatch;
	}

	if (Log.IsDebugEnabled())
	{
		Log.Debug("Adding a put operation to batch for entity {0} and key {1}", persister.EntityName, data.Key);
	}

	_putBatch.Add(data);
}

/// <summary>
/// Adds a put operation to the batch. If the batch size reached the persister batch
/// size, the batch will be executed.
/// </summary>
/// <param name="persister">The collection persister.</param>
/// <param name="data">The data to put in the cache.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
internal async Task AddToBatchAsync(ICollectionPersister persister, CachePutData data, CancellationToken cancellationToken)
{
	cancellationToken.ThrowIfCancellationRequested();

	// A persister switch or a full batch means the pending batch has to be flushed
	// before this put can be queued under the new persister.
	if (ShouldExecuteBatch(persister, _putBatch))
	{
		await (ExecuteBatchAsync(cancellationToken)).ConfigureAwait(false);
		_currentPersister = persister;
		_putBatch = new CachePutBatch(_session, persister.Cache);
		_currentBatch = _putBatch;
	}

	if (Log.IsDebugEnabled())
	{
		Log.Debug("Adding a put operation to batch for collection role {0} and key {1}", persister.Role, data.Key);
	}

	_putBatch.Add(data);
}

/// <summary>
/// Executes the current batch.
/// Executes the pending batches.
/// </summary>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
internal async Task ExecuteBatchAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (_currentBatch == null || _currentBatch.BatchSize == 0)
if (_putBatches.Count == 0)
{
return;
}
Expand All @@ -85,10 +41,18 @@ internal async Task ExecuteBatchAsync(CancellationToken cancellationToken)
{
duration = Stopwatch.StartNew();
}
await (_currentBatch.ExecuteAsync(cancellationToken)).ConfigureAwait(false);

foreach (var batch in _putBatches.Values)
{
await (batch.ExecuteAsync(cancellationToken)).ConfigureAwait(false);
}

if (Log.IsDebugEnabled() && duration != null)
{
Log.Debug("ExecuteBatch for {0} keys took {1} ms", _currentBatch.BatchSize, duration.ElapsedMilliseconds);
Log.Debug(
"ExecuteBatch for {0} batches totaling {1} keys took {2} ms",
_putBatches.Count, _putBatches.Values.Sum(b => b.BatchSize),
duration.ElapsedMilliseconds);
}
}
finally
Expand Down
79 changes: 34 additions & 45 deletions src/NHibernate/Cache/CacheBatcher.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
using System.Diagnostics;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using NHibernate.Engine;
using NHibernate.Persister.Collection;
using NHibernate.Persister.Entity;

namespace NHibernate.Cache
{
/// <summary>
/// A batcher for batching operations of <see cref="ICacheConcurrencyStrategy"/>, where the batch size is retrieved
/// from an <see cref="IEntityPersister"/> or <see cref="ICollectionPersister"/>.
/// When a different persister or a different operation is added to the batch, the current batch will be executed.
/// A batcher for batching operations of <see cref="ICacheConcurrencyStrategy"/>.
/// </summary>
public sealed partial class CacheBatcher
{
private CachePutBatch _putBatch;
private readonly Dictionary<ICacheConcurrencyStrategy, CachePutBatch> _putBatches =
new Dictionary<ICacheConcurrencyStrategy, CachePutBatch>();
private readonly ISessionImplementor _session;
private AbstractCacheBatch _currentBatch;
private object _currentPersister;

private static readonly INHibernateLogger Log = NHibernateLogger.For(typeof(CacheBatcher));

Expand All @@ -25,53 +24,50 @@ internal CacheBatcher(ISessionImplementor session)
}

/// <summary>
/// Adds a put operation to the batch. If the batch size reached the persister batch
/// size, the batch will be executed.
/// Adds a put operation to the batch.
/// </summary>
/// <param name="persister">The entity persister.</param>
/// <param name="data">The data to put in the cache.</param>
internal void AddToBatch(IEntityPersister persister, CachePutData data)
{
if (ShouldExecuteBatch(persister, _putBatch))
{
ExecuteBatch();
_currentPersister = persister;
_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
}
if (Log.IsDebugEnabled())
{
Log.Debug("Adding a put operation to batch for entity {0} and key {1}", persister.EntityName, data.Key);
}
_putBatch.Add(data);
AddToBatch(persister.Cache, data);
}

/// <summary>
/// Adds a put operation to the batch. If the batch size reached the persister batch
/// size, the batch will be executed.
/// Adds a put operation to the batch.
/// </summary>
/// <param name="persister">The collection persister.</param>
/// <param name="data">The data to put in the cache.</param>
internal void AddToBatch(ICollectionPersister persister, CachePutData data)
{
if (ShouldExecuteBatch(persister, _putBatch))
{
ExecuteBatch();
_currentPersister = persister;
_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
}
if (Log.IsDebugEnabled())
{
Log.Debug("Adding a put operation to batch for collection role {0} and key {1}", persister.Role, data.Key);
}
_putBatch.Add(data);
AddToBatch(persister.Cache, data);
}

private void AddToBatch(ICacheConcurrencyStrategy cache, CachePutData data)
{
	// One batch per concurrency strategy, created lazily on first use;
	// all batches are executed together by ExecuteBatch.
	if (!_putBatches.TryGetValue(cache, out var batch))
	{
		batch = new CachePutBatch(_session, cache);
		_putBatches[cache] = batch;
	}

	batch.Add(data);
}

/// <summary>
/// Executes the current batch.
/// Executes the pending batches.
/// </summary>
internal void ExecuteBatch()
{
if (_currentBatch == null || _currentBatch.BatchSize == 0)
if (_putBatches.Count == 0)
{
return;
}
Expand All @@ -83,10 +79,18 @@ internal void ExecuteBatch()
{
duration = Stopwatch.StartNew();
}
_currentBatch.Execute();

foreach (var batch in _putBatches.Values)
{
batch.Execute();
}

if (Log.IsDebugEnabled() && duration != null)
{
Log.Debug("ExecuteBatch for {0} keys took {1} ms", _currentBatch.BatchSize, duration.ElapsedMilliseconds);
Log.Debug(
"ExecuteBatch for {0} batches totaling {1} keys took {2} ms",
_putBatches.Count, _putBatches.Values.Sum(b => b.BatchSize),
duration.ElapsedMilliseconds);
}
}
finally
Expand All @@ -100,22 +104,7 @@ internal void ExecuteBatch()
/// </summary>
internal void Cleanup()
{
_putBatch = null;

_currentBatch = null;
_currentPersister = null;
}

private bool ShouldExecuteBatch(IEntityPersister persister, AbstractCacheBatch batch)
{
return batch != _currentBatch || _currentPersister != persister ||
_currentBatch.BatchSize >= persister.GetBatchSize();
}

private bool ShouldExecuteBatch(ICollectionPersister persister, AbstractCacheBatch batch)
{
return batch != _currentBatch || _currentPersister != persister ||
_currentBatch.BatchSize >= persister.GetBatchSize();
_putBatches.Clear();
}
}
}