Skip to content

Commit

Permalink
Merge pull request #16 from venture23-aleo/fix/memory-leak
Browse files Browse the repository at this point in the history
Fix for memory leak from Prometheus gatherer
  • Loading branch information
naneey authored Oct 7, 2024
2 parents c6185ce + efd4ab5 commit 5b65220
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 87 deletions.
2 changes: 1 addition & 1 deletion attestor/chainService/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: <releaseIdentifier>_attestor_verulink_<yourCompanyIdentifier>
version: 1.0.0
version: 1.0.1
chains:
- name: aleo
chain_id: 6694886634401
Expand Down
9 changes: 4 additions & 5 deletions attestor/chainService/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func InitMetrics(cfg config.CollecterServiceConfig, mConfig config.MetricsConfig

httpClient := &http.Client{
Transport: transport,
Timeout: time.Second * 30,
}

host := config.GetConfig().MetricConfig.Host
Expand All @@ -214,19 +215,17 @@ func PushMetrics(ctx context.Context, pusher *push.Pusher, pmetrics *PrometheusM

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

pusher.Gatherer(pmetrics.Registry)
for {

select {
case <-ctx.Done():
return
case <-ticker.C:
gatherer := prometheus.Gatherers{
pmetrics.Registry,
}
if err := pusher.Gatherer(gatherer).Push(); err != nil {
if err := pusher.Push(); err != nil {
logger.GetLogger().Error("Error pushing metrics to Pushgateway:", zap.Error(err))
}
pmetrics = NewPrometheusMetrics()
}
}

Expand Down
48 changes: 16 additions & 32 deletions attestor/chainService/relay/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,20 @@ type collector struct {
uri string
chainIDToAddress map[string]string // chainID: walletAddress
collectorWaitDur time.Duration
caCert *x509.CertPool
attestorCert tls.Certificate
collectorClient *http.Client
}

func (c *collector) CheckCollectorHealth(ctx context.Context) error {

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: c.caCert,
Certificates: []tls.Certificate{c.attestorCert},
},
},
}
ctx, cncl := context.WithTimeout(ctx, time.Minute)
defer cncl()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.uri, nil)
if err != nil {
return err
}

resp, err := client.Do(req)
resp, err := c.collectorClient.Do(req)
if err != nil {
logger.GetLogger().Error(err.Error())
return err
Expand Down Expand Up @@ -138,22 +131,13 @@ func (c *collector) SendToCollector(ctx context.Context, sp *chain.ScreenedPacke
ctx, cncl := context.WithTimeout(ctx, time.Minute)
defer cncl()

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: c.caCert,
Certificates: []tls.Certificate{c.attestorCert},
},
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), io.NopCloser(buf))
if err != nil {
return err
}

req.Header.Set("content-type", contentType)
resp, err := client.Do(req)
resp, err := c.collectorClient.Do(req)
if err != nil {
return err
}
Expand Down Expand Up @@ -224,20 +208,13 @@ func (c *collector) ReceivePktsFromCollector(ctx context.Context, ch chan<- *cha
u.RawQuery = queryParams.Encode()

ctx, cncl := context.WithTimeout(ctx, time.Minute)
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: c.caCert,
Certificates: []tls.Certificate{c.attestorCert},
},
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
goto postFor
}

