Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mux rtmp protocol #424

Merged
merged 2 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ redis:
db: redis db

# optional fields
health_port: if used, will open an http port for health checks
prometheus_port: port used to collect prometheus metrics. Used for autoscaling
health_port: port used for http health checks (default 0)
template_port: port used to host default templates (default 7980)
prometheus_port: port used to collect prometheus metrics (default 0)
debug_handler_port: port used to host http debug handlers (default 0)
logging:
level: debug, info, warn, or error (default info)
json: true
template_base: can be used to host custom templates (default https://egress-composite.livekit.io)
insecure: can be used to connect to an insecure websocket (default false)
backup_storage: files will be moved here when uploads fail. location must have write access granted for all users
cpu_cost: # optionally override cpu cost estimation, used when accepting or denying requests
room_composite_cpu_cost: 3.0
web_cpu_cost: 3.0
Expand Down Expand Up @@ -94,6 +96,15 @@ alioss:
region: Ali OSS region
endpoint: (optional) custom endpoint
bucket: bucket to upload files to

# dev/debugging fields
insecure: can be used to connect to an insecure websocket (default false)
debug:
enable_profiling: create and upload pipeline dot file and pprof file on pipeline failure
s3: upload config for dotfiles (see above)
azure: upload config for dotfiles (see above)
gcp: upload config for dotfiles (see above)
alioss: upload config for dotfiles (see above)
```

The config file can be added to a mounted volume with its location passed in the EGRESS_CONFIG_FILE env var, or its body can be passed in the EGRESS_CONFIG_BODY env var.
Expand Down Expand Up @@ -170,33 +181,22 @@ You can then use our [cli](https://github.com/livekit/livekit-cli) to submit egr
## FAQ

### Can I store the files locally instead of uploading to cloud storage?
- Yes, you can mount a volume with your `docker run` command (e.g. `-v ~/livekit-egress:/out/`), and use the mounted directory in your filenames (e.g. `/out/my-recording.mp4`)
- Yes, you can mount a volume with your `docker run` command (e.g. `-v ~/livekit-egress:/out/`), and use the mounted
directory in your filenames (e.g. `/out/my-recording.mp4`). Since egress is not run as the root user, write permissions
will need to be enabled for all users.

### I get a `"no response from egress service"` error when sending a request

- Your livekit server cannot connect to an egress instance through redis. Make sure they are both able to reach the same redis db.
- Each instance currently only accepts one RoomCompositeRequest at a time - if it's already in use, you'll need to deploy more instances or set up autoscaling.
- If all of your egress instances are full, you'll need to deploy more instances or set up autoscaling.

### I get a different error when sending a request

- Make sure your egress, livekit, server sdk, and livekit-cli are all up to date.

### I'm getting a broken (0 byte) mp4 file

- This is caused by the process being killed - GStreamer needs to be properly shut down to close the file.
- Make sure your instance has enough CPU and memory, and is being stopped correctly.

### I'm seeing GStreamer warnings/errors. Is this normal?

- `GStreamer-CRITICAL **: 20:22:13.875: gst_mini_object_unref: assertion 'GST_MINI_OBJECT_REFCOUNT_VALUE (mini_object) > 0' failed`
- Occurs during audio-only egress - this is a gst bug, and is safe to ignore.
- `WARN flvmux ... Got backwards dts! (0:01:10.379000000 < 0:01:10.457000000)`
- Occurs when streaming to rtmp - safe to ignore. These warnings occur due to live sources being used for the flvmux.
The dts difference should be small (under 150ms).

### Can I run this without docker?

- It's possible, but not recommended. To do so, you would need gstreamer and all the plugins installed, along with xvfb,
- It's possible, but not recommended. To do so, you would need to install gstreamer along with its plugins, chrome, xvfb,
and have a pulseaudio server running.

## Testing and Development
Expand All @@ -211,7 +211,6 @@ api_secret: your-api-secret
ws_url: wss://your-livekit-url.com
redis:
address: 192.168.65.2:6379
room_name: your-room
room_only: false
web_only: false
track_composite_only: false
Expand All @@ -220,6 +219,8 @@ file_only: false
stream_only: false
segments_only: false
muting: false
dot_files: false
short: false
```

Join a room using https://example.livekit.io or your own client, then run `mage integration test/config.yaml`.
Expand Down
30 changes: 15 additions & 15 deletions pkg/config/output_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,34 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
outputConfig: outputConfig{OutputType: outputType},
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
var streamInfoList []*livekit.StreamInfo
for _, rawUrl := range urls {
url, redacted, err := p.ValidateUrl(rawUrl, outputType)
if err != nil {
return nil, err
}

conf.Urls = append(conf.Urls, url)

info := &livekit.StreamInfo{Url: redacted}
conf.StreamInfo[url] = info
streamInfoList = append(streamInfoList, info)
}

switch outputType {
case types.OutputTypeRTMP:
p.AudioOutCodec = types.MimeTypeAAC
p.VideoOutCodec = types.MimeTypeH264
conf.Urls = urls

case types.OutputTypeRaw:
p.AudioOutCodec = types.MimeTypeRawAudio
conf.Urls = urls
}

// Use a 4s default key frame interval for streaming
if p.KeyFrameInterval == 0 {
p.KeyFrameInterval = 4
}

conf.StreamInfo = make(map[string]*livekit.StreamInfo)
var streamInfoList []*livekit.StreamInfo
for _, rawUrl := range urls {
redacted, err := p.ValidateUrl(rawUrl, outputType)
if err != nil {
return nil, err
}

info := &livekit.StreamInfo{Url: redacted}
conf.StreamInfo[rawUrl] = info
streamInfoList = append(streamInfoList, info)
}

return conf, nil
}
19 changes: 12 additions & 7 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"context"
"fmt"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -513,28 +514,32 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s
return nil
}

func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, error) {
func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
if err != nil {
return "", errors.ErrInvalidUrl(rawUrl, err.Error())
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}

switch outputType {
case types.OutputTypeRTMP:
if parsed.Scheme == "mux" {
rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host)
}

redacted, ok := util.RedactStreamKey(rawUrl)
if !ok {
return "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
return redacted, nil
return rawUrl, redacted, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
return rawUrl, nil
return rawUrl, rawUrl, nil

default:
return "", errors.ErrInvalidInput("stream output type")
return "", "", errors.ErrInvalidInput("stream output type")
}
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,9 @@ func (p *Pipeline) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRe
now := time.Now().UnixNano()

// add stream outputs first
for _, url := range req.AddOutputUrls {
for _, rawUrl := range req.AddOutputUrls {
// validate and redact url
redacted, err := p.ValidateUrl(url, types.OutputTypeRTMP)
url, redacted, err := p.ValidateUrl(rawUrl, types.OutputTypeRTMP)
if err != nil {
errs.AppendErr(err)
continue
Expand Down Expand Up @@ -294,8 +294,14 @@ func (p *Pipeline) UpdateStream(ctx context.Context, req *livekit.UpdateStreamRe
}

// remove stream outputs
for _, url := range req.RemoveOutputUrls {
if err := p.removeSink(ctx, url, nil); err != nil {
for _, rawUrl := range req.RemoveOutputUrls {
url, _, err := p.ValidateUrl(rawUrl, types.OutputTypeRTMP)
if err != nil {
errs.AppendErr(err)
continue
}

if err = p.removeSink(ctx, url, nil); err != nil {
errs.AppendErr(err)
} else {
sendUpdate = true
Expand Down