From f14ce638e7f2e3c12f5310f73b0544cff7b8d6dd Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 30 Jun 2022 13:29:02 +0200 Subject: [PATCH 01/11] extract low_level testing --- test/low_level.ml | 150 +++++++++++++++++++++++++++++++++++++++++++ test/test_rfc5961.ml | 147 +----------------------------------------- 2 files changed, 151 insertions(+), 146 deletions(-) create mode 100644 test/low_level.ml diff --git a/test/low_level.ml b/test/low_level.ml new file mode 100644 index 00000000..1c7f0560 --- /dev/null +++ b/test/low_level.ml @@ -0,0 +1,150 @@ +open Lwt.Infix + +(* + * Connects two stacks to the same backend. + * One is a complete v4 stack (the system under test, referred to as [sut]). + * The other gives us low level access to inject crafted TCP packets, + * and sends and receives crafted packets to check the [sut] behavior. + *) +module VNETIF_STACK = Vnetif_common.VNETIF_STACK(Vnetif_backends.Basic) + +module Time = Vnetif_common.Time +module V = Vnetif.Make(Vnetif_backends.Basic) +module E = Ethernet.Make(V) +module A = Arp.Make(E)(Time) +module I = Static_ipv4.Make(Mirage_random_test)(Vnetif_common.Clock)(E)(A) +module Wire = Tcp.Wire +module WIRE = Wire.Make(I) +module Tcp_wire = Tcp.Tcp_wire +module Tcp_unmarshal = Tcp.Tcp_packet.Unmarshal +module Sequence = Tcp.Sequence + +let sut_cidr = Ipaddr.V4.Prefix.of_string_exn "10.0.0.101/24" +let server_ip = Ipaddr.V4.of_string_exn "10.0.0.100" +let server_cidr = Ipaddr.V4.Prefix.make 24 server_ip +let gateway = Ipaddr.V4.of_string_exn "10.0.0.1" + +let header_size = Ethernet.Packet.sizeof_ethernet + + + +(* defaults when injecting packets *) +let options = [] +let window = 5120 + +(* Helper functions *) +let reply_id_from ~src ~dst data = + let sport = Tcp_wire.get_tcp_src_port data in + let dport = Tcp_wire.get_tcp_dst_port data in + WIRE.v ~dst_port:sport ~dst:src ~src_port:dport ~src:dst + +let ack_for data = + match Tcp_unmarshal.of_cstruct data with + | Error s -> Alcotest.fail ("attempting to ack data: " ^ s) + | Ok (packet, data) -> + let open Tcp.Tcp_packet in + let data_len = + Sequence.of_int ((Cstruct.length data) + + (if packet.fin then 1 else 0) + + (if packet.syn then 1 else 0)) in + let sequence = packet.sequence in + let ack_n = Sequence.(add sequence data_len) in + ack_n + +let ack data = + Some(ack_for data) + +let ack_in_future data off = + Some Sequence.(add (ack_for data) (of_int off)) + +let ack_from_past data off = + Some Sequence.(sub (ack_for data) (of_int off)) + +let fail_result_not_expected fail = function + | Error _err -> + fail "error not expected" + | Ok `Eof -> + fail "eof" + | Ok (`Data data) -> + Alcotest.fail (Format.asprintf "data not expected but received: %a" + Cstruct.hexdump_pp data) + + + +let create_sut_stack backend = + VNETIF_STACK.create_stack ~cidr:sut_cidr ~gateway backend + +let create_raw_stack backend = + V.connect backend >>= fun netif -> + E.connect netif >>= fun ethif -> + A.connect ethif >>= fun arpv4 -> + I.connect ~cidr:server_cidr ~gateway ethif arpv4 >>= fun ip -> + Lwt.return (netif, ethif, arpv4, ip) + +type 'state fsm_result = + | Fsm_next of 'state + | Fsm_done + | Fsm_error of string + +(* This could be moved to a common module and reused for other low level tcp tests *) + +(* setups network and run a given sut and raw fsm *) +let run backend fsm sut () = + let initial_state, fsm_handler = fsm in + create_sut_stack backend >>= fun stackv4 -> + create_raw_stack backend >>= fun (netif, ethif, arp, rawip) -> + let error_mbox = Lwt_mvar.create_empty () in + let stream, pushf = Lwt_stream.create () in + Lwt.pick [ + VNETIF_STACK.Stackv4.listen stackv4; + + (* Consume TCP packets one by one, in sequence *) + let rec fsm_thread state = + Lwt_stream.next stream >>= fun (src, dst, data) -> + fsm_handler rawip state ~src ~dst data >>= function + | Fsm_next s -> + fsm_thread s + | Fsm_done -> + Lwt.return_unit + | Fsm_error err -> + Lwt_mvar.put error_mbox err >>= fun () -> + (* it will be terminated anyway when the error is picked up *) + fsm_thread state in + + Lwt.async (fun () -> + (V.listen netif ~header_size + (E.input + ~arpv4:(A.input arp) + ~ipv4:(I.input + ~tcp: (fun ~src ~dst data -> pushf (Some(src,dst,data)); Lwt.return_unit) + ~udp:(fun ~src:_ ~dst:_ _data -> Lwt.return_unit) + ~default:(fun ~proto ~src ~dst _data -> + Logs.debug (fun f -> f "default handler invoked for packet from %a to %a, protocol %d -- dropping" Ipaddr.V4.pp src Ipaddr.V4.pp dst proto); Lwt.return_unit) + rawip + ) + ~ipv6:(fun _buf -> + Logs.debug (fun f -> f "IPv6 packet -- dropping"); + Lwt.return_unit) + ethif) ) >|= fun _ -> ()); + + (* Either both fsm and the sut terminates, or a timeout occurs, or one of the sut/fsm informs an error *) + Lwt.pick [ + (Time.sleep_ns (Duration.of_sec 5) >>= fun () -> + Lwt.return_some "timed out"); + + (Lwt.join [ + (fsm_thread initial_state); + + (* time to let the other end connects to the network and listen. + * Otherwise initial syn might need to be repeated slowing down the test *) + (Time.sleep_ns (Duration.of_ms 100) >>= fun () -> + sut stackv4 (Lwt_mvar.put error_mbox) >>= fun _ -> + Time.sleep_ns (Duration.of_ms 100)); + ] >>= fun () -> Lwt.return_none); + + (Lwt_mvar.take error_mbox >>= fun cause -> + Lwt.return_some cause); + ] >|= function + | None -> () + | Some err -> Alcotest.fail err + ] diff --git a/test/test_rfc5961.ml b/test/test_rfc5961.ml index fddc824f..694cdb12 100644 --- a/test/test_rfc5961.ml +++ b/test/test_rfc5961.ml @@ -16,152 +16,7 @@ open Common open Lwt.Infix -(* - * Connects two stacks to the same backend. - * One is a complete v4 stack (the system under test, referred to as [sut]). - * The other gives us low level access to inject crafted TCP packets, - * and sends and receives crafted packets to check the [sut] behavior. - *) -module VNETIF_STACK = Vnetif_common.VNETIF_STACK(Vnetif_backends.Basic) - -module Time = Vnetif_common.Time -module V = Vnetif.Make(Vnetif_backends.Basic) -module E = Ethernet.Make(V) -module A = Arp.Make(E)(Time) -module I = Static_ipv4.Make(Mirage_random_test)(Vnetif_common.Clock)(E)(A) -module Wire = Tcp.Wire -module WIRE = Wire.Make(I) -module Tcp_wire = Tcp.Tcp_wire -module Tcp_unmarshal = Tcp.Tcp_packet.Unmarshal -module Sequence = Tcp.Sequence - -let sut_cidr = Ipaddr.V4.Prefix.of_string_exn "10.0.0.101/24" -let server_ip = Ipaddr.V4.of_string_exn "10.0.0.100" -let server_cidr = Ipaddr.V4.Prefix.make 24 server_ip -let gateway = Ipaddr.V4.of_string_exn "10.0.0.1" - -let header_size = Ethernet.Packet.sizeof_ethernet - -(* defaults when injecting packets *) -let options = [] -let window = 5120 - -let create_sut_stack backend = - VNETIF_STACK.create_stack ~cidr:sut_cidr ~gateway backend - -let create_raw_stack backend = - V.connect backend >>= fun netif -> - E.connect netif >>= fun ethif -> - A.connect ethif >>= fun arpv4 -> - I.connect ~cidr:server_cidr ~gateway ethif arpv4 >>= fun ip -> - Lwt.return (netif, ethif, arpv4, ip) - -type 'state fsm_result = - | Fsm_next of 'state - | Fsm_done - | Fsm_error of string - -(* This could be moved to a common module and reused for other low level tcp tests *) - -(* setups network and run a given sut and raw fsm *) -let run backend fsm sut () = - let initial_state, fsm_handler = fsm in - create_sut_stack backend >>= fun stackv4 -> - create_raw_stack backend >>= fun (netif, ethif, arp, rawip) -> - let error_mbox = Lwt_mvar.create_empty () in - let stream, pushf = Lwt_stream.create () in - Lwt.pick [ - VNETIF_STACK.Stackv4.listen stackv4; - - (* Consume TCP packets one by one, in sequence *) - let rec fsm_thread state = - Lwt_stream.next stream >>= fun (src, dst, data) -> - fsm_handler rawip state ~src ~dst data >>= function - | Fsm_next s -> - fsm_thread s - | Fsm_done -> - Lwt.return_unit - | Fsm_error err -> - Lwt_mvar.put error_mbox err >>= fun () -> - (* it will be terminated anyway when the error is picked up *) - fsm_thread state in - - Lwt.async (fun () -> - (V.listen netif ~header_size - (E.input - ~arpv4:(A.input arp) - ~ipv4:(I.input - ~tcp: (fun ~src ~dst data -> pushf (Some(src,dst,data)); Lwt.return_unit) - ~udp:(fun ~src:_ ~dst:_ _data -> Lwt.return_unit) - ~default:(fun ~proto ~src ~dst _data -> - Logs.debug (fun f -> f "default handler invoked for packet from %a to %a, protocol %d -- dropping" Ipaddr.V4.pp src Ipaddr.V4.pp dst proto); Lwt.return_unit) - rawip - ) - ~ipv6:(fun _buf -> - Logs.debug (fun f -> f "IPv6 packet -- dropping"); - Lwt.return_unit) - ethif) ) >|= fun _ -> ()); - - (* Either both fsm and the sut terminates, or a timeout occurs, or one of the sut/fsm informs an error *) - Lwt.pick [ - (Time.sleep_ns (Duration.of_sec 5) >>= fun () -> - Lwt.return_some "timed out"); - - (Lwt.join [ - (fsm_thread initial_state); - - (* time to let the other end connects to the network and listen. - * Otherwise initial syn might need to be repeated slowing down the test *) - (Time.sleep_ns (Duration.of_ms 100) >>= fun () -> - sut stackv4 (Lwt_mvar.put error_mbox) >>= fun _ -> - Time.sleep_ns (Duration.of_ms 100)); - ] >>= fun () -> Lwt.return_none); - - (Lwt_mvar.take error_mbox >>= fun cause -> - Lwt.return_some cause); - ] >|= function - | None -> () - | Some err -> Alcotest.fail err - ] - - -(* Helper functions *) -let reply_id_from ~src ~dst data = - let sport = Tcp_wire.get_tcp_src_port data in - let dport = Tcp_wire.get_tcp_dst_port data in - WIRE.v ~dst_port:sport ~dst:src ~src_port:dport ~src:dst - -let ack_for data = - match Tcp_unmarshal.of_cstruct data with - | Error s -> Alcotest.fail ("attempting to ack data: " ^ s) - | Ok (packet, data) -> - let open Tcp.Tcp_packet in - let data_len = - Sequence.of_int ((Cstruct.length data) + - (if packet.fin then 1 else 0) + - (if packet.syn then 1 else 0)) in - let sequence = packet.sequence in - let ack_n = Sequence.(add sequence data_len) in - ack_n - -let ack data = - Some(ack_for data) - -let ack_in_future data off = - Some Sequence.(add (ack_for data) (of_int off)) - -let ack_from_past data off = - Some Sequence.(sub (ack_for data) (of_int off)) - -let fail_result_not_expected fail = function - | Error _err -> - fail "error not expected" - | Ok `Eof -> - fail "eof" - | Ok (`Data data) -> - Alcotest.fail (Format.asprintf "data not expected but received: %a" - Cstruct.hexdump_pp data) - +open Low_level (* Test scenarios *) From 1ba2e7e43e4975cf19391785baf3e6e17db3dc2e Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 30 Jun 2022 10:51:25 +0200 Subject: [PATCH 02/11] add failing test for simultaneous close --- src/tcp/flow.ml | 2 + src/tcp/flow.mli | 6 ++ test/test.ml | 3 +- test/test_simulatenous_close.ml | 111 ++++++++++++++++++++++++++++++++ test/vnetif_common.ml | 12 +++- 5 files changed, 130 insertions(+), 4 deletions(-) create mode 100644 test/test_simulatenous_close.ml diff --git a/src/tcp/flow.ml b/src/tcp/flow.ml index 9b3a42a7..1712f6b2 100644 --- a/src/tcp/flow.ml +++ b/src/tcp/flow.ml @@ -75,6 +75,8 @@ struct connects: (WIRE.t, ((connection, error) result Lwt.u * Sequence.t * Tcpip.Tcp.Keepalive.t option)) Hashtbl.t; } + let internal_n_channels t = Hashtbl.length t.channels + let listen t ~port ?keepalive cb = if port < 0 || port > 65535 then raise (Invalid_argument (Printf.sprintf "invalid port number (%d)" port)) diff --git a/src/tcp/flow.mli b/src/tcp/flow.mli index 8cbe9531..e36af187 100644 --- a/src/tcp/flow.mli +++ b/src/tcp/flow.mli @@ -20,4 +20,10 @@ module Make (IP:Tcpip.Ip.S) (R:Mirage_random.S) : sig include Tcpip.Tcp.S with type ipaddr = IP.ipaddr val connect : IP.t -> t Lwt.t + + (**/**) + (* the number of open connections *) + val internal_n_channels : t -> int + (**/**) + end diff --git a/test/test.ml b/test/test.ml index fb1fb7af..153c422f 100644 --- a/test/test.ml +++ b/test/test.ml @@ -30,7 +30,8 @@ let suite = [ "deadlock" , Test_deadlock.suite ; "iperf" , Test_iperf.suite ; "iperf_ipv6" , Test_iperf_ipv6.suite ; - "keepalive" , Test_keepalive.suite ; + "keepalive" , Test_keepalive.suite ; + "simultaneous_close", Test_simulatenous_close.suite ] let run test () = diff --git a/test/test_simulatenous_close.ml b/test/test_simulatenous_close.ml new file mode 100644 index 00000000..b6c17f5e --- /dev/null +++ b/test/test_simulatenous_close.ml @@ -0,0 +1,111 @@ +open Common + +open Low_level +open Lwt.Infix + +let close_ack_scenario = + let fsm ip state ~src ~dst data = + match state with + | `WAIT_FOR_SYN -> + let syn = Tcp_wire.get_syn data in + if syn then ( + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~syn:true ~rx_ack:(ack data) + ~seq:(Sequence.of_int32 1000000l) ~window + ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_next `WAIT_FOR_ACK) + ) else + Lwt.return (Fsm_error "Expected initial syn request") + | `WAIT_FOR_ACK -> + if Tcp_wire.get_ack data then ( + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~rx_ack:(ack data) ~fin:true ~seq:(Sequence.of_int32 1000001l) + ~window ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_next `WAIT_FOR_FIN) + ) else + Lwt.return (Fsm_error "Expected final ack of three step dance") + | `WAIT_FOR_FIN -> + if (Tcp_wire.get_fin data) then + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~rx_ack:(ack data) ~seq:(Sequence.of_int32 1000002l) + ~window:0 ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return Fsm_done + else + Lwt.return (Fsm_error "Fin expected") in + + let sut stack _fail_callback = + let conn = VNETIF_STACK.Stackv4.TCPV4.create_connection (VNETIF_STACK.Stackv4.tcpv4 stack) in + or_error "connect" conn (server_ip, 80) >>= fun flow -> + (* We should receive the data *) + VNETIF_STACK.Stackv4.TCPV4.close flow >>= fun () -> + Lwt_unix.sleep 4.0 >>= fun () -> + Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.internal_n_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); + Lwt.return_unit + in + (`WAIT_FOR_SYN, fsm), sut + + let close_reset_scenario = + let fsm ip state ~src ~dst data = + match state with + | `WAIT_FOR_SYN -> + let syn = Tcp_wire.get_syn data in + if syn then ( + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~syn:true ~rx_ack:(ack data) + ~seq:(Sequence.of_int32 1000000l) ~window + ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_next `WAIT_FOR_ACK) + ) else + Lwt.return (Fsm_error "Expected initial syn request") + | `WAIT_FOR_ACK -> + if Tcp_wire.get_ack data then ( + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~rx_ack:(ack data) ~fin:true ~seq:(Sequence.of_int32 1000001l) + ~window ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_next `WAIT_FOR_FIN) + ) else + Lwt.return (Fsm_error "Expected final ack of three step dance") + | `WAIT_FOR_FIN -> + if (Tcp_wire.get_fin data) then + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~rx_ack:None ~rst:true ~seq:(Sequence.of_int32 1000001l) + ~window:0 ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_next `WAIT_FOR_CHALLENGE_ACK) + else + Lwt.return (Fsm_error "Expected fin") + | `WAIT_FOR_CHALLENGE_ACK -> + if (Tcp_wire.get_ack data) then + let id = reply_id_from ~src ~dst data in + WIRE.xmit ~ip id ~rx_ack:None ~rst:true ~seq:(Sequence.of_int32 1000002l) + ~window:0 ~options (Cstruct.create 0) + >|= Result.get_ok >>= fun () -> + Lwt.return (Fsm_done) + else + Lwt.return (Fsm_error "Expected challenge ack") + in + + let sut stack _fail_callback = + let conn = VNETIF_STACK.Stackv4.TCPV4.create_connection (VNETIF_STACK.Stackv4.tcpv4 stack) in + or_error "connect" conn (server_ip, 80) >>= fun flow -> + (* We should receive the data *) + VNETIF_STACK.Stackv4.TCPV4.close flow >>= fun () -> + Lwt_unix.sleep 4.0 >>= fun () -> + Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.internal_n_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); + Lwt.return_unit + in + (`WAIT_FOR_SYN, fsm), sut + +let run_test pcap_file ((initial_state, fsm), sut) () = + let backend = VNETIF_STACK.create_backend () in + VNETIF_STACK.record_pcap backend pcap_file (run backend (initial_state, fsm) sut) + +let suite = [ + "close with ack", `Slow, run_test "close_ack.pcap" close_ack_scenario; + "close with reset, challenge ack ok", `Slow, run_test "close_reset.pcap" close_reset_scenario; +] \ No newline at end of file diff --git a/test/vnetif_common.ml b/test/vnetif_common.ml index eb7ee381..a1d6b9e4 100644 --- a/test/vnetif_common.ml +++ b/test/vnetif_common.ml @@ -58,9 +58,15 @@ sig val record_pcap : backend -> string -> (unit -> unit Lwt.t) -> unit Lwt.t end -module VNETIF_STACK (B: Vnetif_backends.Backend): - VNETIF_STACK with type backend = B.t = -struct +module VNETIF_STACK (B: Vnetif_backends.Backend): sig + include VNETIF_STACK with + type backend = B.t + + module T4 : sig + val internal_n_channels : Stackv4.TCPV4.t -> int + end +end += struct type backend = B.t type buffer = B.buffer type 'a io = 'a B.io From bb6e85d29e048a2ecf730e5737e231e4723db596 Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 30 Jun 2022 10:52:39 +0200 Subject: [PATCH 03/11] Add timewait timeout for Closing -> Time_wait transition --- src/tcp/state.ml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/tcp/state.ml b/src/tcp/state.ml index 8a66c097..ce6ef655 100644 --- a/src/tcp/state.ml +++ b/src/tcp/state.ml @@ -151,7 +151,13 @@ module Make(Time:Mirage_time.S) = struct | Fin_wait_2 _, Recv_fin -> Lwt.async (fun () -> timewait t time_wait_time); Time_wait - | Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a + | Closing a, Recv_ack b -> + if diffone b a then + begin + Lwt.async (fun () -> timewait t time_wait_time); + Time_wait + end + else Closing a | Closing _, Timeout -> t.on_close (); Closed | Closing _, Recv_rst -> t.on_close (); Reset | Time_wait, Timeout -> t.on_close (); Closed From c088044e8e70347ffb858ef93f3ea8264606aafe Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 30 Jun 2022 14:11:47 +0200 Subject: [PATCH 04/11] Add retransmit timer when in a Closing state --- src/tcp/segment.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tcp/segment.ml b/src/tcp/segment.ml index 75872594..a1639b73 100644 --- a/src/tcp/segment.ml +++ b/src/tcp/segment.ml @@ -287,7 +287,7 @@ module Tx (Time:Mirage_time.S) (Clock:Mirage_clock.MCLOCK) = struct let ontimer xmit st segs wnd seq = match State.state st with | State.Syn_rcvd _ | State.Established | State.Fin_wait_1 _ - | State.Close_wait | State.Last_ack _ -> + | State.Close_wait | State.Closing _ | State.Last_ack _ -> begin match peek_opt_l segs with | None -> Lwt.return Tcptimer.Stoptimer | Some rexmit_seg -> From b9b88d956279bc8b31732cf063f2541fd21c427d Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 30 Jun 2022 14:30:39 +0200 Subject: [PATCH 05/11] enable challenge ack even after FIN --- src/tcp/flow.ml | 2 +- src/tcp/segment.ml | 10 ++++------ src/tcp/segment.mli | 1 + 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/tcp/flow.ml b/src/tcp/flow.ml index 1712f6b2..2291b3d8 100644 --- a/src/tcp/flow.ml +++ b/src/tcp/flow.ml @@ -360,7 +360,7 @@ struct in (* The user application transmit buffer *) let utx = UTX.create ~wnd ~txq ~max_size:16384l in - let rxq = RXS.create ~rx_data ~wnd ~state ~tx_ack in + let rxq = RXS.create ~rx_data ~send_ack ~wnd ~state ~tx_ack in (* Set up ACK module *) let ack = ACK.t ~send_ack ~last:(Sequence.succ rx_isn) in (* Set up the keepalive state if requested *) diff --git a/src/tcp/segment.ml b/src/tcp/segment.ml index a1639b73..ad07aa22 100644 --- a/src/tcp/segment.ml +++ b/src/tcp/segment.ml @@ -84,14 +84,15 @@ module Rx(Time:Mirage_time.S) = struct type t = { mutable segs: S.t; rx_data: (Cstruct.t list option * Sequence.t option) Lwt_mvar.t; (* User receive channel *) + send_ack: Sequence.t Lwt_mvar.t; tx_ack: (Sequence.t * int) Lwt_mvar.t; (* Acks of our transmitted segs *) wnd: Window.t; state: State.t; } - let create ~rx_data ~wnd ~state ~tx_ack = + let create ~rx_data ~send_ack ~wnd ~state ~tx_ack = let segs = S.empty in - { segs; rx_data; tx_ack; wnd; state } + { segs; rx_data; send_ack; tx_ack; wnd; state } let pp fmt t = let pp_v fmt seg = @@ -135,10 +136,7 @@ module Rx(Time:Mirage_time.S) = struct let send_challenge_ack q = (* TODO: rfc5961 ACK Throttling *) - (* Is this the correct way trigger an ack? *) - if Lwt_mvar.is_empty q.rx_data - then Lwt_mvar.put q.rx_data (Some [], Some Sequence.zero) - else Lwt.return_unit + Lwt_mvar.put q.send_ack Sequence.zero (* Given an input segment, the window information, and a receive queue, update the window, extract any ready segments into the diff --git a/src/tcp/segment.mli b/src/tcp/segment.mli index 60590434..137da18a 100644 --- a/src/tcp/segment.mli +++ b/src/tcp/segment.mli @@ -38,6 +38,7 @@ module Rx (T:Mirage_time.S) : sig val create: rx_data:(Cstruct.t list option * Sequence.t option) Lwt_mvar.t -> + send_ack:Sequence.t Lwt_mvar.t -> wnd:Window.t -> state:State.t -> tx_ack:(Sequence.t * int) Lwt_mvar.t -> From bebfcf8d4e81c930a15055083dc41fd6e04b6f1d Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Mon, 18 Jul 2022 20:13:22 +0200 Subject: [PATCH 06/11] trim whitespace --- src/tcp/state.ml | 10 +++++----- test/test.ml | 2 +- test/test_simulatenous_close.ml | 8 ++++---- test/vnetif_common.ml | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/tcp/state.ml b/src/tcp/state.ml index ce6ef655..dec1c3a7 100644 --- a/src/tcp/state.ml +++ b/src/tcp/state.ml @@ -151,12 +151,12 @@ module Make(Time:Mirage_time.S) = struct | Fin_wait_2 _, Recv_fin -> Lwt.async (fun () -> timewait t time_wait_time); Time_wait - | Closing a, Recv_ack b -> - if diffone b a then - begin + | Closing a, Recv_ack b -> + if diffone b a then + begin Lwt.async (fun () -> timewait t time_wait_time); - Time_wait - end + Time_wait + end else Closing a | Closing _, Timeout -> t.on_close (); Closed | Closing _, Recv_rst -> t.on_close (); Reset diff --git a/test/test.ml b/test/test.ml index 153c422f..92e8c0b8 100644 --- a/test/test.ml +++ b/test/test.ml @@ -30,7 +30,7 @@ let suite = [ "deadlock" , Test_deadlock.suite ; "iperf" , Test_iperf.suite ; "iperf_ipv6" , Test_iperf_ipv6.suite ; - "keepalive" , Test_keepalive.suite ; + "keepalive" , Test_keepalive.suite ; "simultaneous_close", Test_simulatenous_close.suite ] diff --git a/test/test_simulatenous_close.ml b/test/test_simulatenous_close.ml index b6c17f5e..57d02de5 100644 --- a/test/test_simulatenous_close.ml +++ b/test/test_simulatenous_close.ml @@ -3,7 +3,7 @@ open Common open Low_level open Lwt.Infix -let close_ack_scenario = +let close_ack_scenario = let fsm ip state ~src ~dst data = match state with | `WAIT_FOR_SYN -> @@ -78,7 +78,7 @@ let close_ack_scenario = >|= Result.get_ok >>= fun () -> Lwt.return (Fsm_next `WAIT_FOR_CHALLENGE_ACK) else - Lwt.return (Fsm_error "Expected fin") + Lwt.return (Fsm_error "Expected fin") | `WAIT_FOR_CHALLENGE_ACK -> if (Tcp_wire.get_ack data) then let id = reply_id_from ~src ~dst data in @@ -87,9 +87,9 @@ let close_ack_scenario = >|= Result.get_ok >>= fun () -> Lwt.return (Fsm_done) else - Lwt.return (Fsm_error "Expected challenge ack") + Lwt.return (Fsm_error "Expected challenge ack") in - + let sut stack _fail_callback = let conn = VNETIF_STACK.Stackv4.TCPV4.create_connection (VNETIF_STACK.Stackv4.tcpv4 stack) in or_error "connect" conn (server_ip, 80) >>= fun flow -> diff --git a/test/vnetif_common.ml b/test/vnetif_common.ml index a1d6b9e4..e0aad8b5 100644 --- a/test/vnetif_common.ml +++ b/test/vnetif_common.ml @@ -59,10 +59,10 @@ sig end module VNETIF_STACK (B: Vnetif_backends.Backend): sig - include VNETIF_STACK with + include VNETIF_STACK with type backend = B.t - module T4 : sig + module T4 : sig val internal_n_channels : Stackv4.TCPV4.t -> int end end From ba8f23da8a0dcfc087c72af0d6d3c3d753e58a9b Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Mon, 18 Jul 2022 20:14:47 +0200 Subject: [PATCH 07/11] Rename internal_n_channels to num_open_channels --- src/tcp/flow.ml | 2 +- src/tcp/flow.mli | 2 +- test/test_simulatenous_close.ml | 4 ++-- test/vnetif_common.ml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/tcp/flow.ml b/src/tcp/flow.ml index 2291b3d8..4b72aa84 100644 --- a/src/tcp/flow.ml +++ b/src/tcp/flow.ml @@ -75,7 +75,7 @@ struct connects: (WIRE.t, ((connection, error) result Lwt.u * Sequence.t * Tcpip.Tcp.Keepalive.t option)) Hashtbl.t; } - let internal_n_channels t = Hashtbl.length t.channels + let num_open_channels t = Hashtbl.length t.channels let listen t ~port ?keepalive cb = if port < 0 || port > 65535 then diff --git a/src/tcp/flow.mli b/src/tcp/flow.mli index e36af187..c3c1c2b2 100644 --- a/src/tcp/flow.mli +++ b/src/tcp/flow.mli @@ -23,7 +23,7 @@ module Make (IP:Tcpip.Ip.S) (**/**) (* the number of open connections *) - val internal_n_channels : t -> int + val num_open_channels : t -> int (**/**) end diff --git a/test/test_simulatenous_close.ml b/test/test_simulatenous_close.ml index 57d02de5..8574057e 100644 --- a/test/test_simulatenous_close.ml +++ b/test/test_simulatenous_close.ml @@ -42,7 +42,7 @@ let close_ack_scenario = (* We should receive the data *) VNETIF_STACK.Stackv4.TCPV4.close flow >>= fun () -> Lwt_unix.sleep 4.0 >>= fun () -> - Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.internal_n_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); + Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.num_open_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); Lwt.return_unit in (`WAIT_FOR_SYN, fsm), sut @@ -96,7 +96,7 @@ let close_ack_scenario = (* We should receive the data *) VNETIF_STACK.Stackv4.TCPV4.close flow >>= fun () -> Lwt_unix.sleep 4.0 >>= fun () -> - Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.internal_n_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); + Alcotest.(check int) "connection is cleaned" 0 (VNETIF_STACK.T4.num_open_channels ((VNETIF_STACK.Stackv4.tcpv4 stack))); Lwt.return_unit in (`WAIT_FOR_SYN, fsm), sut diff --git a/test/vnetif_common.ml b/test/vnetif_common.ml index e0aad8b5..d9da0c9f 100644 --- a/test/vnetif_common.ml +++ b/test/vnetif_common.ml @@ -63,7 +63,7 @@ module VNETIF_STACK (B: Vnetif_backends.Backend): sig type backend = B.t module T4 : sig - val internal_n_channels : Stackv4.TCPV4.t -> int + val num_open_channels : Stackv4.TCPV4.t -> int end end = struct From cd90787da36cb289a6b7f28c1fc394c6a950c8d2 Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Mon, 18 Jul 2022 20:16:25 +0200 Subject: [PATCH 08/11] Re-add the is_empty check before writing in the send_ack mvar --- src/tcp/segment.ml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tcp/segment.ml b/src/tcp/segment.ml index ad07aa22..879b727a 100644 --- a/src/tcp/segment.ml +++ b/src/tcp/segment.ml @@ -136,7 +136,9 @@ module Rx(Time:Mirage_time.S) = struct let send_challenge_ack q = (* TODO: rfc5961 ACK Throttling *) - Lwt_mvar.put q.send_ack Sequence.zero + if Lwt_mvar.is_empty q.send_ack + then Lwt_mvar.put q.send_ack Sequence.zero + else Lwt.return_unit (* Given an input segment, the window information, and a receive queue, update the window, extract any ready segments into the From 1bd6270b2b4a3f3974a0402b44aa61a984f2d8ab Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Mon, 18 Jul 2022 20:18:39 +0200 Subject: [PATCH 09/11] extract timewait transition to a new function --- src/tcp/state.ml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/tcp/state.ml b/src/tcp/state.ml index dec1c3a7..448ee6e1 100644 --- a/src/tcp/state.ml +++ b/src/tcp/state.ml @@ -119,6 +119,10 @@ module Make(Time:Mirage_time.S) = struct t.on_close (); Lwt.return_unit + let transition_to_timewait t = + Lwt.async (fun () -> timewait t time_wait_time); + Time_wait + let tick t (i:action) = let diffone x y = Sequence.succ y = x in let tstr s (i:action) = @@ -148,15 +152,10 @@ module Make(Time:Mirage_time.S) = struct | Fin_wait_1 _, Recv_rst -> t.on_close (); Reset | Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1) | Fin_wait_2 _, Recv_rst -> t.on_close (); Reset - | Fin_wait_2 _, Recv_fin -> - Lwt.async (fun () -> timewait t time_wait_time); - Time_wait + | Fin_wait_2 _, Recv_fin -> transition_to_timewait t | Closing a, Recv_ack b -> if diffone b a then - begin - Lwt.async (fun () -> timewait t time_wait_time); - Time_wait - end + transition_to_timewait t else Closing a | Closing _, Timeout -> t.on_close (); Closed | Closing _, Recv_rst -> t.on_close (); Reset From 1813264ba564136b0400f7f4a66034ea05fa24e7 Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Thu, 21 Jul 2022 13:07:51 +0200 Subject: [PATCH 10/11] Use the ACK module to send the challenge ACK --- src/tcp/flow.ml | 10 +++++----- src/tcp/segment.ml | 12 +++++------- src/tcp/segment.mli | 4 ++-- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/tcp/flow.ml b/src/tcp/flow.ml index 4b72aa84..a96e2400 100644 --- a/src/tcp/flow.ml +++ b/src/tcp/flow.ml @@ -23,9 +23,9 @@ module Log = (val Logs.src_log src : Logs.LOG) module Make(Ip: Tcpip.Ip.S)(Time:Mirage_time.S)(Clock:Mirage_clock.MCLOCK)(Random:Mirage_random.S) = struct - module RXS = Segment.Rx(Time) - module TXS = Segment.Tx(Time)(Clock) module ACK = Ack.Immediate + module RXS = Segment.Rx(Time)(ACK) + module TXS = Segment.Tx(Time)(Clock) module UTX = User_buffer.Tx(Time)(Clock) module WIRE = Wire.Make(Ip) module STATE = State.Make(Time) @@ -358,11 +358,11 @@ struct let txq, _tx_t = TXS.create ~xmit:(Tx.xmit_pcb t.ip id) ~wnd ~state ~rx_ack ~tx_ack ~tx_wnd_update in - (* The user application transmit buffer *) - let utx = UTX.create ~wnd ~txq ~max_size:16384l in - let rxq = RXS.create ~rx_data ~send_ack ~wnd ~state ~tx_ack in (* Set up ACK module *) let ack = ACK.t ~send_ack ~last:(Sequence.succ rx_isn) in + (* The user application transmit buffer *) + let utx = UTX.create ~wnd ~txq ~max_size:16384l in + let rxq = RXS.create ~rx_data ~ack ~wnd ~state ~tx_ack in (* Set up the keepalive state if requested *) let keepalive = match keepalive with | None -> None diff --git a/src/tcp/segment.ml b/src/tcp/segment.ml index 879b727a..2aa9ffe8 100644 --- a/src/tcp/segment.ml +++ b/src/tcp/segment.ml @@ -55,7 +55,7 @@ let rec reset_seq segs = It also looks for control messages and dispatches them to the Rtx queue to ack messages or close channels. *) -module Rx(Time:Mirage_time.S) = struct +module Rx(Time:Mirage_time.S)(ACK: Ack.M) = struct open Tcp_packet module StateTick = State.Make(Time) @@ -84,15 +84,15 @@ module Rx(Time:Mirage_time.S) = struct type t = { mutable segs: S.t; rx_data: (Cstruct.t list option * Sequence.t option) Lwt_mvar.t; (* User receive channel *) - send_ack: Sequence.t Lwt_mvar.t; + ack: ACK.t; tx_ack: (Sequence.t * int) Lwt_mvar.t; (* Acks of our transmitted segs *) wnd: Window.t; state: State.t; } - let create ~rx_data ~send_ack ~wnd ~state ~tx_ack = + let create ~rx_data ~ack ~wnd ~state ~tx_ack = let segs = S.empty in - { segs; rx_data; send_ack; tx_ack; wnd; state } + { segs; rx_data; ack; tx_ack; wnd; state } let pp fmt t = let pp_v fmt seg = @@ -136,9 +136,7 @@ module Rx(Time:Mirage_time.S) = struct let send_challenge_ack q = (* TODO: rfc5961 ACK Throttling *) - if Lwt_mvar.is_empty q.send_ack - then Lwt_mvar.put q.send_ack Sequence.zero - else Lwt.return_unit + ACK.pushack q.ack Sequence.zero (* Given an input segment, the window information, and a receive queue, update the window, extract any ready segments into the diff --git a/src/tcp/segment.mli b/src/tcp/segment.mli index 137da18a..8b5133d5 100644 --- a/src/tcp/segment.mli +++ b/src/tcp/segment.mli @@ -24,7 +24,7 @@ the Rtx queue to ack messages or close channels. *) -module Rx (T:Mirage_time.S) : sig +module Rx (T:Mirage_time.S)(ACK:Ack.M) : sig type segment = { header: Tcp_packet.t; payload: Cstruct.t } (** Individual received TCP segment *) @@ -38,7 +38,7 @@ module Rx (T:Mirage_time.S) : sig val create: rx_data:(Cstruct.t list option * Sequence.t option) Lwt_mvar.t -> - send_ack:Sequence.t Lwt_mvar.t -> + ack:ACK.t -> wnd:Window.t -> state:State.t -> tx_ack:(Sequence.t * int) Lwt_mvar.t -> From aac0e02f3b2e3da016f6f7cfb9790c4949be8456 Mon Sep 17 00:00:00 2001 From: Lucas Pluvinage Date: Wed, 27 Jul 2022 12:16:17 +0200 Subject: [PATCH 11/11] changes for 7.1.2 --- CHANGES.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index cdf12652..9bfa697e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +### v7.1.2 (2022-07-27) + +* TCP: fix memory leaks on connection close in three scenarios (#489 @TheLortex) + - simultanous close: set up the timewait timer in the `Closing(1) - Recv_ack(2) -> Time_wait` + state transition + - client sends a RST instead of a FIN: enable sending a challenge ACK even when the reception + thread is stopped + - client doesn't ACK server's FIN: enable the retransmit timer in the `Closing(_)` state + ### v7.1.1 (2022-05-24) * Ndpv6: demote more logs to debug level (#480 @reynir)