Skip to content

Commit fcc299c

Browse files
committed
CSHARP-1449: Create sync versions of the new 2.x Driver APIs.
1 parent 8c5d847 commit fcc299c

25 files changed

+1991
-297
lines changed

src/MongoDB.Driver.Core/IAsyncCursorSource.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ namespace MongoDB.Driver
2626
/// <typeparam name="TDocument">The type of the document.</typeparam>
2727
public interface IAsyncCursorSource<TDocument>
2828
{
29+
/// <summary>
30+
/// Executes the operation and returns a cursor to the results.
31+
/// </summary>
32+
/// <param name="cancellationToken">The cancellation token.</param>
33+
/// <returns>A cursor.</returns>
34+
IAsyncCursor<TDocument> ToCursor(CancellationToken cancellationToken = default(CancellationToken));
35+
2936
/// <summary>
3037
/// Executes the operation and returns a cursor to the results.
3138
/// </summary>
@@ -113,6 +120,21 @@ public static class IAsyncCursorSourceExtensions
113120
}
114121
}
115122

123+
/// <summary>
124+
/// Returns a list containing all the documents returned by the cursor returned by a cursor source.
125+
/// </summary>
126+
/// <typeparam name="TDocument">The type of the document.</typeparam>
127+
/// <param name="source">The source.</param>
128+
/// <param name="cancellationToken">The cancellation token.</param>
129+
/// <returns>The list of documents.</returns>
130+
public static List<TDocument> ToList<TDocument>(this IAsyncCursorSource<TDocument> source, CancellationToken cancellationToken = default(CancellationToken))
131+
{
132+
using (var cursor = source.ToCursor(cancellationToken))
133+
{
134+
return cursor.ToList(cancellationToken);
135+
}
136+
}
137+
116138
/// <summary>
117139
/// Returns a list containing all the documents returned by the cursor returned by a cursor source.
118140
/// </summary>

src/MongoDB.Driver/AggregateFluent.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ public override IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<
146146
return AppendStage<TNewResult>(stage);
147147
}
148148

149+
public override IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken)
150+
{
151+
return AppendStage<TResult>(new BsonDocument("$out", collectionName))
152+
.ToCursor(cancellationToken);
153+
}
154+
149155
public override Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken)
150156
{
151157
return AppendStage<TResult>(new BsonDocument("$out", collectionName))
@@ -233,6 +239,12 @@ public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<
233239
return AppendStage<TNewResult>(stage);
234240
}
235241

242+
public override IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken)
243+
{
244+
var pipeline = new PipelineStagePipelineDefinition<TDocument, TResult>(_stages);
245+
return _collection.Aggregate(pipeline, _options, cancellationToken);
246+
}
247+
236248
public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken)
237249
{
238250
var pipeline = new PipelineStagePipelineDefinition<TDocument, TResult>(_stages);

src/MongoDB.Driver/AggregateFluentBase.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public virtual IAggregateFluent<TNewResult> Lookup<TForeignCollection, TNewResul
5858
/// <inheritdoc />
5959
public abstract IAggregateFluent<TNewResult> OfType<TNewResult>(IBsonSerializer<TNewResult> newResultSerializer) where TNewResult : TResult;
6060

61+
/// <inheritdoc />
62+
public abstract IAsyncCursor<TResult> Out(string collectionName, CancellationToken cancellationToken);
63+
6164
/// <inheritdoc />
6265
public abstract Task<IAsyncCursor<TResult>> OutAsync(string collectionName, CancellationToken cancellationToken);
6366

@@ -80,7 +83,9 @@ public virtual IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<T
8083
}
8184

8285
/// <inheritdoc />
83-
public abstract Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken);
86+
public abstract IAsyncCursor<TResult> ToCursor(CancellationToken cancellationToken);
8487

88+
/// <inheritdoc />
89+
public abstract Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken cancellationToken);
8590
}
8691
}

src/MongoDB.Driver/AsyncCursorHelper.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,21 @@ public async static Task<bool> AnyAsync<T>(Task<IAsyncCursor<T>> cursorTask, Can
3737
}
3838
}
3939

40+
public static T First<T>(IAsyncCursor<T> cursor, CancellationToken cancellationToken)
41+
{
42+
using (cursor)
43+
{
44+
if (cursor.MoveNext(cancellationToken))
45+
{
46+
return cursor.Current.First();
47+
}
48+
else
49+
{
50+
throw new InvalidOperationException("The source sequence is empty.");
51+
}
52+
}
53+
}
54+
4055
public async static Task<T> FirstAsync<T>(Task<IAsyncCursor<T>> cursorTask, CancellationToken cancellationToken)
4156
{
4257
using (var cursor = await cursorTask.ConfigureAwait(false))
@@ -52,6 +67,21 @@ public async static Task<T> FirstAsync<T>(Task<IAsyncCursor<T>> cursorTask, Canc
5267
}
5368
}
5469

