diff --git a/.gitignore b/.gitignore index 66fd13c..0bfb0f4 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,7 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Other +.idea +out \ No newline at end of file diff --git a/cmd/broadcast.go b/cmd/broadcast.go new file mode 100644 index 0000000..6756ca0 --- /dev/null +++ b/cmd/broadcast.go @@ -0,0 +1,59 @@ +package cmd + +import ( + "github.com/racing-telemetry/f1-dump/internal/text/emoji" + "github.com/racing-telemetry/f1-dump/internal/text/printer" + "github.com/racing-telemetry/f1-dump/pkg/broadcaster" + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" +) + +var broadcastCmd = &cobra.Command{ + Use: "broadcast", + Short: "Start broadcasting", + Long: `Start broadcasting`, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + flags, err := buildFlags(cmd) + if err != nil { + return printer.Error(err) + } + + b, err := broadcaster.NewBroadcaster(flags.UDPAddr()) + if err != nil { + return printer.Error(err) + } + + // wait exit signal, ctrl+c to early exit + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + + b.Stop() + printer.Print(emoji.Rocket, "%d packs have been published", b.Stats.RecvCount()) + + os.Exit(0) + }() + + printer.Print(emoji.Sparkless, "Broadcast started at %s:%d, press Ctrl+C to stop", flags.host, flags.port) + err = b.Start(flags.file) + if err != nil { + return printer.Error(err) + } + + b.Stop() + printer.Print(emoji.Rocket, "%d packs have been published", b.Stats.RecvCount()) + + return nil + }, +} + +func init() { + addFlags(broadcastCmd) + + rootCmd.AddCommand(broadcastCmd) +} diff --git a/cmd/flags.go b/cmd/flags.go new file mode 100644 index 0000000..23b2cd0 --- /dev/null +++ b/cmd/flags.go @@ -0,0 +1,67 @@ +package cmd + +import ( + "errors" + "fmt" + "github.com/racing-telemetry/f1-dump/internal" + "github.com/spf13/cobra" + "net" + "os" + "path/filepath" + "strings" +) + +type Flags struct { + port int + host string + file string +} + +func (f *Flags) UDPAddr() *net.UDPAddr { + return &net.UDPAddr{ + IP: net.ParseIP(f.host), + Port: f.port, + } +} + +func addFlags(cmd *cobra.Command) { + cmd.Flags().IntP("port", "p", internal.DefaultPort, "Port address to listen on UDP.") + cmd.Flags().String("ip", internal.DefaultHost, "Address to listen on UDP.") + cmd.Flags().StringP("file", "f", "", "I/O file path or name to save and read packets. (sample: foo/bar/output.bin)") +} + +func buildFlags(cmd *cobra.Command) (*Flags, error) { + port, err := cmd.Flags().GetInt("port") + if err != nil { + return nil, err + } + + host, _ := cmd.Flags().GetString("ip") + if host == "" { + host = internal.DefaultHost + } + + path, _ := cmd.Flags().GetString("file") + path = strings.TrimSpace(path) + if path != "" { + ext := filepath.Ext(path) + if ext != ".bin" { + return nil, errors.New("file extension must be ends with .bin") + } + + _, err = os.Stat(path) + switch cmd.Name() { + case "record": + if err == nil { + return nil, fmt.Errorf("file already exists: %s", path) + } + + case "broadcast": + if err != nil { + return nil, fmt.Errorf("file doesnt exist: %s", path) + } + } + } + + return &Flags{port: port, host: host, file: path}, nil +} diff --git a/cmd/record.go b/cmd/record.go new file mode 100644 index 0000000..4dff526 --- /dev/null +++ b/cmd/record.go @@ -0,0 +1,75 @@ +package cmd + +import ( + "errors" + "fmt" + "github.com/dustin/go-humanize" + "github.com/racing-telemetry/f1-dump/internal/text/emoji" + "github.com/racing-telemetry/f1-dump/internal/text/printer" + "github.com/racing-telemetry/f1-dump/pkg/recorder" + "github.com/spf13/cobra" + "os" + "os/signal" + "syscall" +) + +var cmdRecord = &cobra.Command{ + Use: "record", + Short: "Start recording packets from UDP source.", + Long: `Start recording packets from UDP source.`, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + flags, err := buildFlags(cmd) + if err != nil { + return printer.Error(err) + } + + rc, err := recorder.NewRecorder(flags.UDPAddr()) + if err != nil { + return fmt.Errorf("%s\n%s", printer.Error(errors.New("recorder can't create")), printer.Error(err)) + } + + // wait exit signal, ctrl+c to exit + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + + rc.Stop() + + f, err := rc.Save(flags.file) + if err != nil { + printer.PrintError(err.Error()) + } else { + stat, err := f.Stat() + if err != nil { + printer.PrintError(err.Error()) + } + + printer.Print(emoji.File, "File saved to %s (size: %s)", f.Name(), humanize.Bytes(uint64(stat.Size()))) + + err = f.Close() + if err != nil { + printer.PrintError(err.Error()) + } + } + + printer.Print(emoji.Rocket, "Received Packets: %d", rc.Stats.RecvCount()) + printer.Print(emoji.Construction, "Lost Packets: %d", rc.Stats.ErrCount()) + + os.Exit(0) + }() + + printer.Print(emoji.Sparkless, "Record started at %s:%d, press Ctrl+C to stop", flags.host, flags.port) + rc.Start() + + return nil + }, +} + +func init() { + addFlags(cmdRecord) + + rootCmd.AddCommand(cmdRecord) +} diff --git a/cmd/root.go b/cmd/root.go index 4e8685d..47b1a9e 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,6 +1,7 @@ package cmd import ( + "github.com/racing-telemetry/f1-dump/pkg/opts" "github.com/spf13/cobra" "os" ) @@ -16,3 +17,9 @@ func Execute() { os.Exit(1) } } + +func init() { + rootCmd.CompletionOptions.DisableDefaultCmd = true + + rootCmd.PersistentFlags().BoolVarP(&opts.Verbose, "verbose", "v", false, "verbose output") +} diff --git a/cmd/version.go b/cmd/version.go index ffe722c..00fdee3 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -2,9 +2,10 @@ package cmd import ( "encoding/json" + "errors" "fmt" - "github.com/pkg/errors" "github.com/racing-telemetry/f1-dump/internal" + "github.com/racing-telemetry/f1-dump/internal/text/printer" "github.com/spf13/cobra" "runtime" ) @@ -23,26 +24,6 @@ type CLIVersionInfo struct { Platform string } -func NewCmdVersion() *cobra.Command { - cmd := &cobra.Command{ - Use: "version", - Short: "Prints the CLI version", - Long: `Prints the CLI version`, - SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - bytes, err := json.Marshal(VersionInfo()) - if err != nil { - return errors.Wrap(err, "failed to marshal version info") - } - - fmt.Println(string(bytes)) - return nil - }, - } - - return cmd -} - func VersionInfo() *CLIVersionInfo { return &CLIVersionInfo{ Version: internal.Version, @@ -55,5 +36,19 @@ func VersionInfo() *CLIVersionInfo { } func init() { - rootCmd.AddCommand(NewCmdVersion()) + rootCmd.AddCommand(&cobra.Command{ + Use: "version", + Short: "Prints the CLI version", + Long: "Prints the CLI version", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + bytes, err := json.Marshal(VersionInfo()) + if err != nil { + return printer.Error(errors.New("failed to marshal version info")) + } + + fmt.Println(string(bytes)) + return nil + }, + }) } diff --git a/go.mod b/go.mod index 2566aec..9c112f6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/racing-telemetry/f1-dump go 1.17 require ( - github.com/pkg/errors v0.8.1 + github.com/dustin/go-humanize v1.0.0 github.com/spf13/cobra v1.3.0 ) diff --git a/go.sum b/go.sum index cadc431..43207d8 100644 --- a/go.sum +++ b/go.sum @@ -90,6 +90,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -273,7 +275,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/internal/constants.go b/internal/constants.go new file mode 100644 index 0000000..3febaf3 --- /dev/null +++ b/internal/constants.go @@ -0,0 +1,10 @@ +package internal + +const ( + DefaultPort = 20777 + DefaultHost = "localhost" +) + +const BufferSize = 1024 + 1024/2 + +const OutFileFormat = "f1-out-%d.bin" diff --git a/internal/text/emoji/emoji.go b/internal/text/emoji/emoji.go new file mode 100644 index 0000000..5a8a1d2 --- /dev/null +++ b/internal/text/emoji/emoji.go @@ -0,0 +1,12 @@ +package emoji + +type Emoji rune + +const ( + Boom Emoji = '💥' + Fire Emoji = '🔥' + Sparkless Emoji = '✨' + File Emoji = '📁' + Construction Emoji = '🚧' + Rocket Emoji = '🚀' +) diff --git a/internal/text/printer/printer.go b/internal/text/printer/printer.go new file mode 100644 index 0000000..eb233cb --- /dev/null +++ b/internal/text/printer/printer.go @@ -0,0 +1,18 @@ +package printer + +import ( + "fmt" + "github.com/racing-telemetry/f1-dump/internal/text/emoji" +) + +func Print(emoji emoji.Emoji, s string, a ...interface{}) { + fmt.Printf("\r%s %s\n", string(emoji), fmt.Sprintf(s, a...)) +} + +func Error(err error) error { + return fmt.Errorf("\r%s Error: %s", string(emoji.Boom), err.Error()) +} + +func PrintError(format string, a ...interface{}) { + fmt.Println(Error(fmt.Errorf(format, a...))) +} diff --git a/internal/udp/client.go b/internal/udp/client.go new file mode 100644 index 0000000..2b98b2c --- /dev/null +++ b/internal/udp/client.go @@ -0,0 +1,33 @@ +package udp + +import ( + "github.com/racing-telemetry/f1-dump/internal" + "net" +) + +type Client struct { + conn *net.UDPConn +} + +func Serve(addr *net.UDPAddr) (*Client, error) { + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + return &Client{conn: conn}, nil +} + +func (c *Client) ReadSocket() ([]byte, error) { + buf := make([]byte, internal.BufferSize) + _, _, err := c.conn.ReadFromUDP(buf) + if err != nil { + return nil, err + } + + return buf, err +} + +func (c *Client) Close() error { + return c.conn.Close() +} diff --git a/internal/udp/counter.go b/internal/udp/counter.go new file mode 100644 index 0000000..8da75ac --- /dev/null +++ b/internal/udp/counter.go @@ -0,0 +1,30 @@ +package udp + +import "sync" + +type Counter struct { + sync.Mutex + + recv uint64 // Success Client Packet Count + err uint64 // Fail Client Packet Count +} + +func (c *Counter) RecvCount() uint64 { + return c.recv +} + +func (c *Counter) ErrCount() uint64 { + return c.err +} + +func (c *Counter) IncRecv() { + c.Lock() + c.recv += 1 + c.Unlock() +} + +func (c *Counter) IncErr() { + c.Lock() + c.err += 1 + c.Unlock() +} diff --git a/internal/udp/server.go b/internal/udp/server.go new file mode 100644 index 0000000..da4f1ab --- /dev/null +++ b/internal/udp/server.go @@ -0,0 +1,29 @@ +package udp + +import "net" + +type Server struct { + conn net.Conn // UDP Connection +} + +func Dial(addr *net.UDPAddr) (*Server, error) { + conn, err := net.Dial("udp", addr.String()) + if err != nil { + return nil, err + } + + return &Server{conn: conn}, nil +} + +func (s *Server) WriteSocket(buf []byte) error { + _, err := s.conn.Write(buf) + if err != nil { + return err + } + + return nil +} + +func (s *Server) Close() error { + return s.conn.Close() +} diff --git a/pkg/broadcaster/broadcaster.go b/pkg/broadcaster/broadcaster.go new file mode 100644 index 0000000..a9b6bce --- /dev/null +++ b/pkg/broadcaster/broadcaster.go @@ -0,0 +1,97 @@ +package broadcaster + +import ( + "context" + "github.com/racing-telemetry/f1-dump/internal" + "github.com/racing-telemetry/f1-dump/internal/text/printer" + "github.com/racing-telemetry/f1-dump/internal/udp" + "github.com/racing-telemetry/f1-dump/pkg/opts" + "io" + "net" + "os" +) + +type Broadcaster struct { + serv *udp.Server + + ctx context.Context + stop context.CancelFunc + + Stats *udp.Counter +} + +func NewBroadcaster(addr *net.UDPAddr) (*Broadcaster, error) { + serv, err := udp.Dial(addr) + if err != nil { + return nil, err + } + + return newBroadcaster(serv), nil +} + +func newBroadcaster(serv *udp.Server) *Broadcaster { + ctx, fn := context.WithCancel(context.Background()) + return &Broadcaster{ + serv: serv, + ctx: ctx, + stop: fn, + Stats: new(udp.Counter), + } +} + +func (b *Broadcaster) Start(file string) error { + f, err := os.Open(file) + if err != nil { + return err + } + + defer f.Close() + + offset := int64(0) + for { + select { + case <-b.ctx.Done(): + break + default: + } + + buf := make([]byte, internal.BufferSize) + + n, err := f.ReadAt(buf, offset) + if err != nil { + if err == io.EOF { + break + } + + if opts.Verbose { + printer.PrintError("reading file: %s", err.Error()) + } + } + + offset += int64(n) + + err = b.serv.WriteSocket(buf) + if err != nil { + if opts.Verbose { + printer.PrintError("socket write error: %s", err.Error()) + } + + b.Stats.IncErr() + } else { + b.Stats.IncRecv() + } + } + + return nil +} + +func (b *Broadcaster) Stop() { + b.stop() + + err := b.serv.Close() + if err != nil { + if opts.Verbose { + printer.PrintError("socket closing: %s", err.Error()) + } + } +} diff --git a/pkg/opts/opts.go b/pkg/opts/opts.go new file mode 100644 index 0000000..4c11331 --- /dev/null +++ b/pkg/opts/opts.go @@ -0,0 +1,3 @@ +package opts + +var Verbose bool diff --git a/pkg/recorder/recorder.go b/pkg/recorder/recorder.go new file mode 100644 index 0000000..a1641e3 --- /dev/null +++ b/pkg/recorder/recorder.go @@ -0,0 +1,98 @@ +package recorder + +import ( + "bytes" + "context" + "fmt" + "github.com/racing-telemetry/f1-dump/internal" + "github.com/racing-telemetry/f1-dump/internal/udp" + "github.com/racing-telemetry/f1-dump/pkg/opts" + "log" + "net" + "os" + "sync" + "time" +) + +type Recorder struct { + sync.Mutex + + buf *bytes.Buffer + serv *udp.Client + + ctx context.Context + stop context.CancelFunc + + Stats *udp.Counter +} + +func NewRecorder(addr *net.UDPAddr) (*Recorder, error) { + serv, err := udp.Serve(addr) + if err != nil { + return nil, err + } + + return newRecorder(serv), nil +} + +func newRecorder(serv *udp.Client) *Recorder { + ctx, fn := context.WithCancel(context.Background()) + return &Recorder{ + Stats: new(udp.Counter), + buf: new(bytes.Buffer), + serv: serv, + ctx: ctx, + stop: fn, + } +} + +func (r *Recorder) Start() { + for { + select { + case <-r.ctx.Done(): + return + default: + } + + buf, err := r.serv.ReadSocket() + if err != nil { + if opts.Verbose { + log.Println(err) + } + + r.Stats.IncErr() + continue + } + + r.Stats.IncRecv() + + r.buf.Grow(internal.BufferSize) + r.buf.Write(buf) + } +} + +func (r *Recorder) Stop() { + r.stop() +} + +func (r *Recorder) Save(file string) (*os.File, error) { + if r.buf.Len() == 0 { + return nil, fmt.Errorf("no data found to save") + } + + if file == "" { + file = fmt.Sprintf(internal.OutFileFormat, time.Now().Unix()) + } + + f, err := os.Create(file) + if err != nil { + return nil, err + } + + err = os.WriteFile(file, r.buf.Bytes(), 0644) + if err != nil { + return nil, err + } + + return f, nil +}