Commit d32b1c1

No public description
PiperOrigin-RevId: 574199472
Fleetspeak Team authored and copybara-github committed Oct 17, 2023
1 parent 75a76ba commit d32b1c1
Showing 2 changed files with 31 additions and 19 deletions.
23 changes: 13 additions & 10 deletions fleetspeak/src/server/https/https.go
@@ -27,8 +27,6 @@ import (
 
 	"github.com/google/fleetspeak/fleetspeak/src/server/authorizer"
 	"github.com/google/fleetspeak/fleetspeak/src/server/comms"
-
-	cpb "github.com/google/fleetspeak/fleetspeak/src/server/components/proto/fleetspeak_components"
 )
 
 const (
@@ -84,13 +82,14 @@ func (l listener) Accept() (net.Conn, error) {
 
 // Params wraps the parameters required to create an https communicator.
 type Params struct {
-	Listener           net.Listener        // Where to listen for connections, required.
-	Cert, Key          []byte              // x509 encoded certificate and matching private key, required.
-	Streaming          bool                // Whether to enable streaming communications.
-	FrontendConfig     *cpb.FrontendConfig // Configure how the frontend identifies and communicates with clients
-	StreamingLifespan  time.Duration       // Maximum time to keep a streaming connection open, defaults to 10 min.
-	StreamingCloseTime time.Duration       // How much of StreamingLifespan to allocate to an orderly stream close, defaults to 30 sec.
-	StreamingJitter    time.Duration       // Maximum amount of jitter to add to StreamingLifespan.
+	Listener                    net.Listener  // Where to listen for connections, required.
+	Cert, Key                   []byte        // x509 encoded certificate and matching private key, required.
+	Streaming                   bool          // Whether to enable streaming communications.
+	ClientCertHeader            string        // Where to locate the client certificate from the request header, if not provided use TLS request.
+	StreamingLifespan           time.Duration // Maximum time to keep a streaming connection open, defaults to 10 min.
+	StreamingCloseTime          time.Duration // How much of StreamingLifespan to allocate to an orderly stream close, defaults to 30 sec.
+	StreamingJitter             time.Duration // Maximum amount of jitter to add to StreamingLifespan.
+	MaxPerClientBatchProcessors uint32        // Maximum number of concurrent processors for messages coming from a single client.
 }
 
 // NewCommunicator creates a Communicator, which listens through l and identifies
@@ -102,6 +101,10 @@ func NewCommunicator(p Params) (*Communicator, error) {
 	if p.StreamingCloseTime == 0 {
 		p.StreamingCloseTime = 30 * time.Second
 	}
+	if p.MaxPerClientBatchProcessors == 0 {
+		p.MaxPerClientBatchProcessors = 10
+	}
+
 	mux := http.NewServeMux()
 	c, err := tls.X509KeyPair(p.Cert, p.Key)
 	if err != nil {
@@ -138,7 +141,7 @@ func NewCommunicator(p Params) (*Communicator, error) {
 	}
 	mux.Handle("/message", messageServer{&h})
 	if p.Streaming {
-		mux.Handle("/streaming-message", streamingMessageServer{&h})
+		mux.Handle("/streaming-message", newStreamingMessageServer(&h, p.MaxPerClientBatchProcessors))
 	}
 	mux.Handle("/files/", fileServer{&h})
 
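The new MaxPerClientBatchProcessors knob is consumed by NewCommunicator, which falls back to 10 when the field is left at zero. Below is a minimal caller-side sketch, not part of this commit, assuming the package import path github.com/google/fleetspeak/fleetspeak/src/server/https mirrors the file path above and that real certificate and key bytes are loaded elsewhere:

package main

import (
	"log"
	"net"
	"time"

	"github.com/google/fleetspeak/fleetspeak/src/server/https"
)

func main() {
	// Placeholder listen address for the Fleetspeak frontend.
	lis, err := net.Listen("tcp", ":8443")
	if err != nil {
		log.Fatal(err)
	}

	// cert and key must hold a real x509 certificate and matching private
	// key; they are left empty here purely for illustration.
	var cert, key []byte

	comm, err := https.NewCommunicator(https.Params{
		Listener:           lis,
		Cert:               cert,
		Key:                key,
		Streaming:          true,
		StreamingLifespan:  10 * time.Minute,
		StreamingCloseTime: 30 * time.Second,
		// New in this commit: bounds the number of concurrent batch
		// processors per client. Leaving it at 0 picks the default of 10.
		MaxPerClientBatchProcessors: 20,
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = comm // the communicator would then be handed to the Fleetspeak server setup
}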
27 changes: 18 additions & 9 deletions fleetspeak/src/server/https/streaming_message_server.go
@@ -60,12 +60,17 @@ func writeUint32(res fullResponseWriter, i uint32) error {
 	return binary.Write(res, binary.LittleEndian, i)
 }
 
+func newStreamingMessageServer(c *Communicator, maxPerClientBatchProcessors uint32) *streamingMessageServer {
+	return &streamingMessageServer{c, maxPerClientBatchProcessors}
+}
+
 // messageServer wraps a Communicator in order to handle clients polls.
 type streamingMessageServer struct {
 	*Communicator
+	maxPerClientBatchProcessors uint32
 }
 
-func (s streamingMessageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
+func (s *streamingMessageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
 	earlyError := func(msg string, status int) {
 		log.ErrorDepth(1, fmt.Sprintf("%s: %s", http.StatusText(status), msg))
 		s.fs.StatsCollector().ClientPoll(stats.PollInfo{
@@ -167,7 +172,7 @@ func (s streamingMessageServer) ServeHTTP(res http.ResponseWriter, req *http.Req
 	m.cancel()
 }
 
-func (s streamingMessageServer) initialPoll(ctx context.Context, addr net.Addr, key crypto.PublicKey, res fullResponseWriter, body *bufio.Reader) (*comms.ConnectionInfo, bool, error) {
+func (s *streamingMessageServer) initialPoll(ctx context.Context, addr net.Addr, key crypto.PublicKey, res fullResponseWriter, body *bufio.Reader) (*comms.ConnectionInfo, bool, error) {
 	ctx, fin := context.WithTimeout(ctx, 3*time.Minute)
 
 	pi := stats.PollInfo{
@@ -266,7 +271,7 @@ func (s streamingMessageServer) initialPoll(ctx context.Context, addr net.Addr,
 
 type streamManager struct {
 	ctx context.Context
-	s   streamingMessageServer
+	s   *streamingMessageServer
 
 	info *comms.ConnectionInfo
 	res  fullResponseWriter
@@ -293,7 +298,7 @@ func (m *streamManager) readLoop() {
 
 	// Number of batches from the same client that will be processed concurrently.
 	const maxBatchProcessors = 10
-	batchCh := make(chan *fspb.WrappedContactData, maxBatchProcessors)
+	batchCh := make(chan *fspb.WrappedContactData, m.s.maxPerClientBatchProcessors)
 
 	for {
 		pi, wcd, err := m.readOne()
@@ -309,19 +314,23 @@
 			return
 		}
 
+		// Increment the counter with every processed message.
+		cnt++
+
 		// This will block if number of concurrent processors is greater than maxBatchProcessors.
 		batchCh <- wcd
-		go func() {
+		// Given that the processing is done concurrently, capture the current counter value in
+		// the function argument.
+		go func(curCnt uint64) {
 			wcd := <-batchCh
 			if err := m.processOne(wcd); err != nil {
 				log.Errorf("Error processing message from %v: %v", m.info.Client.ID, err)
 				return
 			}
-		}()
+			m.out <- &fspb.ContactData{AckIndex: curCnt}
+		}(cnt)
 
 		m.s.fs.StatsCollector().ClientPoll(*pi)
-		cnt++
 
-		m.out <- &fspb.ContactData{AckIndex: cnt}
 	}
 }

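The readLoop change above bounds per-client concurrency with a buffered channel sized by maxPerClientBatchProcessors and moves the acknowledgement into the worker goroutine, passing the counter as an argument so each ack carries the count captured when its batch was read. The following self-contained sketch shows the same pattern with illustrative names only (not Fleetspeak's types):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxProcessors = 10 // plays the role of maxPerClientBatchProcessors

	// The buffered channel doubles as a semaphore: sends block once
	// maxProcessors batches are in flight, like batchCh in readLoop.
	batchCh := make(chan string, maxProcessors)
	acks := make(chan uint64, 16) // stand-in for the outgoing ack stream

	var wg sync.WaitGroup
	var cnt uint64
	for _, batch := range []string{"alpha", "beta", "gamma"} {
		// Count the batch before handing it off, as the new code does.
		cnt++

		// Blocks when too many batches are already being processed.
		batchCh <- batch

		wg.Add(1)
		// Capture the current counter value in the goroutine argument so the
		// ack is unaffected by later increments of cnt.
		go func(curCnt uint64) {
			defer wg.Done()
			b := <-batchCh
			_ = b                // ... process the batch here ...
			acks <- curCnt       // ack only after processing
		}(cnt)
	}

	wg.Wait()
	close(acks)
	for a := range acks {
		fmt.Println("acked:", a)
	}
}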