70+
public static T FirstOrDefault<T>(IAsyncCursor<T> cursor, CancellationToken cancellationToken)
71+
{
72+
using (cursor)
73+
{
74+
if (cursor.MoveNext(cancellationToken))
75+
{
76+
return cursor.Current.FirstOrDefault();
77+
}
78+
else
79+
{
80+
return default(T);
81+
}
82+
}
83+
}
84+
5585
public async static Task<T> FirstOrDefaultAsync<T>(Task<IAsyncCursor<T>> cursorTask, CancellationToken cancellationToken)
5686
{
5787
using (var cursor = await cursorTask.ConfigureAwait(false))
@@ -67,6 +97,21 @@ public async static Task<T> FirstOrDefaultAsync<T>(Task<IAsyncCursor<T>> cursorT
6797
}
6898
}
6999

100+
public static T Single<T>(IAsyncCursor<T> cursor, CancellationToken cancellationToken)
101+
{
102+
using (cursor)
103+
{
104+
if (cursor.MoveNext(cancellationToken))
105+
{
106+
return cursor.Current.Single();
107+
}
108+
else
109+
{
110+
throw new InvalidOperationException("The source sequence is empty.");
111+
}
112+
}
113+
}
114+
70115
public async static Task<T> SingleAsync<T>(Task<IAsyncCursor<T>> cursorTask, CancellationToken cancellationToken)
71116
{
72117
using (var cursor = await cursorTask.ConfigureAwait(false))
@@ -82,6 +127,21 @@ public async static Task<T> SingleAsync<T>(Task<IAsyncCursor<T>> cursorTask, Can
82127
}
83128
}
84129

130+
public static T SingleOrDefault<T>(IAsyncCursor<T> cursor, CancellationToken cancellationToken)
131+
{
132+
using (cursor)
133+
{
134+
if (cursor.MoveNext(cancellationToken))
135+
{
136+
return cursor.Current.SingleOrDefault();
137+
}
138+
else
139+
{
140+
return default(T);
141+
}
142+
}
143+
}
144+
85145
public async static Task<T> SingleOrDefaultAsync<T>(Task<IAsyncCursor<T>> cursorTask, CancellationToken cancellationToken)
86146
{
87147
using (var cursor = await cursorTask.ConfigureAwait(false))

src/MongoDB.Driver/FilteredMongoCollectionBase.cs

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -73,63 +73,95 @@ protected IMongoCollection<TDocument> WrappedCollection
7373
}
7474

7575
// public methods
76-
public override Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
76+
public override IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
7777
{
78-
const string matchOperatorName = "$match";
79-
80-
var filterStage = new DelegatedPipelineStageDefinition<TDocument, TDocument>(
81-
matchOperatorName,
82-
(s, sr) =>
83-
{
84-
var renderedFilter = _filter.Render(s, sr);
85-
return new RenderedPipelineStageDefinition<TDocument>(matchOperatorName, new BsonDocument(matchOperatorName, renderedFilter), s);
86-
});
87-
88-
var filterPipeline = new PipelineStagePipelineDefinition<TDocument, TDocument>(new[] { filterStage });
89-
var combinedPipeline = new CombinedPipelineDefinition<TDocument, TDocument, TResult>(
90-
filterPipeline,
91-
pipeline);
78+
var filteredPipeline = CreateFilteredPipeline(pipeline);
79+
return _wrappedCollection.Aggregate(filteredPipeline, options, cancellationToken);
80+
}
9281

93-
var optimizedPipeline = new OptimizingPipelineDefinition<TDocument, TResult>(combinedPipeline);
82+
public override Task<IAsyncCursor<TResult>> AggregateAsync<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
83+
{
84+
var filteredPipeline = CreateFilteredPipeline(pipeline);
85+
return _wrappedCollection.AggregateAsync(filteredPipeline, options, cancellationToken);
86+
}
9487

