-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRTOS.py
106 lines (90 loc) · 4.31 KB
/
RTOS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from scheduler.EDF import EDF
from scheduler.NP_EDF import NP_EDF
from scheduler.DM import DM
from scheduler.DM_NPP import DM_NPP
from scheduler.DM_HLP import DM_HLP
from scheduler.RM import RM
from taskset import TaskSet
from chart import Chart
from task import RUNNING, COMPLETED, READY, SUSPENDED
class RTOS:
    """Discrete-time simulator that runs a scheduler over a task set.

    On every clock tick the configured scheduler is asked which job should
    run. The simulator tracks the job currently holding the CPU, records
    the ``(start, stop)`` interval of every stretch a job was executing, and
    counts the ticks that were spent executing a job.
    """

    def __init__(self, task_set: TaskSet, scheduler_kind: str) -> None:
        """Set up an idle CPU and pick the scheduler.

        Args:
            task_set: Task set handed to the scheduler on every tick.
            scheduler_kind: One of ``'NP_EDF'``, ``'RM'``, ``'DM'``,
                ``'DM_NPP'`` or ``'DM_HLP'``. Any other value selects the
                default ``EDF`` scheduler.
        """
        # Simulated CPU clock; it advances by one integer step per tick.
        self.cpu_time = 0
        self.task_set = task_set
        self.executing_job = None
        # Records which task was running during which interval
        # (printed by print_result and optionally charted).
        self.task_duration_list = []
        self.total_executing_time = 0

        # Built here rather than at class level so the scheduler classes are
        # only looked up when an instance is actually created.
        schedulers = {
            'NP_EDF': NP_EDF,
            'RM': RM,
            'DM': DM,
            'DM_NPP': DM_NPP,
            'DM_HLP': DM_HLP,
        }
        # Unknown kinds fall back to the default scheduler.
        self.scheduler = schedulers.get(scheduler_kind, EDF)()

    # TODO: check the non increasing over job.increase_uptime() - if it
    # returns non zero - must split duration
    def append_task_duration(self, name: str, start: int, stop: int):
        """Record that task ``name`` was executing from ``start`` to ``stop``."""
        self.task_duration_list.append({'name': name, 'duration': (start, stop)})

    def _execute_tick(self, job) -> None:
        """Advance ``job`` by one tick; count it when increase_uptime() returns 0."""
        if job.increase_uptime() == 0:
            self.total_executing_time += 1

    # TODO: complete here
    def run(self, duration=100):
        """Simulate ``duration`` clock ticks.

        Each tick asks the scheduler for a job and then either idles, starts
        the job, keeps the current job running, or preempts the current job
        in favour of the new one. The interval of a job that is still
        executing when the loop ends is recorded as well.

        Args:
            duration: Number of ticks to simulate.
        """
        # Start the open interval at the current clock, not at 0, so that a
        # second call to run() does not record an interval reaching back to
        # time 0 for a job that is still executing.
        latest_activation = self.cpu_time

        for _ in range(duration):
            job = self.scheduler.schedule(self.task_set, self.cpu_time)
            current = self.executing_job

            if job is None:
                # Nothing to run: release the CPU if a job was holding it.
                if current is not None:
                    # Only a job that is still RUNNING goes back to READY.
                    if current.state == RUNNING:
                        current.state = READY
                    self.append_task_duration(current.get_name(), latest_activation, self.cpu_time)
                    self.executing_job = None
                latest_activation = self.cpu_time
            elif current is None:
                # CPU is idle: start the scheduled job.
                # NOTE(review): the source lost its indentation; this assumes
                # the whole start-up sequence is guarded by the READY check.
                if job.state == READY:
                    latest_activation = self.cpu_time
                    job.state = RUNNING
                    self._execute_tick(job)
                    self.executing_job = job
            elif job.uuid == current.uuid:
                # Same job as before: keep executing it.
                self._execute_tick(job)
            else:
                # A different job was selected: preempt the current one.
                current.set_state(READY)
                self.append_task_duration(current.get_name(), latest_activation, self.cpu_time)
                latest_activation = self.cpu_time
                self.executing_job = job
                job.set_state(RUNNING)
                self._execute_tick(job)

            # debug
            # if self.executing_job is not None:
            #     print("Job info-> uuid: {}, name: {}, uptime: {}, state: {}, "
            #           "ADeadline: {}, cpu_time: {}".format(
            #               self.executing_job.uuid, self.executing_job.get_name(),
            #               self.executing_job.uptime, self.executing_job.state,
            #               self.executing_job.ADeadline, self.cpu_time))

            # go to next clock
            self.cpu_time += 1

        # Close the interval of the job that is still on the CPU.
        if self.executing_job is not None:
            self.append_task_duration(self.executing_job.get_name(), latest_activation, self.cpu_time)

    def print_result(self, chart_mode):
        """Print utilization, feasibility and the recorded intervals.

        Utilization is the counted executing ticks divided by the elapsed
        clock ticks; it is reported as 0.0 when no tick has been simulated.

        Args:
            chart_mode: When equal to ``'ON'`` the recorded intervals are
                also drawn with ``Chart``.
        """
        # Guard the division: cpu_time is 0 until run() has simulated a tick.
        utilization = self.total_executing_time / self.cpu_time if self.cpu_time else 0.0
        print("Utilization: {}".format(utilization))
        print("Feasibility: {}".format(str(self.task_set.feasible)))
        print("Task Durations:")
        for item in self.task_duration_list:
            print(item)
        if chart_mode == 'ON':
            chart = Chart(self.task_duration_list)
            chart.draw()