-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpython-dataset.txt
2413 lines (2008 loc) · 89.7 KB
/
python-dataset.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Base connection class for netmiko
Handles SSH connection and methods that are generically applicable to different
platforms (Cisco and non-Cisco).
Also defines methods that should generally be supported by child classes
"""
from typing import (
Optional,
Callable,
Any,
List,
Dict,
TypeVar,
cast,
Type,
Sequence,
Iterator,
TextIO,
Union,
Tuple,
Deque,
)
from typing import TYPE_CHECKING
from types import TracebackType
import io
import re
import socket
import telnetlib
import time
from collections import deque
from os import path
from pathlib import Path
from threading import Lock
import functools
import logging
import itertools
import paramiko
import serial
from tenacity import retry, stop_after_attempt, wait_exponential
import warnings
from netmiko import log
from netmiko.netmiko_globals import BACKSPACE_CHAR
from netmiko.exceptions import (
NetmikoTimeoutException,
NetmikoAuthenticationException,
ConfigInvalidException,
ReadException,
ReadTimeout,
)
from netmiko.channel import Channel, SSHChannel, TelnetChannel, SerialChannel
from netmiko.session_log import SessionLog
from netmiko.utilities import (
write_bytes,
check_serial_port,
structured_data_converter,
run_ttp_template,
select_cmd_verify,
calc_old_timeout,
)
from netmiko.utilities import m_exec_time # noqa
if TYPE_CHECKING:
from os import PathLike
# For decorators: type variable bound to any callable, used to annotate
# decorators that take a callable and return one of the same type.
F = TypeVar("F", bound=Callable[..., Any])

# Text passed to warnings.warn() (as a DeprecationWarning) when a caller still
# supplies delay_factor and/or max_loops to a method that has deprecated them.
DELAY_FACTOR_DEPR_SIMPLE_MSG = """\n
Netmiko 4.x and later has deprecated the use of delay_factor and/or
max_loops in this context. You should remove any use of delay_factor=x
from this method call.\n"""
# Logging filter for #2597
class SecretsFilter(logging.Filter):
    """Logging filter that masks configured secrets in log messages.

    :param no_log: Mapping of label -> secret string (for example
        ``{"password": "..."}``). Every occurrence of a secret value inside a
        string log message is replaced with ``"********"``. ``None`` or an empty
        mapping disables masking.
    """

    def __init__(self, no_log: Optional[Dict[Any, str]] = None) -> None:
        # Let logging.Filter set up its own state (name / nlen) so the instance
        # is a fully initialised Filter and not just a duck-typed object.
        super().__init__()
        self.no_log = no_log

    def filter(self, record: logging.LogRecord) -> bool:
        """Removes secrets (no_log) from messages.

        :param record: The log record about to be emitted; ``record.msg`` is
            rewritten in place when it is a string containing a secret.
        :return: Always True, i.e. the record is never dropped.
        """
        # logging accepts arbitrary objects as the message (str() is applied
        # later); only string messages can be rewritten with str.replace().
        if self.no_log and isinstance(record.msg, str):
            for hidden_data in self.no_log.values():
                # Skip empty/None secrets: replacing "" would insert the mask
                # between every character of the message.
                if hidden_data:
                    record.msg = record.msg.replace(hidden_data, "********")
        return True
def lock_channel(func: F) -> F:
    """Decorator that holds the Netmiko session lock while `func` runs.

    The lock is taken with self._lock_netmiko_session() before the wrapped
    method is called and released with self._unlock_netmiko_session() once it
    finishes, whether it returned normally or raised.
    """

    @functools.wraps(func)
    def wrapper_decorator(self: "BaseConnection", *args: Any, **kwargs: Any) -> Any:
        self._lock_netmiko_session()
        try:
            # The finally clause still runs before this value is handed back.
            return func(self, *args, **kwargs)
        finally:
            # Always unlock the channel, even on exception.
            self._unlock_netmiko_session()

    return cast(F, wrapper_decorator)
def log_writes(func: F) -> F:
    """Decorator for channel writers: record what was written.

    After the wrapped write completes, the data is emitted to the debug log and,
    when a session_log is configured with `fin` or `record_writes` set, also
    written to the session_log.
    """

    @functools.wraps(func)
    def wrapper_decorator(self: "BaseConnection", out_data: str) -> None:
        # Do the real write first; nothing is logged if it raises.
        func(self, out_data)
        try:
            encoded = write_bytes(out_data, encoding=self.encoding)
            log.debug("write_channel: {}".format(str(encoded)))
            session_log = self.session_log
            if session_log and (session_log.fin or session_log.record_writes):
                session_log.write(out_data)
        except UnicodeDecodeError:
            # Don't log non-ASCII characters; this is null characters and telnet IAC (PY2)
            pass

    return cast(F, wrapper_decorator)
class BaseConnection:
"""
Defines vendor independent methods.
Otherwise method left as a stub method.
"""
def __init__(
    self,
    ip: str = "",
    host: str = "",
    username: str = "",
    password: Optional[str] = None,
    secret: str = "",
    port: Optional[int] = None,
    device_type: str = "",
    verbose: bool = False,
    global_delay_factor: float = 1.0,
    global_cmd_verify: Optional[bool] = None,
    use_keys: bool = False,
    key_file: Optional[str] = None,
    pkey: Optional[paramiko.PKey] = None,
    passphrase: Optional[str] = None,
    disabled_algorithms: Optional[Dict[str, Any]] = None,
    allow_agent: bool = False,
    ssh_strict: bool = False,
    system_host_keys: bool = False,
    alt_host_keys: bool = False,
    alt_key_file: str = "",
    ssh_config_file: Optional[str] = None,
    #
    # Connect timeouts
    # ssh-connect --> TCP conn (conn_timeout) --> SSH-banner (banner_timeout)
    #       --> Auth response (auth_timeout)
    conn_timeout: int = 10,
    # Timeout to wait for authentication response
    auth_timeout: Optional[int] = None,
    banner_timeout: int = 15,  # Timeout to wait for the banner to be presented
    # Other timeouts
    blocking_timeout: int = 20,  # Read blocking timeout
    timeout: int = 100,  # TCP connect timeout | overloaded to read-loop timeout
    session_timeout: int = 60,  # Used for locking/sharing the connection
    read_timeout_override: Optional[float] = None,
    keepalive: int = 0,
    default_enter: Optional[str] = None,
    response_return: Optional[str] = None,
    serial_settings: Optional[Dict[str, Any]] = None,
    fast_cli: bool = True,
    _legacy_mode: bool = False,
    session_log: Optional[SessionLog] = None,
    session_log_record_writes: bool = False,
    session_log_file_mode: str = "write",
    allow_auto_change: bool = False,
    encoding: str = "utf-8",
    sock: Optional[socket.socket] = None,
    auto_connect: bool = True,
    delay_factor_compat: bool = False,
) -> None:
    """
    Initialize attributes for establishing connection to target device.

    :param ip: IP address of target device. Not required if `host` is
        provided.
    :param host: Hostname of target device. Not required if `ip` is
        provided.
    :param username: Username to authenticate against target device if
        required.
    :param password: Password to authenticate against target device if
        required.
    :param secret: The enable password if target device requires one.
    :param port: The destination port used to connect to the target
        device. When omitted, 23 is used if "telnet" is in device_type,
        otherwise 22.
    :param device_type: Class selection based on device type.
    :param verbose: Enable additional messages to standard output.
    :param global_delay_factor: Multiplication factor affecting Netmiko delays (default: 1).
    :param use_keys: Connect to target device using SSH keys.
    :param key_file: Filename path of the SSH key file to use.
    :param pkey: SSH key object to use.
    :param passphrase: Passphrase to use for encrypted key; password will be used for key
        decryption if not specified.
    :param disabled_algorithms: Dictionary of SSH algorithms to disable. Refer to the Paramiko
        documentation for a description of the expected format.
    :param allow_agent: Enable use of SSH key-agent.
    :param ssh_strict: Automatically reject unknown SSH host keys (default: False, which
        means unknown SSH host keys will be accepted).
    :param system_host_keys: Load host keys from the users known_hosts file.
    :param alt_host_keys: If `True` host keys will be loaded from the file specified in
        alt_key_file.
    :param alt_key_file: SSH host key file to use (if alt_host_keys=True).
    :param ssh_config_file: File name of OpenSSH configuration file.
    :param conn_timeout: Timeout for the TCP connection phase of the SSH connect
        (default: 10).
    :param timeout: Connection timeout.
    :param session_timeout: Set a timeout for parallel requests.
    :param auth_timeout: Set a timeout (in seconds) to wait for an authentication response.
    :param banner_timeout: Set a timeout to wait for the SSH banner (pass to Paramiko).
    :param blocking_timeout: Read blocking timeout (default: 20).
    :param read_timeout_override: When set to a non-zero value, replaces the read_timeout
        argument in read_until_pattern and read_channel_timing (default: None).
    :param keepalive: Send SSH keepalive packets at a specific interval, in seconds.
        Currently defaults to 0, for backwards compatibility (it will not attempt
        to keep the connection alive).
    :param default_enter: Character(s) to send to correspond to enter key (default: \n).
    :param response_return: Character(s) to use in normalized return data to represent
        enter key (default: \n)
    :param serial_settings: Dictionary of serial port settings merged over the defaults
        (port COM1, baudrate 9600, eight data bits, no parity, one stop bit).
    :param fast_cli: Provide a way to optimize for performance. Converts select_delay_factor
        to select smallest of global and specific. Sets default global_delay_factor to .1
        (default: True)
    :param _legacy_mode: Internal flag stored on the instance (default: False).
    :param session_log: File path or BufferedIOBase subclass object to write the session log to.
    :param session_log_record_writes: The session log generally only records channel reads due
        to eliminate command duplication due to command echo. You can enable this if you
        want to record both channel reads and channel writes in the log (default: False).
    :param session_log_file_mode: "write" or "append" for session_log file mode
        (default: "write")
    :param allow_auto_change: Allow automatic configuration changes for terminal settings.
        (default: False)
    :param encoding: Encoding to be used when writing bytes to the output channel.
        (default: utf-8)
    :param sock: An open socket or socket-like object (such as a `.Channel`) to use for
        communication to the target host (default: None).
    :param global_cmd_verify: Control whether command echo verification is enabled or disabled
        (default: None). Global attribute takes precedence over function `cmd_verify`
        argument. Value of `None` indicates to use function `cmd_verify` argument.
    :param auto_connect: Control whether Netmiko automatically establishes the connection as
        part of the object creation (default: True).
    :param delay_factor_compat: Set send_command and send_command_timing back to using Netmiko
        3.x behavior for delay_factor/global_delay_factor/max_loops. This argument will be
        eliminated in Netmiko 5.x (default: False).
    :raises ValueError: If neither `ip` nor `host` is given for a non-serial device_type,
        or if `session_log` is neither a string nor an io.BufferedIOBase instance.
    """

    self.remote_conn: Union[
        None, telnetlib.Telnet, paramiko.Channel, serial.Serial
    ] = None
    # Does the platform support a configuration mode
    self._config_mode = True
    # Data read past a matched pattern is parked here and prepended to the next read.
    self._read_buffer = ""
    self.delay_factor_compat = delay_factor_compat

    self.TELNET_RETURN = "\r\n"
    # An explicit default_enter always wins; otherwise telnet device types use "\r\n".
    if default_enter is None:
        if "telnet" not in device_type:
            self.RETURN = "\n"
        else:
            self.RETURN = self.TELNET_RETURN
    else:
        self.RETURN = default_enter

    # Line Separator in response lines
    self.RESPONSE_RETURN = "\n" if response_return is None else response_return
    # `ip` takes precedence over `host` when both are supplied.
    if ip:
        self.host = ip.strip()
    elif host:
        self.host = host.strip()
    if not ip and not host and "serial" not in device_type:
        raise ValueError("Either ip or host must be set")
    if port is None:
        if "telnet" in device_type:
            port = 23
        else:
            port = 22
    self.port = int(port)

    self.username = username
    self.password = password
    self.secret = secret
    self.device_type = device_type
    self.ansi_escape_codes = False
    self.verbose = verbose
    self.auth_timeout = auth_timeout
    self.banner_timeout = banner_timeout
    self.blocking_timeout = blocking_timeout
    self.conn_timeout = conn_timeout
    self.session_timeout = session_timeout
    self.timeout = timeout
    self.read_timeout_override = read_timeout_override
    self.keepalive = keepalive
    self.allow_auto_change = allow_auto_change
    self.encoding = encoding
    self.sock = sock
    self.fast_cli = fast_cli
    self._legacy_mode = _legacy_mode
    self.global_delay_factor = global_delay_factor
    self.global_cmd_verify = global_cmd_verify
    # fast_cli only lowers the delay factor when the caller left it at the default of 1.
    if self.fast_cli and self.global_delay_factor == 1:
        self.global_delay_factor = 0.1
    self.session_log = None
    self._session_log_close = False

    # prevent logging secret data
    no_log = {}
    if self.password:
        no_log["password"] = self.password
    if self.secret:
        no_log["secret"] = self.secret
    # NOTE(review): a new SecretsFilter is attached to the shared `log` object on
    # every instantiation; confirm whether accumulation of filters is intended.
    log.addFilter(SecretsFilter(no_log=no_log))

    # Netmiko will close the session_log if we open the file
    if session_log is not None:
        if isinstance(session_log, str):
            # If session_log is a string, open a file corresponding to string name.
            self.session_log = SessionLog(
                file_name=session_log,
                file_mode=session_log_file_mode,
                no_log=no_log,
                record_writes=session_log_record_writes,
            )
            self.session_log.open()
        elif isinstance(session_log, io.BufferedIOBase):
            # In-memory buffer or an already open file handle
            self.session_log = SessionLog(
                buffered_io=session_log,
                no_log=no_log,
                record_writes=session_log_record_writes,
            )
        else:
            raise ValueError(
                "session_log must be a path to a file, a file handle, "
                "or a BufferedIOBase subclass."
            )

    # Default values
    self.serial_settings = {
        "port": "COM1",
        "baudrate": 9600,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_NONE,
        "stopbits": serial.STOPBITS_ONE,
    }
    # Caller-supplied settings override the defaults key by key.
    if serial_settings is None:
        serial_settings = {}
    self.serial_settings.update(serial_settings)

    if "serial" in device_type:
        self.host = "serial"
        comm_port = self.serial_settings.pop("port")
        # Get the proper comm port reference if a name was entered
        comm_port = check_serial_port(comm_port)
        self.serial_settings.update({"port": comm_port})

    # set in set_base_prompt method
    self.base_prompt = ""
    self._session_locker = Lock()

    # determine if telnet or SSH
    if "_telnet" in device_type:
        self.protocol = "telnet"
        # Telnet/serial logins concatenate the password, so None becomes "".
        self.password = password or ""
    elif "_serial" in device_type:
        self.protocol = "serial"
        self.password = password or ""
    else:
        self.protocol = "ssh"

        self.key_policy: paramiko.client.MissingHostKeyPolicy
        if not ssh_strict:
            self.key_policy = paramiko.AutoAddPolicy()
        else:
            self.key_policy = paramiko.RejectPolicy()

        # Options for SSH host_keys
        self.use_keys = use_keys
        self.key_file = (
            path.abspath(path.expanduser(key_file)) if key_file else None
        )
        # Fail early (ValueError from _key_check) if use_keys is set but the file is missing.
        if self.use_keys is True:
            self._key_check()
        self.pkey = pkey
        self.passphrase = passphrase
        self.allow_agent = allow_agent
        self.system_host_keys = system_host_keys
        self.alt_host_keys = alt_host_keys
        self.alt_key_file = alt_key_file
        self.disabled_algorithms = disabled_algorithms or {}

        # For SSH proxy support
        self.ssh_config_file = ssh_config_file

    # Establish the remote connection
    if auto_connect:
        self._open()
def _open(self) -> None:
    """Decouple connection creation from __init__ for mocking.

    Runs, in order: _modify_connection_params(), establish_connection() and
    _try_session_preparation(). Called from __init__ when auto_connect is true.
    """
    self._modify_connection_params()
    self.establish_connection()
    self._try_session_preparation()
def __enter__(self) -> "BaseConnection":
    """Establish a session using a Context Manager.

    :return: This connection object itself; no additional work is done here.
    """
    return self
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_value: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Gracefully close connection on Context Manager exit.

    The exception arguments are not inspected, and None is returned, so an
    exception raised inside the `with` block is not suppressed.
    """
    self.disconnect()
