From 8d22de967ea1ef2b188af55dc140e1a219814819 Mon Sep 17 00:00:00 2001 From: Erik Date: Thu, 28 Jan 2016 09:09:29 -0500 Subject: [PATCH] bump websocket version --- Source/WebSocket.swift | 218 +++++++++++++++++++++++++---------------- 1 file changed, 136 insertions(+), 82 deletions(-) diff --git a/Source/WebSocket.swift b/Source/WebSocket.swift index 49e585b..8c769e8 100644 --- a/Source/WebSocket.swift +++ b/Source/WebSocket.swift @@ -111,13 +111,14 @@ public class WebSocket : NSObject, NSStreamDelegate { public var selfSignedSSL = false private var security: SSLSecurity? public var enabledSSLCipherSuites: [SSLCipherSuite]? + public var origin: String? public var isConnected :Bool { return connected } + public var currentURL: NSURL {return url} private var url: NSURL private var inputStream: NSInputStream? private var outputStream: NSOutputStream? - private var isRunLoop = false private var connected = false private var isCreated = false private var writeQueue = NSOperationQueue() @@ -126,10 +127,21 @@ public class WebSocket : NSObject, NSStreamDelegate { private var fragBuffer: NSData? private var certValidated = false private var didDisconnect = false + private var readyToWrite = false + private let mutex = NSLock() + private var canDispatch: Bool { + mutex.lock() + let canWork = readyToWrite + mutex.unlock() + return canWork + } + //the shared processing queue used for all websocket + private static let sharedWorkQueue = dispatch_queue_create("com.vluxe.starscream.websocket", DISPATCH_QUEUE_SERIAL) //used for setting protocols. public init(url: NSURL, protocols: [String]? = nil) { self.url = url + self.origin = url.absoluteString writeQueue.maxConcurrentOperationCount = 1 optionalProtocols = protocols } @@ -137,15 +149,10 @@ public class WebSocket : NSObject, NSStreamDelegate { ///Connect to the websocket server on a background thread public func connect() { guard !isCreated else { return } - - dispatch_async(queue) { [weak self] in - self?.didDisconnect = false - } - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) { [weak self] in - self?.isCreated = true - self?.createHTTPRequest() - self?.isCreated = false - } + didDisconnect = false + isCreated = true + createHTTPRequest() + isCreated = false } /** @@ -175,20 +182,22 @@ public class WebSocket : NSObject, NSStreamDelegate { ///write a string to the websocket. This sends it as a text frame. public func writeString(str: String) { + guard isConnected else { return } dequeueWrite(str.dataUsingEncoding(NSUTF8StringEncoding)!, code: .TextFrame) } ///write binary data to the websocket. This sends it as a binary frame. public func writeData(data: NSData) { + guard isConnected else { return } dequeueWrite(data, code: .BinaryFrame) } //write a ping to the websocket. This sends it as a control frame. //yodel a sound to the planet. This sends it as an astroid. http://youtu.be/Eu5ZJELRiJ8?t=42s public func writePing(data: NSData) { + guard isConnected else { return } dequeueWrite(data, code: .Ping) } - //private methods below! //private method that starts the connection private func createHTTPRequest() { @@ -211,7 +220,9 @@ public class WebSocket : NSObject, NSStreamDelegate { } addHeader(urlRequest, key: headerWSVersionName, val: headerWSVersionValue) addHeader(urlRequest, key: headerWSKeyName, val: generateWebSocketKey()) - addHeader(urlRequest, key: headerOriginName, val: url.absoluteString) + if let origin = origin { + addHeader(urlRequest, key: headerOriginName, val: origin) + } addHeader(urlRequest, key: headerWSHostName, val: "\(url.host!):\(port!)") for (key,value) in headers { addHeader(urlRequest, key: key, val: value) @@ -221,10 +232,12 @@ public class WebSocket : NSObject, NSStreamDelegate { initStreamsWithData(serializedRequest, Int(port!)) } } + //Add a header to the CFHTTPMessage by using the NSString bridges to CFString private func addHeader(urlRequest: CFHTTPMessage, key: NSString, val: NSString) { CFHTTPMessageSetHeaderFieldValue(urlRequest, key, val) } + //generate a websocket key as needed in rfc private func generateWebSocketKey() -> String { var key = "" @@ -237,6 +250,7 @@ public class WebSocket : NSObject, NSStreamDelegate { let baseKey = data?.base64EncodedStringWithOptions(NSDataBase64EncodingOptions(rawValue: 0)) return baseKey! } + //Start the stream connection and write the data to the output stream private func initStreamsWithData(data: NSData, _ port: Int) { //higher level API we will cut over to at some point @@ -283,15 +297,28 @@ public class WebSocket : NSObject, NSStreamDelegate { } } } - isRunLoop = true - inStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) - outStream.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) + CFReadStreamSetDispatchQueue(inStream, WebSocket.sharedWorkQueue) + CFWriteStreamSetDispatchQueue(outStream, WebSocket.sharedWorkQueue) inStream.open() outStream.open() let bytes = UnsafePointer(data.bytes) - outStream.write(bytes, maxLength: data.length) - while(isRunLoop) { - NSRunLoop.currentRunLoop().runMode(NSDefaultRunLoopMode, beforeDate: NSDate.distantFuture() as NSDate) + var timeout = 5000000 //wait 5 seconds before giving up + writeQueue.addOperationWithBlock { [unowned self] in + while !outStream.hasSpaceAvailable { + usleep(100) //wait until the socket is ready + timeout -= 100 + if timeout < 0 { + self.disconnectStream(self.errorWithDetail("write wait timed out", code: 2)) + return + } else if let error = outStream.streamError { + self.disconnectStream(error) + return + } + } + self.mutex.lock() + self.readyToWrite = true + self.mutex.unlock() + outStream.write(bytes, maxLength: data.length) } } //delegate for the stream methods. Processes incoming bytes @@ -324,18 +351,16 @@ public class WebSocket : NSObject, NSStreamDelegate { private func disconnectStream(error: NSError?) { writeQueue.waitUntilAllOperationsAreFinished() if let stream = inputStream { - stream.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) + CFReadStreamSetDispatchQueue(stream, nil) stream.close() } if let stream = outputStream { - stream.removeFromRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode) + CFWriteStreamSetDispatchQueue(stream, nil) stream.close() } outputStream = nil - isRunLoop = false certValidated = false doDisconnect(error) - connected = false } ///handles the incoming bytes and sending them to the proper processing method @@ -345,24 +370,13 @@ public class WebSocket : NSObject, NSStreamDelegate { let length = inputStream!.read(buffer, maxLength: BUFFER_MAX) guard length > 0 else { return } - - if !connected { - connected = processHTTP(buffer, bufferLen: length) - if !connected { - let response = CFHTTPMessageCreateEmpty(kCFAllocatorDefault, false).takeRetainedValue() - CFHTTPMessageAppendBytes(response, buffer, length) - let code = CFHTTPMessageGetResponseStatusCode(response) - doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code))) - } - } else { - var process = false - if inputQueue.count == 0 { - process = true - } - inputQueue.append(NSData(bytes: buffer, length: length)) - if process { - dequeueInput() - } + var process = false + if inputQueue.count == 0 { + process = true + } + inputQueue.append(NSData(bytes: buffer, length: length)) + if process { + dequeueInput() } } ///dequeue the incoming input so it is processed in order @@ -378,13 +392,37 @@ public class WebSocket : NSObject, NSStreamDelegate { self.fragBuffer = nil } let buffer = UnsafePointer(work.bytes) - processRawMessage(buffer, bufferLen: work.length) + let length = work.length + if !connected { + processTCPHandshake(buffer, bufferLen: length) + } else { + processRawMessage(buffer, bufferLen: length) + } inputQueue = inputQueue.filter{$0 != data} dequeueInput() } + //handle checking the inital connection status + private func processTCPHandshake(buffer: UnsafePointer, bufferLen: Int) { + let code = processHTTP(buffer, bufferLen: bufferLen) + switch code { + case 0: + connected = true + guard canDispatch else {return} + dispatch_async(queue) { [weak self] in + guard let s = self else { return } + s.onConnect?() + s.delegate?.websocketDidConnect(s) + } + case -1: + fragBuffer = NSData(bytes: buffer, length: bufferLen) + break //do nothing, we are going to collect more data + default: + doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code))) + } + } ///Finds the HTTP Packet in the TCP stream, by looking for the CRLF. - private func processHTTP(buffer: UnsafePointer, bufferLen: Int) -> Bool { + private func processHTTP(buffer: UnsafePointer, bufferLen: Int) -> Int { let CRLFBytes = [UInt8(ascii: "\r"), UInt8(ascii: "\n"), UInt8(ascii: "\r"), UInt8(ascii: "\n")] var k = 0 var totalSize = 0 @@ -400,39 +438,37 @@ public class WebSocket : NSObject, NSStreamDelegate { } } if totalSize > 0 { - if validateResponse(buffer, bufferLen: totalSize) { - dispatch_async(queue) { [weak self] in - guard let s = self else { return } - s.onConnect?() - s.delegate?.websocketDidConnect(s) - } - totalSize += 1 //skip the last \n - let restSize = bufferLen - totalSize - if restSize > 0 { - processRawMessage((buffer+totalSize),bufferLen: restSize) - } - return true + let code = validateResponse(buffer, bufferLen: totalSize) + if code != 0 { + return code } + totalSize += 1 //skip the last \n + let restSize = bufferLen - totalSize + if restSize > 0 { + processRawMessage((buffer+totalSize),bufferLen: restSize) + } + return 0 //success } - return false + return -1 //was unable to find the full TCP header } ///validates the HTTP is a 101 as per the RFC spec - private func validateResponse(buffer: UnsafePointer, bufferLen: Int) -> Bool { + private func validateResponse(buffer: UnsafePointer, bufferLen: Int) -> Int { let response = CFHTTPMessageCreateEmpty(kCFAllocatorDefault, false).takeRetainedValue() CFHTTPMessageAppendBytes(response, buffer, bufferLen) - if CFHTTPMessageGetResponseStatusCode(response) != 101 { - return false + let code = CFHTTPMessageGetResponseStatusCode(response) + if code != 101 { + return code } if let cfHeaders = CFHTTPMessageCopyAllHeaderFields(response) { let headers = cfHeaders.takeRetainedValue() as NSDictionary if let acceptKey = headers[headerWSAcceptName] as? NSString { if acceptKey.length > 0 { - return true + return 0 } } } - return false + return -1 } ///read a 16 bit big endian value from a buffer @@ -563,12 +599,13 @@ public class WebSocket : NSObject, NSStreamDelegate { data = NSData(bytes: UnsafePointer((buffer+offset)), length: Int(len)) } if receivedOpcode == .Pong { - dispatch_async(queue) { [weak self] in - guard let s = self else { return } - s.onPong?() - s.pongDelegate?.websocketDidReceivePong(s) + if canDispatch { + dispatch_async(queue) { [weak self] in + guard let s = self else { return } + s.onPong?() + s.pongDelegate?.websocketDidReceivePong(s) + } } - let step = Int(offset+numericCast(len)) let extra = bufferLen-step if extra > 0 { @@ -652,18 +689,21 @@ public class WebSocket : NSObject, NSStreamDelegate { writeError(CloseCode.Encoding.rawValue) return false } - - dispatch_async(queue) { [weak self] in - guard let s = self else { return } - s.onText?(str! as String) - s.delegate?.websocketDidReceiveMessage(s, text: str! as String) + if canDispatch { + dispatch_async(queue) { [weak self] in + guard let s = self else { return } + s.onText?(str! as String) + s.delegate?.websocketDidReceiveMessage(s, text: str! as String) + } } } else if response.code == .BinaryFrame { - let data = response.buffer! //local copy so it is perverse for writing - dispatch_async(queue) { [weak self] in - guard let s = self else { return } - s.onData?(data) - s.delegate?.websocketDidReceiveData(s, data: data) + if canDispatch { + let data = response.buffer! //local copy so it is perverse for writing + dispatch_async(queue) { [weak self] in + guard let s = self else { return } + s.onData?(data) + s.delegate?.websocketDidReceiveData(s, data: data) + } } } readStack.removeLast() @@ -688,8 +728,6 @@ public class WebSocket : NSObject, NSStreamDelegate { } ///used to write things to the stream private func dequeueWrite(data: NSData, code: OpCode) { - guard isConnected else { return } - writeQueue.addOperationWithBlock { [weak self] in //stream isn't ready, let's wait guard let s = self else { return } @@ -721,9 +759,6 @@ public class WebSocket : NSObject, NSStreamDelegate { } var total = 0 while true { - if !s.isConnected { - break - } guard let outStream = s.outputStream else { break } let writeBuffer = UnsafePointer(frame!.bytes+total) let len = outStream.write(writeBuffer, maxLength: offset-total) @@ -751,15 +786,34 @@ public class WebSocket : NSObject, NSStreamDelegate { ///used to preform the disconnect delegate private func doDisconnect(error: NSError?) { guard !didDisconnect else { return } - + didDisconnect = true + connected = false + guard canDispatch else {return} dispatch_async(queue) { [weak self] in guard let s = self else { return } - s.didDisconnect = true s.onDisconnect?(error) s.delegate?.websocketDidDisconnect(s, error: error) } } + deinit { + mutex.lock() + readyToWrite = false + mutex.unlock() + outputStream?.delegate = nil + inputStream?.delegate = nil + if let stream = inputStream { + CFReadStreamSetDispatchQueue(stream, nil) + stream.close() + } + if let stream = outputStream { + CFWriteStreamSetDispatchQueue(stream, nil) + stream.close() + } + outputStream = nil + inputStream = nil + } + } private class SSLCert {