diff --git a/consumer/consumererror/error.go b/consumer/consumererror/error.go new file mode 100644 index 00000000000..fe7e8fa0240 --- /dev/null +++ b/consumer/consumererror/error.go @@ -0,0 +1,179 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" + +import ( + "net/http" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" +) + +// Error is intended to be used to encapsulate various information that can add +// context to an error that occurred within a pipeline component. Error objects +// are constructed through calling `New` with the relevant options to capture +// data around the error that occurred. +// +// It may hold multiple errors from downstream components, and can be merged +// with other errors as it travels upstream using `Combine`. The `Error` should +// be obtained from a given `error` object using `errors.As`. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +type Error struct { + error + httpStatus int + grpcStatus *status.Status + retryable bool +} + +var _ error = (*Error)(nil) + +// ErrorOption allows annotating an Error with metadata. +type ErrorOption interface { + applyOption(*Error) +} + +type errorOptionFunc func(*Error) + +func (f errorOptionFunc) applyOption(e *Error) { + f(e) +} + +// New wraps an error that happened while consuming telemetry and adds metadata +// onto it to be passed back up the pipeline. +// At least one option should be provided. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func New(origErr error, options ...ErrorOption) error { + err := &Error{error: origErr} + + for _, option := range options { + option.applyOption(err) + } + + return err +} + +// WithOTLPHTTPStatus records an HTTP status code that was received from a server +// during data submission. +// It is not necessary to use WithRetryable with creating an error with WithOTLPHTTPStatus +// as the retryable property can be inferred from the HTTP status code using OTLP specification. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func WithOTLPHTTPStatus(status int) ErrorOption { + return errorOptionFunc(func(err *Error) { + err.httpStatus = status + }) +} + +// WithOTLPGRPCStatus records a gRPC status code that was received from a server +// during data submission. +// It is not necessary to use WithRetryable with creating an error with WithOTLPGRPCStatus +// as the retryable property can be inferred from the grpc status using OTLP specification. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func WithOTLPGRPCStatus(status *status.Status) ErrorOption { + return errorOptionFunc(func(err *Error) { + err.grpcStatus = status + }) +} + +// WithRetryable records that this error is retryable according to OTLP specification. +// WithRetryable is not necessary when creating an error with WithOTLPHTTPStatus or +// WithOTLPGRPCStatus, as the retryable property can be inferred from OTLP specification. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func WithRetryable() ErrorOption { + return errorOptionFunc(func(err *Error) { + err.retryable = true + }) +} + +// Error implements the error interface. +func (e *Error) Error() string { + return e.error.Error() +} + +// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`. +func (e *Error) Unwrap() error { + return e.error +} + +// OTLPHTTPStatus returns an HTTP status code either directly set by the source, +// derived from a gRPC status code set by the source, or derived from Retryable. +// When deriving the value, the OTLP specification is used to map to HTTP. +// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. +// +// If a http status code cannot be derived from these three sources then 500 is returned. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func (e *Error) OTLPHTTPStatus() int { + if e.httpStatus != 0 { + return e.httpStatus + } + if e.grpcStatus != nil { + return statusconversion.GetHTTPStatusCodeFromStatus(e.grpcStatus) + } + if e.retryable { + return http.StatusServiceUnavailable + } + return http.StatusInternalServerError +} + +// OTLPGRPCStatus returns an gRPC status code either directly set by the source, +// derived from an HTTP status code set by the source, or derived from Retryable. +// When deriving the value, the OTLP specification is used to map to GRPC. +// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. +// +// If a grpc code cannot be derived from these three sources then INTERNAL is returned. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func (e *Error) OTLPGRPCStatus() *status.Status { + if e.grpcStatus != nil { + return e.grpcStatus + } + if e.httpStatus != 0 { + return statusconversion.NewStatusFromMsgAndHTTPCode(e.Error(), e.httpStatus) + } + if e.retryable { + return status.New(codes.Unavailable, e.Error()) + } + return status.New(codes.Internal, e.Error()) +} + +// Retryable returns true if the error was created with the WithRetryable set to true, +// if the http status code is retryable according to OTLP, +// or if the grpc status is retryable according to OTLP. +// Otherwise, returns false. +// +// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for retryable +// http and grpc codes. +// +// Experimental: This API is at the early stage of development and may change +// without backward compatibility +func (e *Error) Retryable() bool { + if e.retryable { + return true + } + switch e.httpStatus { + case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return true + } + if e.grpcStatus != nil { + switch e.grpcStatus.Code() { + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return true + } + } + return false +} diff --git a/consumer/consumererror/error_test.go b/consumer/consumererror/error_test.go new file mode 100644 index 00000000000..b949d1f1c61 --- /dev/null +++ b/consumer/consumererror/error_test.go @@ -0,0 +1,178 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror + +import ( + "errors" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var errTest = errors.New("consumererror testing error") + +func Test_New(t *testing.T) { + httpStatus := 500 + grpcStatus := status.New(codes.Aborted, "aborted") + wantErr := &Error{ + error: errTest, + httpStatus: httpStatus, + grpcStatus: grpcStatus, + } + + newErr := New(errTest, + WithOTLPHTTPStatus(httpStatus), + WithOTLPGRPCStatus(grpcStatus), + ) + + require.Equal(t, wantErr, newErr) +} + +func Test_Error(t *testing.T) { + newErr := New(errTest) + + require.Equal(t, errTest.Error(), newErr.Error()) +} + +func TestUnwrap(t *testing.T) { + err := &Error{ + error: errTest, + } + + unwrapped := err.Unwrap() + + require.Equal(t, errTest, unwrapped) +} + +func TestAs(t *testing.T) { + err := &Error{ + error: errTest, + } + + secondError := errors.Join(errors.New("test"), err) + + var e *Error + require.True(t, errors.As(secondError, &e)) + assert.Equal(t, errTest.Error(), e.Error()) +} + +func TestError_Error(t *testing.T) { + err := &Error{ + error: errTest, + } + + require.Equal(t, errTest.Error(), err.Error()) +} + +func TestError_Unwrap(t *testing.T) { + err := &Error{ + error: errTest, + } + + require.Equal(t, errTest, err.Unwrap()) +} + +func TestError_OTLPHTTPStatus(t *testing.T) { + serverErr := http.StatusTooManyRequests + testCases := []struct { + name string + httpStatus int + grpcStatus *status.Status + want int + hasCode bool + }{ + { + name: "Passes through HTTP status", + httpStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "Converts gRPC status", + grpcStatus: status.New(codes.ResourceExhausted, errTest.Error()), + want: serverErr, + hasCode: true, + }, + { + name: "Passes through HTTP status when gRPC status also present", + httpStatus: serverErr, + grpcStatus: status.New(codes.OK, errTest.Error()), + want: serverErr, + hasCode: true, + }, + { + name: "No statuses set", + want: http.StatusInternalServerError, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := Error{ + error: errTest, + httpStatus: tt.httpStatus, + grpcStatus: tt.grpcStatus, + } + + s := err.OTLPHTTPStatus() + + require.Equal(t, tt.want, s) + }) + } +} + +func TestError_OTLPGRPCStatus(t *testing.T) { + httpStatus := http.StatusTooManyRequests + otherOTLPHTTPStatus := http.StatusOK + serverErr := status.New(codes.ResourceExhausted, errTest.Error()) + testCases := []struct { + name string + httpStatus int + grpcStatus *status.Status + want *status.Status + hasCode bool + }{ + { + name: "Converts HTTP status", + httpStatus: httpStatus, + want: serverErr, + hasCode: true, + }, + { + name: "Passes through gRPC status", + grpcStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "Passes through gRPC status when gRPC status also present", + httpStatus: otherOTLPHTTPStatus, + grpcStatus: serverErr, + want: serverErr, + hasCode: true, + }, + { + name: "No statuses set", + want: status.New(codes.Internal, errTest.Error()), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + err := Error{ + error: errTest, + httpStatus: tt.httpStatus, + grpcStatus: tt.grpcStatus, + } + + s := err.OTLPGRPCStatus() + + require.Equal(t, tt.want, s) + }) + } +} diff --git a/consumer/consumererror/internal/statusconversion/conversion.go b/consumer/consumererror/internal/statusconversion/conversion.go new file mode 100644 index 00000000000..4c34cd0681c --- /dev/null +++ b/consumer/consumererror/internal/statusconversion/conversion.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package statusconversion // import "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" + +import ( + "net/http" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func GetHTTPStatusCodeFromStatus(s *status.Status) int { + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures + // to see if a code is retryable. + // See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 + // to see a list of retryable http status codes. + switch s.Code() { + // Retryable + case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return http.StatusServiceUnavailable + // Retryable + case codes.ResourceExhausted: + return http.StatusTooManyRequests + // Not Retryable + case codes.InvalidArgument: + return http.StatusBadRequest + // Not Retryable + case codes.Unauthenticated: + return http.StatusUnauthorized + // Not Retryable + case codes.PermissionDenied: + return http.StatusForbidden + // Not Retryable + case codes.Unimplemented: + return http.StatusNotFound + // Not Retryable + default: + return http.StatusInternalServerError + } +} + +func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status { + var c codes.Code + // Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + // 429 mapping to ResourceExhausted and 400 mapping to StatusBadRequest are exceptions. + switch statusCode { + case http.StatusBadRequest: + c = codes.InvalidArgument + case http.StatusUnauthorized: + c = codes.Unauthenticated + case http.StatusForbidden: + c = codes.PermissionDenied + case http.StatusNotFound: + c = codes.Unimplemented + case http.StatusTooManyRequests: + c = codes.ResourceExhausted + case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + c = codes.Unavailable + default: + c = codes.Unknown + } + return status.New(c, errMsg) +} diff --git a/consumer/consumererror/internal/statusconversion/conversion_test.go b/consumer/consumererror/internal/statusconversion/conversion_test.go new file mode 100644 index 00000000000..aafcfbb65e9 --- /dev/null +++ b/consumer/consumererror/internal/statusconversion/conversion_test.go @@ -0,0 +1,110 @@ +package statusconversion // import "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Test_GetHTTPStatusCodeFromStatus(t *testing.T) { + tests := []struct { + name string + input *status.Status + expected int + }{ + { + name: "Retryable Status", + input: status.New(codes.Unavailable, "test"), + expected: http.StatusServiceUnavailable, + }, + { + name: "Non-retryable Status", + input: status.New(codes.InvalidArgument, "test"), + expected: http.StatusBadRequest, + }, + { + name: "Specifically 429", + input: status.New(codes.ResourceExhausted, "test"), + expected: http.StatusTooManyRequests, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetHTTPStatusCodeFromStatus(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func Test_ErrorMsgAndHTTPCodeToStatus(t *testing.T) { + tests := []struct { + name string + errMsg string + statusCode int + expected *status.Status + }{ + { + name: "Bad Request", + errMsg: "test", + statusCode: http.StatusBadRequest, + expected: status.New(codes.InvalidArgument, "test"), + }, + { + name: "Unauthorized", + errMsg: "test", + statusCode: http.StatusUnauthorized, + expected: status.New(codes.Unauthenticated, "test"), + }, + { + name: "Forbidden", + errMsg: "test", + statusCode: http.StatusForbidden, + expected: status.New(codes.PermissionDenied, "test"), + }, + { + name: "Not Found", + errMsg: "test", + statusCode: http.StatusNotFound, + expected: status.New(codes.Unimplemented, "test"), + }, + { + name: "Too Many Requests", + errMsg: "test", + statusCode: http.StatusTooManyRequests, + expected: status.New(codes.ResourceExhausted, "test"), + }, + { + name: "Bad Gateway", + errMsg: "test", + statusCode: http.StatusBadGateway, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Service Unavailable", + errMsg: "test", + statusCode: http.StatusServiceUnavailable, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Gateway Timeout", + errMsg: "test", + statusCode: http.StatusGatewayTimeout, + expected: status.New(codes.Unavailable, "test"), + }, + { + name: "Unsupported Media Type", + errMsg: "test", + statusCode: http.StatusUnsupportedMediaType, + expected: status.New(codes.Unknown, "test"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := NewStatusFromMsgAndHTTPCode(tt.errMsg, tt.statusCode) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/consumer/consumererror/permanent.go b/consumer/consumererror/permanent.go index 71e26df214f..71935dade90 100644 --- a/consumer/consumererror/permanent.go +++ b/consumer/consumererror/permanent.go @@ -13,6 +13,8 @@ type permanent struct { // NewPermanent wraps an error to indicate that it is a permanent error, i.e. an // error that will be always returned if its source receives the same inputs. +// +// Deprecated: [v0.110.0] use Error instead. func NewPermanent(err error) error { return permanent{err: err} } @@ -29,6 +31,8 @@ func (p permanent) Unwrap() error { // IsPermanent checks if an error was wrapped with the NewPermanent function, which // is used to indicate that a given error will always be returned in the case // that its sources receives the same input. +// +// Deprecated: [v0.110.0] use Error.Retryable instead. func IsPermanent(err error) bool { if err == nil { return false diff --git a/consumer/consumererror/permanent_test.go b/consumer/consumererror/permanent_test.go index a8291db0496..99d2080c3af 100644 --- a/consumer/consumererror/permanent_test.go +++ b/consumer/consumererror/permanent_test.go @@ -20,6 +20,7 @@ func (t testErrorType) Error() string { return "" } +// nolint func TestIsPermanent(t *testing.T) { var err error assert.False(t, IsPermanent(err)) @@ -34,6 +35,7 @@ func TestIsPermanent(t *testing.T) { assert.True(t, IsPermanent(err)) } +// nolint func TestPermanent_Unwrap(t *testing.T) { var err error = testErrorType{"testError"} require.False(t, IsPermanent(err)) diff --git a/consumer/consumererror/signalerrors.go b/consumer/consumererror/signalerrors.go index 1d7558ce1ca..f1ba87f9af9 100644 --- a/consumer/consumererror/signalerrors.go +++ b/consumer/consumererror/signalerrors.go @@ -34,7 +34,7 @@ type Traces struct { func NewTraces(err error, data ptrace.Traces) error { return Traces{ retryable: retryable[ptrace.Traces]{ - error: err, + error: New(err, WithRetryable()), data: data, }, } @@ -50,7 +50,7 @@ type Logs struct { func NewLogs(err error, data plog.Logs) error { return Logs{ retryable: retryable[plog.Logs]{ - error: err, + error: New(err, WithRetryable()), data: data, }, } @@ -66,7 +66,7 @@ type Metrics struct { func NewMetrics(err error, data pmetric.Metrics) error { return Metrics{ retryable: retryable[pmetric.Metrics]{ - error: err, + error: New(err, WithRetryable()), data: data, }, } diff --git a/consumer/consumererror/signalerrors_test.go b/consumer/consumererror/signalerrors_test.go index 475d3173bb9..f7948c4853e 100644 --- a/consumer/consumererror/signalerrors_test.go +++ b/consumer/consumererror/signalerrors_test.go @@ -35,6 +35,9 @@ func TestTraces_Unwrap(t *testing.T) { // Unwrapping traceErr for err and assigning to target. require.True(t, errors.As(traceErr, &target)) require.Equal(t, err, target) + var e *Error + require.True(t, errors.As(traceErr, &e)) + assert.True(t, e.Retryable()) } func TestLogs(t *testing.T) { @@ -59,6 +62,9 @@ func TestLogs_Unwrap(t *testing.T) { // Unwrapping logsErr for err and assigning to target. require.True(t, errors.As(logsErr, &target)) require.Equal(t, err, target) + var e *Error + require.True(t, errors.As(logsErr, &e)) + assert.True(t, e.Retryable()) } func TestMetrics(t *testing.T) { @@ -83,4 +89,7 @@ func TestMetrics_Unwrap(t *testing.T) { // Unwrapping metricErr for err and assigning to target. require.True(t, errors.As(metricErr, &target)) require.Equal(t, err, target) + var e *Error + require.True(t, errors.As(metricErr, &e)) + assert.True(t, e.Retryable()) } diff --git a/consumer/go.mod b/consumer/go.mod index 3769cc87422..bc4594532d9 100644 --- a/consumer/go.mod +++ b/consumer/go.mod @@ -7,6 +7,7 @@ require ( go.opentelemetry.io/collector/pdata v1.14.1 go.opentelemetry.io/collector/pdata/testdata v0.108.1 go.uber.org/goleak v1.3.0 + google.golang.org/grpc v1.66.0 ) require ( @@ -22,7 +23,6 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect - google.golang.org/grpc v1.66.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect )