346 lines
11 KiB
Swift
346 lines
11 KiB
Swift
import Foundation
|
|
import Combine
|
|
#if canImport(SocketIO)
|
|
import SocketIO
|
|
#endif
|
|
|
|
/// Singleton that owns the app's Socket.IO connection and publishes its state.
///
/// Public entry points hop onto `syncQueue` before touching mutable state, the
/// heartbeat timer fires on `syncQueue`, and the `SocketManager` is configured
/// with `handleQueue = syncQueue`, so socket callbacks are expected to arrive on
/// that queue as well. Connection-state changes are delivered on the main thread.
///
/// When the SocketIO module cannot be imported the service compiles to a stub
/// that always reports `.disconnected`.
final class SocketService {
    /// Shared process-wide instance; the initializer is private.
    static let shared = SocketService()

    /// Coarse connection status exposed to observers.
    enum ConnectionState: Equatable {
        case disconnected
        case connecting
        case connected
    }

    /// Serial queue on which connection state, the socket and the heartbeat are mutated.
    private let syncQueue = DispatchQueue(label: "org.yobble.socket.service")
    /// Token the current socket was configured with, or `nil` when not connected.
    private var currentToken: String?
    /// Payload passed to `SocketIOClient.connect(withPayload:)`.
    private var currentAuthPayload: [String: Any] = [:]

    private let connectionStateSubject = CurrentValueSubject<ConnectionState, Never>(.disconnected)

    /// Emits the current state on subscription and every distinct change afterwards.
    var connectionStatePublisher: AnyPublisher<ConnectionState, Never> {
        connectionStateSubject
            .removeDuplicates()
            .eraseToAnyPublisher()
    }

    /// Snapshot of the most recently published state.
    var currentConnectionState: ConnectionState {
        connectionStateSubject.value
    }

    /// Seconds between heartbeat timer ticks (also the delay before the first tick).
    private let heartbeatInterval: TimeInterval = 10
    /// Seconds after which an unanswered heartbeat counts as missed.
    private let heartbeatTimeout: TimeInterval = 5
    /// Number of consecutive misses tolerated; exceeding it forces a reconnect.
    private let maxHeartbeatMissCount = 2
    /// Event name used when emitting the heartbeat ping.
    private let heartbeatEventName = AppConfig.SOCKET_HEARTBEAT_EVENT

    #if canImport(SocketIO)
    private var manager: SocketManager?
    private var socket: SocketIOClient?
    private var heartbeatTimer: DispatchSourceTimer?
    /// `true` while a ping has been emitted and no success signal has been seen yet.
    private var heartbeatAckInFlight = false
    private var lastHeartbeatSentAt: Date?
    private var consecutiveHeartbeatMisses = 0
    #endif

    private init() {}

    // MARK: - State publishing

    /// Publishes `state` on the main thread, skipping the send when it equals the current value.
    ///
    /// Runs synchronously when already on the main thread; otherwise the send is
    /// enqueued with `DispatchQueue.main.async`.
    private func updateConnectionState(_ state: ConnectionState) {
        let sendState: () -> Void = { [weak self] in
            guard let self, self.connectionStateSubject.value != state else { return }
            self.connectionStateSubject.send(state)
        }

        if Thread.isMainThread {
            sendState()
        } else {
            DispatchQueue.main.async(execute: sendState)
        }
    }

    // MARK: - Public API

    /// Connects using the stored access token of the current user, or disconnects
    /// when no usable token can be resolved.
    func connectForCurrentUser() {
        syncQueue.async { [weak self] in
            guard let self else { return }
            guard let token = self.resolveCurrentAccessToken() else {
                if AppConfig.DEBUG { print("[SocketService] No access token available, disconnecting") }
                self.currentToken = nil
                self.disconnectInternal()
                return
            }

            self.connectInternal(with: token)
        }
    }

    /// Connects (or reconnects) using an explicitly supplied token.
    func connect(withToken token: String) {
        syncQueue.async { [weak self] in
            self?.connectInternal(with: token)
        }
    }

    /// Forgets the current token and tears the connection down.
    func disconnect() {
        syncQueue.async { [weak self] in
            guard let self else { return }
            self.currentToken = nil
            self.disconnectInternal()
        }
    }

    // MARK: - Token handling

    /// Re-reads the stored token and reacts if it no longer matches `currentToken`.
    ///
    /// - Parameter disconnectIfMissing: When `true`, a missing token tears the
    ///   connection down; when `false`, a missing token is ignored.
    /// - Returns: `true` if this call took over connection handling (it either
    ///   disconnected or started a connection with a new token), so the caller
    ///   should stop what it was doing; `false` if nothing changed.
    @discardableResult
    private func refreshAuthTokenIfNeeded(disconnectIfMissing: Bool) -> Bool {
        guard let token = resolveCurrentAccessToken() else {
            guard disconnectIfMissing else { return false }
            currentToken = nil
            disconnectInternal()
            return true
        }

        guard token != currentToken else { return false }

        connectInternal(with: token)
        return true
    }

