add optional write completion handler for emit's

This commit is contained in:
Andy 2018-09-29 15:47:30 +07:00
parent a3d443268a
commit f9d8595a25
10 changed files with 77 additions and 36 deletions

View File

@ -213,7 +213,7 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
/// - parameter items: The items to send with this event. May be left out. /// - parameter items: The items to send with this event. May be left out.
open func emit(_ event: String, _ items: SocketData...) { open func emit(_ event: String, _ items: SocketData...) {
do { do {
try emit(event, with: items.map({ try $0.socketRepresentation() })) try emit(event, with: items.map({ try $0.socketRepresentation() }), completion: {})
} catch { } catch {
DefaultSocketLogger.Logger.error("Error creating socketRepresentation for emit: \(event), \(items)", DefaultSocketLogger.Logger.error("Error creating socketRepresentation for emit: \(event), \(items)",
type: logType) type: logType)
@ -228,7 +228,17 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
/// - parameter items: The items to send with this event. Send an empty array to send no data. /// - parameter items: The items to send with this event. Send an empty array to send no data.
@objc @objc
open func emit(_ event: String, with items: [Any]) { open func emit(_ event: String, with items: [Any]) {
emit([event] + items) emit([event] + items, completion: {})
}
/// Same as emit, but meant for Objective-C
///
/// - parameter event: The event to send.
/// - parameter items: The items to send with this event. Send an empty array to send no data.
/// - parameter completion: Callback called on transport write completion.
@objc
open func emit(_ event: String, with items: [Any], completion: @escaping () -> ()) {
emit([event] + items, completion: completion)
} }
/// Sends a message to the server, requesting an ack. /// Sends a message to the server, requesting an ack.
@ -284,8 +294,9 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
return createOnAck([event] + items) return createOnAck([event] + items)
} }
func emit(_ data: [Any], ack: Int? = nil, binary: Bool = true, isAck: Bool = false) { func emit(_ data: [Any], ack: Int? = nil, binary: Bool = true, isAck: Bool = false, completion: (() -> ())? = nil) {
guard status == .connected else { guard status == .connected else {
completion?();
handleClientEvent(.error, data: ["Tried emitting when not connected"]) handleClientEvent(.error, data: ["Tried emitting when not connected"])
return return
} }
@ -295,7 +306,7 @@ open class SocketIOClient : NSObject, SocketIOClientSpec {
DefaultSocketLogger.Logger.log("Emitting: \(str), Ack: \(isAck)", type: logType) DefaultSocketLogger.Logger.log("Emitting: \(str), Ack: \(isAck)", type: logType)
manager?.engine?.send(str, withData: packet.binary) manager?.engine?.send(str, withData: packet.binary, completion: completion)
} }
/// Call when you wish to tell the server that you've received the event for `ack`. /// Call when you wish to tell the server that you've received the event for `ack`.

View File

@ -49,7 +49,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
/// A queue of engine.io messages waiting for POSTing /// A queue of engine.io messages waiting for POSTing
/// ///
/// **You should not touch this directly** /// **You should not touch this directly**
public var postWait = [String]() public var postWait = [Post]()
/// `true` if there is an outstanding poll. Trying to poll before the first is done will cause socket.io to /// `true` if there is an outstanding poll. Trying to poll before the first is done will cause socket.io to
/// disconnect us. /// disconnect us.
@ -340,7 +340,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
if polling { if polling {
disconnectPolling(reason: reason) disconnectPolling(reason: reason)
} else { } else {
sendWebSocketMessage("", withType: .close, withData: []) sendWebSocketMessage("", withType: .close, withData: [], completion: {})
closeOutEngine(reason: reason) closeOutEngine(reason: reason)
} }
} }
@ -348,7 +348,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
// We need to take special care when we're polling that we send it ASAP // We need to take special care when we're polling that we send it ASAP
// Also make sure we're on the emitQueue since we're touching postWait // Also make sure we're on the emitQueue since we're touching postWait
private func disconnectPolling(reason: String) { private func disconnectPolling(reason: String) {
postWait.append(String(SocketEnginePacketType.close.rawValue)) postWait.append((String(SocketEnginePacketType.close.rawValue), {}))
doRequest(for: createRequestForPostWithPostWait()) {_, _, _ in } doRequest(for: createRequestForPostWithPostWait()) {_, _, _ in }
closeOutEngine(reason: reason) closeOutEngine(reason: reason)
@ -366,7 +366,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
DefaultSocketLogger.Logger.log("Switching to WebSockets", type: SocketEngine.logType) DefaultSocketLogger.Logger.log("Switching to WebSockets", type: SocketEngine.logType)
sendWebSocketMessage("", withType: .upgrade, withData: []) sendWebSocketMessage("", withType: .upgrade, withData: [], completion: {})
polling = false polling = false
fastUpgrade = false fastUpgrade = false
probing = false probing = false
@ -384,7 +384,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
DefaultSocketLogger.Logger.log("Flushing probe wait", type: SocketEngine.logType) DefaultSocketLogger.Logger.log("Flushing probe wait", type: SocketEngine.logType)
for waiter in probeWait { for waiter in probeWait {
write(waiter.msg, withType: waiter.type, withData: waiter.data) write(waiter.msg, withType: waiter.type, withData: waiter.data, completion:waiter.completion)
} }
probeWait.removeAll(keepingCapacity: false) probeWait.removeAll(keepingCapacity: false)
@ -398,7 +398,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
guard let ws = self.ws else { return } guard let ws = self.ws else { return }
for msg in postWait { for msg in postWait {
ws.write(string: msg) ws.write(string: msg.msg, completion: msg.completion)
} }
postWait.removeAll(keepingCapacity: false) postWait.removeAll(keepingCapacity: false)
@ -544,7 +544,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
} }
pongsMissed += 1 pongsMissed += 1
write("", withType: .ping, withData: []) write("", withType: .ping, withData: [], completion: {})
engineQueue.asyncAfter(deadline: DispatchTime.now() + .milliseconds(pingInterval)) {[weak self, id = self.sid] in engineQueue.asyncAfter(deadline: DispatchTime.now() + .milliseconds(pingInterval)) {[weak self, id = self.sid] in
// Make sure not to ping old connections // Make sure not to ping old connections
@ -600,7 +600,7 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
DefaultSocketLogger.Logger.log("Upgrading transport to WebSockets", type: SocketEngine.logType) DefaultSocketLogger.Logger.log("Upgrading transport to WebSockets", type: SocketEngine.logType)
fastUpgrade = true fastUpgrade = true
sendPollMessage("", withType: .noop, withData: []) sendPollMessage("", withType: .noop, withData: [], completion: {})
// After this point, we should not send anymore polling messages // After this point, we should not send anymore polling messages
} }
} }
@ -610,11 +610,15 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
/// - parameter msg: The message to send. /// - parameter msg: The message to send.
/// - parameter type: The type of this message. /// - parameter type: The type of this message.
/// - parameter data: Any data that this message has. /// - parameter data: Any data that this message has.
open func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) { /// - parameter completion: Callback called on transport write completion.
open func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ()) {
engineQueue.async { engineQueue.async {
guard self.connected else { return } guard self.connected else {
completion()
return
}
guard !self.probing else { guard !self.probing else {
self.probeWait.append((msg, type, data)) self.probeWait.append((msg, type, data, completion))
return return
} }
@ -622,11 +626,11 @@ open class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, So
if self.polling { if self.polling {
DefaultSocketLogger.Logger.log("Writing poll: \(msg) has data: \(data.count != 0)", DefaultSocketLogger.Logger.log("Writing poll: \(msg) has data: \(data.count != 0)",
type: SocketEngine.logType) type: SocketEngine.logType)
self.sendPollMessage(msg, withType: type, withData: data) self.sendPollMessage(msg, withType: type, withData: data, completion: completion)
} else { } else {
DefaultSocketLogger.Logger.log("Writing ws: \(msg) has data: \(data.count != 0)", DefaultSocketLogger.Logger.log("Writing ws: \(msg) has data: \(data.count != 0)",
type: SocketEngine.logType) type: SocketEngine.logType)
self.sendWebSocketMessage(msg, withType: type, withData: data) self.sendWebSocketMessage(msg, withType: type, withData: data, completion: completion)
} }
} }
} }