def _modify_connection_params(self) -> None:
    """Modify connection parameters prior to SSH connection.

    No-op in this class; it is the first step executed by _open().
    """
    pass
def _timeout_exceeded(self, start: float, msg: str = "Timeout exceeded!") -> bool:
    """Check whether the session lock wait has gone past self.session_timeout.

    :param start: Time the wait began; a falsy value disables the check
    :type start: float (from time.time() call i.e. epoch time)

    :param msg: Message used for the exception raised on timeout
    :type msg: str

    :return: Always False; the timeout case is signalled by raising instead.
    :raises NetmikoTimeoutException: If more than session_timeout seconds have
        elapsed since `start`.
    """
    # Without a reference time there is nothing to compare against.
    if start and time.time() - start > self.session_timeout:
        raise NetmikoTimeoutException(msg)
    return False
def _lock_netmiko_session(self, start: Optional[float] = None) -> bool:
    """Acquire the Netmiko session lock, polling until it becomes available.

    The lock is tried without blocking; while it is held elsewhere the method
    sleeps 0.1 seconds between attempts. _timeout_exceeded() is consulted after
    each failed attempt and raises once session_timeout has elapsed.

    :param start: Initial start time to measure the session timeout; the current
        time is used when omitted or falsy
    :type start: float (from time.time() call i.e. epoch time)

    :return: Always True.
    """
    wait_began = start or time.time()
    while True:
        # Non-blocking attempt so the timeout can be checked between tries.
        if self._session_locker.acquire(False):
            break
        if self._timeout_exceeded(
            wait_began, "The netmiko channel is not available!"
        ):
            break
        time.sleep(0.1)
    return True
