Skip to content

Commit

Permalink
Merge pull request #148 from balrajsingh/master
Browse files Browse the repository at this point in the history
first cut at full RST processing for established connections
  • Loading branch information
samoht committed Jun 9, 2015
2 parents ac7e93e + 827839b commit aab5709
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 25 deletions.
22 changes: 13 additions & 9 deletions tcp/pcb.ml
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ struct

(* Process an incoming TCP packet that has an active PCB *)
let input _t pkt (pcb,_) =
(* URG_TODO: Deal correctly with incomming RST segment *)
let sequence = Sequence.of_int32 (Tcp_wire.get_tcp_sequence pkt) in
let ack_number =
Sequence.of_int32 (Tcp_wire.get_tcp_ack_number pkt)
in
let fin = Tcp_wire.get_fin pkt in
let syn = Tcp_wire.get_syn pkt in
let ack = Tcp_wire.get_ack pkt in
let rst = Tcp_wire.get_rst pkt in
let window = Tcp_wire.get_tcp_window pkt in
let data = Wire.get_payload pkt in
let seg =
RXS.segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data
RXS.segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data
in
let { rxq; _ } = pcb in
(* Coalesce any outstanding segments and retrieve ready segments *)
Expand Down Expand Up @@ -193,13 +193,16 @@ struct

module Wnd = struct

let thread ~urx:_ ~utx ~wnd:_ ~tx_wnd_update =
let thread ~urx:_ ~utx ~wnd:_ ~state ~tx_wnd_update =
(* Monitor our transmit window when updates are received
remotely, and tell the application that new space is
available when it is blocked *)
let rec tx_window_t () =
Lwt_mvar.take tx_wnd_update >>= fun tx_wnd ->
UTX.free utx tx_wnd >>= fun () ->
begin match State.state state with
| State.Reset -> UTX.reset utx
| _ -> UTX.free utx tx_wnd
end >>= fun () ->
tx_window_t ()
in
tx_window_t ()
Expand Down Expand Up @@ -303,7 +306,7 @@ struct
let th =
(Tx.thread t pcb ~send_ack ~rx_ack) <?>
(Rx.thread pcb ~rx_data) <?>
(Wnd.thread ~utx ~urx ~wnd ~tx_wnd_update)
(Wnd.thread ~utx ~urx ~wnd ~state ~tx_wnd_update)
in
pcb_allocs := !pcb_allocs + 1;
th_allocs := !th_allocs + 1;
Expand Down Expand Up @@ -470,7 +473,6 @@ struct
min 4000 (min (Window.tx_mss pcb.wnd)
(Int32.to_int (UTX.available pcb.utx)))

(* URG_TODO: raise exception if not in Established or Close_wait state *)
(* Wait for more write space *)
let write_wait_for pcb sz =
UTX.wait_for pcb.utx (Int32.of_int sz)
Expand All @@ -486,10 +488,12 @@ struct
let remaing_bit = Cstruct.sub data av_len (len - av_len) in
writefn pcb wfn first_bit >>= fun () ->
writefn pcb wfn remaing_bit
| _ -> wfn [data]
| _ ->
match State.state pcb.state with
| State.Established | State.Close_wait -> wfn [data]
(* URG_TODO: return error instead of dropping silently *)
| _ -> return_unit

(* URG_TODO: raise exception when trying to write to closed connection
instead of quietly returning *)
(* Blocking write on a PCB *)
let write pcb data = writefn pcb (UTX.write pcb.utx) data
let writev pcb data = Lwt_list.iter_s (fun d -> write pcb d) data
Expand Down
43 changes: 33 additions & 10 deletions tcp/segment.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Rx(Time:V1_LWT.TIME) = struct
fin: bool;
syn: bool;
ack: bool;
rst: bool;
ack_number: Sequence.t;
window: int;
}
Expand All @@ -52,8 +53,8 @@ module Rx(Time:V1_LWT.TIME) = struct
(Sequence.to_string seg.sequence) seg.fin seg.syn seg.ack
(Sequence.to_string seg.ack_number) seg.window

let segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data =
{ sequence; fin; syn; ack; ack_number; window; data }
let segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data =
{ sequence; fin; syn; ack; rst; ack_number; window; data }

let len seg =
(Cstruct.len seg.data) +
Expand Down Expand Up @@ -104,7 +105,20 @@ module Rx(Time:V1_LWT.TIME) = struct
let input (q:t) seg =
(* Check that the segment fits into the valid receive window *)
let force_ack = ref false in
if not (Window.valid q.wnd seg.sequence) then return_unit
(* TODO check that this test for a valid RST is valid *)
if (seg.rst && (Window.valid q.wnd seg.sequence)) then begin
StateTick.tick q.state State.Recv_rst;
(* Dump all the received but out of order frames *)
q.segs <- S.empty;
(* Signal TX side *)
let txalert ack_svcd = match ack_svcd with
| true -> Lwt_mvar.put q.tx_ack ((Window.ack_seq q.wnd), (Window.ack_win q.wnd))
| false -> return_unit
in
txalert (Window.ack_serviced q.wnd) >>= fun () ->
(* Use the fin path to inform the application of end of stream *)
Lwt_mvar.put q.rx_data (None, Some 0)
end else if not (Window.valid q.wnd seg.sequence) then return_unit
else
(* Insert the latest segment *)
let segs = S.add seg q.segs in
Expand Down Expand Up @@ -327,13 +341,22 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct
Window.set_ack_serviced q.wnd true;
let seq = Window.ack_seq q.wnd in
let win = Window.ack_win q.wnd in
let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in
let dupacktest () =
0l = Sequence.to_int32 ack_len &&
Window.tx_wnd_unscaled q.wnd = Int32.of_int win &&
not (Lwt_sequence.is_empty q.segs)
in
serviceack (dupacktest ()) ack_len seq win;
begin match State.state q.state with
| State.Reset -> let rec empty_segs segs =
match Lwt_sequence.take_opt_l segs with
| None -> ()
| Some s -> empty_segs segs
in
empty_segs q.segs
| _ ->
let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in
let dupacktest () =
0l = Sequence.to_int32 ack_len &&
Window.tx_wnd_unscaled q.wnd = Int32.of_int win &&
not (Lwt_sequence.is_empty q.segs)
in
serviceack (dupacktest ()) ack_len seq win
end;
(* Inform the window thread of updates to the transmit window *)
Lwt_mvar.put q.tx_wnd_update win >>= fun () ->
tx_ack_t ()
Expand Down
2 changes: 1 addition & 1 deletion tcp/segment.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Rx (T:V1_LWT.TIME) : sig
val string_of_segment: segment -> string

val segment:
sequence:Sequence.t -> fin:bool -> syn:bool -> ack:bool ->
sequence:Sequence.t -> fin:bool -> syn:bool -> rst:bool -> ack:bool ->
ack_number:Sequence.t -> window:int -> data:Cstruct.t ->
segment

Expand Down
15 changes: 11 additions & 4 deletions tcp/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type action =
| Recv_synack of Sequence.t
| Recv_ack of Sequence.t
| Recv_fin
| Recv_finack of Sequence.t
(* | Recv_finack of Sequence.t *)
| Send_syn of Sequence.t
| Send_synack of Sequence.t
| Send_rst
Expand All @@ -42,6 +42,7 @@ type tcpstate =
| Fin_wait_2 of int
| Closing of Sequence.t
| Time_wait
| Reset

type close_cb = unit -> unit

