Skip to content

Commit

Permalink
(cloud-merge) Support BE smooth upgrade (#35819)
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Jun 13, 2024
1 parent 011062a commit ac2bf56
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 159 deletions.
40 changes: 40 additions & 0 deletions be/src/http/action/shrink_mem_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/shrink_mem_action.h"

#include <fmt/core.h>

#include "http/http_channel.h"
#include "http/http_request.h"
#include "runtime/exec_env.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
#include "util/string_util.h"

namespace doris {
// Handles a shrink-memory HTTP request: runs a process minor GC and then
// answers the caller with HTTP 200 and the body "shrinking".
//
// @param req the incoming HTTP request; it is only used to send the reply.
void ShrinkMemAction::handle(HttpRequest* req) {
    // Body of the reply sent once the GC call has returned.
    static constexpr char kReplyBody[] = "shrinking";

    LOG(INFO) << "begin shrink memory";

    // A cache-only GC entry point might be ready for cloud in the near future:
    //     int freed_mem = 0;
    //     doris::MemInfo::process_cache_gc(&freed_mem);
    // Until then, fall back to the process-wide minor GC.
    MemInfo::process_minor_gc();
    LOG(INFO) << "shrink memory triggered, using Process Minor GC Free Memory";

    // The reply is sent only after process_minor_gc() has returned.
    HttpChannel::send_reply(req, HttpStatus::OK, kReplyBody);
}

} // namespace doris
32 changes: 32 additions & 0 deletions be/src/http/action/shrink_mem_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "http/http_handler.h"

namespace doris {
class ExecEnv;
// HTTP handler whose handle() triggers a process minor GC
// (implementation in shrink_mem_action.cpp). The handler itself holds no state.
class ShrinkMemAction : public HttpHandler {
public:
    // Nothing to initialize: the handler has no members.
    explicit ShrinkMemAction() = default;

    // Nothing to release: the handler owns no resources.
    virtual ~ShrinkMemAction() = default;

    // Processes one HTTP request and sends the reply on `req`.
    void handle(HttpRequest* req) override;
};
} // namespace doris
5 changes: 5 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "http/action/reset_rpc_channel_action.h"
#include "http/action/restore_tablet_action.h"
#include "http/action/show_hotspot_action.h"
#include "http/action/shrink_mem_action.h"
#include "http/action/snapshot_action.h"
#include "http/action/stream_load.h"
#include "http/action/stream_load_2pc.h"
Expand Down Expand Up @@ -218,6 +219,10 @@ Status HttpService::start() {
new ReportAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_TASK"));
_ev_http_server->register_handler(HttpMethod::GET, "/api/report/task", report_task_action);

// shrink memory for starting co-exist process during upgrade
ShrinkMemAction* shrink_mem_action = _pool.add(new ShrinkMemAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/shrink_mem", shrink_mem_action);

auto& engine = _env->storage_engine();
if (config::is_cloud_mode()) {
register_cloud_handler(engine.to_cloud());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ private void checkToAddCluster(Map<String, ClusterPB> remoteClusterIdToPB, Set<S
ClusterStatus clusterStatus = remoteClusterIdToPB.get(addId).hasClusterStatus()
? remoteClusterIdToPB.get(addId).getClusterStatus() : ClusterStatus.NORMAL;
MetricRepo.registerCloudMetrics(clusterId, clusterName);
//toAdd.forEach(i -> i.setTagMap(newTagMap));
List<Backend> toAdd = new ArrayList<>();
for (Cloud.NodeInfoPB node : remoteClusterIdToPB.get(addId).getNodesList()) {
String addr = Config.enable_fqdn_mode ? node.getHost() : node.getIp();
Expand Down Expand Up @@ -449,6 +448,8 @@ private void getCloudBackends() {
// local - remote > 0, drop bes from local
checkToDelCluster(remoteClusterIdToPB, localClusterIds, clusterIdToBackend);

clusterIdToBackend = cloudSystemInfoService.getCloudClusterIdToBackend();

if (remoteClusterIdToPB.keySet().size() != clusterIdToBackend.keySet().size()) {
LOG.warn("impossible cluster id size not match, check it local {}, remote {}",
clusterIdToBackend, remoteClusterIdToPB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
Expand Down Expand Up @@ -187,9 +188,7 @@ protected void runAfterCatalogReady() {
// 3 check whether the inflight preheating task has been completed
checkInflghtWarmUpCacheAsync();

// TODO(merge-cloud): wait add cloud upgrade mgr
// 4 migrate tablet for smooth upgrade
/*
Pair<Long, Long> pair;
statRouteInfo();
while (!tabletsMigrateTasks.isEmpty()) {
Expand All @@ -201,7 +200,6 @@ protected void runAfterCatalogReady() {
LOG.info("begin tablets migration from be {} to be {}", pair.first, pair.second);
migrateTablets(pair.first, pair.second);
}
*/

// 5 statistics be to tablets mapping information
statRouteInfo();
Expand Down Expand Up @@ -923,14 +921,12 @@ private void migrateTablets(Long srcBe, Long dstBe) {
}
}

// TODO(merge-cloud): wait add cloud upgrade mgr
/*
try {
Env.getCurrentEnv().getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
} catch (AnalysisException e) {
((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(srcBe);
} catch (UserException e) {
LOG.warn("registerWaterShedTxnId get exception", e);
throw new RuntimeException(e);
}
*/
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected void runAfterCatalogReady() {
isFinished = true;
}
} catch (AnalysisException e) {
LOG.warn("cloud upgrade mgr exception", e);
throw new RuntimeException(e);
}
if (!isFinished) {
Expand Down
Loading

0 comments on commit ac2bf56

Please sign in to comment.