bump websocket

This commit is contained in:
Erik 2017-02-12 12:12:33 -05:00
parent 1616a58bd7
commit cce88ae3eb
No known key found for this signature in database
GPG Key ID: 4930B7C5FBC1A69D

View File

@ -193,8 +193,9 @@ open class WebSocket : NSObject, StreamDelegate {
If you supply a zero (or negative) `forceTimeout`, I immediately close the socket (without sending a Close control frame) and notify my delegate. If you supply a zero (or negative) `forceTimeout`, I immediately close the socket (without sending a Close control frame) and notify my delegate.
- Parameter forceTimeout: Maximum time to wait for the server to close the socket. - Parameter forceTimeout: Maximum time to wait for the server to close the socket.
- Parameter closeCode: The code to send on disconnect. The default is the normal close code for cleanly disconnecting a webSocket. - Parameter closeCode: The code to send on disconnect. The default is the normal close code for cleanly disconnecting a webSocket.
*/ */
open func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) { open func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
guard isConnected else { return }
switch forceTimeout { switch forceTimeout {
case .some(let seconds) where seconds > 0: case .some(let seconds) where seconds > 0:
let milliseconds = Int(seconds * 1_000) let milliseconds = Int(seconds * 1_000)
@ -306,7 +307,7 @@ open class WebSocket : NSObject, StreamDelegate {
//higher level API we will cut over to at some point //higher level API we will cut over to at some point
//NSStream.getStreamsToHostWithName(url.host, port: url.port.integerValue, inputStream: &inputStream, outputStream: &outputStream) //NSStream.getStreamsToHostWithName(url.host, port: url.port.integerValue, inputStream: &inputStream, outputStream: &outputStream)
// Disconnect and clean up any existing streams before setting up a new pair // Disconnect and clean up any existing streams before setting up a new pair
disconnectStream(nil) disconnectStream(nil, runDelegate: false)
var readStream: Unmanaged<CFReadStream>? var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>? var writeStream: Unmanaged<CFWriteStream>?
@ -361,9 +362,12 @@ open class WebSocket : NSObject, StreamDelegate {
let bytes = UnsafeRawPointer((data as NSData).bytes).assumingMemoryBound(to: UInt8.self) let bytes = UnsafeRawPointer((data as NSData).bytes).assumingMemoryBound(to: UInt8.self)
var out = timeout * 1_000_000 // wait 5 seconds before giving up var out = timeout * 1_000_000 // wait 5 seconds before giving up
writeQueue.addOperation { [weak self] in let operation = BlockOperation()
while !outStream.hasSpaceAvailable { operation.addExecutionBlock { [weak self, weak operation] in
guard let sOperation = operation else { return }
while !outStream.hasSpaceAvailable && !sOperation.isCancelled {
usleep(100) // wait until the socket is ready usleep(100) // wait until the socket is ready
guard !sOperation.isCancelled else { return }
out -= 100 out -= 100
if out < 0 { if out < 0 {
self?.cleanupStream() self?.cleanupStream()
@ -373,8 +377,10 @@ open class WebSocket : NSObject, StreamDelegate {
return // disconnectStream will be called. return // disconnectStream will be called.
} }
} }
guard !sOperation.isCancelled else { return }
outStream.write(bytes, maxLength: data.count) outStream.write(bytes, maxLength: data.count)
} }
writeQueue.addOperation(operation)
} }
/** /**
@ -406,14 +412,17 @@ open class WebSocket : NSObject, StreamDelegate {
/** /**
Disconnect the stream object and notifies the delegate. Disconnect the stream object and notifies the delegate.
*/ */
private func disconnectStream(_ error: NSError?) { private func disconnectStream(_ error: NSError?, runDelegate: Bool = true) {
if error == nil { if error == nil {
writeQueue.waitUntilAllOperationsAreFinished() writeQueue.waitUntilAllOperationsAreFinished()
} else { } else {
writeQueue.cancelAllOperations() writeQueue.cancelAllOperations()
} }
cleanupStream() cleanupStream()
doDisconnect(error) connected = false
if runDelegate {
doDisconnect(error)
}
} }
/** /**
@ -432,6 +441,7 @@ open class WebSocket : NSObject, StreamDelegate {
} }
outputStream = nil outputStream = nil
inputStream = nil inputStream = nil
fragBuffer = nil
} }
/** /**
@ -460,11 +470,11 @@ open class WebSocket : NSObject, StreamDelegate {
autoreleasepool { autoreleasepool {
let data = inputQueue[0] let data = inputQueue[0]
var work = data var work = data
if let fragBuffer = fragBuffer { if let buffer = fragBuffer {
var combine = NSData(data: fragBuffer) as Data var combine = NSData(data: buffer) as Data
combine.append(data) combine.append(data)
work = combine work = combine
self.fragBuffer = nil fragBuffer = nil
} }
let buffer = UnsafeRawPointer((work as NSData).bytes).assumingMemoryBound(to: UInt8.self) let buffer = UnsafeRawPointer((work as NSData).bytes).assumingMemoryBound(to: UInt8.self)
let length = work.count let length = work.count
@ -485,18 +495,10 @@ open class WebSocket : NSObject, StreamDelegate {
let code = processHTTP(buffer, bufferLen: bufferLen) let code = processHTTP(buffer, bufferLen: bufferLen)
switch code { switch code {
case 0: case 0:
isConnecting = false break
connected = true
guard canDispatch else {return}
callbackQueue.async { [weak self] in
guard let s = self else { return }
s.onConnect?()
s.delegate?.websocketDidConnect(socket: s)
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
}
case -1: case -1:
fragBuffer = Data(bytes: buffer, count: bufferLen) fragBuffer = Data(bytes: buffer, count: bufferLen)
break // do nothing, we are going to collect more data break // do nothing, we are going to collect more data
default: default:
doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code))) doDisconnect(errorWithDetail("Invalid HTTP upgrade", code: UInt16(code)))
} }
@ -525,6 +527,17 @@ open class WebSocket : NSObject, StreamDelegate {
if code != 0 { if code != 0 {
return code return code
} }
isConnecting = false
connected = true
didDisconnect = false
if canDispatch {
callbackQueue.async { [weak self] in
guard let s = self else { return }
s.onConnect?()
s.delegate?.websocketDidConnect(socket: s)
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
}
}
totalSize += 1 //skip the last \n totalSize += 1 //skip the last \n
let restSize = bufferLen - totalSize let restSize = bufferLen - totalSize
if restSize > 0 { if restSize > 0 {
@ -628,10 +641,10 @@ open class WebSocket : NSObject, StreamDelegate {
let isControlFrame = (receivedOpcode == .connectionClose || receivedOpcode == .ping) let isControlFrame = (receivedOpcode == .connectionClose || receivedOpcode == .ping)
if !isControlFrame && (receivedOpcode != .binaryFrame && receivedOpcode != .continueFrame && if !isControlFrame && (receivedOpcode != .binaryFrame && receivedOpcode != .continueFrame &&
receivedOpcode != .textFrame && receivedOpcode != .pong) { receivedOpcode != .textFrame && receivedOpcode != .pong) {
let errCode = CloseCode.protocolError.rawValue let errCode = CloseCode.protocolError.rawValue
doDisconnect(errorWithDetail("unknown opcode: \(receivedOpcode)", code: errCode)) doDisconnect(errorWithDetail("unknown opcode: \(receivedOpcode)", code: errCode))
writeError(errCode) writeError(errCode)
return emptyBuffer return emptyBuffer
} }
if isControlFrame && isFin == 0 { if isControlFrame && isFin == 0 {
let errCode = CloseCode.protocolError.rawValue let errCode = CloseCode.protocolError.rawValue
@ -723,7 +736,7 @@ open class WebSocket : NSObject, StreamDelegate {
if receivedOpcode == .continueFrame { if receivedOpcode == .continueFrame {
let errCode = CloseCode.protocolError.rawValue let errCode = CloseCode.protocolError.rawValue
doDisconnect(errorWithDetail("first frame can't be a continue frame", doDisconnect(errorWithDetail("first frame can't be a continue frame",
code: errCode)) code: errCode))
writeError(errCode) writeError(errCode)
return emptyBuffer return emptyBuffer
} }
@ -738,7 +751,7 @@ open class WebSocket : NSObject, StreamDelegate {
} else { } else {
let errCode = CloseCode.protocolError.rawValue let errCode = CloseCode.protocolError.rawValue
doDisconnect(errorWithDetail("second and beyond of fragment message must be a continue frame", doDisconnect(errorWithDetail("second and beyond of fragment message must be a continue frame",
code: errCode)) code: errCode))
writeError(errCode) writeError(errCode)
return emptyBuffer return emptyBuffer
} }
@ -832,9 +845,11 @@ open class WebSocket : NSObject, StreamDelegate {
Used to write things to the stream Used to write things to the stream
*/ */
private func dequeueWrite(_ data: Data, code: OpCode, writeCompletion: (() -> ())? = nil) { private func dequeueWrite(_ data: Data, code: OpCode, writeCompletion: (() -> ())? = nil) {
writeQueue.addOperation { [weak self] in let operation = BlockOperation()
operation.addExecutionBlock { [weak self, weak operation] in
//stream isn't ready, let's wait //stream isn't ready, let's wait
guard let s = self else { return } guard let s = self else { return }
guard let sOperation = operation else { return }
var offset = 2 var offset = 2
let dataLength = data.count let dataLength = data.count
let frame = NSMutableData(capacity: dataLength + s.MaxFrameSize) let frame = NSMutableData(capacity: dataLength + s.MaxFrameSize)
@ -861,7 +876,7 @@ open class WebSocket : NSObject, StreamDelegate {
offset += 1 offset += 1
} }
var total = 0 var total = 0
while true { while !sOperation.isCancelled {
guard let outStream = s.outputStream else { break } guard let outStream = s.outputStream else { break }
let writeBuffer = UnsafeRawPointer(frame!.bytes+total).assumingMemoryBound(to: UInt8.self) let writeBuffer = UnsafeRawPointer(frame!.bytes+total).assumingMemoryBound(to: UInt8.self)
let len = outStream.write(writeBuffer, maxLength: offset-total) let len = outStream.write(writeBuffer, maxLength: offset-total)
@ -888,8 +903,8 @@ open class WebSocket : NSObject, StreamDelegate {
break break
} }
} }
} }
writeQueue.addOperation(operation)
} }
/** /**