Skip to content

Commit

Permalink
V1.x develop limit storage (#12492)
Browse files Browse the repository at this point in the history
* Limit storage type.

* Limit derby sql type.
  • Loading branch information
KomachiSion authored Aug 15, 2024
1 parent 06455c9 commit a1bec0a
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.QueryType;
import com.alibaba.nacos.config.server.service.sql.SelectRequest;
import com.alibaba.nacos.config.server.service.sql.limit.SqlLimiter;
import com.alibaba.nacos.config.server.service.sql.limit.SqlTypeLimiter;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.consistency.SerializeFactory;
Expand All @@ -57,8 +59,8 @@
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alibaba.nacos.core.utils.GenericType;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.springframework.context.annotation.Conditional;
Expand All @@ -71,6 +73,7 @@
import org.springframework.transaction.support.TransactionTemplate;

import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -167,10 +170,13 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen

private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

private final SqlLimiter sqlLimiter;

public DistributedDatabaseOperateImpl(ServerMemberManager memberManager, ProtocolManager protocolManager)
throws Exception {
this.memberManager = memberManager;
this.protocol = protocolManager.getCpProtocol();
sqlLimiter = new SqlTypeLimiter();
init();
}

Expand Down Expand Up @@ -223,8 +229,8 @@ public <R> R queryOne(String sql, Class<R> cls) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS).sql(sql)
.className(cls.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -247,8 +253,8 @@ public <R> R queryOne(String sql, Object[] args, Class<R> cls) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_NO_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(cls.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -271,8 +277,8 @@ public <R> R queryOne(String sql, Object[] args, RowMapper<R> mapper) {
SelectRequest.builder().queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(mapper.getClass().getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -296,8 +302,8 @@ public <R> List<R> queryMany(String sql, Object[] args, RowMapper<R> mapper) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_WITH_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(mapper.getClass().getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -320,8 +326,8 @@ public <R> List<R> queryMany(String sql, Object[] args, Class<R> rClass) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_NO_MAPPER_WITH_ARGS).sql(sql).args(args)
.className(rClass.getCanonicalName()).build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand All @@ -344,8 +350,8 @@ public List<Map<String, Object>> queryMany(String sql, Object[] args) {
SelectRequest.builder().queryType(QueryType.QUERY_MANY_WITH_LIST_WITH_ARGS).sql(sql).args(args)
.build());

final boolean blockRead = EmbeddedStorageContextUtils
.containsExtendInfo(Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);
final boolean blockRead = EmbeddedStorageContextUtils.containsExtendInfo(
Constants.EXTEND_NEED_READ_UNTIL_HAVE_DATA);

Response response = innerRead(
ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(data)).build(), blockRead);
Expand Down Expand Up @@ -390,9 +396,10 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
if (submit) {
List<ModifyRequest> requests = batchUpdate.stream().map(ModifyRequest::new)
.collect(Collectors.toList());
CompletableFuture<Response> future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
CompletableFuture<Response> future = protocol.writeAsync(
WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
futures.add(future);
batchUpdate.clear();
}
Expand Down Expand Up @@ -463,8 +470,8 @@ public List<SnapshotOperation> loadSnapshotOperate() {
@SuppressWarnings("all")
@Override
public Response onRequest(final ReadRequest request) {
final SelectRequest selectRequest = serializer
.deserialize(request.getData().toByteArray(), SelectRequest.class);
final SelectRequest selectRequest = serializer.deserialize(request.getData().toByteArray(),
SelectRequest.class);

LoggerUtils.printIfDebugEnabled(LogUtil.DEFAULT_LOG, "getData info : selectRequest : {}", selectRequest);

Expand All @@ -473,6 +480,7 @@ public Response onRequest(final ReadRequest request) {
readLock.lock();
Object data;
try {
sqlLimiter.doLimitForSelectRequest(selectRequest);
switch (type) {
case QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS:
data = queryOne(jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), mapper);
Expand Down Expand Up @@ -515,10 +523,11 @@ public Response onApply(WriteRequest log) {
LoggerUtils.printIfDebugEnabled(LogUtil.DEFAULT_LOG, "onApply info : log : {}", log);
final ByteString byteString = log.getData();
Preconditions.checkArgument(byteString != null, "Log.getData() must not null");
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
final Lock lock = readLock;
lock.lock();
try {
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
sqlLimiter.doLimitForModifyRequest(sqlContext);
boolean isOk = false;
if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
isOk = doDataImport(jdbcTemplate, sqlContext);
Expand All @@ -539,6 +548,9 @@ public Response onApply(WriteRequest log) {
return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
} catch (DataAccessException e) {
throw new ConsistencyException(e.toString());
} catch (SQLException e) {
LogUtil.FATAL_LOG.error("onApply warn : log : {}", log, e);
return Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
} catch (Throwable t) {
throw t;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.alibaba.nacos.config.server.service.datasource.DataSourceService;
import com.alibaba.nacos.config.server.service.datasource.DynamicDataSource;
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.limit.SqlLimiter;
import com.alibaba.nacos.config.server.service.sql.limit.SqlTypeLimiter;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.sys.utils.DiskUtils;
import org.apache.commons.lang3.BooleanUtils;
Expand Down Expand Up @@ -55,11 +57,14 @@ public class StandaloneDatabaseOperateImpl implements BaseDatabaseOperate {

private TransactionTemplate transactionTemplate;

private SqlLimiter sqlLimiter;

@PostConstruct
protected void init() {
DataSourceService dataSourceService = DynamicDataSource.getInstance().getDataSource();
jdbcTemplate = dataSourceService.getJdbcTemplate();
transactionTemplate = dataSourceService.getTransactionTemplate();
sqlLimiter = new SqlTypeLimiter();
LogUtil.DEFAULT_LOG.info("use StandaloneDatabaseOperateImpl");
}

Expand Down Expand Up @@ -104,6 +109,7 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
while (iterator.hasNext()) {
String sql = iterator.next();
if (StringUtils.isNotBlank(sql)) {
sqlLimiter.doLimit(sql);
batchUpdate.add(sql);
}
if (batchUpdate.size() == batchSize || !iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.config.server.service.sql.limit;

import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.service.sql.SelectRequest;

import java.sql.SQLException;
import java.util.List;

/**
* SQL limiter.
*
* @author xiweng.yy
*/
public interface SqlLimiter {

/**
* Do SQL limit for modify request.
*
* @param modifyRequest modify request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForModifyRequest(ModifyRequest modifyRequest) throws SQLException;

/**
* Do SQL limit for modify request.
*
* @param modifyRequests modify request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForModifyRequest(List<ModifyRequest> modifyRequests) throws SQLException;

/**
* Do SQL limit for select request.
*
* @param selectRequest select request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForSelectRequest(SelectRequest selectRequest) throws SQLException;

/**
* Do SQL limit for select request.
*
* @param selectRequests select request
* @throws SQLException when SQL match the limit rule.
*/
void doLimitForSelectRequest(List<SelectRequest> selectRequests) throws SQLException;

/**
* Do SQL limit for sql.
*
* @param sql SQL
* @throws SQLException when SQL match the limit rule.
*/
void doLimit(String sql) throws SQLException;

/**
* Do SQL limit for sql.
*
* @param sql SQL
* @throws SQLException when SQL match the limit rule.
*/
void doLimit(List<String> sql) throws SQLException;
}
Loading

0 comments on commit a1bec0a

Please sign in to comment.