Skip to content

Commit

Permalink
Removed named pipe support because of file descriptor leaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
didfet committed Nov 30, 2017
1 parent 16adcd5 commit 2722fc3
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private int readFile(FileState state, int spaceLeftInSpool) {

private boolean isCompressedFile(FileState state) {
RandomAccessFile reader = state.getRandomAccessFile();
if (!reader.canSeek()) return false;

try {
for(byte[] magic : MAGICS) {
byte[] fileBytes = new byte[magic.length];
Expand Down
9 changes: 1 addition & 8 deletions src/main/java/info/fetter/logstashforwarder/FileSigner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@

public class FileSigner {
private static final Adler32 adler32 = new Adler32();
private static long fakeSignatureForPipes = System.currentTimeMillis();


public static long computeSignature(RandomAccessFile file, int signatureLength) throws IOException {
// If the file is not seekable, a pipe for instance,
// we report an ever-changing fake signature to keep
// FileWatcher trying to read again as it would with
// a normal file that had changed.
if (!file.canSeek()) return ++fakeSignatureForPipes;

adler32.reset();
byte[] input = new byte[signatureLength];
file.seek(0);
Expand Down
88 changes: 19 additions & 69 deletions src/main/java/info/fetter/logstashforwarder/FileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

/*
* Copyright 2015 Didier Fetter
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +59,7 @@ public void initialize() throws IOException {
oldWatchMap.put(state.getFile(), state);
}
}
synchronized (newWatchMap) { processModifications(); }
processModifications();
if(tail) {
for(FileState state : oldWatchMap.values()) {
if(state.getPointer() == 0) {
Expand Down Expand Up @@ -91,7 +90,7 @@ public void checkFiles() throws IOException {
for(FileAlterationObserver observer : observerList) {
observer.checkAndNotify();
}
synchronized (newWatchMap) { processModifications(); }
processModifications();
printWatchMap();
}

Expand Down Expand Up @@ -224,33 +223,12 @@ private void processModifications() throws IOException {
removeMarkedFilesFromWatchMap();
}

// This filter will accept anything that is not a directory,
// including named pipes (FIFOs), sockets and device files.
// The standard org.apache.commons.io.filefilter.FileFileFilter excludes
// them even if their documentation says
// "This filter accepts Files that are files (not directories)."
protected class FileFileFilter implements IOFileFilter
{
@Override
public boolean accept(File file) {
return !file.isDirectory();
}

@Override
public boolean accept(File dir, String name) {
return accept(new File(dir, name));
}
}
protected IOFileFilter fileFileFilter() {
return new FileFileFilter();
}

private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception {
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
String directory = FilenameUtils.getFullPath(fileToWatch);
String fileName = FilenameUtils.getName(fileToWatch);
IOFileFilter fileFilter = FileFilterUtils.and(
fileFileFilter(),
FileFilterUtils.fileFileFilter(),
FileFilterUtils.nameFileFilter(fileName),
new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
Expand All @@ -262,7 +240,7 @@ private void addWildCardFiles(String filesToWatch, Event fields, long deadTime,
String wildcard = FilenameUtils.getName(filesToWatch);
logger.trace("Directory : " + new File(directory).getCanonicalPath() + ", wildcard : " + wildcard);
IOFileFilter fileFilter = FileFilterUtils.and(
fileFileFilter(),
FileFilterUtils.fileFileFilter(),
new WildcardFileFilter(wildcard),
new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
Expand All @@ -289,50 +267,22 @@ private void initializeWatchMap(File directory, IOFileFilter fileFilter, Event f
}
}

// This class will wait until the file is open, which happens in
// new FileState(file).
// Normal files open immediately, but named pipes block until data
// is written in them.
protected class FileAdderThread extends Thread
{
Map<File,FileState> map;
File file;
Event fields;
Multiline multiline;
Filter filter;

private FileAdderThread() {}
public FileAdderThread(Map<File,FileState> map, File file, Event fields, Multiline multiline, Filter filter) {
this.map = map;
this.file = file;
this.fields = fields;
this.multiline = multiline;
this.filter = filter;
}

public void run() {
try {
FileState state = new FileState(file);
state.setFields(fields);
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
state.setSignature(signature);
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
state.setMultiline(multiline);
state.setFilter(filter);
synchronized (map /* This is actually newWatchMap. */) {
map.put(file, state);
}
} catch(IOException e) {
logger.error("Caught IOException in addFileToWatchMap : " +
e.getMessage());
}
}
}

private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields, Multiline multiline, Filter filter) {
(new FileAdderThread(map, file, fields, multiline, filter)).start();
try {
FileState state = new FileState(file);
state.setFields(fields);
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
state.setSignature(signature);
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
state.setMultiline(multiline);
state.setFilter(filter);
map.put(file, state);
} catch(IOException e) {
logger.error("Caught IOException in addFileToWatchMap : " +
e.getMessage());
}
}

public void onFileChange(File file, Event fields, Multiline multiline, Filter filter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.lang.builder.ToStringBuilder;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright 1998-2009 University Corporation for Atmospheric Research/Unidata
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
*
* Portions of this software were developed by the Unidata Program at the
* University Corporation for Atmospheric Research.
Expand Down Expand Up @@ -161,14 +160,13 @@ static public long getDebugNbytes() {
*/
protected java.io.RandomAccessFile file;
protected java.nio.channels.FileChannel fileChannel;
protected boolean canSeek;

/**
* The offset in bytes from the file start, of the next read or
* write operation.
*/
protected long filePosition;

/**
* The buffer used for reading the data.
*/
Expand Down Expand Up @@ -263,12 +261,6 @@ public RandomAccessFile(String location, String mode, int bufferSize) throws IOE
}

this.file = new java.io.RandomAccessFile(location, mode);
try {
this.file.seek(0);
canSeek = true;
} catch (IOException e) {
canSeek = false;
}
this.readonly = mode.equals("r");
init(bufferSize);

Expand Down Expand Up @@ -382,10 +374,6 @@ public void seek(long pos) throws IOException {
readBuffer(pos);
}

public boolean canSeek() {
return canSeek;
}

protected void readBuffer(long pos) throws IOException {
// If the current buffer is modified, write it to disk.
if (bufferModified) {
Expand Down Expand Up @@ -436,27 +424,14 @@ public String getLocation() {
* @throws IOException if an I/O error occurrs.
*/
public long length() throws IOException {
long fileLength = canSeek? file.length(): 0;
long fileLength = file.length();
if (fileLength < dataEnd) {
return dataEnd;
} else {
return fileLength;
}
}

/**
* Check whether the file is empty: normal files are empty
* when their size is zero, but other kinds of files like
* named pipes / FIFOs do not report a size.
*/
public boolean isEmpty() throws IOException {
if (canSeek) {
return length() == 0;
} else {
return false;
}
}

/**
* Change the current endian mode. Subsequent reads of short, int, float, double, long, char will
* use this. Does not currently affect writes.
Expand All @@ -478,6 +453,10 @@ public void order(int endian) {
public FileDescriptor getFD() throws IOException {
return (file == null) ? null : file.getFD();
}

public boolean isEmpty() throws IOException {
return length() == 0;
}

/**
* Copy the contents of the buffer to the disk.
Expand All @@ -486,7 +465,7 @@ public FileDescriptor getFD() throws IOException {
*/
public void flush() throws IOException {
if (bufferModified) {
if (canSeek) file.seek(bufferStart);
file.seek(bufferStart);
file.write(buffer, 0, dataSize);
//System.out.println("--flush at "+bufferStart+" dataSize= "+dataSize+ " filePosition= "+filePosition);
bufferModified = false;
Expand Down Expand Up @@ -653,7 +632,7 @@ public long readToByteChannel(WritableByteChannel dest, long offset, long nbytes
* @throws IOException on io error
*/
protected int read_(long pos, byte[] b, int offset, int len) throws IOException {
if (canSeek) file.seek(pos);
file.seek(pos);
int n = file.read(b, offset, len);
if (debugAccess) {
if (showRead) System.out.println(" **read_ " + location + " = " + len + " bytes at " + pos + "; block = " + (pos / buffer.length));
Expand Down Expand Up @@ -877,7 +856,7 @@ public void writeBytes(byte b[], int off, int len) throws IOException {
if (bufferModified) {
flush();
}
if (canSeek) file.seek(filePosition); // moved per Steve Cerruti; Jan 14, 2005
file.seek(filePosition); // moved per Steve Cerruti; Jan 14, 2005
file.write(b, off, len);
//System.out.println("--write at "+filePosition+" "+len);

Expand Down Expand Up @@ -1759,3 +1738,5 @@ public boolean searchForward(KMPMatch match, int maxBytes) throws IOException {
}

}


0 comments on commit 2722fc3

Please sign in to comment.