Skip to content

Commit 3fb3f32

Browse files
committed
Temp
1 parent de0fb5f commit 3fb3f32

File tree

6 files changed

+246
-362
lines changed

6 files changed

+246
-362
lines changed

src/Elastic.Clients.Elasticsearch/_Shared/Api/BulkRequest.cs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -163,41 +163,41 @@ public BulkRequestDescriptor Delete<TSource>(TSource documentToDelete, Action<Bu
163163
public BulkRequestDescriptor Delete<TSource>(Action<BulkDeleteOperationDescriptor> configure) => Delete(configure);
164164

165165
public BulkRequestDescriptor CreateMany<TSource>(IEnumerable<TSource> documents, Action<BulkCreateOperationDescriptor<TSource>, TSource> bulkCreateSelector) =>
166-
AddOperations(documents, bulkCreateSelector, o => new BulkCreateOperationDescriptor<TSource>(o));
166+
AddOperations(documents, bulkCreateSelector, o => new BulkCreateOperationDescriptor<TSource>(o), x => x.Instance);
167167

168168
public BulkRequestDescriptor CreateMany<TSource>(IEnumerable<TSource> documents) =>
169-
AddOperations(documents, null, o => new BulkCreateOperationDescriptor<TSource>(o));
169+
AddOperations(documents, null, o => new BulkCreateOperationDescriptor<TSource>(o), x => x.Instance);
170170

171171
public BulkRequestDescriptor IndexMany<TSource>(IEnumerable<TSource> documents, Action<BulkIndexOperationDescriptor<TSource>, TSource> bulkIndexSelector) =>
172-
AddOperations(documents, bulkIndexSelector, o => new BulkIndexOperationDescriptor<TSource>(o));
172+
AddOperations(documents, bulkIndexSelector, o => new BulkIndexOperationDescriptor<TSource>(o), x => x.Instance);
173173

174174
public BulkRequestDescriptor IndexMany<TSource>(IEnumerable<TSource> documents) =>
175-
AddOperations(documents, null, o => new BulkIndexOperationDescriptor<TSource>(o));
175+
AddOperations(documents, null, o => new BulkIndexOperationDescriptor<TSource>(o), x => x.Instance);
176176

177177
public BulkRequestDescriptor UpdateMany<TSource>(IEnumerable<TSource> objects, Action<BulkUpdateOperationDescriptor<TSource, TSource>, TSource> bulkIndexSelector) =>
178-
AddOperations(objects, bulkIndexSelector, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o));
178+
AddOperations(objects, bulkIndexSelector, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o), x => x.Instance);
179179

180180
public BulkRequestDescriptor UpdateMany<TSource>(IEnumerable<TSource> objects) =>
181-
AddOperations(objects, null, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o));
181+
AddOperations(objects, null, o => new BulkUpdateOperationDescriptor<TSource, TSource>().IdFrom(o), x => x.Instance);
182182

183183
public BulkRequestDescriptor DeleteMany<T>(IEnumerable<T> objects, Action<BulkDeleteOperationDescriptor, T> bulkDeleteSelector) =>
184-
AddOperations(objects, bulkDeleteSelector, obj => new BulkDeleteOperationDescriptor(new Id(obj)));
184+
AddOperations(objects, bulkDeleteSelector, obj => new BulkDeleteOperationDescriptor(new Id(obj)), x => x.Instance);
185185

186186
public BulkRequestDescriptor DeleteMany(IEnumerable<Id> ids, Action<BulkDeleteOperationDescriptor, Id> bulkDeleteSelector) =>
187-
AddOperations(ids, bulkDeleteSelector, id => new BulkDeleteOperationDescriptor(id));
187+
AddOperations(ids, bulkDeleteSelector, id => new BulkDeleteOperationDescriptor(id), x => x.Instance);
188188

189189
public BulkRequestDescriptor DeleteMany<T>(IEnumerable<T> objects) =>
190-
AddOperations(objects, null, obj => new BulkDeleteOperationDescriptor<T>(obj));
190+
AddOperations(objects, null, obj => new BulkDeleteOperationDescriptor<T>(obj), x => x.Instance);
191191

192192
public BulkRequestDescriptor DeleteMany(IndexName index, IEnumerable<Id> ids) =>
193-
AddOperations(ids, null, id => new BulkDeleteOperationDescriptor(id).Index(index));
193+
AddOperations(ids, null, id => new BulkDeleteOperationDescriptor(id).Index(index), x => x.Instance);
194194

