Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_s3: replace MAX_UPLOAD_ERRORS constant with retry_limit #8985

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -1202,12 +1202,12 @@ static int put_all_chunks(struct flb_s3 *ctx)
continue;
}

if (chunk->failures >= MAX_UPLOAD_ERRORS) {
if (chunk->failures >= ctx->ins->retry_limit) {
flb_plg_warn(ctx->ins,
"Chunk for tag %s failed to send %i times, "
"will not retry",
(char *) fsf->meta_buf, MAX_UPLOAD_ERRORS);
flb_fstore_file_inactive(ctx->fs, fsf);
(char *) fsf->meta_buf, ctx->ins->retry_limit);
flb_fstore_file_delete(ctx->fs, fsf);
continue;
}

Expand Down Expand Up @@ -1484,13 +1484,17 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
struct mk_list *tmp;
struct mk_list *head;

if (mk_list_is_empty(&ctx->uploads)) {
return NULL;
}

mk_list_foreach_safe(head, tmp, &ctx->uploads) {
tmp_upload = mk_list_entry(head, struct multipart_upload, _head);

if (tmp_upload->upload_state == MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS) {
continue;
}
if (tmp_upload->upload_errors >= MAX_UPLOAD_ERRORS) {
if (tmp_upload->upload_errors >= ctx->ins->retry_limit) {
tmp_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
flb_plg_error(ctx->ins, "Upload for %s has reached max upload errors",
tmp_upload->s3_key);
Expand Down Expand Up @@ -1727,10 +1731,10 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)

/* If retry limit was reached, discard file and remove file from queue */
upload_contents->retry_counter++;
if (upload_contents->retry_counter >= MAX_UPLOAD_ERRORS) {
if (upload_contents->retry_counter >= ctx->ins->retry_limit) {
flb_plg_warn(ctx->ins, "Chunk file failed to send %d times, will not "
"retry", upload_contents->retry_counter);
s3_store_file_inactive(ctx, upload_contents->upload_file);
s3_store_file_delete(ctx, upload_contents->upload_file);
multipart_upload_destroy(upload_contents->m_upload_file);
remove_from_queue(upload_contents);
continue;
Expand Down Expand Up @@ -1805,7 +1809,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)
m_upload = mk_list_entry(head, struct multipart_upload, _head);
complete = FLB_FALSE;

if (m_upload->complete_errors >= MAX_UPLOAD_ERRORS) {
if (m_upload->complete_errors >= ctx->ins->retry_limit) {
flb_plg_error(ctx->ins,
"Upload for %s has reached max completion errors, "
"plugin will give up", m_upload->s3_key);
Expand Down Expand Up @@ -2162,11 +2166,11 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
m_upload_file, file_first_log_time);
}

/* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
if (upload_file != NULL && upload_file->failures >= MAX_UPLOAD_ERRORS) {
/* Discard upload_file if it has failed to upload retry_limit times */
if (upload_file != NULL && upload_file->failures >= ctx->ins->retry_limit) {
flb_plg_warn(ctx->ins, "File with tag %s failed to send %d times, will not "
"retry", event_chunk->tag, MAX_UPLOAD_ERRORS);
s3_store_file_inactive(ctx, upload_file);
"retry", event_chunk->tag, ctx->ins->retry_limit);
s3_store_file_delete(ctx, upload_file);
upload_file = NULL;
}

Expand Down Expand Up @@ -2272,14 +2276,15 @@ static int cb_s3_exit(void *data, struct flb_config *config)
continue;
}

if (m_upload->bytes > 0) {
if (m_upload->bytes > 0 && m_upload->complete_errors < ctx->ins->retry_limit) {
m_upload->upload_state = MULTIPART_UPLOAD_STATE_COMPLETE_IN_PROGRESS;
mk_list_del(&m_upload->_head);
ret = complete_multipart_upload(ctx, m_upload);
if (ret == 0) {
multipart_upload_destroy(m_upload);
}
else {
m_upload->complete_errors += 1;
mk_list_add(&m_upload->_head, &ctx->uploads);
flb_plg_error(ctx->ins, "Could not complete upload %s",
m_upload->s3_key);
Expand Down
15 changes: 2 additions & 13 deletions plugins/out_s3/s3.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,6 @@
/* Allowed max file size 1 GB for publishing to S3 */
#define MAX_FILE_SIZE_PUT_OBJECT 1000000000

#define DEFAULT_UPLOAD_TIMEOUT 3600

/*
* If we see repeated errors on an upload/chunk, we will discard it
* This saves us from scenarios where something goes wrong and an upload can
* not proceed (may be some other process completed it or deleted the upload)
* instead of erroring out forever, we eventually discard the upload.
*
* The same is done for chunks, just to be safe, even though realistically
* I can't think of a reason why a chunk could become unsendable.
*/
#define MAX_UPLOAD_ERRORS 5

struct upload_queue {
struct s3_file *upload_file;
Expand Down Expand Up @@ -95,8 +83,9 @@ struct multipart_upload {

struct mk_list _head;

/* see note for MAX_UPLOAD_ERRORS */
/* counter for chunk upload errors */
int upload_errors;
/* counter for multipart upload completion errors */
int complete_errors;
};

Expand Down
Loading