forked from LukasForst/fides
-
Notifications
You must be signed in to change notification settings - Fork 1
/
generators.py
152 lines (134 loc) · 7.48 KB
/
generators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import itertools
import random
from typing import Dict, List

from fides.model.aliases import Target, Score
from fides.model.peer import PeerInfo
from fides.utils.logger import Logger
from simulations.model import SimulationConfiguration
from simulations.peer import PeerBehavior, Peer, ConfidentCorrectPeer, UncertainPeer, ConfidentIncorrectPeer, \
    MaliciousPeer
from simulations.utils import Click
logger = Logger(__name__)
def generate_simulations(evaluation_strategies, gaining_trust_periods, initial_reputations, local_slips_acts_ass,
                         malicious_peers_lie_abouts, malicious_targets, peers_count, peers_distribution,
                         pre_trusted_peers, service_history_sizes, simulation_lengths, targets,
                         ti_aggregation_strategies):
    """Build one ``SimulationConfiguration`` per combination of the given parameter values.

    Every argument is a sized collection of candidate values for one parameter:

    * ``peers_count`` - total numbers of peers.
    * ``peers_distribution`` - rows indexed as
      ``[CONFIDENT_CORRECT, UNCERTAIN_PEER, CONFIDENT_INCORRECT, MALICIOUS]``;
      each entry is multiplied by the peer count and rounded. The uncertain
      count is not rounded from its own entry: it is whatever remains, so the
      four counts always add up to the peer count.
    * ``pre_trusted_peers`` - multiplied by the peer count and rounded to get
      the number of pre-trusted peers. A value larger than the
      CONFIDENT_CORRECT entry of the current distribution is skipped.
    * ``targets`` / ``malicious_targets`` - total number of targets and the
      multiplier applied to it (truncated with ``int``) to get the malicious
      target count; the rest of the targets are benign.
    * ``gaining_trust_periods`` - passed as ``malicious_peers_lie_since``.
    * all remaining arguments are forwarded unchanged to
      ``SimulationConfiguration`` under the matching keyword.

    Returns:
        The list of configurations, shuffled in place with ``random.shuffle``.
    """
    # Size of the full cartesian product. This is an upper bound on the result
    # length because pre-trusted values that do not fit a distribution are skipped.
    ns = len(peers_count) * len(peers_distribution) * len(pre_trusted_peers) * len(targets) * \
        len(malicious_targets) * len(malicious_peers_lie_abouts) * \
        len(gaining_trust_periods) * len(simulation_lengths) * len(service_history_sizes) * \
        len(evaluation_strategies) * len(ti_aggregation_strategies) * \
        len(initial_reputations) * len(local_slips_acts_ass)
    logger.warn(f'Number of combinations for parameters: {ns}')

    # The benign/malicious split does not depend on the peer setup, so it is
    # computed once, in the same (target, malicious_target) order the nested
    # loops would visit.
    target_splits = []
    for target in targets:
        for malicious_target in malicious_targets:
            malicious_targets_count = int(target * malicious_target)
            target_splits.append((target - malicious_targets_count, malicious_targets_count))

    # Axes that are independent of the peer setup. itertools.product varies the
    # right-most axis fastest, which matches the order of equivalent nested loops.
    independent_axes = (
        target_splits,
        malicious_peers_lie_abouts,
        gaining_trust_periods,
        simulation_lengths,
        service_history_sizes,
        evaluation_strategies,
        ti_aggregation_strategies,
        initial_reputations,
        local_slips_acts_ass,
    )

    simulations = []
    for peer_count in peers_count:
        for distribution in peers_distribution:
            confident_correct = round(distribution[0] * peer_count)
            confident_incorrect = round(distribution[2] * peer_count)
            malicious = round(distribution[3] * peer_count)
            # Key order is kept as CONFIDENT_CORRECT, UNCERTAIN_PEER,
            # CONFIDENT_INCORRECT, MALICIOUS_PEER because consumers may iterate the dict.
            # The same dict object is shared by all configurations built from it.
            p_distribution = {
                PeerBehavior.CONFIDENT_CORRECT: confident_correct,
                PeerBehavior.UNCERTAIN_PEER: peer_count - confident_correct - confident_incorrect - malicious,
                PeerBehavior.CONFIDENT_INCORRECT: confident_incorrect,
                PeerBehavior.MALICIOUS_PEER: malicious,
            }

            for pre_trusted_peer in pre_trusted_peers:
                if pre_trusted_peer > distribution[0]:
                    continue
                pre_trusted_peers_count = round(peer_count * pre_trusted_peer)

                for ((benign_targets_count, malicious_targets_count),
                     malicious_peers_lie_about,
                     gaining_trust_period,
                     simulation_length,
                     service_history_size,
                     evaluation_strategy,
                     ti_aggregation_strategy,
                     initial_reputation,
                     local_slips_acts_as) in itertools.product(*independent_axes):
                    simulations.append(SimulationConfiguration(
                        benign_targets=benign_targets_count,
                        malicious_targets=malicious_targets_count,
                        malicious_peers_lie_about_targets=malicious_peers_lie_about,
                        peers_distribution=p_distribution,
                        simulation_length=simulation_length,
                        malicious_peers_lie_since=gaining_trust_period,
                        service_history_size=service_history_size,
                        pre_trusted_peers_count=pre_trusted_peers_count,
                        initial_reputation=initial_reputation,
                        local_slips_acts_as=local_slips_acts_as,
                        evaluation_strategy=evaluation_strategy,
                        ti_aggregation_strategy=ti_aggregation_strategy,
                    ))

    random.shuffle(simulations)
    return simulations
