From ed049e888d5d83c784540c1980a4d259614a823c Mon Sep 17 00:00:00 2001 From: Erik Date: Wed, 3 May 2017 22:48:25 -0400 Subject: [PATCH] Make engine single queued Fix polling allow building of refactor branch remove self reference Some refactoring --- .travis.yml | 1 + Source/SocketEngine.swift | 113 +++++++++++++++------------- Source/SocketEnginePollable.swift | 119 ++++++++++++++---------------- Source/SocketEngineSpec.swift | 4 +- 4 files changed, 119 insertions(+), 118 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1c99082..cd54215 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,7 @@ branches: only: - master - development + - refactor-engine before_install: - brew update - brew outdated xctool || brew upgrade xctool diff --git a/Source/SocketEngine.swift b/Source/SocketEngine.swift index 02b1d81..cd1e8d0 100644 --- a/Source/SocketEngine.swift +++ b/Source/SocketEngine.swift @@ -22,12 +22,11 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +import Dispatch import Foundation public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePollable, SocketEngineWebsocket { - public let emitQueue = DispatchQueue(label: "com.socketio.engineEmitQueue", attributes: []) - public let handleQueue = DispatchQueue(label: "com.socketio.engineHandleQueue", attributes: []) - public let parseQueue = DispatchQueue(label: "com.socketio.engineParseQueue", attributes: []) + public let engineQueue = DispatchQueue(label: "com.socketio.engineHandleQueue", attributes: []) public var connectParams: [String: Any]? { didSet { @@ -101,7 +100,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll forceWebsockets = force case let .path(path): socketPath = path - + if !socketPath.hasSuffix("/") { socketPath += "/" } @@ -119,12 +118,12 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll } super.init() - + sessionDelegate = sessionDelegate ?? self - + (urlPolling, urlWebSocket) = createURLs() } - + public convenience init(client: SocketEngineClient, url: URL, options: NSDictionary?) { self.init(client: client, url: url, config: options?.toSocketConfiguration() ?? []) } @@ -139,7 +138,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll do { let dict = try msg.toNSDictionary() guard let error = dict["message"] as? String else { return } - + /* 0: Unknown transport 1: Unknown sid @@ -155,7 +154,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll private func handleBase64(message: String) { // binary in base64 string let noPrefix = message[message.index(message.startIndex, offsetBy: 2).. pongsMissedMax { client?.engineDidClose(reason: "Ping timeout") - + return } guard let pingInterval = pingInterval else { return } - + pongsMissed += 1 write("", withType: .ping, withData: []) - - let time = DispatchTime.now() + Double(Int64(pingInterval * Double(NSEC_PER_SEC))) / Double(NSEC_PER_SEC) - DispatchQueue.main.asyncAfter(deadline: time) {[weak self] in self?.sendPing() } + + engineQueue.asyncAfter(deadline: DispatchTime.now() + Double(pingInterval)) {[weak self] in self?.sendPing() } } // Moves from long-polling to websockets @@ -501,16 +508,16 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll /// Write a message, independent of transport. public func write(_ msg: String, withType type: SocketEnginePacketType, withData data: [Data]) { - emitQueue.async { + engineQueue.async { guard self.connected else { return } if self.websocket { DefaultSocketLogger.Logger.log("Writing ws: %@ has data: %@", - type: self.logType, args: msg, data.count != 0) + type: self.logType, args: msg, data.count != 0) self.sendWebSocketMessage(msg, withType: type, withData: data) } else if !self.probing { DefaultSocketLogger.Logger.log("Writing poll: %@ has data: %@", - type: self.logType, args: msg, data.count != 0) + type: self.logType, args: msg, data.count != 0) self.sendPollMessage(msg, withType: type, withData: data) } else { self.probeWait.append((msg, type, data)) @@ -535,7 +542,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll if closed { client?.engineDidClose(reason: "Disconnect") - + return } @@ -557,7 +564,7 @@ public final class SocketEngine : NSObject, URLSessionDelegate, SocketEnginePoll extension SocketEngine { public func URLSession(session: URLSession, didBecomeInvalidWithError error: NSError?) { DefaultSocketLogger.Logger.error("Engine URLSession became invalid", type: "SocketEngine") - + didError(reason: "Engine URLSession became invalid") } } diff --git a/Source/SocketEnginePollable.swift b/Source/SocketEnginePollable.swift index b050975..4bb4905 100644 --- a/Source/SocketEnginePollable.swift +++ b/Source/SocketEnginePollable.swift @@ -27,7 +27,7 @@ import Foundation /// Protocol that is used to implement socket.io polling support public protocol SocketEnginePollable : SocketEngineSpec { var invalidated: Bool { get } - /// Holds strings waiting to be sent over polling. + /// Holds strings waiting to be sent over polling. /// You shouldn't need to mess with this. var postWait: [String] { get set } var session: URLSession? { get } @@ -37,7 +37,7 @@ public protocol SocketEnginePollable : SocketEngineSpec { /// Because socket.io doesn't let you send two post request at the same time /// we have to keep track if there's an outstanding post var waitingForPost: Bool { get set } - + func doPoll() func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data]) func stopPolling() @@ -47,95 +47,94 @@ public protocol SocketEnginePollable : SocketEngineSpec { extension SocketEnginePollable { private func addHeaders(for req: URLRequest) -> URLRequest { var req = req - + if cookies != nil { let headers = HTTPCookie.requestHeaderFields(with: cookies!) req.allHTTPHeaderFields = headers } - + if extraHeaders != nil { for (headerName, value) in extraHeaders! { req.setValue(value, forHTTPHeaderField: headerName) } } - + return req } - + func createRequestForPostWithPostWait() -> URLRequest { defer { postWait.removeAll(keepingCapacity: true) } var postStr = "" - + for packet in postWait { let len = packet.characters.count - + postStr += "\(len):\(packet)" } - + DefaultSocketLogger.Logger.log("Created POST string: %@", type: "SocketEnginePolling", args: postStr) - + var req = URLRequest(url: urlPollingWithSid) let postData = postStr.data(using: .utf8, allowLossyConversion: false)! - + req = addHeaders(for: req) - + req.httpMethod = "POST" req.setValue("text/plain; charset=UTF-8", forHTTPHeaderField: "Content-Type") req.httpBody = postData req.setValue(String(postData.count), forHTTPHeaderField: "Content-Length") - - return req as URLRequest + + return req } - + public func doPoll() { if websocket || waitingForPoll || !connected || closed { return } - - waitingForPoll = true - + var req = URLRequest(url: urlPollingWithSid) - req = addHeaders(for: req) + doLongPoll(for: req ) } - + func doRequest(for req: URLRequest, callbackWith callback: @escaping (Data?, URLResponse?, Error?) -> Void) { if !polling || closed || invalidated || fastUpgrade { return } - - DefaultSocketLogger.Logger.log("Doing polling request", type: "SocketEnginePolling") - + + DefaultSocketLogger.Logger.log("Doing polling %@ %@", type: "SocketEnginePolling", + args: req.httpMethod ?? "", req) + session?.dataTask(with: req, completionHandler: callback).resume() } - + func doLongPoll(for req: URLRequest) { + waitingForPoll = true + doRequest(for: req) {[weak self] data, res, err in guard let this = self, this.polling else { return } - + if err != nil || data == nil { DefaultSocketLogger.Logger.error(err?.localizedDescription ?? "Error", type: "SocketEnginePolling") - + if this.polling { this.didError(reason: err?.localizedDescription ?? "Error") } - + return } - + DefaultSocketLogger.Logger.log("Got polling response", type: "SocketEnginePolling") - + if let str = String(data: data!, encoding: String.Encoding.utf8) { - this.parseQueue.async { - this.parsePollingMessage(str) - } + this.parsePollingMessage(str) } - + this.waitingForPoll = false - + if this.fastUpgrade { this.doFastUpgrade() } else if !this.closed && this.polling { @@ -143,7 +142,7 @@ extension SocketEnginePollable { } } } - + private func flushWaitingForPost() { if postWait.count == 0 || !connected { return @@ -151,79 +150,75 @@ extension SocketEnginePollable { flushWaitingForPostToWebSocket() return } - + let req = createRequestForPostWithPostWait() - + waitingForPost = true - + DefaultSocketLogger.Logger.log("POSTing", type: "SocketEnginePolling") - + doRequest(for: req) {[weak self] data, res, err in guard let this = self else { return } - + if err != nil { DefaultSocketLogger.Logger.error(err?.localizedDescription ?? "Error", type: "SocketEnginePolling") - + if this.polling { this.didError(reason: err?.localizedDescription ?? "Error") } - + return } - + this.waitingForPost = false - - this.emitQueue.async { - if !this.fastUpgrade { - this.flushWaitingForPost() - this.doPoll() - } + + if !this.fastUpgrade { + this.flushWaitingForPost() + this.doPoll() } } } - + func parsePollingMessage(_ str: String) { guard str.characters.count != 1 else { return } - + var reader = SocketStringReader(message: str) - + while reader.hasNext { if let n = Int(reader.readUntilOccurence(of: ":")) { - let str = reader.read(count: n) - - handleQueue.async { self.parseEngineMessage(str, fromPolling: true) } + parseEngineMessage(reader.read(count: n), fromPolling: true) } else { - handleQueue.async { self.parseEngineMessage(str, fromPolling: true) } + parseEngineMessage(str, fromPolling: true) break } } } - + /// Send polling message. /// Only call on emitQueue public func sendPollMessage(_ message: String, withType type: SocketEnginePacketType, withData datas: [Data]) { DefaultSocketLogger.Logger.log("Sending poll: %@ as type: %@", type: "SocketEnginePolling", args: message, type.rawValue) let fixedMessage: String - + if doubleEncodeUTF8 { fixedMessage = doubleEncodeUTF8(message) } else { fixedMessage = message } - + postWait.append(String(type.rawValue) + fixedMessage) - + for data in datas { if case let .right(bin) = createBinaryDataForSend(using: data) { postWait.append(bin) } } - + if !waitingForPost { flushWaitingForPost() } } - + public func stopPolling() { waitingForPoll = false waitingForPost = false diff --git a/Source/SocketEngineSpec.swift b/Source/SocketEngineSpec.swift index f862889..46a7947 100644 --- a/Source/SocketEngineSpec.swift +++ b/Source/SocketEngineSpec.swift @@ -32,15 +32,13 @@ import Foundation var connectParams: [String: Any]? { get set } var doubleEncodeUTF8: Bool { get } var cookies: [HTTPCookie]? { get } + var engineQueue: DispatchQueue { get } var extraHeaders: [String: String]? { get } var fastUpgrade: Bool { get } var forcePolling: Bool { get } var forceWebsockets: Bool { get } - var parseQueue: DispatchQueue { get } var polling: Bool { get } var probing: Bool { get } - var emitQueue: DispatchQueue { get } - var handleQueue: DispatchQueue { get } var sid: String { get } var socketPath: String { get } var urlPolling: URL { get }