Skip to content

Commit

Permalink
Call LineRecorder receiver's closeStream() (#433)
Browse files Browse the repository at this point in the history
Under some circumstances (e.g. if the JVM is not stopped after processing)
the streams of receivers connected to LineRecorder are not closed, resulting
in empty or missing output data.

By switch from "implements ObjectPipe" to "extends DefaultObjectPipe" the "DefaultSender"
is used which implements the Sender interface. This ensures the proper closing of streams.

- complement test
  • Loading branch information
dr0i committed Dec 30, 2021
1 parent 1ecaa75 commit 11ef5aa
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
package org.metafacture.strings;

import org.metafacture.framework.FluxCommand;
import org.metafacture.framework.ObjectPipe;
import org.metafacture.framework.ObjectReceiver;
import org.metafacture.framework.annotations.Description;
import org.metafacture.framework.annotations.In;
import org.metafacture.framework.annotations.Out;
import org.metafacture.framework.helpers.DefaultObjectPipe;

/**
* Collects strings and emits them as records when a line matches the pattern.
Expand All @@ -34,7 +34,7 @@
@In(String.class)
@Out(String.class)
@FluxCommand("lines-to-records")
public final class LineRecorder implements ObjectPipe<String, ObjectReceiver<String>> {
public final class LineRecorder extends DefaultObjectPipe<String, ObjectReceiver<String>> {

private static final int SB_CAPACITY = 4096 * 7;
// empty line is the default
Expand Down Expand Up @@ -70,34 +70,16 @@ record = new StringBuilder(SB_CAPACITY);
}
}

private boolean isClosed() {
return isClosed;
}

@Override
public void resetStream() {
record = new StringBuilder(SB_CAPACITY);
}

@Override
public void closeStream() {
getReceiver().process(record.toString());
isClosed = true;
protected void onCloseStream() {
if (record.length() > 0) {
getReceiver().process(record.toString());
}
}

@Override
public <R extends ObjectReceiver<String>> R setReceiver(final R newReceiver) {
receiver = newReceiver;
return newReceiver;
}

/**
* Returns a reference to the downstream module.
*
* @return reference to the downstream module
*/
protected ObjectReceiver<String> getReceiver() {
return receiver;
public void onResetStream() {
record = new StringBuilder(SB_CAPACITY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void shouldEmitLastRecordWithoutRecordMarkerWhenClosingStream() {
LINE_SEPARATOR +
RECORD3_PART2 +
LINE_SEPARATOR);
ordered.verify(receiver).closeStream();
ordered.verifyNoMoreInteractions();
}

Expand Down

0 comments on commit 11ef5aa

Please sign in to comment.