def _unlock_netmiko_session(self) -> None:
    """Release the session lock at the end of the task, if it is currently held."""
    session_lock = self._session_locker
    if not session_lock.locked():
        # Nothing to release.
        return
    session_lock.release()
def _autodetect_fs(self, cmd: str = "", pattern: str = "") -> str:
    """Not implemented in this class; always raises NotImplementedError."""
    raise NotImplementedError
def _enter_shell(self) -> str:
    """Not implemented in this class; always raises NotImplementedError."""
    raise NotImplementedError
def _return_cli(self) -> str:
    """Not implemented in this class; always raises NotImplementedError."""
    raise NotImplementedError
def _key_check(self) -> bool:
    """Confirm that self.key_file names an existing regular file.

    :return: True when the key file exists.
    :raises ValueError: If key_file is None or does not point to a file.
    """
    msg = f"""
use_keys has been set to True, but specified key_file does not exist:
use_keys: {self.use_keys}
key_file: {self.key_file}
"""
    key_file = self.key_file
    if key_file is not None and Path(key_file).is_file():
        return True
    raise ValueError(msg)
@lock_channel
@log_writes
def write_channel(self, out_data: str) -> None:
    """Generic method that will write data out the channel.

    The call is wrapped by lock_channel (session lock held for the duration)
    and log_writes (the data is logged after the write).

    :param out_data: data to be written to the channel
    :type out_data: str
    """
    self.channel.write_channel(out_data)