195195
public void Serialize(Stream stream, IElasticsearchClientSettings settings, SerializationFormatting formatting = SerializationFormatting.None)
196196
{
197197
if (_operations is null)
198198
return;
199199

200-
var index = Self.RouteValues.Get<IndexName>("index");
200+
var index = Instance.Index;
201201

202202
foreach (var op in _operations)
203203
{
@@ -219,7 +219,7 @@ public async Task SerializeAsync(Stream stream, IElasticsearchClientSettings set
219219
if (_operations is null)
220220
return;
221221

222-
var index = Self.RouteValues.Get<IndexName>("index");
222+
var index = Instance.Index;
223223

224224
foreach (var op in _operations)
225225
{
@@ -239,8 +239,8 @@ public async Task SerializeAsync(Stream stream, IElasticsearchClientSettings set
239239
private BulkRequestDescriptor AddOperations<TSource, TDescriptor>(
240240
IEnumerable<TSource> objects,
241241
Action<TDescriptor, TSource> configureDescriptor,
242-
Func<TSource, TDescriptor> createDescriptor
243-
) where TDescriptor : IBulkOperation
242+
Func<TSource, TDescriptor> createDescriptor,
243+
Func<TDescriptor, IBulkOperation> getInstance)
244244
{
245245
if (@objects == null)
246246
return this;
@@ -257,10 +257,10 @@ Func<TSource, TDescriptor> createDescriptor
257257
configureDescriptor(descriptor, o);
258258
}
259259

260-
operations.Add(descriptor);
260+
operations.Add(getInstance(descriptor));
261261
}
262262

263263
_operations.AddRange(operations);
264-
return Self;
264+
return this;
265265
}
266266
}

src/Elastic.Clients.Elasticsearch/_Shared/Api/Extensions/CreateIndexRequestDescriptorExtensions.cs

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,44 @@
66

77
namespace Elastic.Clients.Elasticsearch.IndexManagement;
88

9-
// TODO:
9+
public static class CreateIndexRequestDescriptorExtensions
10+
{
11+
/// <summary>
12+
/// Add multiple aliases to the index at creation time.
13+
/// </summary>
14+
/// <param name="descriptor">A descriptor for an index request.</param>
15+
/// <param name="aliasName">The name of the alias.</param>
16+
/// <returns>The <see cref="CreateIndexRequestDescriptor"/> to allow fluent chaining of calls to configure the indexing request.</returns>
17+
public static CreateIndexRequestDescriptor WithAlias(this CreateIndexRequestDescriptor descriptor, string aliasName)
18+
{
19+
#if NET8_0_OR_GREATER
20+
ArgumentException.ThrowIfNullOrEmpty(aliasName);
21+
#else
22+
if (string.IsNullOrEmpty(aliasName))
23+
throw new ArgumentNullException(nameof(aliasName));
24+
#endif
1025

11-
//public static class CreateIndexRequestDescriptorExtensions
12-
//{
13-
// /// <summary>
14-
// /// Add multiple aliases to the index at creation time.
15-
// /// </summary>
16-
// /// <param name="descriptor">A descriptor for an index request.</param>
17-
// /// <param name="aliasName">The name of the alias.</param>
18-
// /// <returns>The <see cref="CreateIndexRequestDescriptor"/> to allow fluent chaining of calls to configure the indexing request.</returns>
19-
// public static CreateIndexRequestDescriptor WithAlias(this CreateIndexRequestDescriptor descriptor, string aliasName)
20-
// {
21-
//#if NET8_0_OR_GREATER
22-
// ArgumentException.ThrowIfNullOrEmpty(aliasName);
23-
//#else
24-
// if (string.IsNullOrEmpty(aliasName))
25-
// throw new ArgumentNullException(nameof(aliasName));
26-
//#endif
26+
descriptor.Aliases(x => x.Add(aliasName));
27+
return descriptor;
28+
}
2729

28-
// descriptor.Aliases(a => a.Add(aliasName, static _ => { }));
29-
// return descriptor;
30-
// }
30+
/// <summary>
31+
/// Adds an alias to the index at creation time.
32+
/// </summary>
33+
/// <typeparam name="TDocument">The type representing documents stored in this index.</typeparam>
34+
/// <param name="descriptor">A fluent descriptor for an index request.</param>
35+
/// <param name="aliasName">The name of the alias.</param>
36+
/// <returns>The <see cref="CreateIndexRequestDescriptor{TDocument}"/> to allow fluent chaining of calls to configure the indexing request.</returns>
37+
public static CreateIndexRequestDescriptor<TDocument> WithAlias<TDocument>(this CreateIndexRequestDescriptor<TDocument> descriptor, string aliasName)
38+
{
39+
#if NET8_0_OR_GREATER
40+
ArgumentException.ThrowIfNullOrEmpty(aliasName);
41+
#else
42+
if (string.IsNullOrEmpty(aliasName))
43+
throw new ArgumentNullException(nameof(aliasName));
44+
#endif
3145

32-
// /// <summary>
33-
// /// Adds an alias to the index at creation time.
34-
// /// </summary>
35-
// /// <typeparam name="TDocument">The type representing documents stored in this index.</typeparam>
36-
// /// <param name="descriptor">A fluent descriptor for an index request.</param>
37-
// /// <param name="aliasName">The name of the alias.</param>
38-
// /// <returns>The <see cref="CreateIndexRequestDescriptor{TDocument}"/> to allow fluent chaining of calls to configure the indexing request.</returns>
39-
// public static CreateIndexRequestDescriptor<TDocument> WithAlias<TDocument>(this CreateIndexRequestDescriptor<TDocument> descriptor, string aliasName)
40-
// {
41-
//#if NET8_0_OR_GREATER
42-
// ArgumentException.ThrowIfNullOrEmpty(aliasName);
43-
//#else
44-
// if (string.IsNullOrEmpty(aliasName))
45-
// throw new ArgumentNullException(nameof(aliasName));
46-
//#endif
47-
48-
// descriptor.Aliases(a => a.Add(aliasName, static _ => { }));
49-
// return descriptor;
50-
// }
51-
//}
46+
descriptor.Aliases(x => x.Add(aliasName));
47+
return descriptor;
48+
}
49+
}

src/Elastic.Clients.Elasticsearch/_Shared/Helpers/BulkAllRequest.cs

Lines changed: 101 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44

55
using System;
66
using System.Collections.Generic;
7-
using System.Text.Json;
7+
88
using Elastic.Transport;
99
using Elastic.Clients.Elasticsearch.Core.Bulk;
1010

1111
namespace Elastic.Clients.Elasticsearch;
1212

13-
public sealed class BulkAllRequest<T> : IBulkAllRequest<T>, IHelperCallable
13+
public sealed class BulkAllRequest<T> :
14+
IHelperCallable
1415
{
1516
public BulkAllRequest(IEnumerable<T> documents)
1617
{
@@ -58,97 +59,121 @@ public BulkAllRequest(IEnumerable<T> documents)
5859
RequestMetaData IHelperCallable.ParentMetaData { get => ParentMetaData; set => ParentMetaData = value; }
5960
}
6061

61-
// TODO:
62-
63-
//public sealed class BulkAllRequestDescriptor<T> : SerializableDescriptor<BulkAllRequestDescriptor<T>>, IBulkAllRequest<T>, IHelperCallable
64-
//{
65-
// private readonly IEnumerable<T> _documents;
66-
67-
// private int? _backOffRetries;
68-
// private Duration _backOffTime;
69-
// private ProducerConsumerBackPressure _backPressure;
70-
// private Action<BulkResponse> _bulkResponseCallback;
71-
// private IndexName _index;
72-
// private int? _maxDegreeOfParallism;
73-
// private int? _size;
74-
// private bool _refreshOnCompleted;
75-
// private Action<BulkRequestDescriptor, IList<T>> _bufferToBulk;
76-
// private Func<ResponseItem, T, bool> _retryDocumentPredicate;
77-
// private Action<ResponseItem, T> _droppedDocumentCallback;
78-
// private Routing _routing;
79-
// private bool _continueAfterDroppedDocuments;
80-
// private string _pipeline;
81-
// private Indices _refreshIndices;
82-
// private Duration _timeout;
83-
// private WaitForActiveShards? _waitForActiveShards;
84-
// private RequestMetaData _requestMetaData;
8562

86-
// public BulkAllRequestDescriptor(IEnumerable<T> documents)
87-
// {
88-
// _documents = documents;
89-
// _index = typeof(T);
90-
// }
91-
92-
// int? IBulkAllRequest<T>.BackOffRetries => _backOffRetries;
93-
// Duration? IBulkAllRequest<T>.BackOffTime => _backOffTime;
94-
// ProducerConsumerBackPressure? IBulkAllRequest<T>.BackPressure => _backPressure;
95-
// Action<BulkRequestDescriptor, IList<T>>? IBulkAllRequest<T>.BufferToBulk => _bufferToBulk;
96-
// Action<BulkResponse>? IBulkAllRequest<T>.BulkResponseCallback => _bulkResponseCallback;
97-
// bool IBulkAllRequest<T>.ContinueAfterDroppedDocuments => _continueAfterDroppedDocuments;
98-
// IEnumerable<T> IBulkAllRequest<T>.Documents => _documents;
99-
// Action<ResponseItem, T>? IBulkAllRequest<T>.DroppedDocumentCallback => _droppedDocumentCallback;
100-
// IndexName IBulkAllRequest<T>.Index => _index;
101-
// int? IBulkAllRequest<T>.MaxDegreeOfParallelism => _maxDegreeOfParallism;
102-
// string? IBulkAllRequest<T>.Pipeline => _pipeline;
103-
// Indices? IBulkAllRequest<T>.RefreshIndices => _refreshIndices;
104-
// bool IBulkAllRequest<T>.RefreshOnCompleted => _refreshOnCompleted;
105-
// Func<ResponseItem, T, bool>? IBulkAllRequest<T>.RetryDocumentPredicate => _retryDocumentPredicate;
106-
// Routing? IBulkAllRequest<T>.Routing => _routing;
107-
// int? IBulkAllRequest<T>.Size => _size;
108-
// Duration? IBulkAllRequest<T>.Timeout => _timeout;
109-
// WaitForActiveShards? IBulkAllRequest<T>.WaitForActiveShards => _waitForActiveShards;
110-
// RequestMetaData IHelperCallable.ParentMetaData { get => _requestMetaData; set => _requestMetaData = value; }
63+
public sealed class BulkAllRequestDescriptor<T>
64+
{
65+
private BulkAllRequest<T> Instance { get; }
11166

112-
// public BulkAllRequestDescriptor<T> BackOffRetries(int? backOffRetries) => Assign(backOffRetries, (a, v) => a._backOffRetries = v);
67+
public BulkAllRequestDescriptor(IEnumerable<T> documents)
68+
{
69+
Instance = new(documents);
70+
}
11371

114-
// public BulkAllRequestDescriptor<T> BackOffTime(Duration? backOffTime) => Assign(backOffTime, (a, v) => a._backOffTime = v);
72+
public BulkAllRequestDescriptor<T> BackOffRetries(int? backOffRetries)
73+
{
74+
Instance.BackOffRetries = backOffRetries;
75+
return this;
76+
}
11577

116-
// public BulkAllRequestDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFactor = null) =>
117-
// Assign(new ProducerConsumerBackPressure(backPressureFactor, maxConcurrency), (a, v) => a._backPressure = v);
78+
public BulkAllRequestDescriptor<T> BackOffTime(Duration? backOffTime)
79+
{
80+
Instance.BackOffTime = backOffTime;
81+
return this;
82+
}
11883

119-
// public BulkAllRequestDescriptor<T> BufferToBulk(Action<BulkRequestDescriptor, IList<T>> modifier) => Assign(modifier, (a, v) => a._bufferToBulk = v);
84+
public BulkAllRequestDescriptor<T> BackPressure(int maxConcurrency, int? backPressureFactor = null)
85+
{
86+
Instance.BackPressure = new ProducerConsumerBackPressure(backPressureFactor, maxConcurrency);
87+
return this;
88+
}
12089

121-
// public BulkAllRequestDescriptor<T> BulkResponseCallback(Action<BulkResponse> callback) =>
122-
// Assign(callback, (a, v) => a._bulkResponseCallback = v);
90+
public BulkAllRequestDescriptor<T> BufferToBulk(Action<BulkRequestDescriptor, IList<T>> modifier)
91+
{
92+
Instance.BufferToBulk = modifier;
93+
return this;
94+
}
12395

124-
// public BulkAllRequestDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true) => Assign(proceed, (a, v) => a._continueAfterDroppedDocuments = v);
96+
public BulkAllRequestDescriptor<T> BulkResponseCallback(Action<BulkResponse> callback)
97+
{
98+
Instance.BulkResponseCallback = callback;
99+
return this;
100+
}
125101

126-
// public BulkAllRequestDescriptor<T> DroppedDocumentCallback(Action<ResponseItem, T> callback) =>
127-
// Assign(callback, (a, v) => a._droppedDocumentCallback = v);
102+
public BulkAllRequestDescriptor<T> ContinueAfterDroppedDocuments(bool proceed = true)
103+
{
104+
Instance.ContinueAfterDroppedDocuments = proceed;
105+
return this;
106+
}
128107

129-
// public BulkAllRequestDescriptor<T> Index(IndexName index) => Assign(index, (a, v) => a._index = v);
108+
public BulkAllRequestDescriptor<T> DroppedDocumentCallback(Action<ResponseItem, T> callback)
109+
{
110+
Instance.DroppedDocumentCallback = callback;
111+
return this;
112+
}
130113

131-
// public BulkAllRequestDescriptor<T> Index<TOther>() where TOther : class => Assign(typeof(TOther), (a, v) => a._index = v);
114+
public BulkAllRequestDescriptor<T> Index(IndexName index)
115+
{
116+
Instance.Index = index;
117+
return this;
118+
}
132119

133-
// public BulkAllRequestDescriptor<T> MaxDegreeOfParallelism(int? parallelism) => Assign(parallelism, (a, v) => a._maxDegreeOfParallism = v);
120+
public BulkAllRequestDescriptor<T> Index<TOther>() where TOther : class
121+
{
122+
Instance.Index = typeof(TOther);
123+
return this;
124+
}
134125

135-
// public BulkAllRequestDescriptor<T> Pipeline(string pipeline) => Assign(pipeline, (a, v) => a._pipeline = v);
126+
public BulkAllRequestDescriptor<T> MaxDegreeOfParallelism(int? parallelism)
127+
{
128+
Instance.MaxDegreeOfParallelism = parallelism;
129+
return this;
130+
}
136131

137-
// public BulkAllRequestDescriptor<T> RefreshIndices(Indices indicesToRefresh) => Assign(indicesToRefresh, (a, v) => a._refreshIndices = v);
132+
public BulkAllRequestDescriptor<T> Pipeline(string pipeline)
133+
{
134+
Instance.Pipeline = pipeline;
135+
return this;
136+
}
138137

139-
// public BulkAllRequestDescriptor<T> RefreshOnCompleted(bool refreshOnCompleted = true) => Assign(refreshOnCompleted, (a, v) => a._refreshOnCompleted = v);
138+
public BulkAllRequestDescriptor<T> RefreshIndices(Indices indicesToRefresh)
139+
{
140+
Instance.RefreshIndices = indicesToRefresh;
141+
return this;
142+
}
140143

141-
// public BulkAllRequestDescriptor<T> RetryDocumentPredicate(Func<ResponseItem, T, bool> predicate) =>
142-
// Assign(predicate, (a, v) => a._retryDocumentPredicate = v);
144+
public BulkAllRequestDescriptor<T> RefreshOnCompleted(bool refreshOnCompleted = true)
145+
{
146+
Instance.RefreshOnCompleted = refreshOnCompleted;
147+
return this;
148+
}
143149

144-
// public BulkAllRequestDescriptor<T> Routing(Routing routing) => Assign(routing, (a, v) => a._routing = v);
150+
public BulkAllRequestDescriptor<T> RetryDocumentPredicate(Func<ResponseItem, T, bool> predicate)
151+
{
152+
Instance.RetryDocumentPredicate = predicate;
153+
return this;
154+
}
145155

146-
// public BulkAllRequestDescriptor<T> Size(int? size) => Assign(size, (a, v) => a._size = v);
156+
public BulkAllRequestDescriptor<T> Routing(Routing routing)
157+
{
158+
Instance.Routing = routing;
159+
return this;
160+
}
147161

148-
// public BulkAllRequestDescriptor<T> Timeout(Duration timeout) => Assign(timeout, (a, v) => a._timeout = v);
162+
public BulkAllRequestDescriptor<T> Size(int? size)
163+
{
164+
Instance.Size = size;
165+
return this;
166+
}
149167

150-
// public BulkAllRequestDescriptor<T> WaitForActiveShards(WaitForActiveShards? shards) => Assign(shards, (a, v) => a._waitForActiveShards = v);
168+
public BulkAllRequestDescriptor<T> Timeout(Duration timeout)
169+
{
170+
Instance.Timeout = timeout;
171+
return this;
172+
}
151173

152-
// // This descriptor is not serializable and gets converted to a BullAllObservable
153-
// protected override void Serialize(Utf8JsonWriter writer, JsonSerializerOptions options, IElasticsearchClientSettings settings) => throw new NotImplementedException();
154-
//}
174+
public BulkAllRequestDescriptor<T> WaitForActiveShards(WaitForActiveShards? shards)
175+
{
176+
Instance.WaitForActiveShards = shards;
177+
return this;
178+
}
179+
}

0 commit comments

Comments
 (0)