From fb0fcf6bd76026ceed21fda5194af3600ef5f541 Mon Sep 17 00:00:00 2001 From: medcl Date: Sat, 14 Dec 2019 11:36:07 +0800 Subject: [PATCH] fix duplicate content when soft link, replace lumberjack to tcp protocol --- .gitignore | 4 +- HOWTO-KEYSTORE.md | 0 LICENSE.md | 0 README.md | 75 ++-- config.json | 12 + pom.xml | 15 +- src/assembly/tarball.xml | 0 .../info/fetter/logstashforwarder/Event.java | 18 +- .../FileModificationListener.java | 0 .../fetter/logstashforwarder/FileReader.java | 16 +- .../fetter/logstashforwarder/FileSigner.java | 0 .../fetter/logstashforwarder/FileState.java | 4 +- .../fetter/logstashforwarder/FileWatcher.java | 6 +- .../info/fetter/logstashforwarder/Filter.java | 0 .../fetter/logstashforwarder/Forwarder.java | 34 +- .../fetter/logstashforwarder/InputReader.java | 0 .../fetter/logstashforwarder/Multiline.java | 0 .../logstashforwarder/ProtocolAdapter.java | 0 .../info/fetter/logstashforwarder/Reader.java | 5 +- .../fetter/logstashforwarder/Registrar.java | 0 .../config/Configuration.java | 0 .../config/ConfigurationManager.java | 2 +- .../config/FilesSection.java | 6 +- .../config/NetworkSection.java | 0 .../protocol/LumberjackClient.java | 2 +- .../logstashforwarder/protocol/TCPClient.java | 125 +++++++ .../util/AdapterException.java | 0 .../logstashforwarder/util/KMPMatch.java | 0 .../util/LastModifiedFileFilter.java | 0 .../util/RandomAccessFile.java | 0 src/main/resources/deploy.properties | 13 + .../logstashforwarder/FileReaderTest.java | 198 +++++----- .../logstashforwarder/FileWatcherTest.java | 338 +++++++++--------- .../logstashforwarder/InputReaderTest.java | 248 ++++++------- .../MockProtocolAdapter.java | 116 +++--- .../logstashforwarder/RegistrarTest.java | 0 .../config/ConfigurationManagerTest.java | 0 src/test/resources/config1.json | 0 src/test/resources/state1.json | 0 testFileWatcher1.txt | 9 + testFileWatcher2.txt | 3 + 41 files changed, 713 insertions(+), 536 deletions(-) mode change 100644 => 100755 HOWTO-KEYSTORE.md mode change 100644 => 100755 LICENSE.md mode change 100644 => 100755 README.md create mode 100644 config.json mode change 100644 => 100755 pom.xml mode change 100644 => 100755 src/assembly/tarball.xml mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Event.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/FileModificationListener.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/FileReader.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/FileSigner.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/FileState.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/FileWatcher.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Filter.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Forwarder.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/InputReader.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Multiline.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Reader.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/Registrar.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/config/Configuration.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/config/ConfigurationManager.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/config/FilesSection.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/config/NetworkSection.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java create mode 100644 src/main/java/info/fetter/logstashforwarder/protocol/TCPClient.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/util/AdapterException.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/util/KMPMatch.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/util/LastModifiedFileFilter.java mode change 100644 => 100755 src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java create mode 100755 src/main/resources/deploy.properties mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/FileReaderTest.java mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/InputReaderTest.java mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/RegistrarTest.java mode change 100644 => 100755 src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java mode change 100644 => 100755 src/test/resources/config1.json mode change 100644 => 100755 src/test/resources/state1.json create mode 100644 testFileWatcher1.txt create mode 100644 testFileWatcher2.txt diff --git a/.gitignore b/.gitignore index 5d5e58d..779c8c6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,6 @@ /testFileReader1.txt /state2.json .idea/ -logstash-forwarder-java.iml \ No newline at end of file +logstash-forwarder-java.iml +.DS_Store +.logstash-forwarder-java diff --git a/HOWTO-KEYSTORE.md b/HOWTO-KEYSTORE.md old mode 100644 new mode 100755 diff --git a/LICENSE.md b/LICENSE.md old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 4eaf5ae..5a117c9 --- a/README.md +++ b/README.md @@ -1,62 +1,73 @@ -# logstash-forwarder-java +# logstash-forwarder-java(tcp-input) -## What is this ? +Forked from https://github.com/didfet/logstash-forwarder-java -Logstash-forwarder-java is a log shipper program written in java. This is in fact a java version of [logstash-forwarder](https://github.com/elasticsearch/logstash-forwarder) by jordansissel. -Here are a few features of this program : - - compatible with Java 5 runtime - - lightweight : package size is ~2MB and memory footprint ~8MB - - configuration compatible with logstash-forwarder - - lumberjack output (including zlib compression) +Logstash Lumberjack input is not robust enough, this fork replaced Lumberjack protocol to plain TCP. -## Why ? +## Example -Logstash-forwarder is written in go. This programming language is not available on all platforms (for example AIX), that's why a java version is more portable. +### Logstash pipelines -Logstash runs on java and provides a lumberjack output, but the file input doesn't run on all plaforms (for example AIX) and logstash requires a recent JVM. Moreover Logstash is heavier : it is a big package and uses more system resources. +``` +input { + tcp { + port => 5055 + codec => json_lines + #ssl_cert => "/etc/logstash/server.crt" + #ssl_certificate_authorities => "/etc/logstash/ca.crt" + #ssl_enable => true + #ssl_key => "/etc/logstash/server.key" + #ssl_key_passphrase => "${TCP_KEY_PASS}" + #ssl_verify => false + tcp_keep_alive => true + } +} -So logstash-forwarder-java is a solution for those who want a portable, lightweight log shipper for their ELK stack. +filter { +} -## How to install it ? +output { + stdout{} +} +``` +### Run logstash-forwarder -Download one of the following archives : - - [logstash-forwarder-java-0.2.4-bin.zip](https://github.com/didfet/logstash-forwarder-java/releases/download/0.2.4/logstash-forwarder-java-0.2.4-bin.zip) - - [logstash-forwarder-java-0.2.4-bin.tar.gz](https://github.com/didfet/logstash-forwarder-java/releases/download/0.2.4/logstash-forwarder-java-0.2.4-bin.tar.gz) - - [logstash-forwarder-java-0.2.4-bin.tar.bz2](https://github.com/didfet/logstash-forwarder-java/releases/download/0.2.4/logstash-forwarder-java-0.2.4-bin.tar.bz2) +Just run this command : -Or download the maven project and run maven package. Then you can install one of the archives located in the target directory. + java -jar logstash-forwarder-java-X.Y.Z.jar -config /path/to/config/file.json -## How to run it ? +### Run logstash-forwarder with embedded JRE -Just run this command : +Go to [https://github.com/medcl/logstash-forwarder-java/releases/tag/TCP-Forwarder](https://github.com/medcl/logstash-forwarder-java/releases/tag/TCP-Forwarder) - java -jar logstash-forwarder-java-X.Y.Z.jar -config /path/to/config/file.json +1. Download `logstash-forwarder-java.zip`, unzip it + +2. Download related JDK to `logstash-forwarder-java` folder, unzip it, `7z x jdk8u232-b09-jre-.7z` + +3. Change `run.sh`, update JDK path + +4. Update `config.json`, with your server and file path -For help run : +5. Run `run.sh` - java -jar logstash-forwarder-java-X.Y.Z.jar -help +For more JRE, download from here: -## Differences with logstash-forwarder +- https://adoptopenjdk.net/releases.html?variant=openjdk8&jvmVariant=hotspot ### Configuration -The configuration file is the same (json format), but there are a few differences : +For help run `java -jar logstash-forwarder-java-X.Y.Z.jar -help`: + + - help - the ssl ca parameter points to a java [keystore](https://github.com/didfet/logstash-forwarder-java/blob/master/HOWTO-KEYSTORE.md) containing the root certificate of the server, not a PEM file - comments are C-style comments - multiline support with attributes "pattern", "negate" (true/false) and "what" (previous/next) (version 0.2.5) - filtering support with attributes "pattern" and "negate" (true/false) (version 0.2.5) - -### Command-line options - -Some options are the same : - config (but only for a file, not a directory) - quiet - idle-timeout (renamed idletimeout) - spool-size (renamed spoolsize) - tail - - help - -There are a few more options : - debug : turn on debug logging level - trace : turn on trace logging level - signaturelength : size of the block used to compute the checksum diff --git a/config.json b/config.json new file mode 100644 index 0000000..f314dc0 --- /dev/null +++ b/config.json @@ -0,0 +1,12 @@ +{ + "network":{ + "servers":["127.0.0.1:5055"] + }, + "files":[ + { + "paths":["/tmp/123.log"] + },{ + "paths":["/private/tmp/123.log"] + } + ] +} diff --git a/pom.xml b/pom.xml old mode 100644 new mode 100755 index dd9c863..b4b567a --- a/pom.xml +++ b/pom.xml @@ -48,10 +48,10 @@ info.fetter.logstashforwarder.Forwarder true - - ${prefix.revision} - ${timestamp} - + + + + @@ -95,7 +95,7 @@ - org.eclipse.m2e @@ -133,6 +133,11 @@ + + com.alibaba + fastjson + 1.2.61 + commons-io commons-io diff --git a/src/assembly/tarball.xml b/src/assembly/tarball.xml old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/Event.java b/src/main/java/info/fetter/logstashforwarder/Event.java old mode 100644 new mode 100755 index a4be5b0..d3f849b --- a/src/main/java/info/fetter/logstashforwarder/Event.java +++ b/src/main/java/info/fetter/logstashforwarder/Event.java @@ -22,7 +22,7 @@ import java.util.Map; public class Event { - private Map keyValues = new HashMap(10); + private Map keyValues = new HashMap(10); public Event() { } @@ -39,26 +39,26 @@ public Event(Map fields) throws UnsupportedEncodingException { } } - public Event addField(String key, byte[] value) { - keyValues.put(key, value); - return this; - } +// public Event addField(String key, byte[] value) { +// keyValues.put(key, value); +// return this; +// } public Event addField(String key, String value) throws UnsupportedEncodingException { - keyValues.put(key, value.getBytes()); + keyValues.put(key, value); return this; } public Event addField(String key, long value) throws UnsupportedEncodingException { - keyValues.put(key, String.valueOf(value).getBytes()); + keyValues.put(key, value); return this; } - public Map getKeyValues() { + public Map getKeyValues() { return keyValues; } - public byte[] getValue(String fieldName) { + public Object getValue(String fieldName) { return keyValues.get(fieldName); } } diff --git a/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java b/src/main/java/info/fetter/logstashforwarder/FileModificationListener.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java old mode 100644 new mode 100755 index 8ee23b6..3c25e76 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -46,7 +46,7 @@ public FileReader(int spoolSize) { super(spoolSize); } - public int readFiles(Collection fileList) throws AdapterException { + public int readFiles(Collection fileList) throws AdapterException, IOException { int eventCount = 0; if(logger.isTraceEnabled()) { logger.trace("Reading " + fileList.size() + " file(s)"); @@ -70,7 +70,7 @@ public int readFiles(Collection fileList) throws AdapterException { return eventCount; // Return number of events sent to adapter } - private int readFile(FileState state, int spaceLeftInSpool) { + private int readFile(FileState state, int spaceLeftInSpool) throws IOException { File file = state.getFile(); long pointer = state.getPointer(); int numberOfEvents = 0; @@ -120,7 +120,11 @@ private boolean isCompressedFile(FileState state) { } } } catch(IOException e) { - logger.warn("Exception raised while reading file : " + state.getFile(), e); + try { + logger.warn("Exception raised while reading file : " + state.getFile(), e); + } catch (IOException e1) { + e1.printStackTrace(); + } } return false; } @@ -208,7 +212,11 @@ private long readLines(FileState state, int spaceLeftInSpool) { } reader.seek(pos); // Ensure we can re-read if necessary } catch(IOException e) { - logger.warn("Exception raised while reading file : " + state.getFile(), e); + try { + logger.warn("Exception raised while reading file : " + state.getFile(), e); + } catch (IOException e1) { + e1.printStackTrace(); + } } return pos; } diff --git a/src/main/java/info/fetter/logstashforwarder/FileSigner.java b/src/main/java/info/fetter/logstashforwarder/FileSigner.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java old mode 100644 new mode 100755 index 8e69d57..4a2c07d --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -83,8 +83,8 @@ private void setFileFromDirectoryAndName() throws FileNotFoundException { } } - public File getFile() { - return file; + public File getFile() throws IOException { + return file.getCanonicalFile(); } public long getLastModified() { diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java old mode 100644 new mode 100755 index c60ab16..ab049dd --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -224,7 +224,7 @@ private void processModifications() throws IOException { } private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception { - logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); + logger.info("Checking file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch); IOFileFilter fileFilter = FileFilterUtils.and( @@ -269,7 +269,7 @@ private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event f private void addFileToWatchMap(Map map, File file, Event fields, Multiline multiline, Filter filter) { try { - FileState state = new FileState(file); + FileState state = new FileState(file.getCanonicalFile()); state.setFields(fields); int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize()); state.setSignatureLength(signatureLength); @@ -278,7 +278,7 @@ private void addFileToWatchMap(Map map, File file, Event fields, logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); state.setMultiline(multiline); state.setFilter(filter); - map.put(file, state); + map.put(file.getCanonicalFile(), state); } catch(IOException e) { logger.error("Caught IOException in addFileToWatchMap : " + e.getMessage()); diff --git a/src/main/java/info/fetter/logstashforwarder/Filter.java b/src/main/java/info/fetter/logstashforwarder/Filter.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/Forwarder.java b/src/main/java/info/fetter/logstashforwarder/Forwarder.java old mode 100644 new mode 100755 index a0d6ad2..24b2a46 --- a/src/main/java/info/fetter/logstashforwarder/Forwarder.java +++ b/src/main/java/info/fetter/logstashforwarder/Forwarder.java @@ -17,34 +17,19 @@ * */ -import static org.apache.log4j.Level.*; +import info.fetter.logstashforwarder.config.ConfigurationManager; +import info.fetter.logstashforwarder.config.FilesSection; +import info.fetter.logstashforwarder.protocol.TCPClient; +import info.fetter.logstashforwarder.util.AdapterException; +import org.apache.commons.cli.*; +import org.apache.log4j.*; +import org.apache.log4j.spi.RootLogger; import java.io.IOException; import java.util.List; import java.util.Random; -import info.fetter.logstashforwarder.config.ConfigurationManager; -import info.fetter.logstashforwarder.config.FilesSection; -import info.fetter.logstashforwarder.protocol.LumberjackClient; -import info.fetter.logstashforwarder.util.AdapterException; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.log4j.Appender; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Layout; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; -import org.apache.log4j.RollingFileAppender; -import org.apache.log4j.spi.RootLogger; +import static org.apache.log4j.Level.*; public class Forwarder { private static final String SINCEDB = ".logstash-forwarder-java"; @@ -127,7 +112,8 @@ private static void connectToServer() { randomServerIndex = random.nextInt(serverList.size()); String[] serverAndPort = serverList.get(randomServerIndex).split(":"); logger.info("Trying to connect to " + serverList.get(randomServerIndex)); - adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout); + adapter = new TCPClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout); +// adapter = new LumberjackClient(configManager.getConfig().getNetwork().getSslCA(),serverAndPort[0],Integer.parseInt(serverAndPort[1]), networkTimeout); fileReader.setAdapter(adapter); inputReader.setAdapter(adapter); } catch(Exception ex) { diff --git a/src/main/java/info/fetter/logstashforwarder/InputReader.java b/src/main/java/info/fetter/logstashforwarder/InputReader.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/Multiline.java b/src/main/java/info/fetter/logstashforwarder/Multiline.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java b/src/main/java/info/fetter/logstashforwarder/ProtocolAdapter.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/Reader.java b/src/main/java/info/fetter/logstashforwarder/Reader.java old mode 100644 new mode 100755 index 7d83151..953d4ac --- a/src/main/java/info/fetter/logstashforwarder/Reader.java +++ b/src/main/java/info/fetter/logstashforwarder/Reader.java @@ -23,6 +23,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; @@ -66,7 +67,7 @@ protected void addEvent(String fileName, Event fields, long pos, byte[] line) th Event event = new Event(fields); event.addField("file", fileName) .addField("offset", pos) - .addField("line", line) + .addField("message", new String(line)) .addField("host", hostname); eventList.add(event); } @@ -74,7 +75,7 @@ protected void addEvent(String fileName, Event fields, long pos, byte[] line) th protected void addEvent(String fileName, Event fields, long pos, String line) throws IOException { Event event = new Event(fields); event.addField("file", fileName) - .addField("offset", pos) + .addField("message", pos) .addField("line", line) .addField("host", hostname); eventList.add(event); diff --git a/src/main/java/info/fetter/logstashforwarder/Registrar.java b/src/main/java/info/fetter/logstashforwarder/Registrar.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/config/Configuration.java b/src/main/java/info/fetter/logstashforwarder/config/Configuration.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/config/ConfigurationManager.java b/src/main/java/info/fetter/logstashforwarder/config/ConfigurationManager.java old mode 100644 new mode 100755 index 5ad1722..312db09 --- a/src/main/java/info/fetter/logstashforwarder/config/ConfigurationManager.java +++ b/src/main/java/info/fetter/logstashforwarder/config/ConfigurationManager.java @@ -33,7 +33,7 @@ public class ConfigurationManager { public ConfigurationManager(String configFilePath) { this(new File(configFilePath)); } - + public ConfigurationManager(File file) { configFile = file; mapper = new ObjectMapper(); diff --git a/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java b/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java old mode 100644 new mode 100755 index f4e34df..2db2a1b --- a/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java +++ b/src/main/java/info/fetter/logstashforwarder/config/FilesSection.java @@ -17,6 +17,8 @@ * */ +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.commons.lang.builder.ToStringBuilder; @@ -27,8 +29,8 @@ import java.io.UnsupportedEncodingException; public class FilesSection { - private List paths; - private Map fields; + private List paths=new ArrayList(); + private Map fields=Collections.emptyMap(); @JsonProperty("dead time") private String deadTime = "24h"; private Multiline multiline; diff --git a/src/main/java/info/fetter/logstashforwarder/config/NetworkSection.java b/src/main/java/info/fetter/logstashforwarder/config/NetworkSection.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java old mode 100644 new mode 100755 index b07a27b..75c0c47 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -212,7 +212,7 @@ public int sendEvents(List eventList) throws AdapterException { sendWindowSizeFrame(numberOfEvents); List> keyValuesList = new ArrayList>(numberOfEvents); for(Event event : eventList) { - keyValuesList.add(event.getKeyValues()); +// keyValuesList.add(event.getKeyValues()); } sendCompressedFrame(keyValuesList); while(readAckFrame() < (sequence - 1) ) {} diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/TCPClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/TCPClient.java new file mode 100644 index 0000000..81ac080 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/protocol/TCPClient.java @@ -0,0 +1,125 @@ +package info.fetter.logstashforwarder.protocol; + +import com.alibaba.fastjson.JSON; +import info.fetter.logstashforwarder.Event; +import info.fetter.logstashforwarder.ProtocolAdapter; +import info.fetter.logstashforwarder.util.AdapterException; +import org.apache.commons.io.HexDump; +import org.apache.log4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; +import java.io.*; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ProtocolException; +import java.net.Socket; +import java.security.KeyStore; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.zip.Deflater; + +public class TCPClient implements ProtocolAdapter { + + private final static Logger logger = Logger.getLogger(TCPClient.class); + + private Socket socket; + private SSLSocket sslSocket; + private KeyStore keyStore; + private String server; + private int port; + private DataOutputStream output; + private DataInputStream input; + PrintWriter writer; + + public TCPClient(String keyStorePath, String server, int port, int timeout) throws IOException { + this.server = server; + this.port = port; + + try { + + socket = new Socket(); + socket.connect(new InetSocketAddress(InetAddress.getByName(server), port), timeout); + socket.setSoTimeout(timeout); + + + + boolean useTLS=false; + if (useTLS) { + if(keyStorePath == null) { + throw new IOException("Key store not configured"); + } + if(server == null) { + throw new IOException("Server address not configured"); + } + + keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(keyStorePath), null); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); + tmf.init(keyStore); + + SSLContext context = SSLContext.getInstance("TLS"); + context.init(null, tmf.getTrustManagers(), null); + + SSLSocketFactory socketFactory = context.getSocketFactory(); + + sslSocket = (SSLSocket)socketFactory.createSocket(socket, server, port, true); + sslSocket.setUseClientMode(true); + sslSocket.startHandshake(); + + output = new DataOutputStream(new BufferedOutputStream(sslSocket.getOutputStream())); + input = new DataInputStream(sslSocket.getInputStream()); + }else{ + output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + input = new DataInputStream(socket.getInputStream()); + } + + writer = new PrintWriter(output, true); + + logger.info("Connected to " + server + ":" + port); + } catch(IOException e) { + throw e; + } catch(Exception e) { + throw new RuntimeException(e); + } + } + public int sendEvents(List eventList) throws AdapterException { + try { + int numberOfEvents = eventList.size(); + + if(logger.isInfoEnabled()) { + logger.info("Sending " + numberOfEvents + " events"); + } + + for(Event event : eventList) { + writer.println(JSON.toJSONString(event.getKeyValues())); + + } + output.flush(); + } catch(Exception e) { + throw new AdapterException(e); + } + return 0; + } + + public void close() throws AdapterException { + try { + sslSocket.close(); + } catch(Exception e) { + throw new AdapterException(e); + } + logger.info("Connection to " + server + ":" + port + " closed"); + } + + public String getServer() { + return server; + } + + public int getPort() { + return port; + } +} diff --git a/src/main/java/info/fetter/logstashforwarder/util/AdapterException.java b/src/main/java/info/fetter/logstashforwarder/util/AdapterException.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/util/KMPMatch.java b/src/main/java/info/fetter/logstashforwarder/util/KMPMatch.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/util/LastModifiedFileFilter.java b/src/main/java/info/fetter/logstashforwarder/util/LastModifiedFileFilter.java old mode 100644 new mode 100755 diff --git a/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java b/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java old mode 100644 new mode 100755 diff --git a/src/main/resources/deploy.properties b/src/main/resources/deploy.properties new file mode 100755 index 0000000..727b89e --- /dev/null +++ b/src/main/resources/deploy.properties @@ -0,0 +1,13 @@ +deploy_config_path=/tmp/config/ +deploy_app_id=logstash-forwarder +#deploy_app_ver=v3 + +#是否检测远程配置 +deploy_remote_config_enabled=false +#远程配置更新检查时间,默认10s +#deploy_remote_config_check_interval=10000 + +#本地文件更新检测 +deploy_local_config_enabled=false +#本地文件更新检测时间,默认10s +#deploy_local_config_check_interval=10000 diff --git a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java old mode 100644 new mode 100755 index ca9b97a..a03604c --- a/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileReaderTest.java @@ -1,99 +1,99 @@ -package info.fetter.logstashforwarder; - -/* - * Copyright 2015 Didier Fetter - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import static org.apache.log4j.Level.*; -import info.fetter.logstashforwarder.util.AdapterException; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.FileUtils; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.RootLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -public class FileReaderTest { - Logger logger = Logger.getLogger(FileReaderTest.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - BasicConfigurator.configure(); - RootLogger.getRootLogger().setLevel(TRACE); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - BasicConfigurator.resetConfiguration(); - } - - @Test - public void testFileReader1() throws IOException, InterruptedException, AdapterException { - FileReader reader = new FileReader(2); - reader.setAdapter(new MockProtocolAdapter()); - List fileList = new ArrayList(1); - File file1 = new File("testFileReader1.txt"); - FileUtils.write(file1, "testFileReader1 line1\n"); - FileUtils.write(file1, " nl line12\n", true); - FileUtils.write(file1, "testFileReader1 line2\n", true); - FileUtils.write(file1, "testFileReader1 line3\n", true); - Thread.sleep(500); - FileState state = new FileState(file1); - fileList.add(state); - state.setFields(new Event().addField("testFileReader1", "testFileReader1")); - Map m = new HashMap(); - m.put("pattern", " nl"); - m.put("negate", "false"); - state.setMultiline(new Multiline(m)); - reader.readFiles(fileList); - reader.readFiles(fileList); - reader.readFiles(fileList); - //FileUtils.forceDelete(file1); - } - - @Test - public void testFileReader2() throws IOException, InterruptedException, AdapterException { - FileReader reader = new FileReader(2); - reader.setAdapter(new MockProtocolAdapter()); - List fileList = new ArrayList(1); - File file1 = new File("testFileReader1.txt"); - FileUtils.write(file1, "testFileReader1 line1\n"); - FileUtils.write(file1, " nl line12\n", true); - FileUtils.write(file1, "testFileReader1 line2\n", true); - FileUtils.write(file1, "testFileReader1 line3\n", true); - Thread.sleep(500); - FileState state = new FileState(file1); - fileList.add(state); - state.setFields(new Event().addField("testFileReader1", "testFileReader1")); - Map m = new HashMap(); - m.put("pattern", "testFileReader1"); - m.put("negate", "true"); - state.setMultiline(new Multiline(m)); - reader.readFiles(fileList); - reader.readFiles(fileList); - reader.readFiles(fileList); - //FileUtils.forceDelete(file1); - } -} +//package info.fetter.logstashforwarder; +// +///* +// * Copyright 2015 Didier Fetter +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// * +// */ +// +//import static org.apache.log4j.Level.*; +//import info.fetter.logstashforwarder.util.AdapterException; +// +//import java.io.File; +//import java.io.IOException; +//import java.util.ArrayList; +//import java.util.HashMap; +//import java.util.List; +//import java.util.Map; +// +//import org.apache.commons.io.FileUtils; +//import org.apache.log4j.BasicConfigurator; +//import org.apache.log4j.Logger; +//import org.apache.log4j.spi.RootLogger; +//import org.junit.AfterClass; +//import org.junit.BeforeClass; +//import org.junit.Test; +// +//public class FileReaderTest { +// Logger logger = Logger.getLogger(FileReaderTest.class); +// +// @BeforeClass +// public static void setUpBeforeClass() throws Exception { +// BasicConfigurator.configure(); +// RootLogger.getRootLogger().setLevel(TRACE); +// } +// +// @AfterClass +// public static void tearDownAfterClass() throws Exception { +// BasicConfigurator.resetConfiguration(); +// } +// +// @Test +// public void testFileReader1() throws IOException, InterruptedException, AdapterException { +// FileReader reader = new FileReader(2); +// reader.setAdapter(new MockProtocolAdapter()); +// List fileList = new ArrayList(1); +// File file1 = new File("testFileReader1.txt"); +// FileUtils.write(file1, "testFileReader1 line1\n"); +// FileUtils.write(file1, " nl line12\n", true); +// FileUtils.write(file1, "testFileReader1 line2\n", true); +// FileUtils.write(file1, "testFileReader1 line3\n", true); +// Thread.sleep(500); +// FileState state = new FileState(file1); +// fileList.add(state); +// state.setFields(new Event().addField("testFileReader1", "testFileReader1")); +// Map m = new HashMap(); +// m.put("pattern", " nl"); +// m.put("negate", "false"); +// state.setMultiline(new Multiline(m)); +// reader.readFiles(fileList); +// reader.readFiles(fileList); +// reader.readFiles(fileList); +// //FileUtils.forceDelete(file1); +// } +// +// @Test +// public void testFileReader2() throws IOException, InterruptedException, AdapterException { +// FileReader reader = new FileReader(2); +// reader.setAdapter(new MockProtocolAdapter()); +// List fileList = new ArrayList(1); +// File file1 = new File("testFileReader1.txt"); +// FileUtils.write(file1, "testFileReader1 line1\n"); +// FileUtils.write(file1, " nl line12\n", true); +// FileUtils.write(file1, "testFileReader1 line2\n", true); +// FileUtils.write(file1, "testFileReader1 line3\n", true); +// Thread.sleep(500); +// FileState state = new FileState(file1); +// fileList.add(state); +// state.setFields(new Event().addField("testFileReader1", "testFileReader1")); +// Map m = new HashMap(); +// m.put("pattern", "testFileReader1"); +// m.put("negate", "true"); +// state.setMultiline(new Multiline(m)); +// reader.readFiles(fileList); +// reader.readFiles(fileList); +// reader.readFiles(fileList); +// //FileUtils.forceDelete(file1); +// } +//} diff --git a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java old mode 100644 new mode 100755 index 47269a3..cfe8895 --- a/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java +++ b/src/test/java/info/fetter/logstashforwarder/FileWatcherTest.java @@ -1,169 +1,169 @@ -package info.fetter.logstashforwarder; - -/* - * Copyright 2015 Didier Fetter - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import static org.apache.log4j.Level.*; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.io.FileUtils; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.RootLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -public class FileWatcherTest { - Logger logger = Logger.getLogger(FileWatcherTest.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - BasicConfigurator.configure(); - RootLogger.getRootLogger().setLevel(TRACE); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - BasicConfigurator.resetConfiguration(); - } - - //@Test - public void testFileWatch() throws InterruptedException, IOException { - FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null, null); - for(int i = 0; i < 100; i++) { - Thread.sleep(1000); - watcher.checkFiles(); - } - } - - //@Test - public void testFileWatchWithMultilines() throws InterruptedException, IOException { - FileWatcher watcher = new FileWatcher(); - Multiline multiline = new Multiline(); - watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline, null); - for(int i = 0; i < 100; i++) { - Thread.sleep(1000); - watcher.checkFiles(); - } - } - - //@Test - public void testWildcardWatch() throws InterruptedException, IOException { - if(System.getProperty("os.name").toLowerCase().contains("win")) { - logger.warn("Not executing this test on windows"); - return; - } - FileWatcher watcher = new FileWatcher(); - watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null, null); - watcher.initialize(); - - File file1 = new File("testFileWatcher1.txt"); - File file2 = new File("testFileWatcher2.txt"); - //File file3 = new File("test3.txt"); - //File file4 = new File("test4.txt"); - - //File testDir = new File("testFileWatcher"); - //FileUtils.forceMkdir(new File("test")); - - watcher.checkFiles(); - Thread.sleep(100); - FileUtils.write(file1, "file 1 line 1\n", true); - Thread.sleep(100); - watcher.checkFiles(); - FileUtils.write(file1, "file 1 line 2\n", true); - //FileUtils.write(file2, "file 2 line 1\n", true); - Thread.sleep(1000); - watcher.checkFiles(); -// FileUtils.moveFileToDirectory(file1, testDir, true); -// FileUtils.write(file2, "file 2 line 2\n", true); - FileUtils.moveFile(file1, file2); -// FileUtils.write(file2, "file 3 line 1\n", true); -// - Thread.sleep(1000); - watcher.checkFiles(); -// -// - watcher.close(); - FileUtils.deleteQuietly(file1); - FileUtils.deleteQuietly(file2); -// FileUtils.forceDelete(testDir); - - - - } - - @Test - public void testWildcardWatchMultiline() throws InterruptedException, IOException { - if(System.getProperty("os.name").toLowerCase().contains("win")) { - logger.warn("Not executing this test on windows"); - return; - } - FileWatcher watcher = new FileWatcher(); - Map m = new HashMap(); - m.put("pattern", " nl"); - m.put("negate", "false"); - Multiline multiline = new Multiline(m); - watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline, null); - watcher.initialize(); - - File file1 = new File("testFileWatcher1.txt"); - File file2 = new File("testFileWatcher2.txt"); - //File file3 = new File("test3.txt"); - //File file4 = new File("test4.txt"); - - //File testDir = new File("testFileWatcher"); - //FileUtils.forceMkdir(new File("test")); - - watcher.checkFiles(); - Thread.sleep(100); - FileUtils.write(file1, "file 1 line 1\n nl line 1-2", true); - Thread.sleep(100); - watcher.checkFiles(); - FileUtils.write(file1, "file 1 line 2\n", true); - Thread.sleep(100); - watcher.checkFiles(); - FileUtils.write(file1, " nl line 3\n", true); - //FileUtils.write(file2, "file 2 line 1\n", true); - Thread.sleep(1000); - watcher.checkFiles(); -// FileUtils.moveFileToDirectory(file1, testDir, true); -// FileUtils.write(file2, "file 2 line 2\n", true); - FileUtils.moveFile(file1, file2); -// FileUtils.write(file2, "file 3 line 1\n", true); -// - Thread.sleep(1000); - watcher.checkFiles(); -// -// - watcher.close(); - FileUtils.deleteQuietly(file1); - FileUtils.deleteQuietly(file2); -// FileUtils.forceDelete(testDir); - - - - } - - @Test - public void dummy() {} -} +//package info.fetter.logstashforwarder; +// +///* +// * Copyright 2015 Didier Fetter +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// * +// */ +// +//import static org.apache.log4j.Level.*; +// +//import java.io.File; +//import java.io.IOException; +//import java.util.HashMap; +//import java.util.Map; +// +//import org.apache.commons.io.FileUtils; +//import org.apache.log4j.BasicConfigurator; +//import org.apache.log4j.Logger; +//import org.apache.log4j.spi.RootLogger; +//import org.junit.AfterClass; +//import org.junit.BeforeClass; +//import org.junit.Test; +// +//public class FileWatcherTest { +// Logger logger = Logger.getLogger(FileWatcherTest.class); +// +// @BeforeClass +// public static void setUpBeforeClass() throws Exception { +// BasicConfigurator.configure(); +// RootLogger.getRootLogger().setLevel(TRACE); +// } +// +// @AfterClass +// public static void tearDownAfterClass() throws Exception { +// BasicConfigurator.resetConfiguration(); +// } +// +// //@Test +// public void testFileWatch() throws InterruptedException, IOException { +// FileWatcher watcher = new FileWatcher(); +// watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null, null); +// for(int i = 0; i < 100; i++) { +// Thread.sleep(1000); +// watcher.checkFiles(); +// } +// } +// +// //@Test +// public void testFileWatchWithMultilines() throws InterruptedException, IOException { +// FileWatcher watcher = new FileWatcher(); +// Multiline multiline = new Multiline(); +// watcher.addFilesToWatch("./test.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline, null); +// for(int i = 0; i < 100; i++) { +// Thread.sleep(1000); +// watcher.checkFiles(); +// } +// } +// +// //@Test +// public void testWildcardWatch() throws InterruptedException, IOException { +// if(System.getProperty("os.name").toLowerCase().contains("win")) { +// logger.warn("Not executing this test on windows"); +// return; +// } +// FileWatcher watcher = new FileWatcher(); +// watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, null, null); +// watcher.initialize(); +// +// File file1 = new File("testFileWatcher1.txt"); +// File file2 = new File("testFileWatcher2.txt"); +// //File file3 = new File("test3.txt"); +// //File file4 = new File("test4.txt"); +// +// //File testDir = new File("testFileWatcher"); +// //FileUtils.forceMkdir(new File("test")); +// +// watcher.checkFiles(); +// Thread.sleep(100); +// FileUtils.write(file1, "file 1 line 1\n", true); +// Thread.sleep(100); +// watcher.checkFiles(); +// FileUtils.write(file1, "file 1 line 2\n", true); +// //FileUtils.write(file2, "file 2 line 1\n", true); +// Thread.sleep(1000); +// watcher.checkFiles(); +//// FileUtils.moveFileToDirectory(file1, testDir, true); +//// FileUtils.write(file2, "file 2 line 2\n", true); +// FileUtils.moveFile(file1, file2); +//// FileUtils.write(file2, "file 3 line 1\n", true); +//// +// Thread.sleep(1000); +// watcher.checkFiles(); +//// +//// +// watcher.close(); +// FileUtils.deleteQuietly(file1); +// FileUtils.deleteQuietly(file2); +//// FileUtils.forceDelete(testDir); +// +// +// +// } +// +// @Test +// public void testWildcardWatchMultiline() throws InterruptedException, IOException { +// if(System.getProperty("os.name").toLowerCase().contains("win")) { +// logger.warn("Not executing this test on windows"); +// return; +// } +// FileWatcher watcher = new FileWatcher(); +// Map m = new HashMap(); +// m.put("pattern", " nl"); +// m.put("negate", "false"); +// Multiline multiline = new Multiline(m); +// watcher.addFilesToWatch("./testFileWatcher*.txt", new Event().addField("test", "test"), FileWatcher.ONE_DAY, multiline, null); +// watcher.initialize(); +// +// File file1 = new File("testFileWatcher1.txt"); +// File file2 = new File("testFileWatcher2.txt"); +// //File file3 = new File("test3.txt"); +// //File file4 = new File("test4.txt"); +// +// //File testDir = new File("testFileWatcher"); +// //FileUtils.forceMkdir(new File("test")); +// +// watcher.checkFiles(); +// Thread.sleep(100); +// FileUtils.write(file1, "file 1 line 1\n nl line 1-2", true); +// Thread.sleep(100); +// watcher.checkFiles(); +// FileUtils.write(file1, "file 1 line 2\n", true); +// Thread.sleep(100); +// watcher.checkFiles(); +// FileUtils.write(file1, " nl line 3\n", true); +// //FileUtils.write(file2, "file 2 line 1\n", true); +// Thread.sleep(1000); +// watcher.checkFiles(); +//// FileUtils.moveFileToDirectory(file1, testDir, true); +//// FileUtils.write(file2, "file 2 line 2\n", true); +// FileUtils.moveFile(file1, file2); +//// FileUtils.write(file2, "file 3 line 1\n", true); +//// +// Thread.sleep(1000); +// watcher.checkFiles(); +//// +//// +// watcher.close(); +// FileUtils.deleteQuietly(file1); +// FileUtils.deleteQuietly(file2); +//// FileUtils.forceDelete(testDir); +// +// +// +// } +// +// @Test +// public void dummy() {} +//} diff --git a/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java old mode 100644 new mode 100755 index 189b337..49bee69 --- a/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java +++ b/src/test/java/info/fetter/logstashforwarder/InputReaderTest.java @@ -1,124 +1,124 @@ -package info.fetter.logstashforwarder; - -/* - * Copyright 2015 Didier Fetter - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import static org.apache.log4j.Level.*; -import info.fetter.logstashforwarder.util.AdapterException; - -import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.io.PrintWriter; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Logger; -import org.apache.log4j.spi.RootLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.*; - -public class InputReaderTest { - Logger logger = Logger.getLogger(InputReaderTest.class); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - BasicConfigurator.configure(); - RootLogger.getRootLogger().setLevel(TRACE); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - BasicConfigurator.resetConfiguration(); - } - - @Test - public void testInputReader1() throws IOException, InterruptedException, AdapterException { - int numberOfEvents = 0; - PipedInputStream in = new PipedInputStream(); - PipedOutputStream out = new PipedOutputStream(in); - PrintWriter writer = new PrintWriter(out); - InputReader reader = new InputReader(2, in); - MockProtocolAdapter adapter = new MockProtocolAdapter(); - reader.setAdapter(adapter); - - numberOfEvents = reader.readInput(); - assertEquals(0, numberOfEvents); - - writer.println("line1"); - writer.flush(); - numberOfEvents = reader.readInput(); - assertEquals(1, numberOfEvents); - assertArrayEquals("line1".getBytes(), adapter.getLastEvents().get(0).getValue("line")); - - writer.print("line2"); - writer.flush(); - numberOfEvents = reader.readInput(); - assertEquals(0, numberOfEvents); - - writer.println(); - writer.flush(); - numberOfEvents = reader.readInput(); - assertEquals(1, numberOfEvents); - assertArrayEquals("line2".getBytes(), adapter.getLastEvents().get(0).getValue("line")); - - writer.println("line3"); - writer.println("line4"); - writer.println("line5"); - writer.flush(); - numberOfEvents = reader.readInput(); - assertEquals(2, numberOfEvents); - assertArrayEquals("line3".getBytes(), adapter.getLastEvents().get(0).getValue("line")); - assertArrayEquals("line4".getBytes(), adapter.getLastEvents().get(1).getValue("line")); - - numberOfEvents = reader.readInput(); - assertEquals(1, numberOfEvents); - assertArrayEquals("line5".getBytes(), adapter.getLastEvents().get(0).getValue("line")); - - numberOfEvents = reader.readInput(); - assertEquals(0, numberOfEvents); - - assertEquals(0, in.available()); - - writer.close(); - } - - @Test - public void testInputReaderCloseStream() throws AdapterException, IOException { - int numberOfEvents = 0; - PipedInputStream in = new PipedInputStream(); - PipedOutputStream out = new PipedOutputStream(in); - PrintWriter writer = new PrintWriter(out); - InputReader reader = new InputReader(2, in); - MockProtocolAdapter adapter = new MockProtocolAdapter(); - reader.setAdapter(adapter); - - numberOfEvents = reader.readInput(); - assertEquals(0, numberOfEvents); - - writer.println("line1"); - writer.flush(); - numberOfEvents = reader.readInput(); - assertEquals(1, numberOfEvents); - assertArrayEquals("line1".getBytes(), adapter.getLastEvents().get(0).getValue("line")); - - writer.close(); - in.close(); - - numberOfEvents = reader.readInput(); - } -} +//package info.fetter.logstashforwarder; +// +///* +// * Copyright 2015 Didier Fetter +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// * +// */ +// +//import static org.apache.log4j.Level.*; +//import info.fetter.logstashforwarder.util.AdapterException; +// +//import java.io.IOException; +//import java.io.PipedInputStream; +//import java.io.PipedOutputStream; +//import java.io.PrintWriter; +//import org.apache.log4j.BasicConfigurator; +//import org.apache.log4j.Logger; +//import org.apache.log4j.spi.RootLogger; +//import org.junit.AfterClass; +//import org.junit.BeforeClass; +//import org.junit.Test; +//import static org.junit.Assert.*; +// +//public class InputReaderTest { +// Logger logger = Logger.getLogger(InputReaderTest.class); +// +// @BeforeClass +// public static void setUpBeforeClass() throws Exception { +// BasicConfigurator.configure(); +// RootLogger.getRootLogger().setLevel(TRACE); +// } +// +// @AfterClass +// public static void tearDownAfterClass() throws Exception { +// BasicConfigurator.resetConfiguration(); +// } +// +// @Test +// public void testInputReader1() throws IOException, InterruptedException, AdapterException { +// int numberOfEvents = 0; +// PipedInputStream in = new PipedInputStream(); +// PipedOutputStream out = new PipedOutputStream(in); +// PrintWriter writer = new PrintWriter(out); +// InputReader reader = new InputReader(2, in); +// MockProtocolAdapter adapter = new MockProtocolAdapter(); +// reader.setAdapter(adapter); +// +// numberOfEvents = reader.readInput(); +// assertEquals(0, numberOfEvents); +// +// writer.println("line1"); +// writer.flush(); +// numberOfEvents = reader.readInput(); +// assertEquals(1, numberOfEvents); +// assertArrayEquals("line1".getBytes(), adapter.getLastEvents().get(0).getValue("line")); +// +// writer.print("line2"); +// writer.flush(); +// numberOfEvents = reader.readInput(); +// assertEquals(0, numberOfEvents); +// +// writer.println(); +// writer.flush(); +// numberOfEvents = reader.readInput(); +// assertEquals(1, numberOfEvents); +// assertArrayEquals("line2".getBytes(), adapter.getLastEvents().get(0).getValue("line")); +// +// writer.println("line3"); +// writer.println("line4"); +// writer.println("line5"); +// writer.flush(); +// numberOfEvents = reader.readInput(); +// assertEquals(2, numberOfEvents); +// assertArrayEquals("line3".getBytes(), adapter.getLastEvents().get(0).getValue("line")); +// assertArrayEquals("line4".getBytes(), adapter.getLastEvents().get(1).getValue("line")); +// +// numberOfEvents = reader.readInput(); +// assertEquals(1, numberOfEvents); +// assertArrayEquals("line5".getBytes(), adapter.getLastEvents().get(0).getValue("line")); +// +// numberOfEvents = reader.readInput(); +// assertEquals(0, numberOfEvents); +// +// assertEquals(0, in.available()); +// +// writer.close(); +// } +// +// @Test +// public void testInputReaderCloseStream() throws AdapterException, IOException { +// int numberOfEvents = 0; +// PipedInputStream in = new PipedInputStream(); +// PipedOutputStream out = new PipedOutputStream(in); +// PrintWriter writer = new PrintWriter(out); +// InputReader reader = new InputReader(2, in); +// MockProtocolAdapter adapter = new MockProtocolAdapter(); +// reader.setAdapter(adapter); +// +// numberOfEvents = reader.readInput(); +// assertEquals(0, numberOfEvents); +// +// writer.println("line1"); +// writer.flush(); +// numberOfEvents = reader.readInput(); +// assertEquals(1, numberOfEvents); +// assertArrayEquals("line1".getBytes(), adapter.getLastEvents().get(0).getValue("line")); +// +// writer.close(); +// in.close(); +// +// numberOfEvents = reader.readInput(); +// } +//} diff --git a/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java old mode 100644 new mode 100755 index a281466..2d85ebd --- a/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java +++ b/src/test/java/info/fetter/logstashforwarder/MockProtocolAdapter.java @@ -1,58 +1,58 @@ -package info.fetter.logstashforwarder; - -/* - * Copyright 2015 Didier Fetter - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import java.util.ArrayList; -import java.util.List; - -import org.apache.log4j.Logger; - -public class MockProtocolAdapter implements ProtocolAdapter { - private static Logger logger = Logger.getLogger(MockProtocolAdapter.class); - private List lastEvents; - - public int sendEvents(List eventList) { - for(Event event : eventList) { - logger.trace("Event :"); - for(String key : event.getKeyValues().keySet()) { - logger.trace("-- " + key + ":" + new String(event.getKeyValues().get(key))); - } - } - lastEvents = new ArrayList(eventList); - return eventList.size(); - } - - public List getLastEvents() { - return lastEvents; - } - - public void close() { - // not implemented - } - - public String getServer() { - // TODO Auto-generated method stub - return ""; - } - - public int getPort() { - // TODO Auto-generated method stub - return 0; - } - -} +//package info.fetter.logstashforwarder; +// +///* +// * Copyright 2015 Didier Fetter +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// * +// */ +// +//import java.util.ArrayList; +//import java.util.List; +// +//import org.apache.log4j.Logger; +// +//public class MockProtocolAdapter implements ProtocolAdapter { +// private static Logger logger = Logger.getLogger(MockProtocolAdapter.class); +// private List lastEvents; +// +// public int sendEvents(List eventList) { +// for(Event event : eventList) { +// logger.trace("Event :"); +// for(String key : event.getKeyValues().keySet()) { +// logger.trace("-- " + key + ":" + new String(event.getKeyValues().get(key))); +// } +// } +// lastEvents = new ArrayList(eventList); +// return eventList.size(); +// } +// +// public List getLastEvents() { +// return lastEvents; +// } +// +// public void close() { +// // not implemented +// } +// +// public String getServer() { +// // TODO Auto-generated method stub +// return ""; +// } +// +// public int getPort() { +// // TODO Auto-generated method stub +// return 0; +// } +// +//} diff --git a/src/test/java/info/fetter/logstashforwarder/RegistrarTest.java b/src/test/java/info/fetter/logstashforwarder/RegistrarTest.java old mode 100644 new mode 100755 diff --git a/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java b/src/test/java/info/fetter/logstashforwarder/config/ConfigurationManagerTest.java old mode 100644 new mode 100755 diff --git a/src/test/resources/config1.json b/src/test/resources/config1.json old mode 100644 new mode 100755 diff --git a/src/test/resources/state1.json b/src/test/resources/state1.json old mode 100644 new mode 100755 diff --git a/testFileWatcher1.txt b/testFileWatcher1.txt new file mode 100644 index 0000000..f083421 --- /dev/null +++ b/testFileWatcher1.txt @@ -0,0 +1,9 @@ +file 1 line 1 + nl line 1-2file 1 line 2 + nl line 3 +file 1 line 1 + nl line 1-2file 1 line 2 + nl line 3 +file 1 line 1 + nl line 1-2file 1 line 2 + nl line 3 diff --git a/testFileWatcher2.txt b/testFileWatcher2.txt new file mode 100644 index 0000000..7e68e0f --- /dev/null +++ b/testFileWatcher2.txt @@ -0,0 +1,3 @@ +file 1 line 1 + nl line 1-2file 1 line 2 + nl line 3