def is_alive(self) -> bool:
    """Returns a boolean flag with the state of the connection.

    For telnet, IAC + NOP is sent three times on the underlying socket. For any
    other protocol a NULL byte is written to the channel and the Paramiko
    transport's is_active() result is returned.

    :return: True if the connection appears usable; False if the connection is
        not initialised or the probe could not be sent.
    """
    if self.remote_conn is None:
        log.error("Connection is not initialised, is_alive returns False")
        return False

    if self.protocol == "telnet":
        try:
            # Try sending IAC + NOP (IAC is telnet way of sending command)
            # IAC = Interpret as Command; it comes before the NOP.
            log.debug("Sending IAC + NOP")
            # Need to send multiple times to test connection
            assert isinstance(self.remote_conn, telnetlib.Telnet)
            telnet_socket = self.remote_conn.get_socket()
            for _ in range(3):
                telnet_socket.sendall(telnetlib.IAC + telnetlib.NOP)
            return True
        except AttributeError:
            return False
        except (socket.error, EOFError):
            # A failed send means the connection is unusable; report that as
            # False (same handling as the SSH branch) instead of raising.
            log.error("Unable to send", exc_info=True)
            return False

    # SSH
    null = chr(0)
    try:
        # Try sending ASCII null byte to maintain the connection alive
        log.debug("Sending the NULL byte")
        self.write_channel(null)
        assert isinstance(self.remote_conn, paramiko.Channel)
        assert self.remote_conn.transport is not None
        result = self.remote_conn.transport.is_active()
        assert isinstance(result, bool)
        return result
    except (socket.error, EOFError):
        log.error("Unable to send", exc_info=True)
        # If unable to send, we can tell for sure that the connection is unusable
        return False
