Skip to content

Commit

Permalink
Elasticsearch did not allow 'doc_type' starting with _ until
Browse files Browse the repository at this point in the history
version 6.2. This is now reflected in ElastAlert
  • Loading branch information
matsgoran committed Apr 12, 2019
1 parent 33e6ab5 commit 2434ef6
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 39 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
env: TOXENV=py27 ES_VERSION=7.0.0-linux-x86_64
- env: TOXENV=py27 ES_VERSION=6.6.2
- env: TOXENV=py27 ES_VERSION=6.3.2
- env: TOXENV=py27 ES_VERSION=6.2.4
- env: TOXENV=py27 ES_VERSION=6.0.1
- env: TOXENV=py27 ES_VERSION=5.6.16

Expand Down
16 changes: 16 additions & 0 deletions elastalert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ def is_atleastseven(self):
"""
return int(self.es_version.split(".")[0]) >= 7

def resolve_writeback_index(self, writeback_index, doc_type):
""" In ES6, you cannot have multiple _types per index,
therefore we use self.writeback_index as the prefix for the actual
index name, based on doc_type. """
if not self.is_atleastsix():
return writeback_index
elif doc_type == 'silence':
return writeback_index + '_silence'
elif doc_type == 'past_elastalert':
return writeback_index + '_past'
elif doc_type == 'elastalert_status':
return writeback_index + '_status'
elif doc_type == 'elastalert_error':
return writeback_index + '_error'
return writeback_index

@query_params(
"_source",
"_source_exclude",
Expand Down
33 changes: 28 additions & 5 deletions elastalert/create_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
def main(es_client, ea_index, recreate=False, old_ea_index=None):
esversion = es_client.info()["version"]["number"]
print("Elastic Version: " + esversion)
elasticversion = int(esversion.split(".")[0])

es_index_mappings = read_es_index_mappings() if elasticversion > 5 else read_es_index_mappings(5)
es_index_mappings = read_es_index_mappings() if is_atleastsix(esversion) else read_es_index_mappings(5)

es_index = IndicesClient(es_client)
if not recreate:
Expand All @@ -34,7 +33,7 @@ def main(es_client, ea_index, recreate=False, old_ea_index=None):
return None

# (Re-)Create indices.
if (elasticversion > 5):
if is_atleastsix(esversion):
index_names = (
ea_index,
ea_index + '_status',
Expand All @@ -59,7 +58,7 @@ def main(es_client, ea_index, recreate=False, old_ea_index=None):
# To avoid a race condition. TODO: replace this with a real check
time.sleep(2)

if elasticversion > 6:
if is_atleastseven(esversion):
# TODO remove doc_type completely when elasicsearch client allows doc_type=None
# doc_type is a deprecated feature and will be completely removed in Elasicsearch 8
es_client.indices.put_mapping(index=ea_index, doc_type='_doc',
Expand All @@ -72,7 +71,7 @@ def main(es_client, ea_index, recreate=False, old_ea_index=None):
body=es_index_mappings['elastalert_error'], include_type_name=True)
es_client.indices.put_mapping(index=ea_index + '_past', doc_type='_doc',
body=es_index_mappings['past_elastalert'], include_type_name=True)
elif elasticversion > 5:
elif is_atleastsixtwo(esversion):
es_client.indices.put_mapping(index=ea_index, doc_type='_doc',
body=es_index_mappings['elastalert'])
es_client.indices.put_mapping(index=ea_index + '_status', doc_type='_doc',
Expand All @@ -83,6 +82,17 @@ def main(es_client, ea_index, recreate=False, old_ea_index=None):
body=es_index_mappings['elastalert_error'])
es_client.indices.put_mapping(index=ea_index + '_past', doc_type='_doc',
body=es_index_mappings['past_elastalert'])
elif is_atleastsix(esversion):
es_client.indices.put_mapping(index=ea_index, doc_type='elastalert',
body=es_index_mappings['elastalert'])
es_client.indices.put_mapping(index=ea_index + '_status', doc_type='elastalert_status',
body=es_index_mappings['elastalert_status'])
es_client.indices.put_mapping(index=ea_index + '_silence', doc_type='silence',
body=es_index_mappings['silence'])
es_client.indices.put_mapping(index=ea_index + '_error', doc_type='elastalert_error',
body=es_index_mappings['elastalert_error'])
es_client.indices.put_mapping(index=ea_index + '_past', doc_type='past_elastalert',
body=es_index_mappings['past_elastalert'])
else:
es_client.indices.put_mapping(index=ea_index, doc_type='elastalert',
body=es_index_mappings['elastalert'])
Expand Down Expand Up @@ -124,6 +134,19 @@ def read_es_index_mapping(mapping, es_version=6):
return json.load(f)


def is_atleastsix(es_version):
return int(es_version.split(".")[0]) >= 6


def is_atleastsixtwo(es_version):
major, minor = map(int, es_version.split(".")[:2])
return major > 6 or (major == 6 and minor >= 2)


def is_atleastseven(es_version):
return int(es_version.split(".")[0]) >= 7


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--host', default=os.environ.get('ES_HOST', None), help='Elasticsearch host')
Expand Down
51 changes: 19 additions & 32 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,6 @@ def get_index(rule, starttime=None, endtime=None):
else:
return index

def get_six_index(self, doc_type):
""" In ES6, you cannot have multiple _types per index,
therefore we use self.writeback_index as the prefix for the actual
index name, based on doc_type. """
writeback_index = self.writeback_index
if doc_type == 'silence':
writeback_index += '_silence'
elif doc_type == 'past_elastalert':
writeback_index += '_past'
elif doc_type == 'elastalert_status':
writeback_index += '_status'
elif doc_type == 'elastalert_error':
writeback_index += '_error'
return writeback_index

@staticmethod
def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field='@timestamp', to_ts_func=dt_to_ts, desc=False,
five=False):
Expand Down Expand Up @@ -654,16 +639,17 @@ def get_starttime(self, rule):
query.update(sort)

try:
if self.writeback_es.is_atleastsix():
index = self.get_six_index('elastalert_status')
doc_type = 'elastalert_status'
index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type)
if self.writeback_es.is_atleastsixtwo():
if self.writeback_es.is_atleastsixsix():
res = self.writeback_es.search(index=index, size=1, body=query,
_source_includes=['endtime', 'rule_name'])
else:
res = self.writeback_es.search(index=index, size=1, body=query,
_source_include=['endtime', 'rule_name'])
else:
res = self.writeback_es.deprecated_search(index=self.writeback_index, doc_type='elastalert_status',
res = self.writeback_es.deprecated_search(index=index, doc_type=doc_type,
size=1, body=query, _source_include=['endtime', 'rule_name'])
if res['hits']['hits']:
endtime = ts_to_dt(res['hits']['hits'][0]['_source']['endtime'])
Expand Down Expand Up @@ -1497,11 +1483,11 @@ def writeback(self, doc_type, body):
writeback_body['@timestamp'] = dt_to_ts(ts_now())

try:
if self.writeback_es.is_atleastsix():
writeback_index = self.get_six_index(doc_type)
res = self.writeback_es.index(index=writeback_index, body=body)
index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type)
if self.writeback_es.is_atleastsixtwo():
res = self.writeback_es.index(index=index, body=body)
else:
res = self.writeback_es.index(index=self.writeback_index, doc_type=doc_type, body=body)
res = self.writeback_es.index(index=index, doc_type=doc_type, body=body)
return res
except ElasticsearchException as e:
logging.exception("Error writing alert info to Elasticsearch: %s" % (e))
Expand All @@ -1524,7 +1510,7 @@ def find_recent_pending_alerts(self, time_limit):
query = {'query': inner_query, 'filter': time_filter}
query.update(sort)
try:
if self.writeback_es.is_atleastsix():
if self.writeback_es.is_atleastsixtwo():
res = self.writeback_es.search(index=self.writeback_index, body=query, size=1000)
else:
res = self.writeback_es.deprecated_search(index=self.writeback_index,
Expand Down Expand Up @@ -1580,8 +1566,8 @@ def send_pending_alerts(self):

# Delete it from the index
try:
if self.writeback_es.is_atleastsix():
self.writeback_es.delete(index=self.writeback_index, doc_type='_doc', id=_id)
if self.writeback_es.is_atleastsixtwo():
self.writeback_es.delete(index=self.writeback_index, id=_id)
else:
self.writeback_es.delete(index=self.writeback_index, doc_type='elastalert', id=_id)
except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil.
Expand Down Expand Up @@ -1613,16 +1599,16 @@ def get_aggregated_matches(self, _id):
query = {'query': {'query_string': {'query': 'aggregate_id:%s' % (_id)}}, 'sort': {'@timestamp': 'asc'}}
matches = []
try:
if self.writeback_es.is_atleastsix():
if self.writeback_es.is_atleastsixtwo():
res = self.writeback_es.search(index=self.writeback_index, body=query,
size=self.max_aggregation)
else:
res = self.writeback_es.deprecated_search(index=self.writeback_index, doc_type='elastalert',
body=query, size=self.max_aggregation)
for match in res['hits']['hits']:
matches.append(match['_source'])
if self.writeback_es.is_atleastsix():
self.writeback_es.delete(index=self.writeback_index, doc_type='_doc', id=match['_id'])
if self.writeback_es.is_atleastsixtwo():
self.writeback_es.delete(index=self.writeback_index, id=match['_id'])
else:
self.writeback_es.delete(index=self.writeback_index, doc_type='elastalert', id=match['_id'])
except (KeyError, ElasticsearchException) as e:
Expand All @@ -1640,7 +1626,7 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
query = {'query': {'bool': query}}
query['sort'] = {'alert_time': {'order': 'desc'}}
try:
if self.writeback_es.is_atleastsix():
if self.writeback_es.is_atleastsixtwo():
res = self.writeback_es.search(index=self.writeback_index, body=query, size=1)
else:
res = self.writeback_es.deprecated_search(index=self.writeback_index, doc_type='elastalert', body=query, size=1)
Expand Down Expand Up @@ -1782,16 +1768,17 @@ def is_silenced(self, rule_name):
query.update(sort)

try:
if self.writeback_es.is_atleastsix():
index = self.get_six_index('silence')
doc_type = 'silence'
index = self.writeback_es.resolve_writeback_index(self.writeback_index, doc_type)
if self.writeback_es.is_atleastsixtwo():
if self.writeback_es.is_atleastsixsix():
res = self.writeback_es.search(index=index, size=1, body=query,
_source_includes=['until', 'exponent'])
else:
res = self.writeback_es.search(index=index, size=1, body=query,
_source_include=['until', 'exponent'])
else:
res = self.writeback_es.deprecated_search(index=self.writeback_index, doc_type='silence',
res = self.writeback_es.deprecated_search(index=index, doc_type=doc_type,
size=1, body=query, _source_include=['until', 'exponent'])
except ElasticsearchException as e:
self.handle_error("Error while querying for alert silence status: %s" % (e), {'rule': rule_name})
Expand Down
22 changes: 20 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from elastalert.util import dt_to_ts
from elastalert.util import ts_to_dt

writeback_index = 'wb'


def pytest_addoption(parser):
parser.addoption(
Expand Down Expand Up @@ -71,8 +73,10 @@ def __init__(self, host='es', port=14900):
self.es_version = mock.Mock(return_value='2.0')
self.is_atleastfive = mock.Mock(return_value=False)
self.is_atleastsix = mock.Mock(return_value=False)
self.is_atleastsixtwo = mock.Mock(return_value=False)
self.is_atleastsixsix = mock.Mock(return_value=False)
self.is_atleastseven = mock.Mock(return_value=False)
self.resolve_writeback_index = mock.Mock(return_value=writeback_index)


class mock_es_sixsix_client(object):
Expand All @@ -91,9 +95,23 @@ def __init__(self, host='es', port=14900):
self.es_version = mock.Mock(return_value='6.6.0')
self.is_atleastfive = mock.Mock(return_value=True)
self.is_atleastsix = mock.Mock(return_value=True)
self.is_atleastsixtwo = mock.Mock(return_value=False)
self.is_atleastsixsix = mock.Mock(return_value=True)
self.is_atleastseven = mock.Mock(return_value=False)

def writeback_index_side_effect(index, doc_type):
if doc_type == 'silence':
return index + '_silence'
elif doc_type == 'past_elastalert':
return index + '_past'
elif doc_type == 'elastalert_status':
return index + '_status'
elif doc_type == 'elastalert_error':
return index + '_error'
return index

self.resolve_writeback_index = mock.Mock(side_effect=writeback_index_side_effect)


class mock_ruletype(object):
def __init__(self):
Expand Down Expand Up @@ -138,7 +156,7 @@ def ea():
'alert_time_limit': datetime.timedelta(hours=24),
'es_host': 'es',
'es_port': 14900,
'writeback_index': 'wb',
'writeback_index': writeback_index,
'rules': rules,
'max_query_size': 10000,
'old_query_limit': datetime.timedelta(weeks=1),
Expand Down Expand Up @@ -184,7 +202,7 @@ def ea_sixsix():
'alert_time_limit': datetime.timedelta(hours=24),
'es_host': 'es',
'es_port': 14900,
'writeback_index': 'wb',
'writeback_index': writeback_index,
'rules': rules,
'max_query_size': 10000,
'old_query_limit': datetime.timedelta(weeks=1),
Expand Down

0 comments on commit 2434ef6

Please sign in to comment.