diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go index ce094303c..c9c5ec679 100644 --- a/pkg/workceptor/kubernetes.go +++ b/pkg/workceptor/kubernetes.go @@ -229,6 +229,24 @@ func podRunningAndReady() func(event watch.Event) (bool, error) { return inner } +func GetTimeoutOpenLogstream(kw *KubeUnit) int { + // RECEPTOR_OPEN_LOGSTREAM_TIMEOUT + // default: 1 + openLogStreamTimeout := 1 + envTimeout := os.Getenv("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT") + if envTimeout != "" { + var err error + openLogStreamTimeout, err = strconv.Atoi(envTimeout) + if err != nil { + // ignore error, use default + kw.GetWorkceptor().nc.GetLogger().Warning("Invalid value for RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %s. Ignoring", envTimeout) + openLogStreamTimeout = 1 + } + } + kw.GetWorkceptor().nc.GetLogger().Debug("RECEPTOR_OPEN_LOGSTREAM_TIMEOUT: %d", openLogStreamTimeout) + return openLogStreamTimeout +} + func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time.Time) (io.ReadCloser, error) { var logStream io.ReadCloser var err error @@ -257,7 +275,7 @@ func (kw *KubeUnit) kubeLoggingConnectionHandler(timestamps bool, sinceTime time retries, err, ) - time.Sleep(time.Second) + time.Sleep(time.Duration(GetTimeoutOpenLogstream(kw)) * time.Second) } if err != nil { errMsg := fmt.Sprintf("Error opening log stream for pod %s/%s. Error: %s", podNamespace, podName, err) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index d42f11940..fccfd9a88 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -101,6 +101,52 @@ func TestShouldUseReconnect(t *testing.T) { } } +func TestGetTimeoutOpenLogstream(t *testing.T) { + const envVariable string = "RECEPTOR_OPEN_LOGSTREAM_TIMEOUT" + + kw, err := startNetceptorNodeWithWorkceptor() + if err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + envValue string + want int + }{ + { + name: "No env value set", + envValue: "", + want: 1, + }, + { + name: "Env value set incorrectly", + envValue: "text instead of int", + want: 1, + }, + { + name: "Env value set correctly", + envValue: "2", + want: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue != "" { + os.Setenv(envVariable, tt.envValue) + defer os.Unsetenv(envVariable) + } else { + os.Unsetenv(envVariable) + } + + if got := workceptor.GetTimeoutOpenLogstream(kw); got != tt.want { + t.Errorf("GetTimeoutOpenLogstream() = %v, want %v", got, tt.want) + } + }) + } + +} + func TestParseTime(t *testing.T) { type args struct { s string