Skip to content

Commit

Permalink
Vnode simplification (#102)
Browse files Browse the repository at this point in the history
* reorder and categorize api
* Consolidated special cases in vnode_req record
* Restricted vnode access through its fsm and vnode_master only. Removed raw access. Fixed worker pool test.
* Removed api to send_event_unreliable
* Removed unreliable cast
* Removed monitor interface
  • Loading branch information
albsch authored and Albert Schimpf committed Jul 19, 2021
1 parent fadf987 commit 9c3632a
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 181 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Build upon the essence of Riak KV's core with an up-to-date, modular and extensi
![Build](https://img.shields.io/badge/build-rebar3%203.15.1-brightgreen.svg)

[![Hex pm](https://img.shields.io/hexpm/v/riak_core_lite.svg)](https://hex.pm/packages/riak_core_lite)
![Erlang CI](https://github.com/albsch/riak_core_lite/workflows/Erlang%20CI/badge.svg)
![Erlang CI](https://github.com/riak-core-lite/riak_core_lite/workflows/Erlang%20CI/badge.svg)
[![Coverage Status](https://coveralls.io/repos/github/riak-core-lite/riak_core_lite/badge.svg?branch=master)](https://coveralls.io/github/riak-core-lite/riak_core_lite?branch=master)


Expand Down
14 changes: 2 additions & 12 deletions include/riak_core_vnode.hrl
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
-type sender_type() :: fsm | server | raw.
-type sender_type() :: fsm | server .

-type sender() :: {sender_type(), reference() | tuple(),
pid()} |
{fsm, undefined, pid()} |
{server, undefined, undefined} |
ignore.

%% TODO: Double-check that these special cases are kosher

% what are these special cases and what is the reference used for??

% special case in riak_core_vnode_master.erl
-type sender() :: {sender_type(), reference() | tuple() | ignore_ref , pid()} | ignore.

-type partition() :: chash:index_as_int().

Expand Down
2 changes: 1 addition & 1 deletion src/riak_core.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application,
riak_core,
[{description, "Riak Core Lite"},
{vsn, "0.10.3"},
{vsn, "0.10.4"},
{applications,
[kernel, stdlib, crypto, os_mon, poolboy]},
{mod, {riak_core_app, []}},
Expand Down
28 changes: 4 additions & 24 deletions src/riak_core_send_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,13 @@
-module(riak_core_send_msg).

-export([reply_unreliable/2,
cast_unreliable/2,
send_event_unreliable/2,
bang_unreliable/2]).

-ifdef(TEST).

-ifdef(PULSE).

-compile(export_all).

-compile({parse_transform, pulse_instrument}).

-compile({pulse_replace_module,
[{gen_fsm, pulse_gen_fsm},
{gen_fsm_compat, pulse_gen_fsm},
{gen_server, pulse_gen_server}]}).

-endif.

-endif.
send_event_unreliable/2]).

%% NOTE: We'ed peeked inside gen_server.erl's guts to see its internals.
reply_unreliable({To, Tag}, Reply) ->
bang_unreliable(To, {Tag, Reply}).

cast_unreliable(Dest, Request) ->
bang_unreliable(Dest, {'$gen_cast', Request}).
bang_unreliable(To, {Tag, Reply});
reply_unreliable(To, Reply) ->
bang_unreliable(To, Reply).

%% NOTE: We'ed peeked inside gen_fsm.erl's guts to see its internals.
send_event_unreliable({global, _Name} = GlobalTo,
Expand Down
165 changes: 77 additions & 88 deletions src/riak_core_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
terminate/3,
code_change/4]).

-export([reply/2, monitor/1]).
-export([reply/2]).

-export([get_mod_index/1,
get_modstate/1,
Expand Down Expand Up @@ -260,115 +260,118 @@ start_link(Mod, Index, InitialInactivityTimeout,
[Mod, Index, InitialInactivityTimeout, Forward],
[]).

%% =========================
%% sync_send_event
%% =========================

%% #1 - State started
wait_for_init(Vnode) ->
gen_fsm_compat:sync_send_event(Vnode,
wait_for_init,
infinity).
wait_for_init(Vnode) -> gen_fsm_compat:sync_send_event(Vnode, wait_for_init, infinity).

%% =========================
%% send_event
%% =========================

%% #2 -
%% #2.1 -
%% Send a command message for the vnode module by Pid -
%% typically to do some deferred processing after returning yourself
send_command(Pid, Request) ->
gen_fsm_compat:send_event(Pid,
#riak_vnode_req_v1{request = Request}).
send_command(Pid, Request) -> gen_fsm_compat:send_event(Pid, #riak_vnode_req_v1{request = Request}).

%% #2.2 -
handoff_error(Vnode, Err, Reason) -> gen_fsm_compat:send_event(Vnode, {handoff_error, Err, Reason}).

%% #2.3 - riak_core_vnode_master - send_an_event
send_an_event(VNode, Event) -> gen_fsm_compat:send_event(VNode, Event).

%% #2.4 - riak_core_vnode_master - handle_cast/handle_call
%riak_core_vnode_master - command2
%riak_core_vnode_proxy - handle_call
send_req(VNode, Req) -> gen_fsm_compat:send_event(VNode, Req).

%% #2.5 - riak:core_handoff_sender - start_fold_
-spec handoff_complete(VNode :: pid()) -> ok.
handoff_complete(VNode) -> gen_fsm_compat:send_event(VNode, handoff_complete).

%% #3 -
handoff_error(Vnode, Err, Reason) ->
gen_fsm_compat:send_event(Vnode,
{handoff_error, Err, Reason}).
%% #2.6 - riak:core_handoff_sender - start_fold_
-spec resize_transfer_complete(VNode :: pid(), NotSentAcc :: term()) -> ok.
resize_transfer_complete(VNode, NotSentAcc) -> gen_fsm_compat:send_event(VNode, {resize_transfer_complete, NotSentAcc}).

%% #4 -
%% #2.7 - riak_core_vnode_proxy - handle_cast
unregistered(VNode) -> gen_fsm_compat:send_event(VNode, unregistered).

%% =========================
%% sync_send_all_state_event
%% =========================

%% #3.1
get_mod_index(VNode) ->
gen_fsm_compat:sync_send_all_state_event(VNode,
get_mod_index).
%% #3.2
core_status(VNode) ->
gen_fsm_compat:sync_send_all_state_event(VNode, core_status).

%% #5
%% #3.3 - riak_core_handoff_receiver - process_message
handoff_data(VNode, MsgData, VNodeTimeout) ->
gen_fsm_compat:sync_send_all_state_event(VNode, {handoff_data, MsgData}, VNodeTimeout).

%% =========================
%% send_all_state_event
%% =========================

%% #4.1
set_forwarding(VNode, ForwardTo) ->
gen_fsm_compat:send_all_state_event(VNode,
{set_forwarding, ForwardTo}).

%% #6
%% #4.2
trigger_handoff(VNode, TargetIdx, TargetNode) ->
gen_fsm_compat:send_all_state_event(VNode,
{trigger_handoff,
TargetIdx,
TargetNode}).

%% #7
%% #4.3
trigger_handoff(VNode, TargetNode) ->
gen_fsm_compat:send_all_state_event(VNode,
{trigger_handoff, TargetNode}).

%% #8
%% #4.4
trigger_delete(VNode) ->
gen_fsm_compat:send_all_state_event(VNode,
trigger_delete).

%% #9
core_status(VNode) ->
gen_fsm_compat:sync_send_all_state_event(VNode,
core_status).

%% #10
%% Sends a command to the FSM that called it after Time
%% has passed.
-spec send_command_after(integer(),
term()) -> reference().

send_command_after(Time, Request) ->
gen_fsm_compat:send_event_after(Time,
#riak_vnode_req_v1{request = Request}).

%%%%%%% %new APIs
%% #11 - riak_core_vnode_manager - handle_vnode_event
%% #4.5 - riak_core_vnode_manager - handle_vnode_event
cast_finish_handoff(VNode) ->
gen_fsm_compat:send_all_state_event(VNode,
finish_handoff).

%% #12 - riak_core_vnode_manager - handle_vnode_event
%% #4.6 - riak_core_vnode_manager - handle_vnode_event
cancel_handoff(VNode) ->
gen_fsm_compat:send_all_state_event(VNode,
cancel_handoff).

%% #13 - riak_core_vnode_master - send_an_event
send_an_event(VNode, Event) ->
gen_fsm_compat:send_event(VNode, Event).

%% #14 - riak_core_vnode_master - handle_cast/handle_call

%riak_core_vnode_master - command2
%riak_core_vnode_proxy - handle_call
send_req(VNode, Req) ->
gen_fsm_compat:send_event(VNode, Req).

%% #15 - riak_core_vnode_master - handle_call
%% #4.7 - riak_core_vnode_master - handle_call
send_all_proxy_req(VNode, Req) ->
gen_fsm_compat:send_all_state_event(VNode, Req).

%% #16 - riak:core_handoff_sender - start_fold_
-spec handoff_complete(VNode :: pid()) -> ok.
%% =========================
%% send_event_after
%% =========================

%% #5
%% Sends a command to the FSM that called it after Time
%% has passed.
-spec send_command_after(integer(),
term()) -> reference().

send_command_after(Time, Request) ->
gen_fsm_compat:send_event_after(Time,
#riak_vnode_req_v1{request = Request}).

handoff_complete(VNode) ->
gen_fsm_compat:send_event(VNode, handoff_complete).

%% #17 - riak:core_handoff_sender - start_fold_
-spec resize_transfer_complete(VNode :: pid(),
NotSentAcc :: term()) -> ok.

resize_transfer_complete(VNode, NotSentAcc) ->
gen_fsm_compat:send_event(VNode,
{resize_transfer_complete, NotSentAcc}).

%% #18 - riak_core_handoff_receiver - process_message
handoff_data(VNode, MsgData, VNodeTimeout) ->
gen_fsm_compat:sync_send_all_state_event(VNode,
{handoff_data, MsgData},
VNodeTimeout).

%% #19 - riak_core_vnode_proxy - handle_cast
unregistered(VNode) ->
gen_fsm_compat:send_event(VNode, unregistered).

%% @doc Send a reply to a vnode request. If
%% the Ref is undefined just send the reply
Expand All @@ -379,34 +382,17 @@ unregistered(VNode) ->
%%
-spec reply(sender(), term()) -> any().

reply({fsm, undefined, From}, Reply) ->
reply({fsm, ignore_ref, From}, Reply) ->
riak_core_send_msg:send_event_unreliable(From, Reply);
reply({fsm, Ref, From}, Reply) ->
riak_core_send_msg:send_event_unreliable(From,
{Ref, Reply});
reply({server, undefined, From}, Reply) ->
riak_core_send_msg:send_event_unreliable(From, {Ref, Reply});

reply({server, ignore_ref, From}, Reply) ->
riak_core_send_msg:reply_unreliable(From, Reply);
reply({server, Ref, From}, Reply) ->
riak_core_send_msg:reply_unreliable(From, {Ref, Reply});
reply({raw, Ref, From}, Reply) ->
riak_core_send_msg:bang_unreliable(From, {Ref, Reply});
reply(ignore, _Reply) -> ok.

%% @doc Set up a monitor for the pid named by a {@type sender()} vnode
%% argument. If `Sender' was the atom `ignore', this function sets up
%% a monitor on `self()' in order to return a valid (if useless)
%% monitor reference.
-spec monitor(Sender :: sender()) -> Monitor ::
reference().

monitor({fsm, _, From}) ->
erlang:monitor(process, From);
monitor({server, _, {Pid, _Ref}}) ->
erlang:monitor(process, Pid);
monitor({raw, _, From}) ->
erlang:monitor(process, From);
monitor(ignore) -> erlang:monitor(process, self()).

%% ========================
%% ========
%% State, Mode, Init, Terminate
Expand Down Expand Up @@ -1539,6 +1525,9 @@ pool_death_test() ->
exit(Pid, normal),
wait_for_process_death(Pid),
meck:validate(test_pool_mod),
meck:validate(test_vnode).
meck:validate(test_vnode),
% TODO why is a short sleep needed to swallow crash message
timer:sleep(10),
error_logger:tty(true).

-endif.
Loading

0 comments on commit 9c3632a

Please sign in to comment.