Skip to content

Commit

Permalink
Changes to make tagging file tests data driven log2timeline#2937 (log…
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz authored May 24, 2020
1 parent f718e4c commit 8f945d7
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 70 deletions.
91 changes: 27 additions & 64 deletions tests/data/tag_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from plaso.containers import events
from plaso.lib import definitions
from plaso.lib import timelib
from plaso.parsers import bash_history
from plaso.parsers import docker
from plaso.parsers import dpkg
Expand All @@ -30,86 +29,50 @@ class LinuxTaggingFileTest(test_lib.TaggingFileTestCase):

_TAG_FILE = 'tag_linux.txt'

_TEST_TIMESTAMP = timelib.Timestamp.CopyFromString('2020-04-04 14:56:39')

def testRuleApplicationExecution(self):
"""Tests the application_execution tagging rule."""
event = events.EventObject()
event.timestamp = self._TEST_TIMESTAMP
event.timestamp_desc = definitions.TIME_DESCRIPTION_UNKNOWN

# Test: data_type is 'bash:history:command'
event_data = bash_history.BashHistoryEventData()

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {}
self._CheckTaggingRule(
bash_history.BashHistoryEventData, attribute_values_per_name,
['application_execution'])

# Test: data_type is 'docker:json:layer'
event_data = docker.DockerJSONLayerEventData()

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {}
self._CheckTaggingRule(
docker.DockerJSONLayerEventData, attribute_values_per_name,
['application_execution'])

# Test: data_type is 'selinux:line' AND audit_type is 'EXECVE'
event_data = selinux.SELinuxLogEventData()
event_data.audit_type = 'bogus'

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 0)
self._CheckLabels(storage_writer, [])

event_data.audit_type = 'EXECVE'

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {
'audit_type': ['EXECVE']}
self._CheckTaggingRule(
selinux.SELinuxLogEventData, attribute_values_per_name,
['application_execution'])

# Test: data_type is 'shell:zsh:history'
event_data = zsh_extended_history.ZshHistoryEventData()

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {}
self._CheckTaggingRule(
zsh_extended_history.ZshHistoryEventData, attribute_values_per_name,
['application_execution'])

# Test: data_type is 'syslog:cron:task_run'
event_data = cron.CronTaskRunEventData()

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {}
self._CheckTaggingRule(
cron.CronTaskRunEventData, attribute_values_per_name,
['application_execution'])

# Test: reporter is 'sudo' AND body contains 'COMMAND='
event_data = syslog.SyslogLineEventData()
event_data.reporter = 'sudo'
event_data.body = 'bogus'

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 0)
self._CheckLabels(storage_writer, [])

event_data.reporter = 'bogus'
event_data.body = 'test if my COMMAND=bogus'

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 0)
self._CheckLabels(storage_writer, [])

event_data.reporter = 'sudo'

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 1)
self._CheckLabels(storage_writer, ['application_execution'])
attribute_values_per_name = {
'body': ['test if my COMMAND=bogus'],
'reporter': ['sudo']}
self._CheckTaggingRule(
syslog.SyslogLineEventData, attribute_values_per_name,
['application_execution'])

def testRuleLogin(self):
"""Tests the login tagging rule."""
Expand Down
3 changes: 0 additions & 3 deletions tests/data/tag_macos.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from plaso.containers import events
from plaso.containers import plist_event
from plaso.lib import definitions
from plaso.lib import timelib
from plaso.parsers import filestat
from plaso.parsers import syslog
from plaso.parsers.olecf_plugins import summary
Expand All @@ -30,8 +29,6 @@ class MacOSTaggingFileTest(test_lib.TaggingFileTestCase):

_TAG_FILE = 'tag_macos.txt'

_TEST_TIMESTAMP = timelib.Timestamp.CopyFromString('2020-04-04 13:46:25')