View File

@ -34,7 +34,7 @@ public protocol SocketEnginePollable : SocketEngineSpec {
/// A queue of engine.io messages waiting for POSTing /// A queue of engine.io messages waiting for POSTing
/// ///
/// **You should not touch this directly** /// **You should not touch this directly**
var postWait: [String] { get set } var postWait: [Post] { get set }
/// The URLSession that will be used for polling. /// The URLSession that will be used for polling.
var session: URLSession? { get } var session: URLSession? { get }
@ -65,7 +65,7 @@ public protocol SocketEnginePollable : SocketEngineSpec {
/// - parameter message: The message to send. /// - parameter message: The message to send.
/// - parameter withType: The type of message to send. /// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message. /// - parameter withData: The data associated with this message.
func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data]) func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ())
/// Call to stop polling and invalidate the URLSession. /// Call to stop polling and invalidate the URLSession.
func stopPolling() func stopPolling()
@ -74,12 +74,15 @@ public protocol SocketEnginePollable : SocketEngineSpec {
// Default polling methods // Default polling methods
extension SocketEnginePollable { extension SocketEnginePollable {
func createRequestForPostWithPostWait() -> URLRequest { func createRequestForPostWithPostWait() -> URLRequest {
defer { postWait.removeAll(keepingCapacity: true) } defer {
for packet in postWait { packet.completion() }
postWait.removeAll(keepingCapacity: true)
}
var postStr = "" var postStr = ""
for packet in postWait { for packet in postWait {
postStr += "\(packet.utf16.count):\(packet)" postStr += "\(packet.msg.utf16.count):\(packet.msg)"
} }
DefaultSocketLogger.Logger.log("Created POST string: \(postStr)", type: "SocketEnginePolling") DefaultSocketLogger.Logger.log("Created POST string: \(postStr)", type: "SocketEnginePolling")
@ -209,14 +212,17 @@ extension SocketEnginePollable {
/// - parameter message: The message to send. /// - parameter message: The message to send.
/// - parameter withType: The type of message to send. /// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message. /// - parameter withData: The data associated with this message.
public func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data]) { /// - parameter completion: Callback called on transport write completion.
public func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ()) {
DefaultSocketLogger.Logger.log("Sending poll: \(message) as type: \(type.rawValue)", type: "SocketEnginePolling") DefaultSocketLogger.Logger.log("Sending poll: \(message) as type: \(type.rawValue)", type: "SocketEnginePolling")
postWait.append(String(type.rawValue) + message) postWait.append((String(type.rawValue) + message, completion))
for data in datas { for data in datas {
if case let .right(bin) = createBinaryDataForSend(using: data) { if case let .right(bin) = createBinaryDataForSend(using: data) {
postWait.append(bin) // completion handler will be called on initial message write
// TODO: call completion after last message in batch
postWait.append((bin, {}))
} }
} }

View File

@ -137,7 +137,8 @@ import Starscream
/// - parameter msg: The message to send. /// - parameter msg: The message to send.
/// - parameter type: The type of this message. /// - parameter type: The type of this message.
/// - parameter data: Any data that this message has. /// - parameter data: Any data that this message has.
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) /// - parameter completion: Callback called on transport write completion.
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ())
} }
extension SocketEngineSpec { extension SocketEngineSpec {
@ -179,7 +180,7 @@ extension SocketEngineSpec {
} }
/// Send an engine message (4) /// Send an engine message (4)
func send(_ msg: String, withData datas: [Data]) { func send(_ msg: String, withData datas: [Data], completion: (() -> ())? = nil) {
write(msg, withType: .message, withData: datas) write(msg, withType: .message, withData: datas, completion: completion ?? {})
} }
} }

