diff --git a/Source/SocketEngine.swift b/Source/SocketEngine.swift index dcd350e..dfbdddd 100644 --- a/Source/SocketEngine.swift +++ b/Source/SocketEngine.swift @@ -115,6 +115,7 @@ public final class SocketEngine: NSObject, SocketEnginePollable, SocketEngineWeb deinit { DefaultSocketLogger.Logger.log("Engine is being deinit", type: logType) closed = true + ws?.disconnect() stopPolling() } diff --git a/Source/SocketEngineSpec.swift b/Source/SocketEngineSpec.swift index d589a18..078fcbb 100644 --- a/Source/SocketEngineSpec.swift +++ b/Source/SocketEngineSpec.swift @@ -61,7 +61,7 @@ import Foundation extension SocketEngineSpec { func createBinaryDataForSend(data: NSData) -> Either { - if websocket { + if websocket { var byteArray = [UInt8](count: 1, repeatedValue: 0x4) let mutData = NSMutableData(bytes: &byteArray, length: 1) diff --git a/Source/WebSocket.swift b/Source/WebSocket.swift index 06dd38e..49e585b 100644 --- a/Source/WebSocket.swift +++ b/Source/WebSocket.swift @@ -111,14 +111,13 @@ public class WebSocket : NSObject, NSStreamDelegate { public var selfSignedSSL = false private var security: SSLSecurity? public var enabledSSLCipherSuites: [SSLCipherSuite]? - public var origin: String? public var isConnected :Bool { return connected } - public var currentURL: NSURL {return url} private var url: NSURL private var inputStream: NSInputStream? private var outputStream: NSOutputStream? + private var isRunLoop = false private var connected = false private var isCreated = false private var writeQueue = NSOperationQueue() @@ -127,13 +126,10 @@ public class WebSocket : NSObject, NSStreamDelegate { private var fragBuffer: NSData? private var certValidated = false private var didDisconnect = false - //the shared processing queue used for all websocket - private static let sharedWorkQueue = dispatch_queue_create("com.vluxe.starscream.websocket", DISPATCH_QUEUE_SERIAL) //used for setting protocols. public init(url: NSURL, protocols: [String]? = nil) { self.url = url - self.origin = url.absoluteString writeQueue.maxConcurrentOperationCount = 1 optionalProtocols = protocols } @@ -141,10 +137,15 @@ public class WebSocket : NSObject, NSStreamDelegate { ///Connect to the websocket server on a background thread public func connect() { guard !isCreated else { return } - didDisconnect = false - isCreated = true - createHTTPRequest() - isCreated = false + + dispatch_async(queue) { [weak self] in + self?.didDisconnect = false + } + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) { [weak self] in + self?.isCreated = true + self?.createHTTPRequest() + self?.isCreated = false + } } /** @@ -174,22 +175,20 @@ public class WebSocket : NSObject, NSStreamDelegate { ///write a string to the websocket. This sends it as a text frame. public func writeString(str: String) { - guard isConnected else { return } dequeueWrite(str.dataUsingEncoding(NSUTF8StringEncoding)!, code: .TextFrame) } ///write binary data to the websocket. This sends it as a binary frame. public func writeData(data: NSData) { - guard isConnected else { return } dequeueWrite(data, code: .BinaryFrame) } //write a ping to the websocket. This sends it as a control frame. //yodel a sound to the planet. This sends it as an astroid. http://youtu.be/Eu5ZJELRiJ8?t=42s public func writePing(data: NSData) { - guard isConnected else { return } dequeueWrite(data, code: .Ping) } + //private methods below! //private method that starts the connection private func createHTTPRequest() { @@ -212,9 +211,7 @@ public class WebSocket : NSObject, NSStreamDelegate { } addHeader(urlRequest, key: headerWSVersionName, val: headerWSVersionValue) addHeader(urlRequest, key: headerWSKeyName, val: generateWebSocketKey()) - if let origin = origin { - addHeader(urlRequest, key: headerOriginName, val: origin) - } + addHeader(urlRequest, key: headerOriginName, val: url.absoluteString) addHeader(urlRequest, key: headerWSHostName, val: "\(url.host!):\(port!)") for (key,value) in headers { addHeader(urlRequest, key: key, val: value) @@ -224,12 +221,10 @@ public class WebSocket : NSObject, NSStreamDelegate { initStreamsWithData(serializedRequest, Int(port!)) } } - //Add a header to the CFHTTPMessage by using the NSString bridges to CFString private func addHeader(urlRequest: CFHTTPMessage, key: NSString, val: NSString) { CFHTTPMessageSetHeaderFieldValue(urlRequest, key, val) } - //generate a websocket key as needed in rfc private func generateWebSocketKey() -> String { var key = "" @@ -242,7 +237,6 @@ public class WebSocket : NSObject, NSStreamDelegate { let baseKey = data?.base64EncodedStringWithOptions(NSDataBase64EncodingOptions(rawValue: 0)) return baseKey! } - //Start the stream connection and write the data to the output stream private func initStreamsWithData(data: NSData, _ port: Int) { //higher level API we will cut over to at some point @@ -289,16 +283,15 @@ public class WebSocket : NSObject, NSStreamDelegate { } } } - CFReadStreamSetDispatchQueue(inStream, WebSocket.sharedWorkQueue) - CFWriteStreamSetDispatchQueue(outStream, WebSocket.sharedWorkQueue) + isRunLoop = true + inStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) + outStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) inStream.open() outStream.open() let bytes = UnsafePointer(data.bytes) - writeQueue.addOperationWithBlock { - while !outStream.hasSpaceAvailable { - usleep(100) //wait until the socket is ready - } - outStream.write(bytes, maxLength: data.length) + outStream.write(bytes, maxLength: data.length) + while(isRunLoop) { + NSRunLoop.currentRunLoop().runMode(NSDefaultRunLoopMode, beforeDate: NSDate.distantFuture() as NSDate) } } //delegate for the stream methods. Processes incoming bytes @@ -331,16 +324,18 @@ public class WebSocket : NSObject, NSStreamDelegate { private func disconnectStream(error: NSError?) { writeQueue.waitUntilAllOperationsAreFinished() if let stream = inputStream { - CFReadStreamSetDispatchQueue(stream, nil) + stream.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) stream.close() } if let stream = outputStream { - CFWriteStreamSetDispatchQueue(stream, nil) + stream.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) stream.close() } outputStream = nil + isRunLoop = false certValidated = false doDisconnect(error) + connected = false } ///handles the incoming bytes and sending them to the proper processing method @@ -350,13 +345,24 @@ public class WebSocket : NSObject, NSStreamDelegate { let length = inputStream!.read(buffer, maxLength: BUFFER_MAX) guard length > 0 else { return } - var process = false - if inputQueue.count == 0 { - process = true - } - inputQueue.append(NSData(bytes: buffer, length: length)) - if process { - dequeueInput() + + if !connected { + connected = processHTTP(buffer, bufferLen: length) + if !connected { + let response = CFHTTPMessageCreateEmpty(kCFAllocatorDefault, false).takeRetainedValue() + CFHTTPMessageAppendBytes(response, buffer, length) + let code = CFHTTPMessageGetResponseStatusCode(response) + doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code))) + } + } else { + var process = false + if inputQueue.count == 0 { + process = true + } + inputQueue.append(NSData(bytes: buffer, length: length)) + if process { + dequeueInput() + } } } ///dequeue the incoming input so it is processed in order @@ -372,36 +378,13 @@ public class WebSocket : NSObject, NSStreamDelegate { self.fragBuffer = nil } let buffer = UnsafePointer(work.bytes) - let length = work.length - if !connected { - processTCPHandshake(buffer, bufferLen: length) - } else { - processRawMessage(buffer, bufferLen: length) - } + processRawMessage(buffer, bufferLen: work.length) inputQueue = inputQueue.filter{$0 != data} dequeueInput() } - //handle checking the inital connection status - private func processTCPHandshake(buffer: UnsafePointer, bufferLen: Int) { - let code = processHTTP(buffer, bufferLen: bufferLen) - switch code { - case 0: - connected = true - dispatch_async(queue) { [weak self] in - guard let s = self else { return } - s.onConnect?() - s.delegate?.websocketDidConnect(s) - } - case -1: - fragBuffer = NSData(bytes: buffer, length: bufferLen) - break //do nothing, we are going to collect more data - default: - doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code))) - } - } ///Finds the HTTP Packet in the TCP stream, by looking for the CRLF. - private func processHTTP(buffer: UnsafePointer, bufferLen: Int) -> Int { + private func processHTTP(buffer: UnsafePointer, bufferLen: Int) -> Bool { let CRLFBytes = [UInt8(ascii: "\r"), UInt8(ascii: "\n"), UInt8(ascii: "\r"), UInt8(ascii: "\n")] var k = 0 var totalSize = 0 @@ -417,37 +400,39 @@ public class WebSocket : NSObject, NSStreamDelegate { } } if totalSize > 0 { - let code = validateResponse(buffer, bufferLen: totalSize) - if code != 0 { - return code + if validateResponse(buffer, bufferLen: totalSize) { + dispatch_async(queue) { [weak self] in + guard let s = self else { return } + s.onConnect?() + s.delegate?.websocketDidConnect(s) + } + totalSize += 1 //skip the last \n + let restSize = bufferLen - totalSize + if restSize > 0 { + processRawMessage((buffer+totalSize),bufferLen: restSize) + } + return true } - totalSize += 1 //skip the last \n - let restSize = bufferLen - totalSize - if restSize > 0 { - processRawMessage((buffer+totalSize),bufferLen: restSize) - } - return 0 //success } - return -1 //was unable to find the full TCP header + return false } ///validates the HTTP is a 101 as per the RFC spec - private func validateResponse(buffer: UnsafePointer, bufferLen: Int) -> Int { + private func validateResponse(buffer: UnsafePointer, bufferLen: Int) -> Bool { let response = CFHTTPMessageCreateEmpty(kCFAllocatorDefault, false).takeRetainedValue() CFHTTPMessageAppendBytes(response, buffer, bufferLen) - let code = CFHTTPMessageGetResponseStatusCode(response) - if code != 101 { - return code + if CFHTTPMessageGetResponseStatusCode(response) != 101 { + return false } if let cfHeaders = CFHTTPMessageCopyAllHeaderFields(response) { let headers = cfHeaders.takeRetainedValue() as NSDictionary if let acceptKey = headers[headerWSAcceptName] as? NSString { if acceptKey.length > 0 { - return 0 + return true } } } - return -1 + return false } ///read a 16 bit big endian value from a buffer @@ -703,6 +688,8 @@ public class WebSocket : NSObject, NSStreamDelegate { } ///used to write things to the stream private func dequeueWrite(data: NSData, code: OpCode) { + guard isConnected else { return } + writeQueue.addOperationWithBlock { [weak self] in //stream isn't ready, let's wait guard let s = self else { return } @@ -734,6 +721,9 @@ public class WebSocket : NSObject, NSStreamDelegate { } var total = 0 while true { + if !s.isConnected { + break + } guard let outStream = s.outputStream else { break } let writeBuffer = UnsafePointer(frame!.bytes+total) let len = outStream.write(writeBuffer, maxLength: offset-total) @@ -761,16 +751,17 @@ public class WebSocket : NSObject, NSStreamDelegate { ///used to preform the disconnect delegate private func doDisconnect(error: NSError?) { guard !didDisconnect else { return } - didDisconnect = true - connected = false + dispatch_async(queue) { [weak self] in guard let s = self else { return } + s.didDisconnect = true s.onDisconnect?(error) s.delegate?.websocketDidDisconnect(s, error: error) } } } + private class SSLCert { var certData: NSData? var key: SecKeyRef?