def generate_peers_distributions() -> List[List[float]]:
    """Enumerate all peer-behaviour distributions whose fractions sum to exactly 1.

    Each returned row has the layout
    ``[CONFIDENT_CORRECT, UNCERTAIN_PEER, CONFIDENT_INCORRECT, MALICIOUS]`` and
    every fraction is one of 0.0, 0.25, 0.5 or 0.75. Rows are produced in the
    order of the nested iteration (first column varies slowest).
    """
    # Candidate fractions per behaviour. All are multiples of 0.25, so the
    # partial sums below are exact and comparing against 1 with == is safe.
    options = {
        'cc': [0.0, 0.25, 0.5, 0.75],
        'up': [0.0, 0.25, 0.5, 0.75],
        'ci': [0.0, 0.25, 0.5, 0.75],
        'ma': [0.0, 0.25, 0.5, 0.75],
    }

    distributions = []
    for confident_correct in options['cc']:
        for uncertain in options['up']:
            two_total = confident_correct + uncertain
            if two_total > 1:
                # Already over budget; no completion can sum to 1.
                continue
            for confident_incorrect in options['ci']:
                three_total = two_total + confident_incorrect
                if three_total > 1:
                    continue
                for malicious in options['ma']:
                    if three_total + malicious == 1:
                        distributions.append(
                            [confident_correct, uncertain, confident_incorrect, malicious]
                        )
    return distributions
def generate_targets(benign: int, malicious: int) -> Dict[Target, Score]:
    """Create a target-to-score mapping for a simulation.

    Produces ``benign`` targets named ``"BENIGN #<i>"`` with score 1 and
    ``malicious`` targets named ``"MALICIOUS #<i>"`` with score -1. The pairs
    are shuffled with ``random.shuffle`` before the dict is built, so the
    insertion order of the result is randomised.
    """
    labelled = [(f"BENIGN #{i}", 1) for i in range(benign)]
    labelled += [(f"MALICIOUS #{i}", -1) for i in range(malicious)]
    random.shuffle(labelled)
    return dict(labelled)
def generate_peers(
        service_history_size: int,
        recommendation_history_size: int,
        distribution: Dict[PeerBehavior, int],
        malicious_lie_about: List[Target],
        malicious_start_lie_at: Click
) -> List[Peer]:
    """Instantiate simulation peers according to ``distribution``.

    Args:
        service_history_size: forwarded to every peer constructor.
        recommendation_history_size: forwarded to every peer constructor.
        distribution: how many peers to create for each ``PeerBehavior``.
        malicious_lie_about: forwarded to ``MaliciousPeer`` only.
        malicious_start_lie_at: forwarded to ``MaliciousPeer`` only.

    Returns:
        All created peers in one list, shuffled with ``random.shuffle``.
        Peers of one behaviour are named ``"<BEHAVIOUR> #<i>"`` with ``i``
        counting from 0.

    Raises:
        ValueError: if ``distribution`` contains a key that is none of the
            four handled ``PeerBehavior`` members.
    """
    peers = []
    for peer_type, count in distribution.items():
        if peer_type == PeerBehavior.CONFIDENT_CORRECT:
            batch = [
                ConfidentCorrectPeer(
                    PeerInfo(f"CONFIDENT_CORRECT #{i}", []),
                    service_history_size, recommendation_history_size)
                for i in range(count)
            ]
        elif peer_type == PeerBehavior.UNCERTAIN_PEER:
            batch = [
                UncertainPeer(
                    PeerInfo(f"UNCERTAIN_PEER #{i}", []),
                    service_history_size, recommendation_history_size)
                for i in range(count)
            ]
        elif peer_type == PeerBehavior.CONFIDENT_INCORRECT:
            batch = [
                ConfidentIncorrectPeer(
                    PeerInfo(f"CONFIDENT_INCORRECT #{i}", []),
                    service_history_size, recommendation_history_size)
                for i in range(count)
            ]
        elif peer_type == PeerBehavior.MALICIOUS_PEER:
            batch = [
                MaliciousPeer(
                    PeerInfo(f"MALICIOUS_PEER #{i}", []),
                    service_history_size, recommendation_history_size,
                    malicious_lie_about, malicious_start_lie_at)
                for i in range(count)
            ]
        else:
            # Name the offending key so a misconfigured distribution is easy to trace.
            raise ValueError(f"Unsupported peer behavior: {peer_type!r}")
        peers.extend(batch)

    random.shuffle(peers)
    return peers