Skip to content

Commit

Permalink
Issue #172: Support unix socket for HTTP and WS
Browse files Browse the repository at this point in the history
  • Loading branch information
F1bonacc1 committed Apr 6, 2024
1 parent b76fce6 commit 5a554d3
Show file tree
Hide file tree
Showing 26 changed files with 213 additions and 133 deletions.
48 changes: 23 additions & 25 deletions src/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,40 @@ import (
"github.com/f1bonacc1/process-compose/src/app"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"

"net/http"
"os"
"time"
)

const EnvDebugMode = "PC_DEBUG_MODE"

func StartHttpServer(useLogger bool, port int, project app.IProject) {

if os.Getenv(EnvDebugMode) == "" {
gin.SetMode(gin.ReleaseMode)
useLogger = false
}
func StartHttpServerWithUnixSocket(useLogger bool, unixSocket string, project app.IProject) {
router := getRouter(useLogger, project)
log.Info().Msgf("start UDS http server listening %s", unixSocket)
go func() {
os.Remove(unixSocket)
err := router.RunUnix(unixSocket)
if err != nil {
log.Fatal().Err(err).Msgf("start UDS http server on %s failed", unixSocket)
}
}()
}

handler := NewPcApi(project)
routersInit := InitRoutes(useLogger, handler)
readTimeout := time.Duration(60) * time.Second
writeTimeout := time.Duration(60) * time.Second
func StartHttpServerWithTCP(useLogger bool, port int, project app.IProject) {
router := getRouter(useLogger, project)
endPoint := fmt.Sprintf(":%d", port)
maxHeaderBytes := 1 << 20

server := &http.Server{
Addr: endPoint,
Handler: routersInit,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
MaxHeaderBytes: maxHeaderBytes,
}

log.Info().Msgf("start http server listening %s", endPoint)

go func() {
err := server.ListenAndServe()
err := router.Run(endPoint)
if err != nil {
log.Fatal().Err(err).Msgf("start http server on %s failed", endPoint)
}
}()

}

func getRouter(useLogger bool, project app.IProject) *gin.Engine {
if os.Getenv(EnvDebugMode) == "" {
gin.SetMode(gin.ReleaseMode)
useLogger = false
}
return InitRoutes(useLogger, NewPcApi(project))
}
3 changes: 3 additions & 0 deletions src/api/ws_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func handleIncoming(ws *websocket.Conn, done chan struct{}) {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
if msgType == -1 {
return
}
log.Err(err).Msgf("Failed to read from socket %d", msgType)
return
}
Expand Down
59 changes: 40 additions & 19 deletions src/client/client.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package client

import (
"context"
"fmt"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/f1bonacc1/process-compose/src/types"
"net"
"net/http"
"sync"
"time"
Expand All @@ -14,7 +17,6 @@ var (

type PcClient struct {
address string
port int
logLength int
logger *LogClient
errMtx sync.Mutex
Expand All @@ -23,17 +25,36 @@ type PcClient struct {
client *http.Client
}

func NewClient(address string, port, logLength int) *PcClient {
func NewUdsClient(sockPath string, logLength int) *PcClient {

udsClient := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", sockPath)
},
},
}
c := newClient("unix", udsClient, logLength)
c.logger = NewLogClient("unix", sockPath)
return c
}

func NewTcpClient(host string, port, logLength int) *PcClient {
address := fmt.Sprintf("%s:%d", host, port)
c := newClient(address, &http.Client{}, logLength)
c.logger = NewLogClient(address, "")
return c
}

func newClient(address string, client *http.Client, logLength int) *PcClient {
return &PcClient{
address: address,
port: port,
logLength: logLength,
logger: NewLogClient(),
firstError: zeroTime,
isErrored: false,
client: &http.Client{},
client: client,
}

}

func (p *PcClient) ShutDownProject() error {
Expand All @@ -45,15 +66,15 @@ func (p *PcClient) IsRemote() bool {
}

func (p *PcClient) GetHostName() (string, error) {
return getHostName(p.address, p.port)
return p.getHostName()
}

func (p *PcClient) GetLogLength() int {
return p.logLength
}

func (p *PcClient) GetLogsAndSubscribe(name string, observer pclog.LogObserver) error {
return p.logger.ReadProcessLogs(p.address, p.port, name, p.logLength, true, observer)
return p.logger.ReadProcessLogs(name, p.logLength, true, observer)
}

func (p *PcClient) UnSubscribeLogger(name string, observer pclog.LogObserver) error {
Expand All @@ -66,49 +87,49 @@ func (p *PcClient) GetProcessLog(name string, offsetFromEnd, limit int) ([]strin
}

func (p *PcClient) GetLexicographicProcessNames() ([]string, error) {
names, err := GetProcessesName(p.address, p.port)
names, err := p.GetProcessesName()
return names, err
}

func (p *PcClient) GetProcessInfo(name string) (*types.ProcessConfig, error) {
return GetProcessInfo(p.address, p.port, name)
return p.getProcessInfo(name)
}

func (p *PcClient) GetProcessPorts(name string) (*types.ProcessPorts, error) {
return GetProcessPorts(p.address, p.port, name)
return p.getProcessPorts(name)
}

func (p *PcClient) GetProcessState(name string) (*types.ProcessState, error) {
state, err := GetProcessState(p.address, p.port, name)
state, err := p.getProcessState(name)
return state, err
}

func (p *PcClient) GetProcessesState() (*types.ProcessesState, error) {
return GetProcessesState(p.address, p.port)
return p.getProcessesState()
}

func (p *PcClient) StopProcess(name string) error {
return StopProcess(p.address, p.port, name)
return p.stopProcess(name)
}

func (p *PcClient) StopProcesses(names []string) ([]string, error) {
return StopProcesses(p.address, p.port, names)
return p.stopProcesses(names)
}

func (p *PcClient) StartProcess(name string) error {
return StartProcess(p.address, p.port, name)
return p.startProcess(name)
}

func (p *PcClient) RestartProcess(name string) error {
return RestartProcess(p.address, p.port, name)
return p.restartProcess(name)
}

func (p *PcClient) ScaleProcess(name string, scale int) error {
return ScaleProcess(p.address, p.port, name, scale)
return p.scaleProcess(name, scale)
}

func (p *PcClient) IsAlive() error {
return p.logError(isAlive(p.address, p.port))
return p.logError(p.isAlive())
}

func (p *PcClient) ErrorForSecs() int {
Expand Down
30 changes: 22 additions & 8 deletions src/client/logs.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,45 @@
package client

import (
"context"
"fmt"
"github.com/f1bonacc1/process-compose/src/api"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"io"
"net"
"sync/atomic"
)

type LogClient struct {
ws *websocket.Conn
Format string
isClosed atomic.Bool
ws *websocket.Conn
Format string
isClosed atomic.Bool
socketPath string
address string
}

func NewLogClient() *LogClient {
func NewLogClient(address, socketPath string) *LogClient {
return &LogClient{
Format: "%s",
Format: "%s",
address: address,
socketPath: socketPath,
}
}

func (l *LogClient) ReadProcessLogs(address string, port int, name string, offset int, follow bool, out io.StringWriter) (err error) {
func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io.StringWriter) (err error) {

url := fmt.Sprintf("ws://%s:%d/process/logs/ws?name=%s&offset=%d&follow=%v", address, port, name, offset, follow)
url := fmt.Sprintf("ws://%s/process/logs/ws?name=%s&offset=%d&follow=%v", l.address, name, offset, follow)
log.Info().Msgf("Connecting to %s", url)
l.ws, _, err = websocket.DefaultDialer.Dial(url, nil)

dialer := websocket.DefaultDialer
if l.address == "unix" {
dialer.NetDialContext = func(ctx context.Context, _, _ string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, l.address, l.socketPath)
}
}
l.ws, _, err = dialer.Dial(url, nil)

if err != nil {
log.Error().Msgf("failed to dial to %s error: %v", url, err)
return err
Expand Down
29 changes: 14 additions & 15 deletions src/client/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import (
"fmt"
"github.com/f1bonacc1/process-compose/src/types"
"github.com/rs/zerolog/log"
"net/http"
"sort"
)

func GetProcessesName(address string, port int) ([]string, error) {
states, err := GetProcessesState(address, port)
func (p *PcClient) GetProcessesName() ([]string, error) {
states, err := p.getProcessesState()
if err != nil {
return nil, err
}
Expand All @@ -22,9 +21,9 @@ func GetProcessesName(address string, port int) ([]string, error) {
return procs, nil
}

func GetProcessesState(address string, port int) (*types.ProcessesState, error) {
url := fmt.Sprintf("http://%s:%d/processes", address, port)
resp, err := http.Get(url)
func (p *PcClient) getProcessesState() (*types.ProcessesState, error) {
url := fmt.Sprintf("http://%s/processes", p.address)
resp, err := p.client.Get(url)
if err != nil {
return nil, err
}
Expand All @@ -40,9 +39,9 @@ func GetProcessesState(address string, port int) (*types.ProcessesState, error)
return &sResp, nil
}

func GetProcessState(address string, port int, name string) (*types.ProcessState, error) {
url := fmt.Sprintf("http://%s:%d/process/%s", address, port, name)
resp, err := http.Get(url)
func (p *PcClient) getProcessState(name string) (*types.ProcessState, error) {
url := fmt.Sprintf("http://%s/process/%s", p.address, name)
resp, err := p.client.Get(url)
if err != nil {
return nil, err
}
Expand All @@ -58,9 +57,9 @@ func GetProcessState(address string, port int, name string) (*types.ProcessState
return &sResp, nil
}

func GetProcessInfo(address string, port int, name string) (*types.ProcessConfig, error) {
url := fmt.Sprintf("http://%s:%d/process/info/%s", address, port, name)
resp, err := http.Get(url)
func (p *PcClient) getProcessInfo(name string) (*types.ProcessConfig, error) {
url := fmt.Sprintf("http://%s/process/info/%s", p.address, name)
resp, err := p.client.Get(url)
if err != nil {
return nil, err
}
Expand All @@ -76,9 +75,9 @@ func GetProcessInfo(address string, port int, name string) (*types.ProcessConfig
return &sResp, nil
}

func GetProcessPorts(address string, port int, name string) (*types.ProcessPorts, error) {
url := fmt.Sprintf("http://%s:%d/process/ports/%s", address, port, name)
resp, err := http.Get(url)
func (p *PcClient) getProcessPorts(name string) (*types.ProcessPorts, error) {
url := fmt.Sprintf("http://%s/process/ports/%s", p.address, name)
resp, err := p.client.Get(url)
if err != nil {
return nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions src/client/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func (p *PcClient) shutDownProject() error {
url := fmt.Sprintf("http://%s:%d/project/stop/", p.address, p.port)
url := fmt.Sprintf("http://%s/project/stop/", p.address)
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return err
Expand All @@ -27,12 +27,16 @@ func (p *PcClient) shutDownProject() error {
}

func (p *PcClient) getProjectState(withMemory bool) (*types.ProjectState, error) {
url := fmt.Sprintf("http://%s:%d/project/state/?withMemory=%v", p.address, p.port, withMemory)
resp, err := http.Get(url)
url := fmt.Sprintf("http://%s/project/state/?withMemory=%v", p.address, withMemory)
resp, err := p.client.Get(url)

if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Error().Msgf("failed to get project state - unexpected status code: %s", resp.Status)
}
var sResp types.ProjectState

//Decode the data
Expand Down
6 changes: 3 additions & 3 deletions src/client/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"net/http"
)

func RestartProcess(address string, port int, name string) error {
url := fmt.Sprintf("http://%s:%d/process/restart/%s", address, port, name)
resp, err := http.Post(url, "application/json", nil)
func (p *PcClient) restartProcess(name string) error {
url := fmt.Sprintf("http://%s/process/restart/%s", p.address, name)
resp, err := p.client.Post(url, "application/json", nil)
if err != nil {
return err
}
Expand Down
7 changes: 3 additions & 4 deletions src/client/scale_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (
"net/http"
)

func ScaleProcess(address string, port int, name string, scale int) error {
url := fmt.Sprintf("http://%s:%d/process/scale/%s/%d", address, port, name, scale)
client := &http.Client{}
func (p *PcClient) scaleProcess(name string, scale int) error {
url := fmt.Sprintf("http://%s/process/scale/%s/%d", p.address, name, scale)
req, err := http.NewRequest(http.MethodPatch, url, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
resp, err := p.client.Do(req)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 5a554d3

Please sign in to comment.