Skip to content

Commit

Permalink
chore: kafka test ssh
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Jan 17, 2024
1 parent bb06daa commit ac335ce
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 12 deletions.
101 changes: 89 additions & 12 deletions testhelper/docker/resource/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"testing"
Expand All @@ -24,6 +25,9 @@ import (
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/sshserver"
)

func TestResource(t *testing.T) {
Expand Down Expand Up @@ -69,14 +73,6 @@ func TestWithSASL(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

network, err := pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: "test_network"})
require.NoError(t, err)
t.Cleanup(func() {
if err := pool.Client.RemoveNetwork(network.ID); err != nil {
t.Logf("Error while removing Docker network: %v", err)
}
})

path, err := os.Getwd()
require.NoError(t, err)

Expand All @@ -94,10 +90,7 @@ func TestWithSASL(t *testing.T) {
for _, hashType := range hashTypes {
t.Run(hashType, func(t *testing.T) {
var mechanism sasl.Mechanism
containerOptions := []Option{
WithBrokers(1),
WithNetwork(network),
}
containerOptions := []Option{WithBrokers(1)}

switch hashType {
case "scramPlainText":
Expand Down Expand Up @@ -247,6 +240,90 @@ func TestAvroSchemaRegistry(t *testing.T) {
consumeUserMsg(t, deser)
}

func TestSSH(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

// Start shared Docker network
network, err := pool.Client.CreateNetwork(dc.CreateNetworkOptions{Name: "kafka_network"})
require.NoError(t, err)
t.Cleanup(func() {
if err := pool.Client.RemoveNetwork(network.ID); err != nil {
t.Logf("Error while removing Docker network: %v", err)
}
})

// Start Kafka cluster with ZooKeeper and three brokers
_, err = Setup(pool, t,
WithBrokers(1),
WithNetwork(network),
WithoutDockerHostListeners(),
)
require.NoError(t, err)

// Let's setup the SSH server
publicKeyPath, err := filepath.Abs("./testdata/ssh/test_key.pub")
require.NoError(t, err)
sshServer, err := sshserver.Setup(pool, t,
sshserver.WithPublicKeyPath(publicKeyPath),
sshserver.WithCredentials("linuxserver.io", ""),
sshserver.WithDockerNetwork(network),
)
require.NoError(t, err)
sshServerHost := fmt.Sprintf("localhost:%d", sshServer.Port)
t.Logf("SSH server is listening on %s", sshServerHost)

// Prepare SSH configuration
privateKey, err := os.ReadFile("./testdata/ssh/test_key")
require.NoError(t, err)

signer, err := ssh.ParsePrivateKey(privateKey)
require.NoError(t, err)

sshConfig := &ssh.ClientConfig{
User: "linuxserver.io",
Auth: []ssh.AuthMethod{ssh.PublicKeys(signer)},
Timeout: 10 * time.Second,
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // skipcq: GSC-G106
}
transport := &kafka.Transport{
DialTimeout: 10 * time.Second,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
sshClient, err := ssh.Dial("tcp", sshServerHost, sshConfig)
if err != nil {
return nil, fmt.Errorf("cannot dial SSH host %q: %w", sshServerHost, err)
}

conn, err := sshClient.Dial(network, address)
if err != nil {
return nil, fmt.Errorf(
"cannot dial address %q over SSH (host %q): %w", address, sshServerHost, err,
)
}
return conn, nil
},
}

// Setup writer
w := &kafka.Writer{
Addr: kafka.TCP("kafka1:9092"),
Balancer: &kafka.LeastBytes{},
AllowAutoTopicCreation: true,
Transport: transport,
}
t.Cleanup(func() { _ = w.Close() })

require.Eventually(t, func() bool {
err := w.WriteMessages(context.Background(),
kafka.Message{Topic: "my-topic", Key: []byte("foo"), Value: []byte("bar!")},
)
if err != nil {
t.Logf("failed to write messages: %s", err)
}
return err == nil
}, 30*time.Second, 500*time.Millisecond)
}

func registerSchema(
t *testing.T, schemaName, schemaPath string, c schemaregistry.Client,
) (schema string, schemaID int) {
Expand Down
39 changes: 39 additions & 0 deletions testhelper/docker/resource/kafka/testdata/ssh/test_key
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn
NhAAAAAwEAAQAAAYEA0f/mqkkZ3c9qw8MTz5FoEO3PGecO/dtUFfJ4g1UBu9E7hi/pyVYY
fLfdsd5bqA2pXdU0ROymyVe683I1VzJcihUtwB1eQxP1mUhmoo0ixK0IUUGm4PRieCGv+r
0/gMvaYbVGUPCi5tAUVh02vZB7p2cTIaz872lvCnRhYbhGUHSbhNSSQOjnCtZfjuZZnE0l
PKjWV/wbJ7Pvoc/FZMlWOqL1AjAKuwFH5zs1RMrPDDv5PCZksq4a7DDxziEdq39jvA3sOm
pQXvzBBBLBOzu7rM3/MPJb6dvAGJcYxkptfL4YXTscIMINr0g24cn+Thvt9yqA93rkb9RB
kw6RIEwMlQKqserA+pfsaoW0SkvnlDKzS1DLwXioL4Uc1Jpr/9jTMEfR+W7v7gJPB1JDnV
gen5FBfiMqbsG1amUS+mjgNfC8I00tR+CUHxpqUWANtcWTinhSnLJ2skj/2QnciPHkHurR
EKyEwCVecgn+xVKyRgVDCGsJ+QnAdn51+i/kO3nvAAAFqENNbN9DTWzfAAAAB3NzaC1yc2
EAAAGBANH/5qpJGd3PasPDE8+RaBDtzxnnDv3bVBXyeINVAbvRO4Yv6clWGHy33bHeW6gN
qV3VNETspslXuvNyNVcyXIoVLcAdXkMT9ZlIZqKNIsStCFFBpuD0Ynghr/q9P4DL2mG1Rl
DwoubQFFYdNr2Qe6dnEyGs/O9pbwp0YWG4RlB0m4TUkkDo5wrWX47mWZxNJTyo1lf8Gyez
76HPxWTJVjqi9QIwCrsBR+c7NUTKzww7+TwmZLKuGuww8c4hHat/Y7wN7DpqUF78wQQSwT
s7u6zN/zDyW+nbwBiXGMZKbXy+GF07HCDCDa9INuHJ/k4b7fcqgPd65G/UQZMOkSBMDJUC
qrHqwPqX7GqFtEpL55Qys0tQy8F4qC+FHNSaa//Y0zBH0flu7+4CTwdSQ51YHp+RQX4jKm
7BtWplEvpo4DXwvCNNLUfglB8aalFgDbXFk4p4UpyydrJI/9kJ3Ijx5B7q0RCshMAlXnIJ
/sVSskYFQwhrCfkJwHZ+dfov5Dt57wAAAAMBAAEAAAGAd9pxr+ag2LO0353LBMCcgGz5sn
LpX4F6cDw/A9XUc3lrW56k88AroaLe6NFbxoJlk6RHfL8EQg3MKX2Za/bWUgjcX7VjQy11
EtL7oPKkUVPgV1/8+o8AVEgFxDmWsM+oB/QJ+dAdaVaBBNUPlQmNSXHOvX2ZrpqiQXlCyx
79IpYq3JjmEB3dH5ZSW6CkrExrYD+MdhLw/Kv5rISEyI0Qpc6zv1fkB+8nNpXYRTbrDLR9
/xJ6jnBH9V3J5DeKU4MUQ39nrAp6iviyWydB973+MOygpy41fXO6hHyVZ2aSCysn1t6J/K
QdeEjqAOI/5CbdtiFGp06et799EFyzPItW0FKetW1UTOL2YHqdb+Q9sNjiNlUSzgxMbJWJ
RGO6g9B1mJsHl5mJZUiHQPsG/wgBER8VOP4bLOEB6gzVO2GE9HTJTOh5C+eEfrl52wPfXj
TqjtWAnhssxtgmWjkS0ibi+u1KMVXKHfaiqJ7nH0jMx+eu1RpMvuR8JqkU8qdMMGChAAAA
wHkQMfpCnjNAo6sllEB5FwjEdTBBOt7gu6nLQ2O3uGv0KNEEZ/BWJLQ5fKOfBtDHO+kl+5
Qoxc0cE7cg64CyBF3+VjzrEzuX5Tuh4NwrsjT4vTTHhCIbIynxEPmKzvIyCMuglqd/nhu9
6CXhghuTg8NrC7lY+cImiBfhxE32zqNITlpHW7exr95Gz1sML2TRJqxDN93oUFfrEuInx8
HpXXnvMQxPRhcp9nDMU9/ahUamMabQqVVMwKDi8n3sPPzTiAAAAMEA+/hm3X/yNotAtMAH
y11parKQwPgEF4HYkSE0bEe+2MPJmEk4M4PGmmt/MQC5N5dXdUGxiQeVMR+Sw0kN9qZjM6
SIz0YHQFMsxVmUMKFpAh4UI0GlsW49jSpVXs34Fg95AfhZOYZmOcGcYosp0huCeRlpLeIH
7Vv2bkfQaic3uNaVPg7+cXg7zdY6tZlzwa/4Fj0udfTjGQJOPSzIihdMLHnV81rZ2cUOZq
MSk6b02aMpVB4TV0l1w4j2mlF2eGD9AAAAwQDVW6p2VXKuPR7SgGGQgHXpAQCFZPGLYd8K
duRaCbxKJXzUnZBn53OX5fuLlFhmRmAMXE6ztHPN1/5JjwILn+O49qel1uUvzU8TaWioq7
Are3SJR2ZucR4AKUvzUHGP3GWW96xPN8lq+rgb0th1eOSU2aVkaIdeTJhV1iPfaUUf+15S
YcJlSHLGgeqkok+VfuudZ73f3RFFhjoe1oAjlPB4leeMsBD9UBLx2U3xAevnfkecF4Lm83
4sVswWATSFAFsAAAAsYWJoaW1hbnl1YmFiYmFyQEFiaGltYW55dXMtTWFjQm9vay1Qcm8u
bG9jYWwBAgMEBQYH
-----END OPENSSH PRIVATE KEY-----
1 change: 1 addition & 0 deletions testhelper/docker/resource/kafka/testdata/ssh/test_key.pub
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDR/+aqSRndz2rDwxPPkWgQ7c8Z5w7921QV8niDVQG70TuGL+nJVhh8t92x3luoDald1TRE7KbJV7rzcjVXMlyKFS3AHV5DE/WZSGaijSLErQhRQabg9GJ4Ia/6vT+Ay9phtUZQ8KLm0BRWHTa9kHunZxMhrPzvaW8KdGFhuEZQdJuE1JJA6OcK1l+O5lmcTSU8qNZX/Bsns++hz8VkyVY6ovUCMAq7AUfnOzVEys8MO/k8JmSyrhrsMPHOIR2rf2O8Dew6alBe/MEEEsE7O7uszf8w8lvp28AYlxjGSm18vhhdOxwgwg2vSDbhyf5OG+33KoD3euRv1EGTDpEgTAyVAqqx6sD6l+xqhbRKS+eUMrNLUMvBeKgvhRzUmmv/2NMwR9H5bu/uAk8HUkOdWB6fkUF+IypuwbVqZRL6aOA18LwjTS1H4JQfGmpRYA21xZOKeFKcsnaySP/ZCdyI8eQe6tEQrITAJV5yCf7FUrJGBUMIawn5CcB2fnX6L+Q7ee8= abhimanyubabbar@Abhimanyus-MacBook-Pro.local

0 comments on commit ac335ce

Please sign in to comment.