merge development
This commit is contained in:
commit
f6dc11650d
@ -19,7 +19,7 @@ class SocketAckManagerTest: XCTestCase {
|
|||||||
callbackExpection.fulfill()
|
callbackExpection.fulfill()
|
||||||
}
|
}
|
||||||
ackManager.addAck(1, callback: callback)
|
ackManager.addAck(1, callback: callback)
|
||||||
ackManager.executeAck(1, items: itemsArray)
|
ackManager.executeAck(1, with: itemsArray, onQueue: DispatchQueue.main)
|
||||||
waitForExpectations(withTimeout: 3.0, handler: nil)
|
waitForExpectations(withTimeout: 3.0, handler: nil)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -21,8 +21,7 @@
|
|||||||
- (void)setUp {
|
- (void)setUp {
|
||||||
[super setUp];
|
[super setUp];
|
||||||
NSURL* url = [[NSURL alloc] initWithString:@"http://localhost"];
|
NSURL* url = [[NSURL alloc] initWithString:@"http://localhost"];
|
||||||
self.socket = [[SocketIOClient alloc] initWithSocketURL:url
|
self.socket = [[SocketIOClient alloc] initWithSocketURL:url options:nil];
|
||||||
options:@{@"handleQueue": dispatch_queue_create(nil, DISPATCH_QUEUE_SERIAL)}];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
- (void)testOnSyntax {
|
- (void)testOnSyntax {
|
||||||
|
|||||||
@ -16,8 +16,7 @@ class SocketSideEffectTest: XCTestCase {
|
|||||||
|
|
||||||
override func setUp() {
|
override func setUp() {
|
||||||
super.setUp()
|
super.setUp()
|
||||||
socket = SocketIOClient(socketURL: URL(string: "http://localhost/")!,
|
socket = SocketIOClient(socketURL: URL(string: "http://localhost/")!)
|
||||||
options: [.handleQueue(DispatchQueue(label: "handleQueue", attributes: .serial, target: nil))])
|
|
||||||
socket.setTestable()
|
socket.setTestable()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -57,16 +57,16 @@ struct SocketAckManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Should be called on handle queue
|
/// Should be called on handle queue
|
||||||
mutating func executeAck(_ ack: Int, items: [AnyObject]) {
|
mutating func executeAck(_ ack: Int, with items: [AnyObject], onQueue: DispatchQueue) {
|
||||||
let callback = acks.remove(SocketAck(ack: ack))
|
let ack = acks.remove(SocketAck(ack: ack))
|
||||||
|
|
||||||
callback?.callback(items)
|
onQueue.async() { ack?.callback(items) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Should be called on handle queue
|
/// Should be called on handle queue
|
||||||
mutating func timeoutAck(_ ack: Int) {
|
mutating func timeoutAck(_ ack: Int, onQueue: DispatchQueue) {
|
||||||
let callback = acks.remove(SocketAck(ack: ack))
|
let ack = acks.remove(SocketAck(ack: ack))
|
||||||
|
|
||||||
callback?.callback(["NO ACK"])
|
onQueue.async() { ack?.callback(["NO ACK"]) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,6 +49,7 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
|
|||||||
return nsp + "#" + (engine?.sid ?? "")
|
return nsp + "#" + (engine?.sid ?? "")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private let ackQueue = DispatchQueue(label: "com.socketio.ackQueue", attributes: .serial)
|
||||||
private let emitQueue = DispatchQueue(label: "com.socketio.emitQueue", attributes: .serial)
|
private let emitQueue = DispatchQueue(label: "com.socketio.emitQueue", attributes: .serial)
|
||||||
private let logType = "SocketIOClient"
|
private let logType = "SocketIOClient"
|
||||||
private let parseQueue = DispatchQueue(label: "com.socketio.parseQueue", attributes: .serial)
|
private let parseQueue = DispatchQueue(label: "com.socketio.parseQueue", attributes: .serial)
|
||||||
@ -60,7 +61,6 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
|
|||||||
private var reconnecting = false
|
private var reconnecting = false
|
||||||
|
|
||||||
private(set) var currentAck = -1
|
private(set) var currentAck = -1
|
||||||
// Handle queue also controls access to ackManager
|
|
||||||
private(set) var handleQueue = DispatchQueue.main
|
private(set) var handleQueue = DispatchQueue.main
|
||||||
private(set) var reconnectAttempts = -1
|
private(set) var reconnectAttempts = -1
|
||||||
|
|
||||||
@ -160,20 +160,21 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
|
|||||||
|
|
||||||
private func createOnAck(_ items: [AnyObject]) -> OnAckCallback {
|
private func createOnAck(_ items: [AnyObject]) -> OnAckCallback {
|
||||||
currentAck += 1
|
currentAck += 1
|
||||||
|
|
||||||
return {[weak self, ack = currentAck] timeout, callback in
|
return {[weak self, ack = currentAck] timeout, callback in
|
||||||
if let this = self {
|
if let this = self {
|
||||||
this.handleQueue.sync() {
|
this.ackQueue.sync() {
|
||||||
this.ackHandlers.addAck(ack, callback: callback)
|
this.ackHandlers.addAck(ack, callback: callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
this._emit(items, ack: ack)
|
this._emit(items, ack: ack)
|
||||||
|
|
||||||
if timeout != 0 {
|
if timeout != 0 {
|
||||||
let time = DispatchTime.now() + Double(Int64(timeout * NSEC_PER_SEC)) / Double(NSEC_PER_SEC)
|
let time = DispatchTime.now() + Double(Int64(timeout * NSEC_PER_SEC)) / Double(NSEC_PER_SEC)
|
||||||
|
|
||||||
this.handleQueue.after(when: time) {
|
this.handleQueue.after(when: time) {
|
||||||
this.ackHandlers.timeoutAck(ack)
|
this.ackHandlers.timeoutAck(ack, onQueue: this.handleQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -299,7 +300,7 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
|
|||||||
DefaultSocketLogger.Logger.log("Handling ack: %@ with data: %@", type: logType, args: ack, data)
|
DefaultSocketLogger.Logger.log("Handling ack: %@ with data: %@", type: logType, args: ack, data)
|
||||||
|
|
||||||
handleQueue.async() {
|
handleQueue.async() {
|
||||||
self.ackHandlers.executeAck(ack, items: data)
|
self.ackHandlers.executeAck(ack, with: data, onQueue: self.handleQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user