from abc import ABC
from abc import abstractmethod
from random import randrange
from typing import Dict, List, Optional, Tuple, Type

import docker

from sebs.benchmark import Benchmark
from sebs.cache import Cache
from sebs.config import SeBSConfig
from sebs.faas.function import Function, Trigger, ExecutionResult
from sebs.faas.storage import PersistentStorage
from sebs.utils import LoggingBase

from .config import Config

"""
This class provides basic abstractions for the FaaS system.
It provides the interface for initialization of the system and storage
services, creation and update of serverless functions and querying
logging and measurements services to obtain error messages and performance
measurements.
"""


class System(ABC, LoggingBase):
    def __init__(
        self,
        system_config: SeBSConfig,
        cache_client: Cache,
        docker_client: docker.client,
    ):
        super().__init__()
        self._system_config = system_config
        self._docker_client = docker_client
        self._cache_client = cache_client
        self._cold_start_counter = randrange(100)

    @property
    def system_config(self) -> SeBSConfig:
        return self._system_config

    @property
    def docker_client(self) -> docker.client:
        return self._docker_client

    @property
    def cache_client(self) -> Cache:
        return self._cache_client

    @property
    def cold_start_counter(self) -> int:
        return self._cold_start_counter

    @cold_start_counter.setter
    def cold_start_counter(self, val: int):
        self._cold_start_counter = val

    @property
    @abstractmethod
    def config(self) -> Config:
        pass

    @staticmethod
    @abstractmethod
    def function_type() -> "Type[Function]":
        pass
"""
Initialize the system. After the call the local or remot
FaaS system should be ready to allocate functions, manage
storage resources and invoke functions.
:param config: systems-specific parameters
"""
def initialize(self, config: Dict[str, str] = {}):
pass
"""
Access persistent storage instance.
It might be a remote and truly persistent service (AWS S3, Azure Blob..),
or a dynamically allocated local instance.
:param replace_existing: replace benchmark input data if exists already
"""
@abstractmethod
def get_storage(self, replace_existing: bool) -> PersistentStorage:
pass
"""
Apply the system-specific code packaging routine to build benchmark.
The benchmark creates a code directory with the following structure:
- [benchmark sources]
- [benchmark resources]
- [dependence specification], e.g. requirements.txt or package.json
- [handlers implementation for the language and deployment]
This step allows us to change the structure above to fit different
deployment requirements, Example: a zip file for AWS or a specific
directory structure for Azure.
:return: path to packaged code and its size
"""
@abstractmethod
def package_code(
self,
directory: str,
language_name: str,
language_version: str,
benchmark: str,
is_cached: bool,
) -> Tuple[str, int]:
pass
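
    # A minimal sketch of a concrete `package_code` for a platform that expects a
    # flat zip archive (a hypothetical example; the archive layout and return value
    # are assumptions, not requirements of this interface):
    #
    #     import os
    #     import shutil
    #
    #     def package_code(self, directory, language_name, language_version,
    #                      benchmark, is_cached):
    #         # Zip the prepared code directory; make_archive appends the ".zip" suffix.
    #         archive = shutil.make_archive(directory, "zip", root_dir=directory)
    #         return archive, os.path.getsize(archive)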

    @abstractmethod
    def create_function(self, code_package: Benchmark, func_name: str) -> Function:
        pass

    @abstractmethod
    def cached_function(self, function: Function):
        pass

    @abstractmethod
    def update_function(self, function: Function, code_package: Benchmark):
        pass
"""
a) if a cached function with given name is present and code has not changed,
then just return function name
b) if a cached function is present and the cloud code has a different
code version, then upload new code
c) if no cached function is present, then create code package and
either create new function or update an existing but uncached one
Benchmark rebuild is requested but will be skipped if source code is
not changed and user didn't request update.
"""

    def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function:

        if code_package.language_version not in self.system_config.supported_language_versions(
            self.name(), code_package.language_name
        ):
            raise Exception(
                "Unsupported {language} version {version} in {system}!".format(
                    language=code_package.language_name,
                    version=code_package.language_version,
                    system=self.name(),
                )
            )

        if not func_name:
            func_name = self.default_function_name(code_package)
        rebuilt, _ = code_package.build(self.package_code)

        """
            Is there a function with that name in the cache?
            a) no -> create a new function. The implementation might check if a function
               with that name already exists in the cloud and update its code instead.
            b) yes -> retrieve the function from the cache. The function code in the cloud
               will be updated if the local version is different.
        """
        functions = code_package.functions
        if not functions or func_name not in functions:

            msg = (
                "function name not provided."
                if not func_name
                else "function {} not found in cache.".format(func_name)
            )
            self.logging.info("Creating new function! Reason: " + msg)
            function = self.create_function(code_package, func_name)
            self.cache_client.add_function(
                deployment_name=self.name(),
                language_name=code_package.language_name,
                code_package=code_package,
                function=function,
            )
            code_package.query_cache()
            return function
        else:
            # retrieve function
            cached_function = functions[func_name]
            code_location = code_package.code_location
            function = self.function_type().deserialize(cached_function)
            self.cached_function(function)
            self.logging.info(
                "Using cached function {fname} in {loc}".format(fname=func_name, loc=code_location)
            )
            # is the function up-to-date?
            if function.code_package_hash != code_package.hash or rebuilt:
                if function.code_package_hash != code_package.hash:
                    self.logging.info(
                        f"Cached function {func_name} with hash "
                        f"{function.code_package_hash} is not up to date with "
                        f"current build {code_package.hash} in "
                        f"{code_location}, updating cloud version!"
                    )
                if rebuilt:
                    self.logging.info(
                        f"Enforcing rebuild and update of cached function "
                        f"{func_name} with hash {function.code_package_hash}."
                    )
                self.update_function(function, code_package)
                function.code_package_hash = code_package.hash
                function.updated_code = True
                self.cache_client.add_function(
                    deployment_name=self.name(),
                    language_name=code_package.language_name,
                    code_package=code_package,
                    function=function,
                )
                code_package.query_cache()
            # code up to date, but configuration needs to be updated
            # FIXME: detect change in function config
            elif self.is_configuration_changed(function, code_package):
                self.update_function_configuration(function, code_package)
                self.cache_client.update_function(function)
                code_package.query_cache()
            else:
                self.logging.info(f"Cached function {func_name} is up to date.")
            return function

    @abstractmethod
    def update_function_configuration(self, cached_function: Function, benchmark: Benchmark):
        pass
"""
This function checks for common function parameters to verify if their value is
still up to date.
"""
def is_configuration_changed(self, cached_function: Function, benchmark: Benchmark) -> bool:
changed = False
for attr in ["timeout", "memory"]:
new_val = getattr(benchmark.benchmark_config, attr)
old_val = getattr(cached_function.config, attr)
if new_val != old_val:
self.logging.info(
f"Updating function configuration due to changed attribute {attr}: "
f"cached function has value {old_val} whereas {new_val} has been requested."
)
changed = True
setattr(cached_function.config, attr, new_val)
for lang_attr in [["language"] * 2, ["language_version", "version"]]:
new_val = getattr(benchmark, lang_attr[0])
old_val = getattr(cached_function.config.runtime, lang_attr[1])
if new_val != old_val:
# FIXME: should this even happen? we should never pick the function with
# different runtime - that should be encoded in the name
self.logging.info(
f"Updating function configuration due to changed runtime attribute {attr}: "
f"cached function has value {old_val} whereas {new_val} has been requested."
)
changed = True
setattr(cached_function.config.runtime, lang_attr[1], new_val)
return changed

    @abstractmethod
    def default_function_name(self, code_package: Benchmark) -> str:
        pass

    @abstractmethod
    def enforce_cold_start(self, functions: List[Function], code_package: Benchmark):
        pass
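
    # Platforms implement `enforce_cold_start` differently; a common pattern is to
    # change the function configuration so the provider tears down warm containers.
    # A hedged sketch, assuming a hypothetical helper that writes an environment
    # variable into the deployed function:
    #
    #     def enforce_cold_start(self, functions, code_package):
    #         self.cold_start_counter += 1
    #         for function in functions:
    #             # e.g., propagate self.cold_start_counter into the function's
    #             # environment, forcing new execution environments on the next call
    #             self._update_cold_start_env(function, self.cold_start_counter)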

    @abstractmethod
    def download_metrics(
        self,
        function_name: str,
        start_time: int,
        end_time: int,
        requests: Dict[str, ExecutionResult],
        metrics: dict,
    ):
        pass

    @abstractmethod
    def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger:
        pass

    # @abstractmethod
    # def get_invocation_error(self, function_name: str,
    #                          start_time: int, end_time: int):
    #     pass
"""
Shutdown local FaaS instances, connections and clients.
"""
@abstractmethod
def shutdown(self) -> None:
try:
self.cache_client.lock()
self.config.update_cache(self.cache_client)
finally:
self.cache_client.unlock()
@staticmethod
@abstractmethod
def name() -> str:
pass
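
# A rough sketch of how a benchmarking driver might use a concrete System
# implementation end to end. The `deployment`, `code_package`, `payload`,
# `start_time`, `end_time`, `requests` and `metrics` objects are assumed to exist,
# and the trigger invocation is shown only for illustration:
#
#     deployment.initialize()
#     storage = deployment.get_storage(replace_existing=False)
#     function = deployment.get_function(code_package)
#     trigger = deployment.create_trigger(function, Trigger.TriggerType.HTTP)
#     result = trigger.sync_invoke(payload)  # assumed Trigger API from sebs.faas.function
#     deployment.download_metrics(function.name, start_time, end_time, requests, metrics)
#     deployment.shutdown()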