Skip to content

Commit

Permalink
Merge pull request #255 from ejensen/channel-thread-safety
Browse files Browse the repository at this point in the history
Prevent crash caused by non-exclusive access to the Channels array
  • Loading branch information
dsrees authored Apr 17, 2024
2 parents 588bf6b + 88261c1 commit 7d23b4a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
21 changes: 12 additions & 9 deletions Sources/SwiftPhoenixClient/Socket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public class Socket: PhoenixTransportDelegate {
let stateChangeCallbacks: StateChangeCallbacks = StateChangeCallbacks()

/// Collection on channels created for the Socket
public internal(set) var channels: [Channel] = []
public var channels: [Channel] { _channels.copy() }
private var _channels = SynchronizedArray<Channel>()

/// Buffers messages that need to be sent once the socket has connected. It is an array
/// of tuples, with the ref of the message to send and the callback that will send the message.
Expand Down Expand Up @@ -557,7 +558,7 @@ public class Socket: PhoenixTransportDelegate {
public func channel(_ topic: String,
params: [String: Any] = [:]) -> Channel {
let channel = Channel(topic: topic, params: params, socket: self)
self.channels.append(channel)
_channels.append(channel)

return channel
}
Expand All @@ -574,7 +575,7 @@ public class Socket: PhoenixTransportDelegate {
/// - parameter channel: Channel to remove
public func remove(_ channel: Channel) {
self.off(channel.stateChangeRefs)
self.channels.removeAll(where: { $0.joinRef == channel.joinRef })
_channels.removeAll(where: { $0.joinRef == channel.joinRef })
}

/// Removes `onOpen`, `onClose`, `onError,` and `onMessage` registrations.
Expand Down Expand Up @@ -710,17 +711,19 @@ public class Socket: PhoenixTransportDelegate {
}

// Dispatch the message to all channels that belong to the topic
self.channels
.filter( { $0.isMember(message) } )
.forEach( { $0.trigger(message) } )

_channels.forEach { channel in
if channel.isMember(message) {
channel.trigger(message)
}
}

// Inform all onMessage callbacks of the message
self.stateChangeCallbacks.message.forEach({ $0.callback.call(message) })
}

/// Triggers an error event to all of the connected Channels
internal func triggerChannelError() {
self.channels.forEach { (channel) in
_channels.forEach { channel in
// Only trigger a channel error if it is in an "opened" state
if !(channel.isErrored || channel.isLeaving || channel.isClosed) {
channel.trigger(event: ChannelEvent.error)
Expand Down Expand Up @@ -777,7 +780,7 @@ public class Socket: PhoenixTransportDelegate {
// Leaves any channel that is open that has a duplicate topic
internal func leaveOpenTopic(topic: String) {
guard
let dupe = self.channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) })
let dupe = _channels.first(where: { $0.topic == topic && ($0.isJoined || $0.isJoining) })
else { return }

self.logItems("transport", "leaving duplicate topic: [\(topic)]" )
Expand Down
8 changes: 8 additions & 0 deletions Sources/SwiftPhoenixClient/SynchronizedArray.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ public class SynchronizedArray<Element> {
self.array = array
}

public func copy() -> [Element] {
queue.sync { self.array }
}

func append( _ newElement: Element) {
queue.async(flags: .barrier) {
self.array.append(newElement)
}
}

func first(where predicate: (Element) -> Bool) -> Element? {
queue.sync { self.array.first(where: predicate) }
}

func forEach(_ body: (Element) -> Void) {
queue.sync { self.array }.forEach(body)
}
Expand Down

0 comments on commit 7d23b4a

Please sign in to comment.