@lock_channel
def read_channel(self) -> str:
    """Read whatever is currently available on the channel.

    The data has its linefeeds normalized, ANSI escape codes stripped when
    self.ansi_escape_codes is set, and is then written to the debug log and to
    session_log (if configured). Anything previously parked in self._read_buffer
    is prepended to the returned string and the buffer is cleared.
    """
    data = self.channel.read_channel()
    data = self.normalize_linefeeds(data)
    if self.ansi_escape_codes:
        data = self.strip_ansi_escape_codes(data)

    log.debug(f"read_channel: {data}")
    if self.session_log:
        self.session_log.write(data)

    # Buffered data is merged only after logging so that session_log/log do not
    # record the same text twice.
    pending = self._read_buffer
    if not pending:
        return data
    combined = pending + data
    self._read_buffer = ""
    return combined
def read_until_pattern(
    self,
    pattern: str = "",
    read_timeout: float = 10.0,
    re_flags: int = 0,
    max_loops: Optional[int] = None,
) -> str:
    """Read channel until pattern is detected.

    Will return string up to and including pattern. Any data read beyond the end
    of the match is saved in self._read_buffer for the next read.

    Raises ReadTimeout if pattern not detected in read_timeout seconds.

    :param pattern: Regular expression pattern used to identify that reading is done.
        Capturing groups are allowed; the whole matched text is always retained.

    :param read_timeout: maximum time to wait looking for pattern. Will raise ReadTimeout.
        A read_timeout value of 0 will cause the loop to never timeout (i.e. it will keep
        reading indefinitely until pattern is detected.

    :param re_flags: regex flags used in conjunction with pattern (defaults to no flags).

    :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :return: Everything read up to and including the text matched by pattern.
    :raises ReadTimeout: If the pattern is not seen within read_timeout seconds.
    """
    if max_loops is not None:
        msg = """\n
Netmiko 4.x has deprecated the use of max_loops with read_until_pattern.
You should convert all uses of max_loops over to read_timeout=x
where x is the total number of seconds to wait before timing out.\n"""
        warnings.warn(msg, DeprecationWarning)

    # A connection-wide override takes precedence over the per-call value.
    if self.read_timeout_override:
        read_timeout = self.read_timeout_override

    output = ""
    loop_delay = 0.01
    start_time = time.time()
    # if read_timeout == 0 or 0.0 keep reading indefinitely
    while (time.time() - start_time < read_timeout) or (not read_timeout):
        output += self.read_channel()

        match = re.search(pattern, output, flags=re_flags)
        if match:
            # Slice on the match boundary instead of using re.split(): with a
            # capturing group re.split() returns only the group's text, which
            # silently dropped the rest of the matched string (or failed outright
            # when the pattern contained more than one group).
            match_end = match.end()
            # Everything after the match is retained in the _read_buffer.
            buffer = output[match_end:]
            output = output[:match_end]
            if buffer:
                self._read_buffer += buffer
            log.debug(f"Pattern found: {pattern} {output}")
            return output
        time.sleep(loop_delay)

    msg = f"""\n\nPattern not detected: {repr(pattern)} in output.
Things you might try to fix this:
1. Adjust the regex pattern to better identify the terminating string. Note, in
many situations the pattern is automatically based on the network device's prompt.
2. Increase the read_timeout to a larger value.
You can also look at the Netmiko session_log or debug log for more information.\n\n"""
    raise ReadTimeout(msg)
