-
Notifications
You must be signed in to change notification settings - Fork 126
RSDK-1959 — Refactor webcam to fix worker lifecycle bugs and to support hot unplug/replug for darwin #5492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| c.buffer.worker.Stop() // Calling this before locking shuts down the goroutines, and allows stopBuffer() to handle rest of the shutdown. | ||
|
|
||
| // Stop the driver and frame buffer worker | ||
| c.mu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this shouldn't be an AlwaysRebuild resource? Seems like that may simplify things but lmk if im missing something
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm 90% sure it could be one. Would just be a rather big change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| // and once the resource is closed, so should the monitor. That is, it should | ||
| // no longer send any resets once a Close on its associated resource has returned. | ||
| func (c *webcam) Monitor() { | ||
| // startMonitorWorker starts a worker that monitors camera connectivity and handles reconnection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you be more precise here now how this monitors connectivity and how it handles reconnection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| c.mu.Unlock() | ||
| continue | ||
| } | ||
| c.buffer.frame = img |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make sure I understand the model here:
- There's a 1:1 mapping fro phisical webcam to
webcaminstance - Each webcam instance hold its own buffer
- If there are 2 webcams connected, each will have its own dedicated
webcaminstance - So each will write to its own buffer
- Each
webcaminstance nows from where to read thanks to thetargetPathparameters. - i.e. if we have to
webcaminstances both pointing to the sametargetPath, there will be contention on the physical camera resource.
Am I right with all of this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a 1:1 mapping fro phisical webcam to webcam instance
Yes, should be
Each webcam instance hold its own buffer
Yes, no shared buffers across resources
If there are 2 webcams connected, each will have its own dedicated webcam instance
Should have if configured properly.
So each will write to its own buffer
Yes
Each webcam instance knows from where to read thanks to the targetPath parameters
Yes
i.e. if we have to webcam instances both pointing to the same targetPath, there will be contention on the physical camera resource.
I'm not sure about this question. Let me dive a bit deeper on the consequences of setting the same targetPath for two resources tomorrow. I will get back to you on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another question.
Each webcam instance runs in a dedicated thread, and each of them in turn spawns new workers (i.e. goroutines) (monitoring and buffer workers), right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each webcam instance runs in a dedicated thread, and each of them in turn spawns new workers (i.e. goroutines) (monitoring and buffer workers), right?
Yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just tested pointing two cameras to the same video path on mac and linux, and the second resource, later to the punch than the first resouce, will get a device busy error and not be able to connect. So to answer your question @SebastianMunozP, it is not possible to stream from the same video path for two different resources. On linux, it is safe to do so as well because the linux kernel has a lock per video device: https://github.com/torvalds/linux/blob/0048fbb4011ec55c32d3148b2cda56433f273375/drivers/media/v4l2-core/v4l2-dev.c#L417
However, for mac, their code is close sourced, so I do not know if it is safe.
| func (c *webcam) Images(_ context.Context, _ []string, _ map[string]interface{}) ([]camera.NamedImage, resource.ResponseMetadata, error) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| if err := c.ensureActive(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a little comment here on why this is failing?, i.e. what makes ensureActive() be false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added comments above disconnected and closed, the members that ensureActive checks
| // It must be protected by the mutex in the webcam struct. | ||
| type webcamBuffer struct { | ||
| frame image.Image | ||
| release func() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a little explanation here on what release function does and how and who sets it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
seanavery
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice improvements!!
Do you think we should add release behavior tests in webcam_tests.go?
| constraint.FrameRate = prop.FloatRanged{Min: 0.0, Ideal: 30.0, Max: 140.0} | ||
| } | ||
|
|
||
| if conf.Format == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[possible nit] I know this was just moved over, but, is the pixel format support not exposed by mediadevices somehow? would be nice not to have to maintain this against mediadevices changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO changing this in this PR feels out of scope. Do you mind fleshing out your idea and putting it into a Jira ticket in this epic: https://viam.atlassian.net/browse/RSDK-12198?atlOrigin=eyJpIjoiZjQ3OTA2MDQ4NTA4NDllMDhmNWZhOThjYTNjNTc1ZTUiLCJwIjoiaiJ9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } | ||
| // Call release and read outside the lock to avoid holding the lock during I/O | ||
| if oldRelease != nil { | ||
| oldRelease() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to make sure this strategy of releasing the last from just before reading the new frame works well
Would we not want to release immediately after retrieving the frame storing into the webcamBuffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would we not want to release immediately after retrieving the frame storing into the webcamBuffer?
Like during an Image/Images call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was asking about in this loop after we stuff the image into the buffer c.buffer.frame = img
But realizing we do not copy so release will wipe the frame, so we need the prev release right before capturing again.
| workers *goutils.StoppableWorkers | ||
| // workers take the mutex when starting and stopping, so do not call them while holding mu | ||
| workers *goutils.StoppableWorkers | ||
| buffer *webcamBuffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit but you can you follow this convention for what the mutex protects https://dmitri.shuralyov.com/idiomatic-go/entries/2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Reordered
| conf resource.Config, | ||
| ) error { | ||
| newConf, err := resource.NativeConfig[*WebcamConfig](conf) | ||
| c.targetPath = nativeConf.Path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since Path is Path string , its falsy value is empty string, and we have a code path for "give me any driver available" in the // Handle "any" path conditional
So no, I don't think it can be nil
| } | ||
|
|
||
| c.hasLoggedIntrinsicsInfo = false | ||
| c.logger = c.logger.WithFields("camera_name", c.Name().ShortName(), "camera_label", c.targetPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ive never seen this before, why can't we use the logger passed into the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this WithFields method just appends the fields specified to the end of any logger message it emits.
Example:
12/4/2025, 3:47:13 PM error rdk.resource_manager.rdk:component:camera/C922ProStreamWebcam-15 videosource/webcam.go:248 camera no longer connected; reconnecting camera_name C922ProStreamWebcam-15 camera_label 0x1140000328f0073
| } | ||
|
|
||
| // isCameraConnected is a helper for monitoring connectivity to the driver. | ||
| // Must be called with mu held. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just lock inside the function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm gonna change this to not require the mutex to be held at all, because Claude pointed out that it is making an I/O call under the hood, which is bad practice to do with mutex held.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you end up making this change here?, to avoid needing to hold a mutex here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch I did not push to remote. My bad
| logger := c.logger | ||
| c.mu.RUnlock() | ||
|
|
||
| ok, err := c.isCameraConnected() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does this work on mac if isCameraConnected only works on linux?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't work on mac i.e. if you unplug then replug on mac, it won't reconnect unless you restart viam-server.
pion/mediadevices#670 is in draft though, which adds darwin support
| logger.Debugw("cannot determine camera status", "error", err) | ||
| continue | ||
| } | ||
| if ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit but imo its more readable to have the if !ok block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, maybe it's the python developer in me, but I hate curly braces and nesting, so would like to avoid if possible, preferring guard pattern. And considering there is a continue already in the error case, I would like to match that pattern.
|
|
||
| closed bool | ||
| // Set by the Close method | ||
| closed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opt but could use atomic.bool for these
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think atomics would introduce unnecessary complexity because these bools are read and written to in critical sections with lock protection along with other members anyways
| return nil | ||
| } | ||
| if c.disconnected { | ||
| c.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this deadlock if c.isdonnected is false ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. Could you elaborate on why it could happen here?
|
|
||
| // Reset buffer state | ||
| c.buffer.err = nil | ||
| c.buffer.frame = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which thread reads from the c.buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the buffer worker goroutine
| func (c *webcam) startBuffer() { | ||
| if c.buffer.frame != nil { | ||
| return // webcam buffer already started | ||
| if c.closed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this supposed to be if !c.closed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the error is a typo. Fixed.
@seanavery release is actually a no-op if you look at how Read is implemented on windows, darwin and linux in mediadevices (see comment above the release struct member). So making sure release() is called properly is low priority IMO. Unit testing is a separate ticket and I want to unblock this PR to fix the freeze. Mind if we save it for https://viam.atlassian.net/browse/RSDK-12918 ? |
| c.mu.Unlock() | ||
| return fmt.Errorf("webcam already closed: %w", errClosed) | ||
| } | ||
| c.closed = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this cause issues if the line below err := oldDriver.Close() returns an error? Should we only set c.closed if all calls in this function are successful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm okay with leaving it here because closed is an indicator to resource methods Properties, Image, and Images to not run after Close has been called. Whether or not the driver successfully closes or not is not relevant.
oliviamiller
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm!
| } | ||
| // Call release and read outside the lock to avoid holding the lock during I/O | ||
| if oldRelease != nil { | ||
| oldRelease() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was asking about in this loop after we stuff the image into the buffer c.buffer.frame = img
But realizing we do not copy so release will wipe the frame, so we need the prev release right before capturing again.
Summary
Refactors webcam
GetPointCloudsReconfigurein favor ofAlwaysRebuildStoppableWorkersgroup instead of maintaining them in separate structsTests
Tests with go.mod replace line pointing to this version of mediadevices pion/mediadevices#674. So to be clear, this PR itself will not fix unplug/replug hang. It will, in conjunction with the aforementioned upstream dep PR, fix it.
On linux/arm64 (rpi5b)
In fact, this PR fixed an unreported linux bug too (unplug/replug is not working on linux/arm64 on the main branch).
On darwin/arm64 (viam issued macbook air laptop)