Skip to content

Commit

Permalink
In case of unsuccessful drain of a node we remove the node IP from th…
Browse files Browse the repository at this point in the history
…e Elasticsearch excluded IPs.
  • Loading branch information
Abouzar Kamaee committed Jun 11, 2024
1 parent 54dc371 commit a3aa704
Showing 1 changed file with 49 additions and 1 deletion.
50 changes: 49 additions & 1 deletion operator/es_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
return c.waitForEmptyEsNode(ctx, pod)
}

func (c *ESClient) Cleanup(ctx context.Context) error {
func (c *ESClient) Cleanup(_ context.Context) error {

// 1. fetch IPs from _cat/nodes
nodes, err := c.GetNodes()
Expand Down Expand Up @@ -309,6 +309,50 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
return err
}

// undoExcludePodIP takes the pod's IP back out of the Elasticsearch
// exclude._ip list obtained via GetPersistentExcludeIPs.
//
// c.mux is held for the whole read-modify-write sequence. The current value
// is split on commas, every entry equal to pod.Status.PodIP is dropped, and
// the remainder is sorted and re-joined. No update is sent when the setting
// is empty or when the re-joined value is identical to the current one;
// otherwise the new value is passed to setExcludeIPs and its error returned.
func (c *ESClient) undoExcludePodIP(pod *v1.Pod) error {
	c.mux.Lock()
	defer c.mux.Unlock()

	ipToRemove := pod.Status.PodIP

	settings, err := c.getClusterSettings()
	if err != nil {
		return err
	}

	current := settings.GetPersistentExcludeIPs().ValueOrZero()
	if current == "" {
		// The exclude list is empty, so there is nothing to undo.
		return nil
	}

	// Keep every entry except the one belonging to this pod.
	entries := strings.Split(current, ",")
	kept := make([]string, 0, len(entries))
	for _, entry := range entries {
		if entry != ipToRemove {
			kept = append(kept, entry)
		}
	}

	// Normalise the ordering before comparing with the stored value.
	sort.Strings(kept)
	updated := strings.Join(kept, ",")
	if updated == current {
		// Resulting value is unchanged; skip the settings update.
		return nil
	}

	c.logger().Infof("Updating exclude._ip list to '%s' after removing IP '%s'", updated, ipToRemove)

	// Push the reduced exclude._ip value.
	return c.setExcludeIPs(updated, settings)
}

func (c *ESClient) setExcludeIPs(ips string, originalESSettings *ESSettings) error {
originalESSettings.updateExcludeIps(ips)
resp, err := resty.New().R().
Expand Down Expand Up @@ -391,6 +435,10 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
).R().
Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json")
if err != nil {
// If we were not able to finish the drain operation with success, remove the pod IP from the ES excluded IP list
if undoErr := c.undoExcludePodIP(pod); undoErr != nil {
return fmt.Errorf("failed to undo excluded pod IP: %v, original error: %v", undoErr, err)
}
return err
}
return nil
Expand Down

0 comments on commit a3aa704

Please sign in to comment.