Skip to content

Commit

Permalink
Merge pull request rsyslog#4895 from rgerhards/i4797-fix
Browse files Browse the repository at this point in the history
imfile bugfix: message loss/duplication when monitored file is rotated
  • Loading branch information
rgerhards authored Aug 5, 2022
2 parents 53dc5ed + 5520373 commit 8dce9f2
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 75 deletions.
49 changes: 38 additions & 11 deletions plugins/imfile/imfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ MODULE_CNFNAME("imfile")
/* defines */
#define FILE_ID_HASH_SIZE 20 /* max size of a file_id hash */
#define FILE_ID_SIZE 512 /* how many bytes are used for file-id? */
#define FILE_DELETE_DELAY 5 /* how many seconds to wait before finally deleting a gone file */

/* Module static data */
DEF_IMOD_STATIC_DATA /* must be present, starts static data */
Expand Down Expand Up @@ -209,6 +210,7 @@ struct act_obj_s {
ratelimit_t *ratelimiter;
multi_submit_t multiSub;
int is_symlink;
time_t time_to_delete; /* Helper variable to DELAY the actual file delete in act_obj_unlink */
};
struct fs_edge_s {
fs_node_t *parent; /* node pointing to this edge */
Expand Down Expand Up @@ -774,6 +776,7 @@ act_obj_add(fs_edge_t *const edge, const char *const name, const int is_file,
act->file_id_prev[0] = '\0';
act->is_symlink = is_symlink;
act->ratelimiter = NULL;
act->time_to_delete = 0;
if (source) { /* we are target of symlink */
CHKmalloc(act->source_name = strdup(source));
} else {
Expand Down Expand Up @@ -827,26 +830,49 @@ detect_updates(fs_edge_t *const edge)

for(act = edge->active ; act != NULL ; act = act->next) {
DBGPRINTF("detect_updates checking active obj '%s'\n", act->name);
const int r = lstat(act->name, &fileInfo);
// lstat() has the disadvantage, that we get "deleted" when the name has changed
// but inode is still the same (like with logrotate)
int r = lstat(act->name, &fileInfo);
if(r == -1) { /* object gone away? */
DBGPRINTF("object gone away, unlinking: '%s'\n", act->name);
act_obj_unlink(act);
restart = 1;
/* now let's see if the file itself already exist (e.g. rotated away) */
/* NOTE: this will NOT stall the file. The reason is that when a new file
* with the same name is detected, we will not run into this code.
TODO: check the full implications, there are for sure some!
e.g. file has been closed, so we will never have old inode (but
why was it closed then? --> check)
*/
r = fstat(act->ino, &fileInfo);
if(r == -1) {
time_t ttNow;
time(&ttNow);
if (act->time_to_delete == 0) {
act->time_to_delete = ttNow;
}
/* First time we run into this code, we need to give imfile a little time to process
* the old file in case a process is still writing into it until the FILE_DELETE_DELAY
* is reached OR the inode has changed (see elseif below). In most cases, the
* delay will never be reached and the file will be closed when the inode has changed.
*/
if (act->time_to_delete + FILE_DELETE_DELAY < ttNow) {
DBGPRINTF("detect_updates obj gone away, unlinking: '%s', ttDelete: %ld/%ld\n",
act->name, act->time_to_delete, ttNow);
act_obj_unlink(act);
restart = 1;
} else {
DBGPRINTF("detect_updates obj gone away, keep '%s' open: %ld/%ld/%lds!\n",
act->name, act->time_to_delete, ttNow, ttNow - act->time_to_delete);
pollFile(act);
}
}
break;
} else if(fileInfo.st_ino != act->ino) {
DBGPRINTF("file '%s' inode changed from %llu to %llu, unlinking from "
"internal lists\n", act->name, (long long unsigned) act->ino,
(long long unsigned) fileInfo.st_ino);
if(act->pStrm != NULL) {
/* we do no need to re-set later, as act_obj_unlink
* will destroy the strm obj */
strmSet_checkRotation(act->pStrm, STRM_ROTATION_DO_NOT_CHECK);
}
act_obj_unlink(act);
restart = 1;
break;
}

}

if (restart) {
Expand Down Expand Up @@ -1103,7 +1129,8 @@ chk_active(const act_obj_t *act, const act_obj_t *const deleted)
static void ATTR_NONNULL()
act_obj_unlink(act_obj_t *act)
{
DBGPRINTF("act_obj_unlink %p: %s, pStrm %p\n", act, act->name, act->pStrm);
DBGPRINTF("act_obj_unlink %p: %s, pStrm %p, ttDelete: %ld\n",
act, act->name, act->pStrm, act->time_to_delete);
if(act->prev == NULL) {
act->edge->active = act->next;
} else {
Expand Down
58 changes: 4 additions & 54 deletions runtime/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -587,47 +587,6 @@ strmNextFile(strm_t *pThis)
}


/* handle the eof case for monitored files.
* If we are monitoring a file, someone may have rotated it. In this case, we
* also need to close it and reopen it under the same name.
* rgerhards, 2008-02-13
* The previous code also did a check for file truncation, in which case the
* file was considered rewritten. However, this potential border case turned
* out to be a big trouble spot on busy systems. It caused massive message
* duplication (I guess stat() can return a too-low number under some
* circumstances). So starting as of now, we only check the inode number and
* a file change is detected only if the inode changes. -- rgerhards, 2011-01-10
*/
static rsRetVal ATTR_NONNULL()
strmHandleEOFMonitor(strm_t *const pThis)
{
DEFiRet;
struct stat statName;

ISOBJ_TYPE_assert(pThis, strm);
if(stat((char*) pThis->pszCurrFName, &statName) == -1)
ABORT_FINALIZE(RS_RET_IO_ERROR);
DBGPRINTF("strmHandleEOFMonitor: stream checking for file change on '%s', inode %u/%u size %llu/%llu\n",
pThis->pszCurrFName, (unsigned) pThis->inode, (unsigned) statName.st_ino,
(long long unsigned) pThis->iCurrOffs, (long long unsigned) statName.st_size);

/* Inode unchanged but file size on disk is less than current offset
* means file was truncated, we also reopen if 'reopenOnTruncate' is on
*/
if (pThis->inode != statName.st_ino
|| (pThis->bReopenOnTruncate && statName.st_size < pThis->iCurrOffs)) {
DBGPRINTF("we had a file change on '%s'\n", pThis->pszCurrFName);
CHKiRet(strmCloseFile(pThis));
CHKiRet(strmOpenFile(pThis));
} else {
ABORT_FINALIZE(RS_RET_EOF);
}

finalize_it:
RETiRet;
}


/* handle the EOF case of a stream
* The EOF case is somewhat complicated, as the proper action depends on the
* mode the stream is in. If there are multiple files (circular logs, most
Expand Down Expand Up @@ -655,11 +614,8 @@ strmHandleEOF(strm_t *const pThis)
case STREAMTYPE_FILE_MONITOR:
DBGOPRINT((obj_t*) pThis, "file '%s' (%d) EOF, rotationCheck %d\n",
pThis->pszCurrFName, pThis->fd, pThis->rotationCheck);
if(pThis->rotationCheck == STRM_ROTATION_DO_CHECK) {
CHKiRet(strmHandleEOFMonitor(pThis));
} else {
ABORT_FINALIZE(RS_RET_EOF);
}
DBGPRINTF("RGER: EOF!\n");
ABORT_FINALIZE(RS_RET_EOF);
break;
}

Expand Down Expand Up @@ -776,6 +732,7 @@ strmReadBuf(strm_t *pThis, int *padBytes)
}
iLenRead = read(pThis->fd, pThis->pIOBuf, toRead);
DBGOPRINT((obj_t*) pThis, "file %d read %ld bytes\n", pThis->fd, iLenRead);
DBGOPRINT((obj_t*) pThis, "file %d read %*s\n", pThis->fd, (unsigned) iLenRead, (char*) pThis->pIOBuf);
/* end crypto */
if(iLenRead == 0) {
CHKiRet(strmHandleEOF(pThis));
Expand Down Expand Up @@ -1025,6 +982,7 @@ strmReadLine(strm_t *const pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF
}
pThis->strtOffs = pThis->iCurrOffs; /* we are at begin of next line */
} else {
DBGPRINTF("RGER: strmReadLine iRet %d\n", iRet);
if(*ppCStr != NULL) {
if(cstrLen(*ppCStr) > 0) {
/* we may have an empty string in an unsuccesfull poll or after restart! */
Expand Down Expand Up @@ -2161,14 +2119,6 @@ strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val)
pThis->readTimeout = val;
}

void ATTR_NONNULL()
strmSet_checkRotation(strm_t *const pThis, const int val) {
ISOBJ_TYPE_assert(pThis, strm);
assert(val == STRM_ROTATION_DO_CHECK || val == STRM_ROTATION_DO_NOT_CHECK);
pThis->rotationCheck = val;
}


static rsRetVal ATTR_NONNULL()
strmSetbDeleteOnClose(strm_t *const pThis, const int val)
{
Expand Down
5 changes: 0 additions & 5 deletions runtime/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ typedef enum {
STRM_COMPRESS_ZSTD = 1
} strm_compressionDriver_t;

/* settings for stream rotation (applies not to all processing modes!) */
#define STRM_ROTATION_DO_CHECK 0
#define STRM_ROTATION_DO_NOT_CHECK 1

#define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
/* The strm_t data structure */
struct strm_s {
Expand Down Expand Up @@ -254,6 +250,5 @@ void strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val);
const uchar * ATTR_NONNULL() strmGetPrevLineSegment(strm_t *const pThis);
const uchar * ATTR_NONNULL() strmGetPrevMsgSegment(strm_t *const pThis);
int ATTR_NONNULL() strmGetPrevWasNL(const strm_t *const pThis);
void ATTR_NONNULL() strmSet_checkRotation(strm_t *const pThis, const int val);

#endif /* #ifndef STREAM_H_INCLUDED */
2 changes: 2 additions & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,7 @@ TESTS += \
imfile-symlink.sh \
imfile-symlink-multi.sh \
imfile-logrotate.sh \
imfile-logrotate-async.sh \
imfile-logrotate-multiple.sh \
imfile-logrotate-copytruncate.sh \
imfile-logrotate-nocopytruncate.sh \
Expand Down Expand Up @@ -2575,6 +2576,7 @@ EXTRA_DIST= \
imfile-symlink.sh \
imfile-symlink-multi.sh \
imfile-logrotate.sh \
imfile-logrotate-async.sh \
imfile-logrotate-copytruncate.sh \
imfile-logrotate-nocopytruncate.sh \
imfile-logrotate-multiple.sh \
Expand Down
82 changes: 82 additions & 0 deletions tests/imfile-logrotate-async.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/bin/bash
# This is part of the rsyslog testbench, licensed under ASL 2.0
. $srcdir/diag.sh check-inotify-only
. ${srcdir:=.}/diag.sh init
check_command_available logrotate
export NUMMESSAGES=10000
export RETRIES=50

# Write logrotate config file
echo '"./'$RSYSLOG_DYNNAME'.input*.log"
{
#daily
rotate 60
missingok
notifempty
sharedscripts
postrotate
kill -HUP $(cat '$RSYSLOG_DYNNAME'.inputfilegen_pid)
endscript
#olddir /logs/old
}' > $RSYSLOG_DYNNAME.logrotate


generate_conf
add_conf '
$WorkDirectory '$RSYSLOG_DYNNAME'.spool
global( debug.whitelist="on"
debug.files=["imfile.c", "stream.c"]
)
module(load="../plugins/imfile/.libs/imfile" mode="inotify" PollingInterval="2")
input(type="imfile" File="./'$RSYSLOG_DYNNAME'.input*.log" Tag="file:"
Severity="error" Facility="local7" addMetadata="on" reopenOnTruncate="on")
$template outfmt,"%msg:F,58:2%\n"
if $msg contains "msgnum:" then
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup

./inputfilegen -m $NUMMESSAGES -S 5 -B 500 -f $RSYSLOG_DYNNAME.input.log &
INPUTFILEGEN_PID=$!
echo "$INPUTFILEGEN_PID" > $RSYSLOG_DYNNAME.inputfilegen_pid



./msleep 1
logrotate --state $RSYSLOG_DYNNAME.logrotate.state -f $RSYSLOG_DYNNAME.logrotate
./msleep 20
echo INPUT FILES:
ls -li $RSYSLOG_DYNNAME.input*
logrotate --state $RSYSLOG_DYNNAME.logrotate.state -f $RSYSLOG_DYNNAME.logrotate
./msleep 20
echo INPUT FILES:
ls -li $RSYSLOG_DYNNAME.input*
logrotate --state $RSYSLOG_DYNNAME.logrotate.state -f $RSYSLOG_DYNNAME.logrotate
echo INPUT FILES:
ls -li $RSYSLOG_DYNNAME.input*
echo ls ${RSYSLOG_DYNNAME}.spool:
ls -li ${RSYSLOG_DYNNAME}.spool
echo INPUT FILES:
ls -li $RSYSLOG_DYNNAME.input*

# generate more input after logrotate into new logfile
#./inputfilegen -m $TESTMESSAGES -i $TESTMESSAGES >> $RSYSLOG_DYNNAME.input.1.log
#ls -l $RSYSLOG_DYNNAME.input*

#msgcount=$((2* TESTMESSAGES))
#wait_file_lines $RSYSLOG_OUT_LOG $msgcount $RETRIES
wait_file_lines

touch $RSYSLOG_DYNNAME.input.log
./msleep 1000

shutdown_when_empty
wait_shutdown
seq_check
#seq_check 0 $TESTMESSAGESFULL
exit_test
Loading

0 comments on commit 8dce9f2

Please sign in to comment.