
The worker is not freed when an unresolved future loses its reference. #20

Open
randy3k opened this issue Dec 3, 2021 · 8 comments

@randy3k

randy3k commented Dec 3, 2021

The "worker" is not freed when an unresolved future loses its reference.

future::plan(future.callr::callr, workers = 10)

future::nbrOfFreeWorkers()
#> [1] 10

x <- future.callr::callr(1)
x <- NULL

future::nbrOfFreeWorkers()
#> [1] 9
  1. If the future is finished, we should remove the future from the registry
  2. If the future is still running, there should be a mechanism to stop the R subprocess
@randy3k
Author

randy3k commented Dec 5, 2021

One workaround that I found is to clear the db object manually.

assign("db", list(), envir = environment(environment(future.callr:::nbrOfFreeWorkers.callr)$FutureRegistry))

I guess you could make use of weak references to keep track of the futures in the registry.

@HenrikBengtsson
Collaborator

Thanks for reporting. Yes, "this is expected". Modulo issue #21, it should at least always be possible to create one more future in this situation, because when nbrOfFreeWorkers() == 0, then (and only then) an attempt is made to resolve and collect (="free up") one of the active futures.

Before anything else, I'm curious: how do you end up in this situation?

  1. If the future is finished, we should remove the future from the registry

Yes, one could hook into R's garbage collector by registering a finalizer using one dummy environment per future. When that environment is finalized, we could free up the corresponding entry in the internal registry. FWIW, this is what happens when we lose references to R connections; they're cleaned out when running the GC.
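A minimal sketch of the finalizer idea in base R, assuming a toy registry that stores only an id per future. `registry`, `track_future()`, `make_release()`, and `n_busy()` are hypothetical names for illustration, not part of future or future.callr. Because the registry never holds the future object itself, dropping the last reference lets the GC collect the dummy environment and run the `reg.finalizer()` callback, which frees the slot.

```r
# Toy model only: `registry` and `track_future()` are hypothetical stand-ins,
# not part of the future or future.callr API.
registry <- new.env()  # id -> TRUE while a "worker" is considered busy

# Build the finalizer in its own function so it captures only `id`,
# not the dummy environment it is registered on.
make_release <- function(id) {
  force(id)
  function(token) {
    if (exists(id, envir = registry, inherits = FALSE)) {
      rm(list = id, envir = registry)
    }
  }
}

track_future <- function(id) {
  assign(id, TRUE, envir = registry)      # occupy a slot
  token <- new.env()                      # one dummy environment per future
  reg.finalizer(token, make_release(id))  # free the slot when token is GC'd
  # The registry holds only the id; the "future" carries the token.
  list(id = id, token = token)
}

n_busy <- function() length(ls(registry))

x <- track_future("f1")
y <- track_future("f2")
x <- NULL        # lose the only reference to the first "future"
invisible(gc())  # a full collection runs the pending finalizer
n_busy()         # only "f2" should remain registered
```

This only cleans up bookkeeping; it does nothing about an R subprocess that is still running, which is the second, harder point.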

  2. If the future is still running, there should be a mechanism to stop the R subprocess

This is possible but less straightforward, because it has to fit in with the Future API, which doesn't yet have a concept of terminating futures. It also introduces a side effect, i.e. terminating a future instead of letting its expression complete. So, these types of behavior must work the same regardless of future backend. It's on the roadmap to figure this out.

@randy3k
Author

randy3k commented Dec 6, 2021

I noticed that no workers were available after I interrupted a call to future.apply::future_lapply.

r$> future::plan(future.callr::callr, workers = 5)

r$> future.apply::future_lapply(1:5, function(.) Sys.sleep(1000))
^C

r$> future::nbrOfFreeWorkers()
[1] 0

@randy3k
Author

randy3k commented Dec 6, 2021

But I just realized that it is a more general problem, as it also affects multisession:

r$> future::plan(future::multisession, workers = 5)

r$> future.apply::future_lapply(1:5, function(.) Sys.sleep(1000))
^C

r$> future::nbrOfFreeWorkers()
[1] 0

@randy3k
Author

randy3k commented Dec 6, 2021

Yes, one could hook into R's garbage collector by registering a finalizer using one dummy environment per future. When finalized, we could free up the internal registry. FWIW, this is what happens when we lose references to R connections; they're cleaned out when running the GC.

If the registry keeps references to the futures, the futures will never be gc'ed. That's why I mentioned weak references earlier.

@HenrikBengtsson
Collaborator

Workaround

I don't think I ever explained how to get out of this state. Here is the situation:

library(future.apply)
plan(multisession, workers = 2)
y <- future_lapply(1:2, function(.) Sys.sleep(1000)) ## Ctrl-C
#> ^C
nbrOfFreeWorkers()
#> [1] 0

Now there are no more available workers. To get out of this state, we currently have to force a new set of workers:

plan(sequential)
plan(multisession, workers = 2)
nbrOfFreeWorkers()
#> [1] 2

Moving forward

Having said this, I think there's more that the future framework can do here. For example, once the original futures have finished (after 1000 seconds), we should be able to create new futures, but currently we can't.

One approach could be: if nbrOfFreeWorkers() == 0L, then attempt to garbage collect internal futures before creating the first new future.
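A rough sketch of "garbage collect only when no workers are free", again as a self-contained toy in base R. `pool`, `create_task()`, `make_release()`, and `free_slots()` are hypothetical names, and the real future internals may look quite different. When no slot is free, `create_task()` runs `gc()` so that finalizers of tasks whose references were lost get a chance to release their slots, and only then gives up.

```r
# Toy model only: `pool`, `create_task()` and the slot logic are hypothetical,
# not the future framework's actual internals.
pool <- new.env()
pool$capacity <- 2L
pool$busy <- character(0)  # ids of tasks currently holding a slot
pool$counter <- 0L

free_slots <- function() pool$capacity - length(pool$busy)

# Finalizer factory: the closure captures only the task id.
make_release <- function(id) {
  force(id)
  function(token) pool$busy <- setdiff(pool$busy, id)
}

create_task <- function() {
  if (free_slots() == 0L) {
    # No free slot: run a full GC so finalizers of tasks that lost
    # all references can release their slots.
    invisible(gc())
  }
  if (free_slots() == 0L) stop("no free workers")
  pool$counter <- pool$counter + 1L
  id <- paste0("task", pool$counter)
  pool$busy <- c(pool$busy, id)
  token <- new.env()                      # dummy environment per task
  reg.finalizer(token, make_release(id))
  list(id = id, token = token)
}

a <- create_task()
b <- create_task()
a <- NULL           # reference lost without resolving the task
d <- create_task()  # succeeds because the GC released a's slot
pool$busy
```

The toy releases a slot as soon as the reference is gone, regardless of whether the underlying worker has finished. A real implementation would have to free only resolved futures (or terminate running ones), otherwise it would oversubscribe workers that are still busy, e.g. with the `Sys.sleep(1000)` calls in the examples in this thread.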

@randy3k
Author

randy3k commented Jan 30, 2022

The workaround only works with multisession, but not callr workers.

@HenrikBengtsson HenrikBengtsson added this to the Next release milestone Jan 30, 2022
@HenrikBengtsson
Collaborator

The workaround only works with multisession, but not callr workers.

Correct; I forgot about this. There's a similar problem when creating 'cluster' futures with a BYO (bring-your-own) 'cluster':

> cl <- parallelly::makeClusterPSOCK(2L)
> plan(cluster, workers = cl)
> for (kk in 1:2) future(Sys.sleep(1000))
> nbrOfFreeWorkers()
[1] 0
> plan(sequential)
> nbrOfFreeWorkers()
[1] 1
> plan(cluster, workers = cl)
# stalls until one of the `cl` workers is free

I understand why this happens in both cases. It is actually only for plan(multisession), plan(cluster, workers = nworkers), and a few other cases that the plan(sequential) workaround works.
