From 70c2e97ac91cb6ef2629cb7bd89f3467958263cd Mon Sep 17 00:00:00 2001 From: shogo4405 Date: Tue, 3 Sep 2024 22:43:10 +0900 Subject: [PATCH] Support SRT listener mode. --- SRTHaishinKit/SRTConnection.swift | 36 ++++++++++++++++++++++++++++--- SRTHaishinKit/SRTSocket.swift | 24 +++++++++++++++------ Sources/IO/IOStream.swift | 8 +++++++ Sources/RTMP/RTMPStream.swift | 4 ---- 4 files changed, 58 insertions(+), 14 deletions(-) diff --git a/SRTHaishinKit/SRTConnection.swift b/SRTHaishinKit/SRTConnection.swift index 452f7cc26..655b3a2be 100644 --- a/SRTHaishinKit/SRTConnection.swift +++ b/SRTHaishinKit/SRTConnection.swift @@ -18,6 +18,7 @@ public final class SRTConnection: NSObject { /// This instance connect to server(true) or not(false) @objc public private(set) dynamic var connected = false + var mode: SRTMode = .caller var socket: SRTSocket? { didSet { socket?.delegate = self @@ -59,7 +60,13 @@ public final class SRTConnection: NSObject { socket = .init() try socket?.open(addr, mode: mode, options: options) self.uri = uri - connected = socket?.status == SRTS_CONNECTED + switch mode { + case .listener: + break + default: + connected = socket?.status == SRTS_CONNECTED + } + self.mode = mode continuation.resume() } catch { continuation.resume(throwing: error) @@ -98,7 +105,21 @@ public final class SRTConnection: NSObject { extension SRTConnection: SRTSocketDelegate { // MARK: SRTSocketDelegate func socket(_ socket: SRTSocket, status: SRT_SOCKSTATUS) { - connected = socket.status == SRTS_CONNECTED + switch mode { + case .caller: + connected = socket.status == SRTS_CONNECTED + case .listener: + let connected = socket.status == SRTS_CONNECTED + guard !connected else { + return + } + if let indexOf = clients.firstIndex(where: { $0.socket == socket.socket }) { + clients[indexOf].delegate = nil + clients[indexOf].close() + clients.remove(at: indexOf) + } + self.connected = false + } } func socket(_ socket: SRTSocket, incomingDataAvailabled data: Data, bytes: Int32) { @@ -106,6 +127,15 @@ extension SRTConnection: SRTSocketDelegate { } func socket(_ socket: SRTSocket, didAcceptSocket client: SRTSocket) { - clients.append(client) + // only one client can accept. + if clients.isEmpty { + client.delegate = self + clients.append(client) + connected = true + client.startRunning() + client.doInput() + } else { + client.reject() + } } } diff --git a/SRTHaishinKit/SRTSocket.swift b/SRTHaishinKit/SRTSocket.swift index eb482cfae..f00076fd1 100644 --- a/SRTHaishinKit/SRTSocket.swift +++ b/SRTHaishinKit/SRTSocket.swift @@ -135,13 +135,18 @@ final class SRTSocket { } func doInput() { - incomingQueue.async { - repeat { - let result = self.recvmsg() - if 0 < result { - self.delegate?.socket(self, incomingDataAvailabled: self.incomingBuffer, bytes: result) - } - } while self.isRunning.value + switch mode { + case .caller: + incomingQueue.async { + repeat { + let result = self.recvmsg() + if 0 < result { + self.delegate?.socket(self, incomingDataAvailabled: self.incomingBuffer, bytes: result) + } + } while self.isRunning.value + } + case .listener: + break } } @@ -161,6 +166,11 @@ final class SRTSocket { return srt_bstats(socket, &perf, 1) } + func reject() { + srt_setrejectreason(socket, Int32(SRT_REJ_CLOSE.rawValue)) + srt_close(socket) + } + private func accept() { let socket = srt_accept(socket, nil, nil) do { diff --git a/Sources/IO/IOStream.swift b/Sources/IO/IOStream.swift index a62d22ed2..21361b642 100644 --- a/Sources/IO/IOStream.swift +++ b/Sources/IO/IOStream.swift @@ -83,6 +83,11 @@ open class IOStream: NSObject { /// The lockQueue. public let lockQueue: DispatchQueue = .init(label: "com.haishinkit.HaishinKit.IOStream.lock", qos: .userInitiated) + /// The boolean value that indicates audio samples allow access or not. + public internal(set) var audioSampleAccess = true + /// The boolean value that indicates video samples allow access or not. + public internal(set) var videoSampleAccess = true + /// The offscreen rendering object. public var screen: Screen { return mixer.videoIO.screen @@ -560,6 +565,9 @@ extension IOStream: IOTellyUnitDelegate { // MARK: IOTellyUnitDelegate func tellyUnit(_ tellyUnit: IOTellyUnit, dequeue sampleBuffer: CMSampleBuffer) { mixer.videoIO.view?.enqueue(sampleBuffer) + if videoSampleAccess { + observers.forEach { $0.stream(self, didOutput: sampleBuffer) } + } } func tellyUnit(_ tellyUnit: IOTellyUnit, didBufferingChanged: Bool) { diff --git a/Sources/RTMP/RTMPStream.swift b/Sources/RTMP/RTMPStream.swift index 19c13735c..3917030c8 100644 --- a/Sources/RTMP/RTMPStream.swift +++ b/Sources/RTMP/RTMPStream.swift @@ -171,10 +171,6 @@ open class RTMPStream: IOStream { public internal(set) var info = RTMPStreamInfo() /// The object encoding (AMF). Framework supports AMF0 only. public private(set) var objectEncoding: RTMPObjectEncoding = RTMPConnection.defaultObjectEncoding - /// The boolean value that indicates audio samples allow access or not. - public private(set) var audioSampleAccess = true - /// The boolean value that indicates video samples allow access or not. - public private(set) var videoSampleAccess = true /// Incoming audio plays on the stream or not. public var receiveAudio = true { didSet {