Skip to content

Commit

Permalink
Refactor: Removed the useless Ser/de logic in configNode read plans (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi authored Sep 23, 2024
1 parent 7ceddae commit 6978717
Show file tree
Hide file tree
Showing 66 changed files with 508 additions and 1,833 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,15 @@
package org.apache.iotdb.confignode.consensus.request;

import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.function.GetFunctionTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetOrCreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan;
import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowSubscriptionPlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
import org.apache.iotdb.confignode.consensus.request.read.table.ShowTablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTransferringTriggersPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerJarPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLocationPlan;
import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
Expand Down Expand Up @@ -115,7 +84,6 @@
import org.apache.iotdb.confignode.consensus.request.write.sync.PreCreatePipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.RecordPipeMessagePlan;
import org.apache.iotdb.confignode.consensus.request.write.sync.SetPipeStatusPlanV1;
import org.apache.iotdb.confignode.consensus.request.write.sync.ShowPipePlanV1;
import org.apache.iotdb.confignode.consensus.request.write.table.AddTableColumnPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.CommitCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreCreateTablePlan;
Expand All @@ -138,8 +106,6 @@
import org.apache.iotdb.consensus.common.request.IConsensusRequest;

import org.apache.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
Expand All @@ -148,11 +114,9 @@

public abstract class ConfigPhysicalPlan implements IConsensusRequest {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPhysicalPlan.class);

private final ConfigPhysicalPlanType type;

protected ConfigPhysicalPlan(ConfigPhysicalPlanType type) {
protected ConfigPhysicalPlan(final ConfigPhysicalPlanType type) {
this.type = type;
}

Expand All @@ -162,37 +126,37 @@ public ConfigPhysicalPlanType getType() {

@Override
public ByteBuffer serializeToByteBuffer() {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
serializeImpl(outputStream);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
} catch (IOException e) {
} catch (final IOException e) {
throw new SerializationRunTimeException(e);
}
}

protected abstract void serializeImpl(DataOutputStream stream) throws IOException;
protected abstract void serializeImpl(final DataOutputStream stream) throws IOException;

protected abstract void deserializeImpl(ByteBuffer buffer) throws IOException;
protected abstract void deserializeImpl(final ByteBuffer buffer) throws IOException;

public int getSerializedSize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
serializeImpl(outputStream);
return byteArrayOutputStream.size();
}

public static class Factory {

public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
short planType = buffer.getShort();
ConfigPhysicalPlanType configPhysicalPlanType =
public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOException {
final short planType = buffer.getShort();
final ConfigPhysicalPlanType configPhysicalPlanType =
ConfigPhysicalPlanType.convertToConfigPhysicalPlanType(planType);
if (configPhysicalPlanType == null) {
throw new IOException("Unrecognized log configPhysicalPlanType: " + planType);
}

ConfigPhysicalPlan plan;
final ConfigPhysicalPlan plan;
switch (configPhysicalPlanType) {
case RegisterDataNode:
plan = new RegisterDataNodePlan();
Expand All @@ -203,9 +167,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case RemoveDataNode:
plan = new RemoveDataNodePlan();
break;
case GetDataNodeConfiguration:
plan = new GetDataNodeConfigurationPlan();
break;
case CreateDatabase:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase);
break;
Expand All @@ -227,12 +188,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case AdjustMaxRegionGroupNum:
plan = new AdjustMaxRegionGroupNumPlan();
break;
case CountDatabase:
plan = new CountDatabasePlan();
break;
case GetDatabase:
plan = new GetDatabasePlan();
break;
case CreateRegionGroups:
plan = new CreateRegionGroupsPlan();
break;
Expand All @@ -254,24 +209,12 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case PollSpecificRegionMaintainTask:
plan = new PollSpecificRegionMaintainTaskPlan();
break;
case GetSchemaPartition:
plan = new GetSchemaPartitionPlan();
break;
case CreateSchemaPartition:
plan = new CreateSchemaPartitionPlan();
break;
case GetOrCreateSchemaPartition:
plan = new GetOrCreateSchemaPartitionPlan();
break;
case GetDataPartition:
plan = new GetDataPartitionPlan();
break;
case CreateDataPartition:
plan = new CreateDataPartitionPlan();
break;
case GetOrCreateDataPartition:
plan = new GetOrCreateDataPartitionPlan();
break;
case DeleteProcedure:
plan = new DeleteProcedurePlan();
break;
Expand All @@ -284,12 +227,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case DeleteDatabase:
plan = new DeleteDatabasePlan();
break;
case ListUserDep:
case ListRoleDep:
case ListUserPrivilegeDep:
case ListRolePrivilegeDep:
case ListUserRolesDep:
case ListRoleUsersDep:
case CreateUserDep:
case CreateRoleDep:
case DropUserDep:
Expand All @@ -301,12 +238,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case RevokeRoleDep:
case RevokeRoleFromUserDep:
case UpdateUserDep:
case ListUser:
case ListRole:
case ListUserPrivilege:
case ListRolePrivilege:
case ListUserRoles:
case ListRoleUsers:
case CreateUser:
case CreateRole:
case DropUser:
Expand Down Expand Up @@ -348,33 +279,9 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case UpdateTriggerStateInTable:
plan = new UpdateTriggerStateInTablePlan();
break;
case GetTriggerTable:
plan = new GetTriggerTablePlan();
break;
case GetTriggerLocation:
plan = new GetTriggerLocationPlan();
break;
case GetTriggerJar:
plan = new GetTriggerJarPlan();
break;
case CreateSchemaTemplate:
plan = new CreateSchemaTemplatePlan();
break;
case GetAllSchemaTemplate:
plan = new GetAllSchemaTemplatePlan();
break;
case GetSchemaTemplate:
plan = new GetSchemaTemplatePlan();
break;
case CheckTemplateSettable:
plan = new CheckTemplateSettablePlan();
break;
case GetPathsSetTemplate:
plan = new GetPathsSetTemplatePlan();
break;
case GetAllTemplateSetInfo:
plan = new GetAllTemplateSetInfoPlan();
break;
case SetSchemaTemplate:
plan = new SetSchemaTemplatePlan();
break;
Expand All @@ -384,9 +291,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case CommitSetSchemaTemplate:
plan = new CommitSetSchemaTemplatePlan();
break;
case GetTemplateSetInfo:
plan = new GetTemplateSetInfoPlan();
break;
case DropSchemaTemplate:
plan = new DropSchemaTemplatePlan();
break;
Expand Down Expand Up @@ -417,15 +321,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case SetTableProperties:
plan = new SetTablePropertiesPlan();
break;
case ShowTable:
plan = new ShowTablePlan();
break;
case GetNodePathsPartition:
plan = new GetNodePathsPartitionPlan();
break;
case GetRegionInfoList:
plan = new GetRegionInfoListPlan();
break;
case CreatePipeSinkV1:
plan = new CreatePipeSinkPlanV1();
break;
Expand All @@ -444,9 +339,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case DropPipeV1:
plan = new DropPipePlanV1();
break;
case ShowPipeV1:
plan = new ShowPipePlanV1();
break;
case RecordPipeMessageV1:
plan = new RecordPipeMessagePlan();
break;
Expand All @@ -462,9 +354,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case AlterPipeV2:
plan = new AlterPipePlanV2();
break;
case ShowPipeV2:
plan = new ShowPipePlanV2();
break;
case OperateMultiplePipesV2:
plan = new OperateMultiplePipesPlanV2();
break;
Expand Down Expand Up @@ -501,9 +390,6 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case ConsumerGroupHandleMetaChange:
plan = new ConsumerGroupHandleMetaChangePlan();
break;
case ShowSubscription:
plan = new ShowSubscriptionPlan();
break;
case PipeUnsetTemplate:
plan = new PipeUnsetSchemaTemplatePlan();
break;
Expand All @@ -516,27 +402,12 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case PipeDeactivateTemplate:
plan = new PipeDeactivateTemplatePlan();
break;
case GetRegionId:
plan = new GetRegionIdPlan();
break;
case GetTimeSlotList:
plan = new GetTimeSlotListPlan();
break;
case CountTimeSlotList:
plan = new CountTimeSlotListPlan();
break;
case GetSeriesSlotList:
plan = new GetSeriesSlotListPlan();
break;
case UpdateTriggersOnTransferNodes:
plan = new UpdateTriggersOnTransferNodesPlan();
break;
case UpdateTriggerLocation:
plan = new UpdateTriggerLocationPlan();
break;
case GetTransferringTriggers:
plan = new GetTransferringTriggersPlan();
break;
case ACTIVE_CQ:
plan = new ActiveCQPlan();
break;
Expand All @@ -549,27 +420,12 @@ public static ConfigPhysicalPlan create(ByteBuffer buffer) throws IOException {
case UPDATE_CQ_LAST_EXEC_TIME:
plan = new UpdateCQLastExecTimePlan();
break;
case SHOW_CQ:
plan = new ShowCQPlan();
break;
case GetFunctionTable:
plan = new GetFunctionTablePlan();
break;
case GetFunctionJar:
plan = new GetUDFJarPlan();
break;
case CreatePipePlugin:
plan = new CreatePipePluginPlan();
break;
case DropPipePlugin:
plan = new DropPipePluginPlan();
break;
case GetPipePluginTable:
plan = new GetPipePluginTablePlan();
break;
case GetPipePluginJar:
plan = new GetPipePluginJarPlan();
break;
case setSpaceQuota:
plan = new SetSpaceQuotaPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.confignode.consensus.request;

import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
Expand Down
Loading

0 comments on commit 6978717

Please sign in to comment.