diff --git a/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java b/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java index 97101dfa4..936a831b9 100644 --- a/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java +++ b/metafacture-strings/src/main/java/org/metafacture/strings/LineRecorder.java @@ -16,11 +16,11 @@ package org.metafacture.strings; import org.metafacture.framework.FluxCommand; -import org.metafacture.framework.ObjectPipe; import org.metafacture.framework.ObjectReceiver; import org.metafacture.framework.annotations.Description; import org.metafacture.framework.annotations.In; import org.metafacture.framework.annotations.Out; +import org.metafacture.framework.helpers.DefaultObjectPipe; /** * Collects strings and emits them as records when a line matches the pattern. @@ -34,7 +34,7 @@ @In(String.class) @Out(String.class) @FluxCommand("lines-to-records") -public final class LineRecorder implements ObjectPipe> { +public final class LineRecorder extends DefaultObjectPipe> { private static final int SB_CAPACITY = 4096 * 7; // empty line is the default @@ -70,34 +70,16 @@ record = new StringBuilder(SB_CAPACITY); } } - private boolean isClosed() { - return isClosed; - } - @Override - public void resetStream() { - record = new StringBuilder(SB_CAPACITY); - } - - @Override - public void closeStream() { - getReceiver().process(record.toString()); - isClosed = true; + protected void onCloseStream() { + if (record.length() > 0) { + getReceiver().process(record.toString()); + } } @Override - public > R setReceiver(final R newReceiver) { - receiver = newReceiver; - return newReceiver; - } - - /** - * Returns a reference to the downstream module. - * - * @return reference to the downstream module - */ - protected ObjectReceiver getReceiver() { - return receiver; + public void onResetStream() { + record = new StringBuilder(SB_CAPACITY); } } diff --git a/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java b/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java index 805854091..f77276a9f 100644 --- a/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java +++ b/metafacture-strings/src/test/java/org/metafacture/strings/LineRecorderTest.java @@ -111,6 +111,7 @@ public void shouldEmitLastRecordWithoutRecordMarkerWhenClosingStream() { LINE_SEPARATOR + RECORD3_PART2 + LINE_SEPARATOR); + ordered.verify(receiver).closeStream(); ordered.verifyNoMoreInteractions(); }