From 14d86accba51a9a9ed99226fb80fa1bbc4de6a64 Mon Sep 17 00:00:00 2001 From: Moritz Heidkamp Date: Sun, 11 Feb 2024 18:36:25 +0100 Subject: [PATCH] HTTP client request cancellation This patch changes `aleph.http/request` so that setting the response deferred to an error status will terminate an in-flight request. Closes #712. --- .../utils/RequestCancellationException.java | 23 +++ src/aleph/http.clj | 186 ++++++++++-------- test/aleph/http_test.clj | 24 ++- 3 files changed, 145 insertions(+), 88 deletions(-) create mode 100644 src-java/aleph/utils/RequestCancellationException.java diff --git a/src-java/aleph/utils/RequestCancellationException.java b/src-java/aleph/utils/RequestCancellationException.java new file mode 100644 index 00000000..948c8788 --- /dev/null +++ b/src-java/aleph/utils/RequestCancellationException.java @@ -0,0 +1,23 @@ +package aleph.utils; + +import java.util.concurrent.CancellationException; + +public class RequestCancellationException extends CancellationException { + + public RequestCancellationException() { } + + public RequestCancellationException(String message) { + super(message); + } + + public RequestCancellationException(Throwable cause) { + super(cause.getMessage()); + initCause(cause); + } + + public RequestCancellationException(String message, Throwable cause) { + super(message); + initCause(cause); + } + +} diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 247449c1..bc38993d 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -20,6 +20,7 @@ ConnectionTimeoutException PoolTimeoutException ReadTimeoutException + RequestCancellationException RequestTimeoutException) (io.aleph.dirigiste Pools) (io.netty.handler.codec Headers) @@ -358,96 +359,109 @@ middleware identity connection-timeout 6e4} ;; 60 seconds :as req}] - - (executor/with-executor response-executor - ((middleware - (fn [req] - (let [k (client/req->domain req) - start (System/currentTimeMillis)] - - ;; acquire a connection - (-> (flow/acquire pool k) - (maybe-timeout! pool-timeout) - - ;; pool timeout triggered - (d/catch' TimeoutException - (fn [^Throwable e] - (d/error-deferred (PoolTimeoutException. e)))) - - (d/chain' - (fn [conn] - - ;; get the wrapper for the connection, which may or may not be realized yet - (-> (first conn) - (maybe-timeout! connection-timeout) - - ;; connection timeout triggered, dispose of the connetion - (d/catch' TimeoutException - (fn [^Throwable e] - (log/error e "Timed out waiting for connection to be established") - (flow/dispose pool k conn) - (d/error-deferred (ConnectionTimeoutException. e)))) - - ;; connection failed, bail out - (d/catch' - (fn [e] - (log/error e "Connection failure") - (flow/dispose pool k conn) - (d/error-deferred e))) - - ;; actually make the request now - (d/chain' - (fn [conn'] - (when-not (nil? conn') - (let [end (System/currentTimeMillis)] - (-> (conn' req) - (maybe-timeout! request-timeout) - - ;; request timeout triggered, dispose of the connection - (d/catch' TimeoutException - (fn [^Throwable e] - (flow/dispose pool k conn) - (d/error-deferred (RequestTimeoutException. e)))) - - ;; request failed, dispose of the connection - (d/catch' + (let [dispose-conn! (atom (fn [])) + result (d/deferred nil) + response (executor/with-executor response-executor + ((middleware + (fn [req] + (let [k (client/req->domain req) + start (System/currentTimeMillis)] + + ;; acquire a connection + (-> (flow/acquire pool k) + (maybe-timeout! pool-timeout) + + ;; pool timeout triggered + (d/catch' TimeoutException + (fn [^Throwable e] + (d/error-deferred (PoolTimeoutException. e)))) + + (d/chain' + (fn [conn] + (reset! dispose-conn! (fn [] (flow/dispose pool k conn))) + + ;; get the wrapper for the connection, which may or may not be realized yet + (-> (first conn) + (maybe-timeout! connection-timeout) + + ;; connection timeout triggered, dispose of the connetion + (d/catch' TimeoutException + (fn [^Throwable e] + (log/error e "Timed out waiting for connection to be established") + (flow/dispose pool k conn) + (d/error-deferred (ConnectionTimeoutException. e)))) + + ;; connection failed, bail out + (d/catch' (fn [e] - (log/trace "Request failed. Disposing of connection...") + (log/error e "Connection failure") (flow/dispose pool k conn) (d/error-deferred e))) - ;; clean up the connection - (d/chain' - (fn cleanup-conn [rsp] - - ;; either destroy/dispose of the conn, or release it back for reuse - (-> (:aleph/destroy-conn? rsp) - (maybe-timeout! read-timeout) - - (d/catch' TimeoutException - (fn [^Throwable e] - (log/trace "Request timed out. Disposing of connection...") - (flow/dispose pool k conn) - (d/error-deferred (ReadTimeoutException. e)))) - - (d/chain' - (fn [early?] - (if (or early? - (not (:aleph/keep-alive? rsp)) - (<= 400 (:status rsp))) - (do - (log/trace "Connection finished. Disposing...") - (flow/dispose pool k conn)) - (flow/release pool k conn))))) - (-> rsp - (dissoc :aleph/destroy-conn?) - (assoc :connection-time (- end start))))))))) - - (fn handle-response [rsp] - (->> rsp - (middleware/handle-cookies req) - (middleware/handle-redirects request req))))))))))) - req)))) + ;; actually make the request now + (d/chain' + (fn [conn'] + (when-not (nil? conn') + (let [end (System/currentTimeMillis)] + (-> (conn' req) + (maybe-timeout! request-timeout) + + ;; request timeout triggered, dispose of the connection + (d/catch' TimeoutException + (fn [^Throwable e] + (flow/dispose pool k conn) + (d/error-deferred (RequestTimeoutException. e)))) + + ;; request failed, dispose of the connection + (d/catch' + (fn [e] + (log/trace "Request failed. Disposing of connection...") + (flow/dispose pool k conn) + (d/error-deferred e))) + + ;; clean up the connection + (d/chain' + (fn cleanup-conn [rsp] + + ;; either destroy/dispose of the conn, or release it back for reuse + (-> (:aleph/destroy-conn? rsp) + (maybe-timeout! read-timeout) + + (d/catch' TimeoutException + (fn [^Throwable e] + (log/trace "Request timed out. Disposing of connection...") + (flow/dispose pool k conn) + (d/error-deferred (ReadTimeoutException. e)))) + + (d/chain' + (fn [early?] + (if (or early? + (not (:aleph/keep-alive? rsp)) + (<= 400 (:status rsp))) + (do + (log/trace "Connection finished. Disposing...") + (flow/dispose pool k conn)) + (flow/release pool k conn))))) + (-> rsp + (dissoc :aleph/destroy-conn?) + (assoc :connection-time (- end start))))))))) + + (fn handle-response [rsp] + (->> rsp + (middleware/handle-cookies req) + (middleware/handle-redirects request req))))))))))) + req))] + (d/connect response result) + (d/catch' result + RequestCancellationException + (fn [e] + (log/trace e "Request cancelled. Disposing of connection...") + (@dispose-conn!) + (d/error-deferred e))) + result))) + +(defn cancel-request! [r] + (d/error! r (RequestCancellationException. "Request cancelled"))) (defn- req ([method url] diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index 0ee91a40..4b9ee42e 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -17,6 +17,7 @@ (:import (aleph.utils ConnectionTimeoutException + RequestCancellationException RequestTimeoutException) (clojure.lang ExceptionInfo) @@ -1073,9 +1074,13 @@ (Thread/sleep 5) (s/put! s (encode-http-object response)))) +(defmacro with-tcp-server [handler & body] + `(with-server (tcp/start-server ~handler {:port port + :shutdown-timeout 0}) + ~@body)) + (defmacro with-tcp-response [response & body] - `(with-server (tcp/start-server (tcp-handler ~response) {:port port - :shutdown-timeout 0}) + `(with-server (with-tcp-server (tcp-handler ~response)) ~@body)) (defmacro with-tcp-request-handler [handler options request & body] @@ -1438,3 +1443,18 @@ :http-versions [:http1]})] (is (instance? IllegalArgumentException result)) (is (= "use-h2c? may only be true when HTTP/2 is enabled." (ex-message result)))))) + +(deftest test-in-flight-request-cancellation + (let [conn-established (promise) + conn-closed (promise)] + (with-tcp-server (fn [s _] + (deliver conn-established true) + ;; Required for the client close to be detected + (s/consume identity s) + (s/on-closed s (fn [] + (deliver conn-closed true)))) + (let [rsp (http-get "/")] + (is (= true (deref conn-established 1000 :timeout))) + (http/cancel-request! rsp) + (is (= true (deref conn-closed 1000 :timeout))) + (is (thrown? RequestCancellationException (deref rsp 1000 :timeout)))))))