resp, err = client.Do(req)
resp, err = c.collectorClient.Do(req)
if err != nil {
goto postFor
}
Expand Down Expand Up @@ -288,12 +265,19 @@ func SetupCollector(cfg config.CollecterServiceConfig, chainIDToAddress map[stri
if err != nil {
log.Fatal(err)
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}
collc = collector{
uri: cfg.Uri,
collectorWaitDur: waitTime,
chainIDToAddress: make(map[string]string),
caCert: caCertPool,
attestorCert: attestorCert,
collectorClient: client,
}

for k, v := range chainIDToAddress {
Expand Down
118 changes: 77 additions & 41 deletions attestor/chainService/relay/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -56,12 +55,19 @@ func TestSendToCollector(t *testing.T) {

attestorCert, _ := tls.LoadX509KeyPair("../../../chainService/.mtls/attestor1.crt",
"../../../chainService/.mtls/attestor1.key")
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}
collec := &collector{
uri: uri,
chainIDToAddress: chainIdToAddress,
collectorWaitDur: time.Second,
caCert: caCertPool,
attestorCert: attestorCert,
collectorClient: client,
}
sp := &chain.ScreenedPacket{
Packet: &chain.Packet{
Expand All @@ -87,10 +93,28 @@ func TestSendToCollector(t *testing.T) {
"2": "aleoaddr",
"1": "ethAddr",
}

caCert, _ := os.ReadFile("../../../chainService/.mtls/ca.cer")

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

attestorCert, _ := tls.LoadX509KeyPair("../../../chainService/.mtls/attestor1.crt",
"../../../chainService/.mtls/attestor1.key")
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}

collec := &collector{
uri: uri,
chainIDToAddress: chainIdToAddress,
collectorWaitDur: time.Second,
collectorClient: client,
}
sp := &chain.ScreenedPacket{
Packet: &chain.Packet{
Expand Down Expand Up @@ -120,10 +144,26 @@ func TestSendToCollector(t *testing.T) {
"2": "aleoaddr",
"1": "ethAddr",
}
caCert, _ := os.ReadFile("../../../chainService/.mtls/ca.cer")

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

attestorCert, _ := tls.LoadX509KeyPair("../../../chainService/.mtls/attestor1.crt",
"../../../chainService/.mtls/attestor1.key")
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}
collec := &collector{
uri: uri,
chainIDToAddress: chainIdToAddress,
collectorWaitDur: time.Second,
collectorClient: client,
}
sp := &chain.ScreenedPacket{
Packet: &chain.Packet{
Expand Down Expand Up @@ -202,10 +242,27 @@ func TestGetPktsFromCollector(t *testing.T) {
"2": "aleoaddr",
"1": "ethAddr",
}

caCert, _ := os.ReadFile("../../../chainService/.mtls/ca.cer")

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

attestorCert, _ := tls.LoadX509KeyPair("../../../chainService/.mtls/attestor1.crt",
"../../../chainService/.mtls/attestor1.key")
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}
collec := &collector{
uri: uri,
chainIDToAddress: chainIdToAddress,
collectorWaitDur: time.Second,
collectorClient: client,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -238,10 +295,27 @@ func TestGetPktsFromCollector(t *testing.T) {
"2": "aleoaddr",
"1": "ethAddr",
}

caCert, _ := os.ReadFile("../../../chainService/.mtls/ca.cer")

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

attestorCert, _ := tls.LoadX509KeyPair("../../../chainService/.mtls/attestor1.crt",
"../../../chainService/.mtls/attestor1.key")
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{attestorCert},
},
},
}
collec := &collector{
uri: uri,
chainIDToAddress: chainIdToAddress,
collectorWaitDur: time.Second,
collectorClient: client,
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
Expand All @@ -256,41 +330,3 @@ func TestGetPktsFromCollector(t *testing.T) {
})
}

func TestMTLSIntegration(t *testing.T) {
dbUrl := "https://aleomtls.ibriz.ai/"

caCert, err := os.ReadFile("../../../chainService/ca.cer")
assert.NoError(t, err)

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

cert, err := tls.LoadX509KeyPair("../../../chainService/attestor-stresstest.crt",
"../../../chainService/attestor-stresstest.key")
assert.NoError(t, err)

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{cert},
},
},
}

resp, err := client.Get(dbUrl)
if err != nil {
fmt.Println("Connection failed:", err)

}

body, err := io.ReadAll(resp.Body)
fmt.Println("Response Body:", string(body))

if resp.StatusCode != http.StatusOK {
fmt.Println("Bad request :", resp.StatusCode)
}
assert.Equal(t, resp.StatusCode, http.StatusOK)
assert.NoError(t, err)

}
Loading

0 comments on commit 5b65220

Please sign in to comment.