From 838097c873ba3310e6eec2e10d8eb8752c0652ee Mon Sep 17 00:00:00 2001 From: Balraj Singh Date: Sun, 7 Jun 2015 08:58:40 +0100 Subject: [PATCH 1/2] first cut at full RST processing for established connections --- tcp/pcb.ml | 22 +++++++++++++--------- tcp/segment.ml | 43 +++++++++++++++++++++++++++++++++---------- tcp/segment.mli | 2 +- tcp/state.ml | 15 +++++++++++---- tcp/state.mli | 3 ++- tcp/user_buffer.ml | 15 +++++++++++++++ tcp/user_buffer.mli | 1 + 7 files changed, 76 insertions(+), 25 deletions(-) diff --git a/tcp/pcb.ml b/tcp/pcb.ml index d34b0243c..b5edd90dd 100644 --- a/tcp/pcb.ml +++ b/tcp/pcb.ml @@ -137,7 +137,6 @@ struct (* Process an incoming TCP packet that has an active PCB *) let input _t pkt (pcb,_) = - (* URG_TODO: Deal correctly with incomming RST segment *) let sequence = Sequence.of_int32 (Tcp_wire.get_tcp_sequence pkt) in let ack_number = Sequence.of_int32 (Tcp_wire.get_tcp_ack_number pkt) @@ -145,10 +144,11 @@ struct let fin = Tcp_wire.get_fin pkt in let syn = Tcp_wire.get_syn pkt in let ack = Tcp_wire.get_ack pkt in + let rst = Tcp_wire.get_rst pkt in let window = Tcp_wire.get_tcp_window pkt in let data = Wire.get_payload pkt in let seg = - RXS.segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data + RXS.segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data in let { rxq; _ } = pcb in (* Coalesce any outstanding segments and retrieve ready segments *) @@ -193,13 +193,16 @@ struct module Wnd = struct - let thread ~urx:_ ~utx ~wnd:_ ~tx_wnd_update = + let thread ~urx:_ ~utx ~wnd:_ ~state ~tx_wnd_update = (* Monitor our transmit window when updates are received remotely, and tell the application that new space is available when it is blocked *) let rec tx_window_t () = Lwt_mvar.take tx_wnd_update >>= fun tx_wnd -> - UTX.free utx tx_wnd >>= fun () -> + begin match State.state state with + | State.Reset -> UTX.reset utx + | _ -> UTX.free utx tx_wnd + end >>= fun () -> tx_window_t () in tx_window_t () @@ -303,7 +306,7 @@ struct let th = (Tx.thread t pcb ~send_ack ~rx_ack) (Rx.thread pcb ~rx_data) - (Wnd.thread ~utx ~urx ~wnd ~tx_wnd_update) + (Wnd.thread ~utx ~urx ~wnd ~state ~tx_wnd_update) in pcb_allocs := !pcb_allocs + 1; th_allocs := !th_allocs + 1; @@ -470,7 +473,6 @@ struct min 4000 (min (Window.tx_mss pcb.wnd) (Int32.to_int (UTX.available pcb.utx))) - (* URG_TODO: raise exception if not in Established or Close_wait state *) (* Wait for more write space *) let write_wait_for pcb sz = UTX.wait_for pcb.utx (Int32.of_int sz) @@ -486,10 +488,12 @@ struct let remaing_bit = Cstruct.sub data av_len (len - av_len) in writefn pcb wfn first_bit >>= fun () -> writefn pcb wfn remaing_bit - | _ -> wfn [data] + | _ -> + match State.state pcb.state with + | State.Established | State.Close_wait -> wfn [data] + (* URG_TODO: return error instead of dropping silently *) + | _ -> return_unit - (* URG_TODO: raise exception when trying to write to closed connection - instead of quietly returning *) (* Blocking write on a PCB *) let write pcb data = writefn pcb (UTX.write pcb.utx) data let writev pcb data = Lwt_list.iter_s (fun d -> write pcb d) data diff --git a/tcp/segment.ml b/tcp/segment.ml index 851d06667..b16333eff 100644 --- a/tcp/segment.ml +++ b/tcp/segment.ml @@ -43,6 +43,7 @@ module Rx(Time:V1_LWT.TIME) = struct fin: bool; syn: bool; ack: bool; + rst: bool; ack_number: Sequence.t; window: int; } @@ -52,8 +53,8 @@ module Rx(Time:V1_LWT.TIME) = struct (Sequence.to_string seg.sequence) seg.fin seg.syn seg.ack (Sequence.to_string seg.ack_number) seg.window - let segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data = - { sequence; fin; syn; ack; ack_number; window; data } + let segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data = + { sequence; fin; syn; ack; rst; ack_number; window; data } let len seg = (Cstruct.len seg.data) + @@ -104,7 +105,20 @@ module Rx(Time:V1_LWT.TIME) = struct let input (q:t) seg = (* Check that the segment fits into the valid receive window *) let force_ack = ref false in - if not (Window.valid q.wnd seg.sequence) then return_unit + (* URG_TODO also check that this is a valid RST i.e. the seq/ack numbers are in the window *) + if (seg.rst) then begin + StateTick.tick q.state State.Recv_rst; + (* Dump all the received but out of order frames *) + q.segs <- S.empty; + (* Signal TX side *) + let txalert ack_svcd = match ack_svcd with + | true -> Lwt_mvar.put q.tx_ack ((Window.ack_seq q.wnd), (Window.ack_win q.wnd)) + | false -> return_unit + in + txalert (Window.ack_serviced q.wnd) >>= fun () -> + (* Use the fin path to inform the application of end of stream *) + Lwt_mvar.put q.rx_data (None, Some 0) + end else if not (Window.valid q.wnd seg.sequence) then return_unit else (* Insert the latest segment *) let segs = S.add seg q.segs in @@ -327,13 +341,22 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct Window.set_ack_serviced q.wnd true; let seq = Window.ack_seq q.wnd in let win = Window.ack_win q.wnd in - let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in - let dupacktest () = - 0l = Sequence.to_int32 ack_len && - Window.tx_wnd_unscaled q.wnd = Int32.of_int win && - not (Lwt_sequence.is_empty q.segs) - in - serviceack (dupacktest ()) ack_len seq win; + begin match State.state q.state with + | State.Reset -> let rec empty_segs segs = + match Lwt_sequence.take_opt_l segs with + | None -> () + | Some s -> empty_segs segs + in + empty_segs q.segs + | _ -> + let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in + let dupacktest () = + 0l = Sequence.to_int32 ack_len && + Window.tx_wnd_unscaled q.wnd = Int32.of_int win && + not (Lwt_sequence.is_empty q.segs) + in + serviceack (dupacktest ()) ack_len seq win + end; (* Inform the window thread of updates to the transmit window *) Lwt_mvar.put q.tx_wnd_update win >>= fun () -> tx_ack_t () diff --git a/tcp/segment.mli b/tcp/segment.mli index 1278dd91f..c39841720 100644 --- a/tcp/segment.mli +++ b/tcp/segment.mli @@ -31,7 +31,7 @@ module Rx (T:V1_LWT.TIME) : sig val string_of_segment: segment -> string val segment: - sequence:Sequence.t -> fin:bool -> syn:bool -> ack:bool -> + sequence:Sequence.t -> fin:bool -> syn:bool -> rst:bool -> ack:bool -> ack_number:Sequence.t -> window:int -> data:Cstruct.t -> segment diff --git a/tcp/state.ml b/tcp/state.ml index 58eeae781..629ec9dd5 100644 --- a/tcp/state.ml +++ b/tcp/state.ml @@ -23,7 +23,7 @@ type action = | Recv_synack of Sequence.t | Recv_ack of Sequence.t | Recv_fin - | Recv_finack of Sequence.t + (* | Recv_finack of Sequence.t *) | Send_syn of Sequence.t | Send_synack of Sequence.t | Send_rst @@ -42,6 +42,7 @@ type tcpstate = | Fin_wait_2 of int | Closing of Sequence.t | Time_wait + | Reset type close_cb = unit -> unit @@ -61,7 +62,7 @@ let string_of_action = function | Recv_synack x -> "Recv_synack " ^ (Sequence.to_string x) | Recv_ack x -> "Recv_ack " ^ (Sequence.to_string x) | Recv_fin -> "Recv_fin" - | Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x) + (* | Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x) *) | Send_syn x -> "Send_syn " ^ (Sequence.to_string x) | Send_synack x -> "Send_synack " ^ (Sequence.to_string x) | Send_rst -> "Send_rst" @@ -127,7 +128,8 @@ module Make(Time:V1_LWT.TIME) = struct | Established, Recv_ack _ -> Established | Established, Send_fin a -> Fin_wait_1 a | Established, Recv_fin -> Close_wait - | Established, Timeout -> t.on_close (); Closed + | Established, Timeout -> t.on_close (); Closed + | Established, Recv_rst -> t.on_close (); Reset | Fin_wait_1 a, Recv_ack b -> if diffone b a then let count = 0 in @@ -136,16 +138,21 @@ module Make(Time:V1_LWT.TIME) = struct else Fin_wait_1 a | Fin_wait_1 a, Recv_fin -> Closing a - | Fin_wait_1 a, Recv_finack b -> if diffone b a then Time_wait else Fin_wait_1 a | Fin_wait_1 _, Timeout -> t.on_close (); Closed + | Fin_wait_1 _, Recv_rst -> t.on_close (); Reset | Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1) | Fin_wait_2 _, Recv_fin -> let _ = timewait t time_wait_time in Time_wait + | Fin_wait_2 _, Recv_rst -> t.on_close (); Reset | Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a + | Closing _, Timeout -> t.on_close (); Closed + | Closing _, Recv_rst -> t.on_close (); Reset | Time_wait, Timeout -> t.on_close (); Closed | Close_wait, Send_fin a -> Last_ack a | Close_wait, Timeout -> t.on_close (); Closed + | Close_wait, Recv_rst -> t.on_close (); Reset | Last_ack a, Recv_ack b -> if diffone b a then (t.on_close (); Closed) else Last_ack a | Last_ack _, Timeout -> t.on_close (); Closed + | Last_ack _, Recv_rst -> t.on_close (); Reset | x, _ -> x in t.state <- tstr t.state i diff --git a/tcp/state.mli b/tcp/state.mli index 83b7886ac..d914b69f2 100644 --- a/tcp/state.mli +++ b/tcp/state.mli @@ -20,7 +20,7 @@ type action = | Recv_synack of Sequence.t | Recv_ack of Sequence.t | Recv_fin - | Recv_finack of Sequence.t + (* | Recv_finack of Sequence.t *) | Send_syn of Sequence.t | Send_synack of Sequence.t | Send_rst @@ -41,6 +41,7 @@ type tcpstate = | Fin_wait_2 of int | Closing of Sequence.t | Time_wait + | Reset val string_of_tcpstate : tcpstate -> string diff --git a/tcp/user_buffer.ml b/tcp/user_buffer.ml index 0525becbc..b5223f001 100644 --- a/tcp/user_buffer.ml +++ b/tcp/user_buffer.ml @@ -22,6 +22,9 @@ open Lwt queue size changes *) module Rx = struct + (* TODO: check that flow control works on the rx side - ie if the application + stops taking data the window closes so the other side stops sending *) + type t = { q: Cstruct.t option Lwt_sequence.t; wnd: Window.t; @@ -307,6 +310,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct | None -> return_unit | Some w -> Lwt.wakeup w (); + (* TODO: check if this should wake all writers not just one *) return_unit (* Indicate that more bytes are available for waiting writers. @@ -317,4 +321,15 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct clear_buffer t >>= fun () -> inform_app t + let rec dump_buffer t = + match Lwt_sequence.is_empty t.buffer with + | true -> return_unit + | false -> + let _ = Lwt_sequence.take_l t.buffer in + dump_buffer t + + let reset t = + dump_buffer t >>= fun () -> + inform_app t + end diff --git a/tcp/user_buffer.mli b/tcp/user_buffer.mli index d1916ef86..5a16ce8bd 100644 --- a/tcp/user_buffer.mli +++ b/tcp/user_buffer.mli @@ -43,4 +43,5 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) : sig val write: t -> Cstruct.t list -> unit Lwt.t val write_nodelay: t -> Cstruct.t list -> unit Lwt.t val free: t -> int -> unit Lwt.t + val reset: t -> unit Lwt.t end From 827839b7b4265a8ed1ce869efea6293fd17bd1ab Mon Sep 17 00:00:00 2001 From: Balraj Singh Date: Sun, 7 Jun 2015 18:37:29 +0100 Subject: [PATCH 2/2] added RST acceptance test --- tcp/segment.ml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tcp/segment.ml b/tcp/segment.ml index b16333eff..ce148bb81 100644 --- a/tcp/segment.ml +++ b/tcp/segment.ml @@ -105,8 +105,8 @@ module Rx(Time:V1_LWT.TIME) = struct let input (q:t) seg = (* Check that the segment fits into the valid receive window *) let force_ack = ref false in - (* URG_TODO also check that this is a valid RST i.e. the seq/ack numbers are in the window *) - if (seg.rst) then begin + (* TODO check that this test for a valid RST is valid *) + if (seg.rst && (Window.valid q.wnd seg.sequence)) then begin StateTick.tick q.state State.Recv_rst; (* Dump all the received but out of order frames *) q.segs <- S.empty;