Skip to content

Commit

Permalink
Support consuming N message in a single command (#198)
Browse files Browse the repository at this point in the history
Co-authored-by: Kaili Zhu <[email protected]>
  • Loading branch information
zklgame and zklgame authored Jul 30, 2023
1 parent 757eed4 commit 3409ea7
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 29 deletions.
25 changes: 23 additions & 2 deletions src/main/java/io/iworkflow/core/command/CommandRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public abstract class CommandRequest {
* @return the command request
*/
public static CommandRequest forAllCommandCompleted(final BaseCommand... commands) {
return ImmutableCommandRequest.builder().addAllCommands(Arrays.asList(commands)).commandWaitingType(CommandWaitingType.ALL_COMPLETED).build();
final List<BaseCommand> allSingleCommands = getAllSingleCommands(commands);
return ImmutableCommandRequest.builder().addAllCommands(allSingleCommands).commandWaitingType(CommandWaitingType.ALL_COMPLETED).build();
}

/**
Expand All @@ -50,12 +51,32 @@ public static CommandRequest forAnyCommandCompleted(final BaseCommand... command
* @return the command request
*/
public static CommandRequest forAnyCommandCombinationCompleted(final List<List<String>> commandCombinationLists, final BaseCommand... commands) {
final List<BaseCommand> allSingleCommands = getAllSingleCommands(commands);

final List<CommandCombination> combinations = new ArrayList<>();
commandCombinationLists.forEach(commandIds -> combinations.add(new CommandCombination().commandIds(commandIds)));
return ImmutableCommandRequest.builder()
.commandCombinations(combinations)
.addAllCommands(Arrays.asList(commands))
.addAllCommands(allSingleCommands)
.commandWaitingType(CommandWaitingType.ANY_COMBINATION_COMPLETED)
.build();
}

private static List<BaseCommand> getAllSingleCommands(final BaseCommand... commands) {
final ArrayList<BaseCommand> allSingleCommands = new ArrayList<>();
Arrays.stream(commands).forEach(
command -> {
if (command instanceof SuperCommand) {
allSingleCommands.addAll(
SuperCommand.toList((SuperCommand) command)
);
return;
}

allSingleCommands.add(command);
}
);

return allSingleCommands;
}
}
47 changes: 47 additions & 0 deletions src/main/java/io/iworkflow/core/command/SuperCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.iworkflow.core.command;

import io.iworkflow.core.communication.InternalChannelCommand;
import io.iworkflow.core.communication.SignalCommand;
import org.immutables.value.Value;

import java.util.ArrayList;
import java.util.List;

@Value.Immutable
public abstract class SuperCommand implements BaseCommand {

public enum Type {
SIGNAL_CHANNEL,
INTERNAL_CHANNEL,
}

public abstract String getName();
public abstract int getCount();
public abstract Type getType();

public static List<BaseCommand> toList(final SuperCommand superCommand) {
final List<BaseCommand> commands = new ArrayList<>();

final boolean hasCommandId = superCommand.getCommandId().isPresent();
for (int i = 0; i < superCommand.getCount(); i ++) {
final BaseCommand command;
if (superCommand.getType() == Type.INTERNAL_CHANNEL) {
if (hasCommandId) {
command = InternalChannelCommand.create(superCommand.getCommandId().get(), superCommand.getName());
} else {
command = InternalChannelCommand.create(superCommand.getName());
}
} else {
if (hasCommandId) {
command = SignalCommand.create(superCommand.getCommandId().get(), superCommand.getName());
} else {
command = SignalCommand.create(superCommand.getName());
}
}

commands.add(command);
}

return commands;
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,68 @@
package io.iworkflow.core.communication;

import io.iworkflow.core.command.BaseCommand;
import io.iworkflow.core.command.ImmutableSuperCommand;
import io.iworkflow.core.command.SuperCommand;
import org.immutables.value.Value;

@Value.Immutable
public abstract class InternalChannelCommand implements BaseCommand {

public abstract String getChannelName();

/**
* Create a super command that represents one or many internal channel commands.
*
* @param commandId required. All the internal channel commands created here will share the same commandId.
* @param channelName required.
* @param count required. It represents the number of internal channel commands to create.
* @return super command
*/
public static SuperCommand create(final String commandId, final String channelName, final int count) {
return ImmutableSuperCommand.builder()
.commandId(commandId)
.name(channelName)
.count(Math.max(1, count))
.type(SuperCommand.Type.INTERNAL_CHANNEL)
.build();
}

/**
* Create one internal channel command.
*
* @param commandId required.
* @param channelName required.
* @return internal channel command
*/
public static InternalChannelCommand create(final String commandId, final String channelName) {
return ImmutableInternalChannelCommand.builder()
.channelName(channelName)
.commandId(commandId)
.channelName(channelName)
.build();
}

/**
* Create a super command that represents one or many internal channel commands.
*
* @param channelName required.
* @param count required. It represents the number of internal channel commands to create.
* @return super command
*/
public static SuperCommand create(final String channelName, final int count) {
return ImmutableSuperCommand.builder()
.name(channelName)
.count(Math.max(1, count))
.type(SuperCommand.Type.INTERNAL_CHANNEL)
.build();
}

public static InternalChannelCommand create(String channelName) {
/**
* Create one internal channel command.
*
* @param channelName required.
* @return internal channel command
*/
public static InternalChannelCommand create(final String channelName) {
return ImmutableInternalChannelCommand.builder()
.channelName(channelName)
.build();
Expand Down
53 changes: 50 additions & 3 deletions src/main/java/io/iworkflow/core/communication/SignalCommand.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,68 @@
package io.iworkflow.core.communication;

import io.iworkflow.core.command.BaseCommand;
import io.iworkflow.core.command.ImmutableSuperCommand;
import io.iworkflow.core.command.SuperCommand;
import org.immutables.value.Value;

@Value.Immutable
public abstract class SignalCommand implements BaseCommand {

public abstract String getSignalChannelName();

public static SignalCommand create(final String commandId, final String channelName) {
/**
* Create a super command that represents one or many signal commands.
*
* @param commandId required. All the signal commands created here will share the same commandId.
* @param signalName required.
* @param count required. It represents the number of signal commands to create.
* @return super command
*/
public static SuperCommand create(final String commandId, final String signalName, final int count) {
return ImmutableSuperCommand.builder()
.commandId(commandId)
.name(signalName)
.count(Math.max(1, count))
.type(SuperCommand.Type.SIGNAL_CHANNEL)
.build();
}

/**
* Create one signal command.
*
* @param commandId required.
* @param signalName required.
* @return signal command
*/
public static SignalCommand create(final String commandId, final String signalName) {
return ImmutableSignalCommand.builder()
.signalChannelName(channelName)
.commandId(commandId)
.signalChannelName(signalName)
.build();
}

/**
* Create a super command that represents one or many signal commands.
*
* @param signalName required.
* @param count required. It represents the number of signal commands to create.
* @return super command
*/
public static SuperCommand create(final String signalName, final int count) {
return ImmutableSuperCommand.builder()
.name(signalName)
.count(Math.max(1, count))
.type(SuperCommand.Type.SIGNAL_CHANNEL)
.build();
}

public static SignalCommand create(String signalName) {
/**
* Create one signal command.
*
* @param signalName required.
* @return signal command
*/
public static SignalCommand create(final String signalName) {
return ImmutableSignalCommand.builder()
.signalChannelName(signalName)
.build();
Expand Down
20 changes: 16 additions & 4 deletions src/test/java/io/iworkflow/integ/InternalChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.integ.interstatechannel.BasicInterStateChannelWorkflow;
import io.iworkflow.integ.internalchannel.BasicInternalChannelWorkflow;
import io.iworkflow.integ.internalchannel.MultipleSameInternalChannelWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
Expand All @@ -19,13 +20,24 @@ public void setup() throws ExecutionException, InterruptedException {
}

@Test
public void testBasicInterStateWorkflow() throws InterruptedException {
public void testBasicInternalWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "basic-inter-state-test-id" + System.currentTimeMillis() / 1000;
final String wfId = "basic-internal-test-id" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
BasicInterStateChannelWorkflow.class, wfId, 10, input);
BasicInternalChannelWorkflow.class, wfId, 10, input);
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(3, output);
}

@Test
public void testMultipleSameInternalWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "multiple-same-internal-test-id" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
MultipleSameInternalChannelWorkflow.class, wfId, 10, input);
final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(5, output);
}
}
19 changes: 19 additions & 0 deletions src/test/java/io/iworkflow/integ/SignalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.iworkflow.gen.models.ErrorSubStatus;
import io.iworkflow.integ.signal.BasicSignalWorkflow;
import io.iworkflow.integ.signal.BasicSignalWorkflowState2;
import io.iworkflow.integ.signal.MultipleSameSignalWorkflow;
import static io.iworkflow.integ.signal.MultipleSameSignalWorkflow.SIGNAL_NAME_1;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -67,4 +69,21 @@ public void testBasicSignalWorkflow() throws InterruptedException {
}
Assertions.fail("signal closed workflow should fail");
}

@Test
public void testMultipleSameSignalWorkflow() {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "multiple-same-signal-test-id" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
MultipleSameSignalWorkflow.class, wfId, 10, input);
client.signalWorkflow(
MultipleSameSignalWorkflow.class, wfId, runId, SIGNAL_NAME_1, Integer.valueOf(2));

client.signalWorkflow(
MultipleSameSignalWorkflow.class, wfId, runId, SIGNAL_NAME_1, Integer.valueOf(3));

final Integer output = client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(5, output);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.iworkflow.integ.interstatechannel;
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.StateDef;
Expand All @@ -10,7 +10,7 @@
import java.util.List;

@Component
public class BasicInterStateChannelWorkflow implements ObjectWorkflow {
public class BasicInternalChannelWorkflow implements ObjectWorkflow {
public static final String INTER_STATE_CHANNEL_NAME_1 = "test-inter-state-channel-1";

public static final String INTER_STATE_CHANNEL_NAME_2 = "test-inter-state-channel-2";
Expand All @@ -28,9 +28,9 @@ public List<CommunicationMethodDef> getCommunicationSchema() {
@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new BasicInterStateChannelWorkflowState0()),
StateDef.nonStartingState(new BasicInterStateChannelWorkflowState1()),
StateDef.nonStartingState(new BasicInterStateChannelWorkflowState2())
StateDef.startingState(new BasicInternalChannelWorkflowState0()),
StateDef.nonStartingState(new BasicInternalChannelWorkflowState1()),
StateDef.nonStartingState(new BasicInternalChannelWorkflowState2())
);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.iworkflow.integ.interstatechannel;
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
Expand All @@ -9,7 +9,7 @@
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

public class BasicInterStateChannelWorkflowState0 implements WorkflowState<Integer> {
public class BasicInternalChannelWorkflowState0 implements WorkflowState<Integer> {

@Override
public Class<Integer> getInputType() {
Expand All @@ -31,8 +31,8 @@ public StateDecision execute(
CommandResults commandResults,
Persistence persistence, final Communication communication) {
return StateDecision.multiNextStates(
StateMovement.create(BasicInterStateChannelWorkflowState1.class, input),
StateMovement.create(BasicInterStateChannelWorkflowState2.class, input)
StateMovement.create(BasicInternalChannelWorkflowState1.class, input),
StateMovement.create(BasicInternalChannelWorkflowState2.class, input)
);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.iworkflow.integ.interstatechannel;
package io.iworkflow.integ.internalchannel;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
Expand All @@ -13,7 +13,7 @@

import java.util.Arrays;

public class BasicInterStateChannelWorkflowState1 implements WorkflowState<Integer> {
public class BasicInternalChannelWorkflowState1 implements WorkflowState<Integer> {
public static final String COMMAND_ID = "test-cmd-id";
public static final String COMMAND_ID_2 = "test-cmd-id-2";

Expand All @@ -32,9 +32,9 @@ public CommandRequest waitUntil(
Arrays.asList(
Arrays.asList(COMMAND_ID, COMMAND_ID_2)
),
InternalChannelCommand.create(COMMAND_ID, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_NAME_1),
InternalChannelCommand.create(COMMAND_ID, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_NAME_2),
InternalChannelCommand.create(COMMAND_ID_2, BasicInterStateChannelWorkflow.INTER_STATE_CHANNEL_PREFIX_1 + "1")
InternalChannelCommand.create(COMMAND_ID, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME_1),
InternalChannelCommand.create(COMMAND_ID, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_NAME_2),
InternalChannelCommand.create(COMMAND_ID_2, BasicInternalChannelWorkflow.INTER_STATE_CHANNEL_PREFIX_1 + "1")
);
}

Expand Down
Loading

0 comments on commit 3409ea7

Please sign in to comment.