diff --git a/SwiftIO/SocketEngine.swift b/SwiftIO/SocketEngine.swift index 6bd66e0..d44f3a2 100644 --- a/SwiftIO/SocketEngine.swift +++ b/SwiftIO/SocketEngine.swift @@ -34,7 +34,7 @@ extension String { } } -private typealias ProbeQueue = [() -> Void] +private typealias PollWaitQueue = [() -> Void] private enum PacketType: String { case OPEN = "0" @@ -49,16 +49,21 @@ private enum PacketType: String { class SocketEngine: NSObject, SRWebSocketDelegate { unowned let client:SocketIOClient private let workQueue = NSOperationQueue() + private let emitQueue = dispatch_queue_create( + "emitQueue".cStringUsingEncoding(NSUTF8StringEncoding), DISPATCH_QUEUE_SERIAL) private let handleQueue = dispatch_queue_create( "handleQueue".cStringUsingEncoding(NSUTF8StringEncoding), DISPATCH_QUEUE_SERIAL) private var forcePolling = false private var pingTimer:NSTimer? + private var postWait = [String]() private var _polling = true private var probing = false - private var probeWait = ProbeQueue() - private var wait = false + private var probeWait = PollWaitQueue() + private var waitingForPoll = false + private var waitingForPost = false private var _websocket = false private var websocketConnected = false + var connected = false var pingInterval:Int? var polling:Bool { return self._polling @@ -133,21 +138,22 @@ class SocketEngine: NSObject, SRWebSocketDelegate { } private func doPoll() { - if self.urlPolling == nil || self.websocket || self.wait { + if self.urlPolling == nil || self.websocket || self.waitingForPoll || !self.connected { return } let req = NSURLRequest(URL: NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!) - self.wait = true + self.waitingForPoll = true NSURLConnection.sendAsynchronousRequest(req, queue: self.workQueue) {[weak self] res, data, err in if self == nil { return } else if err != nil { - // println(err) - self?.handlePollingFailed() + if self!.polling { + self?.handlePollingFailed(err) + } return } @@ -156,29 +162,91 @@ class SocketEngine: NSObject, SRWebSocketDelegate { if let str = NSString(data: data, encoding: NSUTF8StringEncoding) { // println(str) - self?.parsePollingMessage(str) + dispatch_async(self?.handleQueue) {[weak self] in + self?.parsePollingMessage(str) + return + } } - self?.wait = false + self?.waitingForPoll = false self?.doPoll() } } private func flushProbeWait() { - for waiter in self.probeWait { - waiter() + // println("flushing probe wait") + dispatch_async(self.emitQueue) {[weak self] in + if self == nil { + return + } + + for waiter in self!.probeWait { + waiter() + } + + self?.probeWait.removeAll(keepCapacity: false) + } + } + + private func flushWaitingForPost() { + if self.postWait.count == 0 || !self.connected || !self.polling { + return } - self.probeWait.removeAll(keepCapacity: false) + let postStr = self.postWait.reduce("") {$0 + $1} + assert(self.postWait.count != 0) + self.postWait.removeAll(keepCapacity: true) + + var req = NSMutableURLRequest(URL: + NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!) + + req.HTTPMethod = "POST" + req.setValue("application/html-text", forHTTPHeaderField: "Content-Type") + + let postData = postStr.dataUsingEncoding(NSUTF8StringEncoding, + allowLossyConversion: false)! + + + req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length") + req.HTTPBody = postData + + self.waitingForPost = true + NSURLConnection.sendAsynchronousRequest(req, queue: self.workQueue) {[weak self] res, data, err in + if err != nil { + if self!.polling { + self?.handlePollingFailed(err) + } + return + } + + self?.flushWaitingForPost() + self?.waitingForPost = false + self?.doPoll() + } + } + + // A poll failed, tell the client about it + // We check to see if we were closed by the server first + private func handlePollingFailed(reason:NSError?) { + if !self.client.reconnecting { + self.connected = false + self.pingTimer?.invalidate() + self.waitingForPoll = false + self.waitingForPost = false + self.client.pollingDidFail(reason) + } } func open(opts:[String: AnyObject]? = nil) { + if self.waitingForPost || self.waitingForPoll || self.websocket || self.connected { + assert(false, "We're in a bad state, this shouldn't happen.") + } + let (urlPolling, urlWebSocket) = self.createURLs(params: opts) self.urlPolling = urlPolling self.urlWebSocket = urlWebSocket - let time = Int(NSDate().timeIntervalSince1970) - let reqPolling = NSURLRequest(URL: NSURL(string: urlPolling + "&t=\(time)-0&b64=1")!) + let reqPolling = NSURLRequest(URL: NSURL(string: urlPolling + "&b64=1")!) NSURLConnection.sendAsynchronousRequest(reqPolling, queue: self.workQueue) {[weak self] res, data, err in @@ -186,10 +254,10 @@ class SocketEngine: NSObject, SRWebSocketDelegate { if self == nil { return } else if err != nil || data == nil { - // println(err) - self?.handlePollingFailed() + if self!.polling { + self?.handlePollingFailed(err) + } return - } if let dataString = NSString(data: data, encoding: NSUTF8StringEncoding) { @@ -209,6 +277,8 @@ class SocketEngine: NSObject, SRWebSocketDelegate { return } + self?.connected = true + if let json = NSJSONSerialization.JSONObjectWithData(jsonData!, options: NSJSONReadingOptions.AllowFragments, error: &err) as? NSDictionary { if let sid = json["sid"] as? String { @@ -237,17 +307,6 @@ class SocketEngine: NSObject, SRWebSocketDelegate { } } - // A poll failed, tell the client about it - // We check to see if we were closed by the server first - private func handlePollingFailed() { - if !self.client.reconnecting { - self.pingTimer?.invalidate() - self.wait = false - - self.client.pollingDidFail() - } - } - // Translatation of engine.io-parser#decodePayload private func parsePollingMessage(str:String) { if str.length == 1 { @@ -277,8 +336,8 @@ class SocketEngine: NSObject, SRWebSocketDelegate { if chr != ":" { length += chr } else { - if testLength(length, &n) || length == "" { - self.handlePollingFailed() + if length == "" || testLength(length, &n) { + self.handlePollingFailed(nil) return } @@ -304,54 +363,52 @@ class SocketEngine: NSObject, SRWebSocketDelegate { private func parseEngineMessage(message:AnyObject?) { // println(message) - dispatch_async(self.handleQueue) {[weak self] in - if let data = message as? NSData { - // Strip off message type - self?.client.parseSocketMessage(data.subdataWithRange(NSMakeRange(1, data.length - 1))) - return - } - - var messageString = message as String - var strMessage = RegexMutable(messageString) - - // We should upgrade - if strMessage == "3probe" { - self?.upgradeTransport() - return - } - - let type = strMessage["^(\\d)"].groups()?[1] - - if type != PacketType.MESSAGE.rawValue { - // TODO Handle other packets - if messageString.hasPrefix("b4") { - // binary in base64 string - messageString.removeRange(Range(start: messageString.startIndex, - end: advance(messageString.startIndex, 2))) - - if let data = NSData(base64EncodedString: messageString, - options: NSDataBase64DecodingOptions.IgnoreUnknownCharacters) { - // println("sending \(data)") - self?.client.parseSocketMessage(data) - } - - return + if let data = message as? NSData { + // Strip off message type + self.client.parseSocketMessage(data.subdataWithRange(NSMakeRange(1, data.length - 1))) + return + } + + var messageString = message as String + var strMessage = RegexMutable(messageString) + + // We should upgrade + if strMessage == "3probe" { + self.upgradeTransport() + return + } + + let type = strMessage["^(\\d)"].groups()?[1] + + if type != PacketType.MESSAGE.rawValue { + // TODO Handle other packets + if messageString.hasPrefix("b4") { + // binary in base64 string + messageString.removeRange(Range(start: messageString.startIndex, + end: advance(messageString.startIndex, 2))) + + if let data = NSData(base64EncodedString: messageString, + options: NSDataBase64DecodingOptions.IgnoreUnknownCharacters) { + // println("sending \(data)") + self.client.parseSocketMessage(data) } - if messageString == PacketType.CLOSE.rawValue { - // do nothing - return - } - // println("Got something idk what to do with") - // println(messageString) + return } - // Remove message type - messageString.removeAtIndex(messageString.startIndex) - // println("sending \(messageString)") - - self?.client.parseSocketMessage(messageString) + if messageString == PacketType.CLOSE.rawValue { + // do nothing + return + } + // println("Got something idk what to do with") + // println(messageString) } + + // Remove message type + messageString.removeAtIndex(messageString.startIndex) + // println("sending \(messageString)") + + self.client.parseSocketMessage(messageString) } private func probeWebSocket() { @@ -363,24 +420,30 @@ class SocketEngine: NSObject, SRWebSocketDelegate { func send(msg:String, datas:[NSData]? = nil) { let _send = {[weak self] (msg:String, datas:[NSData]?) -> () -> Void in return { - if self == nil { + if self == nil || !self!.connected { return } if self!.websocket { - // println("sending ws: \(msg)") + // println("sending ws: \(msg):\(datas)") self?.sendWebSocketMessage(msg, withType: PacketType.MESSAGE, datas: datas) } else { - // println("sending poll: \(msg)") + // println("sending poll: \(msg):\(datas)") self?.sendPollMessage(msg, withType: PacketType.MESSAGE, datas: datas) } } } - if self.probing { - self.probeWait.append(_send(msg, datas)) - } else { - _send(msg, datas)() + dispatch_async(self.emitQueue) {[weak self] in + if self == nil { + return + } + + if self!.probing { + self?.probeWait.append(_send(msg, datas)) + } else { + _send(msg, datas)() + } } } @@ -398,11 +461,6 @@ class SocketEngine: NSObject, SRWebSocketDelegate { // println("Sending: \(msg)") var postData:NSData var bDatas:[String]? - var req = NSMutableURLRequest(URL: - NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!) - - req.HTTPMethod = "POST" - req.setValue("application/html-text", forHTTPHeaderField: "Content-Type") if datas != nil { bDatas = [String]() @@ -424,21 +482,13 @@ class SocketEngine: NSObject, SRWebSocketDelegate { } } - postData = postStr.dataUsingEncoding(NSUTF8StringEncoding, - allowLossyConversion: false)! + self.postWait.append(postStr) - - req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length") - req.HTTPBody = postData - - NSURLConnection.sendAsynchronousRequest(req, queue: self.workQueue) {[weak self] res, data, err in - if err != nil { - // println(err) - self?.handlePollingFailed() - return - } - - self?.doPoll() + if waitingForPost { + self.doPoll() + return + } else { + self.flushWaitingForPost() } } @@ -472,6 +522,7 @@ class SocketEngine: NSObject, SRWebSocketDelegate { if self.websocketConnected { self.probing = false self._websocket = true + self.waitingForPoll = false self._polling = false self.sendWebSocketMessage("", withType: PacketType.UPGRADE) self.flushProbeWait() @@ -481,7 +532,11 @@ class SocketEngine: NSObject, SRWebSocketDelegate { // Called when a message is recieved func webSocket(webSocket:SRWebSocket!, didReceiveMessage message:AnyObject?) { // println(message) - self.parseEngineMessage(message) + + dispatch_async(self.handleQueue) {[weak self] in + self?.parseEngineMessage(message) + return + } } // Called when the socket is opened @@ -495,13 +550,15 @@ class SocketEngine: NSObject, SRWebSocketDelegate { func webSocket(webSocket:SRWebSocket!, didCloseWithCode code:Int, reason:String!, wasClean:Bool) { self.websocketConnected = false self.probing = false - self.flushProbeWait() if self.websocket { self.pingTimer?.invalidate() + self.connected = false self._websocket = false self._polling = true self.client.webSocketDidCloseWithCode(code, reason: reason, wasClean: wasClean) + } else { + self.flushProbeWait() } } @@ -510,11 +567,13 @@ class SocketEngine: NSObject, SRWebSocketDelegate { self.websocketConnected = false self._polling = true self.probing = false - self.flushProbeWait() if self.websocket { self.pingTimer?.invalidate() + self.connected = false self.client.webSocketDidFailWithError(error) + } else { + self.flushProbeWait() } } } \ No newline at end of file diff --git a/SwiftIO/SocketIOClient.swift b/SwiftIO/SocketIOClient.swift index e5390b4..c761a39 100644 --- a/SwiftIO/SocketIOClient.swift +++ b/SwiftIO/SocketIOClient.swift @@ -151,11 +151,8 @@ class SocketIOClient { } dispatch_async(self.emitQueue) {[weak self] in - if self == nil { - return - } - self?._emit(event, args) + return } } @@ -169,11 +166,8 @@ class SocketIOClient { self.ackHandlers.append(ackHandler) dispatch_async(self.emitQueue) {[weak self] in - if self == nil { - return - } - self?._emit(event, args, ack: true) + return } return ackHandler @@ -324,29 +318,27 @@ class SocketIOClient { } // Parse an NSArray looking for binary data - private class func parseArray(arr:NSArray, var placeholders:Int) -> (NSArray, Bool, [NSData]) { + private class func parseArray(arr:NSArray, var currentPlaceholder:Int) -> (NSArray, Bool, [NSData]) { var replacementArr = [AnyObject](count: arr.count, repeatedValue: 1) var hasBinary = false var arrayDatas = [NSData]() - if placeholders == -1 { - placeholders = 0 - } - for g in 0.. ([AnyObject], Bool, [NSData]) { var items = [AnyObject](count: args.count, repeatedValue: 1) - var numberOfPlaceholders = -1 + var currentPlaceholder = -1 var hasBinary = false var emitDatas = [NSData]() @@ -401,9 +394,9 @@ class SocketIOClient { if let dict = args[i] as? NSDictionary { // Check for binary data let (newDict, hadBinary, binaryDatas) = SocketIOClient.parseNSDictionary(dict, - placeholders: numberOfPlaceholders) + currentPlaceholder: currentPlaceholder) if hadBinary { - numberOfPlaceholders = binaryDatas.count + currentPlaceholder += binaryDatas.count emitDatas.extend(binaryDatas) hasBinary = true @@ -414,11 +407,11 @@ class SocketIOClient { } else if let arr = args[i] as? NSArray { // arg is array, check for binary let (replace, hadData, newDatas) = SocketIOClient.parseArray(arr, - placeholders: numberOfPlaceholders) + currentPlaceholder: currentPlaceholder) if hadData { hasBinary = true - numberOfPlaceholders += emitDatas.count + currentPlaceholder += newDatas.count for data in newDatas { emitDatas.append(data) @@ -432,8 +425,8 @@ class SocketIOClient { // args is just binary hasBinary = true - numberOfPlaceholders++ - items[i] = ["_placeholder": true, "num": numberOfPlaceholders] + currentPlaceholder++ + items[i] = ["_placeholder": true, "num": currentPlaceholder] emitDatas.append(binaryData) } else { items[i] = args[i] @@ -444,39 +437,36 @@ class SocketIOClient { } // Parses a NSDictionary, looking for NSData objects - private class func parseNSDictionary(dict:NSDictionary, var placeholders:Int) -> (NSDictionary, Bool, [NSData]) { + private class func parseNSDictionary(dict:NSDictionary, var currentPlaceholder:Int) -> (NSDictionary, Bool, [NSData]) { var returnDict = NSMutableDictionary() var hasBinary = false - if placeholders == -1 { - placeholders = 0 - } var returnDatas = [NSData]() for (key, value) in dict { if let binaryData = value as? NSData { + currentPlaceholder++ hasBinary = true - returnDatas.append(binaryData) - returnDict[key as String] = ["_placeholder": true, "num": placeholders++] + returnDict[key as String] = ["_placeholder": true, "num": currentPlaceholder++] } else if let arr = value as? NSArray { - let (replace, hadBinary, arrDatas) = self.parseArray(arr, placeholders: placeholders) + let (replace, hadBinary, arrDatas) = self.parseArray(arr, currentPlaceholder: currentPlaceholder) if hadBinary { hasBinary = true returnDict[key as String] = replace - placeholders += arrDatas.count + currentPlaceholder += arrDatas.count returnDatas.extend(arrDatas) } else { returnDict[key as String] = arr } } else if let dict = value as? NSDictionary { // Recursive - let (nestDict, hadBinary, nestDatas) = self.parseNSDictionary(dict, placeholders: placeholders) + let (nestDict, hadBinary, nestDatas) = self.parseNSDictionary(dict, currentPlaceholder: currentPlaceholder) if hadBinary { hasBinary = true returnDict[key as String] = nestDict - placeholders += nestDatas.count + currentPlaceholder += nestDatas.count returnDatas.extend(nestDatas) } else { returnDict[key as String] = dict @@ -772,9 +762,9 @@ class SocketIOClient { } // Something happened while polling - func pollingDidFail() { + func pollingDidFail(err:NSError?) { if !self.reconnecting { - self.handleEvent("reconnect", data: "XHR polling error", isInternalMessage: true) + self.handleEvent("reconnect", data: err?.localizedDescription, isInternalMessage: true) self.tryReconnect(triesLeft: self.reconnectAttempts) } }