SocketAckManager thread-safe acks removal

This commit is contained in:
Alexander Levin 2017-03-06 19:57:19 +02:00
parent fcb2b6bbf6
commit 99faafe723

View File

@ -51,6 +51,7 @@ private func ==(lhs: SocketAck, rhs: SocketAck) -> Bool {
struct SocketAckManager { struct SocketAckManager {
private var acks = Set<SocketAck>(minimumCapacity: 1) private var acks = Set<SocketAck>(minimumCapacity: 1)
private let ackSemaphore = DispatchSemaphore(value: 1)
mutating func addAck(_ ack: Int, callback: @escaping AckCallback) { mutating func addAck(_ ack: Int, callback: @escaping AckCallback) {
acks.insert(SocketAck(ack: ack, callback: callback)) acks.insert(SocketAck(ack: ack, callback: callback))
@ -58,6 +59,8 @@ struct SocketAckManager {
/// Should be called on handle queue /// Should be called on handle queue
mutating func executeAck(_ ack: Int, with items: [Any], onQueue: DispatchQueue) { mutating func executeAck(_ ack: Int, with items: [Any], onQueue: DispatchQueue) {
ackSemaphore.wait()
defer { ackSemaphore.signal() }
let ack = acks.remove(SocketAck(ack: ack)) let ack = acks.remove(SocketAck(ack: ack))
onQueue.async() { ack?.callback(items) } onQueue.async() { ack?.callback(items) }
@ -65,8 +68,12 @@ struct SocketAckManager {
/// Should be called on handle queue /// Should be called on handle queue
mutating func timeoutAck(_ ack: Int, onQueue: DispatchQueue) { mutating func timeoutAck(_ ack: Int, onQueue: DispatchQueue) {
ackSemaphore.wait()
defer { ackSemaphore.signal() }
let ack = acks.remove(SocketAck(ack: ack)) let ack = acks.remove(SocketAck(ack: ack))
onQueue.async() { ack?.callback?(["NO ACK"]) } onQueue.async() {
ack?.callback?(["NO ACK"])
}
} }
} }