Skip to content

Commit

Permalink
Changed storage to use session configuration log2timeline#109
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed May 21, 2020
1 parent 61422d7 commit acbf11e
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 194 deletions.
7 changes: 4 additions & 3 deletions plaso/cli/log2timeline_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,15 @@ def ExtractEventsFromSources(self):
logger.debug('Starting extraction in single process mode.')

processing_status = extraction_engine.ProcessSources(
self._source_path_specs, storage_writer, self._resolver_context,
configuration, status_update_callback=status_update_callback)
session, self._source_path_specs, storage_writer,
self._resolver_context, configuration,
status_update_callback=status_update_callback)

else:
logger.debug('Starting extraction in multi process mode.')

processing_status = extraction_engine.ProcessSources(
session.identifier, self._source_path_specs, storage_writer,
session, self._source_path_specs, storage_writer,
configuration, enable_sigsegv_handler=self._enable_sigsegv_handler,
number_of_worker_processes=self._number_of_extraction_workers,
status_update_callback=status_update_callback,
Expand Down
27 changes: 8 additions & 19 deletions plaso/cli/pinfo_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from plaso.cli import tools
from plaso.cli import views
from plaso.cli.helpers import manager as helpers_manager
from plaso.engine import knowledge_base
from plaso.lib import definitions
from plaso.lib import errors
from plaso.lib import loggers
Expand Down Expand Up @@ -436,27 +435,16 @@ def _PrintParsersCounter(self, parsers_counter, session_identifier=None):

table_view.Write(self._output_writer)

def _PrintPreprocessingInformation(
self, storage_reader, session_identifier=None):
"""Prints the details of the preprocessing information.
def _PrintSourceConfiguration(
self, source_configuration, session_identifier=None):
"""Prints the details of a source configuration.
Args:
storage_reader (StorageReader): storage reader.
source_configuration (SourceConfiguration): source configuration.
session_identifier (Optional[str]): session identifier, formatted as
a UUID.
"""
knowledge_base_object = knowledge_base.KnowledgeBase()

storage_reader.ReadSystemConfiguration(knowledge_base_object)

lookup_identifier = session_identifier
if lookup_identifier:
# The knowledge base requires the session identifier to be formatted in
# hexadecimal representation.
lookup_identifier = lookup_identifier.replace('-', '')

system_configuration = knowledge_base_object.GetSystemConfigurationArtifact(
session_identifier=lookup_identifier)
system_configuration = source_configuration.system_configuration
if not system_configuration:
return

Expand Down Expand Up @@ -574,8 +562,9 @@ def _PrintSessionsDetails(self, storage_reader):
table_view.Write(self._output_writer)

if self._verbose:
self._PrintPreprocessingInformation(
storage_reader, session_identifier=session_identifier)
for source_configuration in session.source_configurations:
self._PrintSourceConfiguration(
source_configuration, session_identifier=session_identifier)

self._PrintParsersCounter(
session.parsers_counter, session_identifier=session_identifier)
Expand Down
9 changes: 9 additions & 0 deletions plaso/cli/psort_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,15 @@ def ProcessStorage(self):
'Format of storage file: {0:s} not supported'.format(
self._storage_file_path))

for session in storage_reader.GetSessions():
if not session.source_configurations:
storage_reader.ReadSystemConfiguration(self._knowledge_base)
else:
for source_configuration in session.source_configurations:
self._knowledge_base.ReadSystemConfigurationArtifact(
source_configuration.system_configuration,
session_identifier=session.identifier)

self._number_of_analysis_reports = (
storage_reader.GetNumberOfAnalysisReports())
storage_reader.Close()
Expand Down
8 changes: 4 additions & 4 deletions plaso/cli/psteal_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,15 @@ def ExtractEventsFromSources(self):
logger.debug('Starting extraction in single process mode.')

processing_status = extraction_engine.ProcessSources(
self._source_path_specs, storage_writer, self._resolver_context,
configuration, status_update_callback=status_update_callback)
session, self._source_path_specs, storage_writer,
self._resolver_context, configuration,
status_update_callback=status_update_callback)

else:
logger.debug('Starting extraction in multi process mode.')

processing_status = extraction_engine.ProcessSources(
session.identifier, self._source_path_specs, storage_writer,
configuration,
session, self._source_path_specs, storage_writer, configuration,
enable_sigsegv_handler=self._enable_sigsegv_handler,
number_of_worker_processes=self._number_of_extraction_workers,
status_update_callback=status_update_callback)
Expand Down
21 changes: 20 additions & 1 deletion plaso/engine/knowledge_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,26 @@ def GetHostname(self, session_identifier=None):

return hostname_artifact.name or ''

def GetSystemConfigurationArtifact(self, session_identifier=None):
def GetSourceConfigurationArtifacts(self, session_identifier=None):
"""Retrieves the knowledge base as a source configuration artifacts.
Args:
session_identifier (Optional[str])): session identifier, where
None represents the active session.
Returns:
list[SourceConfigurationArtifact]: source configuration artifacts.
"""
source_configuration = artifacts.SourceConfigurationArtifact()

# TODO: set path_spec
source_configuration.system_configuration = (
self._GetSystemConfigurationArtifact(
session_identifier=session_identifier))

return [source_configuration]

def _GetSystemConfigurationArtifact(self, session_identifier=None):
"""Retrieves the knowledge base as a system configuration artifact.
Args:
Expand Down
9 changes: 7 additions & 2 deletions plaso/engine/single_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,12 @@ def _UpdateStatus(
self._last_status_update_timestamp = current_timestamp

def ProcessSources(
self, source_path_specs, storage_writer, resolver_context,
self, session, source_path_specs, storage_writer, resolver_context,
processing_configuration, status_update_callback=None):
"""Processes the sources.
Args:
session (Session): session in which the sources are processed.
source_path_specs (list[dfvfs.PathSpec]): path specifications of
the sources to process.
storage_writer (StorageWriter): storage writer for a session storage.
Expand Down Expand Up @@ -272,8 +273,12 @@ def ProcessSources(
storage_writer.Open()
storage_writer.WriteSessionStart()

# TODO: decouple session and storage writer?
session.source_configurations = (
self.knowledge_base.GetSourceConfigurationArtifacts())

try:
storage_writer.WritePreprocessingInformation(self.knowledge_base)
storage_writer.WriteSessionConfiguration()

self._ProcessSources(
source_path_specs, extraction_worker, parser_mediator, storage_writer)
Expand Down
10 changes: 6 additions & 4 deletions plaso/multi_processing/psort.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import os
import time

from plaso.containers import tasks
from plaso.engine import plaso_queue
from plaso.engine import processing_status
from plaso.engine import zeromq_queue
from plaso.containers import tasks
from plaso.lib import bufferlib
from plaso.lib import definitions
from plaso.multi_processing import analysis_process
Expand Down Expand Up @@ -901,10 +901,11 @@ def AnalyzeEvents(
# the ZIP storage file will remain locked as long as the worker processes
# are alive.
storage_writer.Open()
storage_writer.ReadSystemConfiguration(knowledge_base_object)
storage_writer.WriteSessionStart()

try:
storage_writer.WriteSessionConfiguration()

self._AnalyzeEvents(
storage_writer, analysis_plugins, event_filter=event_filter)

Expand Down Expand Up @@ -980,12 +981,13 @@ def ExportEvents(
used. The 'time slicer' will provide a context of events around
an event of interest.
"""
# TODO: fix
_ = knowledge_base_object

self._events_status = processing_status.EventsStatus()
self._processing_configuration = processing_configuration
self._status_update_callback = status_update_callback

storage_reader.ReadSystemConfiguration(knowledge_base_object)

total_number_of_events = 0
for session in storage_reader.GetSessions():
total_number_of_events += session.parsers_counter['total']
Expand Down
12 changes: 8 additions & 4 deletions plaso/multi_processing/task_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,14 +697,14 @@ def _UpdateProcessingStatus(self, pid, process_status, used_memory):
process.name, task_identifier))

