Properly combine packets

This commit is contained in:
UnfamiliarLegacy 2021-12-01 20:51:11 +01:00
parent ad9728af28
commit a525d6c867

View File

@ -3,6 +3,7 @@ package gearth.protocol.connection.proxy.nitro.websocket;
import gearth.protocol.HMessage; import gearth.protocol.HMessage;
import gearth.protocol.HPacket; import gearth.protocol.HPacket;
import gearth.protocol.packethandler.PacketHandler; import gearth.protocol.packethandler.PacketHandler;
import gearth.protocol.packethandler.PayloadBuffer;
import gearth.services.extension_handler.ExtensionHandler; import gearth.services.extension_handler.ExtensionHandler;
import gearth.services.extension_handler.OnHMessageHandled; import gearth.services.extension_handler.OnHMessageHandled;
@ -14,11 +15,15 @@ public class NitroPacketHandler extends PacketHandler {
private final HMessage.Direction direction; private final HMessage.Direction direction;
private final NitroSession session; private final NitroSession session;
private final PayloadBuffer payloadBuffer;
private final Object payloadLock;
protected NitroPacketHandler(HMessage.Direction direction, NitroSession session, ExtensionHandler extensionHandler, Object[] trafficObservables) { protected NitroPacketHandler(HMessage.Direction direction, NitroSession session, ExtensionHandler extensionHandler, Object[] trafficObservables) {
super(extensionHandler, trafficObservables); super(extensionHandler, trafficObservables);
this.direction = direction; this.direction = direction;
this.session = session; this.session = session;
this.payloadBuffer = new PayloadBuffer();
this.payloadLock = new Object();
} }
@Override @Override
@ -40,20 +45,26 @@ public class NitroPacketHandler extends PacketHandler {
@Override @Override
public void act(byte[] buffer) throws IOException { public void act(byte[] buffer) throws IOException {
HMessage hMessage = new HMessage(new HPacket(buffer), direction, currentIndex); payloadBuffer.push(buffer);
OnHMessageHandled afterExtensionIntercept = hMessage1 -> { synchronized (payloadLock) {
notifyListeners(2, hMessage1); for (HPacket packet : payloadBuffer.receive()) {
HMessage hMessage = new HMessage(packet, direction, currentIndex);
if (!hMessage1.isBlocked()) { OnHMessageHandled afterExtensionIntercept = hMessage1 -> {
sendToStream(hMessage1.getPacket().toBytes()); notifyListeners(2, hMessage1);
if (!hMessage1.isBlocked()) {
sendToStream(hMessage1.getPacket().toBytes());
}
};
notifyListeners(0, hMessage);
notifyListeners(1, hMessage);
extensionHandler.handle(hMessage, afterExtensionIntercept);
currentIndex++;
} }
}; }
notifyListeners(0, hMessage);
notifyListeners(1, hMessage);
extensionHandler.handle(hMessage, afterExtensionIntercept);
currentIndex++;
} }
} }