Fix connection issues, binary data in nested data structures, some race conditions, reconnect needs to be redone

This commit is contained in:
Erik 2015-03-05 22:06:17 -05:00
parent 166b2b515b
commit 851ac7635a
2 changed files with 188 additions and 139 deletions

View File

@ -34,7 +34,7 @@ extension String {
} }
} }
private typealias ProbeQueue = [() -> Void] private typealias PollWaitQueue = [() -> Void]
private enum PacketType: String { private enum PacketType: String {
case OPEN = "0" case OPEN = "0"
@ -49,16 +49,21 @@ private enum PacketType: String {
class SocketEngine: NSObject, SRWebSocketDelegate { class SocketEngine: NSObject, SRWebSocketDelegate {
unowned let client:SocketIOClient unowned let client:SocketIOClient
private let workQueue = NSOperationQueue() private let workQueue = NSOperationQueue()
private let emitQueue = dispatch_queue_create(
"emitQueue".cStringUsingEncoding(NSUTF8StringEncoding), DISPATCH_QUEUE_SERIAL)
private let handleQueue = dispatch_queue_create( private let handleQueue = dispatch_queue_create(
"handleQueue".cStringUsingEncoding(NSUTF8StringEncoding), DISPATCH_QUEUE_SERIAL) "handleQueue".cStringUsingEncoding(NSUTF8StringEncoding), DISPATCH_QUEUE_SERIAL)
private var forcePolling = false private var forcePolling = false
private var pingTimer:NSTimer? private var pingTimer:NSTimer?
private var postWait = [String]()
private var _polling = true private var _polling = true
private var probing = false private var probing = false
private var probeWait = ProbeQueue() private var probeWait = PollWaitQueue()
private var wait = false private var waitingForPoll = false
private var waitingForPost = false
private var _websocket = false private var _websocket = false
private var websocketConnected = false private var websocketConnected = false
var connected = false
var pingInterval:Int? var pingInterval:Int?
var polling:Bool { var polling:Bool {
return self._polling return self._polling
@ -133,21 +138,22 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
} }
private func doPoll() { private func doPoll() {
if self.urlPolling == nil || self.websocket || self.wait { if self.urlPolling == nil || self.websocket || self.waitingForPoll || !self.connected {
return return
} }
let req = NSURLRequest(URL: let req = NSURLRequest(URL:
NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!) NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!)
self.wait = true self.waitingForPoll = true
NSURLConnection.sendAsynchronousRequest(req, NSURLConnection.sendAsynchronousRequest(req,
queue: self.workQueue) {[weak self] res, data, err in queue: self.workQueue) {[weak self] res, data, err in
if self == nil { if self == nil {
return return
} else if err != nil { } else if err != nil {
// println(err) if self!.polling {
self?.handlePollingFailed() self?.handlePollingFailed(err)
}
return return
} }
@ -156,29 +162,91 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
if let str = NSString(data: data, encoding: NSUTF8StringEncoding) { if let str = NSString(data: data, encoding: NSUTF8StringEncoding) {
// println(str) // println(str)
dispatch_async(self?.handleQueue) {[weak self] in
self?.parsePollingMessage(str) self?.parsePollingMessage(str)
return
}
} }
self?.wait = false self?.waitingForPoll = false
self?.doPoll() self?.doPoll()
} }
} }
private func flushProbeWait() { private func flushProbeWait() {
for waiter in self.probeWait { // println("flushing probe wait")
dispatch_async(self.emitQueue) {[weak self] in
if self == nil {
return
}
for waiter in self!.probeWait {
waiter() waiter()
} }
self.probeWait.removeAll(keepCapacity: false) self?.probeWait.removeAll(keepCapacity: false)
}
}
private func flushWaitingForPost() {
if self.postWait.count == 0 || !self.connected || !self.polling {
return
}
let postStr = self.postWait.reduce("") {$0 + $1}
assert(self.postWait.count != 0)
self.postWait.removeAll(keepCapacity: true)
var req = NSMutableURLRequest(URL:
NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!)
req.HTTPMethod = "POST"
req.setValue("application/html-text", forHTTPHeaderField: "Content-Type")
let postData = postStr.dataUsingEncoding(NSUTF8StringEncoding,
allowLossyConversion: false)!
req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length")
req.HTTPBody = postData
self.waitingForPost = true
NSURLConnection.sendAsynchronousRequest(req, queue: self.workQueue) {[weak self] res, data, err in
if err != nil {
if self!.polling {
self?.handlePollingFailed(err)
}
return
}
self?.flushWaitingForPost()
self?.waitingForPost = false
self?.doPoll()
}
}
// A poll failed, tell the client about it
// We check to see if we were closed by the server first
private func handlePollingFailed(reason:NSError?) {
if !self.client.reconnecting {
self.connected = false
self.pingTimer?.invalidate()
self.waitingForPoll = false
self.waitingForPost = false
self.client.pollingDidFail(reason)
}
} }
func open(opts:[String: AnyObject]? = nil) { func open(opts:[String: AnyObject]? = nil) {
if self.waitingForPost || self.waitingForPoll || self.websocket || self.connected {
assert(false, "We're in a bad state, this shouldn't happen.")
}
let (urlPolling, urlWebSocket) = self.createURLs(params: opts) let (urlPolling, urlWebSocket) = self.createURLs(params: opts)
self.urlPolling = urlPolling self.urlPolling = urlPolling
self.urlWebSocket = urlWebSocket self.urlWebSocket = urlWebSocket
let time = Int(NSDate().timeIntervalSince1970) let reqPolling = NSURLRequest(URL: NSURL(string: urlPolling + "&b64=1")!)
let reqPolling = NSURLRequest(URL: NSURL(string: urlPolling + "&t=\(time)-0&b64=1")!)
NSURLConnection.sendAsynchronousRequest(reqPolling, NSURLConnection.sendAsynchronousRequest(reqPolling,
queue: self.workQueue) {[weak self] res, data, err in queue: self.workQueue) {[weak self] res, data, err in
@ -186,10 +254,10 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
if self == nil { if self == nil {
return return
} else if err != nil || data == nil { } else if err != nil || data == nil {
// println(err) if self!.polling {
self?.handlePollingFailed() self?.handlePollingFailed(err)
}
return return
} }
if let dataString = NSString(data: data, encoding: NSUTF8StringEncoding) { if let dataString = NSString(data: data, encoding: NSUTF8StringEncoding) {
@ -209,6 +277,8 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
return return
} }
self?.connected = true
if let json = NSJSONSerialization.JSONObjectWithData(jsonData!, if let json = NSJSONSerialization.JSONObjectWithData(jsonData!,
options: NSJSONReadingOptions.AllowFragments, error: &err) as? NSDictionary { options: NSJSONReadingOptions.AllowFragments, error: &err) as? NSDictionary {
if let sid = json["sid"] as? String { if let sid = json["sid"] as? String {
@ -237,17 +307,6 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
} }
} }
// A poll failed, tell the client about it
// We check to see if we were closed by the server first
private func handlePollingFailed() {
if !self.client.reconnecting {
self.pingTimer?.invalidate()
self.wait = false
self.client.pollingDidFail()
}
}
// Translatation of engine.io-parser#decodePayload // Translatation of engine.io-parser#decodePayload
private func parsePollingMessage(str:String) { private func parsePollingMessage(str:String) {
if str.length == 1 { if str.length == 1 {
@ -277,8 +336,8 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
if chr != ":" { if chr != ":" {
length += chr length += chr
} else { } else {
if testLength(length, &n) || length == "" { if length == "" || testLength(length, &n) {
self.handlePollingFailed() self.handlePollingFailed(nil)
return return
} }
@ -304,10 +363,9 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
private func parseEngineMessage(message:AnyObject?) { private func parseEngineMessage(message:AnyObject?) {
// println(message) // println(message)
dispatch_async(self.handleQueue) {[weak self] in
if let data = message as? NSData { if let data = message as? NSData {
// Strip off message type // Strip off message type
self?.client.parseSocketMessage(data.subdataWithRange(NSMakeRange(1, data.length - 1))) self.client.parseSocketMessage(data.subdataWithRange(NSMakeRange(1, data.length - 1)))
return return
} }
@ -316,7 +374,7 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
// We should upgrade // We should upgrade
if strMessage == "3probe" { if strMessage == "3probe" {
self?.upgradeTransport() self.upgradeTransport()
return return
} }
@ -332,7 +390,7 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
if let data = NSData(base64EncodedString: messageString, if let data = NSData(base64EncodedString: messageString,
options: NSDataBase64DecodingOptions.IgnoreUnknownCharacters) { options: NSDataBase64DecodingOptions.IgnoreUnknownCharacters) {
// println("sending \(data)") // println("sending \(data)")
self?.client.parseSocketMessage(data) self.client.parseSocketMessage(data)
} }
return return
@ -350,8 +408,7 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
messageString.removeAtIndex(messageString.startIndex) messageString.removeAtIndex(messageString.startIndex)
// println("sending \(messageString)") // println("sending \(messageString)")
self?.client.parseSocketMessage(messageString) self.client.parseSocketMessage(messageString)
}
} }
private func probeWebSocket() { private func probeWebSocket() {
@ -363,26 +420,32 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
func send(msg:String, datas:[NSData]? = nil) { func send(msg:String, datas:[NSData]? = nil) {
let _send = {[weak self] (msg:String, datas:[NSData]?) -> () -> Void in let _send = {[weak self] (msg:String, datas:[NSData]?) -> () -> Void in
return { return {
if self == nil { if self == nil || !self!.connected {
return return
} }
if self!.websocket { if self!.websocket {
// println("sending ws: \(msg)") // println("sending ws: \(msg):\(datas)")
self?.sendWebSocketMessage(msg, withType: PacketType.MESSAGE, datas: datas) self?.sendWebSocketMessage(msg, withType: PacketType.MESSAGE, datas: datas)
} else { } else {
// println("sending poll: \(msg)") // println("sending poll: \(msg):\(datas)")
self?.sendPollMessage(msg, withType: PacketType.MESSAGE, datas: datas) self?.sendPollMessage(msg, withType: PacketType.MESSAGE, datas: datas)
} }
} }
} }
if self.probing { dispatch_async(self.emitQueue) {[weak self] in
self.probeWait.append(_send(msg, datas)) if self == nil {
return
}
if self!.probing {
self?.probeWait.append(_send(msg, datas))
} else { } else {
_send(msg, datas)() _send(msg, datas)()
} }
} }
}
func sendPing() { func sendPing() {
// println("sending ping") // println("sending ping")
@ -398,11 +461,6 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
// println("Sending: \(msg)") // println("Sending: \(msg)")
var postData:NSData var postData:NSData
var bDatas:[String]? var bDatas:[String]?
var req = NSMutableURLRequest(URL:
NSURL(string: self.urlPolling! + "&sid=\(self.sid)")!)
req.HTTPMethod = "POST"
req.setValue("application/html-text", forHTTPHeaderField: "Content-Type")
if datas != nil { if datas != nil {
bDatas = [String]() bDatas = [String]()
@ -424,21 +482,13 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
} }
} }
postData = postStr.dataUsingEncoding(NSUTF8StringEncoding, self.postWait.append(postStr)
allowLossyConversion: false)!
if waitingForPost {
req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length") self.doPoll()
req.HTTPBody = postData
NSURLConnection.sendAsynchronousRequest(req, queue: self.workQueue) {[weak self] res, data, err in
if err != nil {
// println(err)
self?.handlePollingFailed()
return return
} } else {
self.flushWaitingForPost()
self?.doPoll()
} }
} }
@ -472,6 +522,7 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
if self.websocketConnected { if self.websocketConnected {
self.probing = false self.probing = false
self._websocket = true self._websocket = true
self.waitingForPoll = false
self._polling = false self._polling = false
self.sendWebSocketMessage("", withType: PacketType.UPGRADE) self.sendWebSocketMessage("", withType: PacketType.UPGRADE)
self.flushProbeWait() self.flushProbeWait()
@ -481,7 +532,11 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
// Called when a message is recieved // Called when a message is recieved
func webSocket(webSocket:SRWebSocket!, didReceiveMessage message:AnyObject?) { func webSocket(webSocket:SRWebSocket!, didReceiveMessage message:AnyObject?) {
// println(message) // println(message)
self.parseEngineMessage(message)
dispatch_async(self.handleQueue) {[weak self] in
self?.parseEngineMessage(message)
return
}
} }
// Called when the socket is opened // Called when the socket is opened
@ -495,13 +550,15 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
func webSocket(webSocket:SRWebSocket!, didCloseWithCode code:Int, reason:String!, wasClean:Bool) { func webSocket(webSocket:SRWebSocket!, didCloseWithCode code:Int, reason:String!, wasClean:Bool) {
self.websocketConnected = false self.websocketConnected = false
self.probing = false self.probing = false
self.flushProbeWait()
if self.websocket { if self.websocket {
self.pingTimer?.invalidate() self.pingTimer?.invalidate()
self.connected = false
self._websocket = false self._websocket = false
self._polling = true self._polling = true
self.client.webSocketDidCloseWithCode(code, reason: reason, wasClean: wasClean) self.client.webSocketDidCloseWithCode(code, reason: reason, wasClean: wasClean)
} else {
self.flushProbeWait()
} }
} }
@ -510,11 +567,13 @@ class SocketEngine: NSObject, SRWebSocketDelegate {
self.websocketConnected = false self.websocketConnected = false
self._polling = true self._polling = true
self.probing = false self.probing = false
self.flushProbeWait()
if self.websocket { if self.websocket {
self.pingTimer?.invalidate() self.pingTimer?.invalidate()
self.connected = false
self.client.webSocketDidFailWithError(error) self.client.webSocketDidFailWithError(error)
} else {
self.flushProbeWait()
} }
} }
} }

View File

@ -151,11 +151,8 @@ class SocketIOClient {
} }
dispatch_async(self.emitQueue) {[weak self] in dispatch_async(self.emitQueue) {[weak self] in
if self == nil {
return
}
self?._emit(event, args) self?._emit(event, args)
return
} }
} }
@ -169,11 +166,8 @@ class SocketIOClient {
self.ackHandlers.append(ackHandler) self.ackHandlers.append(ackHandler)
dispatch_async(self.emitQueue) {[weak self] in dispatch_async(self.emitQueue) {[weak self] in
if self == nil {
return
}
self?._emit(event, args, ack: true) self?._emit(event, args, ack: true)
return
} }
return ackHandler return ackHandler
@ -324,29 +318,27 @@ class SocketIOClient {
} }
// Parse an NSArray looking for binary data // Parse an NSArray looking for binary data
private class func parseArray(arr:NSArray, var placeholders:Int) -> (NSArray, Bool, [NSData]) { private class func parseArray(arr:NSArray, var currentPlaceholder:Int) -> (NSArray, Bool, [NSData]) {
var replacementArr = [AnyObject](count: arr.count, repeatedValue: 1) var replacementArr = [AnyObject](count: arr.count, repeatedValue: 1)
var hasBinary = false var hasBinary = false
var arrayDatas = [NSData]() var arrayDatas = [NSData]()
if placeholders == -1 {
placeholders = 0
}
for g in 0..<arr.count { for g in 0..<arr.count {
if arr[g] is NSData { if arr[g] is NSData {
hasBinary = true hasBinary = true
currentPlaceholder++
let sendData = arr[g] as NSData let sendData = arr[g] as NSData
arrayDatas.append(sendData) arrayDatas.append(sendData)
replacementArr[g] = ["_placeholder": true, replacementArr[g] = ["_placeholder": true,
"num": placeholders++] "num": currentPlaceholder]
} else if let dict = arr[g] as? NSDictionary { } else if let dict = arr[g] as? NSDictionary {
let (nestDict, hadBinary, dictArrs) = self.parseNSDictionary(dict, placeholders: placeholders) let (nestDict, hadBinary, dictArrs) = self.parseNSDictionary(dict,
currentPlaceholder: currentPlaceholder)
if hadBinary { if hadBinary {
hasBinary = true hasBinary = true
placeholders += dictArrs.count currentPlaceholder += dictArrs.count
replacementArr[g] = nestDict replacementArr[g] = nestDict
arrayDatas.extend(dictArrs) arrayDatas.extend(dictArrs)
} else { } else {
@ -354,11 +346,12 @@ class SocketIOClient {
} }
} else if let nestArr = arr[g] as? NSArray { } else if let nestArr = arr[g] as? NSArray {
// Recursive // Recursive
let (nested, hadBinary, nestDatas) = self.parseArray(nestArr, placeholders: placeholders) let (nested, hadBinary, nestDatas) = self.parseArray(nestArr,
currentPlaceholder: currentPlaceholder)
if hadBinary { if hadBinary {
hasBinary = true hasBinary = true
placeholders += nestDatas.count currentPlaceholder += nestDatas.count
replacementArr[g] = nested replacementArr[g] = nested
arrayDatas.extend(nestDatas) arrayDatas.extend(nestDatas)
} else { } else {
@ -393,7 +386,7 @@ class SocketIOClient {
private class func parseEmitArgs(args:[AnyObject]) -> ([AnyObject], Bool, [NSData]) { private class func parseEmitArgs(args:[AnyObject]) -> ([AnyObject], Bool, [NSData]) {
var items = [AnyObject](count: args.count, repeatedValue: 1) var items = [AnyObject](count: args.count, repeatedValue: 1)
var numberOfPlaceholders = -1 var currentPlaceholder = -1
var hasBinary = false var hasBinary = false
var emitDatas = [NSData]() var emitDatas = [NSData]()
@ -401,9 +394,9 @@ class SocketIOClient {
if let dict = args[i] as? NSDictionary { if let dict = args[i] as? NSDictionary {
// Check for binary data // Check for binary data
let (newDict, hadBinary, binaryDatas) = SocketIOClient.parseNSDictionary(dict, let (newDict, hadBinary, binaryDatas) = SocketIOClient.parseNSDictionary(dict,
placeholders: numberOfPlaceholders) currentPlaceholder: currentPlaceholder)
if hadBinary { if hadBinary {
numberOfPlaceholders = binaryDatas.count currentPlaceholder += binaryDatas.count
emitDatas.extend(binaryDatas) emitDatas.extend(binaryDatas)
hasBinary = true hasBinary = true
@ -414,11 +407,11 @@ class SocketIOClient {
} else if let arr = args[i] as? NSArray { } else if let arr = args[i] as? NSArray {
// arg is array, check for binary // arg is array, check for binary
let (replace, hadData, newDatas) = SocketIOClient.parseArray(arr, let (replace, hadData, newDatas) = SocketIOClient.parseArray(arr,
placeholders: numberOfPlaceholders) currentPlaceholder: currentPlaceholder)
if hadData { if hadData {
hasBinary = true hasBinary = true
numberOfPlaceholders += emitDatas.count currentPlaceholder += newDatas.count
for data in newDatas { for data in newDatas {
emitDatas.append(data) emitDatas.append(data)
@ -432,8 +425,8 @@ class SocketIOClient {
// args is just binary // args is just binary
hasBinary = true hasBinary = true
numberOfPlaceholders++ currentPlaceholder++
items[i] = ["_placeholder": true, "num": numberOfPlaceholders] items[i] = ["_placeholder": true, "num": currentPlaceholder]
emitDatas.append(binaryData) emitDatas.append(binaryData)
} else { } else {
items[i] = args[i] items[i] = args[i]
@ -444,39 +437,36 @@ class SocketIOClient {
} }
// Parses a NSDictionary, looking for NSData objects // Parses a NSDictionary, looking for NSData objects
private class func parseNSDictionary(dict:NSDictionary, var placeholders:Int) -> (NSDictionary, Bool, [NSData]) { private class func parseNSDictionary(dict:NSDictionary, var currentPlaceholder:Int) -> (NSDictionary, Bool, [NSData]) {
var returnDict = NSMutableDictionary() var returnDict = NSMutableDictionary()
var hasBinary = false var hasBinary = false
if placeholders == -1 {
placeholders = 0
}
var returnDatas = [NSData]() var returnDatas = [NSData]()
for (key, value) in dict { for (key, value) in dict {
if let binaryData = value as? NSData { if let binaryData = value as? NSData {
currentPlaceholder++
hasBinary = true hasBinary = true
returnDatas.append(binaryData) returnDatas.append(binaryData)
returnDict[key as String] = ["_placeholder": true, "num": placeholders++] returnDict[key as String] = ["_placeholder": true, "num": currentPlaceholder++]
} else if let arr = value as? NSArray { } else if let arr = value as? NSArray {
let (replace, hadBinary, arrDatas) = self.parseArray(arr, placeholders: placeholders) let (replace, hadBinary, arrDatas) = self.parseArray(arr, currentPlaceholder: currentPlaceholder)
if hadBinary { if hadBinary {
hasBinary = true hasBinary = true
returnDict[key as String] = replace returnDict[key as String] = replace
placeholders += arrDatas.count currentPlaceholder += arrDatas.count
returnDatas.extend(arrDatas) returnDatas.extend(arrDatas)
} else { } else {
returnDict[key as String] = arr returnDict[key as String] = arr
} }
} else if let dict = value as? NSDictionary { } else if let dict = value as? NSDictionary {
// Recursive // Recursive
let (nestDict, hadBinary, nestDatas) = self.parseNSDictionary(dict, placeholders: placeholders) let (nestDict, hadBinary, nestDatas) = self.parseNSDictionary(dict, currentPlaceholder: currentPlaceholder)
if hadBinary { if hadBinary {
hasBinary = true hasBinary = true
returnDict[key as String] = nestDict returnDict[key as String] = nestDict
placeholders += nestDatas.count currentPlaceholder += nestDatas.count
returnDatas.extend(nestDatas) returnDatas.extend(nestDatas)
} else { } else {
returnDict[key as String] = dict returnDict[key as String] = dict
@ -772,9 +762,9 @@ class SocketIOClient {
} }
// Something happened while polling // Something happened while polling
func pollingDidFail() { func pollingDidFail(err:NSError?) {
if !self.reconnecting { if !self.reconnecting {
self.handleEvent("reconnect", data: "XHR polling error", isInternalMessage: true) self.handleEvent("reconnect", data: err?.localizedDescription, isInternalMessage: true)
self.tryReconnect(triesLeft: self.reconnectAttempts) self.tryReconnect(triesLeft: self.reconnectAttempts)
} }
} }