View File

@ -37,14 +37,15 @@ public protocol SocketEngineWebsocket : SocketEngineSpec {
/// - parameter message: The message to send. /// - parameter message: The message to send.
/// - parameter withType: The type of message to send. /// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message. /// - parameter withData: The data associated with this message.
func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data]) /// - parameter completion: Callback called on transport write completion.
func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ())
} }
// WebSocket methods // WebSocket methods
extension SocketEngineWebsocket { extension SocketEngineWebsocket {
func probeWebSocket() { func probeWebSocket() {
if ws?.isConnected ?? false { if ws?.isConnected ?? false {
sendWebSocketMessage("probe", withType: .ping, withData: []) sendWebSocketMessage("probe", withType: .ping, withData: [], completion: {})
} }
} }
@ -55,14 +56,15 @@ extension SocketEngineWebsocket {
/// - parameter message: The message to send. /// - parameter message: The message to send.
/// - parameter withType: The type of message to send. /// - parameter withType: The type of message to send.
/// - parameter withData: The data associated with this message. /// - parameter withData: The data associated with this message.
public func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data]) { /// - parameter completion: Callback called on transport write completion.
public func sendWebSocketMessage(_ str: String, withType type: SocketEnginePacketType, withData datas: [Data], completion: @escaping () -> ()) {
DefaultSocketLogger.Logger.log("Sending ws: \(str) as type: \(type.rawValue)", type: "SocketEngineWebSocket") DefaultSocketLogger.Logger.log("Sending ws: \(str) as type: \(type.rawValue)", type: "SocketEngineWebSocket")
ws?.write(string: "\(type.rawValue)\(str)") ws?.write(string: "\(type.rawValue)\(str)")
for data in datas { for data in datas {
if case let .left(bin) = createBinaryDataForSend(using: data) { if case let .left(bin) = createBinaryDataForSend(using: data) {
ws?.write(data: bin) ws?.write(data: bin, completion: completion)
} }
} }
} }

View File

@ -290,7 +290,7 @@ open class SocketManager : NSObject, SocketManagerSpec, SocketParsable, SocketDa
/// - parameter items: The data to send with this event. /// - parameter items: The data to send with this event.
open func emitAll(_ event: String, withItems items: [Any]) { open func emitAll(_ event: String, withItems items: [Any]) {
forAll {socket in forAll {socket in
socket.emit(event, with: items) socket.emit(event, with: items, completion: {})
} }
} }

View File

@ -73,8 +73,11 @@ public typealias AckCallback = ([Any]) -> ()
/// A typealias for a normal callback. /// A typealias for a normal callback.
public typealias NormalCallback = ([Any], SocketAckEmitter) -> () public typealias NormalCallback = ([Any], SocketAckEmitter) -> ()
/// A typealias for a queued POST
public typealias Post = (msg: String, completion: (() -> ()))
typealias JSON = [String: Any] typealias JSON = [String: Any]
typealias Probe = (msg: String, type: SocketEnginePacketType, data: [Data]) typealias Probe = (msg: String, type: SocketEnginePacketType, data: [Data], completion: (() -> ()))
typealias ProbeWaitQueue = [Probe] typealias ProbeWaitQueue = [Probe]
enum Either<E, V> { enum Either<E, V> {

View File

@ -188,7 +188,7 @@ public class TestSocket : SocketIOClient {
super.didDisconnect(reason: reason) super.didDisconnect(reason: reason)
} }
public override func emit(_ event: String, with items: [Any]) { public override func emit(_ event: String, with items: [Any], completion: @escaping () -> ()) {
expectations[ManagerExpectation.emitAllEventCalled]?.fulfill() expectations[ManagerExpectation.emitAllEventCalled]?.fulfill()
expectations[ManagerExpectation.emitAllEventCalled] = nil expectations[ManagerExpectation.emitAllEventCalled] = nil

View File

@ -506,5 +506,5 @@ class TestEngine : SocketEngineSpec {
func flushWaitingForPostToWebSocket() { } func flushWaitingForPostToWebSocket() { }
func parseEngineData(_ data: Data) { } func parseEngineData(_ data: Data) { }
func parseEngineMessage(_ message: String) { } func parseEngineMessage(_ message: String) { }
func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) { } func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data], completion: @escaping () -> ()) { }
} }

View File

@ -67,6 +67,20 @@
[self.socket emit:@"testEmit" with:@[@YES]]; [self.socket emit:@"testEmit" with:@[@YES]];
} }
- (void)testEmitWriteCompletionSyntax {
[self.socket emit:@"testEmit" with:@[@YES] completion:^{}];
}
- (void)testEmitWriteCompletion {
XCTestExpectation* expect = [self expectationWithDescription:@"Write completion should be called"];
[self.socket emit:@"testEmit" with:@[@YES] completion:^{
[expect fulfill];
}];
[self waitForExpectationsWithTimeout:0.3 handler:nil];
}
- (void)testRawEmitSyntax { - (void)testRawEmitSyntax {
[[self.socket rawEmitView] emit:@"myEvent" with:@[@1]]; [[self.socket rawEmitView] emit:@"myEvent" with:@[@1]];
} }