Merge branch 'development'

* development:
  SocketAckManager thread-safe acks removal
  thread-safe ack generation
  bump websocket
  Fix building on xcode8.3
  double encoding should be getting fixed, make the default false
  bump websocket
This commit is contained in:
Erik 2017-03-08 19:55:46 -05:00
commit b3fd4203d2
No known key found for this signature in database
GPG Key ID: 4930B7C5FBC1A69D
6 changed files with 148 additions and 112 deletions

View File

@ -83,7 +83,9 @@ class SocketEngineTest: XCTestCase {
expect.fulfill()
}
engine = SocketEngine(client: client, url: URL(string: "http://localhost")!, config: [.doubleEncodeUTF8(true)])
engine.parsePollingMessage("41:42[\"stringTest\",\"lïne one\\nlīne \\rtwo\"]")
waitForExpectations(timeout: 3, handler: nil)
}

View File

@ -51,6 +51,7 @@ private func ==(lhs: SocketAck, rhs: SocketAck) -> Bool {
struct SocketAckManager {
private var acks = Set<SocketAck>(minimumCapacity: 1)
private let ackSemaphore = DispatchSemaphore(value: 1)
mutating func addAck(_ ack: Int, callback: @escaping AckCallback) {
acks.insert(SocketAck(ack: ack, callback: callback))
@ -58,6 +59,8 @@ struct SocketAckManager {
/// Should be called on handle queue
mutating func executeAck(_ ack: Int, with items: [Any], onQueue: DispatchQueue) {
ackSemaphore.wait()
defer { ackSemaphore.signal() }
let ack = acks.remove(SocketAck(ack: ack))
onQueue.async() { ack?.callback(items) }
@ -65,8 +68,12 @@ struct SocketAckManager {
/// Should be called on handle queue
mutating func timeoutAck(_ ack: Int, onQueue: DispatchQueue) {
ackSemaphore.wait()
defer { ackSemaphore.signal() }
let ack = acks.remove(SocketAck(ack: ack))
onQueue.async() { ack?.callback?(["NO ACK"]) }
onQueue.async() {
ack?.callback?(["NO ACK"])
}
}
}

View File

@ -42,7 +42,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll
public private(set) var closed = false
public private(set) var connected = false
public private(set) var cookies: [HTTPCookie]?
public private(set) var doubleEncodeUTF8 = true
public private(set) var doubleEncodeUTF8 = false
public private(set) var extraHeaders: [String: String]?
public private(set) var fastUpgrade = false
public private(set) var forcePolling = false

View File

@ -54,6 +54,7 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
private var handlers = [SocketEventHandler]()
private var reconnecting = false
private let ackSemaphore = DispatchSemaphore(value: 1)
private(set) var currentAck = -1
private(set) var handleQueue = DispatchQueue.main
private(set) var reconnectAttempts = -1
@ -161,10 +162,15 @@ public final class SocketIOClient : NSObject, SocketEngineClient, SocketParsable
}
}
private func createOnAck(_ items: [Any]) -> OnAckCallback {
private func nextAck() -> Int {
ackSemaphore.wait()
defer { ackSemaphore.signal() }
currentAck += 1
return currentAck
}
return OnAckCallback(ackNumber: currentAck, items: items, socket: self)
private func createOnAck(_ items: [Any]) -> OnAckCallback {
return OnAckCallback(ackNumber: nextAck(), items: items, socket: self)
}
func didConnect() {

View File

@ -25,7 +25,7 @@
public struct SocketIOClientConfiguration : ExpressibleByArrayLiteral, Collection, MutableCollection {
public typealias Element = SocketIOClientOption
public typealias Index = Array<SocketIOClientOption>.Index
public typealias Generator = Array<SocketIOClientOption>.Generator
public typealias Generator = Array<SocketIOClientOption>.Iterator
public typealias SubSequence = Array<SocketIOClientOption>.SubSequence
private var backingArray = [SocketIOClientOption]()

View File

@ -180,12 +180,11 @@ open class WebSocket : NSObject, StreamDelegate {
/**
Connect to the WebSocket server on a background thread.
*/
public func connect() {
open func connect() {
guard !isConnecting else { return }
didDisconnect = false
isConnecting = true
createHTTPRequest()
isConnecting = false
}
/**
@ -195,7 +194,8 @@ open class WebSocket : NSObject, StreamDelegate {
- Parameter forceTimeout: Maximum time to wait for the server to close the socket.
- Parameter closeCode: The code to send on disconnect. The default is the normal close code for cleanly disconnecting a webSocket.
*/
public func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
open func disconnect(forceTimeout: TimeInterval? = nil, closeCode: UInt16 = CloseCode.normal.rawValue) {
guard isConnected else { return }
switch forceTimeout {
case .some(let seconds) where seconds > 0:
let milliseconds = Int(seconds * 1_000)
@ -217,7 +217,7 @@ open class WebSocket : NSObject, StreamDelegate {
- parameter string: The string to write.
- parameter completion: The (optional) completion handler.
*/
public func write(string: String, completion: (() -> ())? = nil) {
open func write(string: String, completion: (() -> ())? = nil) {
guard isConnected else { return }
dequeueWrite(string.data(using: String.Encoding.utf8)!, code: .textFrame, writeCompletion: completion)
}
@ -228,7 +228,7 @@ open class WebSocket : NSObject, StreamDelegate {
- parameter data: The data to write.
- parameter completion: The (optional) completion handler.
*/
public func write(data: Data, completion: (() -> ())? = nil) {
open func write(data: Data, completion: (() -> ())? = nil) {
guard isConnected else { return }
dequeueWrite(data, code: .binaryFrame, writeCompletion: completion)
}
@ -237,7 +237,7 @@ open class WebSocket : NSObject, StreamDelegate {
Write a ping to the websocket. This sends it as a control frame.
Yodel a sound to the planet. This sends it as an astroid. http://youtu.be/Eu5ZJELRiJ8?t=42s
*/
public func write(ping: Data, completion: (() -> ())? = nil) {
open func write(ping: Data, completion: (() -> ())? = nil) {
guard isConnected else { return }
dequeueWrite(ping, code: .ping, writeCompletion: completion)
}
@ -306,6 +306,9 @@ open class WebSocket : NSObject, StreamDelegate {
private func initStreamsWithData(_ data: Data, _ port: Int) {
//higher level API we will cut over to at some point
//NSStream.getStreamsToHostWithName(url.host, port: url.port.integerValue, inputStream: &inputStream, outputStream: &outputStream)
// Disconnect and clean up any existing streams before setting up a new pair
disconnectStream(nil, runDelegate: false)
var readStream: Unmanaged<CFReadStream>?
var writeStream: Unmanaged<CFWriteStream>?
let h = url.host! as NSString
@ -359,9 +362,12 @@ open class WebSocket : NSObject, StreamDelegate {
let bytes = UnsafeRawPointer((data as NSData).bytes).assumingMemoryBound(to: UInt8.self)
var out = timeout * 1_000_000 // wait 5 seconds before giving up
writeQueue.addOperation { [weak self] in
while !outStream.hasSpaceAvailable {
let operation = BlockOperation()
operation.addExecutionBlock { [weak self, weak operation] in
guard let sOperation = operation else { return }
while !outStream.hasSpaceAvailable && !sOperation.isCancelled {
usleep(100) // wait until the socket is ready
guard !sOperation.isCancelled else { return }
out -= 100
if out < 0 {
self?.cleanupStream()
@ -371,14 +377,16 @@ open class WebSocket : NSObject, StreamDelegate {
return // disconnectStream will be called.
}
}
guard !sOperation.isCancelled else { return }
outStream.write(bytes, maxLength: data.count)
}
writeQueue.addOperation(operation)
}
/**
Delegate for the stream methods. Processes incoming bytes
*/
public func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
open func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
if let sec = security, !certValidated && [.hasBytesAvailable, .hasSpaceAvailable].contains(eventCode) {
let trust = aStream.property(forKey: kCFStreamPropertySSLPeerTrust as Stream.PropertyKey) as! SecTrust
let domain = aStream.property(forKey: kCFStreamSSLPeerName as Stream.PropertyKey) as? String
@ -404,15 +412,18 @@ open class WebSocket : NSObject, StreamDelegate {
/**
Disconnect the stream object and notifies the delegate.
*/
private func disconnectStream(_ error: NSError?) {
private func disconnectStream(_ error: NSError?, runDelegate: Bool = true) {
if error == nil {
writeQueue.waitUntilAllOperationsAreFinished()
} else {
writeQueue.cancelAllOperations()
}
cleanupStream()
connected = false
if runDelegate {
doDisconnect(error)
}
}
/**
cleanup the streams.
@ -430,6 +441,7 @@ open class WebSocket : NSObject, StreamDelegate {
}
outputStream = nil
inputStream = nil
fragBuffer = nil
}
/**
@ -455,13 +467,14 @@ open class WebSocket : NSObject, StreamDelegate {
*/
private func dequeueInput() {
while !inputQueue.isEmpty {
autoreleasepool {
let data = inputQueue[0]
var work = data
if let fragBuffer = fragBuffer {
var combine = NSData(data: fragBuffer) as Data
if let buffer = fragBuffer {
var combine = NSData(data: buffer) as Data
combine.append(data)
work = combine
self.fragBuffer = nil
fragBuffer = nil
}
let buffer = UnsafeRawPointer((work as NSData).bytes).assumingMemoryBound(to: UInt8.self)
let length = work.count
@ -473,6 +486,7 @@ open class WebSocket : NSObject, StreamDelegate {
inputQueue = inputQueue.filter{ $0 != data }
}
}
}
/**
Handle checking the inital connection status
@ -481,14 +495,7 @@ open class WebSocket : NSObject, StreamDelegate {
let code = processHTTP(buffer, bufferLen: bufferLen)
switch code {
case 0:
connected = true
guard canDispatch else {return}
callbackQueue.async { [weak self] in
guard let s = self else { return }
s.onConnect?()
s.delegate?.websocketDidConnect(socket: s)
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
}
break
case -1:
fragBuffer = Data(bytes: buffer, count: bufferLen)
break // do nothing, we are going to collect more data
@ -520,6 +527,17 @@ open class WebSocket : NSObject, StreamDelegate {
if code != 0 {
return code
}
isConnecting = false
connected = true
didDisconnect = false
if canDispatch {
callbackQueue.async { [weak self] in
guard let s = self else { return }
s.onConnect?()
s.delegate?.websocketDidConnect(socket: s)
s.notificationCenter.post(name: NSNotification.Name(WebsocketDidConnectNotification), object: self)
}
}
totalSize += 1 //skip the last \n
let restSize = bufferLen - totalSize
if restSize > 0 {
@ -827,9 +845,11 @@ open class WebSocket : NSObject, StreamDelegate {
Used to write things to the stream
*/
private func dequeueWrite(_ data: Data, code: OpCode, writeCompletion: (() -> ())? = nil) {
writeQueue.addOperation { [weak self] in
let operation = BlockOperation()
operation.addExecutionBlock { [weak self, weak operation] in
//stream isn't ready, let's wait
guard let s = self else { return }
guard let sOperation = operation else { return }
var offset = 2
let dataLength = data.count
let frame = NSMutableData(capacity: dataLength + s.MaxFrameSize)
@ -856,7 +876,7 @@ open class WebSocket : NSObject, StreamDelegate {
offset += 1
}
var total = 0
while true {
while !sOperation.isCancelled {
guard let outStream = s.outputStream else { break }
let writeBuffer = UnsafeRawPointer(frame!.bytes+total).assumingMemoryBound(to: UInt8.self)
let len = outStream.write(writeBuffer, maxLength: offset-total)
@ -883,8 +903,8 @@ open class WebSocket : NSObject, StreamDelegate {
break
}
}
}
writeQueue.addOperation(operation)
}
/**
@ -893,6 +913,7 @@ open class WebSocket : NSObject, StreamDelegate {
private func doDisconnect(_ error: NSError?) {
guard !didDisconnect else { return }
didDisconnect = true
isConnecting = false
connected = false
guard canDispatch else {return}
callbackQueue.async { [weak self] in