Skip to content

Commit

Permalink
Add outline of stanza receiver, including structure, config, factory (#…
Browse files Browse the repository at this point in the history
…1096)

* Add outline of stanza receiver, including structure, config, factory

* Fix lint

* Add outline of tests for outlined functionality

* lint

* Removed the emitter operator from stanza receiver
  • Loading branch information
djaglowski authored Sep 24, 2020
1 parent 6782ce6 commit 7c9c588
Show file tree
Hide file tree
Showing 10 changed files with 1,645 additions and 0 deletions.
1 change: 1 addition & 0 deletions receiver/stanzareceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
48 changes: 48 additions & 0 deletions receiver/stanzareceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Stanza Receiver

Tails and parses logs from a wide variety of sources using the [stanza](https://github.com/observIQ/stanza/tree/master/docs) log processor.

## Input Sources

Stanza supports pre-defined log sources for dozens of [specific technologies](https://github.com/observIQ/stanza-plugins/tree/master/plugins).

It can also be easily configured to tail and parse any structured or unstructured log file, Windows Event Log, and journald. It can also receive arbitrary logs via TCP and UDP.

## Required Parameters

- `pipeline` is an array of [operators](https://github.com/observIQ/stanza/blob/master/docs/README.md#what-operators-are-available). Each operator performs a simple responsibility, such as reading from a file, or parsing JSON. Chain together operators to process logs into a desired format.

## Optional Parameters

- `plugin_dir` is the path to a directory which contains `stanza` [plugins](https://github.com/observIQ/stanza/blob/master/docs/plugins.md). Plugins are parameterized pipelines that are designed for specific use cases.
- `offsets_file` is the path to a file that `stanza` will use to remember where it left off when reading from files or other persistent input sources. If specified, `stanza` will create and manage this file.

## Operator Basics

- Every operator has a `type`.
- Every operator can be given a unique `id`. If you use the same type of operator more than once in a pipeline, you must specify an `id`. Otherwise, the `id` defaults to the value of `type`.
- Operators will output to the next operator in the pipeline. The last operator in the pipeline will emit from the receiver. Optionally, the `output` parameter can be used to specify the `id` of another operator to which logs will be passed directly.

## Additional Terminology and Features

- An [entry](https://github.com/observIQ/stanza/blob/master/docs/types/entry.md) is the base representation of log data as it moves through a pipeline. All operators either create, modify, or consume entries.
- A [field](https://github.com/observIQ/stanza/blob/master/docs/types/field.md) is used to reference values in an entry.
- A common [expression](https://github.com/observIQ/stanza/blob/master/docs/types/expression.md) syntax is used in several operators. For example, expressions can be used to [filter](https://github.com/observIQ/stanza/blob/master/docs/operators/filter.md) or [route](https://github.com/observIQ/stanza/blob/master/docs/operators/router.md) entries.
- [timestamp](https://github.com/observIQ/stanza/blob/master/docs/types/timestamp.md) parsing is available as a block within all parser operators, and also as a standalone operator. Many common timestamp layouts are supported.
- [severity](https://github.com/observIQ/stanza/blob/master/docs/types/severity.md) parsing is available as a block within all parser operators, and also as a standalone operator. Stanza uses a flexible severity representation which is automatically interpreted by the stanza receiver.


## Example - Tailing a simple json file

Receiver Configuration
```yaml
receivers:
stanza:
pipeline:
- type: file_input
include: [ /var/log/myservice/*.json ]
- type: json_parser
timestamp:
parse_from: time
layout: '%Y-%m-%d %H:%M:%S'
```
28 changes: 28 additions & 0 deletions receiver/stanzareceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stanzareceiver

import (
"github.com/observiq/stanza/pipeline"
"go.opentelemetry.io/collector/config/configmodels"
)

// Config defines configuration for the stanza receiver
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
OffsetsFile string `mapstructure:"offsets_file"`
PluginDir string `mapstructure:"plugin_dir"`
Pipeline pipeline.Config `mapstructure:"pipeline"`
}
17 changes: 17 additions & 0 deletions receiver/stanzareceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package stanzareceiver implements a receiver that can be used by the
// Opentelemetry collector to receive logs using the stanza log agent
package stanzareceiver
71 changes: 71 additions & 0 deletions receiver/stanzareceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stanzareceiver

import (
"context"

stanza "github.com/observiq/stanza/agent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

const (
typeStr = "stanza"
)

// NewFactory creates a factory for Stanza receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithLogs(createLogsReceiver))
}

func createDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
}
}

// CreateLogsReceiver creates a logs receiver based on provided config
func createLogsReceiver(
ctx context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.LogsConsumer,
) (component.LogsReceiver, error) {

obsConfig := cfg.(*Config)

logAgent, err := stanza.NewBuilder(&stanza.Config{Pipeline: obsConfig.Pipeline}, params.Logger.Sugar()).
WithPluginDir(obsConfig.PluginDir).
WithDatabaseFile(obsConfig.OffsetsFile).
Build()
if err != nil {
return nil, err
}

return &stanzareceiver{
agent: logAgent,
consumer: nextConsumer,
logger: params.Logger,
}, nil
}
58 changes: 58 additions & 0 deletions receiver/stanzareceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stanzareceiver

import (
"context"
"os"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
require.NotNil(t, cfg, "failed to create default config")
require.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateReceiver(t *testing.T) {
params := component.ReceiverCreateParams{
Logger: zap.NewNop(),
}
receiver, err := createLogsReceiver(context.Background(), params, createDefaultConfig(), &mockLogsConsumer{})
require.NoError(t, err, "receiver creation failed")
require.NotNil(t, receiver, "receiver creation failed")

badCfg := createDefaultConfig().(*Config)
badCfg.OffsetsFile = os.Args[0] // current executable cannot be opened
receiver, err = createLogsReceiver(context.Background(), params, badCfg, &mockLogsConsumer{})
require.Error(t, err, "receiver creation should fail if offsets file is invalid")
require.Nil(t, receiver, "receiver creation should have failed due to invalid offsets file")
}

type mockLogsConsumer struct {
received int
}

func (m *mockLogsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
m.received++
return nil
}
11 changes: 11 additions & 0 deletions receiver/stanzareceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/stanzareceiver

go 1.14

require (
github.com/observiq/stanza v0.12.0
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/collector v0.10.1-0.20200916220616-3796e60d6905
go.uber.org/zap v1.16.0
gopkg.in/yaml.v2 v2.3.0
)
Loading

0 comments on commit 7c9c588

Please sign in to comment.