Expand All @@ -61,7 +62,7 @@ let string_of_action = function
| Recv_synack x -> "Recv_synack " ^ (Sequence.to_string x)
| Recv_ack x -> "Recv_ack " ^ (Sequence.to_string x)
| Recv_fin -> "Recv_fin"
| Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x)
(* | Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x) *)
| Send_syn x -> "Send_syn " ^ (Sequence.to_string x)
| Send_synack x -> "Send_synack " ^ (Sequence.to_string x)
| Send_rst -> "Send_rst"
Expand Down Expand Up @@ -127,7 +128,8 @@ module Make(Time:V1_LWT.TIME) = struct
| Established, Recv_ack _ -> Established
| Established, Send_fin a -> Fin_wait_1 a
| Established, Recv_fin -> Close_wait
| Established, Timeout -> t.on_close (); Closed
| Established, Timeout -> t.on_close (); Closed
| Established, Recv_rst -> t.on_close (); Reset
| Fin_wait_1 a, Recv_ack b ->
if diffone b a then
let count = 0 in
Expand All @@ -136,16 +138,21 @@ module Make(Time:V1_LWT.TIME) = struct
else
Fin_wait_1 a
| Fin_wait_1 a, Recv_fin -> Closing a
| Fin_wait_1 a, Recv_finack b -> if diffone b a then Time_wait else Fin_wait_1 a
| Fin_wait_1 _, Timeout -> t.on_close (); Closed
| Fin_wait_1 _, Recv_rst -> t.on_close (); Reset
| Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1)
| Fin_wait_2 _, Recv_fin -> let _ = timewait t time_wait_time in Time_wait
| Fin_wait_2 _, Recv_rst -> t.on_close (); Reset
| Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a
| Closing _, Timeout -> t.on_close (); Closed
| Closing _, Recv_rst -> t.on_close (); Reset
| Time_wait, Timeout -> t.on_close (); Closed
| Close_wait, Send_fin a -> Last_ack a
| Close_wait, Timeout -> t.on_close (); Closed
| Close_wait, Recv_rst -> t.on_close (); Reset
| Last_ack a, Recv_ack b -> if diffone b a then (t.on_close (); Closed) else Last_ack a
| Last_ack _, Timeout -> t.on_close (); Closed
| Last_ack _, Recv_rst -> t.on_close (); Reset
| x, _ -> x
in
t.state <- tstr t.state i
Expand Down
3 changes: 2 additions & 1 deletion tcp/state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type action =
| Recv_synack of Sequence.t
| Recv_ack of Sequence.t
| Recv_fin
| Recv_finack of Sequence.t
(* | Recv_finack of Sequence.t *)
| Send_syn of Sequence.t
| Send_synack of Sequence.t
| Send_rst
Expand All @@ -41,6 +41,7 @@ type tcpstate =
| Fin_wait_2 of int
| Closing of Sequence.t
| Time_wait
| Reset

val string_of_tcpstate : tcpstate -> string

Expand Down
15 changes: 15 additions & 0 deletions tcp/user_buffer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ open Lwt
queue size changes *)
module Rx = struct

(* TODO: check that flow control works on the rx side - ie if the application
stops taking data the window closes so the other side stops sending *)

type t = {
q: Cstruct.t option Lwt_sequence.t;
wnd: Window.t;
Expand Down Expand Up @@ -307,6 +310,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct
| None -> return_unit
| Some w ->
Lwt.wakeup w ();
(* TODO: check if this should wake all writers not just one *)
return_unit

(* Indicate that more bytes are available for waiting writers.
Expand All @@ -317,4 +321,15 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct
clear_buffer t >>= fun () ->
inform_app t

let rec dump_buffer t =
match Lwt_sequence.is_empty t.buffer with
| true -> return_unit
| false ->
let _ = Lwt_sequence.take_l t.buffer in
dump_buffer t

let reset t =
dump_buffer t >>= fun () ->
inform_app t

end
1 change: 1 addition & 0 deletions tcp/user_buffer.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) : sig
val write: t -> Cstruct.t list -> unit Lwt.t
val write_nodelay: t -> Cstruct.t list -> unit Lwt.t
val free: t -> int -> unit Lwt.t
val reset: t -> unit Lwt.t
end

0 comments on commit aab5709

Please sign in to comment.