diff --git a/src/main/java/com/actiontech/dble/net/service/AbstractService.java b/src/main/java/com/actiontech/dble/net/service/AbstractService.java index 16e405e064..52c7687049 100644 --- a/src/main/java/com/actiontech/dble/net/service/AbstractService.java +++ b/src/main/java/com/actiontech/dble/net/service/AbstractService.java @@ -41,7 +41,7 @@ public abstract class AbstractService implements Service { private volatile boolean isSupportCompress = false; protected volatile ProtoHandler proto; protected final BlockingQueue taskQueue; - private int sequenceId = 0; + private int extraPartOfBigPacketCount = 0; public AbstractService(AbstractConnection connection) { this.connection = connection; @@ -64,7 +64,7 @@ public void handle(ByteBuffer dataBuffer) { switch (result.getCode()) { case PART_OF_BIG_PACKET: - sequenceId++; + extraPartOfBigPacketCount++; if (!result.isHasMorePacket()) { connection.readReachEnd(); dataBuffer.clear(); @@ -99,8 +99,26 @@ protected void taskCreate(byte[] packetData) { if (packetData == null) { return; } + int tmpCount = extraPartOfBigPacketCount; + if (!isSupportCompress) { + extraPartOfBigPacketCount = 0; + handleTask(new ServiceTask(packetData, this, tmpCount)); + } else { + final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); + List packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue); + if (decompressUnfinishedDataQueue.isEmpty()) { + extraPartOfBigPacketCount = 0; + } + for (byte[] pack : packs) { + if (pack.length != 0) { + handleTask(new ServiceTask(pack, this, tmpCount)); + } + } + } + } + + public void handleTask(ServiceTask task) { if (beforeHandlingTask()) { - ServiceTask task = new ServiceTask(packetData, this, sequenceId); try { taskQueue.put(task); } catch (InterruptedException e) { @@ -322,28 +340,12 @@ public void handleData(ServiceTask task) { try { byte[] data = executeTask.getOrgData(); if (data != null && !executeTask.isReuse()) { - this.setPacketId(executeTask.getSequenceId()); + this.setPacketId(executeTask.getLastSequenceId()); } if (data != null && data.length - MySQLPacket.PACKET_HEADER_SIZE >= SystemConfig.getInstance().getMaxPacketSize()) { throw new IllegalArgumentException("Packet for query is too large (" + data.length + " > " + SystemConfig.getInstance().getMaxPacketSize() + ").You can change maxPacketSize value in bootstrap.cnf."); } - - if (isSupportCompress()) { - final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); - List packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue); - if (decompressUnfinishedDataQueue.isEmpty()) { - sequenceId = 0; - } - for (byte[] pack : packs) { - if (pack.length != 0) { - handleInnerData(pack); - } - } - } else { - sequenceId = 0; - this.handleInnerData(data); - - } + this.handleInnerData(data); } catch (Throwable e) { String msg = e.getMessage(); if (StringUtil.isEmpty(msg)) { diff --git a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java index cd5608621b..d63b184c24 100644 --- a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java +++ b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java @@ -1,6 +1,6 @@ package com.actiontech.dble.net.service; -import javax.annotation.Nullable; +import java.util.Arrays; /** * Created by szf on 2020/6/18. @@ -11,13 +11,13 @@ public class ServiceTask { private volatile boolean highPriority = false; private final boolean reuse; private final Service service; - private Integer sequenceId = null; + private int extraPartOfBigPacketCount = 0; - public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) { + public ServiceTask(byte[] orgData, Service service, int extraPartsOfBigPacketCount) { this.orgData = orgData; this.service = service; this.reuse = false; - this.sequenceId = sequenceId; + this.extraPartOfBigPacketCount = extraPartsOfBigPacketCount; } public ServiceTask(byte[] orgData, Service service, boolean reuse) { @@ -47,15 +47,10 @@ public boolean isReuse() { return reuse; } - public int getSequenceId() { - if (sequenceId != null) { - return sequenceId; - } else { - if (orgData == null) { - throw new IllegalStateException("can't get Sequence Id from null"); - } - return (orgData[3]); + public int getLastSequenceId() { + if (orgData == null || orgData.length < 4) { + throw new IllegalStateException("can't get Sequence Id from " + Arrays.toString(orgData)); } - + return (orgData[3]) + extraPartOfBigPacketCount; } }