-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservices.go
135 lines (110 loc) · 2.72 KB
/
services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package minke
import (
"context"
"encoding/json"
"sync"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)
// svcKey identifies a Service by namespace and name. It is the key type of
// the svcUpdater.svcs map. Field order matters: the key is built with
// positional composite literals.
type svcKey struct {
	namespace string
	name      string
}
// svcItem is the record cached for one Service in svcUpdater.svcs.
type svcItem struct {
// svc is the Service object as it was passed to addItem.
svc *corev1.Service
// appProtos maps a port name to its application protocol. addItem fills it
// from the app-protocol annotation and then from each port's AppProtocol
// field.
appProtos map[string]string
}
// svcUpdater keeps an in-memory map of Services and their per-port
// application protocols. addItem and delItem write to it;
// getServicePortScheme reads from it.
type svcUpdater struct {
// c is the Controller this updater was created for (set in
// setupServiceProcess).
c *Controller
// mu guards svcs.
mu sync.RWMutex
// svcs holds one entry per known Service, keyed by namespace and name.
svcs map[svcKey]svcItem
}
// annAppProtos is the Service annotation key that addItem decodes as a JSON
// object mapping port name to application protocol.
var annAppProtos = "service.alpha.kubernetes.io/app-protocol"
// addItem records a Service in the updater's cache together with the
// application protocol of each of its ports.
//
// Protocols are taken first from the JSON object stored in the annAppProtos
// annotation (port name -> protocol) and then from each port's AppProtocol
// field, which overrides the annotation for the same port name. A malformed
// annotation is logged and whatever could be decoded from it is still used.
//
// Objects that are not *corev1.Service are ignored. The returned error is
// always nil.
func (u *svcUpdater) addItem(obj interface{}) error {
	sobj, ok := obj.(*corev1.Service)
	if !ok {
		return nil
	}
	klog.Infof("service added, %s/%s", sobj.GetNamespace(), sobj.GetName())

	// Build the protocol map before taking the lock; it only depends on sobj.
	var appProtos map[string]string
	// Only parse when the annotation is present: unmarshalling an empty
	// string always fails and would produce a spurious warning.
	if appsJSON := sobj.Annotations[annAppProtos]; appsJSON != "" {
		if err := json.Unmarshal([]byte(appsJSON), &appProtos); err != nil {
			klog.Warningf("service %s/%s: invalid %s annotation: %v",
				sobj.GetNamespace(), sobj.GetName(), annAppProtos, err)
		}
	}
	if appProtos == nil {
		appProtos = make(map[string]string)
	}
	for _, p := range sobj.Spec.Ports {
		if p.AppProtocol != nil {
			appProtos[p.Name] = *p.AppProtocol
		}
	}

	u.mu.Lock()
	defer u.mu.Unlock()
	u.svcs[svcKey{sobj.Namespace, sobj.Name}] = svcItem{
		svc:       sobj,
		appProtos: appProtos,
	}
	return nil
}
// delItem removes a Service from the updater's cache.
//
// obj is expected to be a *corev1.Service, or a
// cache.DeletedFinalStateUnknown tombstone wrapping one. Any other object is
// ignored. The returned error is always nil.
func (u *svcUpdater) delItem(obj interface{}) error {
	// NOTE(review): client-go informers can deliver a tombstone instead of the
	// object when a delete event was missed; unwrap it so the entry is still
	// removed. Confirm whether makeProcessor forwards tombstones unchanged.
	if tomb, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tomb.Obj
	}
	// The cache holds Services, so the assertion must be on *corev1.Service.
	// Asserting on *corev1.Secret never matched and left stale entries.
	sobj, ok := obj.(*corev1.Service)
	if !ok {
		return nil
	}
	klog.Infof("service removed, %s/%s", sobj.GetNamespace(), sobj.GetName())

	u.mu.Lock()
	defer u.mu.Unlock()
	delete(u.svcs, svcKey{sobj.Namespace, sobj.Name})
	return nil
}
// getServicePortScheme returns the scheme to use for the service port
// identified by key (namespace, name and portName).
//
// The result is "http", "http2" or "https", chosen from the application
// protocol cached for that port. "http" is returned when the service is not
// cached, the port has no recorded protocol, or the protocol is not
// recognised.
func (u *svcUpdater) getServicePortScheme(key serviceKey) string {
	u.mu.RLock()
	defer u.mu.RUnlock()

	v, ok := u.svcs[svcKey{key.namespace, key.name}]
	if !ok {
		return "http"
	}
	// Indexing a nil or key-less map yields "", which falls to the default.
	switch v.appProtos[key.portName] {
	case "HTTP", "http":
		return "http"
	case "HTTP2", "http2":
		return "http2"
	case "HTTPS", "https":
		// Lowercase is accepted for consistency with the other protocols.
		return "https"
	default:
		// Missing or unrecognised protocol.
		// TODO: an unrecognised value should be logged and metric'd.
		return "http"
	}
}
// setupServiceProcess wires up Service watching for the controller.
//
// It creates an empty svcUpdater and passes it to makeProcessor along with a
// ListWatch over Services in c.namespace and c.refresh. It then stores the
// resulting processor in c.svcProc, a Service lister backed by the
// processor's informer indexer in c.svcList, and the updater in c.svc. The
// list and watch calls use ctx. The returned error is always nil.
func (c *Controller) setupServiceProcess(ctx context.Context) error {
	updater := &svcUpdater{c: c, svcs: make(map[svcKey]svcItem)}

	listFn := func(opts metav1.ListOptions) (runtime.Object, error) {
		return c.client.CoreV1().Services(c.namespace).List(ctx, opts)
	}
	watchFn := func(opts metav1.ListOptions) (watch.Interface, error) {
		return c.client.CoreV1().Services(c.namespace).Watch(ctx, opts)
	}
	source := &cache.ListWatch{ListFunc: listFn, WatchFunc: watchFn}

	c.svcProc = makeProcessor(source, &corev1.Service{}, c.refresh, updater)
	c.svcList = listcorev1.NewServiceLister(c.svcProc.informer.GetIndexer())
	c.svc = updater
	return nil
}
// processSvcItem is a no-op: it ignores its string argument and always
// returns nil.
func (c *Controller) processSvcItem(_ string) error {
	return nil
}