Skip to content

Commit

Permalink
Add observability bits (#1)
Browse files Browse the repository at this point in the history
* Add observability stack

* updates for initial grafana

* Add CloudWatch Logs example 1 and 2

* Add metrics to CloudWatch

* Shuffle around the Prometheus code

* Add xray tracing

* add temp changes

* update

* scrub

---------

Co-authored-by: Mike Cowgill <[email protected]>
  • Loading branch information
msambol and moofish32 authored Mar 15, 2023
1 parent 9362bd1 commit 660ffad
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
!jest.config.js
*.d.ts
node_modules
.node-version

# CDK asset staging directory
.cdk.staging
cdk.out

*.DS_Store
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ Building an HL7 data lake on AWS as described [here](https://www.michaelsambol.c

## Deploy

Software versions:
```
❯ node --version
v18.6.0
❯ python --version
Python 3.9.6
❯ cdk --version
2.55.0
```

Install dependencies:
```
npm install
Expand All @@ -20,10 +32,16 @@ Create a Lambda layer with the [HL7 Python package](https://pypi.org/project/hl7
aws lambda publish-layer-version --layer-name hl7 --zip-file fileb://layers/hl7.zip --compatible-runtimes python3.9 --description "Parsing messages of Health Level 7 (HL7) version 2.x into Python objects"
```

Create a Lambda layer with the [AWS Embedded Metrics package](https://pypi.org/project/aws-embedded-metrics/) and set the resulting layer ARN as `embeddedMetricsLayerArn` in `cdk.context.json`. The zip is included in this repo.
```
aws lambda publish-layer-version --layer-name aws_embedded_metrics --zip-file fileb://layers/aws_embedded_metrics.zip --compatible-runtimes python3.9 --description "Amazon CloudWatch Embedded Metric Format Client Library"
```

Deploy stack:
```
# dev
cdk deploy --context environment=dev Hl7DataLakeStack-dev
cdk deploy --context environment=dev Hl7ObservabilityStack-dev
# add additional environments if desired
```
Expand Down
8 changes: 8 additions & 0 deletions bin/hl7-data-lake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import 'source-map-support/register'
import { App } from 'aws-cdk-lib'
import { Hl7DataLakeStack } from '../lib/hl7-data-lake-stack'
import { Hl7ObservabilityStack } from '../lib/hl7-observability-stack'

const app = new App()

Expand All @@ -16,5 +17,12 @@ const regionalEnv = {env: {region: envContext.region, account: envContext.accoun
// Data-lake stack for this environment. The three Lambda layer ARNs
// (HL7 parser, embedded metrics, OpenTelemetry) are read from `envContext`.
new Hl7DataLakeStack(app, `Hl7DataLakeStack-${environment}`, {
  environment,
  hl7LayerArn: envContext.hl7LayerArn,
  embeddedMetricsLayerArn: envContext.embeddedMetricsLayerArn,
  otelLayerArn: envContext.otelLayerArn,
  ...regionalEnv,
})

// Observability stack for the same environment; it only needs the
// environment name and the shared account/region settings.
new Hl7ObservabilityStack(app, `Hl7ObservabilityStack-${environment}`, {
  environment,
  ...regionalEnv,
})
4 changes: 3 additions & 1 deletion cdk.context.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"dev": {
"account": "CHANGE_ME",
"region": "us-east-2",
"hl7LayerArn": "arn:aws:lambda:us-east-2:CHANGE_ME:layer:hl7:3"
"hl7LayerArn": "arn:aws:lambda:us-east-2:CHANGE_ME:layer:hl7:3",
"embeddedMetricsLayerArn": "arn:aws:lambda:us-east-2:CHANGE_ME:layer:aws_embedded_metrics:1",
"otelLayerArn": "arn:aws:lambda:us-east-2:901920570463:layer:aws-otel-python-amd64-ver-1-15-0:1"
}
}
32 changes: 32 additions & 0 deletions lambdas/collector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# OpenTelemetry Collector configuration used by the HL7 parser Lambda.
# Metrics are forwarded to a Prometheus remote-write endpoint and traces to
# AWS X-Ray; both pipelines also go to the logging exporter.

extensions:
  # SigV4 request signing, used below as the authenticator for remote write.
  sigv4auth:
    service: "aps"
    region: "us-east-2"

receivers:
  # Accept OTLP over HTTP with the receiver's default settings.
  otlp:
    protocols:
      http:

exporters:
  prometheusremotewrite:
    # Replace <WORKSPACE_ID> with the ID of the target workspace before deploying.
    endpoint: "https://aps-workspaces.us-east-2.amazonaws.com/workspaces/<WORKSPACE_ID>/api/v1/remote_write"
    auth:
      authenticator: sigv4auth
  # NOTE(review): debug-level output is verbose; consider lowering it outside
  # of development.
  logging:
    loglevel: debug
  awsxray:

service:
  extensions: [sigv4auth]
  pipelines:
    metrics:
      receivers: [otlp]
      processors: []
      exporters: [logging, prometheusremotewrite]
    traces:
      receivers: [otlp]
      exporters: [logging, awsxray]
  telemetry:
    logs:
      level: debug
81 changes: 73 additions & 8 deletions lambdas/hl7_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import hl7
import json
import os
from opentelemetry import metrics as ot_metrics
from opentelemetry.metrics import get_meter_provider
from aws_embedded_metrics import metric_scope

from ParsedHL7 import *

Expand All @@ -16,15 +19,26 @@
# Settings read from the process environment; each falls back to ''.
STREAM_NAME = os.getenv('STREAM_NAME', '')
# NAMING_PREFIX is passed to metrics.set_namespace() in the handler.
NAMING_PREFIX = os.getenv('NAMING_PREFIX', '')

# Key prefixes for processed output.
PROCESSED_JSON_PREFIX = 'processed_json'
PROCESSED_HL7_PREFIX = 'processed_hl7'

# CloudWatch metrics: message texts emitted in the handler's JSON log lines
# (the "CloudWatch Logs metric filters" example).
SUCCESS = '[SUCCESS] HL7 message processed'
FAILED = '[FAILED] Failure during HL7 message processing'

# Prometheus: an OpenTelemetry meter and counter created once at import time.
# The provider handle is kept so the handler can call force_flush() on it
# when the provider offers that method.
meter = ot_metrics.get_meter(__name__)
meter_provider = get_meter_provider()
# NOTE(review): the handler appears to call counter.add(1) after each
# successfully processed message rather than once per invocation -- confirm
# the description below matches the intended meaning.
counter = meter.create_counter(
    name="invocation_count",
    unit="1",
    description="Counts the number of function invocations",
)

def parse_hl7(hl7_body, filename):
try:
hl7_string = hl7_body.replace('\n', '')
parsed_hl7 = ParsedHL7()
h = hl7.parse(hl7_string)
except Exception as e:
raise ValueError(f'Failed to parse HL7 string: {e}')
raise ValueError(f'[FAILED] Failed to parse HL7 string: {e}')

# HL7 data is notoriously bad. I'm wrapping all the extractions in try blocks.
# If there are certain values that you require, you can raise errors.
Expand Down Expand Up @@ -200,13 +214,22 @@ def partition_date(transaction_date):
return '9999-12-31'


def handler(event, context):
@metric_scope
def handler(event, context, metrics):

# Embedded metrics format
metrics.set_namespace(NAMING_PREFIX)
metrics.set_dimensions({'LambdaFunctionName': context.function_name})

try:
for record in event.get('Records', []):
body = json.loads(record.get('body', {}))
message = json.loads(body.get('Message', {}))

for record in message.get('Records', []):
# track messages processed so we can count error rate
metrics.put_metric('Total_HL7_Messages_Processed', 1, 'Count')

bucket = record.get('s3').get('bucket').get('name')
key = record.get('s3').get('object').get('key')
filename = key.split("/")[-1]
Expand All @@ -217,7 +240,7 @@ def handler(event, context):
s3_object = bucket_object.Object(key)
s3_object_body = s3_object.get()['Body'].read().decode('utf-8')
except Exception as e:
raise Exception(f'Failed reading object body from S3: {e}')
raise Exception(f'[FAILED] Failed reading object body from S3: {e}')

parsed_hl7 = parse_hl7(s3_object_body, filename)

Expand All @@ -227,6 +250,27 @@ def handler(event, context):
transaction_date = parsed_hl7['partitions']['transaction_date'] = partition_date(parsed_hl7['transaction_date'])
print(f'Partitions: {json.dumps(parsed_hl7["partitions"])}')


#################
# OBSERVABILITY #
# -- failure -- #
#################

# force a failure if first name == "NONAME"
if parsed_hl7['first_name'] == "NONAME":
# Example 1: Using CloudWatch Logs metric filters
failed = {
'result': 'FAILED',
'message': FAILED,
'lambdaFunctionName': context.function_name,
}
print(json.dumps(failed))

# Example 2: Embedded metric format
metrics.put_metric('Example_2_Failure', 1, 'Count')

continue

try:
# need parsed_hl7['partitions'] for dynamic partitioning in Firehose
firehose.put_record(
Expand All @@ -235,7 +279,7 @@ def handler(event, context):
)
print('Record sent to Firehose!')
except Exception as e:
raise Exception(f'Failed putting record into Firehose: {e}')
raise Exception(f'[FAILED] Failed putting record into Firehose: {e}')

try:
# don't need partitions in the stored json
Expand All @@ -244,24 +288,45 @@ def handler(event, context):
json_object.put(Body=json.dumps(parsed_hl7).encode('UTF-8'))
print('JSON file saved in processed S3 location!')
except Exception as e:
raise Exception(f'Failed putting json into S3: {e}')
raise Exception(f'[FAILED] Failed putting json into S3: {e}')

try:
# move the file to a processed location
copy_source = {'Bucket': bucket, 'Key': key}
s3r.Bucket(bucket).copy(copy_source, f'{PROCESSED_HL7_PREFIX}/{filename}')
print('Copied HL7 file to processed location!')
except Exception as e:
raise Exception(f'Failed copying HL7 to processed S3 location: {e}')
raise Exception(f'[FAILED] Failed copying HL7 to processed S3 location: {e}')

try:
# remove file from original S3 location (this helps gauge how many are left for processing)
s3r.Object(bucket, key).delete()
print('Deleted HL7 file from original S3 location!')
except Exception as e:
raise Exception(f'Failed deleting file from original S3 location: {e}')
raise Exception(f'[FAILED] Failed deleting file from original S3 location: {e}')

#################
# OBSERVABILITY #
# -- success -- #
#################

# Example 1: Using CloudWatch Logs metric filters
success = {
'result': 'SUCCESS',
'message': SUCCESS,
'lambdaFunctionName': context.function_name,
}
print(json.dumps(success))

# Example 2: Embedded metric format
metrics.put_metric('Example_2_Success', 1, 'Count')

# Prometheus
counter.add(1)
if hasattr(meter_provider, 'force_flush'):
meter_provider.force_flush(1000)

except Exception as e:
raise Exception(f'Failed retrieving data from record: {e}')
raise Exception(f'[FAILED] Failed retrieving data from record: {e}')

return True
Binary file added layers/aws_embedded_metrics.zip
Binary file not shown.
Loading

0 comments on commit 660ffad

Please sign in to comment.