
Commit c3507f5

[bugfix](hive) Fix data errors after INSERT OVERWRITE on a Hive table (#43049)

### What problem does this PR solve?

1. Different remote file systems (`remoteFs`) should correspond to different native file systems (`nativeFs`).
2. If the target is S3, there is no staging directory to delete.
3. If an error occurs while deleting a directory, the operation must be rolled back.
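Point 1 is addressed in `RemoteFSPhantomManager.java`, `S3FileSystem.java`, and `DFSFileSystem.java`; point 2 in `HMSTransaction.java` (with `HiveTableSink.java` adjusting the write path handed to the insert context); point 3 in the `recursiveDeleteItems`/`recursiveDeleteFiles` changes in `HMSTransaction.java`. Illustrative sketches of the key patterns follow the relevant diffs below.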
Parent: 2ad6cb7

5 files changed: +48 -14


fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java (+22 -8)

```diff
@@ -101,7 +101,7 @@ public class HMSTransaction implements Transaction {
     private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
-    private String declaredIntentionsToWrite;
+    private Optional<String> stagingDirectory;
     private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -184,10 +184,14 @@ public void rollback() {
     }
 
     public void beginInsertTable(HiveInsertCommandContext ctx) {
-        declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
         isOverwrite = ctx.isOverwrite();
         fileType = ctx.getFileType();
+        if (fileType == TFileType.FILE_S3) {
+            stagingDirectory = Optional.empty();
+        } else {
+            stagingDirectory = Optional.of(ctx.getWritePath());
+        }
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
@@ -207,10 +211,12 @@ public void finishInsertTable(SimpleTableInfo tableInfo) {
                 }
             });
         } else {
-            fs.makeDir(declaredIntentionsToWrite);
-            setLocation(new THiveLocationParams() {{
-                    setWritePath(declaredIntentionsToWrite);
-                }
+            stagingDirectory.ifPresent((v) -> {
+                fs.makeDir(v);
+                setLocation(new THiveLocationParams() {{
+                        setWritePath(v);
+                    }
+                });
             });
         }
     }
@@ -643,15 +649,23 @@ private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir, boolean reverse) {
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
                     directory.toString(), deleteResult.getNotDeletedEligibleItems());
+            throw new RuntimeException(
+                    "Failed to delete directory for files: " + deleteResult.getNotDeletedEligibleItems());
         } else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
             LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
+            throw new RuntimeException("Failed to delete directory for empty dir: " + directory.toString());
         }
     }
 
     private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.directoryExists(directory.toString()).ok()) {
+            Status status = fs.directoryExists(directory.toString());
+            if (status.getErrCode().equals(Status.ErrCode.NOT_FOUND)) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
+            } else if (!status.ok()) {
+                ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
+                notDeletedEligibleItems.add(directory.toString() + "/*");
+                return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
             }
         } catch (Exception e) {
             ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
@@ -1447,7 +1461,7 @@ private void doUpdateStatisticsTasks() {
         }
 
         private void pruneAndDeleteStagingDirectories() {
-            recursiveDeleteItems(new Path(declaredIntentionsToWrite), true, false);
+            stagingDirectory.ifPresent((v) -> recursiveDeleteItems(new Path(v), true, false));
         }
 
         private void abortMultiUploads() {
```
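Taken together, the `HMSTransaction` changes track the staging directory only when one actually exists, and turn a failed cleanup into an exception so the commit path rolls back instead of reporting success over leftover data. A minimal standalone sketch of that pattern (the `FileSystemOps` interface and all names here are hypothetical, for illustration only):

```java
import java.util.Optional;

// Hypothetical minimal file-system interface, for illustration only.
interface FileSystemOps {
    boolean makeDir(String path);

    // Returns true only if the directory and its contents were fully removed.
    boolean deleteRecursively(String path);
}

class StagingDirSketch {
    // Empty for S3 writes: S3 writers upload directly, so there is no
    // staging directory to create or clean up.
    private final Optional<String> stagingDirectory;
    private final FileSystemOps fs;

    StagingDirSketch(FileSystemOps fs, boolean isS3, String writePath) {
        this.fs = fs;
        this.stagingDirectory = isS3 ? Optional.empty() : Optional.of(writePath);
    }

    void prepare() {
        // Only non-S3 targets need the staging directory created up front.
        stagingDirectory.ifPresent(fs::makeDir);
    }

    void cleanup() {
        // Throwing instead of merely logging lets the caller roll back the
        // transaction rather than commit over leftover staging data.
        stagingDirectory.ifPresent(dir -> {
            if (!fs.deleteRecursively(dir)) {
                throw new RuntimeException("Failed to delete staging directory: " + dir);
            }
        });
    }
}
```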

fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java (+9 -0)

```diff
@@ -19,6 +19,7 @@
 
 import org.apache.doris.common.CustomThreadFactory;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@
 import java.lang.ref.PhantomReference;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -63,6 +65,8 @@ public class RemoteFSPhantomManager {
     private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
             = new ConcurrentHashMap<>();
 
+    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
+
     // Flag indicating whether the cleanup thread has been started
     private static final AtomicBoolean isStarted = new AtomicBoolean(false);
 
@@ -77,9 +81,13 @@ public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
             start();
             isStarted.set(true);
         }
+        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
+            throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri());
+        }
         RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(remoteFileSystem,
                 referenceQueue);
         referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+        fsSet.add(remoteFileSystem.dfsFileSystem);
     }
 
     /**
@@ -102,6 +110,7 @@ public static void start() {
                     if (fs != null) {
                         try {
                             fs.close();
+                            fsSet.remove(fs);
                             LOG.info("Closed file system: {}", fs.getUri());
                         } catch (IOException e) {
                             LOG.warn("Failed to close file system", e);
```

fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java (+13 -2)

```diff
@@ -20,6 +20,8 @@
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.obj.S3ObjStorage;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
@@ -34,6 +36,7 @@
 import org.apache.logging.log4j.Logger;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -74,12 +77,20 @@ protected FileSystem nativeFileSystem(String remotePath) throws UserException {
                 PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
                         .filter(entry -> entry.getKey() != null && entry.getValue() != null)
                         .forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
+                AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
                 try {
-                    dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
+                    dfsFileSystem = authenticator.doAs(() -> {
+                        try {
+                            return FileSystem.get(new Path(remotePath).toUri(), conf);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                    RemoteFSPhantomManager.registerPhantomReference(this);
                 } catch (Exception e) {
                     throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
                 }
-                RemoteFSPhantomManager.registerPhantomReference(this);
             }
         }
     }
```
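The S3 path now obtains the Hadoop `FileSystem` under the authenticated user and registers the phantom reference only after creation succeeds. A minimal sketch of the underlying Hadoop `doAs` idiom, using `UserGroupInformation` directly rather than Doris's `HadoopAuthenticator` wrapper (an assumption for illustration; the real code goes through the wrapper):

```java
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

class DoAsSketch {
    // Create the FileSystem as the current (possibly Kerberos-authenticated)
    // user, so its credentials are attached to the underlying connection.
    static FileSystem getFileSystem(URI uri, Configuration conf)
            throws IOException, InterruptedException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(uri, conf));
    }
}
```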

fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java (+3 -3)

```diff
@@ -99,11 +99,11 @@ public FileSystem nativeFileSystem(String remotePath) throws UserException {
                         throw new RuntimeException(e);
                     }
                 });
+                operations = new HDFSFileOperations(dfsFileSystem);
+                RemoteFSPhantomManager.registerPhantomReference(this);
             } catch (Exception e) {
-                throw new UserException(e);
+                throw new UserException("Failed to get dfs FileSystem for " + e.getMessage(), e);
             }
-            operations = new HDFSFileOperations(dfsFileSystem);
-            RemoteFSPhantomManager.registerPhantomReference(this);
         }
     }
 }
```
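Moving `operations = ...` and `registerPhantomReference(this)` inside the `try` mirrors the S3 change above: a `FileSystem` that failed to initialize is never registered, so the phantom-reference cleanup thread cannot later attempt to close a half-constructed instance, and the thrown `UserException` now carries the underlying error message.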

fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java (+1 -1)

```diff
@@ -133,7 +133,7 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
             if (insertCtx.isPresent()) {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
                 tSink.setOverwrite(context.isOverwrite());
-                context.setWritePath(storageLocation);
+                context.setWritePath(location);
                 context.setFileType(fileType);
             }
         } else {
```
