reader: by default, archive failed messages to disk #135

Open
wants to merge 1 commit into
base: master
32 changes: 28 additions & 4 deletions nsq/reader.py
@@ -9,6 +9,8 @@
import cgi
import warnings
import inspect
import datetime
import os.path

try:
    import simplejson as json
@@ -131,6 +133,8 @@ def process_message(message):

:param max_backoff_duration: the maximum time we will allow a backoff state to last in seconds

:param data_path: the directory failed messages will be archived to after ``max_tries``

:param \*\*kwargs: passed to :class:`nsq.AsyncConn` initialization
"""
def __init__(
@@ -149,6 +153,7 @@ def __init__(
lookupd_poll_jitter=0.3,
lookupd_connect_timeout=1,
lookupd_request_timeout=2,
data_path=None,
**kwargs):
super(Reader, self).__init__(**kwargs)

@@ -161,6 +166,7 @@ def __init__(
assert isinstance(lookupd_poll_jitter, float)
assert isinstance(lookupd_connect_timeout, int)
assert isinstance(lookupd_request_timeout, int)
assert isinstance(data_path, (str, unicode, None.__class__))

assert lookupd_poll_jitter >= 0 and lookupd_poll_jitter <= 1

@@ -222,6 +228,7 @@ def __init__(

self.redist_periodic = None
self.query_periodic = None
self.data_path = data_path

def _run(self):
assert self.message_handler, "you must specify the Reader's message_handler"
@@ -702,14 +709,31 @@ def process_message(self, message):

def giving_up(self, message):
"""
Called when a message has been received where ``msg.attempts > max_tries``
Called when a message has been received where ``msg.attempts > max_tries``.

Failed messages will be archived to ``$data_path/$topic_$channel/%Y%m%d-%H%M%S-%f_$sequence.failed.msg``

This is useful to subclass and override to perform a task (such as writing to disk, etc.)
This is useful to subclass and override to perform a custom task (such as writing to disk, etc.)

:param message: the :class:`nsq.Message` received
"""
logger.warning('[%s] giving up on message %s after %d tries (max:%d) %r',
self.name, message.id, message.attempts, self.max_tries, message.body)

self.failed_count += 1
Member

I don't see where this is being initialized?

Member Author

haha. just copy & pasting from my custom giving_up() here. I'll try some real validation =)
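
A minimal sketch of the initialization the reviewer is asking about, assuming the counter lives on the reader instance and starts at zero. `ArchivingReader` is a hypothetical stand-in written for illustration, not the `Reader` class in this diff, and the message is reduced to its raw bytes.

```python
class ArchivingReader(object):
    """Hypothetical stand-in for the reader; only the counter logic is shown."""

    def __init__(self, data_path=None):
        self.data_path = data_path
        # Initialize the counter here so giving_up() can increment it
        # without raising AttributeError on the first failed message.
        self.failed_count = 0

    def giving_up(self, message):
        self.failed_count += 1
        return self.failed_count


reader = ArchivingReader()
first = reader.giving_up(b"payload")
second = reader.giving_up(b"payload")
```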


path = os.path.join(self.data_path or "", self.topic + '_' + self.channel)
Member

We should probably use a separator that isn't a valid character for a topic/channel. Perhaps the channel should just be a real subdir?
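
To illustrate the concern, here is a rough sketch of the "real subdir" suggestion. The helper `archive_dir` and the example names are assumptions made for illustration; they are not part of this PR. It shows how two different topic/channel pairs can flatten to the same underscore-joined name, while nesting keeps them apart.

```python
import os.path


def archive_dir(data_path, topic, channel):
    """Return an archive directory with one directory level per name (hypothetical helper)."""
    # Nesting avoids an in-name separator entirely, so no character that is
    # valid in a topic or channel name can make two pairs collide.
    return os.path.join(data_path or "", topic, channel)


# Joined with "_", these two distinct pairs produce the same directory name.
flat_one = "a_b" + "_" + "c"
flat_two = "a" + "_" + "b_c"

# Nested, they stay distinct.
nested_one = archive_dir("archive", "a_b", "c")
nested_two = archive_dir("archive", "a", "b_c")
```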

if not os.path.exists(path):
    os.makedirs(path)

date_str = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f")
filename = "%s_%d.failed.msg" % (date_str, self.failed_count)
Member

I feel like message.id should be in the filename?

Member

(too)

Member Author

I have mixed feelings. In some ways it could augment self.failed_count (which has some downsides across multiple).

However, since message.id is unique per source, not globally unique (it depends on the nsqd workerID), putting it in the filename might imply a uniqueness it doesn't have. Also, even though we create hex message IDs, the protocol defines the ID as arbitrary bytes, which wouldn't be safe in a filename.

I chose to calculate the filename and include it in the log line as an alternative way to a) raise awareness of the file archiving and b) link the two.

Member

> I chose to calculate the filename and include it in the log line as an alternative way to a) raise awareness of the file archiving and b) link the two.

Fair point, that was the only reason why I thought it would be useful in the filename.
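
For reference, the naming scheme the thread settles on (timestamp plus a per-reader sequence number, no message ID) can be sketched as below. The helper `failed_filename` and the fixed example timestamp are assumptions for illustration; the diff computes the same pieces inline from `datetime.datetime.utcnow()` and `self.failed_count`.

```python
import datetime
import os.path


def failed_filename(path, now, sequence):
    """Build the archive filename from a timestamp and a sequence number (hypothetical helper)."""
    # %f is zero-padded microseconds, so names sort chronologically as strings.
    date_str = now.strftime("%Y%m%d-%H%M%S-%f")
    return os.path.join(path, "%s_%d.failed.msg" % (date_str, sequence))


# Arbitrary fixed timestamp so the result is reproducible.
example = failed_filename("archive", datetime.datetime(2015, 1, 2, 3, 4, 5, 678), 7)
example_name = os.path.basename(example)
```

Because the name contains no message ID, the log line is what ties a given `message.id` to its archive file.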

filename = os.path.join(path, filename)

logger.warning('[%s] giving up on message %s after %d tries (max:%d). Archived to %s %r',
self.name, message.id, message.attempts, self.max_tries, filename, message.body)

f = open(filename, 'wb')
f.write(message.body + '\n')
Member

not sure we should append anything to the message.body, in the case of a binary message format

Member Author

yeah... this is a tough one. I'm certainly biased by my own json msg usage, but any newline expectations (like from nsq_to_file, nsq_tail, etc) are carried through here. Having this means you can cat *.msg | to_nsq, etc.

I think there are three basic options:

a) document and move on,
b) remove the newline,
c) add a parameter to disable it.

I'm inclined to do a or c.

Member

Not a fan of more options (c), and I suppose one can still override this method if one doesn't want a newline appended, so let's do (a)?
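
A rough sketch of what option (a) leaves open to users with binary payloads: overriding `giving_up()` to write the body untouched. `BaseReader` is a hypothetical stand-in that mimics the newline-appending behavior in this diff, not the real `Reader` class; the message is simplified to its raw bytes, the filename is shortened, and the code is written for Python 3.

```python
import os
import tempfile


class BaseReader(object):
    """Hypothetical stand-in mimicking the default archiving in this diff."""

    def __init__(self, data_path):
        self.data_path = data_path
        self.failed_count = 0

    def giving_up(self, body):
        self.failed_count += 1
        filename = os.path.join(self.data_path, "%d.failed.msg" % self.failed_count)
        with open(filename, "wb") as f:
            f.write(body + b"\n")  # default: newline-terminated
        return filename


class RawArchiveReader(BaseReader):
    """Override for binary formats: archive the exact bytes, no newline."""

    def giving_up(self, body):
        self.failed_count += 1
        filename = os.path.join(self.data_path, "%d.failed.msg" % self.failed_count)
        with open(filename, "wb") as f:
            f.write(body)
        return filename


def read_bytes(path):
    with open(path, "rb") as f:
        return f.read()


binary = b"\x00\x01\x02"
default_bytes = read_bytes(BaseReader(tempfile.mkdtemp()).giving_up(binary))
raw_bytes = read_bytes(RawArchiveReader(tempfile.mkdtemp()).giving_up(binary))
```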

f.close()


def _on_connection_identify_response(self, conn, data, **kwargs):