Skip to content

Commit

Permalink
psnotify - process event notifications
Browse files Browse the repository at this point in the history
Change-Id: I79a1265b726f09fe2d404e1e19ae14c930305892
  • Loading branch information
dougm committed Jun 20, 2012
1 parent 35d6c1b commit 845272a
Show file tree
Hide file tree
Showing 6 changed files with 825 additions and 0 deletions.
9 changes: 9 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Copyright (c) [2009-2011] VMware, Inc. All Rights Reserved.

This product is licensed to you under the Apache License, Version 2.0 (the "License").
You may not use this product except in compliance with the License.

This product includes a number of subcomponents with
separate copyright notices and license terms. Your use of these
subcomponents is subject to the terms and conditions of the
subcomponent's license, as noted in the LICENSE file.
50 changes: 50 additions & 0 deletions psnotify/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Process notifications for Go

## Overview

The psnotify package captures process events from the kernel via
kqueue on Darwin/BSD and the netlink connector on Linux.

The psnotify API is similar to the
[fsnotify](https://github.com/howeyc/fsnotify) package.

Example:
```go
watcher, err := psnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}

// Process events
go func() {
for {
select {
case ev := <-watcher.Fork:
log.Println("fork event:", ev)
case ev := <-watcher.Exec:
log.Println("exec event:", ev)
case ev := <-watcher.Exit:
log.Println("exit event:", ev)
case err := <-watcher.Error:
log.Println("error:", err)
}
}
}()

err = watcher.Watch(os.Getpid(), psnotify.PROC_EVENT_ALL)
if err != nil {
log.Fatal(err)
}

/* ... do stuff ... */
watcher.Close()
```

## Supported platforms

Currently targeting modern flavors of Darwin and Linux.
Should work on BSD, but untested.

## License

Apache 2.0
136 changes: 136 additions & 0 deletions psnotify/psnotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2012 VMware, Inc.

package psnotify

import (
"errors"
"fmt"
)

type ProcEventFork struct {
ParentPid int // Pid of the process that called fork()
ChildPid int // Child process pid created by fork()
}

type ProcEventExec struct {
Pid int // Pid of the process that called exec()
}

type ProcEventExit struct {
Pid int // Pid of the process that called exit()
}

type watch struct {
flags uint32 // Saved value of Watch() flags param
}

type eventListener interface {
close() error // Watch.Close() closes the OS specific listener
}

type Watcher struct {
listener eventListener // OS specifics (kqueue or netlink)
watches map[int]*watch // Map of watched process ids
Error chan error // Errors are sent on this channel
Fork chan *ProcEventFork // Fork events are sent on this channel
Exec chan *ProcEventExec // Exec events are sent on this channel
Exit chan *ProcEventExit // Exit events are sent on this channel
done chan bool // Used to stop the readEvents() goroutine
isClosed bool // Set to true when Close() is first called
}

// Initialize event listener and channels
func NewWatcher() (*Watcher, error) {
listener, err := createListener()

if err != nil {
return nil, err
}

w := &Watcher{
listener: listener,
watches: make(map[int]*watch),
Fork: make(chan *ProcEventFork),
Exec: make(chan *ProcEventExec),
Exit: make(chan *ProcEventExit),
Error: make(chan error),
done: make(chan bool, 1),
}

go w.readEvents()
return w, nil
}

// Close event channels when done message is received
func (w *Watcher) finish() {
close(w.Fork)
close(w.Exec)
close(w.Exit)
close(w.Error)
}

// Closes the OS specific event listener,
// removes all watches and closes all event channels.
func (w *Watcher) Close() error {
if w.isClosed {
return nil
}
w.isClosed = true

for pid := range w.watches {
w.RemoveWatch(pid)
}

w.done <- true

w.listener.close()

return nil
}

// Add pid to the watched process set.
// The flags param is a bitmask of process events to capture,
// must be one or more of: PROC_EVENT_FORK, PROC_EVENT_EXEC, PROC_EVENT_EXIT
func (w *Watcher) Watch(pid int, flags uint32) error {
if w.isClosed {
return errors.New("psnotify watcher is closed")
}

watchEntry, found := w.watches[pid]

if found {
watchEntry.flags |= flags
} else {
if err := w.register(pid, flags); err != nil {
return err
}
w.watches[pid] = &watch{flags: flags}
}

return nil
}

// Remove pid from the watched process set.
func (w *Watcher) RemoveWatch(pid int) error {
_, ok := w.watches[pid]
if !ok {
msg := fmt.Sprintf("watch for pid=%d does not exist", pid)
return errors.New(msg)
}
delete(w.watches, pid)
return w.unregister(pid)
}

// Internal helper to check if there is a message on the "done" channel.
// The "done" message is sent by the Close() method; when received here,
// the Watcher.finish method is called to close all channels and return
// true - in which case the caller should break from the readEvents loop.
func (w *Watcher) isDone() bool {
var done bool
select {
case done = <-w.done:
w.finish()
default:
}
return done
}
93 changes: 93 additions & 0 deletions psnotify/psnotify_bsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2012 VMware, Inc.

// +build darwin freebsd netbsd openbsd

// Go interface to BSD kqueue process events.
package psnotify

import (
"syscall"
)

const (
// Flags (from <sys/event.h>)
PROC_EVENT_FORK = syscall.NOTE_FORK // fork() events
PROC_EVENT_EXEC = syscall.NOTE_EXEC // exec() events
PROC_EVENT_EXIT = syscall.NOTE_EXIT // exit() events

// Watch for all process events
PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
)

type kqueueListener struct {
kq int // The syscall.Kqueue() file descriptor
buf [1]syscall.Kevent_t // An event buffer for Add/Remove watch
}

// Initialize bsd implementation of the eventListener interface
func createListener() (eventListener, error) {
listener := &kqueueListener{}
kq, err := syscall.Kqueue()
listener.kq = kq
return listener, err
}

// Initialize Kevent_t fields and propagate changelist for the given pid
func (w *Watcher) kevent(pid int, fflags uint32, flags int) error {
listener, _ := w.listener.(*kqueueListener)
event := &listener.buf[0]

syscall.SetKevent(event, pid, syscall.EVFILT_PROC, flags)
event.Fflags = fflags

_, err := syscall.Kevent(listener.kq, listener.buf[:], nil, nil)

return err
}

// Delete filter for given pid from the queue
func (w *Watcher) unregister(pid int) error {
return w.kevent(pid, 0, syscall.EV_DELETE)
}

// Add and enable filter for given pid in the queue
func (w *Watcher) register(pid int, flags uint32) error {
return w.kevent(pid, flags, syscall.EV_ADD|syscall.EV_ENABLE)
}

// Poll the kqueue file descriptor and dispatch to the Event channels
func (w *Watcher) readEvents() {
listener, _ := w.listener.(*kqueueListener)
events := make([]syscall.Kevent_t, 10)

for {
if w.isDone() {
return
}

n, err := syscall.Kevent(listener.kq, nil, events, nil)
if err != nil {
w.Error <- err
continue
}

for _, ev := range events[:n] {
pid := int(ev.Ident)

switch ev.Fflags {
case syscall.NOTE_FORK:
w.Fork <- &ProcEventFork{ParentPid: pid}
case syscall.NOTE_EXEC:
w.Exec <- &ProcEventExec{Pid: pid}
case syscall.NOTE_EXIT:
w.RemoveWatch(pid)
w.Exit <- &ProcEventExit{Pid: pid}
}
}
}
}

// Close our kqueue file descriptor; deletes any remaining filters
func (listener *kqueueListener) close() error {
return syscall.Close(listener.kq)
}
Loading

0 comments on commit 845272a

Please sign in to comment.