-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[consumer] Add new otlp-centric error type #11085
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" | ||
|
||
import ( | ||
"net/http" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
"go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" | ||
) | ||
|
||
// Error is intended to be used to encapsulate various information that can add | ||
// context to an error that occurred within a pipeline component. Error objects | ||
// are constructed through calling `New` with the relevant options to capture | ||
// data around the error that occurred. | ||
// | ||
// It may hold multiple errors from downstream components, and can be merged | ||
// with other errors as it travels upstream using `Combine`. The `Error` should | ||
// be obtained from a given `error` object using `errors.As`. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
Comment on lines
+24
to
+25
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How are we going to mark consumer as 1.0 if we have this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the plan from the other PR was to quickly remove it. We could also choose to not start with it. |
||
type Error struct { | ||
error | ||
httpStatus int | ||
grpcStatus *status.Status | ||
retryable bool | ||
} | ||
|
||
var _ error = (*Error)(nil) | ||
|
||
// NewOTLPHTTPError records an HTTP status code that was received from a server | ||
// during data submission. | ||
func NewOTLPHTTPError(origErr error, httpStatus int) error { | ||
return &Error{error: origErr, httpStatus: httpStatus} | ||
} | ||
|
||
// NewOTLPGRPCError records a gRPC status code that was received from a server | ||
// during data submission. | ||
func NewOTLPGRPCError(origErr error, status *status.Status) error { | ||
return &Error{error: origErr, grpcStatus: status} | ||
} | ||
|
||
// NewRetryableError records that this error is retryable according to OTLP specification. | ||
func NewRetryableError(origErr error) error { | ||
return &Error{error: origErr, retryable: true} | ||
} | ||
|
||
// Error implements the error interface. | ||
func (e *Error) Error() string { | ||
return e.error.Error() | ||
} | ||
|
||
// Unwrap returns the wrapped error for use by `errors.Is` and `errors.As`. | ||
func (e *Error) Unwrap() error { | ||
return e.error | ||
} | ||
|
||
// OTLPHTTPStatus returns an HTTP status code either directly set by the source, | ||
// derived from a gRPC status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to HTTP. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a http status code cannot be derived from these three sources then 500 is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPHTTPStatus() int { | ||
if e.httpStatus != 0 { | ||
return e.httpStatus | ||
} | ||
if e.grpcStatus != nil { | ||
return statusconversion.GetHTTPStatusCodeFromStatus(e.grpcStatus) | ||
} | ||
if e.retryable { | ||
return http.StatusServiceUnavailable | ||
} | ||
return http.StatusInternalServerError | ||
} | ||
|
||
// OTLPGRPCStatus returns an gRPC status code either directly set by the source, | ||
// derived from an HTTP status code set by the source, or derived from Retryable. | ||
// When deriving the value, the OTLP specification is used to map to GRPC. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for more details. | ||
// | ||
// If a grpc code cannot be derived from these three sources then INTERNAL is returned. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) OTLPGRPCStatus() *status.Status { | ||
if e.grpcStatus != nil { | ||
return e.grpcStatus | ||
} | ||
if e.httpStatus != 0 { | ||
return statusconversion.NewStatusFromMsgAndHTTPCode(e.Error(), e.httpStatus) | ||
} | ||
if e.retryable { | ||
return status.New(codes.Unavailable, e.Error()) | ||
} | ||
return status.New(codes.Internal, e.Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have we considered There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am an http fanboy, if there is any improperly used grpc codes please let me know. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think |
||
} | ||
|
||
// Retryable returns true if the error was created with the WithRetryable set to true, | ||
// if the http status code is retryable according to OTLP, | ||
// or if the grpc status is retryable according to OTLP. | ||
// Otherwise, returns false. | ||
// | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md for retryable | ||
// http and grpc codes. | ||
// | ||
// Experimental: This API is at the early stage of development and may change | ||
// without backward compatibility | ||
func (e *Error) Retryable() bool { | ||
if e.retryable { | ||
return true | ||
} | ||
switch e.httpStatus { | ||
case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usually we follow the OTLP spec for determining whether an error is retryable or non-retryable, and according to the OTLP spec 500 errors aren't retryable. |
||
return true | ||
} | ||
if e.grpcStatus != nil { | ||
switch e.grpcStatus.Code() { | ||
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: | ||
return true | ||
} | ||
} | ||
return false | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package consumererror | ||
|
||
import ( | ||
"errors" | ||
"net/http" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
var errTest = errors.New("consumererror testing error") | ||
|
||
func Test_NewOTLPHTTPError(t *testing.T) { | ||
httpStatus := 500 | ||
wantErr := &Error{ | ||
error: errTest, | ||
httpStatus: httpStatus, | ||
} | ||
|
||
newErr := NewOTLPHTTPError(errTest, httpStatus) | ||
|
||
require.Equal(t, wantErr, newErr) | ||
} | ||
|
||
func Test_NewOTLPGRPCError(t *testing.T) { | ||
grpcStatus := status.New(codes.Aborted, "aborted") | ||
wantErr := &Error{ | ||
error: errTest, | ||
grpcStatus: grpcStatus, | ||
} | ||
|
||
newErr := NewOTLPGRPCError(errTest, grpcStatus) | ||
|
||
require.Equal(t, wantErr, newErr) | ||
} | ||
|
||
func Test_NewRetryableError(t *testing.T) { | ||
wantErr := &Error{ | ||
error: errTest, | ||
retryable: true, | ||
} | ||
|
||
newErr := NewRetryableError(errTest) | ||
|
||
require.Equal(t, wantErr, newErr) | ||
} | ||
|
||
func Test_Error(t *testing.T) { | ||
newErr := Error{error: errTest} | ||
|
||
require.Equal(t, errTest.Error(), newErr.Error()) | ||
} | ||
|
||
func TestUnwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
unwrapped := err.Unwrap() | ||
|
||
require.Equal(t, errTest, unwrapped) | ||
} | ||
|
||
func TestAs(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
secondError := errors.Join(errors.New("test"), err) | ||
|
||
var e *Error | ||
require.ErrorAs(t, secondError, &e) | ||
assert.Equal(t, errTest.Error(), e.Error()) | ||
} | ||
|
||
func TestError_Error(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest.Error(), err.Error()) | ||
} | ||
|
||
func TestError_Unwrap(t *testing.T) { | ||
err := &Error{ | ||
error: errTest, | ||
} | ||
|
||
require.Equal(t, errTest, err.Unwrap()) | ||
} | ||
|
||
func TestError_OTLPHTTPStatus(t *testing.T) { | ||
serverErr := http.StatusTooManyRequests | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want int | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Passes through HTTP status", | ||
httpStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Converts gRPC status", | ||
grpcStatus: status.New(codes.ResourceExhausted, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through HTTP status when gRPC status also present", | ||
httpStatus: serverErr, | ||
grpcStatus: status.New(codes.OK, errTest.Error()), | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: http.StatusInternalServerError, | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPHTTPStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} | ||
|
||
func TestError_OTLPGRPCStatus(t *testing.T) { | ||
httpStatus := http.StatusTooManyRequests | ||
otherOTLPHTTPStatus := http.StatusOK | ||
serverErr := status.New(codes.ResourceExhausted, errTest.Error()) | ||
testCases := []struct { | ||
name string | ||
httpStatus int | ||
grpcStatus *status.Status | ||
want *status.Status | ||
hasCode bool | ||
}{ | ||
{ | ||
name: "Converts HTTP status", | ||
httpStatus: httpStatus, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status", | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "Passes through gRPC status when gRPC status also present", | ||
httpStatus: otherOTLPHTTPStatus, | ||
grpcStatus: serverErr, | ||
want: serverErr, | ||
hasCode: true, | ||
}, | ||
{ | ||
name: "No statuses set", | ||
want: status.New(codes.Internal, errTest.Error()), | ||
}, | ||
} | ||
|
||
for _, tt := range testCases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
err := Error{ | ||
error: errTest, | ||
httpStatus: tt.httpStatus, | ||
grpcStatus: tt.grpcStatus, | ||
} | ||
|
||
s := err.OTLPGRPCStatus() | ||
|
||
require.Equal(t, tt.want, s) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package statusconversion // import "go.opentelemetry.io/collector/consumer/consumererror/internal/statusconversion" | ||
|
||
import ( | ||
"net/http" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
func GetHTTPStatusCodeFromStatus(s *status.Status) int { | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures | ||
// to see if a code is retryable. | ||
// See https://github.com/open-telemetry/opentelemetry-proto/blob/main/docs/specification.md#failures-1 | ||
// to see a list of retryable http status codes. | ||
switch s.Code() { | ||
// Retryable | ||
case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: | ||
return http.StatusServiceUnavailable | ||
// Retryable | ||
case codes.ResourceExhausted: | ||
return http.StatusTooManyRequests | ||
// Not Retryable | ||
case codes.InvalidArgument: | ||
return http.StatusBadRequest | ||
// Not Retryable | ||
case codes.Unauthenticated: | ||
return http.StatusUnauthorized | ||
// Not Retryable | ||
case codes.PermissionDenied: | ||
return http.StatusForbidden | ||
// Not Retryable | ||
case codes.Unimplemented: | ||
return http.StatusNotFound | ||
// Not Retryable | ||
default: | ||
return http.StatusInternalServerError | ||
} | ||
} | ||
|
||
func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *status.Status { | ||
var c codes.Code | ||
// Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md | ||
// 429 mapping to ResourceExhausted and 400 mapping to StatusBadRequest are exceptions. | ||
switch statusCode { | ||
case http.StatusBadRequest: | ||
c = codes.InvalidArgument | ||
case http.StatusUnauthorized: | ||
c = codes.Unauthenticated | ||
case http.StatusForbidden: | ||
c = codes.PermissionDenied | ||
case http.StatusNotFound: | ||
c = codes.Unimplemented | ||
case http.StatusTooManyRequests: | ||
c = codes.ResourceExhausted | ||
case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: | ||
c = codes.Unavailable | ||
default: | ||
c = codes.Unknown | ||
} | ||
return status.New(c, errMsg) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#9041 includes a
Combine
function to aggregate multipleError
together. I can add that feature to this PR if it is still necessary.