Skip to content

Commit bd554c5

Browse files
committed
refactor: use Clock protocol when available (#633)
1 parent a7f4246 commit bd554c5

File tree

10 files changed

+238
-38
lines changed

10 files changed

+238
-38
lines changed

Package.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ let package = Package(
2828
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.2"),
2929
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.17.2"),
3030
.package(url: "https://github.com/pointfreeco/xctest-dynamic-overlay", from: "1.2.2"),
31+
.package(url: "https://github.com/pointfreeco/swift-clocks", from: "1.0.0"),
3132
],
3233
targets: [
3334
.target(
3435
name: "Helpers",
3536
dependencies: [
3637
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
3738
.product(name: "HTTPTypes", package: "swift-http-types"),
39+
.product(name: "Clocks", package: "swift-clocks"),
3840
]
3941
),
4042
.testTarget(
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//
2+
// AsyncValueSubject.swift
3+
// Supabase
4+
//
5+
// Created by Guilherme Souza on 31/10/24.
6+
//
7+
8+
import ConcurrencyExtras
9+
import Foundation
10+
11+
/// A thread-safe subject that wraps a single value and provides async access to its updates.
12+
/// Similar to Combine's CurrentValueSubject, but designed for async/await usage.
13+
package final class AsyncValueSubject<Value: Sendable>: Sendable {
14+
15+
/// Defines how values are buffered in the underlying AsyncStream.
16+
package typealias BufferingPolicy = AsyncStream<Value>.Continuation.BufferingPolicy
17+
18+
/// Internal state container for the subject.
19+
struct MutableState {
20+
var value: Value
21+
var continuations: [UInt: AsyncStream<Value>.Continuation] = [:]
22+
var count: UInt = 0
23+
}
24+
25+
let bufferingPolicy: BufferingPolicy
26+
let mutableState: LockIsolated<MutableState>
27+
28+
/// Creates a new AsyncValueSubject with an initial value.
29+
/// - Parameters:
30+
/// - initialValue: The initial value to store
31+
/// - bufferingPolicy: Determines how values are buffered in the AsyncStream (defaults to .unbounded)
32+
package init(_ initialValue: Value, bufferingPolicy: BufferingPolicy = .unbounded) {
33+
self.mutableState = LockIsolated(MutableState(value: initialValue))
34+
self.bufferingPolicy = bufferingPolicy
35+
}
36+
37+
deinit {
38+
finish()
39+
}
40+
41+
/// The current value stored in the subject.
42+
package var value: Value {
43+
mutableState.value
44+
}
45+
46+
/// Sends a new value to the subject and notifies all observers.
47+
/// - Parameter value: The new value to send
48+
package func yield(_ value: Value) {
49+
mutableState.withValue {
50+
$0.value = value
51+
52+
for (_, continuation) in $0.continuations {
53+
continuation.yield(value)
54+
}
55+
}
56+
}
57+
58+
/// Resume the task awaiting the next iteration point by having it return
59+
/// nil, which signifies the end of the iteration.
60+
///
61+
/// Calling this function more than once has no effect. After calling
62+
/// finish, the stream enters a terminal state and doesn't produce any
63+
/// additional elements.
64+
package func finish() {
65+
for (_, continuation) in mutableState.continuations {
66+
continuation.finish()
67+
}
68+
}
69+
70+
/// An AsyncStream that emits the current value and all subsequent updates.
71+
package var values: AsyncStream<Value> {
72+
AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
73+
insert(continuation)
74+
}
75+
}
76+
77+
/// Observes changes to the subject's value by executing the provided handler.
78+
/// - Parameters:
79+
/// - priority: The priority of the task that will observe changes (optional)
80+
/// - handler: A closure that will be called with each new value
81+
/// - Returns: A task that can be cancelled to stop observing changes
82+
@discardableResult
83+
package func onChange(
84+
priority: TaskPriority? = nil,
85+
_ handler: @escaping @Sendable (Value) -> Void
86+
) -> Task<Void, Never> {
87+
let stream = self.values
88+
return Task(priority: priority) {
89+
for await value in stream {
90+
if Task.isCancelled {
91+
break
92+
}
93+
handler(value)
94+
}
95+
}
96+
}
97+
98+
/// Adds a new continuation to the subject and yields the current value.
99+
private func insert(_ continuation: AsyncStream<Value>.Continuation) {
100+
mutableState.withValue { state in
101+
continuation.yield(state.value)
102+
let id = state.count + 1
103+
state.count = id
104+
state.continuations[id] = continuation
105+
106+
continuation.onTermination = { [weak self] _ in
107+
self?.remove(continuation: id)
108+
}
109+
}
110+
}
111+
112+
/// Removes a continuation when it's terminated.
113+
private func remove(continuation id: UInt) {
114+
mutableState.withValue {
115+
_ = $0.continuations.removeValue(forKey: id)
116+
}
117+
}
118+
}
119+
120+
extension AsyncValueSubject where Value == Void {
121+
package func yield() {
122+
self.yield(())
123+
}
124+
}

Sources/Helpers/Task+withTimeout.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ package func withTimeout<R: Sendable>(
2626
group.addTask {
2727
let interval = deadline.timeIntervalSinceNow
2828
if interval > 0 {
29-
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(interval))
29+
try await _clock.sleep(for: interval)
3030
}
3131
try Task.checkCancellation()
3232
throw TimeoutError()

Sources/Helpers/_Clock.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
//
2+
// _Clock.swift
3+
// Supabase
4+
//
5+
// Created by Guilherme Souza on 08/01/25.
6+
//
7+
8+
import Clocks
9+
import Foundation
10+
11+
package protocol _Clock: Sendable {
12+
func sleep(for duration: TimeInterval) async throws
13+
}
14+
15+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
16+
extension ContinuousClock: _Clock {
17+
package func sleep(for duration: TimeInterval) async throws {
18+
try await sleep(for: .seconds(duration))
19+
}
20+
}
21+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
22+
extension TestClock<Duration>: _Clock {
23+
package func sleep(for duration: TimeInterval) async throws {
24+
try await sleep(for: .seconds(duration))
25+
}
26+
}
27+
28+
/// `_Clock` used on platforms where ``Clock`` protocol isn't available.
29+
struct FallbackClock: _Clock {
30+
func sleep(for duration: TimeInterval) async throws {
31+
try await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(duration))
32+
}
33+
}
34+
35+
// Resolves clock instance based on platform availability.
36+
let _resolveClock: @Sendable () -> any _Clock = {
37+
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) {
38+
ContinuousClock()
39+
} else {
40+
FallbackClock()
41+
}
42+
}
43+
44+
// For overriding clock on tests, we use a mutable _clock in DEBUG builds.
45+
// nonisolated(unsafe) is safe to use if making sure we assign _clock once in test set up.
46+
//
47+
// _clock is read-only in RELEASE builds.
48+
#if DEBUG
49+
nonisolated(unsafe) package var _clock = _resolveClock()
50+
#else
51+
package let _clock = _resolveClock()
52+
#endif

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ public final class RealtimeChannelV2: Sendable {
4141
var joinRef: String? { mutableState.joinRef }
4242

4343
let callbackManager = CallbackManager()
44-
private let statusEventEmitter = EventEmitter<RealtimeChannelStatus>(initialEvent: .unsubscribed)
44+
private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)
4545

4646
public private(set) var status: RealtimeChannelStatus {
47-
get { statusEventEmitter.lastEvent }
48-
set { statusEventEmitter.emit(newValue) }
47+
get { statusSubject.value }
48+
set { statusSubject.yield(newValue) }
4949
}
5050

5151
public var statusChange: AsyncStream<RealtimeChannelStatus> {
52-
statusEventEmitter.stream()
52+
statusSubject.values
5353
}
5454

5555
/// Listen for connection status changes.
@@ -59,8 +59,9 @@ public final class RealtimeChannelV2: Sendable {
5959
/// - Note: Use ``statusChange`` if you prefer to use Async/Await.
6060
public func onStatusChange(
6161
_ listener: @escaping @Sendable (RealtimeChannelStatus) -> Void
62-
) -> ObservationToken {
63-
statusEventEmitter.attach(listener)
62+
) -> RealtimeSubscription {
63+
let task = statusSubject.onChange { listener($0) }
64+
return RealtimeSubscription { task.cancel() }
6465
}
6566

6667
init(

Sources/Realtime/V2/RealtimeClientV2.swift

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,19 @@ public final class RealtimeClientV2: Sendable {
5656
)
5757
}
5858

59-
private let statusEventEmitter = EventEmitter<RealtimeClientStatus>(initialEvent: .disconnected)
59+
private let statusSubject = AsyncValueSubject<RealtimeClientStatus>(.disconnected)
6060

6161
/// Listen for connection status changes.
6262
///
6363
/// You can also use ``onStatusChange(_:)`` for a closure based method.
6464
public var statusChange: AsyncStream<RealtimeClientStatus> {
65-
statusEventEmitter.stream()
65+
statusSubject.values
6666
}
6767

6868
/// The current connection status.
6969
public private(set) var status: RealtimeClientStatus {
70-
get { statusEventEmitter.lastEvent }
71-
set { statusEventEmitter.emit(newValue) }
70+
get { statusSubject.value }
71+
set { statusSubject.yield(newValue) }
7272
}
7373

7474
/// Listen for connection status changes.
@@ -79,7 +79,8 @@ public final class RealtimeClientV2: Sendable {
7979
public func onStatusChange(
8080
_ listener: @escaping @Sendable (RealtimeClientStatus) -> Void
8181
) -> RealtimeSubscription {
82-
statusEventEmitter.attach(listener)
82+
let task = statusSubject.onChange { listener($0) }
83+
return RealtimeSubscription { task.cancel() }
8384
}
8485

8586
public convenience init(url: URL, options: RealtimeClientOptions) {
@@ -150,7 +151,7 @@ public final class RealtimeClientV2: Sendable {
150151
if status == .disconnected {
151152
let connectionTask = Task {
152153
if reconnect {
153-
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay))
154+
try? await _clock.sleep(for: options.reconnectDelay)
154155

155156
if Task.isCancelled {
156157
options.logger?.debug("Reconnect cancelled, returning")
@@ -348,7 +349,7 @@ public final class RealtimeClientV2: Sendable {
348349
private func startHeartbeating() {
349350
let heartbeatTask = Task { [weak self, options] in
350351
while !Task.isCancelled {
351-
try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval))
352+
try? await _clock.sleep(for: options.heartbeatInterval)
352353
if Task.isCancelled {
353354
break
354355
}

Supabase.xcworkspace/xcshareddata/swiftpm/Package.resolved

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Tests/IntegrationTests/RealtimeIntegrationTests.swift

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// Created by Guilherme Souza on 27/03/24.
66
//
77

8+
import Clocks
89
import ConcurrencyExtras
910
import CustomDump
1011
import Helpers
@@ -22,20 +23,22 @@ struct TestLogger: SupabaseLogger {
2223
}
2324
}
2425

26+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
2527
final class RealtimeIntegrationTests: XCTestCase {
2628

27-
static let reconnectDelay: TimeInterval = 1
29+
let testClock = TestClock<Duration>()
2830

2931
let client = SupabaseClient(
3032
supabaseURL: URL(string: DotEnv.SUPABASE_URL)!,
31-
supabaseKey: DotEnv.SUPABASE_ANON_KEY,
32-
options: SupabaseClientOptions(
33-
realtime: RealtimeClientOptions(
34-
reconnectDelay: reconnectDelay
35-
)
36-
)
33+
supabaseKey: DotEnv.SUPABASE_ANON_KEY
3734
)
3835

36+
override func setUp() {
37+
super.setUp()
38+
39+
_clock = testClock
40+
}
41+
3942
override func invokeTest() {
4043
withMainSerialExecutor {
4144
super.invokeTest()
@@ -49,8 +52,7 @@ final class RealtimeIntegrationTests: XCTestCase {
4952
client.realtimeV2.disconnect()
5053

5154
/// Wait for the reconnection delay
52-
try? await Task.sleep(
53-
nanoseconds: NSEC_PER_SEC * UInt64(Self.reconnectDelay) + 1)
55+
await testClock.advance(by: .seconds(RealtimeClientOptions.defaultReconnectDelay))
5456

5557
XCTAssertEqual(client.realtimeV2.status, .disconnected)
5658
}

0 commit comments

Comments
 (0)