From a525d6c8673e0c7a9e47fd75054398dcfd7771f4 Mon Sep 17 00:00:00 2001 From: UnfamiliarLegacy <74633542+UnfamiliarLegacy@users.noreply.github.com> Date: Wed, 1 Dec 2021 20:51:11 +0100 Subject: [PATCH] Properly combine packets --- .../nitro/websocket/NitroPacketHandler.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroPacketHandler.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroPacketHandler.java index d6ed58a..70ed0cb 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroPacketHandler.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/nitro/websocket/NitroPacketHandler.java @@ -3,6 +3,7 @@ package gearth.protocol.connection.proxy.nitro.websocket; import gearth.protocol.HMessage; import gearth.protocol.HPacket; import gearth.protocol.packethandler.PacketHandler; +import gearth.protocol.packethandler.PayloadBuffer; import gearth.services.extension_handler.ExtensionHandler; import gearth.services.extension_handler.OnHMessageHandled; @@ -14,11 +15,15 @@ public class NitroPacketHandler extends PacketHandler { private final HMessage.Direction direction; private final NitroSession session; + private final PayloadBuffer payloadBuffer; + private final Object payloadLock; protected NitroPacketHandler(HMessage.Direction direction, NitroSession session, ExtensionHandler extensionHandler, Object[] trafficObservables) { super(extensionHandler, trafficObservables); this.direction = direction; this.session = session; + this.payloadBuffer = new PayloadBuffer(); + this.payloadLock = new Object(); } @Override @@ -40,20 +45,26 @@ public class NitroPacketHandler extends PacketHandler { @Override public void act(byte[] buffer) throws IOException { - HMessage hMessage = new HMessage(new HPacket(buffer), direction, currentIndex); + payloadBuffer.push(buffer); - OnHMessageHandled afterExtensionIntercept = hMessage1 -> { - notifyListeners(2, hMessage1); + synchronized (payloadLock) { + for (HPacket packet : payloadBuffer.receive()) { + HMessage hMessage = new HMessage(packet, direction, currentIndex); - if (!hMessage1.isBlocked()) { - sendToStream(hMessage1.getPacket().toBytes()); + OnHMessageHandled afterExtensionIntercept = hMessage1 -> { + notifyListeners(2, hMessage1); + + if (!hMessage1.isBlocked()) { + sendToStream(hMessage1.getPacket().toBytes()); + } + }; + + notifyListeners(0, hMessage); + notifyListeners(1, hMessage); + extensionHandler.handle(hMessage, afterExtensionIntercept); + + currentIndex++; } - }; - - notifyListeners(0, hMessage); - notifyListeners(1, hMessage); - extensionHandler.handle(hMessage, afterExtensionIntercept); - - currentIndex++; + } } }