From 269b1a35b8e6e1f71e5ac75622c3e1afba3dfb3e Mon Sep 17 00:00:00 2001 From: Eric Daniels Date: Thu, 16 Feb 2023 15:09:34 -0500 Subject: [PATCH] RSDK-1958 - webcam: fix double close and invalid reuse (#1883) --- components/audioinput/audio_input.go | 40 ++- .../audioinput/microphone/microphone.go | 1 + components/camera/camera.go | 49 ++- .../camera/videosource/camera_waitgroup.go | 49 --- components/camera/videosource/webcam.go | 298 ++++++++++++------ rimage/cmd/stream_camera/main.go | 2 +- 6 files changed, 281 insertions(+), 158 deletions(-) delete mode 100644 components/camera/videosource/camera_waitgroup.go diff --git a/components/audioinput/audio_input.go b/components/audioinput/audio_input.go index 92ac0130b2b..8666ec3f2f7 100644 --- a/components/audioinput/audio_input.go +++ b/components/audioinput/audio_input.go @@ -84,13 +84,38 @@ func WrapWithReconfigurable(r interface{}, name resource.Name) (resource.Reconfi if reconfigurable, ok := i.(*reconfigurableAudioInput); ok { return reconfigurable, nil } + reconfigurable := newReconfigurable(i, name) + + if mon, ok := i.(LivenessMonitor); ok { + mon.Monitor(func() { + reconfigurable.mu.Lock() + defer reconfigurable.mu.Unlock() + reconfigurable.reconfigureKnownAudioInput(newReconfigurable(i, name)) + }) + } + + return reconfigurable, nil +} + +func newReconfigurable(i AudioInput, name resource.Name) *reconfigurableAudioInput { cancelCtx, cancel := context.WithCancel(context.Background()) return &reconfigurableAudioInput{ name: name, actual: i, cancelCtx: cancelCtx, cancel: cancel, - }, nil + } +} + +// A LivenessMonitor is responsible for monitoring the liveness of an audio input. An example +// is connectivity. Since the model itself knows best about how to maintain this state, +// the reconfigurable offers a safe way to notify if a state needs to be reset due +// to some exceptional event (like a reconnect). +// It is expected that the monitoring code is tied to the lifetime of the resource +// and once the resource is closed, so should the monitor. That is, it should +// no longer send any resets once a Close on its associated resource has returned. +type LivenessMonitor interface { + Monitor(notifyReset func()) } var ( @@ -301,12 +326,17 @@ func (i *reconfigurableAudioInput) Reconfigure(ctx context.Context, newAudioInpu if err := viamutils.TryClose(ctx, i.actual); err != nil { golog.Global().Errorw("error closing old", "error", err) } + i.reconfigureKnownAudioInput(actual) + return nil +} + +// assumes lock is held. +func (i *reconfigurableAudioInput) reconfigureKnownAudioInput(newAudioInput *reconfigurableAudioInput) { i.cancel() // reset - i.actual = actual.actual - i.cancelCtx = actual.cancelCtx - i.cancel = actual.cancel - return nil + i.actual = newAudioInput.actual + i.cancelCtx = newAudioInput.cancelCtx + i.cancel = newAudioInput.cancel } // UpdateAction helps hint the reconfiguration process on what strategy to use given a modified config. diff --git a/components/audioinput/microphone/microphone.go b/components/audioinput/microphone/microphone.go index 04aa88f3638..906d29ffc03 100644 --- a/components/audioinput/microphone/microphone.go +++ b/components/audioinput/microphone/microphone.go @@ -121,5 +121,6 @@ func tryMicrophoneOpen( if err != nil { return nil, err } + // TODO(XXX): implement LivenessMonitor return audioinput.NewFromSource(source) } diff --git a/components/camera/camera.go b/components/camera/camera.go index 6d70eb73f8f..df734e0f155 100644 --- a/components/camera/camera.go +++ b/components/camera/camera.go @@ -315,13 +315,38 @@ func WrapWithReconfigurable(r interface{}, name resource.Name) (resource.Reconfi if reconfigurable, ok := c.(*reconfigurableCamera); ok { return reconfigurable, nil } + reconfigurable := newReconfigurable(c, name) + + if mon, ok := c.(LivenessMonitor); ok { + mon.Monitor(func() { + reconfigurable.mu.Lock() + defer reconfigurable.mu.Unlock() + reconfigurable.reconfigureKnownCamera(newReconfigurable(c, name)) + }) + } + + return reconfigurable, nil +} + +func newReconfigurable(c Camera, name resource.Name) *reconfigurableCamera { cancelCtx, cancel := context.WithCancel(context.Background()) return &reconfigurableCamera{ name: name, actual: c, cancelCtx: cancelCtx, cancel: cancel, - }, nil + } +} + +// A LivenessMonitor is responsible for monitoring the liveness of a camera. An example +// is connectivity. Since the model itself knows best about how to maintain this state, +// the reconfigurable offers a safe way to notify if a state needs to be reset due +// to some exceptional event (like a reconnect). +// It is expected that the monitoring code is tied to the lifetime of the resource +// and once the resource is closed, so should the monitor. That is, it should +// no longer send any resets once a Close on its associated resource has returned. +type LivenessMonitor interface { + Monitor(notifyReset func()) } var ( @@ -390,6 +415,7 @@ func (c *reconfigurableCamera) Stream( stream := &reconfigurableCameraStream{ c: c, errHandlers: errHandlers, + cancelCtx: c.cancelCtx, } stream.mu.Lock() defer stream.mu.Unlock() @@ -405,6 +431,7 @@ type reconfigurableCameraStream struct { c *reconfigurableCamera errHandlers []gostream.ErrorHandler stream gostream.VideoStream + cancelCtx context.Context } func (cs *reconfigurableCameraStream) init(ctx context.Context) error { @@ -417,7 +444,7 @@ func (cs *reconfigurableCameraStream) Next(ctx context.Context) (image.Image, fu cs.mu.Lock() defer cs.mu.Unlock() - if cs.stream == nil || cs.c.cancelCtx.Err() != nil { + if cs.stream == nil || cs.cancelCtx.Err() != nil { if err := func() error { cs.c.mu.Lock() defer cs.c.mu.Unlock() @@ -470,19 +497,27 @@ func (c *reconfigurableCamera) DoCommand(ctx context.Context, cmd map[string]int } // Reconfigure reconfigures the resource. -func (c *reconfigurableCamera) Reconfigure(_ context.Context, newCamera resource.Reconfigurable) error { +func (c *reconfigurableCamera) Reconfigure(ctx context.Context, newCamera resource.Reconfigurable) error { c.mu.Lock() defer c.mu.Unlock() actual, ok := newCamera.(*reconfigurableCamera) if !ok { return utils.NewUnexpectedTypeError(c, newCamera) } + if err := viamutils.TryClose(ctx, c.actual); err != nil { + golog.Global().Errorw("error closing old", "error", err) + } + c.reconfigureKnownCamera(actual) + return nil +} + +// assumes lock is held. +func (c *reconfigurableCamera) reconfigureKnownCamera(newCamera *reconfigurableCamera) { c.cancel() // reset - c.actual = actual.actual - c.cancelCtx = actual.cancelCtx - c.cancel = actual.cancel - return nil + c.actual = newCamera.actual + c.cancelCtx = newCamera.cancelCtx + c.cancel = newCamera.cancel } // UpdateAction helps hint the reconfiguration process on what strategy to use given a modified config. diff --git a/components/camera/videosource/camera_waitgroup.go b/components/camera/videosource/camera_waitgroup.go deleted file mode 100644 index edd3c3b4852..00000000000 --- a/components/camera/videosource/camera_waitgroup.go +++ /dev/null @@ -1,49 +0,0 @@ -package videosource - -import ( - "context" - "sync" - - "github.com/edaniels/gostream" - - "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/pointcloud" - "go.viam.com/rdk/rimage/transform" -) - -// CameraWaitGroup is a wrapper for camera.Camera with a sync.WaitGroup. -type CameraWaitGroup struct { - cam camera.Camera - activeBackgroundWorkers sync.WaitGroup -} - -// DoCommand wraps camera.Camera.DoCommand. -func (c *CameraWaitGroup) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { - return c.cam.DoCommand(ctx, cmd) -} - -// Projector wraps camera.Camera.Projector. -func (c *CameraWaitGroup) Projector(ctx context.Context) (transform.Projector, error) { - return c.cam.Projector(ctx) -} - -// Stream wraps camera.Camera.Stream. -func (c *CameraWaitGroup) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - return c.cam.Stream(ctx, errHandlers...) -} - -// NextPointCloud wraps camera.Camera.NextPointCloud. -func (c *CameraWaitGroup) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { - return c.cam.NextPointCloud(ctx) -} - -// Properties wraps camera.Camera.Properties. -func (c *CameraWaitGroup) Properties(ctx context.Context) (camera.Properties, error) { - return c.cam.Properties(ctx) -} - -// Close calls WaitGroup.Wait before calling camera.Camera.Close. -func (c *CameraWaitGroup) Close(ctx context.Context) error { - c.activeBackgroundWorkers.Wait() - return c.cam.Close(ctx) -} diff --git a/components/camera/videosource/webcam.go b/components/camera/videosource/webcam.go index 9f5ffbc41f6..a6bc2bf79b7 100644 --- a/components/camera/videosource/webcam.go +++ b/components/camera/videosource/webcam.go @@ -5,6 +5,7 @@ import ( "image" "path/filepath" "strings" + "sync" "time" "github.com/edaniels/golog" @@ -21,6 +22,7 @@ import ( "go.viam.com/rdk/components/camera" "go.viam.com/rdk/config" "go.viam.com/rdk/discovery" + "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/registry" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage/transform" @@ -43,7 +45,7 @@ func init() { if !ok { return nil, utils.NewUnexpectedTypeError(attrs, config.ConvertedAttributes) } - return NewWebcamSource(ctx, attrs, logger) + return NewWebcamSource(ctx, config.Name, attrs, logger) }}) config.RegisterComponentAttributeMapConverter(camera.Subtype, model, @@ -184,35 +186,32 @@ func makeConstraints(attrs *WebcamAttrs, debug bool, logger golog.Logger) mediad } } -// findCamera finds a video device and returns a reconfigurable camera with that video device as the source. -func findCamera( +// findAndMakeCamera finds a video device and returns a camera with that video device as the source. +func findAndMakeCamera( ctx context.Context, attrs *WebcamAttrs, label string, logger golog.Logger, -) (resource.Reconfigurable, error) { - var cam camera.Camera - var err error - +) (camera.Camera, error) { debug := attrs.Debug constraints := makeConstraints(attrs, debug, logger) if label != "" { - cam, err = tryWebcamOpen(ctx, attrs, label, false, constraints, logger) + cam, err := tryWebcamOpen(ctx, attrs, label, false, constraints, logger) if err != nil { return nil, errors.Wrap(err, "cannot open webcam") } - } else { - source, err := gostream.GetAnyVideoSource(constraints, logger) - if err != nil { - return nil, errors.Wrap(err, "found no webcams") - } - cam, err = makeCameraFromSource(ctx, source, attrs) - if err != nil { - return nil, errors.Wrap(err, "cannot make webcam from source") - } + return cam, nil } - return camera.WrapWithReconfigurable(cam, camera.Named(model.String())) + source, err := gostream.GetAnyVideoSource(constraints, logger) + if err != nil { + return nil, errors.Wrap(err, "found no webcams") + } + cam, err := makeCameraFromSource(ctx, source, attrs) + if err != nil { + return nil, errors.Wrap(err, "cannot make webcam from source") + } + return cam, nil } // getLabelFromCameraOrPath returns the path from the camera or an empty string if a path is not found. @@ -232,93 +231,29 @@ func getLabelFromCamera(cam camera.Camera, logger golog.Logger) string { return labels[0] } -// isConnected returns true if the reconfigurable camera is connected, otherwise false. -func isConnected(reconfCam resource.Reconfigurable) (bool, error) { - actualCam := utils.UnwrapProxy(reconfCam).(camera.Camera) - src, err := camera.SourceFromCamera(actualCam) - if err != nil { - return true, errors.Wrap(err, "cannot get source from camera") - } - props, err := gostream.PropertiesFromMediaSource[image.Image, prop.Video](src) - if err != nil { - return true, errors.Wrap(err, "cannot get properties from media source") - } - // github.com/pion/mediadevices connects to the OS to get the props for a driver. On disconnect props will be empty. - return len(props) != 0, nil -} - -// reconfigureCamera creates a new camera and reconfigures the given reconfigurable camera. -func reconfigureCamera( - ctx context.Context, - oldCam resource.Reconfigurable, - attrs *WebcamAttrs, - label string, - logger golog.Logger, -) (err error) { - goutils.UncheckedError(goutils.TryClose(ctx, oldCam)) - logger.Debugw("camera disconnected", "label", label) - - newCam, err := findCamera(ctx, attrs, label, logger) - defer func() { - if err != nil { - goutils.UncheckedError(goutils.TryClose(ctx, newCam)) - } - }() - if err != nil { - return errors.Wrap(err, "cannot make camera") - } - - if err = oldCam.Reconfigure(ctx, newCam); err != nil { - return errors.Wrap(err, "cannot reconfigure camera") - } - return -} - // NewWebcamSource returns a new source based on a webcam discovered from the given attributes. -func NewWebcamSource(ctx context.Context, attrs *WebcamAttrs, logger golog.Logger) (camera.Camera, error) { - cam, err := findCamera(ctx, attrs, attrs.Path, logger) +func NewWebcamSource(ctx context.Context, name string, attrs *WebcamAttrs, logger golog.Logger) (camera.Camera, error) { + cam, err := findAndMakeCamera(ctx, attrs, attrs.Path, logger) if err != nil { return nil, errors.Wrap(err, "cannot find video source for camera") } + logger = logger.With("camera_name", name) label := attrs.Path if label == "" { - label = getLabelFromCamera(utils.UnwrapProxy(cam).(camera.Camera), logger) + label = getLabelFromCamera(cam, logger) + logger = logger.With("camera_label", label) } - const wait = 500 * time.Millisecond - camWg := CameraWaitGroup{cam: cam.(camera.Camera)} - camWg.activeBackgroundWorkers.Add(1) - goutils.ManagedGo(func() { - defer goutils.UncheckedError(goutils.TryClose(ctx, cam)) - for { - if !goutils.SelectContextOrWait(ctx, wait) { - return - } - - ok, err := isConnected(cam) - if err != nil { - logger.Debugw("cannot determine camera status", "error", err) - continue - } - - if !ok { - for { - if !goutils.SelectContextOrWait(ctx, wait) { - return - } - if err := reconfigureCamera(ctx, cam, attrs, label, logger); err != nil { - logger.Debugw("cannot reconfigure camera", "label", label, "error", err) - continue - } - logger.Debugw("camera connected", "label", label) - break - } - } - } - }, camWg.activeBackgroundWorkers.Done) - - return &camWg, nil + cancelCtx, cancel := context.WithCancel(context.Background()) + return &monitoredWebcam{ + cam: cam, + label: label, + attrs: attrs, + cancelCtx: cancelCtx, + cancel: cancel, + logger: logger, + }, nil } // tryWebcamOpen uses getNamedVideoSource to try and find a video device (gostream.MediaSource). @@ -387,3 +322,174 @@ func getNamedVideoSource( } return gostream.GetNamedVideoSource(filepath.Base(path), constraints, logger) } + +var _ = camera.LivenessMonitor(&monitoredWebcam{}) + +// monitoredWebcam tries to ensure its underlying camera stays connected. +type monitoredWebcam struct { + mu sync.RWMutex + cam camera.Camera + label string + attrs *WebcamAttrs + + cancelCtx context.Context + cancel func() + closed bool + disconnected bool + activeBackgroundWorkers sync.WaitGroup + logger golog.Logger +} + +func (c *monitoredWebcam) isCameraConnected() (bool, error) { + c.mu.RLock() + defer c.mu.RUnlock() + src, err := camera.SourceFromCamera(c.cam) + if err != nil { + return true, errors.Wrap(err, "cannot get source from camera") + } + props, err := gostream.PropertiesFromMediaSource[image.Image, prop.Video](src) + if err != nil { + return true, errors.Wrap(err, "cannot get properties from media source") + } + // github.com/pion/mediadevices connects to the OS to get the props for a driver. On disconnect props will be empty. + // TODO(RSDK-1959): this only works for linux + return len(props) != 0, nil +} + +func (c *monitoredWebcam) reconnectCamera(notifyReset func()) error { + c.logger.Debug("closing disconnected camera") + if err := c.cam.Close(c.cancelCtx); err != nil { + c.logger.Errorw("failed to close disconnected camera", "error", err) + } + + newCam, err := findAndMakeCamera(c.cancelCtx, c.attrs, c.label, c.logger) + if err != nil { + return errors.Wrap(err, "failed to find camera") + } + + c.mu.Lock() + c.cam = newCam + c.disconnected = false + c.closed = false + c.mu.Unlock() + + notifyReset() + return nil +} + +func (c *monitoredWebcam) Monitor(notifyReset func()) { + const wait = 500 * time.Millisecond + c.activeBackgroundWorkers.Add(1) + + goutils.ManagedGo(func() { + for { + if !goutils.SelectContextOrWait(c.cancelCtx, wait) { + return + } + + ok, err := c.isCameraConnected() + if err != nil { + c.logger.Debugw("cannot determine camera status", "error", err) + continue + } + + if !ok { + c.mu.Lock() + c.disconnected = true + c.mu.Unlock() + + c.logger.Error("camera no longer connected; reconnecting") + for { + if !goutils.SelectContextOrWait(c.cancelCtx, wait) { + return + } + if err := c.reconnectCamera(notifyReset); err != nil { + c.logger.Errorw("failed to reconnect camera", "error", err) + continue + } + c.logger.Infow("camera reconnected") + break + } + } + } + }, c.activeBackgroundWorkers.Done) +} + +func (c *monitoredWebcam) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { + return nil, err + } + return c.cam.DoCommand(ctx, cmd) +} + +func (c *monitoredWebcam) Projector(ctx context.Context) (transform.Projector, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { + return nil, err + } + return c.cam.Projector(ctx) +} + +func (c *monitoredWebcam) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { + return nil, err + } + return c.cam.Stream(ctx, errHandlers...) +} + +func (c *monitoredWebcam) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { + return nil, err + } + return c.cam.NextPointCloud(ctx) +} + +func (c *monitoredWebcam) Properties(ctx context.Context) (camera.Properties, error) { + c.mu.RLock() + defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { + return camera.Properties{}, err + } + return c.cam.Properties(ctx) +} + +var ( + errClosed = errors.New("camera has been closed") + errDisconnected = errors.New("camera is disconnected; please try again in a few moments") +) + +func (c *monitoredWebcam) ensureActive() error { + if c.closed { + return errClosed + } + if c.disconnected { + return errDisconnected + } + return nil +} + +func (c *monitoredWebcam) Close(ctx context.Context) error { + c.mu.Lock() + if c.closed { + c.mu.Unlock() + return errors.New("webcam already closed") + } + c.closed = true + c.mu.Unlock() + c.cancel() + c.activeBackgroundWorkers.Wait() + return c.cam.Close(ctx) +} + +func (c *monitoredWebcam) ProxyFor() interface{} { + c.mu.RLock() + defer c.mu.RUnlock() + return c.cam +} diff --git a/rimage/cmd/stream_camera/main.go b/rimage/cmd/stream_camera/main.go index db3d584fdef..ab8941b72a1 100644 --- a/rimage/cmd/stream_camera/main.go +++ b/rimage/cmd/stream_camera/main.go @@ -87,7 +87,7 @@ func mainWithArgs(ctx context.Context, args []string, logger golog.Logger) error } func viewCamera(ctx context.Context, attrs videosource.WebcamAttrs, port int, debug bool, logger golog.Logger) error { - webcam, err := videosource.NewWebcamSource(ctx, &attrs, logger) + webcam, err := videosource.NewWebcamSource(ctx, "camera", &attrs, logger) if err != nil { return err }