def read_channel_timing(
    self,
    last_read: float = 2.0,
    read_timeout: float = 120.0,
    delay_factor: Optional[float] = None,
    max_loops: Optional[int] = None,
) -> str:
    """Collect channel output using timing rather than pattern matching.

    The channel is polled every 0.1 seconds. Once some output has been gathered
    and a poll comes back empty, the method waits `last_read` seconds and reads
    one final time; if that read is empty too, the gathered output is returned.

    `read_timeout` is an absolute limit on how long the device may keep sending
    data. A value of zero disables that limit, so the method then ends only when
    the output goes quiet.

    :param last_read: Seconds to wait before the confirming final read.

    :param read_timeout: Absolute limit in seconds (0 means no limit); replaced by
        self.read_timeout_override when that is set.

    :param delay_factor: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :param max_loops: Deprecated in Netmiko 4.x. Will be eliminated in Netmiko 5.

    :raises ReadTimeout: If read_timeout elapses before the output goes quiet.
    """
    deprecated_args = (delay_factor, max_loops)
    if any(arg is not None for arg in deprecated_args):
        warnings.warn(DELAY_FACTOR_DEPR_SIMPLE_MSG, DeprecationWarning)

    if self.read_timeout_override:
        read_timeout = self.read_timeout_override

    # Time to delay in each read loop
    poll_interval = 0.1
    collected = ""
    began = time.time()

    while True:
        # A falsy read_timeout (0 / 0.0) means the absolute timer never expires.
        if read_timeout and not (time.time() - began < read_timeout):
            msg = f"""\n
read_channel_timing's absolute timer expired.
The network device was continually outputting data for longer than {read_timeout}
seconds.
If this is expected i.e. the command you are executing is continually emitting
data for a long period of time, then you can set 'read_timeout=x' seconds. If
you want Netmiko to keep reading indefinitely (i.e. to only stop when there is
no new data), then you can set 'read_timeout=0'.
You can look at the Netmiko session_log or debug log for more information.
"""
            raise ReadTimeout(msg)

        time.sleep(poll_interval)
        chunk = self.read_channel()
        if chunk:
            # gather new output
            collected += chunk
            continue
        if collected == "":
            # Nothing has arrived at all yet; keep polling.
            continue

        # Some output exists but nothing new came in: make sure we are really done.
        time.sleep(last_read)
        chunk = self.read_channel()
        if not chunk:
            return collected
        collected += chunk
def read_until_prompt(
    self,
    read_timeout: float = 10.0,
    read_entire_line: bool = False,
    re_flags: int = 0,
    max_loops: Optional[int] = None,
) -> str:
    """Read the channel until self.base_prompt (matched literally) is seen.

    :param read_timeout: Passed through to read_until_pattern.
    :param read_entire_line: When True, ".*" is appended so the match extends to
        the end of the prompt line.
    :param re_flags: Regex flags passed through to read_until_pattern.
    :param max_loops: Passed through to read_until_pattern.
    :return: Whatever read_until_pattern returns.
    """
    line_tail = ".*" if read_entire_line else ""
    prompt_regex = re.escape(self.base_prompt) + line_tail
    return self.read_until_pattern(
        pattern=prompt_regex,
        read_timeout=read_timeout,
        re_flags=re_flags,
        max_loops=max_loops,
    )
