From bd5227fda3e3981f81bb3bddad3a942544888497 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Wed, 15 Jan 2025 21:58:08 +0100 Subject: [PATCH] Simplify pool management. --- .../org/jdrupes/vmoperator/common/VmPool.java | 28 ++++++ .../vmoperator/manager/PoolMonitor.java | 96 +++++-------------- 2 files changed, 53 insertions(+), 71 deletions(-) diff --git a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmPool.java b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmPool.java index 07e06161b..426a69cd8 100644 --- a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmPool.java +++ b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmPool.java @@ -36,10 +36,20 @@ public class VmPool { private String name; + private boolean defined; private List permissions = Collections.emptyList(); private final Set vms = Collections.synchronizedSet(new HashSet<>()); + /** + * Instantiates a new vm pool. + * + * @param name the name + */ + public VmPool(String name) { + this.name = name; + } + /** * Returns the name. * @@ -58,6 +68,24 @@ public void setName(String name) { this.name = name; } + /** + * Checks if is defined. + * + * @return the result + */ + public boolean isDefined() { + return defined; + } + + /** + * Sets if is. + * + * @param defined the defined to set + */ + public void setDefined(boolean defined) { + this.defined = defined; + } + /** * Permissions granted for a VM from the pool. * diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolMonitor.java index 49c708dbc..27dfb7d12 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolMonitor.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolMonitor.java @@ -19,17 +19,13 @@ package org.jdrupes.vmoperator.manager; import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.util.Watch; import java.io.IOException; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP; import org.jdrupes.vmoperator.common.K8s; import org.jdrupes.vmoperator.common.K8sClient; @@ -56,8 +52,6 @@ public class PoolMonitor extends AbstractMonitor { - private final ReentrantLock pendingLock = new ReentrantLock(); - private final Map> pending = new ConcurrentHashMap<>(); private final Map pools = new ConcurrentHashMap<>(); private EventPipeline poolPipeline; @@ -107,18 +101,13 @@ protected void handleChange(K8sClient client, // When pool is deleted, save VMs in pending if (type == ResponseType.DELETED) { - try { - pendingLock.lock(); - Optional.ofNullable(pools.get(poolName)).ifPresent( - p -> { - pending.computeIfAbsent(poolName, k -> Collections - .synchronizedSet(new HashSet<>())).addAll(p.vms()); - pools.remove(poolName); - poolPipeline.fire(new VmPoolChanged(p, true)); - }); - } finally { - pendingLock.unlock(); - } + Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> { + pool.setDefined(false); + if (pool.vms().isEmpty()) { + pools.remove(poolName); + } + poolPipeline.fire(new VmPoolChanged(pool, true)); + }); return; } @@ -135,32 +124,13 @@ protected void handleChange(K8sClient client, } } - // Convert to VM pool - var vmPool = client().getJSON().getGson().fromJson( - GsonPtr.to(poolModel.data()).to("spec").get(), - VmPool.class); - V1ObjectMeta metadata = response.object.getMetadata(); - vmPool.setName(metadata.getName()); - - // If modified, merge changes and notify - if (type == ResponseType.MODIFIED && pools.containsKey(poolName)) { - pools.get(poolName).setPermissions(vmPool.permissions()); - poolPipeline.fire(new VmPoolChanged(vmPool)); - return; - } - - // Add new pool - try { - pendingLock.lock(); - Optional.ofNullable(pending.get(poolName)).ifPresent(s -> { - vmPool.vms().addAll(s); - }); - pending.remove(poolName); - pools.put(poolName, vmPool); - poolPipeline.fire(new VmPoolChanged(vmPool)); - } finally { - pendingLock.unlock(); - } + // Get pool and merge changes + var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName)); + var newData = client().getJSON().getGson().fromJson( + GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class); + vmPool.setPermissions(newData.permissions()); + vmPool.setDefined(true); + poolPipeline.fire(new VmPoolChanged(vmPool)); } /** @@ -173,35 +143,19 @@ public void onVmDefChanged(VmDefChanged event) { String vmName = event.vmDefinition().name(); switch (event.type()) { case ADDED: - try { - pendingLock.lock(); - event.vmDefinition().> fromSpec("pools") - .orElse(Collections.emptyList()).stream().forEach(p -> { - if (pools.containsKey(p)) { - pools.get(p).vms().add(vmName); - } else { - pending.computeIfAbsent(p, k -> Collections - .synchronizedSet(new HashSet<>())).add(vmName); - } - poolPipeline.fire(new VmPoolChanged(pools.get(p))); - }); - } finally { - pendingLock.unlock(); - } + event.vmDefinition().> fromSpec("pools") + .orElse(Collections.emptyList()).stream().forEach(p -> { + pools.computeIfAbsent(p, k -> new VmPool(p)) + .vms().add(vmName); + poolPipeline.fire(new VmPoolChanged(pools.get(p))); + }); break; case DELETED: - try { - pendingLock.lock(); - pools.values().stream().forEach(p -> { - if (p.vms().remove(vmName)) { - poolPipeline.fire(new VmPoolChanged(p)); - } - }); - // Should not be necessary, but just in case - pending.values().stream().forEach(s -> s.remove(vmName)); - } finally { - pendingLock.unlock(); - } + pools.values().stream().forEach(p -> { + if (p.vms().remove(vmName)) { + poolPipeline.fire(new VmPoolChanged(p)); + } + }); break; default: break;