Skip to content

Commit

Permalink
[bugfix](hive)Delete the temporarily created folder (apache#40424)
Browse files Browse the repository at this point in the history
## Proposed changes

Delete the temporarily created folder, otherwise it will cause too many
folders on hdfs:
> The directory item limit of xxx is exceeded: limit=xxx items=xxx
  • Loading branch information
wuwenchi authored Sep 10, 2024
1 parent 948fb2a commit c7f57ae
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,8 @@ public void doCommit() {
runS3cleanWhenSuccess();
doAddPartitionsTask();
doUpdateStatisticsTasks();
//delete write path
pruneAndDeleteStagingDirectories();
doNothing();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.hive;

import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.info.SimpleTableInfo;
Expand Down Expand Up @@ -169,15 +170,21 @@ public void testNewPartitionForUnPartitionedTable() throws IOException {
@Test
public void testAppendPartitionForUnPartitionedTable() throws IOException {
genQueryID();
System.out.println(DebugUtil.printId(connectContext.queryId()));
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) {
@Mock
private void doNothing() {
Assert.assertEquals(Status.ErrCode.NOT_FOUND, fs.exists(getWritePath()).getErrCode());
}
};
commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);


genQueryID();
List<THivePartitionUpdate> pus2 = new ArrayList<>();
pus2.add(createRandomAppend(null));
Expand All @@ -204,6 +211,12 @@ public void testOverwritePartitionForUnPartitionedTable() throws IOException {

@Test
public void testNewPartitionForPartitionedTable() throws IOException {
new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) {
@Mock
private void doNothing() {
Assert.assertEquals(Status.ErrCode.NOT_FOUND, fs.exists(getWritePath()).getErrCode());
}
};
genQueryID();
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
Expand Down Expand Up @@ -377,6 +390,11 @@ public THivePartitionUpdate createRandomOverwrite(String partition) throws IOExc
genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}

private String getWritePath() {
String queryId = DebugUtil.printId(ConnectContext.get().queryId());
return writeLocation + queryId + "/";
}

public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
Expand All @@ -385,7 +403,7 @@ public void commit(String dbName,
HiveInsertCommandContext ctx = new HiveInsertCommandContext();
String queryId = DebugUtil.printId(ConnectContext.get().queryId());
ctx.setQueryId(queryId);
ctx.setWritePath(writeLocation + queryId + "/");
ctx.setWritePath(getWritePath());
hmsTransaction.beginInsertTable(ctx);
hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, tableName));
hmsTransaction.commit();
Expand Down

0 comments on commit c7f57ae

Please sign in to comment.