diff --git a/README.md b/README.md index 3ad05ed21..7745aee80 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Build upon the essence of Riak KV's core with an up-to-date, modular and extensi ![Build](https://img.shields.io/badge/build-rebar3%203.15.1-brightgreen.svg) [![Hex pm](https://img.shields.io/hexpm/v/riak_core_lite.svg)](https://hex.pm/packages/riak_core_lite) -![Erlang CI](https://github.com/albsch/riak_core_lite/workflows/Erlang%20CI/badge.svg) +![Erlang CI](https://github.com/riak-core-lite/riak_core_lite/workflows/Erlang%20CI/badge.svg) [![Coverage Status](https://coveralls.io/repos/github/riak-core-lite/riak_core_lite/badge.svg?branch=master)](https://coveralls.io/github/riak-core-lite/riak_core_lite?branch=master) diff --git a/include/riak_core_vnode.hrl b/include/riak_core_vnode.hrl index 2a0e2eb0b..d3e8c3fb1 100644 --- a/include/riak_core_vnode.hrl +++ b/include/riak_core_vnode.hrl @@ -1,16 +1,6 @@ --type sender_type() :: fsm | server | raw. +-type sender_type() :: fsm | server . --type sender() :: {sender_type(), reference() | tuple(), - pid()} | - {fsm, undefined, pid()} | - {server, undefined, undefined} | - ignore. - - %% TODO: Double-check that these special cases are kosher - - % what are these special cases and what is the reference used for?? - - % special case in riak_core_vnode_master.erl +-type sender() :: {sender_type(), reference() | tuple() | ignore_ref , pid()} | ignore. -type partition() :: chash:index_as_int(). diff --git a/src/riak_core.app.src b/src/riak_core.app.src index aed3414cb..7e0d75780 100644 --- a/src/riak_core.app.src +++ b/src/riak_core.app.src @@ -1,7 +1,7 @@ {application, riak_core, [{description, "Riak Core Lite"}, - {vsn, "0.10.3"}, + {vsn, "0.10.4"}, {applications, [kernel, stdlib, crypto, os_mon, poolboy]}, {mod, {riak_core_app, []}}, diff --git a/src/riak_core_send_msg.erl b/src/riak_core_send_msg.erl index e06094d84..7f9e2221b 100644 --- a/src/riak_core_send_msg.erl +++ b/src/riak_core_send_msg.erl @@ -23,33 +23,13 @@ -module(riak_core_send_msg). -export([reply_unreliable/2, - cast_unreliable/2, - send_event_unreliable/2, - bang_unreliable/2]). - --ifdef(TEST). - --ifdef(PULSE). - --compile(export_all). - --compile({parse_transform, pulse_instrument}). - --compile({pulse_replace_module, - [{gen_fsm, pulse_gen_fsm}, - {gen_fsm_compat, pulse_gen_fsm}, - {gen_server, pulse_gen_server}]}). - --endif. - --endif. + send_event_unreliable/2]). %% NOTE: We'ed peeked inside gen_server.erl's guts to see its internals. reply_unreliable({To, Tag}, Reply) -> - bang_unreliable(To, {Tag, Reply}). - -cast_unreliable(Dest, Request) -> - bang_unreliable(Dest, {'$gen_cast', Request}). + bang_unreliable(To, {Tag, Reply}); +reply_unreliable(To, Reply) -> + bang_unreliable(To, Reply). %% NOTE: We'ed peeked inside gen_fsm.erl's guts to see its internals. send_event_unreliable({global, _Name} = GlobalTo, diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index b95089dae..97e65c764 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -41,7 +41,7 @@ terminate/3, code_change/4]). --export([reply/2, monitor/1]). +-export([reply/2]). -export([get_mod_index/1, get_modstate/1, @@ -260,115 +260,118 @@ start_link(Mod, Index, InitialInactivityTimeout, [Mod, Index, InitialInactivityTimeout, Forward], []). +%% ========================= +%% sync_send_event +%% ========================= + %% #1 - State started -wait_for_init(Vnode) -> - gen_fsm_compat:sync_send_event(Vnode, - wait_for_init, - infinity). +wait_for_init(Vnode) -> gen_fsm_compat:sync_send_event(Vnode, wait_for_init, infinity). + +%% ========================= +%% send_event +%% ========================= -%% #2 - +%% #2.1 - %% Send a command message for the vnode module by Pid - %% typically to do some deferred processing after returning yourself -send_command(Pid, Request) -> - gen_fsm_compat:send_event(Pid, - #riak_vnode_req_v1{request = Request}). +send_command(Pid, Request) -> gen_fsm_compat:send_event(Pid, #riak_vnode_req_v1{request = Request}). + +%% #2.2 - +handoff_error(Vnode, Err, Reason) -> gen_fsm_compat:send_event(Vnode, {handoff_error, Err, Reason}). + +%% #2.3 - riak_core_vnode_master - send_an_event +send_an_event(VNode, Event) -> gen_fsm_compat:send_event(VNode, Event). + +%% #2.4 - riak_core_vnode_master - handle_cast/handle_call +%riak_core_vnode_master - command2 +%riak_core_vnode_proxy - handle_call +send_req(VNode, Req) -> gen_fsm_compat:send_event(VNode, Req). + +%% #2.5 - riak:core_handoff_sender - start_fold_ +-spec handoff_complete(VNode :: pid()) -> ok. +handoff_complete(VNode) -> gen_fsm_compat:send_event(VNode, handoff_complete). -%% #3 - -handoff_error(Vnode, Err, Reason) -> - gen_fsm_compat:send_event(Vnode, - {handoff_error, Err, Reason}). +%% #2.6 - riak:core_handoff_sender - start_fold_ +-spec resize_transfer_complete(VNode :: pid(), NotSentAcc :: term()) -> ok. +resize_transfer_complete(VNode, NotSentAcc) -> gen_fsm_compat:send_event(VNode, {resize_transfer_complete, NotSentAcc}). -%% #4 - +%% #2.7 - riak_core_vnode_proxy - handle_cast +unregistered(VNode) -> gen_fsm_compat:send_event(VNode, unregistered). + +%% ========================= +%% sync_send_all_state_event +%% ========================= + +%% #3.1 get_mod_index(VNode) -> gen_fsm_compat:sync_send_all_state_event(VNode, get_mod_index). +%% #3.2 +core_status(VNode) -> + gen_fsm_compat:sync_send_all_state_event(VNode, core_status). -%% #5 +%% #3.3 - riak_core_handoff_receiver - process_message +handoff_data(VNode, MsgData, VNodeTimeout) -> + gen_fsm_compat:sync_send_all_state_event(VNode, {handoff_data, MsgData}, VNodeTimeout). + +%% ========================= +%% send_all_state_event +%% ========================= + +%% #4.1 set_forwarding(VNode, ForwardTo) -> gen_fsm_compat:send_all_state_event(VNode, {set_forwarding, ForwardTo}). -%% #6 +%% #4.2 trigger_handoff(VNode, TargetIdx, TargetNode) -> gen_fsm_compat:send_all_state_event(VNode, {trigger_handoff, TargetIdx, TargetNode}). -%% #7 +%% #4.3 trigger_handoff(VNode, TargetNode) -> gen_fsm_compat:send_all_state_event(VNode, {trigger_handoff, TargetNode}). -%% #8 +%% #4.4 trigger_delete(VNode) -> gen_fsm_compat:send_all_state_event(VNode, trigger_delete). -%% #9 -core_status(VNode) -> - gen_fsm_compat:sync_send_all_state_event(VNode, - core_status). - -%% #10 -%% Sends a command to the FSM that called it after Time -%% has passed. --spec send_command_after(integer(), - term()) -> reference(). - -send_command_after(Time, Request) -> - gen_fsm_compat:send_event_after(Time, - #riak_vnode_req_v1{request = Request}). - -%%%%%%% %new APIs -%% #11 - riak_core_vnode_manager - handle_vnode_event +%% #4.5 - riak_core_vnode_manager - handle_vnode_event cast_finish_handoff(VNode) -> gen_fsm_compat:send_all_state_event(VNode, finish_handoff). -%% #12 - riak_core_vnode_manager - handle_vnode_event +%% #4.6 - riak_core_vnode_manager - handle_vnode_event cancel_handoff(VNode) -> gen_fsm_compat:send_all_state_event(VNode, cancel_handoff). -%% #13 - riak_core_vnode_master - send_an_event -send_an_event(VNode, Event) -> - gen_fsm_compat:send_event(VNode, Event). - -%% #14 - riak_core_vnode_master - handle_cast/handle_call - -%riak_core_vnode_master - command2 -%riak_core_vnode_proxy - handle_call -send_req(VNode, Req) -> - gen_fsm_compat:send_event(VNode, Req). - -%% #15 - riak_core_vnode_master - handle_call +%% #4.7 - riak_core_vnode_master - handle_call send_all_proxy_req(VNode, Req) -> gen_fsm_compat:send_all_state_event(VNode, Req). -%% #16 - riak:core_handoff_sender - start_fold_ --spec handoff_complete(VNode :: pid()) -> ok. +%% ========================= +%% send_event_after +%% ========================= + +%% #5 +%% Sends a command to the FSM that called it after Time +%% has passed. +-spec send_command_after(integer(), + term()) -> reference(). + +send_command_after(Time, Request) -> + gen_fsm_compat:send_event_after(Time, + #riak_vnode_req_v1{request = Request}). -handoff_complete(VNode) -> - gen_fsm_compat:send_event(VNode, handoff_complete). -%% #17 - riak:core_handoff_sender - start_fold_ --spec resize_transfer_complete(VNode :: pid(), - NotSentAcc :: term()) -> ok. -resize_transfer_complete(VNode, NotSentAcc) -> - gen_fsm_compat:send_event(VNode, - {resize_transfer_complete, NotSentAcc}). -%% #18 - riak_core_handoff_receiver - process_message -handoff_data(VNode, MsgData, VNodeTimeout) -> - gen_fsm_compat:sync_send_all_state_event(VNode, - {handoff_data, MsgData}, - VNodeTimeout). -%% #19 - riak_core_vnode_proxy - handle_cast -unregistered(VNode) -> - gen_fsm_compat:send_event(VNode, unregistered). %% @doc Send a reply to a vnode request. If %% the Ref is undefined just send the reply @@ -379,34 +382,17 @@ unregistered(VNode) -> %% -spec reply(sender(), term()) -> any(). -reply({fsm, undefined, From}, Reply) -> +reply({fsm, ignore_ref, From}, Reply) -> riak_core_send_msg:send_event_unreliable(From, Reply); reply({fsm, Ref, From}, Reply) -> - riak_core_send_msg:send_event_unreliable(From, - {Ref, Reply}); -reply({server, undefined, From}, Reply) -> + riak_core_send_msg:send_event_unreliable(From, {Ref, Reply}); + +reply({server, ignore_ref, From}, Reply) -> riak_core_send_msg:reply_unreliable(From, Reply); reply({server, Ref, From}, Reply) -> riak_core_send_msg:reply_unreliable(From, {Ref, Reply}); -reply({raw, Ref, From}, Reply) -> - riak_core_send_msg:bang_unreliable(From, {Ref, Reply}); reply(ignore, _Reply) -> ok. -%% @doc Set up a monitor for the pid named by a {@type sender()} vnode -%% argument. If `Sender' was the atom `ignore', this function sets up -%% a monitor on `self()' in order to return a valid (if useless) -%% monitor reference. --spec monitor(Sender :: sender()) -> Monitor :: - reference(). - -monitor({fsm, _, From}) -> - erlang:monitor(process, From); -monitor({server, _, {Pid, _Ref}}) -> - erlang:monitor(process, Pid); -monitor({raw, _, From}) -> - erlang:monitor(process, From); -monitor(ignore) -> erlang:monitor(process, self()). - %% ======================== %% ======== %% State, Mode, Init, Terminate @@ -1539,6 +1525,9 @@ pool_death_test() -> exit(Pid, normal), wait_for_process_death(Pid), meck:validate(test_pool_mod), - meck:validate(test_vnode). + meck:validate(test_vnode), + % TODO why is a short sleep needed to swallow crash message + timer:sleep(10), + error_logger:tty(true). -endif. diff --git a/src/riak_core_vnode_master.erl b/src/riak_core_vnode_master.erl index 8b2407aa4..80a89438c 100644 --- a/src/riak_core_vnode_master.erl +++ b/src/riak_core_vnode_master.erl @@ -40,8 +40,6 @@ coverage/5, command_return_vnode/4, sync_spawn_command/3, - make_request/3, - make_coverage_request/4, all_nodes/1, reg_name/1]). @@ -56,7 +54,7 @@ -define(LONG_TIMEOUT, 120 * 1000). --type riak_vnode_req_v1() :: #riak_vnode_req_v1{}. +%-type riak_vnode_req_v1() :: #riak_vnode_req_v1{}. -type riak_coverage_req_v1() :: #riak_coverage_req_v1{}. @@ -105,19 +103,19 @@ command2([], _Msg, _Sender, _VMaster, _How) -> ok; command2([{Index, Pid} | Rest], Msg, Sender, VMaster, How = normal) when is_pid(Pid) -> - Request = make_request(Msg, Sender, Index), + Request = #riak_vnode_req_v1{request = Msg, sender = Sender, index = Index}, riak_core_vnode:send_req(Pid, Request), command2(Rest, Msg, Sender, VMaster, How); command2([{Index, Pid} | Rest], Msg, Sender, VMaster, How = unreliable) when is_pid(Pid) -> riak_core_send_msg:send_event_unreliable(Pid, - make_request(Msg, Sender, Index)), + #riak_vnode_req_v1{request = Msg, sender = Sender, index = Index}), command2(Rest, Msg, Sender, VMaster, How); command2([{Index, Node} | Rest], Msg, Sender, VMaster, How) -> proxy_cast({VMaster, Node}, - make_request(Msg, Sender, Index), + #riak_vnode_req_v1{request = Msg, sender = Sender, index = Index}, How), command2(Rest, Msg, Sender, VMaster, How); command2(DestTuple, Msg, Sender, VMaster, How) @@ -145,7 +143,7 @@ coverage(Msg, {Index, Node}, Keyspaces, Sender, %% return the pid for the vnode handling the request, as `{ok, VnodePid}'. command_return_vnode({Index, Node}, Msg, Sender, VMaster) -> - Req = make_request(Msg, Sender, Index), + Req = #riak_vnode_req_v1{request = Msg, sender = Sender, index = Index}, Mod = vmaster_to_vmod(VMaster), riak_core_vnode_proxy:command_return_vnode({Mod, Index, @@ -161,10 +159,7 @@ sync_command({Index, Node}, Msg, VMaster, Timeout) -> %% Issue the call to the master, it will update the Sender with %% the From for handle_call so that the {reply} return gets %% sent here. - Request = make_request(Msg, - {server, undefined, undefined}, - Index), - case gen_server:call({VMaster, Node}, Request, Timeout) + case gen_server:call({VMaster, Node}, {call, {Index, Msg}}, Timeout) of {vnode_error, {Error, _Args}} -> error(Error); {vnode_error, Error} -> error(Error); @@ -175,26 +170,12 @@ sync_command({Index, Node}, Msg, VMaster, Timeout) -> %% Will not return until the vnode has returned, but the vnode_master will %% continue to handle requests. sync_spawn_command({Index, Node}, Msg, VMaster) -> - Request = make_request(Msg, - {server, undefined, undefined}, - Index), - case gen_server:call({VMaster, Node}, - {spawn, Request}, - infinity) - of + case gen_server:call({VMaster, Node}, {spawn, {Index, Msg}}, infinity) of {vnode_error, {Error, _Args}} -> error(Error); {vnode_error, Error} -> error(Error); Else -> Else end. -%% Make a request record - exported for use by legacy modules --spec make_request(vnode_req(), sender(), - partition()) -> riak_vnode_req_v1(). - -make_request(Request, Sender, Index) -> - #riak_vnode_req_v1{index = Index, sender = Sender, - request = Request}. - %% Make a request record - exported for use by legacy modules -spec make_coverage_request(vnode_req(), keyspaces(), sender(), partition()) -> riak_coverage_req_v1(). @@ -269,29 +250,20 @@ handle_call({return_vnode, node()}, Req), {reply, {ok, Pid}, State}; -handle_call(Req = #riak_vnode_req_v1{index = Idx, - sender = {server, undefined, undefined}}, +handle_call({call, {Index, Message}}, From, State = #state{vnode_mod = Mod}) -> - Proxy = riak_core_vnode_proxy:reg_name(Mod, Idx), - riak_core_vnode:send_req(Proxy, - Req#riak_vnode_req_v1{sender = - {server, - undefined, - From}}), + Proxy = riak_core_vnode_proxy:reg_name(Mod, Index), + Sender = {server, ignore_ref, From}, + riak_core_vnode:send_req(Proxy, #riak_vnode_req_v1{index = Index, request = Message, sender = Sender}), {noreply, State}; -handle_call({spawn, - Req = #riak_vnode_req_v1{index = Idx, - sender = {server, undefined, undefined}}}, - From, State = #state{vnode_mod = Mod}) -> - Proxy = riak_core_vnode_proxy:reg_name(Mod, Idx), - Sender = {server, undefined, From}, - spawn_link(fun () -> - riak_core_vnode:send_all_proxy_req(Proxy, - Req#riak_vnode_req_v1{sender - = - Sender}) - end), - {noreply, State}. +handle_call({spawn, {Index, Message}}, + From, State = #state{vnode_mod = Mod}) -> + Proxy = riak_core_vnode_proxy:reg_name(Mod, Index), + Sender = {server, ignore_ref, From}, + spawn_link(fun () -> + riak_core_vnode:send_all_proxy_req(Proxy, #riak_vnode_req_v1{index = Index, request = Message, sender = Sender}) + end), + {noreply, State}. handle_info(_Info, State) -> {noreply, State}. diff --git a/test/worker_pool_test.erl b/test/worker_pool_test.erl index 499fad52c..adedbd5ee 100644 --- a/test/worker_pool_test.erl +++ b/test/worker_pool_test.erl @@ -77,7 +77,7 @@ deadlock_test() -> {worker_init, self()}, receive continue -> ok end end, - {raw, 1, self()}), + {server, 1, self()}), CoordinatorLoop ! {wait_for_worker, self()}, receive continue -> ok end, riak_core_vnode_worker_pool:handle_work(Pool, @@ -86,7 +86,7 @@ deadlock_test() -> {ready_to_crash}, erlang:error(-1) end, - {raw, 1, self()}), + {server, 1, self()}), % now we have to wait a bit % because handle_work is a cast and there is no way to check when it's in the queue timer:sleep(50), @@ -98,7 +98,7 @@ deadlock_test() -> CoordinatorLoop ! finish_test end, - {raw, 1, self()}), + {server, 1, self()}), receive finish_test -> ok end, unlink(Pool), ok = riak_core_vnode_worker_pool:stop(Pool, normal), @@ -116,7 +116,7 @@ simple_reply_worker_pool() -> timer:sleep(10), 1 / (N rem 2) end, - {raw, N, self()}) + {server, N, self()}) || N <- lists:seq(1, 10)], timer:sleep(200), %% make sure we got all replies @@ -138,7 +138,7 @@ simple_noreply_worker_pool() -> timer:sleep(10), 1 / (N rem 2) end, - {raw, N, self()}) + {server, N, self()}) || N <- lists:seq(1, 10)], timer:sleep(200), %% make sure that the non-crashing work calls receive timeouts @@ -172,7 +172,7 @@ shutdown_pool_worker_finish_success() -> []), riak_core_vnode_worker_pool:handle_work(Pool, fun () -> timer:sleep(50) end, - {raw, 1, self()}), + {server, 1, self()}), unlink(Pool), ok = riak_core_vnode_worker_pool:shutdown_pool(Pool, 100), @@ -188,7 +188,7 @@ shutdown_pool_force_timeout() -> []), riak_core_vnode_worker_pool:handle_work(Pool, fun () -> timer:sleep(100) end, - {raw, 1, self()}), + {server, 1, self()}), unlink(Pool), {error, vnode_shutdown} = riak_core_vnode_worker_pool:shutdown_pool(Pool, 50), @@ -204,7 +204,7 @@ shutdown_pool_duplicate_calls() -> []), riak_core_vnode_worker_pool:handle_work(Pool, fun () -> timer:sleep(100) end, - {raw, 1, self()}), + {server, 1, self()}), unlink(Pool), %% request shutdown a bit later a second time spawn_link(fun () ->