Skip to content

Commit

Permalink
inner-983&2184: fixed load data lost connection (#2545)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2b7c5cc)
  • Loading branch information
wenyh1 committed Nov 1, 2023
1 parent baef1eb commit de13077
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 34 deletions.
44 changes: 23 additions & 21 deletions src/main/java/com/actiontech/dble/net/service/AbstractService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public abstract class AbstractService implements Service {
private volatile boolean isSupportCompress = false;
protected volatile ProtoHandler proto;
protected final BlockingQueue<ServiceTask> taskQueue;
private int sequenceId = 0;
private int extraPartOfBigPacketCount = 0;

public AbstractService(AbstractConnection connection) {
this.connection = connection;
Expand All @@ -64,7 +64,7 @@ public void handle(ByteBuffer dataBuffer) {
switch (result.getCode()) {
case PART_OF_BIG_PACKET:

sequenceId++;
extraPartOfBigPacketCount++;
if (!result.isHasMorePacket()) {
connection.readReachEnd();
dataBuffer.clear();
Expand Down Expand Up @@ -99,8 +99,26 @@ protected void taskCreate(byte[] packetData) {
if (packetData == null) {
return;
}
int tmpCount = extraPartOfBigPacketCount;
if (!isSupportCompress) {
extraPartOfBigPacketCount = 0;
handleTask(new ServiceTask(packetData, this, tmpCount));
} else {
final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
extraPartOfBigPacketCount = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
handleTask(new ServiceTask(pack, this, tmpCount));
}
}
}
}

public void handleTask(ServiceTask task) {
if (beforeHandlingTask()) {
ServiceTask task = new ServiceTask(packetData, this, sequenceId);
try {
taskQueue.put(task);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -322,28 +340,12 @@ public void handleData(ServiceTask task) {
try {
byte[] data = executeTask.getOrgData();
if (data != null && !executeTask.isReuse()) {
this.setPacketId(executeTask.getSequenceId());
this.setPacketId(executeTask.getLastSequenceId());
}
if (data != null && data.length - MySQLPacket.PACKET_HEADER_SIZE >= SystemConfig.getInstance().getMaxPacketSize()) {
throw new IllegalArgumentException("Packet for query is too large (" + data.length + " > " + SystemConfig.getInstance().getMaxPacketSize() + ").You can change maxPacketSize value in bootstrap.cnf.");
}

if (isSupportCompress()) {
final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
sequenceId = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
handleInnerData(pack);
}
}
} else {
sequenceId = 0;
this.handleInnerData(data);

}
this.handleInnerData(data);
} catch (Throwable e) {
String msg = e.getMessage();
if (StringUtil.isEmpty(msg)) {
Expand Down
21 changes: 8 additions & 13 deletions src/main/java/com/actiontech/dble/net/service/ServiceTask.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.actiontech.dble.net.service;

import javax.annotation.Nullable;
import java.util.Arrays;

/**
* Created by szf on 2020/6/18.
Expand All @@ -11,13 +11,13 @@ public class ServiceTask {
private volatile boolean highPriority = false;
private final boolean reuse;
private final Service service;
private Integer sequenceId = null;
private int extraPartOfBigPacketCount = 0;

public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) {
public ServiceTask(byte[] orgData, Service service, int extraPartsOfBigPacketCount) {
this.orgData = orgData;
this.service = service;
this.reuse = false;
this.sequenceId = sequenceId;
this.extraPartOfBigPacketCount = extraPartsOfBigPacketCount;
}

public ServiceTask(byte[] orgData, Service service, boolean reuse) {
Expand Down Expand Up @@ -47,15 +47,10 @@ public boolean isReuse() {
return reuse;
}

public int getSequenceId() {
if (sequenceId != null) {
return sequenceId;
} else {
if (orgData == null) {
throw new IllegalStateException("can't get Sequence Id from null");
}
return (orgData[3]);
public int getLastSequenceId() {
if (orgData == null || orgData.length < 4) {
throw new IllegalStateException("can't get Sequence Id from " + Arrays.toString(orgData));
}

return (orgData[3]) + extraPartOfBigPacketCount;
}
}

0 comments on commit de13077

Please sign in to comment.