    /// Looks up the access token of the login stored under `"currentUser"` in `UserDefaults`.
    ///
    /// - Returns: The token, or `nil` when there is no current login, no stored
    ///   token, or the stored token is empty. Treating an empty token as missing
    ///   here keeps `connectForCurrentUser()` and `refreshAuthTokenIfNeeded`
    ///   consistent with each other.
    private func resolveCurrentAccessToken() -> String? {
        guard
            let login = UserDefaults.standard.string(forKey: "currentUser"),
            !login.isEmpty
        else {
            return nil
        }

        let storedToken: String? = KeychainService.shared.get(forKey: "access_token", service: login)
        guard let storedToken, !storedToken.isEmpty else { return nil }
        return storedToken
    }

    // MARK: - Connection lifecycle

    /// Builds a fresh socket for `token` and starts connecting, unless a socket
    /// for the same token is already connected or connecting.
    private func connectInternal(with token: String) {
        #if canImport(SocketIO)
        if token == currentToken,
           let socket,
           socket.status == .connected || socket.status == .connecting {
            if AppConfig.DEBUG { print("[SocketService] Already connected with current token") }
            return
        }

        currentToken = token
        currentAuthPayload = ["token": token]

        guard setupSocket(with: token) else {
            // No socket could be built for this token. Drop any previous socket
            // instead of reconnecting it with the new payload, and report
            // `.disconnected` rather than a `.connecting` state that never resolves.
            currentToken = nil
            currentAuthPayload = [:]
            disconnectInternal()
            return
        }

        updateConnectionState(.connecting)
        socket?.connect(withPayload: currentAuthPayload)
        startHeartbeat()
        #else
        updateConnectionState(.disconnected)
        if AppConfig.DEBUG {
            print("[SocketService] SocketIO framework not available; skipping connection")
        }
        #endif
    }

    #if canImport(SocketIO)
    /// Replaces the current manager/socket pair with one configured for `token`
    /// and registers all event handlers.
    ///
    /// - Returns: `false` when `AppConfig.API_SERVER` is not a valid URL (nothing
    ///   is changed in that case); `true` once the new socket is installed.
    @discardableResult
    private func setupSocket(with token: String) -> Bool {
        guard let baseURL = URL(string: AppConfig.API_SERVER) else {
            if AppConfig.DEBUG { print("[SocketService] Invalid socket URL: \(AppConfig.API_SERVER)") }
            return false
        }

        disconnectInternal()

        let configuration: SocketIOClientConfiguration = [
            .log(AppConfig.DEBUG),
            .compress,
            .secure(AppConfig.PROTOCOL.lowercased() == "https"),
            .path(AppConfig.SOCKET_PATH),
            .reconnects(true),
            .reconnectWait(2),
            .forceWebsockets(true),
            .extraHeaders([
                "Authorization": "Bearer \(token)",
                "User-Agent": AppConfig.USER_AGENT
            ]),
            .connectParams(["token": token])
        ]

        let manager = SocketManager(socketURL: baseURL, config: configuration)
        manager.handleQueue = syncQueue
        let socket = manager.defaultSocket

        if AppConfig.DEBUG {
            socket.onAny { event in
                print("[SocketService] onAny event=\(event.event) data=\(event.items ?? [])")
            }
        }

        // Every handler captures `self` weakly: the service retains the socket,
        // and the socket stores these closures, so a strong capture would form
        // a retain cycle.
        socket.on(clientEvent: .connect) { [weak self] _, _ in
            if AppConfig.DEBUG { print("[SocketService] Connected") }
            self?.handleHeartbeatSuccess()
        }

        socket.on(clientEvent: .statusChange) { [weak self] data, _ in
            guard let self, let rawStatus = data.first as? SocketIOStatus else { return }
            switch rawStatus {
            case .connected:
                self.handleHeartbeatSuccess()
            case .connecting:
                self.updateConnectionState(.connecting)
            default:
                self.updateConnectionState(.disconnected)
            }
        }

        socket.on(clientEvent: .reconnect) { [weak self] _, _ in
            if AppConfig.DEBUG { print("[SocketService] Reconnecting") }
            self?.handleReconnectProgress()
        }

        socket.on(clientEvent: .reconnectAttempt) { [weak self] data, _ in
            if AppConfig.DEBUG { print("[SocketService] Reconnect attempt: \(data)") }
            self?.handleReconnectProgress()
        }

        socket.on(clientEvent: .disconnect) { [weak self] data, _ in
            if AppConfig.DEBUG { print("[SocketService] Disconnected: \(data)") }
            self?.updateConnectionState(.disconnected)
        }

        socket.on(clientEvent: .error) { [weak self] data, _ in
            if AppConfig.DEBUG { print("[SocketService] Error: \(data)") }
            self?.updateConnectionState(.disconnected)
        }

        socket.on("pong") { [weak self] _, _ in
            self?.handleHeartbeatSuccess()
        }

        socket.on("message") { [weak self] data, _ in
            self?.handleMessageEvent(data)
        }

        self.manager = manager
        self.socket = socket
        return true
    }

