456 lines
15 KiB
Swift
456 lines
15 KiB
Swift
import Foundation
|
|
import Combine
|
|
#if canImport(SocketIO)
|
|
import SocketIO
|
|
#endif
|
|
|
|
final class SocketService {
|
|
static let shared = SocketService()
|
|
|
|
enum ConnectionState: Equatable {
|
|
case disconnected
|
|
case connecting
|
|
case connected
|
|
}
|
|
|
|
private let syncQueue = DispatchQueue(label: "org.yobble.socket.service")
|
|
private var currentToken: String?
|
|
private var currentAuthPayload: [String: Any] = [:]
|
|
|
|
private let connectionStateSubject = CurrentValueSubject<ConnectionState, Never>(.disconnected)
|
|
|
|
var connectionStatePublisher: AnyPublisher<ConnectionState, Never> {
|
|
connectionStateSubject
|
|
.removeDuplicates()
|
|
.eraseToAnyPublisher()
|
|
}
|
|
|
|
var newPrivateMessagePublisher: AnyPublisher<MessageItem, Never> {
|
|
privateMessageSubject
|
|
.receive(on: DispatchQueue.main)
|
|
.eraseToAnyPublisher()
|
|
}
|
|
|
|
var currentConnectionState: ConnectionState {
|
|
connectionStateSubject.value
|
|
}
|
|
|
|
private let heartbeatInterval: TimeInterval = 10
|
|
private let heartbeatTimeout: TimeInterval = 5
|
|
private let maxHeartbeatMissCount = 2
|
|
private let heartbeatEventName = AppConfig.SOCKET_HEARTBEAT_EVENT
|
|
|
|
#if canImport(SocketIO)
|
|
private var manager: SocketManager?
|
|
private var socket: SocketIOClient?
|
|
private var heartbeatTimer: DispatchSourceTimer?
|
|
private var heartbeatAckInFlight = false
|
|
private var lastHeartbeatSentAt: Date?
|
|
private var consecutiveHeartbeatMisses = 0
|
|
private var reconnectWorkItem: DispatchWorkItem?
|
|
private let privateMessageSubject = PassthroughSubject<MessageItem, Never>()
|
|
#endif
|
|
|
|
private init() {}
|
|
|
|
private func updateConnectionState(_ state: ConnectionState) {
|
|
let sendState: () -> Void = { [weak self] in
|
|
guard let self, self.connectionStateSubject.value != state else { return }
|
|
self.connectionStateSubject.send(state)
|
|
}
|
|
|
|
if Thread.isMainThread {
|
|
sendState()
|
|
} else {
|
|
DispatchQueue.main.async(execute: sendState)
|
|
}
|
|
}
|
|
|
|
func connectForCurrentUser() {
|
|
syncQueue.async { [weak self] in
|
|
guard let self else { return }
|
|
guard let token = self.resolveCurrentAccessToken() else {
|
|
if AppConfig.DEBUG { print("[SocketService] No access token available, disconnecting") }
|
|
self.currentToken = nil
|
|
self.disconnectInternal()
|
|
return
|
|
}
|
|
|
|
self.connectInternal(with: token)
|
|
}
|
|
}
|
|
|
|
func connect(withToken token: String) {
|
|
syncQueue.async { [weak self] in
|
|
self?.connectInternal(with: token)
|
|
}
|
|
}
|
|
|
|
func disconnect() {
|
|
syncQueue.async { [weak self] in
|
|
guard let self else { return }
|
|
self.currentToken = nil
|
|
self.disconnectInternal()
|
|
}
|
|
}
|
|
|
|
@discardableResult
|
|
private func refreshAuthTokenIfNeeded(disconnectIfMissing: Bool) -> Bool {
|
|
let storedToken = resolveCurrentAccessToken()
|
|
|
|
guard let token = storedToken, !token.isEmpty else {
|
|
guard disconnectIfMissing else { return false }
|
|
currentToken = nil
|
|
disconnectInternal()
|
|
return true
|
|
}
|
|
|
|
guard token != currentToken else { return false }
|
|
|
|
connectInternal(with: token)
|
|
return true
|
|
}
|
|
|
|
private func resolveCurrentAccessToken() -> String? {
|
|
guard
|
|
let login = UserDefaults.standard.string(forKey: "currentUser"),
|
|
!login.isEmpty
|
|
else {
|
|
return nil
|
|
}
|
|
|
|
return KeychainService.shared.get(forKey: "access_token", service: login)
|
|
}
|
|
|
|
private func connectInternal(with token: String) {
|
|
#if canImport(SocketIO)
|
|
if token == currentToken,
|
|
let socket,
|
|
socket.status == .connected || socket.status == .connecting {
|
|
if AppConfig.DEBUG { print("[SocketService] Already connected with current token") }
|
|
return
|
|
}
|
|
|
|
currentToken = token
|
|
currentAuthPayload = ["token": token]
|
|
setupSocket(with: token)
|
|
cancelScheduledReconnect()
|
|
updateConnectionState(.connecting)
|
|
socket?.connect(withPayload: currentAuthPayload)
|
|
startHeartbeat()
|
|
#else
|
|
updateConnectionState(.disconnected)
|
|
if AppConfig.DEBUG {
|
|
print("[SocketService] SocketIO framework not available; skipping connection")
|
|
}
|
|
#endif
|
|
}
|
|
|
|
#if canImport(SocketIO)
|
|
private func setupSocket(with token: String) {
|
|
guard let baseURL = URL(string: AppConfig.API_SERVER) else {
|
|
if AppConfig.DEBUG { print("[SocketService] Invalid socket URL: \(AppConfig.API_SERVER)") }
|
|
return
|
|
}
|
|
|
|
disconnectInternal()
|
|
|
|
let configuration: SocketIOClientConfiguration = [
|
|
.log(AppConfig.DEBUG),
|
|
.compress,
|
|
.secure(AppConfig.PROTOCOL.lowercased() == "https"),
|
|
.path(AppConfig.SOCKET_PATH),
|
|
.reconnects(true),
|
|
.reconnectWait(2),
|
|
.forceWebsockets(true),
|
|
.extraHeaders([
|
|
"Authorization": "Bearer \(token)",
|
|
"User-Agent": AppConfig.USER_AGENT
|
|
]),
|
|
.connectParams(["token": token])
|
|
]
|
|
|
|
let manager = SocketManager(socketURL: baseURL, config: configuration)
|
|
manager.handleQueue = syncQueue
|
|
let socket = manager.defaultSocket
|
|
|
|
if AppConfig.DEBUG {
|
|
socket.onAny { event in
|
|
print("[SocketService] onAny event=\(event.event) data=\(event.items ?? [])")
|
|
}
|
|
}
|
|
|
|
socket.on(clientEvent: .connect) { _, _ in
|
|
if AppConfig.DEBUG { print("[SocketService] Connected") }
|
|
self.handleHeartbeatSuccess()
|
|
}
|
|
|
|
socket.on(clientEvent: .statusChange) { data, _ in
|
|
guard let rawStatus = data.first as? SocketIOStatus else { return }
|
|
if rawStatus == .connected {
|
|
self.handleHeartbeatSuccess()
|
|
} else if rawStatus == .connecting {
|
|
self.updateConnectionState(.connecting)
|
|
} else {
|
|
self.updateConnectionState(.disconnected)
|
|
}
|
|
}
|
|
|
|
socket.on(clientEvent: .reconnect) { _, _ in
|
|
if AppConfig.DEBUG { print("[SocketService] Reconnecting") }
|
|
if self.refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
self.updateConnectionState(.connecting)
|
|
}
|
|
|
|
socket.on(clientEvent: .reconnectAttempt) { data, _ in
|
|
if AppConfig.DEBUG { print("[SocketService] Reconnect attempt: \(data)") }
|
|
if self.refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
self.updateConnectionState(.connecting)
|
|
}
|
|
|
|
socket.on(clientEvent: .disconnect) { data, _ in
|
|
if AppConfig.DEBUG { print("[SocketService] Disconnected: \(data)") }
|
|
self.updateConnectionState(.disconnected)
|
|
if self.currentToken != nil {
|
|
self.scheduleReconnect()
|
|
}
|
|
}
|
|
|
|
socket.on(clientEvent: .error) { data, _ in
|
|
if AppConfig.DEBUG { print("[SocketService] Error: \(data)") }
|
|
self.updateConnectionState(.disconnected)
|
|
if self.currentToken != nil {
|
|
self.scheduleReconnect()
|
|
}
|
|
}
|
|
|
|
socket.on("pong") { [weak self] _, _ in
|
|
self?.handleHeartbeatSuccess()
|
|
}
|
|
|
|
socket.on("message") { [weak self] data, _ in
|
|
self?.handleMessageEvent(data)
|
|
}
|
|
|
|
socket.on("chat_private:new_message") { [weak self] data, _ in
|
|
self?.handleNewPrivateMessage(data)
|
|
}
|
|
|
|
self.manager = manager
|
|
self.socket = socket
|
|
}
|
|
|
|
private func disconnectInternal() {
|
|
stopHeartbeat()
|
|
cancelScheduledReconnect()
|
|
socket?.disconnect()
|
|
manager?.disconnect()
|
|
socket = nil
|
|
manager = nil
|
|
updateConnectionState(.disconnected)
|
|
}
|
|
|
|
private func startHeartbeat() {
|
|
guard heartbeatInterval > 0 else { return }
|
|
stopHeartbeat()
|
|
let timer = DispatchSource.makeTimerSource(queue: syncQueue)
|
|
timer.schedule(deadline: .now() + heartbeatInterval, repeating: heartbeatInterval)
|
|
timer.setEventHandler { [weak self] in
|
|
guard let self else { return }
|
|
self.performHeartbeatCheck()
|
|
}
|
|
heartbeatTimer = timer
|
|
timer.resume()
|
|
}
|
|
|
|
private func stopHeartbeat() {
|
|
heartbeatTimer?.cancel()
|
|
heartbeatTimer = nil
|
|
heartbeatAckInFlight = false
|
|
consecutiveHeartbeatMisses = 0
|
|
lastHeartbeatSentAt = nil
|
|
}
|
|
|
|
private func performHeartbeatCheck() {
|
|
if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
|
|
guard let socket = socket else {
|
|
updateConnectionState(.disconnected)
|
|
return
|
|
}
|
|
|
|
switch socket.status {
|
|
case .connected:
|
|
sendHeartbeat(on: socket)
|
|
case .connecting:
|
|
updateConnectionState(.connecting)
|
|
case .disconnected, .notConnected:
|
|
updateConnectionState(.connecting)
|
|
if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
socket.connect(withPayload: currentAuthPayload)
|
|
@unknown default:
|
|
updateConnectionState(.connecting)
|
|
}
|
|
}
|
|
|
|
private func sendHeartbeat(on socket: SocketIOClient) {
|
|
if heartbeatAckInFlight {
|
|
if let lastSentAt = lastHeartbeatSentAt,
|
|
Date().timeIntervalSince(lastSentAt) >= heartbeatTimeout {
|
|
heartbeatAckInFlight = false
|
|
handleMissedHeartbeat(for: socket)
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
|
|
heartbeatAckInFlight = true
|
|
lastHeartbeatSentAt = Date()
|
|
|
|
socket.emit(heartbeatEventName, ["data": "ping"])
|
|
}
|
|
|
|
private func handleMessageEvent(_ data: [Any]) {
|
|
guard let payload = data.first else { return }
|
|
|
|
let messageText: String?
|
|
|
|
if let dictionary = payload as? [String: Any] {
|
|
if let nestedData = dictionary["data"] as? [String: Any],
|
|
let nestedMessage = nestedData["message"] as? String {
|
|
messageText = nestedMessage
|
|
} else {
|
|
messageText = dictionary["message"] as? String
|
|
}
|
|
} else if let stringValue = payload as? String {
|
|
messageText = stringValue
|
|
} else {
|
|
messageText = nil
|
|
}
|
|
|
|
guard let message = messageText?.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() else {
|
|
return
|
|
}
|
|
|
|
if message == "pong" {
|
|
handleHeartbeatSuccess()
|
|
}
|
|
}
|
|
|
|
private func handleNewPrivateMessage(_ data: [Any]) {
|
|
guard let payload = data.first else { return }
|
|
|
|
let messageData: Data
|
|
if let dictionary = payload as? [String: Any],
|
|
JSONSerialization.isValidJSONObject(dictionary),
|
|
let json = try? JSONSerialization.data(withJSONObject: dictionary, options: []) {
|
|
messageData = json
|
|
} else if let string = payload as? String,
|
|
let data = string.data(using: .utf8) {
|
|
messageData = data
|
|
} else {
|
|
return
|
|
}
|
|
|
|
let decoder = JSONDecoder()
|
|
decoder.keyDecodingStrategy = .convertFromSnakeCase
|
|
decoder.dateDecodingStrategy = .custom(Self.decodeServerDate)
|
|
|
|
do {
|
|
let message = try decoder.decode(MessageItem.self, from: messageData)
|
|
DispatchQueue.main.async {
|
|
NotificationCenter.default.post(name: .socketDidReceivePrivateMessage, object: message)
|
|
self.privateMessageSubject.send(message)
|
|
NotificationCenter.default.post(name: .chatsShouldRefresh, object: nil)
|
|
}
|
|
} catch {
|
|
if AppConfig.DEBUG {
|
|
print("[SocketService] Failed to decode new message: \(error)")
|
|
if let payloadString = String(data: messageData, encoding: .utf8) {
|
|
print("[SocketService] payload=\(payloadString)")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private func handleHeartbeatSuccess() {
|
|
consecutiveHeartbeatMisses = 0
|
|
heartbeatAckInFlight = false
|
|
lastHeartbeatSentAt = nil
|
|
cancelScheduledReconnect()
|
|
updateConnectionState(.connected)
|
|
}
|
|
|
|
private func handleMissedHeartbeat(for socket: SocketIOClient) {
|
|
consecutiveHeartbeatMisses += 1
|
|
heartbeatAckInFlight = false
|
|
lastHeartbeatSentAt = nil
|
|
updateConnectionState(.connecting)
|
|
guard consecutiveHeartbeatMisses > maxHeartbeatMissCount else {
|
|
return
|
|
}
|
|
|
|
consecutiveHeartbeatMisses = 0
|
|
updateConnectionState(.connecting)
|
|
if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
socket.disconnect()
|
|
socket.connect(withPayload: currentAuthPayload)
|
|
}
|
|
|
|
private func scheduleReconnect(after delay: TimeInterval = 5) {
|
|
cancelScheduledReconnect()
|
|
let workItem = DispatchWorkItem { [weak self] in
|
|
guard let self else { return }
|
|
if self.refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
|
|
|
|
if let socket = self.socket {
|
|
self.updateConnectionState(.connecting)
|
|
socket.connect(withPayload: self.currentAuthPayload)
|
|
} else {
|
|
self.connectForCurrentUser()
|
|
}
|
|
}
|
|
reconnectWorkItem = workItem
|
|
syncQueue.asyncAfter(deadline: .now() + delay, execute: workItem)
|
|
}
|
|
|
|
private func cancelScheduledReconnect() {
|
|
reconnectWorkItem?.cancel()
|
|
reconnectWorkItem = nil
|
|
}
|
|
|
|
private static func decodeServerDate(from decoder: Decoder) throws -> Date {
|
|
let container = try decoder.singleValueContainer()
|
|
let string = try container.decode(String.self)
|
|
if let date = iso8601WithFractionalSeconds.date(from: string) {
|
|
return date
|
|
}
|
|
if let date = iso8601Simple.date(from: string) {
|
|
return date
|
|
}
|
|
throw DecodingError.dataCorruptedError(
|
|
in: container,
|
|
debugDescription: "Unable to decode date: \(string)"
|
|
)
|
|
}
|
|
|
|
private static let iso8601WithFractionalSeconds: ISO8601DateFormatter = {
|
|
let formatter = ISO8601DateFormatter()
|
|
formatter.formatOptions = [.withInternetDateTime, .withFractionalSeconds]
|
|
return formatter
|
|
}()
|
|
|
|
private static let iso8601Simple: ISO8601DateFormatter = {
|
|
let formatter = ISO8601DateFormatter()
|
|
formatter.formatOptions = [.withInternetDateTime]
|
|
return formatter
|
|
}()
|
|
#else
|
|
private func disconnectInternal() { }
|
|
#endif
|
|
}
|
|
|
|
extension Notification.Name {
|
|
static let socketDidReceivePrivateMessage = Notification.Name("socketDidReceivePrivateMessage")
|
|
}
|