def ProcessSources(
self, session_identifier, source_path_specs, storage_writer,
self, session, source_path_specs, storage_writer,
processing_configuration, enable_sigsegv_handler=False,
number_of_worker_processes=0, status_update_callback=None,
worker_memory_limit=None):
"""Processes the sources and extract events.
Args:
session_identifier (str): identifier of the session.
session (Session): session in which the sources are processed.
source_path_specs (list[dfvfs.PathSpec]): path specifications of
the sources to process.
storage_writer (StorageWriter): storage writer for a session storage.
Expand Down Expand Up @@ -760,7 +760,7 @@ def ProcessSources(

self._debug_output = processing_configuration.debug_output
self._log_filename = processing_configuration.log_filename
self._session_identifier = session_identifier
self._session_identifier = session.identifier
self._status_update_callback = status_update_callback
self._storage_writer = storage_writer

Expand Down Expand Up @@ -799,6 +799,10 @@ def ProcessSources(

self._StartStatusUpdateThread()

# TODO: decouple session and storage writer?
session.source_configurations = (
self.knowledge_base.GetSourceConfigurationArtifacts())

try:
# Open the storage file after creating the worker processes otherwise
# the ZIP storage file will remain locked as long as the worker processes
Expand All @@ -807,7 +811,7 @@ def ProcessSources(
storage_writer.WriteSessionStart()

try:
storage_writer.WritePreprocessingInformation(self.knowledge_base)
storage_writer.WriteSessionConfiguration()

self._ProcessSources(source_path_specs, storage_writer)

Expand Down
64 changes: 20 additions & 44 deletions plaso/storage/fake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class FakeStorageWriter(interface.StorageWriter):
analysis_reports (list[AnalysisReport]): analysis reports.
session_completion (SessionCompletion): session completion attribute
container.
session_configuration (SessionConfiguration): session configuration
attribute container.
session_start (SessionStart): session start attribute container.
task_completion (TaskCompletion): task completion attribute container.
task_start (TaskStart): task start attribute container.
Expand All @@ -43,6 +45,7 @@ def __init__(
self._task_storage_writers = {}
self.analysis_reports = []
self.session_completion = None
self.session_configuration = None
self.session_start = None
self.task_completion = None
self.task_start = None
Expand Down Expand Up @@ -426,29 +429,6 @@ def PrepareMergeTaskStorage(self, task):
raise IOError('Storage writer for task: {0:s} does not exist.'.format(
task.identifier))

# pylint: disable=unused-argument
def ReadSystemConfiguration(self, knowledge_base):
"""Reads system configuration information.
The system configuration contains information about various system specific
configuration data, for example the user accounts.
Args:
knowledge_base (KnowledgeBase): is used to store the system configuration.
Raises:
IOError: if the storage type does not support writing preprocessing
information or when the storage writer is closed.
OSError: if the storage type does not support writing preprocessing
information or when the storage writer is closed.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Preprocessing information not supported by storage type.')