    /// Shared reaction to the `.reconnect` and `.reconnectAttempt` client events:
    /// switch to a changed token if there is one, otherwise report `.connecting`.
    private func handleReconnectProgress() {
        if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
        updateConnectionState(.connecting)
    }

    /// Stops the heartbeat, disconnects and releases the socket and manager,
    /// and publishes `.disconnected`. Does not touch `currentToken`.
    private func disconnectInternal() {
        stopHeartbeat()
        socket?.disconnect()
        manager?.disconnect()
        socket = nil
        manager = nil
        updateConnectionState(.disconnected)
    }

    // MARK: - Heartbeat

    /// (Re)starts the repeating heartbeat timer on `syncQueue`.
    private func startHeartbeat() {
        guard heartbeatInterval > 0 else { return }
        stopHeartbeat()

        let timer = DispatchSource.makeTimerSource(queue: syncQueue)
        timer.schedule(deadline: .now() + heartbeatInterval, repeating: heartbeatInterval)
        timer.setEventHandler { [weak self] in
            self?.performHeartbeatCheck()
        }
        heartbeatTimer = timer
        timer.resume()
    }

    /// Cancels the heartbeat timer and resets all heartbeat bookkeeping.
    private func stopHeartbeat() {
        heartbeatTimer?.cancel()
        heartbeatTimer = nil
        heartbeatAckInFlight = false
        consecutiveHeartbeatMisses = 0
        lastHeartbeatSentAt = nil
    }

    /// Timer tick: picks up token changes, then pings a connected socket or
    /// nudges a disconnected one to reconnect.
    private func performHeartbeatCheck() {
        if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }

        guard let socket else {
            updateConnectionState(.disconnected)
            return
        }

        switch socket.status {
        case .connected:
            sendHeartbeat(on: socket)
        case .connecting:
            updateConnectionState(.connecting)
        case .disconnected, .notConnected:
            // The token was validated at the top of this tick, so the current
            // payload can be reused without a second keychain lookup.
            updateConnectionState(.connecting)
            socket.connect(withPayload: currentAuthPayload)
        @unknown default:
            updateConnectionState(.connecting)
        }
    }

    /// Emits a heartbeat ping on `socket`, first accounting for a previous ping
    /// that is still unanswered.
    ///
    /// If a ping is in flight and has not yet exceeded `heartbeatTimeout`,
    /// nothing is sent. If it has timed out, the miss is recorded first.
    private func sendHeartbeat(on socket: SocketIOClient) {
        if heartbeatAckInFlight {
            guard let lastSentAt = lastHeartbeatSentAt,
                  Date().timeIntervalSince(lastSentAt) >= heartbeatTimeout else {
                return
            }

            handleMissedHeartbeat(for: socket)

            // Handling the miss may have reconnected this socket or replaced it
            // after a token refresh; only ping a socket that is still current
            // and connected.
            guard self.socket === socket, socket.status == .connected else { return }
        }

        heartbeatAckInFlight = true
        lastHeartbeatSentAt = Date()

        socket.emit(heartbeatEventName, ["data": "ping"])
    }

    /// Treats a `"message"` event whose text is `pong` (case-insensitive,
    /// surrounding whitespace ignored) as a heartbeat acknowledgement.
    ///
    /// The text is taken from the first item of `data`: either the item itself
    /// when it is a string, or its `data.message` / `message` entry when it is a
    /// dictionary. Anything else is ignored.
    private func handleMessageEvent(_ data: [Any]) {
        guard let payload = data.first else { return }

        let messageText: String?

        if let dictionary = payload as? [String: Any] {
            if let nestedData = dictionary["data"] as? [String: Any],
               let nestedMessage = nestedData["message"] as? String {
                messageText = nestedMessage
            } else {
                messageText = dictionary["message"] as? String
            }
        } else {
            messageText = payload as? String
        }

        guard let message = messageText?.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() else {
            return
        }

        if message == "pong" {
            handleHeartbeatSuccess()
        }
    }

    /// Clears heartbeat bookkeeping and publishes `.connected`.
    private func handleHeartbeatSuccess() {
        consecutiveHeartbeatMisses = 0
        heartbeatAckInFlight = false
        lastHeartbeatSentAt = nil
        updateConnectionState(.connected)
    }

    /// Records a missed heartbeat and, once more than `maxHeartbeatMissCount`
    /// have been missed in a row, forces a reconnect of `socket` (or switches to
    /// a changed token if there is one).
    private func handleMissedHeartbeat(for socket: SocketIOClient) {
        consecutiveHeartbeatMisses += 1
        heartbeatAckInFlight = false
        lastHeartbeatSentAt = nil
        updateConnectionState(.connecting)

        guard consecutiveHeartbeatMisses > maxHeartbeatMissCount else {
            return
        }

        consecutiveHeartbeatMisses = 0
        if refreshAuthTokenIfNeeded(disconnectIfMissing: true) { return }
        socket.disconnect()
        socket.connect(withPayload: currentAuthPayload)
    }
    #else
    /// Stub used when SocketIO is unavailable; there is nothing to tear down.
    private func disconnectInternal() { }
    #endif
}
|