get_logs.py
#!/usr/bin/env python
import argparse
from datetime import datetime

import boto3
from tqdm.auto import tqdm

client = boto3.client("logs")


def get_latest_stream_name(log_group_name):
    """Get the latest stream name for a given log group.

    Parameters
    ----------
    log_group_name : str
        The name of the log group to query for streams.

    Returns
    -------
    str
        The name of the latest log stream in the log group.
    """
    response = client.describe_log_streams(
        logGroupName=log_group_name,
        orderBy="LastEventTime",
        descending=True,
        limit=1,
    )
    return response["logStreams"][0]["logStreamName"]
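
# A minimal sketch of calling the helper on its own (the group name below is
# just this script's default and may not exist in a given account):
#
#   latest_stream = get_latest_stream_name("/ecs/warehouse-prod-pipeline")
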
def get_logs(
    log_stream_name, log_group_name, forward_token=None,
):
    """Get the logs for a given log stream, optionally using
    a forward continuation token.

    Parameters
    ----------
    log_stream_name : str
        The name of the log stream to query.
    log_group_name : str
        The log group that the stream belongs to.
    forward_token : str or None, default=None
        The forward continuation token extracted from an earlier response.

    Returns
    -------
    dict
        Log events as returned by boto3; for more details, see
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.get_log_events
    """
    kwargs = {
        "logGroupName": log_group_name,
        "logStreamName": log_stream_name,
        "startFromHead": True,
    }
    if forward_token is not None:
        kwargs["nextToken"] = forward_token
    return client.get_log_events(**kwargs)
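
# A sketch of paging through a stream manually with this helper (the stream
# and group names are illustrative):
#
#   page = get_logs("my-stream", "/ecs/warehouse-prod-pipeline")
#   next_page = get_logs(
#       "my-stream",
#       "/ecs/warehouse-prod-pipeline",
#       forward_token=page["nextForwardToken"],
#   )
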
if __name__ == "__main__":
    # Arguments setup
    parser = argparse.ArgumentParser(description="Get a whole log stream with all the fragments from AWS.")
    parser.add_argument(
        "-g",
        "--group-name",
        help="The name of the log group to use.",
        default="/ecs/warehouse-prod-pipeline",
    )
    parser.add_argument(
        "-s",
        "--stream-name",
        help="The log stream name to get. "
        + "By default the latest stream is queried and downloaded.",
    )
    parser.add_argument(
        "-o", "--output-file", help="File to save the log results to.",
    )
    args = parser.parse_args()
    # End arguments setup

    stream_name = (
        get_latest_stream_name(log_group_name=args.group_name)
        if args.stream_name is None
        else args.stream_name
    )

    # Write to the output file if one was given, otherwise print to stdout.
    f = open(args.output_file, "w") if args.output_file else None
    print_kwargs = {"file": f} if f is not None else {}

    forward_token = None
    # Display progress bar
    pbar = tqdm(desc="Gathering log fragments", unit="piece")
    while True:
        result = get_logs(
            stream_name,
            log_group_name=args.group_name,
            forward_token=forward_token,
        )
        pbar.update(1)
        count = len(result["events"])
        if count == 0:
            # An empty page means the stream is exhausted, so stop.
            pbar.close()
            break
        forward_token = result["nextForwardToken"]
        for event in result["events"]:
            # Event timestamps are milliseconds since the epoch.
            timestamp = datetime.fromtimestamp(event["timestamp"] / 1000)
            message = event["message"]
            print(f"{timestamp.isoformat()} {message}", **print_kwargs)

    if f is not None:
        f.close()
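
# Example invocation (a sketch; the stream name below is hypothetical, and the
# group name is simply this script's default):
#
#   python get_logs.py \
#       --group-name /ecs/warehouse-prod-pipeline \
#       --stream-name ecs/pipeline/0123456789abcdef \
#       --output-file pipeline.log
#
# Omitting --stream-name downloads the latest stream in the group; omitting
# --output-file prints the timestamped log lines to stdout.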