95-
return _wrappedCollection.AggregateAsync(optimizedPipeline, options, cancellationToken);
88+
public override BulkWriteResult<TDocument> BulkWrite(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
89+
{
90+
return _wrappedCollection.BulkWrite(CombineModelFilters(requests), options, cancellationToken);
9691
}
9792

9893
public override Task<BulkWriteResult<TDocument>> BulkWriteAsync(IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
9994
{
10095
return _wrappedCollection.BulkWriteAsync(CombineModelFilters(requests), options, cancellationToken);
10196
}
10297

98+
public override long Count(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
99+
{
100+
return _wrappedCollection.Count(CombineFilters(filter), options, cancellationToken);
101+
}
102+
103103
public override Task<long> CountAsync(FilterDefinition<TDocument> filter, CountOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
104104
{
105105
return _wrappedCollection.CountAsync(CombineFilters(filter), options, cancellationToken);
106106
}
107107

108+
public override IAsyncCursor<TField> Distinct<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
109+
{
110+
return _wrappedCollection.Distinct(field, CombineFilters(filter), options, cancellationToken);
111+
}
112+
108113
public override Task<IAsyncCursor<TField>> DistinctAsync<TField>(FieldDefinition<TDocument, TField> field, FilterDefinition<TDocument> filter, DistinctOptions options = null, CancellationToken cancellationToken = default(CancellationToken))
109114
{
110115
return _wrappedCollection.DistinctAsync(field, CombineFilters(filter), options, cancellationToken);
111116
}
112117

118+
public override IAsyncCursor<TProjection> FindSync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
119+
{
120+
return _wrappedCollection.FindSync(CombineFilters(filter), options, cancellationToken);
121+
}
122+
113123
public override Task<IAsyncCursor<TProjection>> FindAsync<TProjection>(FilterDefinition<TDocument> filter, FindOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
114124
{
115125
return _wrappedCollection.FindAsync(CombineFilters(filter), options, cancellationToken);
116126
}
117127

128+
public override TProjection FindOneAndDelete<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
129+
{
130+
return _wrappedCollection.FindOneAndDelete(CombineFilters(filter), options, cancellationToken);
131+
}
132+
118133
public override Task<TProjection> FindOneAndDeleteAsync<TProjection>(FilterDefinition<TDocument> filter, FindOneAndDeleteOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
119134
{
120135
return _wrappedCollection.FindOneAndDeleteAsync(CombineFilters(filter), options, cancellationToken);
121136
}
122137

138+
public override TProjection FindOneAndReplace<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
139+
{
140+
return _wrappedCollection.FindOneAndReplace(CombineFilters(filter), replacement, options, cancellationToken);
141+
}
142+
123143
public override Task<TProjection> FindOneAndReplaceAsync<TProjection>(FilterDefinition<TDocument> filter, TDocument replacement, FindOneAndReplaceOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
124144
{
125145
return _wrappedCollection.FindOneAndReplaceAsync(CombineFilters(filter), replacement, options, cancellationToken);
126146
}
127147

148+
public override TProjection FindOneAndUpdate<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
149+
{
150+
return _wrappedCollection.FindOneAndUpdate(CombineFilters(filter), update, options, cancellationToken);
151+
}
152+
128153
public override Task<TProjection> FindOneAndUpdateAsync<TProjection>(FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
129154
{
130155
return _wrappedCollection.FindOneAndUpdateAsync(CombineFilters(filter), update, options, cancellationToken);
131156
}
132157

158+
public override IAsyncCursor<TResult> MapReduce<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
159+
{
160+
options = options ?? new MapReduceOptions<TDocument, TResult>();
161+
options.Filter = CombineFilters(options.Filter);
162+
return _wrappedCollection.MapReduce(map, reduce, options, cancellationToken);
163+
}
164+
133165
public override Task<IAsyncCursor<TResult>> MapReduceAsync<TResult>(BsonJavaScript map, BsonJavaScript reduce, MapReduceOptions<TDocument, TResult> options = null, CancellationToken cancellationToken = default(CancellationToken))
134166
{
135167
options = options ?? new MapReduceOptions<TDocument, TResult>();
@@ -181,5 +213,25 @@ private IEnumerable<WriteModel<TDocument>> CombineModelFilters(IEnumerable<Write
181213
}
182214
});
183215
}
216+
217+
private PipelineDefinition<TDocument, TResult> CreateFilteredPipeline<TResult>(PipelineDefinition<TDocument, TResult> pipeline)
218+
{
219+
const string matchOperatorName = "$match";
220+
221+
var filterStage = new DelegatedPipelineStageDefinition<TDocument, TDocument>(
222+
matchOperatorName,
223+
(s, sr) =>
224+
{
225+
var renderedFilter = _filter.Render(s, sr);
226+
return new RenderedPipelineStageDefinition<TDocument>(matchOperatorName, new BsonDocument(matchOperatorName, renderedFilter), s);
227+
});
228+
229+
var filterPipeline = new PipelineStagePipelineDefinition<TDocument, TDocument>(new[] { filterStage });
230+
var combinedPipeline = new CombinedPipelineDefinition<TDocument, TDocument, TResult>(
231+
filterPipeline,
232+
pipeline);
233+
234+
return new OptimizingPipelineDefinition<TDocument, TResult>(combinedPipeline);
235+
}
184236
}
185237
}

0 commit comments

Comments
 (0)