def read_until_prompt_or_pattern(
    self,
    pattern: str = "",
    read_timeout: float = 10.0,
    read_entire_line: bool = False,
    re_flags: int = 0,
    max_loops: Optional[int] = None,
) -> str:
    """Read until either self.base_prompt or `pattern` is detected.

    :param pattern: Additional regular expression; when empty only the prompt is
        searched for.
    :param read_timeout: Passed through to read_until_pattern.
    :param read_entire_line: When True, ".*" is appended to the prompt portion so
        the match extends to the end of the prompt line.
    :param re_flags: Regex flags passed through to read_until_pattern.
    :param max_loops: Passed through to read_until_pattern.
    :return: Whatever read_until_pattern returns.
    """
    prompt_regex = re.escape(self.base_prompt)
    if read_entire_line:
        prompt_regex += ".*"

    if not pattern:
        search_regex = prompt_regex
    else:
        # Non-capturing alternation of the prompt and the caller's pattern.
        search_regex = r"(?:{}|{})".format(prompt_regex, pattern)

    return self.read_until_pattern(
        pattern=search_regex,
        read_timeout=read_timeout,
        re_flags=re_flags,
        max_loops=max_loops,
    )
def serial_login(
    self,
    pri_prompt_terminator: str = r"#\s*$",
    alt_prompt_terminator: str = r">\s*$",
    username_pattern: str = r"(?:[Uu]ser:|sername|ogin)",
    pwd_pattern: str = r"assword",
    delay_factor: float = 1.0,
    max_loops: int = 20,
) -> str:
    """Log in over a serial connection by delegating to telnet_login.

    All arguments are forwarded positionally and unchanged; only the default
    username_pattern differs from telnet_login's.
    """
    login_args = (
        pri_prompt_terminator,
        alt_prompt_terminator,
        username_pattern,
        pwd_pattern,
        delay_factor,
        max_loops,
    )
    return self.telnet_login(*login_args)
def telnet_login(
    self,
    pri_prompt_terminator: str = r"#\s*$",
    alt_prompt_terminator: str = r">\s*$",
    username_pattern: str = r"(?:user:|username|login|user name)",
    pwd_pattern: str = r"assword",
    delay_factor: float = 1.0,
    max_loops: int = 20,
) -> str:
    """Telnet login. Can be username/password or just password.

    :param pri_prompt_terminator: Primary trailing delimiter for identifying a device prompt

    :param alt_prompt_terminator: Alternate trailing delimiter for identifying a device prompt

    :param username_pattern: Pattern used to identify the username prompt

    :param pwd_pattern: Pattern used to identify the password prompt

    :param delay_factor: See __init__: global_delay_factor

    :param max_loops: Controls the wait time in conjunction with the delay_factor

    :return: All data read from the channel during the login exchange.
    :raises NetmikoAuthenticationException: If the channel hits EOF or no device
        prompt is seen after max_loops attempts plus one final try.
    """

    def at_prompt(text: str) -> bool:
        """Report whether `text` ends a line with either prompt terminator."""
        return bool(
            re.search(pri_prompt_terminator, text, flags=re.M)
            or re.search(alt_prompt_terminator, text, flags=re.M)
        )

    pause = self.select_delay_factor(delay_factor)
    # Revert telnet_login back to old speeds/delays
    if pause < 1 and not self._legacy_mode and self.fast_cli:
        pause = 1

    time.sleep(1 * pause)

    transcript = ""
    for _attempt in range(max_loops):
        try:
            chunk = self.read_channel()
            transcript += chunk

            # Search for username pattern / send username
            if re.search(username_pattern, chunk, flags=re.I):
                # Sometimes username/password must be terminated with "\r" and not "\r\n"
                self.write_channel(self.username + "\r")
                time.sleep(1 * pause)
                chunk = self.read_channel()
                transcript += chunk

            # Search for password pattern / send password
            if re.search(pwd_pattern, chunk, flags=re.I):
                # Sometimes username/password must be terminated with "\r" and not "\r\n"
                assert isinstance(self.password, str)
                self.write_channel(self.password + "\r")
                time.sleep(0.5 * pause)
                chunk = self.read_channel()
                transcript += chunk

            # Check if proper data received (covers the post-password read too)
            if at_prompt(chunk):
                return transcript

            # No prompt yet: nudge the device with a return and try again.
            self.write_channel(self.TELNET_RETURN)
            time.sleep(0.5 * pause)
        except EOFError:
            assert self.remote_conn is not None
            self.remote_conn.close()
            msg = f"Login failed: {self.host}"
            raise NetmikoAuthenticationException(msg)

    # Last try to see if we already logged in
    self.write_channel(self.TELNET_RETURN)
    time.sleep(0.5 * pause)
    chunk = self.read_channel()
    transcript += chunk
    if at_prompt(chunk):
        return transcript

    msg = f"Login failed: {self.host}"
    assert self.remote_conn is not None
    self.remote_conn.close()
    raise NetmikoAuthenticationException(msg)
