Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory-leak RTMPConnection, RTMPStream. #1658

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions HaishinKit/Sources/Codec/AudioCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ final class AudioCodec {
}

var outputStream: AsyncStream<(AVAudioBuffer, AVAudioTime)> {
let (stream, continuation) = AsyncStream.makeStream(of: (AVAudioBuffer, AVAudioTime).self)
self.continuation = continuation
return stream
AsyncStream { continuation in
self.continuation = continuation
}
}

/// This instance is running to process(true) or not(false).
Expand All @@ -45,9 +45,13 @@ final class AudioCodec {
}
private var cursor: Int = 0
private var inputBuffers: [AVAudioBuffer] = []
private var continuation: AsyncStream<(AVAudioBuffer, AVAudioTime)>.Continuation? {
didSet {
oldValue?.finish()
}
}
private var outputBuffers: [AVAudioBuffer] = []
private var audioConverter: AVAudioConverter?
private var continuation: AsyncStream<(AVAudioBuffer, AVAudioTime)>.Continuation?

func append(_ sampleBuffer: CMSampleBuffer) {
guard isRunning else {
Expand Down Expand Up @@ -184,7 +188,7 @@ extension AudioCodec: Runner {
guard isRunning else {
return
}
continuation?.finish()
isRunning = false
continuation = nil
}
}
2 changes: 1 addition & 1 deletion HaishinKit/Sources/Codec/VideoCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ final class VideoCodec {
var needsSync = true
var passthrough = true
var outputStream: AsyncStream<CMSampleBuffer> {
AsyncStream<CMSampleBuffer> { continuation in
AsyncStream { continuation in
self.continuation = continuation
}
}
Expand Down
21 changes: 13 additions & 8 deletions HaishinKit/Sources/HKStream/HKOutgoingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public final class HKOutgoingStream {
/// The asynchronous sequence for video input buffer.
public var videoInputStream: AsyncStream<CMSampleBuffer> {
if 0 < videoInputBufferCounts {
let (stream, continuation) = AsyncStream.makeStream(of: CMSampleBuffer.self, bufferingPolicy: .bufferingNewest(videoInputBufferCounts))
self.videoInputContinuation = continuation
return stream
return AsyncStream(CMSampleBuffer.self, bufferingPolicy: .bufferingNewest(videoInputBufferCounts)) { continuation in
self.videoInputContinuation = continuation
}
} else {
let (stream, continuation) = AsyncStream.makeStream(of: CMSampleBuffer.self)
self.videoInputContinuation = continuation
return stream
return AsyncStream { continuation in
self.videoInputContinuation = continuation
}
}
}

Expand All @@ -59,7 +59,11 @@ public final class HKOutgoingStream {

private var audioCodec = AudioCodec()
private var videoCodec = VideoCodec()
private var videoInputContinuation: AsyncStream<CMSampleBuffer>.Continuation?
private var videoInputContinuation: AsyncStream<CMSampleBuffer>.Continuation? {
didSet {
oldValue?.finish()
}
}

/// Create a new instance.
public init() {
Expand Down Expand Up @@ -106,8 +110,9 @@ extension HKOutgoingStream: Runner {
guard isRunning else {
return
}
isRunning = false
videoCodec.stopRunning()
audioCodec.stopRunning()
isRunning = false
videoInputContinuation = nil
}
}
29 changes: 20 additions & 9 deletions HaishinKit/Sources/Network/NetworkMonitor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@ public final actor NetworkMonitor {

/// An asynchronous sequence for network monitoring event.
public var event: AsyncStream<NetworkMonitorEvent> {
let (stream, continuation) = AsyncStream<NetworkMonitorEvent>.makeStream()
self.continuation = continuation
return stream
AsyncStream { continuation in
self.continuation = continuation
}
}

public private(set) var isRunning = false
private var timer: Task<Void, Never>? {
didSet {
oldValue?.cancel()
}
}
private var measureInterval = 3
private var currentBytesInPerSecond = 0
private var currentBytesOutPerSecond = 0
private var previousTotalBytesIn = 0
private var previousTotalBytesOut = 0
private var previousQueueBytesOut: [Int] = []
private var continuation: AsyncStream<NetworkMonitorEvent>.Continuation?
private var continuation: AsyncStream<NetworkMonitorEvent>.Continuation? {
didSet {
oldValue?.finish()
}
}
private weak var reporter: (any NetworkTransportReporter)?

/// Creates a new instance.
Expand Down Expand Up @@ -74,11 +83,11 @@ extension NetworkMonitor: AsyncRunner {
return
}
isRunning = true
let timer = AsyncStream {
try? await Task.sleep(nanoseconds: 1_000_000_000)
}
Task {
for await _ in timer where isRunning {
timer = Task {
let timer = AsyncStream {
try? await Task.sleep(nanoseconds: 1_000_000_000)
}
for await _ in timer {
do {
let event = try await collect()
continuation?.yield(event)
Expand All @@ -94,5 +103,7 @@ extension NetworkMonitor: AsyncRunner {
return
}
isRunning = false
timer = nil
continuation = nil
}
}
6 changes: 3 additions & 3 deletions HaishinKit/Sources/RTMP/RTMPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -383,17 +383,17 @@ public actor RTMPStream {
try? send("@setDataFrame", arguments: "onMetaData", metadata)
outgoing.startRunning()
Task {
for await audio in outgoing.audioOutputStream where outgoing.isRunning {
for await audio in outgoing.audioOutputStream {
append(audio.0, when: audio.1)
}
}
Task {
for await video in outgoing.videoOutputStream where outgoing.isRunning {
for await video in outgoing.videoOutputStream {
append(video)
}
}
Task {
for await video in outgoing.videoInputStream where outgoing.isRunning {
for await video in outgoing.videoInputStream {
outgoing.append(video: video)
}
}
Expand Down