From 88261c1d1f0c54d72a76e3903e3c6887319e78e8 Mon Sep 17 00:00:00 2001 From: Eric Jensen Date: Tue, 16 Apr 2024 13:05:47 -0400 Subject: [PATCH] Fix a threading issue with the Socket Channels array being accessed while it is being mutated --- Sources/SwiftPhoenixClient/Socket.swift | 21 +++++++++++-------- .../SynchronizedArray.swift | 8 +++++++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/Sources/SwiftPhoenixClient/Socket.swift b/Sources/SwiftPhoenixClient/Socket.swift index f24fdcf2..e65890d6 100644 --- a/Sources/SwiftPhoenixClient/Socket.swift +++ b/Sources/SwiftPhoenixClient/Socket.swift @@ -143,7 +143,8 @@ public class Socket: PhoenixTransportDelegate { let stateChangeCallbacks: StateChangeCallbacks = StateChangeCallbacks() /// Collection on channels created for the Socket - public internal(set) var channels: [Channel] = [] + public var channels: [Channel] { _channels.copy() } + private var _channels = SynchronizedArray() /// Buffers messages that need to be sent once the socket has connected. It is an array /// of tuples, with the ref of the message to send and the callback that will send the message. @@ -557,7 +558,7 @@ public class Socket: PhoenixTransportDelegate { public func channel(_ topic: String, params: [String: Any] = [:]) -> Channel { let channel = Channel(topic: topic, params: params, socket: self) - self.channels.append(channel) + _channels.append(channel) return channel } @@ -574,7 +575,7 @@ public class Socket: PhoenixTransportDelegate { /// - parameter channel: Channel to remove public func remove(_ channel: Channel) { self.off(channel.stateChangeRefs) - self.channels.removeAll(where: { $0.joinRef == channel.joinRef }) + _channels.removeAll(where: { $0.joinRef == channel.joinRef }) } /// Removes `onOpen`, `onClose`, `onError,` and `onMessage` registrations. @@ -710,17 +711,19 @@ public class Socket: PhoenixTransportDelegate { } // Dispatch the message to all channels that belong to the topic - self.channels - .filter( { $0.isMember(message) } ) - .forEach( { $0.trigger(message) } ) - + _channels.forEach { channel in + if channel.isMember(message) { + channel.trigger(message) + } + } + // Inform all onMessage callbacks of the message self.stateChangeCallbacks.message.forEach({ $0.callback.call(message) }) } /// Triggers an error event to all of the connected Channels internal func triggerChannelError() { - self.channels.forEach { (channel) in + _channels.forEach { channel in // Only trigger a channel error if it is in an "opened" state if !(channel.isErrored || channel.isLeaving || channel.isClosed) { channel.trigger(event: ChannelEvent.error) @@ -777,7 +780,7 @@ public class Socket: PhoenixTransportDelegate { // Leaves any channel that is open that has a duplicate topic internal func leaveOpenTopic(topic: String) { guard - let dupe = self.channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) }) + let dupe = _channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) }) else { return } self.logItems("transport", "leaving duplicate topic: [\(topic)]" ) diff --git a/Sources/SwiftPhoenixClient/SynchronizedArray.swift b/Sources/SwiftPhoenixClient/SynchronizedArray.swift index 25fadcc7..fd52b4cd 100644 --- a/Sources/SwiftPhoenixClient/SynchronizedArray.swift +++ b/Sources/SwiftPhoenixClient/SynchronizedArray.swift @@ -17,12 +17,20 @@ public class SynchronizedArray { self.array = array } + public func copy() -> [Element] { + queue.sync { self.array } + } + func append( _ newElement: Element) { queue.async(flags: .barrier) { self.array.append(newElement) } } + func first(where predicate: (Element) -> Bool) -> Element? { + queue.sync { self.array.first(where: predicate) } + } + func forEach(_ body: (Element) -> Void) { queue.sync { self.array }.forEach(body) }