-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
147 lines (119 loc) · 5.05 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import socket
import threading
from hashlib import sha256
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import AES, PKCS1_OAEP
from client_gui import Gui
class Client:
def __init__(self, ip: str, port: int):
self.ip = ip
self.port = port
self.stop = False
self.__public_key = None
self.__ssk = None
self.user_name = None
self.client_socket = None
self.msg = ''
self.gui = Gui(self)
def run(self):
self.create_socket()
self.gui.init_gui()
self.gui.window.mainloop()
def create_socket(self):
try:
self.client_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
self.client_socket.connect((self.ip, self.port))
except ConnectionRefusedError:
self.gui.messagebox_info('Offline', 'Server if offline')
exit()
def __handshake(self):
self.generate_rsa_keys()
self.client_socket.sendall(self.__public_key)
key_from_server = self.client_socket.recv(2048)
self.decrypt_session_key(key_from_server)
def join_to_chat(self):
self.user_name = self.gui.username_input_area.get()
if not self.user_name.strip():
self.gui.messagebox_error()
else:
self.gui.clear_text_input()
self.gui.destroy_user_input()
self.client_socket.sendall(self.user_name.encode('utf-8'))
self.__handshake()
listen_thread = threading.Thread(target=self.listen_server, daemon=True)
listen_thread.start()
def send_to_server(self, event):
if not self.user_name:
self.gui.messagebox_error()
else:
self.msg = self.gui.grab_message()
if self.msg:
self.gui.clear_text_input()
self.gui.handler_text_area(data=self.msg)
nonce, tag, cipher_text = self.encrypt_message(message=self.msg.encode('utf-8'))
self.client_socket.sendall(nonce + b'!:@' + tag + b'!:@' + cipher_text)
def listen_server(self):
while not self.stop:
try:
self.msg = self.client_socket.recv(2048)
if b'joined' in self.msg or b'LEFT' in self.msg:
self.gui.handler_text_area(data=self.msg.replace(b'&??*', b' '))
else:
nickname, letter = self.msg.split(b'&??*')
received_letter = self.decrypt_message(letter)
self.gui.handler_text_area(data=nickname + b': ' + received_letter + b'\n')
except (ConnectionResetError, ConnectionAbortedError) as e:
print(e, type(e))
self.emergency_closure(title='Offline', text='Server is offline')
def exit_from_chat(self):
if self.gui.messagebox_exit():
self.stop = True
self.gui.window.destroy()
self.client_socket.close()
exit()
def emergency_closure(self, title, text):
self.gui.messagebox_info(title=title, text=text)
self.gui.window.destroy()
self.client_socket.close()
exit()
def generate_rsa_keys(self):
key = RSA.generate(2048)
private_key = key.export_key()
with open('private_key.pem', 'wb') as f:
f.write(private_key)
self.__public_key = key.public_key().export_key()
def encrypt_message(self, message):
try:
cipher_aes = AES.new(self.__ssk, AES.MODE_EAX)
cipher_text, tag = cipher_aes.encrypt_and_digest(message)
return cipher_aes.nonce, tag, cipher_text
except ValueError as e:
print('Hacking attempt!', e)
self.emergency_closure(title='Hacking attempt!', text='Hacking attempt!!! Termination of work')
def decrypt_message(self, message):
try:
text = message.split(b'!:@')
nonce, tag, cipher_text = text
cipher_aes = AES.new(self.__ssk, AES.MODE_EAX, nonce)
data = cipher_aes.decrypt_and_verify(cipher_text, tag)
return data
except ValueError as e:
print('Hacking attempt!', e)
self.emergency_closure(title='Hacking attempt!', text='Hacking attempt!!! Termination of work')
def decrypt_session_key(self, crypto_session_key):
try:
ssk, hash_ssk = crypto_session_key.split(b'&-^*')
private_key = RSA.import_key(open('private_key.pem').read())
cipher_rsa = PKCS1_OAEP.new(private_key)
self.__ssk = cipher_rsa.decrypt(ssk)
check_hash_ssk = sha256(self.__ssk).digest()
if hash_ssk != check_hash_ssk:
raise ValueError
os.remove(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'private_key.pem'))
except ValueError as e:
print('Hacking attempt!', e)
self.emergency_closure(title='Hacking attempt!', text='Hacking attempt!!! Termination of work')
if __name__ == '__main__':
client = Client('127.0.0.1', 6666)
client.run()