Skip to content

Commit

Permalink
json adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
progrium committed Mar 27, 2015
1 parent b9e5a1f commit 415eab3
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 4 deletions.
139 changes: 139 additions & 0 deletions adapters/json/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package json

import (
"encoding/json"
"errors"
"log"
"net"
"os"
"strconv"
"strings"
"text/template"

"github.com/gliderlabs/logspout/adapters/syslog"
"github.com/gliderlabs/logspout/router"
)

var configDefaults = map[string]string{
"JSON_FIELDS": "time:uint,message,docker.hostname,docker.image",
"JSON_TIME": "{{.Time.Unix}}",
"JSON_MESSAGE": "{{.Data}}",
"JSON_SOURCE": "{{.Source}}",
"JSON_DOCKER_HOSTNAME": "{{.Container.Config.Hostname}}",
"JSON_DOCKER_IMAGE": "{{.Container.Config.Image}}",
"JSON_DOCKER_ID": "{{.Container.ID}}",
"JSON_DOCKER_NAME": "{{.ContainerName}}",
}

func init() {
router.AdapterFactories.Register(NewJSONAdapter, "json")
}

func getopt(name string) string {
value := os.Getenv(name)
if value == "" {
value = configDefaults[name]
}
return value
}

type JSONAdapter struct {
conn net.Conn
route *router.Route
tmpl *template.Template
types map[string]string
}

func NewJSONAdapter(route *router.Route) (router.LogAdapter, error) {
transport, found := router.AdapterTransports.Lookup(route.AdapterTransport("udp"))
if !found {
return nil, errors.New("unable to find adapter: " + route.Adapter)
}
conn, err := transport.Dial(route.Address, route.Options)
if err != nil {
return nil, err
}

fields := strings.Split(getopt("JSON_FIELDS"), ",")
types := make(map[string]string)
var values []string
for _, field := range fields {
parts := strings.Split(field, ":")
if len(parts) > 1 {
types[parts[0]] = parts[1]
}
config := "JSON_" + strings.ToUpper(strings.Replace(parts[0], ".", "_", -1))
values = append(values, parts[0]+":"+getopt(config))
}
tmplStr := strings.Join(values, "\x00")
tmpl, err := template.New("prejson").Parse(tmplStr)
if err != nil {
return nil, err
}

return &JSONAdapter{
route: route,
conn: conn,
tmpl: tmpl,
types: types,
}, nil
}

func (a *JSONAdapter) Stream(logstream chan *router.Message) {
defer a.route.Close()
for message := range logstream {
m := syslog.NewSyslogMessage(message, a.conn)
buf, err := m.Render(a.tmpl)
if err != nil {
log.Println("json:", err)
return
}
data, err := json.Marshal(buildMap(buf.String(), a.types))
if err != nil {
log.Println("json:", err)
return
}
_, err = a.conn.Write(data)
if err != nil {
log.Println("json:", err)
return
}
}
}

func buildMap(input string, types map[string]string) map[string]interface{} {
m := make(map[string]interface{})
fields := strings.Split(input, "\x00")
for _, field := range fields {
kvp := strings.SplitN(field, ":", 2)
keys := strings.Split(kvp[0], ".")
mm := m
if len(keys) > 1 {
for _, key := range keys[:len(keys)-1] {
if mm[key] == nil {
mm[key] = make(map[string]interface{})
}
mm = mm[key].(map[string]interface{})
}
}
var value interface{}
var err error
switch types[field] {
case "uint":
value, err = strconv.ParseUint(kvp[1], 10, 64)
case "int":
value, err = strconv.ParseInt(kvp[1], 10, 64)
case "float":
value, err = strconv.ParseFloat(kvp[1], 64)
case "bool":
value, err = strconv.ParseBool(kvp[1])
case "":
value = kvp[1]
}
if err != nil {
value = nil
}
mm[keys[len(keys)-1]] = value
}
return m
}
12 changes: 8 additions & 4 deletions adapters/syslog/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ type SyslogAdapter struct {
func (a *SyslogAdapter) Stream(logstream chan *router.Message) {
defer a.route.Close()
for message := range logstream {
m := &SyslogMessage{message, a.conn}
m := NewSyslogMessage(message, a.conn)
buf, err := m.Render(a.tmpl)
if err != nil {
log.Println("syslog:", err)
return
}
_, err = a.conn.Write(buf)
_, err = a.conn.Write(buf.Bytes())
if err != nil {
log.Println("syslog:", err)
return
Expand All @@ -100,13 +100,17 @@ type SyslogMessage struct {
conn net.Conn
}

func (m *SyslogMessage) Render(tmpl *template.Template) ([]byte, error) {
func NewSyslogMessage(message *router.Message, conn net.Conn) *SyslogMessage {
return &SyslogMessage{message, conn}
}

func (m *SyslogMessage) Render(tmpl *template.Template) (*bytes.Buffer, error) {
buf := new(bytes.Buffer)
err := tmpl.Execute(buf, m)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
return buf, nil
}

func (m *SyslogMessage) Priority() syslog.Priority {
Expand Down
1 change: 1 addition & 0 deletions modules.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
_ "github.com/gliderlabs/logspout/adapters/json"
_ "github.com/gliderlabs/logspout/adapters/raw"
_ "github.com/gliderlabs/logspout/adapters/syslog"
_ "github.com/gliderlabs/logspout/httpstream"
Expand Down

0 comments on commit 415eab3

Please sign in to comment.