-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend #27042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
77b41af
to
9c4c31e
Compare
9c4c31e
to
a8460bc
Compare
.getSharedStateDirectory(); | ||
FsCheckpointStorageAccess fsCheckpointStorageAccess = | ||
(FsCheckpointStorageAccess) env.getCheckpointStorageAccess(); | ||
remoteJobPath = fsCheckpointStorageAccess.getCheckpointsDirectory(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can only share state from the shared directory, so what exactly the remoteJobPath
means?
: new CopyDataTransferStrategy(forStFlinkFileSystem); | ||
LOG.info( | ||
"Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}", | ||
"Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, isDbUnderSameJobPathFromRestore:{}, recoveryClaimMode:{}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest we make this log as debug level one, as well the one in buildForSnapshot
@Nullable Path remoteBasePath) { | ||
this.localJobPath = localJobPath; | ||
this.localBasePath = localBasePath; | ||
this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have localJobPath, localBasePath and localForStPath which is derived from localBasePath.
I was expecting localJobpath to be a subfolder of localBasePath, is this true? Is this the bean to validate that?
I see localBasePath can be null, in this case localForStPath is set to null, but localJobPath can have a value. What does this mean? The code indicates that it is possible to run without Forst but have a . Have I understood this correctly?
|
||
import javax.annotation.Nullable; | ||
|
||
/** Container for ForSt paths. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be worth describing each path here. The text talks of Forst paths , but there are only 2 of the 6 that mention Forst - localForStPath
and remoteForStPath
private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class); | ||
public static final String DB_DIR_STRING = "db"; | ||
|
||
@Nullable final Path localJobPath; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this not private?
} | ||
|
||
@Override | ||
public String toString() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe add equals and hashcode standard methods for beans
if (pathContainer.getRemoteForStPath() != null | ||
&& pathContainer.getLocalForStPath() != null) { | ||
if (cacheBasePath == null && pathContainer.getLocalBasePath() != null) { | ||
cacheBasePath = new Path(pathContainer.getLocalBasePath().getPath(), "cache"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume thr log is incorrect, not
should be now
public void clearDirectories() throws Exception { | ||
if (remoteBasePath != null) { | ||
forStFileSystem.delete(remoteBasePath, true); | ||
if (pathContainer.getRemoteBasePath() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use a variable for pathContainer.getRemoteBasePath() and pathContainer.getLocalBasePath()
public void forceClearRemoteDirectories() throws Exception { | ||
if (remoteBasePath != null && remotePathNewlyCreated) { | ||
clearDirectories(remoteBasePath); | ||
if (pathContainer.getRemoteBasePath() != null && remotePathNewlyCreated) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The forceClear does not seem to force anything, in fact it will only clear the directories in a specific case when remotePathNewlyCreated is set.
optionsContainer.getLocalBasePath(), | ||
optionsContainer.getRemoteBasePath(), | ||
ex); | ||
LOG.warn("Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it seems we lose information here. I read the log entry text before Could not delete ForSt local working directory {}
to means it is only the local directory that could not be deleted and we put out the remote directory for information. It would be good to point to the actual folder that could not be deleted in the new log message - rather than list all the folders and not know which could not be deleted.
Path instanceForStPath = | ||
resourceContainer.getRemoteForStPath() == null | ||
? resourceContainer.getLocalForStPath() | ||
resourceContainer.getPathContainer().getRemoteForStPath() == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: variable for resourceContainer.getPathContainer()
RecoveryClaimMode.CLAIM, | ||
CopyDataTransferStrategy.class); | ||
|
||
testRestoreStrategyAsExpected( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parameterized test for all these permutations?
@VisibleForTesting | ||
Path getLocalBasePath() { | ||
return optionsContainer.getLocalBasePath(); | ||
return optionsContainer.getPathContainer().getLocalBasePath(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is optionsContainer.getPathContainer() ever null?
What is the purpose of the change
This PR fixes FLINK-38336, by allowing ForSt statebackend to reuse the restored files in failover scenario.
Brief change log
ForStPathContainer
ReusableDataTransferStrategy
if we are in a failover scenarioVerifying this change
This change added tests and can be verified as follows:
DataTransferStrategyTest#testBuildingStrategyAsExpected
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation