-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
LS-5226: Add support for Proto over HTTP
- Fixed an issue in examples where argparser will not parse bool as expected (use_tls) - Added options to Tracer use_thrift or use_http - Created Converter abstract class, either a Thrift or an HTTP converter will be used - Implemented HTTPConnection
- Loading branch information
1 parent
ffc71ff
commit b3a2b33
Showing
13 changed files
with
490 additions
and
209 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from abc import ABCMeta, abstractmethod | ||
|
||
|
||
class Converter(object): | ||
"""Converter is a simple abstract interface for converting span data to wire compatible formats for the Satellites. | ||
""" | ||
|
||
__metaclass__ = ABCMeta | ||
|
||
@abstractmethod | ||
def create_auth(self, access_token): | ||
pass | ||
|
||
@abstractmethod | ||
def create_runtime(self, component_name, tags, guid): | ||
pass | ||
|
||
@abstractmethod | ||
def create_span_record(self, span, guid): | ||
pass | ||
|
||
@abstractmethod | ||
def append_attribute(self, span_record, key, value): | ||
pass | ||
|
||
@abstractmethod | ||
def append_join_id(self, span_record, key, value): | ||
pass | ||
|
||
@abstractmethod | ||
def append_log(self, span_record, log): | ||
pass | ||
|
||
@abstractmethod | ||
def create_report(self, runtime, span_records): | ||
pass | ||
|
||
@abstractmethod | ||
def combine_span_records(self, report_request, span_records): | ||
pass | ||
|
||
@abstractmethod | ||
def num_span_records(self, report_request): | ||
pass | ||
|
||
@abstractmethod | ||
def get_span_records(self, report_request): | ||
pass | ||
|
||
@abstractmethod | ||
def get_span_name(self, span_record): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
""" Connection class establishes HTTP connection with server. | ||
Utilized to send Proto Report Requests. | ||
""" | ||
import threading | ||
import requests | ||
|
||
from lightstep.collector_pb2 import ReportResponse | ||
|
||
CONSECUTIVE_ERRORS_BEFORE_RECONNECT = 200 | ||
|
||
|
||
class _HTTPConnection(object): | ||
"""Instances of _Connection are used to establish a connection to the | ||
server via HTTP protocol. | ||
""" | ||
def __init__(self, collector_url): | ||
self._collector_url = collector_url | ||
self._lock = threading.Lock() | ||
self.ready = True | ||
self._report_eof_count = 0 | ||
self._report_consecutive_errors = 0 | ||
|
||
def open(self): | ||
"""Establish HTTP connection to the server. | ||
""" | ||
pass | ||
|
||
# May throw an Exception on failure. | ||
def report(self, *args, **kwargs): | ||
"""Report to the server.""" | ||
# Notice the annoying case change on the method name. I chose to stay | ||
# consistent with casing in this class vs staying consistent with the | ||
# casing of the pass-through method. | ||
auth = args[0] | ||
report = args[1] | ||
with self._lock: | ||
try: | ||
report.auth.access_token = auth.access_token | ||
headers = {"Content-Type": "application/octet-stream", | ||
"Accept": "application/octet-stream"} | ||
|
||
r = requests.post(self._collector_url, headers=headers, data=report.SerializeToString()) | ||
resp = ReportResponse() | ||
resp.ParseFromString(r.content) | ||
self._report_consecutive_errors = 0 | ||
return resp | ||
except EOFError: | ||
self._report_consecutive_errors += 1 | ||
self._report_eof_count += 1 | ||
raise Exception('EOFError') | ||
finally: | ||
# In case the client has fallen into an unrecoverable state, | ||
# recreate the data structure if there are continued report | ||
# failures | ||
if self._report_consecutive_errors == CONSECUTIVE_ERRORS_BEFORE_RECONNECT: | ||
self._report_consecutive_errors = 0 | ||
self.ready = False | ||
|
||
def close(self): | ||
"""Close HTTP connection to the server.""" | ||
self.ready = False | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from lightstep.collector_pb2 import Auth, ReportRequest, Span, Reporter, KeyValue, Reference, SpanContext | ||
from lightstep.converter import Converter | ||
from . import util | ||
from . import version as tracer_version | ||
import sys | ||
from google.protobuf.timestamp_pb2 import Timestamp | ||
|
||
|
||
class HttpConverter(Converter): | ||
|
||
def create_auth(self, access_token): | ||
auth = Auth() | ||
auth.access_token = access_token | ||
return auth | ||
|
||
def create_runtime(self, component_name, tags, guid): | ||
if component_name is None: | ||
component_name = sys.argv[0] | ||
|
||
python_version = '.'.join(map(str, sys.version_info[0:3])) | ||
|
||
if tags is None: | ||
tags = {} | ||
tracer_tags = tags.copy() | ||
|
||
tracer_tags.update({ | ||
'lightstep.tracer_platform': 'python', | ||
'lightstep.tracer_platform_version': python_version, | ||
'lightstep.tracer_version': tracer_version.LIGHTSTEP_PYTHON_TRACER_VERSION, | ||
'lightstep.component_name': component_name, | ||
'lightstep.guid': util._id_to_hex(guid), | ||
}) | ||
|
||
# Convert tracer_tags to a list of KeyValue pairs. | ||
runtime_attrs = [KeyValue(key=k, string_value=util._coerce_str(v)) for (k, v) in tracer_tags.items()] | ||
|
||
return Reporter(reporter_id=guid, tags=runtime_attrs) | ||
|
||
def create_span_record(self, span, guid): | ||
span_context = SpanContext(trace_id=span.context.trace_id, | ||
span_id=span.context.span_id) | ||
span_record = Span(span_context=span_context, | ||
operation_name=util._coerce_str(span.operation_name), | ||
start_timestamp=Timestamp(seconds=int(span.start_time)), | ||
duration_micros=int(util._time_to_micros(span.duration))) | ||
if span.parent_id is not None: | ||
reference = span_record.references.add() | ||
reference.relationship=Reference.CHILD_OF | ||
reference.span_context.span_id=span.parent_id | ||
|
||
return span_record | ||
|
||
def append_attribute(self, span_record, key, value): | ||
kv = span_record.tags.add() | ||
kv.key = key | ||
kv.string_value = value | ||
|
||
def append_join_id(self, span_record, key, value): | ||
self.append_attribute(span_record, key, value) | ||
|
||
def append_log(self, span_record, log): | ||
if log.key_values is not None and len(log.key_values) > 0: | ||
proto_log = span_record.logs.add() | ||
proto_log.timestamp.seconds=int(log.timestamp) | ||
for k, v in log.key_values.items(): | ||
field = proto_log.fields.add() | ||
field.key = k | ||
field.string_value = util._coerce_str(v) | ||
|
||
def create_report(self, runtime, span_records): | ||
return ReportRequest(reporter=runtime, spans=span_records) | ||
|
||
def combine_span_records(self, report_request, span_records): | ||
report_request.spans.extend(span_records) | ||
return report_request.spans | ||
|
||
def num_span_records(self, report_request): | ||
return len(report_request.spans) | ||
|
||
def get_span_records(self, report_request): | ||
return report_request.spans | ||
|
||
def get_span_name(self, span_record): | ||
return span_record.operation_name |
Oops, something went wrong.