def _try_session_preparation(self, force_data: bool = True) -> None:
    """
    In case of an exception happening during `session_preparation()` Netmiko should
    gracefully clean-up after itself. This might be challenging for library users
    to do since they do not have a reference to the object. This is possibly related
    to threads used in Paramiko.

    :param force_data: When True, send self.RETURN and pause 0.1 seconds before
        calling session_preparation() so that there is data on the channel.
    """
    try:
        # Netmiko needs there to be data for session_preparation to work.
        if force_data:
            self.write_channel(self.RETURN)
            time.sleep(0.1)
        self.session_preparation()
    except Exception:
        # Close the connection, then let the original exception propagate.
        self.disconnect()
        raise
def session_preparation(self) -> None:
    """
    Prepare the session after the connection has been established

    This method handles some differences that occur between various devices
    early on in the session.

    In general, it should include:
    self._test_channel_read(pattern=r"some_pattern")
    self.set_base_prompt()
    self.set_terminal_width()
    self.disable_paging()
    """
    # This implementation runs the four steps above in order, calling
    # _test_channel_read() without a pattern.
    self._test_channel_read()
    self.set_base_prompt()
    self.set_terminal_width()
    self.disable_paging()
def _use_ssh_config(self, dict_arg: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay settings from the SSH config file onto the connection parameters.

    The file named by self.ssh_config_file is parsed (if it exists) and the entry
    for self.host is looked up. A copy of `dict_arg` is returned with 'hostname'
    always refreshed, 'sock' set when a ProxyCommand/ProxyJump applies, and
    'port' / 'username' replaced only when they still hold the object defaults
    (22 and "").

    :param dict_arg: Dictionary of SSH connection parameters

    :return: New dictionary; `dict_arg` itself is not modified.
    :raises ValueError: If ProxyJump lists more than one proxy server.
    """
    params = dict_arg.copy()

    # Use SSHConfig to generate source content.
    assert self.ssh_config_file is not None
    full_path = path.abspath(path.expanduser(self.ssh_config_file))
    source: Union[paramiko.config.SSHConfigDict, Dict[str, Any]] = {}
    if path.exists(full_path):
        ssh_config_instance = paramiko.SSHConfig()
        with io.open(full_path, "rt", encoding="utf-8") as f:
            ssh_config_instance.parse(f)
        source = ssh_config_instance.lookup(self.host)

    # Keys get normalized to lower-case
    proxy: Optional[paramiko.proxy.ProxyCommand] = None
    if "proxycommand" in source:
        proxy = paramiko.ProxyCommand(source["proxycommand"])
    elif "proxyjump" in source:
        jump_hosts = source["proxyjump"].split(",")
        if len(jump_hosts) > 1:
            raise ValueError(
                "ProxyJump with more than one proxy server is not supported."
            )
        port = source.get("port", self.port)
        host = source.get("hostname", self.host)
        # -F {full_path} forces the continued use of the same SSH config file
        cmd = "ssh -F {} -W {}:{} {}".format(full_path, host, port, jump_hosts[-1])
        proxy = paramiko.ProxyCommand(cmd)

    # Only update 'hostname', 'sock', 'port', and 'username'
    # For 'port' and 'username' only update if using object defaults
    if params["port"] == 22:
        params["port"] = int(source.get("port", self.port))
    if params["username"] == "":
        params["username"] = source.get("user", self.username)
    if proxy:
        params["sock"] = proxy
    params["hostname"] = source.get("hostname", self.host)

    return params
def _connect_params_dict(self) -> Dict[str, Any]:
    """Assemble the keyword arguments used for the Paramiko connection.

    :return: Dictionary built from this object's connection attributes; when
        self.ssh_config_file is set, the dictionary is first passed through
        _use_ssh_config().
    """
    conn_dict = dict(
        hostname=self.host,
        port=self.port,
        username=self.username,
        password=self.password,
        look_for_keys=self.use_keys,
        allow_agent=self.allow_agent,
        key_filename=self.key_file,
        pkey=self.pkey,
        passphrase=self.passphrase,
        disabled_algorithms=self.disabled_algorithms,
        timeout=self.conn_timeout,
        auth_timeout=self.auth_timeout,
        banner_timeout=self.banner_timeout,
        sock=self.sock,
    )

    # Check if using SSH 'config' file mainly for SSH proxy support
    if not self.ssh_config_file:
        return conn_dict
    return self._use_ssh_config(conn_dict)
def _sanitize_output(