Skip to content

Commit

Permalink
Simplify pool management.
Browse files Browse the repository at this point in the history
  • Loading branch information
mnlipp committed Jan 15, 2025
1 parent 4943baf commit bd5227f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@
public class VmPool {

private String name;
private boolean defined;
private List<Grant> permissions = Collections.emptyList();
private final Set<String> vms
= Collections.synchronizedSet(new HashSet<>());

/**
* Instantiates a new vm pool.
*
* @param name the name
*/
public VmPool(String name) {
this.name = name;
}

/**
* Returns the name.
*
Expand All @@ -58,6 +68,24 @@ public void setName(String name) {
this.name = name;
}

/**
* Checks if is defined.
*
* @return the result
*/
public boolean isDefined() {
return defined;
}

/**
* Sets if is.
*
* @param defined the defined to set
*/
public void setDefined(boolean defined) {
this.defined = defined;
}

/**
* Permissions granted for a VM from the pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@
package org.jdrupes.vmoperator.manager;

import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
Expand All @@ -56,8 +52,6 @@
public class PoolMonitor extends
AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {

private final ReentrantLock pendingLock = new ReentrantLock();
private final Map<String, Set<String>> pending = new ConcurrentHashMap<>();
private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
private EventPipeline poolPipeline;

Expand Down Expand Up @@ -107,18 +101,13 @@ protected void handleChange(K8sClient client,

// When pool is deleted, save VMs in pending
if (type == ResponseType.DELETED) {
try {
pendingLock.lock();
Optional.ofNullable(pools.get(poolName)).ifPresent(
p -> {
pending.computeIfAbsent(poolName, k -> Collections
.synchronizedSet(new HashSet<>())).addAll(p.vms());
pools.remove(poolName);
poolPipeline.fire(new VmPoolChanged(p, true));
});
} finally {
pendingLock.unlock();
}
Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
pool.setDefined(false);
if (pool.vms().isEmpty()) {
pools.remove(poolName);
}
poolPipeline.fire(new VmPoolChanged(pool, true));
});
return;
}

Expand All @@ -135,32 +124,13 @@ protected void handleChange(K8sClient client,
}
}

// Convert to VM pool
var vmPool = client().getJSON().getGson().fromJson(
GsonPtr.to(poolModel.data()).to("spec").get(),
VmPool.class);
V1ObjectMeta metadata = response.object.getMetadata();
vmPool.setName(metadata.getName());

// If modified, merge changes and notify
if (type == ResponseType.MODIFIED && pools.containsKey(poolName)) {
pools.get(poolName).setPermissions(vmPool.permissions());
poolPipeline.fire(new VmPoolChanged(vmPool));
return;
}

// Add new pool
try {
pendingLock.lock();
Optional.ofNullable(pending.get(poolName)).ifPresent(s -> {
vmPool.vms().addAll(s);
});
pending.remove(poolName);
pools.put(poolName, vmPool);
poolPipeline.fire(new VmPoolChanged(vmPool));
} finally {
pendingLock.unlock();
}
// Get pool and merge changes
var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
var newData = client().getJSON().getGson().fromJson(
GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class);
vmPool.setPermissions(newData.permissions());
vmPool.setDefined(true);
poolPipeline.fire(new VmPoolChanged(vmPool));
}

/**
Expand All @@ -173,35 +143,19 @@ public void onVmDefChanged(VmDefChanged event) {
String vmName = event.vmDefinition().name();
switch (event.type()) {
case ADDED:
try {
pendingLock.lock();
event.vmDefinition().<List<String>> fromSpec("pools")
.orElse(Collections.emptyList()).stream().forEach(p -> {
if (pools.containsKey(p)) {
pools.get(p).vms().add(vmName);
} else {
pending.computeIfAbsent(p, k -> Collections
.synchronizedSet(new HashSet<>())).add(vmName);
}
poolPipeline.fire(new VmPoolChanged(pools.get(p)));
});
} finally {
pendingLock.unlock();
}
event.vmDefinition().<List<String>> fromSpec("pools")
.orElse(Collections.emptyList()).stream().forEach(p -> {
pools.computeIfAbsent(p, k -> new VmPool(p))
.vms().add(vmName);
poolPipeline.fire(new VmPoolChanged(pools.get(p)));
});
break;
case DELETED:
try {
pendingLock.lock();
pools.values().stream().forEach(p -> {
if (p.vms().remove(vmName)) {
poolPipeline.fire(new VmPoolChanged(p));
}
});
// Should not be necessary, but just in case
pending.values().stream().forEach(s -> s.remove(vmName));
} finally {
pendingLock.unlock();
}
pools.values().stream().forEach(p -> {
if (p.vms().remove(vmName)) {
poolPipeline.fire(new VmPoolChanged(p));
}
});
break;
default:
break;
Expand Down

0 comments on commit bd5227f

Please sign in to comment.