
Commit 2b45b4c

fredericDelaporte authored and hazzik committed
Reduce cache put batches fragmentation (#1798)
1 parent 78abbbe commit 2b45b4c

File tree

4 files changed: +62 -111 lines changed

src/NHibernate.Test/Async/CacheTest/BatchableCacheFixture.cs

Lines changed: 7 additions & 8 deletions
@@ -450,33 +450,32 @@ public async Task MultiplePutReadWriteTestAsync()
 				ids.AddRange(items.OrderBy(o => o.Id).Select(o => o.Id));
 				await (tx.CommitAsync());
 			}
-			Assert.That(cache.PutCalls, Has.Count.EqualTo(0));
-			Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(2));
+			Assert.That(cache.PutCalls, Has.Count.EqualTo(0), "Cache put");
+			Assert.That(cache.PutMultipleCalls, Has.Count.EqualTo(1), "Cache put many");
+			// Lock get
+			Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(1), "Cache get many");
 
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.PutMultipleCalls
 			);
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.LockMultipleCalls
 			);
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.UnlockMultipleCalls
 			);

src/NHibernate.Test/CacheTest/BatchableCacheFixture.cs

Lines changed: 7 additions & 8 deletions
@@ -438,33 +438,32 @@ public void MultiplePutReadWriteTest()
 				ids.AddRange(items.OrderBy(o => o.Id).Select(o => o.Id));
 				tx.Commit();
 			}
-			Assert.That(cache.PutCalls, Has.Count.EqualTo(0));
-			Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(2));
+			Assert.That(cache.PutCalls, Has.Count.EqualTo(0), "Cache put");
+			Assert.That(cache.PutMultipleCalls, Has.Count.EqualTo(1), "Cache put many");
+			// Lock get
+			Assert.That(cache.GetMultipleCalls, Has.Count.EqualTo(1), "Cache get many");
 
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.PutMultipleCalls
 			);
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.LockMultipleCalls
 			);
 			AssertEquivalent(
 				ids,
 				new[]
 				{
-					new[] {0, 1, 2},
-					new[] {3, 4, 5}
+					new[] {0, 1, 2, 3, 4, 5}
 				},
 				cache.UnlockMultipleCalls
 			);

src/NHibernate/Async/Cache/CacheBatcher.cs

Lines changed: 14 additions & 50 deletions
@@ -8,7 +8,9 @@
 //------------------------------------------------------------------------------
 
 
+using System.Collections.Generic;
 using System.Diagnostics;
+using System.Linq;
 using NHibernate.Engine;
 using NHibernate.Persister.Collection;
 using NHibernate.Persister.Entity;
@@ -21,59 +23,13 @@ public sealed partial class CacheBatcher
 	{
 
 		/// <summary>
-		/// Adds a put operation to the batch. If the batch size reached the persister batch
-		/// size, the batch will be executed.
-		/// </summary>
-		/// <param name="persister">The entity persister.</param>
-		/// <param name="data">The data to put in the cache.</param>
-		/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
-		internal async Task AddToBatchAsync(IEntityPersister persister, CachePutData data, CancellationToken cancellationToken)
-		{
-			cancellationToken.ThrowIfCancellationRequested();
-			if (ShouldExecuteBatch(persister, _putBatch))
-			{
-				await (ExecuteBatchAsync(cancellationToken)).ConfigureAwait(false);
-				_currentPersister = persister;
-				_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
-			}
-			if (Log.IsDebugEnabled())
-			{
-				Log.Debug("Adding a put operation to batch for entity {0} and key {1}", persister.EntityName, data.Key);
-			}
-			_putBatch.Add(data);
-		}
-
-		/// <summary>
-		/// Adds a put operation to the batch. If the batch size reached the persister batch
-		/// size, the batch will be executed.
-		/// </summary>
-		/// <param name="persister">The collection persister.</param>
-		/// <param name="data">The data to put in the cache.</param>
-		/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
-		internal async Task AddToBatchAsync(ICollectionPersister persister, CachePutData data, CancellationToken cancellationToken)
-		{
-			cancellationToken.ThrowIfCancellationRequested();
-			if (ShouldExecuteBatch(persister, _putBatch))
-			{
-				await (ExecuteBatchAsync(cancellationToken)).ConfigureAwait(false);
-				_currentPersister = persister;
-				_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
-			}
-			if (Log.IsDebugEnabled())
-			{
-				Log.Debug("Adding a put operation to batch for collection role {0} and key {1}", persister.Role, data.Key);
-			}
-			_putBatch.Add(data);
-		}
-
-		/// <summary>
-		/// Executes the current batch.
+		/// Executes the pending batches.
 		/// </summary>
 		/// <param name="cancellationToken">A cancellation token that can be used to cancel the work</param>
 		internal async Task ExecuteBatchAsync(CancellationToken cancellationToken)
 		{
 			cancellationToken.ThrowIfCancellationRequested();
-			if (_currentBatch == null || _currentBatch.BatchSize == 0)
+			if (_putBatches.Count == 0)
 			{
 				return;
 			}
@@ -85,10 +41,18 @@ internal async Task ExecuteBatchAsync(CancellationToken cancellationToken)
 				{
 					duration = Stopwatch.StartNew();
 				}
-				await (_currentBatch.ExecuteAsync(cancellationToken)).ConfigureAwait(false);
+
+				foreach (var batch in _putBatches.Values)
+				{
+					await (batch.ExecuteAsync(cancellationToken)).ConfigureAwait(false);
+				}
+
 				if (Log.IsDebugEnabled() && duration != null)
 				{
-					Log.Debug("ExecuteBatch for {0} keys took {1} ms", _currentBatch.BatchSize, duration.ElapsedMilliseconds);
+					Log.Debug(
+						"ExecuteBatch for {0} batches totaling {1} keys took {2} ms",
+						_putBatches.Count, _putBatches.Values.Sum(b => b.BatchSize),
+						duration.ElapsedMilliseconds);
 				}
 			}
 			finally
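
Note: the async variant now just drains every pending per-cache batch sequentially and logs the totals. A rough standalone sketch of that flush loop is below; it uses a hypothetical IPutBatch interface and an object-keyed dictionary in place of NHibernate's internal CachePutBatch and ICacheConcurrencyStrategy types, so it illustrates the shape of the change rather than the actual API.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for NHibernate's internal CachePutBatch.
public interface IPutBatch
{
	int BatchSize { get; }
	Task ExecuteAsync(CancellationToken cancellationToken);
}

public sealed class PutBatcherSketch
{
	// One pending batch per cache instance (the dictionary key).
	private readonly Dictionary<object, IPutBatch> _putBatches = new Dictionary<object, IPutBatch>();

	public async Task ExecuteBatchAsync(CancellationToken cancellationToken)
	{
		cancellationToken.ThrowIfCancellationRequested();
		if (_putBatches.Count == 0)
			return;

		var duration = Stopwatch.StartNew();

		// Flush each per-cache batch in turn; each batch issues one multi-put
		// against its own cache.
		foreach (var batch in _putBatches.Values)
		{
			await batch.ExecuteAsync(cancellationToken).ConfigureAwait(false);
		}

		Console.WriteLine(
			"ExecuteBatch for {0} batches totaling {1} keys took {2} ms",
			_putBatches.Count,
			_putBatches.Values.Sum(b => b.BatchSize),
			duration.ElapsedMilliseconds);
	}
}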

src/NHibernate/Cache/CacheBatcher.cs

Lines changed: 34 additions & 45 deletions
@@ -1,21 +1,20 @@
-using System.Diagnostics;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
 using NHibernate.Engine;
 using NHibernate.Persister.Collection;
 using NHibernate.Persister.Entity;
 
 namespace NHibernate.Cache
 {
 	/// <summary>
-	/// A batcher for batching operations of <see cref="ICacheConcurrencyStrategy"/>, where the batch size is retrieved
-	/// from an <see cref="IEntityPersister"/> or <see cref="ICollectionPersister"/>.
-	/// When a different persister or a different operation is added to the batch, the current batch will be executed.
+	/// A batcher for batching operations of <see cref="ICacheConcurrencyStrategy"/>.
 	/// </summary>
 	public sealed partial class CacheBatcher
 	{
-		private CachePutBatch _putBatch;
+		private readonly Dictionary<ICacheConcurrencyStrategy, CachePutBatch> _putBatches =
+			new Dictionary<ICacheConcurrencyStrategy, CachePutBatch>();
 		private readonly ISessionImplementor _session;
-		private AbstractCacheBatch _currentBatch;
-		private object _currentPersister;
 
 		private static readonly INHibernateLogger Log = NHibernateLogger.For(typeof(CacheBatcher));
 
@@ -25,53 +24,50 @@ internal CacheBatcher(ISessionImplementor session)
 		}
 
 		/// <summary>
-		/// Adds a put operation to the batch. If the batch size reached the persister batch
-		/// size, the batch will be executed.
+		/// Adds a put operation to the batch.
 		/// </summary>
 		/// <param name="persister">The entity persister.</param>
 		/// <param name="data">The data to put in the cache.</param>
 		internal void AddToBatch(IEntityPersister persister, CachePutData data)
 		{
-			if (ShouldExecuteBatch(persister, _putBatch))
-			{
-				ExecuteBatch();
-				_currentPersister = persister;
-				_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
-			}
 			if (Log.IsDebugEnabled())
 			{
 				Log.Debug("Adding a put operation to batch for entity {0} and key {1}", persister.EntityName, data.Key);
 			}
-			_putBatch.Add(data);
+			AddToBatch(persister.Cache, data);
 		}
 
 		/// <summary>
-		/// Adds a put operation to the batch. If the batch size reached the persister batch
-		/// size, the batch will be executed.
+		/// Adds a put operation to the batch.
 		/// </summary>
 		/// <param name="persister">The collection persister.</param>
 		/// <param name="data">The data to put in the cache.</param>
 		internal void AddToBatch(ICollectionPersister persister, CachePutData data)
 		{
-			if (ShouldExecuteBatch(persister, _putBatch))
-			{
-				ExecuteBatch();
-				_currentPersister = persister;
-				_currentBatch = _putBatch = new CachePutBatch(_session, persister.Cache);
-			}
 			if (Log.IsDebugEnabled())
 			{
 				Log.Debug("Adding a put operation to batch for collection role {0} and key {1}", persister.Role, data.Key);
 			}
-			_putBatch.Add(data);
+			AddToBatch(persister.Cache, data);
+		}
+
+		private void AddToBatch(ICacheConcurrencyStrategy cache, CachePutData data)
+		{
+			if (!_putBatches.TryGetValue(cache, out var cachePutBatch))
+			{
+				cachePutBatch = new CachePutBatch(_session, cache);
+				_putBatches.Add(cache, cachePutBatch);
+			}
+
+			cachePutBatch.Add(data);
 		}
 
 		/// <summary>
-		/// Executes the current batch.
+		/// Executes the pending batches.
 		/// </summary>
 		internal void ExecuteBatch()
 		{
-			if (_currentBatch == null || _currentBatch.BatchSize == 0)
+			if (_putBatches.Count == 0)
 			{
 				return;
 			}
@@ -83,10 +79,18 @@ internal void ExecuteBatch()
 				{
 					duration = Stopwatch.StartNew();
 				}
-				_currentBatch.Execute();
+
+				foreach (var batch in _putBatches.Values)
+				{
+					batch.Execute();
+				}
+
 				if (Log.IsDebugEnabled() && duration != null)
 				{
-					Log.Debug("ExecuteBatch for {0} keys took {1} ms", _currentBatch.BatchSize, duration.ElapsedMilliseconds);
+					Log.Debug(
+						"ExecuteBatch for {0} batches totaling {1} keys took {2} ms",
+						_putBatches.Count, _putBatches.Values.Sum(b => b.BatchSize),
+						duration.ElapsedMilliseconds);
 				}
 			}
 			finally
@@ -100,22 +104,7 @@ internal void ExecuteBatch()
 		/// </summary>
 		internal void Cleanup()
 		{
-			_putBatch = null;
-
-			_currentBatch = null;
-			_currentPersister = null;
-		}
-
-		private bool ShouldExecuteBatch(IEntityPersister persister, AbstractCacheBatch batch)
-		{
-			return batch != _currentBatch || _currentPersister != persister ||
-				_currentBatch.BatchSize >= persister.GetBatchSize();
-		}
-
-		private bool ShouldExecuteBatch(ICollectionPersister persister, AbstractCacheBatch batch)
-		{
-			return batch != _currentBatch || _currentPersister != persister ||
-				_currentBatch.BatchSize >= persister.GetBatchSize();
+			_putBatches.Clear();
 		}
 	}
 }
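
Note: the core of the change is visible above. Put operations are grouped per ICacheConcurrencyStrategy, so at flush time a single multi-put is issued per cache instead of one batch per persister batch-size window. A minimal standalone sketch of that grouping follows; ICachePutTarget and PutData are hypothetical stand-ins for the internal ICacheConcurrencyStrategy and CachePutData types.

using System.Collections.Generic;

// Hypothetical stand-ins for NHibernate's internal cache types,
// used only for this illustration.
public interface ICachePutTarget { }

public sealed class PutData
{
	public PutData(object key, object value)
	{
		Key = key;
		Value = value;
	}

	public object Key { get; }
	public object Value { get; }
}

public sealed class PutGroupingSketch
{
	// One pending list of puts per cache target: puts aimed at the same cache
	// are coalesced into a single batch regardless of which persister queued them.
	private readonly Dictionary<ICachePutTarget, List<PutData>> _putBatches =
		new Dictionary<ICachePutTarget, List<PutData>>();

	public void AddToBatch(ICachePutTarget cache, PutData data)
	{
		if (!_putBatches.TryGetValue(cache, out var batch))
		{
			batch = new List<PutData>();
			_putBatches.Add(cache, batch);
		}

		batch.Add(data);
	}

	// Mirrors Cleanup(): drop all pending batches once the session is done with them.
	public void Cleanup()
	{
		_putBatches.Clear();
	}
}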
