Skip to content

Commit f719957

Browse files
authored
[feature](Azure) Support copy into on Azure Blob Storage (#36554)
1 parent 29a9766 commit f719957

File tree

8 files changed

+267
-6
lines changed

8 files changed

+267
-6
lines changed

fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ private void analyzeStagePB(StagePB stagePB) throws AnalysisException {
205205
}
206206
brokerProperties.put(S3_BUCKET, objInfo.getBucket());
207207
brokerProperties.put(S3_PREFIX, objInfo.getPrefix());
208+
brokerProperties.put(S3Properties.PROVIDER, objInfo.getProvider().toString());
208209
StageProperties stageProperties = new StageProperties(stagePB.getPropertiesMap());
209210
this.copyIntoProperties.mergeProperties(stageProperties);
210211
this.copyIntoProperties.analyze();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.cloud.storage;
19+
20+
import org.apache.doris.common.DdlException;
21+
22+
import com.azure.core.credential.AccessToken;
23+
import com.azure.core.credential.TokenCredential;
24+
import com.azure.core.credential.TokenRequestContext;
25+
import com.azure.core.http.rest.PagedIterable;
26+
import com.azure.core.http.rest.PagedResponse;
27+
import com.azure.core.http.rest.Response;
28+
import com.azure.core.util.Context;
29+
import com.azure.storage.blob.BlobClient;
30+
import com.azure.storage.blob.BlobContainerClient;
31+
import com.azure.storage.blob.BlobContainerClientBuilder;
32+
import com.azure.storage.blob.BlobServiceClient;
33+
import com.azure.storage.blob.batch.BlobBatch;
34+
import com.azure.storage.blob.batch.BlobBatchClient;
35+
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
36+
import com.azure.storage.blob.models.BlobItem;
37+
import com.azure.storage.blob.models.BlobProperties;
38+
import com.azure.storage.blob.models.BlobStorageException;
39+
import com.azure.storage.blob.models.ListBlobsOptions;
40+
import com.azure.storage.blob.models.UserDelegationKey;
41+
import com.azure.storage.blob.sas.BlobContainerSasPermission;
42+
import com.azure.storage.blob.sas.BlobSasPermission;
43+
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
44+
import com.azure.storage.common.StorageSharedKeyCredential;
45+
import com.azure.storage.common.sas.SasProtocol;
46+
import com.google.common.collect.Lists;
47+
import org.apache.commons.lang3.tuple.Triple;
48+
import org.apache.http.HttpStatus;
49+
import org.apache.logging.log4j.LogManager;
50+
import org.apache.logging.log4j.Logger;
51+
import reactor.core.publisher.Mono;
52+
53+
import java.time.OffsetDateTime;
54+
import java.util.ArrayList;
55+
import java.util.List;
56+
57+
public class AzureRemote extends RemoteBase {
58+
59+
private static final Logger LOG = LogManager.getLogger(AzureRemote.class);
60+
61+
private static final String URI_TEMPLATE = "https://%s.blob.core.windows.net/%s";
62+
63+
private BlobContainerClient client;
64+
65+
public AzureRemote(ObjectInfo obj) {
66+
super(obj);
67+
}
68+
69+
@Override
70+
public String getPresignedUrl(String fileName) {
71+
try {
72+
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
73+
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
74+
String containerName = obj.getBucket();
75+
String uri = String.format(URI_TEMPLATE, obj.getAk(),
76+
containerName);
77+
builder.endpoint(uri);
78+
BlobContainerClient containerClient = builder.buildClient();
79+
80+
BlobClient blobClient = containerClient.getBlobClient(normalizePrefix(fileName));
81+
82+
OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
83+
BlobSasPermission permission = new BlobSasPermission()
84+
.setReadPermission(true)
85+
.setWritePermission(true)
86+
.setDeletePermission(true);
87+
88+
BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
89+
.setProtocol(SasProtocol.HTTPS_ONLY)
90+
.setStartTime(OffsetDateTime.now());
91+
92+
String sasToken = blobClient.generateSas(sasValues);
93+
return blobClient.getBlobUrl() + "?" + sasToken;
94+
} catch (Exception e) {
95+
e.getStackTrace();
96+
}
97+
return "";
98+
}
99+
100+
@Override
101+
public ListObjectsResult listObjects(String continuationToken) throws DdlException {
102+
return listObjectsInner(normalizePrefix(), continuationToken);
103+
}
104+
105+
@Override
106+
public ListObjectsResult listObjects(String subPrefix, String continuationToken) throws DdlException {
107+
return listObjectsInner(normalizePrefix(subPrefix), continuationToken);
108+
}
109+
110+
@Override
111+
public ListObjectsResult headObject(String subKey) throws DdlException {
112+
initClient();
113+
try {
114+
String key = normalizePrefix(subKey);
115+
BlobClient blobClient = client.getBlobClient(key);
116+
BlobProperties properties = blobClient.getProperties();
117+
ObjectFile objectFile = new ObjectFile(key, getRelativePath(key), properties.getETag(),
118+
properties.getBlobSize());
119+
return new ListObjectsResult(Lists.newArrayList(objectFile), false, null);
120+
} catch (BlobStorageException e) {
121+
if (e.getStatusCode() == HttpStatus.SC_NOT_FOUND) {
122+
LOG.warn("NoSuchKey when head object for Azure, subKey={}", subKey);
123+
return new ListObjectsResult(Lists.newArrayList(), false, null);
124+
}
125+
LOG.warn("Failed to head object for Azure, subKey={}", subKey, e);
126+
throw new DdlException(
127+
"Failed to head object for Azure, subKey=" + subKey + " Error message=" + e.getMessage());
128+
}
129+
}
130+
131+
@Override
132+
public Triple<String, String, String> getStsToken() throws DdlException {
133+
try {
134+
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
135+
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
136+
String containerName = obj.getBucket();
137+
String uri = String.format(URI_TEMPLATE, obj.getAk(),
138+
containerName);
139+
builder.endpoint(uri);
140+
BlobContainerClient containerClient = builder.buildClient();
141+
BlobServiceClient blobServiceClient = containerClient.getServiceClient();
142+
143+
OffsetDateTime keyStart = OffsetDateTime.now();
144+
OffsetDateTime keyExpiry = keyStart.plusSeconds(SESSION_EXPIRE_SECOND);
145+
UserDelegationKey userDelegationKey = blobServiceClient.getUserDelegationKey(keyStart, keyExpiry);
146+
147+
OffsetDateTime expiryTime = OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND);
148+
BlobContainerSasPermission permission = new BlobContainerSasPermission()
149+
.setReadPermission(true)
150+
.setWritePermission(true)
151+
.setListPermission(true);
152+
153+
BlobServiceSasSignatureValues sasValues = new BlobServiceSasSignatureValues(expiryTime, permission)
154+
.setProtocol(SasProtocol.HTTPS_ONLY)
155+
.setStartTime(OffsetDateTime.now());
156+
157+
String sasToken = containerClient.generateUserDelegationSas(sasValues, userDelegationKey);
158+
return Triple.of(obj.getAk(), obj.getSk(), sasToken);
159+
} catch (Throwable e) {
160+
LOG.warn("Failed get Azure sts token", e);
161+
throw new DdlException(e.getMessage());
162+
}
163+
}
164+
165+
@Override
166+
public void deleteObjects(List<String> keys) throws DdlException {
167+
checkDeleteKeys(keys);
168+
initClient();
169+
170+
try {
171+
BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(
172+
client.getServiceClient()).buildClient();
173+
int maxDelete = 1000;
174+
for (int i = 0; i < keys.size() / maxDelete + 1; i++) {
175+
int cnt = 0;
176+
BlobBatch batch = blobBatchClient.getBlobBatch();
177+
for (int j = maxDelete * i; j < keys.size() && cnt < maxDelete; j++) {
178+
batch.deleteBlob(client.getBlobContainerName(), keys.get(j));
179+
cnt++;
180+
}
181+
Response<Void> resp = blobBatchClient.submitBatchWithResponse(batch, true, null, Context.NONE);
182+
if (resp.getStatusCode() != HttpStatus.SC_OK) {
183+
throw new DdlException(
184+
"Failed delete objects, bucket=" + obj.getBucket());
185+
}
186+
}
187+
} catch (BlobStorageException e) {
188+
LOG.warn("Failed to delete objects for Azure", e);
189+
throw new DdlException("Failed to delete objects for Azure, Error message=" + e.getMessage());
190+
}
191+
}
192+
193+
@Override
194+
public void close() {
195+
client = null;
196+
}
197+
198+
@Override
199+
public String toString() {
200+
return "AzureRemote{obj=" + obj + '}';
201+
}
202+
203+
private ListObjectsResult listObjectsInner(String prefix, String continuationToken) throws DdlException {
204+
initClient();
205+
try {
206+
ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
207+
PagedIterable<BlobItem> pagedBlobs = client.listBlobs(options, continuationToken, null);
208+
PagedResponse<BlobItem> pagedResponse = pagedBlobs.iterableByPage().iterator().next();
209+
List<ObjectFile> objectFiles = new ArrayList<>();
210+
211+
for (BlobItem blobItem : pagedResponse.getElements()) {
212+
objectFiles.add(new ObjectFile(blobItem.getName(), getRelativePath(blobItem.getName()),
213+
blobItem.getProperties().getETag(), blobItem.getProperties().getContentLength()));
214+
}
215+
return new ListObjectsResult(objectFiles, pagedResponse.getContinuationToken() == null,
216+
pagedResponse.getContinuationToken());
217+
} catch (BlobStorageException e) {
218+
LOG.warn("Failed to list objects for Azure", e);
219+
throw new DdlException("Failed to list objects for Azure, Error message=" + e.getMessage());
220+
}
221+
}
222+
223+
private void initClient() {
224+
if (client == null) {
225+
BlobContainerClientBuilder builder = new BlobContainerClientBuilder();
226+
if (obj.getToken() != null) {
227+
builder.credential(new SimpleTokenCredential(obj.getToken()));
228+
} else {
229+
builder.credential(new StorageSharedKeyCredential(obj.getAk(), obj.getSk()));
230+
}
231+
String containerName = obj.getBucket();
232+
String uri = String.format(URI_TEMPLATE, obj.getAk(),
233+
containerName);
234+
builder.endpoint(uri);
235+
client = builder.buildClient();
236+
}
237+
}
238+
239+
// Custom implementation of TokenCredential
240+
private static class SimpleTokenCredential implements TokenCredential {
241+
private static final Logger LOG = LogManager.getLogger(SimpleTokenCredential.class);
242+
private final String token;
243+
244+
SimpleTokenCredential(String token) {
245+
this.token = token;
246+
}
247+
248+
@Override
249+
public Mono<AccessToken> getToken(TokenRequestContext request) {
250+
LOG.info("Getting token for scopes: {}", String.join(", ", request.getScopes()));
251+
// Assume the token is valid for 1 hour from the current time
252+
return Mono.just(new AccessToken(token, OffsetDateTime.now().plusSeconds(SESSION_EXPIRE_SECOND)));
253+
}
254+
}
255+
}

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/BosRemote.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public String getPresignedUrl(String fileName) {
4242
config.setCredentials(new DefaultBceCredentials(obj.getAk(), obj.getSk()));
4343
config.setEndpoint(obj.getEndpoint());
4444
BosClient client = new BosClient(config);
45-
URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName), 3600, HttpMethodName.PUT);
45+
URL url = client.generatePresignedUrl(obj.getBucket(), normalizePrefix(fileName),
46+
(int) SESSION_EXPIRE_SECOND, HttpMethodName.PUT);
4647
LOG.info("Bos getPresignedUrl: {}", url);
4748
return url.toString();
4849
}

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/CosRemote.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public String getPresignedUrl(String fileName) {
5656
clientConfig.setRegion(new Region(obj.getRegion()));
5757
clientConfig.setHttpProtocol(HttpProtocol.https);
5858
COSClient cosClient = new COSClient(cred, clientConfig);
59-
Date expirationDate = new Date(System.currentTimeMillis() + 60 * 60 * 1000);
59+
Date expirationDate = new Date(System.currentTimeMillis() + SESSION_EXPIRE_SECOND);
6060
URL url = cosClient.generatePresignedUrl(obj.getBucket(),
6161
normalizePrefix(fileName), expirationDate, HttpMethodName.PUT,
6262
new HashMap<String, String>(), new HashMap<String, String>());

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/ObsRemote.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public String getPresignedUrl(String fileName) {
5555
String sk = obj.getSk();
5656

5757
ObsClient obsClient = new ObsClient(ak, sk, endPoint);
58-
long expireSeconds = 3600L;
59-
TemporarySignatureRequest request = new TemporarySignatureRequest(HttpMethodEnum.PUT, expireSeconds);
58+
TemporarySignatureRequest request = new TemporarySignatureRequest(
59+
HttpMethodEnum.PUT, SESSION_EXPIRE_SECOND);
6060
request.setBucketName(obj.getBucket());
6161
request.setObjectKey(normalizePrefix(fileName));
6262
request.setHeaders(new HashMap<String, String>());

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/OssRemote.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public String getPresignedUrl(String fileName) {
6565
try {
6666
GeneratePresignedUrlRequest request
6767
= new GeneratePresignedUrlRequest(bucketName, objectName, HttpMethod.PUT);
68-
Date expiration = new Date(new Date().getTime() + 3600 * 1000);
68+
Date expiration = new Date(new Date().getTime() + SESSION_EXPIRE_SECOND * 1000);
6969
request.setExpiration(expiration);
7070
URL signedUrl = ossClient.generatePresignedUrl(request);
7171
return signedUrl.toString();

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/RemoteBase.java

+4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public String toString() {
116116

117117
public ObjectInfo obj;
118118

119+
protected static long SESSION_EXPIRE_SECOND = 3600;
120+
119121
public RemoteBase(ObjectInfo obj) {
120122
this.obj = obj;
121123
}
@@ -149,6 +151,8 @@ public static RemoteBase newInstance(ObjectInfo obj) throws Exception {
149151
return new ObsRemote(obj);
150152
case BOS:
151153
return new BosRemote(obj);
154+
case AZURE:
155+
return new AzureRemote(obj);
152156
default:
153157
throw new Exception("current not support obj : " + obj.toString());
154158
}

fe/fe-core/src/main/java/org/apache/doris/cloud/storage/S3Remote.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public String getPresignedUrl(String fileName) {
5454
.build();
5555

5656
PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
57-
.signatureDuration(Duration.ofMinutes(60))
57+
.signatureDuration(Duration.ofSeconds(SESSION_EXPIRE_SECOND))
5858
.putObjectRequest(objectRequest)
5959
.build();
6060

0 commit comments

Comments
 (0)