Skip to content

Commit

Permalink
fix crash when publishing to a path with 'runOnDemand' from outside '…
Browse files Browse the repository at this point in the history
…runOnDemand' (#2636)
  • Loading branch information
aler9 committed Nov 3, 2023
1 parent 39a239c commit 15737a8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 52 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,7 @@ The server allows to specify commands that are executed when a certain event hap
`runOnConnect` allows to run a command when a client connects to the server:

```yml
# Command to run when a client connects to the server.
# This is terminated with SIGINT when a client disconnects from the server.
# The following environment variables are available:
# * RTSP_PORT: RTSP server port
Expand All @@ -1285,6 +1286,7 @@ runOnConnectRestart: no
`runOnDisconnect` allows to run a command when a client disconnects from the server:

```yml
# Command to run when a client disconnects from the server.
# Environment variables are the same of runOnConnect.
runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&conn_id=$MTX_CONN_ID
```
Expand All @@ -1294,7 +1296,8 @@ runOnDisconnect: curl http://my-custom-server/webhook?conn_type=$MTX_CONN_TYPE&c
```yml
paths:
mypath:
# This is terminated with SIGINT when the program closes.
# Command to run when this path is initialized.
# This can be used to publish a stream when the server is launched.
# The following environment variables are available:
# * MTX_PATH: path name
# * RTSP_PORT: RTSP server port
Expand All @@ -1310,6 +1313,8 @@ paths:
```yml
paths:
mypath:
# Command to run when this path is requested by a reader
# and no one is publishing to this path yet.
# This is terminated with SIGINT when the program closes.
# The following environment variables are available:
# * MTX_PATH: path name
Expand All @@ -1326,6 +1331,8 @@ paths:

```yml
pathDefaults:
# Command to run when the stream is ready to be read, whenever it is
# published by a client or pulled from a server / camera.
# This is terminated with SIGINT when the stream is not ready anymore.
# The following environment variables are available:
# * MTX_PATH: path name
Expand All @@ -1344,6 +1351,7 @@ pathDefaults:

```yml
pathDefaults:
# Command to run when the stream is not available anymore.
# Environment variables are the same of runOnReady.
runOnNotReady: curl http://my-custom-server/webhook?path=$MTX_PATH&source_type=$MTX_SOURCE_TYPE&source_id=$MTX_SOURCE_ID
```
Expand All @@ -1352,6 +1360,7 @@ pathDefaults:

