-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdms.py
205 lines (176 loc) · 6.92 KB
/
dms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import base64
from pm4py.objects.ocel.obj import OCEL
import pm4py
from datetime import date
from utils.constants import UPLOAD_DIRECTORY
import shutil
class SingletonClass(object):
# Override the default __new__ method to create a single instance of the class
def __new__(cls):
# Check if an instance of the class already exists
if not hasattr(cls, 'instance'):
# Create a new instance of the class and store it in the instance attribute
cls.instance = super(SingletonClass, cls).__new__(cls)
# Initialize the data attribute as an empty dictionary
cls.instance.data = {}
cls.instance.selected = 'example_order_process.jsonocel'
# Return the existing instance of the class
return cls.instance
class DataManagementSystem:
"""Singleton class to manage data storage and retrieval"""
# A changeable variable of the Singelton storing a configuration dict
sap_config = {
'user': '',
'passwd': '',
'ashost': '',
'saprouter': '',
'msserv': '',
'sysid': '',
'group': '',
'client': '',
'lang': '',
'trace': ''
}
extraction_config = {
"from_date": date(2020, 1, 1),
"to_date": date.today(),
}
use_sqlite = False
@classmethod
def __add_version_control(cls, key):
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
list_key = key + '_filter_list'
# Start the list with the original log (and then append new, filtered logs)
# Store a separate filtering-list per log
list_values = [singleton_instance.data[key]]
singleton_instance.data[list_key] = list_values
@classmethod
def store_version_control(cls, key, ocel):
list_key = key + '_filter_list'
filter_list = cls.__load(list_key)
filter_list.append(ocel)
@classmethod
def load_version_control(cls, key) -> OCEL:
list_key = key + '_filter_list'
filter_list = cls.__load(list_key)
loaded = filter_list[len(filter_list) - 1]
if type(loaded) is OCEL:
return loaded
return pm4py.read_ocel(loaded)
@classmethod
def store(cls, key, content):
"""Save content to file and store path in singleton instance"""
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
# Store the data infile and store path in singleton instance
data = content.encode("utf8").split(b";base64,")[1]
path = os.path.join(UPLOAD_DIRECTORY, key)
with open(path, "wb+") as fp:
fp.write(base64.decodebytes(data))
singleton_instance.data[key] = path
# Add version control for future filtering
cls.__add_version_control(key)
@classmethod
def __load(cls, key):
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
# Retrieve the data from the data attribute of the singleton instance
return singleton_instance.data.get(key)
@classmethod
def __load_selected(cls) -> OCEL:
"""Get path of selected ocel
"""
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
# Retrieve the data from the data attribute of the singleton instance
return cls.load_version_control(singleton_instance.selected)
@classmethod
def get_ocel(cls) -> OCEL:
return cls.__load_selected()
@classmethod
def select(cls, key):
"""Set selected ocel"""
singleton_instance = SingletonClass()
singleton_instance.selected = key
@classmethod
def all_upload_keys(cls) -> list[str]:
"""Get all keys stored in singleton instance"""
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
# Retrieve the data from the data attribute of the singleton instance
funct = lambda x: UPLOAD_DIRECTORY in x[1]
return dict(list(filter(funct, singleton_instance.data.items()))).keys()
@classmethod
def register(cls, key, path):
"""Store already existing ocel path in singleton instance"""
key = cls.get_a_unique_filename(key)
# Get the single instance of the SingletonClass object
singleton_instance = SingletonClass()
# Store the data infile and store path in singleton instance
singleton_instance.data[key] = path
# Add version control for future filtering
if UPLOAD_DIRECTORY in path:
cls.__add_version_control(key)
@staticmethod
def reset_to_original(key):
singleton_instance = SingletonClass()
list_key = key + '_filter_list'
singleton_instance.data[list_key] = [singleton_instance.data[list_key][0]]
@classmethod
def rollback(cls):
singleton_instance = SingletonClass()
list_key = singleton_instance.selected + '_filter_list'
filter_list = cls.__load(list_key)
nr = len(filter_list) - 1
if len(filter_list) > 1:
filter_list.pop()
return nr
@classmethod
def rollback_all(cls):
singleton_instance = SingletonClass()
list_key = singleton_instance.selected + '_filter_list'
filter_list = cls.__load(list_key)
nr = len(filter_list) - 1
while len(filter_list) > 1:
filter_list.pop()
return nr
def clear(self):
if os.path.exists("data/uploads"):
shutil.rmtree("data/uploads")
SingletonClass().data = {}
os.mkdir("data/uploads")
shutil.copy("data/resources/initial/example_order_process.jsonocel",
"data/uploads/example_order_process.jsonocel")
self.register("example_order_process.jsonocel", "data/uploads/example_order_process.jsonocel")
self.select("example_order_process.jsonocel")
def delete_selected(self):
singleton_instance = SingletonClass()
if len(self.all_upload_keys()) == 1:
raise Exception
else:
key = singleton_instance.selected
data = singleton_instance.data
list_key = key + '_filter_list'
del data[list_key]
os.remove(data[key])
del data[key]
for key in data.keys():
self.select(key)
break
@staticmethod
def get_a_unique_filename(key) -> str:
if key is None:
return key
if key.endswith('_filter_list'):
key = key + '_'
data = SingletonClass().data
if key not in data:
return key
i = 1
key = key.rpartition('.jsonocel')[0] + '(1)' + '.jsonocel'
while key in data:
i = i + 1
key = key.rpartition('(')[0] + '(' + str(i) + ')' + '.jsonocel'
return key