# TODO: implement.

def RemoveProcessedTaskStorage(self, task):
"""Removes a processed task storage.
Expand Down Expand Up @@ -481,27 +461,6 @@ def SetStorageProfiler(self, storage_profiler):
"""
return

# pylint: disable=unused-argument
def WritePreprocessingInformation(self, knowledge_base):
"""Writes preprocessing information.
Args:
knowledge_base (KnowledgeBase): used to store the preprocessing
information.
Raises:
IOError: if the storage type does not support writing preprocessing
information or when the storage writer is closed.
OSError: if the storage type does not support writing preprocessing
information or when the storage writer is closed.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Preprocessing information not supported by storage type.')

# TODO: implement.

def WriteSessionCompletion(self, aborted=False):
"""Writes session completion information.
Expand All @@ -522,6 +481,23 @@ def WriteSessionCompletion(self, aborted=False):
self._session.aborted = aborted
self.session_completion = self._session.CreateSessionCompletion()

def WriteSessionConfiguration(self):
"""Writes session configuration information.
Raises:
IOError: if the storage type does not support writing session
configuration information or when the storage writer is closed.
OSError: if the storage type does not support writing session
configuration information or when the storage writer is closed.
"""
self._RaiseIfNotWritable()

if self._storage_type != definitions.STORAGE_TYPE_SESSION:
raise IOError('Session configuration not supported by storage type.')

self.session_configuration = self._session.CreateSessionConfiguration()

def WriteSessionStart(self):
"""Writes session start information.
Expand Down
Loading

0 comments on commit acbf11e

Please sign in to comment.