```yml
pathDefaults:
# Command to run when a client starts reading.
# This is terminated with SIGINT when a client stops reading.
# The following environment variables are available:
# * MTX_PATH: path name
Expand Down
87 changes: 37 additions & 50 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ type path struct {
chStaticSourceSetReady chan defs.PathSourceStaticSetReadyReq
chStaticSourceSetNotReady chan defs.PathSourceStaticSetNotReadyReq
chDescribe chan pathDescribeReq
chRemovePublisher chan pathRemovePublisherReq
chAddPublisher chan pathAddPublisherReq
chRemovePublisher chan pathRemovePublisherReq
chStartPublisher chan pathStartPublisherReq
chStopPublisher chan pathStopPublisherReq
chAddReader chan pathAddReaderReq
Expand Down Expand Up @@ -254,8 +254,8 @@ func newPath(
chStaticSourceSetReady: make(chan defs.PathSourceStaticSetReadyReq),
chStaticSourceSetNotReady: make(chan defs.PathSourceStaticSetNotReadyReq),
chDescribe: make(chan pathDescribeReq),
chRemovePublisher: make(chan pathRemovePublisherReq),
chAddPublisher: make(chan pathAddPublisherReq),
chRemovePublisher: make(chan pathRemovePublisherReq),
chStartPublisher: make(chan pathStartPublisherReq),
chStopPublisher: make(chan pathStopPublisherReq),
chAddReader: make(chan pathAddReaderReq),
Expand Down Expand Up @@ -357,7 +357,7 @@ func (pa *path) run() {
}

if pa.onUnDemandHook != nil {
pa.onUnDemandHook("path closed")
pa.onUnDemandHook("path destroyed")

Check warning on line 360 in internal/core/path.go

View check run for this annotation

Codecov / codecov/patch

internal/core/path.go#L360

Added line #L360 was not covered by tests
}

pa.Log(logger.Debug, "destroyed: %v", err)
Expand Down Expand Up @@ -414,16 +414,16 @@ func (pa *path) runInner() error {
return fmt.Errorf("not in use")
}

case req := <-pa.chAddPublisher:
pa.doAddPublisher(req)

case req := <-pa.chRemovePublisher:
pa.doRemovePublisher(req)

if pa.shouldClose() {
return fmt.Errorf("not in use")
}

case req := <-pa.chAddPublisher:
pa.doAddPublisher(req)

case req := <-pa.chStartPublisher:
pa.doStartPublisher(req)

Expand Down Expand Up @@ -519,22 +519,11 @@ func (pa *path) doSourceStaticSetReady(req defs.PathSourceStaticSetReadyReq) {
if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()

pa.onDemandStaticSourceScheduleClose()

for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil

for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}

pa.consumeOnHoldRequests()

Check warning on line 526 in internal/core/path.go

View check run for this annotation

Codecov / codecov/patch

internal/core/path.go#L525-L526

Added lines #L525 - L526 were not covered by tests
req.Res <- defs.PathSourceStaticSetReadyRes{Stream: pa.stream}
}

Expand Down Expand Up @@ -649,25 +638,14 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) {
pa.name,
mediaInfo(req.desc.Medias))

if pa.conf.HasOnDemandPublisher() {
if pa.conf.HasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = newEmptyTimer()

pa.onDemandPublisherScheduleClose()

for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil

for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}

pa.consumeOnHoldRequests()

req.res <- pathStartPublisherRes{stream: pa.stream}
}

Expand Down Expand Up @@ -840,20 +818,15 @@ func (pa *path) onDemandPublisherScheduleClose() {
}

func (pa *path) onDemandPublisherStop(reason string) {
if pa.source != nil {
pa.source.(publisher).close()
pa.executeRemovePublisher()
}

if pa.onDemandPublisherState == pathOnDemandStateClosing {
pa.onDemandPublisherCloseTimer.Stop()
pa.onDemandPublisherCloseTimer = newEmptyTimer()
}

pa.onDemandPublisherState = pathOnDemandStateInitial

pa.onUnDemandHook(reason)
pa.onUnDemandHook = nil

pa.onDemandPublisherState = pathOnDemandStateInitial
}

func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error {
Expand Down Expand Up @@ -881,6 +854,20 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error
return nil
}

func (pa *path) consumeOnHoldRequests() {
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil

for _, req := range pa.readerAddRequestsOnHold {
pa.addReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}

func (pa *path) setNotReady() {
pa.parent.pathNotReady(pa)

Expand Down Expand Up @@ -1048,16 +1035,6 @@ func (pa *path) describe(req pathDescribeReq) pathDescribeRes {
}
}

// removePublisher is called by a publisher.
func (pa *path) removePublisher(req pathRemovePublisherReq) {
req.res = make(chan struct{})
select {
case pa.chRemovePublisher <- req:
<-req.res
case <-pa.ctx.Done():
}
}

// addPublisher is called by a publisher through pathManager.
func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
select {
Expand All @@ -1068,6 +1045,16 @@ func (pa *path) addPublisher(req pathAddPublisherReq) pathAddPublisherRes {
}
}

// removePublisher is called by a publisher.
func (pa *path) removePublisher(req pathRemovePublisherReq) {
req.res = make(chan struct{})
select {
case pa.chRemovePublisher <- req:
<-req.res
case <-pa.ctx.Done():

Check warning on line 1054 in internal/core/path.go

View check run for this annotation

Codecov / codecov/patch

internal/core/path.go#L1054

Added line #L1054 was not covered by tests
}
}

// startPublisher is called by a publisher.
func (pa *path) startPublisher(req pathStartPublisherReq) pathStartPublisherRes {
req.res = make(chan pathStartPublisherRes)
Expand Down
3 changes: 2 additions & 1 deletion mediamtx.yml
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ pathDefaults:
# Restart the command if it exits.
runOnInitRestart: no

# Command to run when this path is requested by a reader.
# Command to run when this path is requested by a reader
# and no one is publishing to this path yet.
# This can be used to publish a stream on demand.
# This is terminated with SIGINT when the path is not requested anymore.
# The following environment variables are available:
Expand Down

0 comments on commit 15737a8

Please sign in to comment.