This commit is contained in:
Erik 2015-10-14 13:21:11 -04:00
parent ab8de6932c
commit a231104c15
3 changed files with 225 additions and 212 deletions

View File

@ -14,6 +14,7 @@ class SocketParserTest: XCTestCase {
static let packetTypes: Dictionary<String, (String, [AnyObject], [NSData], Int)> = [
"0": ("/", [], [], -1), "1": ("/", [], [], -1),
"25[\"test\"]": ("/", ["test"], [], 5),
"2[\"test\",\"~~0\"]": ("/", ["test", "~~0"], [], -1),
"2/swift,[\"testArrayEmitReturn\",[\"test3\",\"test4\"]]": ("/swift", ["testArrayEmitReturn", ["test3", "test4"]], [], -1),
"51-/swift,[\"testMultipleItemsWithBufferEmitReturn\",[1,2],{\"test\":\"bob\"},25,\"polo\",{\"_placeholder\":true,\"num\":0}]": ("/swift", ["testMultipleItemsWithBufferEmitReturn", [1, 2], ["test": "bob"], 25, "polo", "~~0"], [], -1),
"3/swift,0[[\"test3\",\"test4\"]]": ("/swift", [["test3", "test4"]], [], 0),
@ -49,6 +50,11 @@ class SocketParserTest: XCTestCase {
validateParseResult(message)
}
func testBinaryPlaceholderAsString() {
let message = "2[\"test\",\"~~0\"]"
validateParseResult(message)
}
func testNameSpaceArrayParse() {
let message = "2/swift,[\"testArrayEmitReturn\",[\"test3\",\"test4\"]]"
validateParseResult(message)

View File

@ -219,71 +219,6 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
flushProbeWait()
}
private func doPoll() {
if websocket || waitingForPoll || !connected || closed {
return
}
waitingForPoll = true
let req = NSMutableURLRequest(URL: NSURL(string: urlPolling + "&sid=\(sid)&b64=1")!)
if cookies != nil {
let headers = NSHTTPCookie.requestHeaderFieldsWithCookies(cookies!)
req.allHTTPHeaderFields = headers
}
if extraHeaders != nil {
for (headerName, value) in extraHeaders! {
req.setValue(value, forHTTPHeaderField: headerName)
}
}
doLongPoll(req)
}
private func doRequest(req: NSMutableURLRequest,
withCallback callback: (NSData?, NSURLResponse?, NSError?) -> Void) {
if !polling || closed || invalidated {
return
}
Logger.log("Doing polling request", type: logType)
req.cachePolicy = .ReloadIgnoringLocalAndRemoteCacheData
session.dataTaskWithRequest(req, completionHandler: callback).resume()
}
private func doLongPoll(req: NSMutableURLRequest) {
doRequest(req) {[weak self] data, res, err in
if let this = self {
if err != nil || data == nil {
if this.polling {
this.handlePollingFailed(err?.localizedDescription ?? "Error")
} else {
Logger.error(err?.localizedDescription ?? "Error", type: this.logType)
}
return
}
Logger.log("Got polling response", type: this.logType)
if let str = NSString(data: data!, encoding: NSUTF8StringEncoding) as? String {
dispatch_async(this.parseQueue) {[weak this] in
this?.parsePollingMessage(str)
}
}
this.waitingForPoll = false
if this.fastUpgrade {
this.doFastUpgrade()
} else if !this.closed && this.polling {
this.doPoll()
}
}
}
}
private func flushProbeWait() {
Logger.log("Flushing probe wait", type: logType)
@ -302,78 +237,6 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
}
}
private func flushWaitingForPost() {
if postWait.count == 0 || !connected {
return
} else if websocket {
flushWaitingForPostToWebSocket()
return
}
var postStr = ""
for packet in postWait {
let len = packet.characters.count
postStr += "\(len):\(packet)"
}
postWait.removeAll(keepCapacity: false)
let req = NSMutableURLRequest(URL: NSURL(string: urlPolling + "&sid=\(sid)")!)
if let cookies = cookies {
let headers = NSHTTPCookie.requestHeaderFieldsWithCookies(cookies)
req.allHTTPHeaderFields = headers
}
req.HTTPMethod = "POST"
req.setValue("text/plain; charset=UTF-8", forHTTPHeaderField: "Content-Type")
let postData = postStr.dataUsingEncoding(NSUTF8StringEncoding,
allowLossyConversion: false)!
req.HTTPBody = postData
req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length")
waitingForPost = true
Logger.log("POSTing: %@", type: logType, args: postStr)
doRequest(req) {[weak self] data, res, err in
if let this = self {
if err != nil && this.polling {
this.handlePollingFailed(err?.localizedDescription ?? "Error")
return
} else if err != nil {
Logger.error(err?.localizedDescription ?? "Error", type: this.logType)
return
}
this.waitingForPost = false
dispatch_async(this.emitQueue) {[weak this] in
if !(this?.fastUpgrade ?? true) {
this?.flushWaitingForPost()
this?.doPoll()
}
}
}
}
}
// We had packets waiting for send when we upgraded
// Send them raw
private func flushWaitingForPostToWebSocket() {
guard let ws = self.ws else {return}
for msg in postWait {
ws.writeString(msg)
}
postWait.removeAll(keepCapacity: true)
}
private func handleClose() {
if let client = client where polling == true {
client.engineDidClose("Disconnect")
@ -487,29 +350,6 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
doLongPoll(reqPolling)
}
private func parsePollingMessage(str: String) {
guard str.characters.count != 1 else {
return
}
var reader = SocketStringReader(message: str)
while reader.hasNext {
if let n = Int(reader.readUntilStringOccurence(":")) {
let str = reader.read(n)
dispatch_async(handleQueue) {
self.parseEngineMessage(str, fromPolling: true)
}
} else {
dispatch_async(handleQueue) {
self.parseEngineMessage(str, fromPolling: true)
}
break
}
}
}
private func parseEngineData(data: NSData) {
Logger.log("Got binary data: %@", type: "SocketEngine", args: data)
client?.parseBinaryData(data.subdataWithRange(NSMakeRange(1, data.length - 1)))
@ -572,43 +412,6 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
write("", withType: .Ping, withData: nil)
}
/// Send polling message.
/// Only call on emitQueue
private func sendPollMessage(var msg: String, withType type: SocketEnginePacketType,
datas:[NSData]? = nil) {
Logger.log("Sending poll: %@ as type: %@", type: logType, args: msg, type.rawValue)
doubleEncodeUTF8(&msg)
let strMsg = "\(type.rawValue)\(msg)"
postWait.append(strMsg)
for data in datas ?? [] {
if case let .Right(bin) = createBinaryDataForSend(data) {
postWait.append(bin)
}
}
if !waitingForPost {
flushWaitingForPost()
}
}
/// Send message on WebSockets
/// Only call on emitQueue
private func sendWebSocketMessage(str: String, withType type: SocketEnginePacketType,
datas:[NSData]? = nil) {
Logger.log("Sending ws: %@ as type: %@", type: logType, args: str, type.rawValue)
ws?.writeString("\(type.rawValue)\(str)")
for data in datas ?? [] {
if case let Either.Left(bin) = createBinaryDataForSend(data) {
ws?.writeData(bin)
}
}
}
// Starts the ping timer
private func startPingTimer() {
if let pingInterval = pingInterval {
@ -622,11 +425,6 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
}
}
func stopPolling() {
invalidated = true
session.finishTasksAndInvalidate()
}
private func upgradeTransport() {
if websocketConnected {
Logger.log("Upgrading transport to WebSockets", type: logType)
@ -655,12 +453,220 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
}
}
}
}
// Polling methods
private extension SocketEngine {
func doPoll() {
if websocket || waitingForPoll || !connected || closed {
return
}
waitingForPoll = true
let req = NSMutableURLRequest(URL: NSURL(string: urlPolling + "&sid=\(sid)&b64=1")!)
if cookies != nil {
let headers = NSHTTPCookie.requestHeaderFieldsWithCookies(cookies!)
req.allHTTPHeaderFields = headers
}
if extraHeaders != nil {
for (headerName, value) in extraHeaders! {
req.setValue(value, forHTTPHeaderField: headerName)
}
}
doLongPoll(req)
}
func doRequest(req: NSMutableURLRequest,
withCallback callback: (NSData?, NSURLResponse?, NSError?) -> Void) {
if !polling || closed || invalidated {
return
}
Logger.log("Doing polling request", type: logType)
req.cachePolicy = .ReloadIgnoringLocalAndRemoteCacheData
session.dataTaskWithRequest(req, completionHandler: callback).resume()
}
func doLongPoll(req: NSMutableURLRequest) {
doRequest(req) {[weak self] data, res, err in
if let this = self {
if err != nil || data == nil {
if this.polling {
this.handlePollingFailed(err?.localizedDescription ?? "Error")
} else {
Logger.error(err?.localizedDescription ?? "Error", type: this.logType)
}
return
}
Logger.log("Got polling response", type: this.logType)
if let str = NSString(data: data!, encoding: NSUTF8StringEncoding) as? String {
dispatch_async(this.parseQueue) {[weak this] in
this?.parsePollingMessage(str)
}
}
this.waitingForPoll = false
if this.fastUpgrade {
this.doFastUpgrade()
} else if !this.closed && this.polling {
this.doPoll()
}
}
}
}
func flushWaitingForPost() {
if postWait.count == 0 || !connected {
return
} else if websocket {
flushWaitingForPostToWebSocket()
return
}
var postStr = ""
for packet in postWait {
let len = packet.characters.count
postStr += "\(len):\(packet)"
}
postWait.removeAll(keepCapacity: false)
let req = NSMutableURLRequest(URL: NSURL(string: urlPolling + "&sid=\(sid)")!)
if let cookies = cookies {
let headers = NSHTTPCookie.requestHeaderFieldsWithCookies(cookies)
req.allHTTPHeaderFields = headers
}
req.HTTPMethod = "POST"
req.setValue("text/plain; charset=UTF-8", forHTTPHeaderField: "Content-Type")
let postData = postStr.dataUsingEncoding(NSUTF8StringEncoding,
allowLossyConversion: false)!
req.HTTPBody = postData
req.setValue(String(postData.length), forHTTPHeaderField: "Content-Length")
waitingForPost = true
Logger.log("POSTing: %@", type: logType, args: postStr)
doRequest(req) {[weak self] data, res, err in
if let this = self {
if err != nil && this.polling {
this.handlePollingFailed(err?.localizedDescription ?? "Error")
return
} else if err != nil {
Logger.error(err?.localizedDescription ?? "Error", type: this.logType)
return
}
this.waitingForPost = false
dispatch_async(this.emitQueue) {[weak this] in
if !(this?.fastUpgrade ?? true) {
this?.flushWaitingForPost()
this?.doPoll()
}
}
}
}
}
// We had packets waiting for send when we upgraded
// Send them raw
private func flushWaitingForPostToWebSocket() {
guard let ws = self.ws else {return}
for msg in postWait {
ws.writeString(msg)
}
postWait.removeAll(keepCapacity: true)
}
func parsePollingMessage(str: String) {
guard str.characters.count != 1 else {
return
}
var reader = SocketStringReader(message: str)
while reader.hasNext {
if let n = Int(reader.readUntilStringOccurence(":")) {
let str = reader.read(n)
dispatch_async(handleQueue) {
self.parseEngineMessage(str, fromPolling: true)
}
} else {
dispatch_async(handleQueue) {
self.parseEngineMessage(str, fromPolling: true)
}
break
}
}
}
/// Send polling message.
/// Only call on emitQueue
func sendPollMessage(var msg: String, withType type: SocketEnginePacketType,
datas:[NSData]? = nil) {
Logger.log("Sending poll: %@ as type: %@", type: logType, args: msg, type.rawValue)
doubleEncodeUTF8(&msg)
let strMsg = "\(type.rawValue)\(msg)"
postWait.append(strMsg)
for data in datas ?? [] {
if case let .Right(bin) = createBinaryDataForSend(data) {
postWait.append(bin)
}
}
if !waitingForPost {
flushWaitingForPost()
}
}
func stopPolling() {
invalidated = true
session.finishTasksAndInvalidate()
}
}
// WebSocket methods
extension SocketEngine {
/// Send message on WebSockets
/// Only call on emitQueue
private func sendWebSocketMessage(str: String, withType type: SocketEnginePacketType,
datas:[NSData]? = nil) {
Logger.log("Sending ws: %@ as type: %@", type: logType, args: str, type.rawValue)
ws?.writeString("\(type.rawValue)\(str)")
for data in datas ?? [] {
if case let Either.Left(bin) = createBinaryDataForSend(data) {
ws?.writeData(bin)
}
}
}
// Delagate methods
public func websocketDidConnect(socket:WebSocket) {
websocketConnected = true
if !forceWebsockets {
probing = true
probeWebSocket()
@ -670,37 +676,37 @@ public final class SocketEngine: NSObject, SocketEngineSpec, WebSocketDelegate {
polling = false
}
}
public func websocketDidDisconnect(socket: WebSocket, error: NSError?) {
websocketConnected = false
probing = false
if closed {
client?.engineDidClose("Disconnect")
return
}
if websocket {
pingTimer?.invalidate()
connected = false
websocket = false
let reason = error?.localizedDescription ?? "Socket Disconnected"
if error != nil {
client?.didError(reason)
}
client?.engineDidClose(reason)
} else {
flushProbeWait()
}
}
public func websocketDidReceiveMessage(socket: WebSocket, text: String) {
parseEngineMessage(text, fromPolling: false)
}
public func websocketDidReceiveData(socket: WebSocket, data: NSData) {
parseEngineData(data)
}

View File

@ -210,6 +210,7 @@ struct SocketPacket {
mutating func fillInPlaceholders() {
for i in 0..<data.count {
if let str = data[i] as? String, num = str["~~(\\d)"].groups() {
// Fill in binary placeholder with data
data[i] = binary[Int(num[1])!]
} else if data[i] is NSDictionary || data[i] is NSArray {
data[i] = _fillInPlaceholders(data[i])