def testRuleApplicationExecution(self):
"""Tests the application_execution tagging rule."""
event = events.EventObject()
Expand Down
3 changes: 0 additions & 3 deletions tests/data/tag_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from plaso.containers import events
from plaso.lib import definitions
from plaso.lib import timelib
from plaso.parsers import filestat
from plaso.parsers import winevt
from plaso.parsers import winevtx
Expand All @@ -34,8 +33,6 @@ class WindowsTaggingFileTest(test_lib.TaggingFileTestCase):

_TAG_FILE = 'tag_windows.txt'

_TEST_TIMESTAMP = timelib.Timestamp.CopyFromString('2020-04-10 15:22:28')

def testApplicationExecution(self):
"""Tests the application_execution tagging rule."""
event = events.EventObject()
Expand Down
70 changes: 70 additions & 0 deletions tests/data/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

from plaso.analysis import mediator as analysis_mediator
from plaso.analysis import tagging
from plaso.containers import events
from plaso.containers import sessions
from plaso.engine import knowledge_base
from plaso.lib import definitions
from plaso.lib import timelib
from plaso.storage.fake import writer as fake_writer

from tests import test_lib as shared_test_lib
Expand All @@ -17,6 +20,8 @@ class TaggingFileTestCase(shared_test_lib.BaseTestCase):

_TAG_FILE = None

_TEST_TIMESTAMP = timelib.Timestamp.CopyFromString('2020-04-04 14:56:39')

def _CheckLabels(self, storage_writer, expected_labels):
"""Checks the labels of tagged events.
Expand All @@ -34,6 +39,71 @@ def _CheckLabels(self, storage_writer, expected_labels):
self.assertEqual(len(labels), len(expected_labels))
self.assertEqual(sorted(labels), sorted(expected_labels))

def _CheckTaggingRule(
self, event_data_class, attribute_values_per_name, expected_rule_names):
"""Tests a tagging rule.
Args:
event_data_class (type): class of the event data object to use in tests.
attribute_values_per_name (dict[str, list[str]): values of the event data
attribute values per name, to use for testing events that match the
tagging rule.
expected_rule_names (list[str]): expected rule names.
"""
event = events.EventObject()
event.timestamp = self._TEST_TIMESTAMP
event.timestamp_desc = definitions.TIME_DESCRIPTION_UNKNOWN

if not attribute_values_per_name:
event_data = event_data_class()
storage_writer = self._TagEvent(event, event_data)

self.assertEqual(
storage_writer.number_of_event_tags, len(expected_rule_names))
self._CheckLabels(storage_writer, expected_rule_names)

else:
maximum_number_of_attribute_values = max([
len(attribute_values)
for attribute_values in attribute_values_per_name.values()])

# Test if variations defined by the attribute_values_per_name match
# the tagging rule.
for test_index in range(maximum_number_of_attribute_values):
# Create the test event data and set the attributes to one of
# the test values.
event_data = event_data_class()
for attribute_name, attribute_values in (
attribute_values_per_name.items()):
attribute_value_index = min(test_index, len(attribute_values))
attribute_value = attribute_values[attribute_value_index]
setattr(event_data, attribute_name, attribute_value)

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(
storage_writer.number_of_event_tags, len(expected_rule_names))
self._CheckLabels(storage_writer, expected_rule_names)

# Test if bogus variations on attribute_values_per_name do not match
# the tagging rule.
for test_attribute_name in attribute_values_per_name.keys():
# Create the test event data and set the attributes to one of
# the test values.
event_data = event_data_class()
for attribute_name, attribute_values in (
attribute_values_per_name.items()):
if attribute_name == test_attribute_name:
attribute_value = 'BOGUS'
else:
attribute_value = attribute_values[0]
setattr(event_data, attribute_name, attribute_value)

storage_writer = self._TagEvent(event, event_data)

self.assertEqual(storage_writer.number_of_event_tags, 0)
self._CheckLabels(storage_writer, [])

def _TagEvent(self, event, event_data):
"""Tags an event.
Expand Down

0 comments on commit 8f945d7

Please sign in to comment.