Fix existing local file integration tests for streaming writes flow. #2902

Open · wants to merge 5 commits into base: master
Changes from 3 commits
5 changes: 4 additions & 1 deletion tools/integration_tests/local_file/local_file_test.go
@@ -98,7 +98,10 @@ func TestMain(m *testing.M) {
// Not setting config file explicitly with 'create-empty-file: false' as it is default.
flagsSet := [][]string{
{"--implicit-dirs=true", "--rename-dir-limit=3"},
{"--implicit-dirs=false", "--rename-dir-limit=3"}}
{"--implicit-dirs=false", "--rename-dir-limit=3"},
{"--enable-streaming-writes=true", "--write-block-size-mb=2", "--write-max-blocks-per-file=2"},
Collaborator:
Shall we combine this with the implicit-dirs=false tests, in favor of saving some test execution time?

Collaborator Author:
You mean that if we add the "--implicit-dirs=false" flag to the mount, mounting becomes faster because it doesn't have to list directories recursively? Or is there another reason for faster test execution in this case?

Collaborator:
No, I meant that instead of having 3 flagsets to run the tests with, we can have just 2 by combining the implicit-dirs=false (default) flagset with the streaming-writes tests.

Collaborator Author:
Okay, got it; we will keep fewer flagset combinations.
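For illustration only, a minimal sketch of what the combined flagsets could look like if the implicit-dirs=false run and the streaming-writes run are merged (the flag values are taken from this diff; the exact combination in the final commits may differ):

	// Two flagsets instead of three: the default implicit-dirs=false run also
	// exercises streaming writes.
	flagsSet := [][]string{
		{"--implicit-dirs=true", "--rename-dir-limit=3"},
		{"--implicit-dirs=false", "--rename-dir-limit=3", "--enable-streaming-writes=true",
			"--write-block-size-mb=2", "--write-max-blocks-per-file=2"},
	}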

{"--enable-streaming-writes=true", "--write-block-size-mb=2", "--write-max-blocks-per-file=2", "create-empty-file: true"},
}

if hnsFlagSet, err := setup.AddHNSFlagForHierarchicalBucket(ctx, storageClient); err == nil {
flagsSet = append(flagsSet, hnsFlagSet)
15 changes: 9 additions & 6 deletions tools/integration_tests/local_file/read_file_test.go
@@ -32,12 +32,15 @@ func TestReadLocalFile(t *testing.T) {
WritingToLocalFileShouldNotWriteToGCS(ctx, storageClient, fh, testDirName, FileName1, t)
WritingToLocalFileShouldNotWriteToGCS(ctx, storageClient, fh, testDirName, FileName1, t)

// Read the local file contents.
buf := make([]byte, len(content))
n, err := fh.ReadAt(buf, 0)
if err != nil || len(content) != n || content != string(buf) {
t.Fatalf("Read file operation failed on local file: %v "+
"Expected content: %s, Got Content: %s", err, content, string(buf))
// Mounts with streaming writes disabled support the read operation.
if !setup.StreamingWritesEnabled() {
// Read the local file contents.
buf := make([]byte, len(content))
n, err := fh.ReadAt(buf, 0)
if err != nil || len(content) != n || content != string(buf) {
t.Fatalf("Read file operation failed on local file: %v "+
"Expected content: %s, Got Content: %s", err, content, string(buf))
}
}

// Close the file and validate that the file is created on GCS.
51 changes: 44 additions & 7 deletions tools/integration_tests/local_file/stat_file_test.go
@@ -23,6 +23,7 @@ import (
. "github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/client"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/setup"
"github.com/stretchr/testify/require"
)

func TestStatOnLocalFile(t *testing.T) {
@@ -57,7 +58,7 @@ func TestStatOnLocalFileWithConflictingFileNameSuffix(t *testing.T) {
FileName1, "", t)
}

func TestTruncateLocalFile(t *testing.T) {
func TestTruncateLocalFileToSmallerSize(t *testing.T) {
testDirPath = setup.SetupTestDirectory(testDirName)
// Create a local file.
filePath, fh := CreateLocalFileInTestDir(ctx, storageClient, testDirPath, FileName1, t)
@@ -67,17 +68,53 @@ func TestTruncateLocalFileToSmallerSize(t *testing.T) {
// Stat the file to validate if new contents are written.
operations.VerifyStatFile(filePath, SizeOfFileContents, FilePerms, t)

// Truncate the file to update the file size.
err := os.Truncate(filePath, SizeTruncate)
if err != nil {
t.Fatalf("os.Truncate err: %v", err)
// Truncate the file to a smaller size.
err := os.Truncate(filePath, SmallerSizeTruncate)
var expectTruncatedSize int64 = SizeOfFileContents
var expectedContent = FileContents
if setup.StreamingWritesEnabled() {
// Mounts with streaming writes do not support truncating files to a smaller size.
require.Error(t, err)
} else {
if err != nil {
t.Fatalf("os.Truncate err: %v", err)
}
expectTruncatedSize = SmallerSizeTruncate
expectedContent = FileContents[:SmallerSizeTruncate]
}

ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)

// Stat the file to validate if file is truncated correctly.
operations.VerifyStatFile(filePath, SizeTruncate, FilePerms, t)
operations.VerifyStatFile(filePath, expectTruncatedSize, FilePerms, t)

// Close the file and validate that the file is created on GCS.
CloseFileAndValidateContentFromGCS(ctx, storageClient, fh, testDirName,
FileName1, "testS", t)
FileName1, expectedContent, t)
}

func TestTruncateLocalFileToBiggerSize(t *testing.T) {
testDirPath = setup.SetupTestDirectory(testDirName)
// Create a local file.
filePath, fh := CreateLocalFileInTestDir(ctx, storageClient, testDirPath, FileName1, t)
// Write contents to the local file.
WritingToLocalFileShouldNotWriteToGCS(ctx, storageClient, fh, testDirName, FileName1, t)

// Stat the file to validate if new contents are written.
operations.VerifyStatFile(filePath, SizeOfFileContents, FilePerms, t)

// Truncate the file to a bigger size.
err := os.Truncate(filePath, BiggerSizeTruncate)
if err != nil {
t.Fatalf("os.Truncate err: %v", err)
}

ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)

// Stat the file to validate if file is truncated correctly.
operations.VerifyStatFile(filePath, BiggerSizeTruncate, FilePerms, t)

// Close file and validate that file of expected size is created on GCS.
CloseFileAndValidateSizeFromGCS(ctx, storageClient, fh, testDirName,
FileName1, BiggerSizeTruncate, t)
}
11 changes: 8 additions & 3 deletions tools/integration_tests/local_file/sym_link_test.go
@@ -26,7 +26,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/setup"
)

func createAndVerifySymLink(t *testing.T) (filePath, symlink string, fh *os.File) {
func createAndVerifySymLink(streamingWritesEnabled bool, t *testing.T) (filePath, symlink string, fh *os.File) {
testDirPath = setup.SetupTestDirectory(testDirName)
// Create a local file.
filePath, fh = CreateLocalFileInTestDir(ctx, storageClient, testDirPath, FileName1, t)
@@ -38,18 +38,23 @@ func createAndVerifySymLink(t *testing.T) (filePath, symlink string, fh *os.File

// Read the link.
operations.VerifyReadLink(filePath, symlink, t)
if streamingWritesEnabled {
// Mounts with streaming writes do not support reading files.
return
}
operations.VerifyReadFile(symlink, FileContents, t)
return
}

func TestCreateSymlinkForLocalFile(t *testing.T) {
_, _, fh := createAndVerifySymLink(t)
_, _, fh := createAndVerifySymLink(setup.StreamingWritesEnabled(), t)
// Close the file and validate that the file is created on GCS.
CloseFileAndValidateContentFromGCS(ctx, storageClient, fh, testDirName,
FileName1, FileContents, t)
}

func TestReadSymlinkForDeletedLocalFile(t *testing.T) {
filePath, symlink, fh := createAndVerifySymLink(t)
filePath, symlink, fh := createAndVerifySymLink(setup.StreamingWritesEnabled(), t)
// Remove filePath and then close the fileHandle to avoid syncing to GCS.
operations.RemoveFile(filePath)
operations.CloseFileShouldNotThrowError(fh, t)
9 changes: 8 additions & 1 deletion tools/integration_tests/local_file/write_file_test.go
@@ -46,9 +46,16 @@ func TestRandomWritesToLocalFile(t *testing.T) {

// Write some contents to file randomly.
operations.WriteAt("string1", 0, fh, t)
ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)
operations.WriteAt("string2", 2, fh, t)
if setup.StreamingWritesEnabled() {
// First out of order write ensures the existing sequentially written data is uploaded
// to GCS when streaming writes are enabled.
ValidateObjectContentsFromGCS(ctx, storageClient, testDirName, FileName1, "string1", t)
} else {
ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)
}
operations.WriteAt("string3", 3, fh, t)
ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)

// Close the file and validate that the file is created on GCS.
CloseFileAndValidateContentFromGCS(ctx, storageClient, fh, testDirName,
21 changes: 20 additions & 1 deletion tools/integration_tests/util/client/gcs_helper.go
@@ -42,7 +42,8 @@ const (
GCSFileContent = "GCSteststring"
GCSFileSize = 13
FilePerms = 0644
SizeTruncate = 5
SmallerSizeTruncate = 5
BiggerSizeTruncate = 15
NewFileName = "newName"
NewDirName = "newDirName"
)
@@ -79,6 +80,18 @@ func ValidateObjectContentsFromGCS(ctx context.Context, storageClient *storage.C
}
}

func ValidateObjectSizeFromGCS(ctx context.Context, storageClient *storage.Client,
testDirName string, fileName string, expectedSize int64, t *testing.T) {
gotContent, err := ReadObjectFromGCS(ctx, storageClient, path.Join(testDirName, fileName))
if err != nil {
t.Fatalf("Error while reading file from GCS, Err: %v", err)
}

if int64(len(gotContent)) != expectedSize {
t.Fatalf("GCS file %s size mismatch. Got file size: %d, Expected file size: %d ", fileName, len(gotContent), expectedSize)
}
}

func ValidateObjectChunkFromGCS(ctx context.Context, storageClient *storage.Client,
testDirName string, fileName string, offset, size int64, expectedContent string,
t *testing.T) {
@@ -100,6 +113,12 @@ func CloseFileAndValidateContentFromGCS(ctx context.Context, storageClient *stor
ValidateObjectContentsFromGCS(ctx, storageClient, testDirName, fileName, content, t)
}

func CloseFileAndValidateSizeFromGCS(ctx context.Context, storageClient *storage.Client,
fh *os.File, testDirName, fileName string, size int64, t *testing.T) {
operations.CloseFileShouldNotThrowError(fh, t)
ValidateObjectSizeFromGCS(ctx, storageClient, testDirName, fileName, size, t)
}

func CreateLocalFileInTestDir(ctx context.Context, storageClient *storage.Client,
testDirPath, fileName string, t *testing.T) (string, *os.File) {
filePath := path.Join(testDirPath, fileName)
3 changes: 2 additions & 1 deletion tools/integration_tests/util/mounting/mounting.go
@@ -29,7 +29,8 @@ func MountGcsfuse(binaryFile string, flags []string) error {
binaryFile,
flags...,
)

// Record whether the current mount operation is using streaming writes.
setup.SetStreamingWritesEnabled(flags)
// Adding mount command in LogFile
file, err := os.OpenFile(setup.LogFile(), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
29 changes: 22 additions & 7 deletions tools/integration_tests/util/setup/setup.go
@@ -48,16 +48,18 @@ const (
DirPermission_0755 = 0755
Charset = "abcdefghijklmnopqrstuvwxyz0123456789"
PathEnvVariable = "PATH"
StreamingWritesFlag = "--enable-streaming-writes=true"
)

var (
binFile string
logFile string
testDir string
mntDir string
sbinFile string
onlyDirMounted string
dynamicBucketMounted string
binFile string
logFile string
testDir string
mntDir string
sbinFile string
onlyDirMounted string
dynamicBucketMounted string
streamingWritesEnabled bool
)

// Run the shell script to prepare the testData in the specified bucket.
@@ -103,6 +105,19 @@ func LogFile() string {
return logFile
}

func SetStreamingWritesEnabled(flags []string) {
Collaborator:
I would like to take @ashmeenkaur's opinion on how to refactor the local_file tests to run for the streaming_writes=true scenario.
We have done it this way for the streaming_writes package: https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/tools/integration_tests/streaming_writes/default_mount_local_file_test.go
Probably it is too much work to refactor here; let's take a call with Ashmeen.

Collaborator (@ashmeenkaur, Jan 17, 2025):
Edit: Thinking a bit more about it, I think it makes sense to create a common suite and run all these tests for local file without streaming writes, local file with streaming writes, and empty GCS object with streaming writes, since there will be considerable overlap in the three. That way, instead of having if-else statements in the test, we can have separate tests in the respective test suites for the scenarios which differ, e.g.:

	if setup.StreamingWritesEnabled() {
		// First out of order write ensures the existing sequentially written data is uploaded
		// to GCS when streaming writes are enabled.
		ValidateObjectContentsFromGCS(ctx, storageClient, testDirName, FileName1, "string1", t)
	} else {
		ValidateObjectNotFoundErrOnGCS(ctx, storageClient, testDirName, FileName1, t)
	}

(A rough sketch of such a suite layout follows after this hunk.)

streamingWritesEnabled = false
for _, flag := range flags {
if flag == StreamingWritesFlag {
streamingWritesEnabled = true
}
}
}

func StreamingWritesEnabled() bool {
return streamingWritesEnabled
}
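
As a rough illustration of the common-suite idea from the review discussion above, here is a minimal sketch using testify's suite package; the package name, suite names, and test bodies are hypothetical and not part of this PR:

	package local_file

	import (
		"testing"

		"github.com/stretchr/testify/suite"
	)

	// commonLocalFileSuite holds the tests that behave identically with and
	// without streaming writes.
	type commonLocalFileSuite struct {
		suite.Suite
	}

	func (s *commonLocalFileSuite) TestStatOnLocalFile() {
		// Shared behaviour; runs in every embedding suite.
	}

	// regularWritesSuite embeds the common tests and adds scenarios that only
	// apply when streaming writes are disabled (e.g. reading a local file back).
	type regularWritesSuite struct {
		commonLocalFileSuite
	}

	func (s *regularWritesSuite) TestReadLocalFile() {
		// Read-back of a local file is only supported without streaming writes.
	}

	// streamingWritesSuite adds scenarios specific to streaming writes.
	type streamingWritesSuite struct {
		commonLocalFileSuite
	}

	func (s *streamingWritesSuite) TestOutOfOrderWriteUploadsToGCS() {
		// The first out-of-order write flushes the sequentially written data to GCS.
	}

	func TestRegularWritesSuite(t *testing.T)   { suite.Run(t, new(regularWritesSuite)) }
	func TestStreamingWritesSuite(t *testing.T) { suite.Run(t, new(streamingWritesSuite)) }

With a layout like this, the if-else on setup.StreamingWritesEnabled() inside individual tests goes away, because each scenario's expectations live in its own suite.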

func BinFile() string {
return binFile
}