Skip to content

Commit 7d6870f

Browse files
authored
fix(realtime): make Push associated to MainActor (#721)
* fix(realtime): make Push associated to MainActor * code format * skip test in Linux
1 parent 899e2e5 commit 7d6870f

File tree

4 files changed

+300
-306
lines changed

4 files changed

+300
-306
lines changed

Sources/Realtime/PushV2.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ public enum PushStatus: String, Sendable {
1515
case timeout
1616
}
1717

18-
actor PushV2 {
18+
@MainActor
19+
final class PushV2 {
1920
private weak var channel: RealtimeChannelV2?
2021
let message: RealtimeMessageV2
2122

Sources/Realtime/RealtimeChannelV2.swift

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ public final class RealtimeChannelV2: Sendable {
3232
var pushes: [String: PushV2] = [:]
3333
}
3434

35-
private let mutableState = LockIsolated(MutableState())
35+
@MainActor
36+
private var mutableState = MutableState()
3637

3738
let topic: String
3839
let config: RealtimeChannelConfig
3940
let logger: (any SupabaseLogger)?
4041
let socket: RealtimeClientV2
41-
var joinRef: String? { mutableState.joinRef }
42+
43+
@MainActor var joinRef: String? { mutableState.joinRef }
4244

4345
let callbackManager = CallbackManager()
4446
private let statusSubject = AsyncValueSubject<RealtimeChannelStatus>(.unsubscribed)
@@ -81,6 +83,7 @@ public final class RealtimeChannelV2: Sendable {
8183
}
8284

8385
/// Subscribes to the channel
86+
@MainActor
8487
public func subscribe() async {
8588
if socket.status != .connected {
8689
if socket.options.connectOnSubscribe != true {
@@ -109,7 +112,7 @@ public final class RealtimeChannelV2: Sendable {
109112
)
110113

111114
let joinRef = socket.makeRef()
112-
mutableState.withValue { $0.joinRef = joinRef }
115+
mutableState.joinRef = joinRef
113116

114117
logger?.debug("Subscribing to channel with body: \(joinConfig)")
115118

@@ -497,8 +500,8 @@ public final class RealtimeChannelV2: Sendable {
497500
filter: filter
498501
)
499502

500-
mutableState.withValue {
501-
$0.clientChanges.append(config)
503+
Task { @MainActor in
504+
mutableState.clientChanges.append(config)
502505
}
503506

504507
let id = callbackManager.addPostgresCallback(filter: config, callback: callback)
@@ -538,32 +541,28 @@ public final class RealtimeChannelV2: Sendable {
538541
self.onSystem { _ in callback() }
539542
}
540543

544+
@MainActor
541545
@discardableResult
542546
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
543-
let push = mutableState.withValue {
544-
let message = RealtimeMessageV2(
545-
joinRef: $0.joinRef,
546-
ref: ref ?? socket.makeRef(),
547-
topic: self.topic,
548-
event: event,
549-
payload: payload
550-
)
551-
552-
let push = PushV2(channel: self, message: message)
553-
if let ref = message.ref {
554-
$0.pushes[ref] = push
555-
}
547+
let message = RealtimeMessageV2(
548+
joinRef: joinRef,
549+
ref: ref ?? socket.makeRef(),
550+
topic: self.topic,
551+
event: event,
552+
payload: payload
553+
)
556554

557-
return push
555+
let push = PushV2(channel: self, message: message)
556+
if let ref = message.ref {
557+
mutableState.pushes[ref] = push
558558
}
559559

560560
return await push.send()
561561
}
562562

563-
private func didReceiveReply(ref: String, status: String) async {
564-
let push = mutableState.withValue {
565-
$0.pushes.removeValue(forKey: ref)
566-
}
567-
await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
563+
@MainActor
564+
private func didReceiveReply(ref: String, status: String) {
565+
let push = mutableState.pushes.removeValue(forKey: ref)
566+
push?.didReceive(status: PushStatus(rawValue: status) ?? .ok)
568567
}
569568
}

0 commit comments

Comments
 (0)