From 62a5f696dd9429170bf5b65d29a640bc3a370b50 Mon Sep 17 00:00:00 2001 From: sirjonasxx <36828922+sirjonasxx@users.noreply.github.com> Date: Thu, 11 Jun 2020 20:25:51 +0200 Subject: [PATCH] implement asynchrone extensionsupport --- .../java/gearth/protocol/HConnection.java | 11 + .../connection/proxy/ProxyProvider.java | 4 +- .../packethandler/IncomingPacketHandler.java | 5 +- .../packethandler/OutgoingPacketHandler.java | 5 +- .../protocol/packethandler/PacketHandler.java | 51 +++-- .../extensionhandler/ExtensionHandler.java | 199 ++++++++++-------- .../extensionhandler/OnHMessageHandled.java | 11 + 7 files changed, 174 insertions(+), 112 deletions(-) create mode 100644 G-Earth/src/main/java/gearth/services/extensionhandler/OnHMessageHandled.java diff --git a/G-Earth/src/main/java/gearth/protocol/HConnection.java b/G-Earth/src/main/java/gearth/protocol/HConnection.java index 10fd082..1a84a6d 100644 --- a/G-Earth/src/main/java/gearth/protocol/HConnection.java +++ b/G-Earth/src/main/java/gearth/protocol/HConnection.java @@ -7,6 +7,7 @@ import gearth.protocol.connection.proxy.ProxyProvider; import gearth.protocol.connection.proxy.ProxyProviderFactory; import gearth.protocol.connection.proxy.unix.LinuxRawIpProxyProvider; import gearth.protocol.connection.proxy.windows.WindowsRawIpProxyProvider; +import gearth.services.extensionhandler.ExtensionHandler; import java.io.IOException; @@ -15,6 +16,8 @@ public class HConnection { public static volatile boolean DECRYPTPACKETS = true; public static volatile boolean DEBUG = false; + private volatile ExtensionHandler extensionHandler = null; + private volatile Object[] trafficObservables = {new Observable(), new Observable(), new Observable()}; private volatile Observable stateObservable = new Observable<>(); @@ -100,6 +103,14 @@ public class HConnection { ((Observable) trafficObservables[2]).removeListener(listener); } + public void setExtensionHandler(ExtensionHandler handler) { + this.extensionHandler = handler; + } + + public ExtensionHandler getExtensionHandler() { + return extensionHandler; + } + public Object[] getTrafficObservables() { return trafficObservables; } diff --git a/G-Earth/src/main/java/gearth/protocol/connection/proxy/ProxyProvider.java b/G-Earth/src/main/java/gearth/protocol/connection/proxy/ProxyProvider.java index c6a7881..a1faa47 100644 --- a/G-Earth/src/main/java/gearth/protocol/connection/proxy/ProxyProvider.java +++ b/G-Earth/src/main/java/gearth/protocol/connection/proxy/ProxyProvider.java @@ -44,8 +44,8 @@ public abstract class ProxyProvider { if (HConnection.DEBUG) System.out.println(server.getLocalAddress().getHostAddress() + ": " + server.getLocalPort()); Rc4Obtainer rc4Obtainer = new Rc4Obtainer(hConnection); - OutgoingPacketHandler outgoingHandler = new OutgoingPacketHandler(server.getOutputStream(), hConnection.getTrafficObservables()); - IncomingPacketHandler incomingHandler = new IncomingPacketHandler(client.getOutputStream(), hConnection.getTrafficObservables(), outgoingHandler); + OutgoingPacketHandler outgoingHandler = new OutgoingPacketHandler(server.getOutputStream(), hConnection.getTrafficObservables(), hConnection.getExtensionHandler()); + IncomingPacketHandler incomingHandler = new IncomingPacketHandler(client.getOutputStream(), hConnection.getTrafficObservables(), outgoingHandler, hConnection.getExtensionHandler()); rc4Obtainer.setPacketHandlers(outgoingHandler, incomingHandler); Semaphore abort = new Semaphore(0); diff --git a/G-Earth/src/main/java/gearth/protocol/packethandler/IncomingPacketHandler.java b/G-Earth/src/main/java/gearth/protocol/packethandler/IncomingPacketHandler.java index a5eb858..f968241 100644 --- a/G-Earth/src/main/java/gearth/protocol/packethandler/IncomingPacketHandler.java +++ b/G-Earth/src/main/java/gearth/protocol/packethandler/IncomingPacketHandler.java @@ -4,6 +4,7 @@ import gearth.misc.listenerpattern.Observable; import gearth.protocol.HMessage; import gearth.protocol.HPacket; import gearth.protocol.TrafficListener; +import gearth.services.extensionhandler.ExtensionHandler; import java.io.IOException; import java.io.OutputStream; @@ -11,8 +12,8 @@ import java.util.List; public class IncomingPacketHandler extends PacketHandler { - public IncomingPacketHandler(OutputStream outputStream, Object[] trafficObservables, OutgoingPacketHandler outgoingHandler) { - super(outputStream, trafficObservables); + public IncomingPacketHandler(OutputStream outputStream, Object[] trafficObservables, OutgoingPacketHandler outgoingHandler, ExtensionHandler extensionHandler) { + super(outputStream, trafficObservables, extensionHandler); TrafficListener listener = new TrafficListener() { @Override diff --git a/G-Earth/src/main/java/gearth/protocol/packethandler/OutgoingPacketHandler.java b/G-Earth/src/main/java/gearth/protocol/packethandler/OutgoingPacketHandler.java index da6eb2a..156eec6 100644 --- a/G-Earth/src/main/java/gearth/protocol/packethandler/OutgoingPacketHandler.java +++ b/G-Earth/src/main/java/gearth/protocol/packethandler/OutgoingPacketHandler.java @@ -3,6 +3,7 @@ package gearth.protocol.packethandler; import gearth.misc.listenerpattern.Observable; import gearth.protocol.HMessage; import gearth.protocol.HPacket; +import gearth.services.extensionhandler.ExtensionHandler; import java.io.IOException; import java.io.OutputStream; @@ -12,8 +13,8 @@ import java.util.function.Consumer; public class OutgoingPacketHandler extends PacketHandler { - public OutgoingPacketHandler(OutputStream outputStream, Object[] trafficObservables) { - super(outputStream, trafficObservables); + public OutgoingPacketHandler(OutputStream outputStream, Object[] trafficObservables, ExtensionHandler extensionHandler) { + super(outputStream, trafficObservables, extensionHandler); } diff --git a/G-Earth/src/main/java/gearth/protocol/packethandler/PacketHandler.java b/G-Earth/src/main/java/gearth/protocol/packethandler/PacketHandler.java index 49b9379..6151c3b 100644 --- a/G-Earth/src/main/java/gearth/protocol/packethandler/PacketHandler.java +++ b/G-Earth/src/main/java/gearth/protocol/packethandler/PacketHandler.java @@ -6,6 +6,8 @@ import gearth.protocol.HMessage; import gearth.protocol.HPacket; import gearth.protocol.TrafficListener; import gearth.protocol.crypto.RC4; +import gearth.services.extensionhandler.ExtensionHandler; +import gearth.services.extensionhandler.OnHMessageHandled; import java.io.IOException; import java.io.OutputStream; @@ -18,6 +20,7 @@ public abstract class PacketHandler { private volatile PayloadBuffer payloadBuffer = new PayloadBuffer(); private volatile OutputStream out; + private volatile ExtensionHandler extensionHandler; private volatile Object[] trafficObservables; //get notified on packet send private volatile boolean isTempBlocked = false; volatile boolean isDataStream = false; @@ -33,8 +36,9 @@ public abstract class PacketHandler { volatile boolean isEncryptedStream = false; - PacketHandler(OutputStream outputStream, Object[] trafficObservables) { + PacketHandler(OutputStream outputStream, Object[] trafficObservables, ExtensionHandler extensionHandler) { this.trafficObservables = trafficObservables; + this.extensionHandler = extensionHandler; out = outputStream; } @@ -115,13 +119,11 @@ public abstract class PacketHandler { * LISTENERS CAN EDIT THE MESSAGE BEFORE BEING SENT * @param message */ - void notifyListeners(HMessage message) { - for (int x = 0; x < 3; x++) { - ((Observable) trafficObservables[x]).fireEvent(trafficListener -> { - message.getPacket().resetReadIndex(); - trafficListener.onCapture(message); - }); - } + private void notifyListeners(int i, HMessage message) { + ((Observable) trafficObservables[i]).fireEvent(trafficListener -> { + message.getPacket().resetReadIndex(); + trafficListener.onCapture(message); + }); message.getPacket().resetReadIndex(); } @@ -146,19 +148,32 @@ public abstract class PacketHandler { for (HPacket hpacket : hpackets){ HMessage hMessage = new HMessage(hpacket, getMessageSide(), currentIndex); boolean isencrypted = isEncryptedStream; + + OnHMessageHandled afterExtensionIntercept = hMessage1 -> { + if (isDataStream) { + notifyListeners(2, hMessage1); + } + + if (!hMessage1.isBlocked()) { + synchronized (sendLock) { + out.write( + (!isencrypted) + ? hMessage1.getPacket().toBytes() + : encryptcipher.rc4(hMessage1.getPacket().toBytes()) + ); + } + } + }; + if (isDataStream) { - notifyListeners(hMessage); + notifyListeners(0, hMessage); + notifyListeners(1, hMessage); + extensionHandler.handle(hMessage, afterExtensionIntercept); + } + else { + afterExtensionIntercept.finished(hMessage); } - if (!hMessage.isBlocked()) { - synchronized (sendLock) { - out.write( - (!isencrypted) - ? hMessage.getPacket().toBytes() - : encryptcipher.rc4(hMessage.getPacket().toBytes()) - ); - } - } currentIndex++; } } diff --git a/G-Earth/src/main/java/gearth/services/extensionhandler/ExtensionHandler.java b/G-Earth/src/main/java/gearth/services/extensionhandler/ExtensionHandler.java index ea5b071..18c8642 100644 --- a/G-Earth/src/main/java/gearth/services/extensionhandler/ExtensionHandler.java +++ b/G-Earth/src/main/java/gearth/services/extensionhandler/ExtensionHandler.java @@ -8,13 +8,14 @@ import gearth.protocol.HMessage; import gearth.protocol.HPacket; import gearth.protocol.connection.HState; import gearth.services.extensionhandler.extensions.ExtensionListener; +import gearth.services.extensionhandler.extensions.GEarthExtension; import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducer; import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducerFactory; import gearth.services.extensionhandler.extensions.extensionproducers.ExtensionProducerObserver; -import gearth.services.extensionhandler.extensions.GEarthExtension; +import javafx.util.Pair; +import java.io.IOException; import java.util.*; -import java.util.function.Consumer; public class ExtensionHandler { @@ -31,9 +32,15 @@ public class ExtensionHandler { } }; + private final Map> awaitManipulation = new HashMap<>(); + private final Map finishManipulationCallback = new HashMap<>(); + private final Map, HMessage> originalMessages = new HashMap<>(); + private final Map editedMessages = new HashMap<>(); + private final TreeSet allAwaitingMessages = new TreeSet<>(Comparator.comparingInt(HMessage::getIndex)); public ExtensionHandler(HConnection hConnection) { this.hConnection = hConnection; + hConnection.setExtensionHandler(this); initialize(); } @@ -54,7 +61,7 @@ public class ExtensionHandler { } } if (oldState == HState.CONNECTED) { - synchronized (hConnection) { + synchronized (gEarthExtensions) { for (GEarthExtension extension : gEarthExtensions) { extension.connectionEnd(); } @@ -62,93 +69,105 @@ public class ExtensionHandler { } }); - - hConnection.addTrafficListener(1, message -> { - Set collection; - synchronized (gEarthExtensions) { - collection = new HashSet<>(gEarthExtensions); - } - HMessage result = new HMessage(message); - - boolean[] isblock = new boolean[1]; - synchronized (collection) { - for (GEarthExtension extension : collection) { - ExtensionListener respondCallback = new ExtensionListener() { - @Override - public void manipulatedPacket(HMessage responseMessage) { - if (responseMessage.getDestination() == message.getDestination() && responseMessage.getIndex() == message.getIndex()) { - synchronized (result) { - if (!message.equals(responseMessage)) { - result.constructFromHMessage(responseMessage); - } - if (responseMessage.isBlocked()) { - isblock[0] = true; - } - synchronized (collection) { - collection.remove(extension); - } - - synchronized (extension) { - extension.getExtensionObservable().removeListener(this); - } - } - } - } - }; - synchronized (extension) { - extension.getExtensionObservable().addListener(respondCallback); - } - } - } - - Set collection2; - synchronized (collection) { - collection2 = new HashSet<>(collection); - } - - synchronized (collection2) { - for (GEarthExtension extension : collection2) { - synchronized (extension) { - extension.packetIntercept(new HMessage(message)); - } - } - } - - //block untill all extensions have responded - List willdelete = new ArrayList<>(); - while (true) { - synchronized (collection) { - if (collection.isEmpty()) { - break; - } - - synchronized (gEarthExtensions) { - for (GEarthExtension extension : collection) { - if (!gEarthExtensions.contains(extension)) willdelete.add(extension); - } - } - - for (int i = willdelete.size() - 1; i >= 0; i--) { - collection.remove(willdelete.get(i)); - willdelete.remove(i); - } - } - - - try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} - } - - message.constructFromHMessage(result); - - if (isblock[0]) { - message.setBlocked(true); - } - }); - extensionProducers = ExtensionProducerFactory.getAll(); extensionProducers.forEach(this::initializeExtensionProducer); } + + private final Object hMessageStuffLock = new Object(); + private void onExtensionRespond(GEarthExtension extension, HMessage edited) { + HMessage hMessage; + + synchronized (hMessageStuffLock) { + Pair msgDirAndId = new Pair<>(edited.getDestination(), edited.getIndex()); + hMessage = originalMessages.get(msgDirAndId); + + if (awaitManipulation.containsKey(hMessage)) { + awaitManipulation.get(hMessage).remove(extension); + + boolean wasBlocked = hMessage.isBlocked() || + (editedMessages.get(hMessage) != null && editedMessages.get(hMessage).isBlocked()); + + if (!hMessage.equals(edited)) { + editedMessages.put(hMessage, edited); + if (wasBlocked) { + editedMessages.get(hMessage).setBlocked(true); + } + } + } + else { + hMessage = null; + } + } + + if (hMessage != null) { + maybeFinishHmessage(hMessage); + } + } + private void onExtensionRemoved(GEarthExtension extension) { + List awaiting; + synchronized (hMessageStuffLock) { + awaiting = new ArrayList<>(allAwaitingMessages); + } + for (HMessage hMessage : awaiting) { + synchronized (hMessageStuffLock) { + awaitManipulation.get(hMessage).remove(extension); + } + maybeFinishHmessage(hMessage); + } + } + + // argument is the original hmessage, not an edited one + private void maybeFinishHmessage(HMessage hMessage) { + OnHMessageHandled maybeCallback = null; + HMessage result = null; + + synchronized (hMessageStuffLock) { + if (hMessage != null && awaitManipulation.containsKey(hMessage)) { + boolean isFinished = awaitManipulation.get(hMessage).isEmpty(); + + if (isFinished) { + awaitManipulation.remove(hMessage); + result = editedMessages.get(hMessage) == null ? hMessage : editedMessages.get(hMessage); + editedMessages.remove(hMessage); + originalMessages.remove(new Pair<>(result.getDestination(), result.getIndex())); + allAwaitingMessages.remove(hMessage); + maybeCallback = finishManipulationCallback.remove(hMessage); + } + } + } + + if (maybeCallback != null) { + try { + maybeCallback.finished(result); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + public void handle(HMessage hMessage, OnHMessageHandled callback) { + synchronized (hMessageStuffLock) { + Pair msgDirectionAndId = new Pair<>(hMessage.getDestination(), hMessage.getIndex()); + originalMessages.put(msgDirectionAndId, hMessage); + finishManipulationCallback.put(hMessage, callback); + editedMessages.put(hMessage, null); + allAwaitingMessages.add(hMessage); + + synchronized (gEarthExtensions) { + awaitManipulation.put(hMessage, new HashSet<>(gEarthExtensions)); + + for (GEarthExtension extension : gEarthExtensions) { + extension.packetIntercept(hMessage); + } + } + } + + + maybeFinishHmessage(hMessage); + } + + + private void initializeExtensionProducer(ExtensionProducer producer) { producer.startProducing(new ExtensionProducerObserver() { @Override @@ -179,9 +198,15 @@ public class ExtensionHandler { synchronized (gEarthExtensions) { gEarthExtensions.remove(extension); } + onExtensionRemoved(extension); extension.getExtensionObservable().removeListener(this); extension.getDeletedObservable().fireEvent(); } + + @Override + protected void manipulatedPacket(HMessage hMessage) { + onExtensionRespond(extension, hMessage); + } }; extension.getExtensionObservable().addListener(listener); @@ -204,11 +229,9 @@ public class ExtensionHandler { }); } - public List getExtensionProducers() { return extensionProducers; } - public Observable getObservable() { return observable; } diff --git a/G-Earth/src/main/java/gearth/services/extensionhandler/OnHMessageHandled.java b/G-Earth/src/main/java/gearth/services/extensionhandler/OnHMessageHandled.java new file mode 100644 index 0000000..8cce3f1 --- /dev/null +++ b/G-Earth/src/main/java/gearth/services/extensionhandler/OnHMessageHandled.java @@ -0,0 +1,11 @@ +package gearth.services.extensionhandler; + +import gearth.protocol.HMessage; + +import java.io.IOException; + +public interface OnHMessageHandled { + + void finished(HMessage hMessage) throws IOException; + +}