Skip to content

Commit

Permalink
fix: update locking webrtc
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricve committed Nov 23, 2023
1 parent 94b71a0 commit 54bc198
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 32 deletions.
9 changes: 1 addition & 8 deletions machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,14 +601,7 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.")
for handshake := range communication.HandleLiveHDHandshake {
log.Log.Info("HandleLiveStreamHD: setting up a peer connection.")
key := config.Key + "/" + handshake.SessionID
webrtc.CandidatesMutex.Lock()
_, ok := webrtc.CandidateArrays[key]
if !ok {
webrtc.CandidateArrays[key] = make(chan string)
}
webrtc.CandidatesMutex.Unlock()
webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key])
go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake)

}
}
Expand Down
2 changes: 1 addition & 1 deletion machinery/src/routers/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload mo
if communication.CameraConnected {
// Register candidate channel
key := configuration.Config.Key + "/" + receiveHDCandidatesPayload.SessionID
webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
go webrtc.RegisterCandidates(key, receiveHDCandidatesPayload)
} else {
log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.")
}
Expand Down
51 changes: 28 additions & 23 deletions machinery/src/webrtc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,19 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription {
}

func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) {

// Set lock
CandidatesMutex.Lock()
defer CandidatesMutex.Unlock()

channel := CandidateArrays[key]
if channel == nil {
channel = make(chan string)
CandidateArrays[key] = channel
}
log.Log.Info("HandleReceiveHDCandidates: " + candidate.Candidate)
channel <- candidate.Candidate
CandidatesMutex.Unlock()
}

func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) {
func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) {

config := configuration.Config
deviceKey := config.Key
Expand All @@ -111,6 +109,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
turnServersUsername := config.TURNUsername
turnServersCredential := config.TURNPassword

// We create a channel which will hold the candidates for this session.
sessionKey := config.Key + "/" + handshake.SessionID
CandidatesMutex.Lock()
_, ok := CandidateArrays[sessionKey]
if !ok {
CandidateArrays[sessionKey] = make(chan string)
}
CandidatesMutex.Unlock()

// Set variables
hubKey := handshake.HubKey
sessionDescription := handshake.SessionDescription
Expand Down Expand Up @@ -147,35 +154,40 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if err == nil && peerConnection != nil {

if _, err = peerConnection.AddTrack(videoTrack); err != nil {
panic(err)
//panic(err)
}

if _, err = peerConnection.AddTrack(audioTrack); err != nil {
panic(err)
//panic(err)
}

if err != nil {
panic(err)
//panic(err)
}

peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) {
if connectionState == pionWebRTC.ICEConnectionStateDisconnected {
CandidatesMutex.Lock()
defer CandidatesMutex.Unlock()

atomic.AddInt64(&peerConnectionCount, -1)

// Set lock
CandidatesMutex.Lock()
peerConnections[handshake.SessionID] = nil
close(candidates)
_, ok := CandidateArrays[sessionKey]
if ok {
close(CandidateArrays[sessionKey])
}
CandidatesMutex.Unlock()

close(w.PacketsCount)
if err := peerConnection.Close(); err != nil {
panic(err)
//panic(err)
}
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
atomic.AddInt64(&peerConnectionCount, 1)
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
// Iterate over the candidates and send them to the remote client
// Non blocking channel
for candidate := range candidates {
for candidate := range CandidateArrays[sessionKey] {
log.Log.Info("InitializeWebRTCConnection: Received candidate.")
if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil {
log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error())
Expand All @@ -188,29 +200,22 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati

offer := w.CreateOffer(sd)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
panic(err)
//panic(err)
}

answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
panic(err)
//panic(err)
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
panic(err)
//panic(err)
}

// When an ICE candidate is available send to the other Pion instance
// the other Pion instance will add this candidate by calling AddICECandidate
var candidatesMux sync.Mutex
// When an ICE candidate is available send to the other peer using the signaling server (MQTT).
// The other peer will add this candidate by calling AddICECandidate
peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) {
if candidate == nil {
return
}

candidatesMux.Lock()
defer candidatesMux.Unlock()

// Create a config map
valueMap := make(map[string]interface{})
candateJSON := candidate.ToJSON()
Expand Down

0 comments on commit 54bc198

Please sign in to comment.