Skip to content

Commit

Permalink
kb12
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Apr 16, 2023
1 parent a0f1116 commit 57b26c1
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 24 deletions.
14 changes: 7 additions & 7 deletions plaso/cli/extraction_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,16 @@ def _CreateExtractionEngine(self, single_process_mode):
Returns:
BaseEngine: extraction engine.
"""
status_update_callback = (
self._status_view.GetExtractionStatusUpdateCallback())

if single_process_mode:
extraction_engine = single_extraction_engine.SingleProcessEngine()
extraction_engine = single_extraction_engine.SingleProcessEngine(
status_update_callback=status_update_callback)
else:
extraction_engine = multi_extraction_engine.ExtractionMultiProcessEngine(
number_of_worker_processes=self._number_of_extraction_workers,
status_update_callback=status_update_callback,
worker_memory_limit=self._worker_memory_limit,
worker_timeout=self._worker_timeout)

Expand Down Expand Up @@ -542,17 +547,13 @@ def _ProcessSource(self, session, storage_writer):
for system_configuration in system_configurations:
storage_writer.AddAttributeContainer(system_configuration)

status_update_callback = (
self._status_view.GetExtractionStatusUpdateCallback())

if single_process_mode:
logger.debug('Starting extraction in single process mode.')

processing_status = extraction_engine.ProcessSource(
storage_writer, self._resolver_context, configuration,
system_configurations, self._file_system_path_specs,
force_parser=force_parser,
status_update_callback=status_update_callback)
force_parser=force_parser)

else:
logger.debug('Starting extraction in multi process mode.')
Expand All @@ -564,7 +565,6 @@ def _ProcessSource(self, session, storage_writer):
storage_writer, session.identifier, configuration,
system_configurations, self._file_system_path_specs,
enable_sigsegv_handler=self._enable_sigsegv_handler,
status_update_callback=status_update_callback,
storage_file_path=self._storage_file_path)

finally:
Expand Down
1 change: 0 additions & 1 deletion plaso/multi_process/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def __init__(self):
self._rpc_clients_per_pid = {}
self._rpc_errors_per_pid = {}
self._status_update_active = False
self._status_update_callback = None
self._status_update_thread = None
self._storage_writer = None
self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT
Expand Down
13 changes: 6 additions & 7 deletions plaso/multi_process/extraction_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,16 @@ class ExtractionMultiProcessEngine(task_engine.TaskMultiProcessEngine):

def __init__(
self, maximum_number_of_tasks=None, number_of_worker_processes=0,
worker_memory_limit=None, worker_timeout=None):
status_update_callback=None, worker_memory_limit=None,
worker_timeout=None):
"""Initializes an engine.
Args:
maximum_number_of_tasks (Optional[int]): maximum number of concurrent
tasks, where 0 represents no limit.
number_of_worker_processes (Optional[int]): number of worker processes.
status_update_callback (Optional[function]): callback function for status
updates.
worker_memory_limit (Optional[int]): maximum amount of memory a worker is
allowed to consume, where None represents the default memory limit
and 0 represents no limit.
Expand Down Expand Up @@ -176,6 +179,7 @@ def __init__(
self._path_spec_extractor = extractors.PathSpecExtractor()
self._resolver_context = context.Context()
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_callback = status_update_callback
self._task_manager = task_manager.TaskManager()
self._task_merge_helper = None
self._task_merge_helper_on_hold = None
Expand Down Expand Up @@ -945,8 +949,7 @@ def _UpdateStatus(self):
def ProcessSource(
self, storage_writer, session_identifier, processing_configuration,
system_configurations, file_system_path_specs,
enable_sigsegv_handler=False, status_update_callback=None,
storage_file_path=None):
enable_sigsegv_handler=False, storage_file_path=None):
"""Processes file systems within a source.
Args:
Expand All @@ -961,8 +964,6 @@ def ProcessSource(
the source file systems to process.
enable_sigsegv_handler (Optional[bool]): True if the SIGSEGV handler
should be enabled.
status_update_callback (Optional[function]): callback function for status
updates.
storage_file_path (Optional[str]): path to the session storage file.
Returns:
Expand Down Expand Up @@ -998,7 +999,6 @@ def ProcessSource(

self._debug_output = processing_configuration.debug_output
self._log_filename = processing_configuration.log_filename
self._status_update_callback = status_update_callback
self._storage_file_path = storage_file_path
self._storage_writer = storage_writer
self._task_storage_format = processing_configuration.task_storage_format
Expand Down Expand Up @@ -1096,7 +1096,6 @@ def ProcessSource(
self._enable_sigsegv_handler = None
self._event_data_timeliner = None
self._processing_configuration = None
self._status_update_callback = None
self._storage_file_path = None
self._storage_writer = None
self._system_configurations = None
Expand Down
18 changes: 9 additions & 9 deletions plaso/single_process/extraction_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ class SingleProcessEngine(engine.BaseEngine):
# Maximum number of dfVFS file system objects to cache.
_FILE_SYSTEM_CACHE_SIZE = 3

def __init__(self):
"""Initializes a single process extraction engine."""
def __init__(self, status_update_callback=None):
"""Initializes a single process extraction engine.
Args:
status_update_callback (Optional[function]): callback function for status
updates.
"""
super(SingleProcessEngine, self).__init__()
self._current_display_name = ''
self._event_data_timeliner = None
Expand All @@ -51,7 +56,7 @@ def __init__(self):
self._resolver_context = None
self._status = definitions.STATUS_INDICATOR_IDLE
self._status_update_active = False
self._status_update_callback = None
self._status_update_callback = status_update_callback
self._status_update_thread = None
self._storage_writer = None

Expand Down Expand Up @@ -354,8 +359,7 @@ def _CreateParserMediator(

def ProcessSource(
self, storage_writer, resolver_context, processing_configuration,
system_configurations, file_system_path_specs, force_parser=False,
status_update_callback=None):
system_configurations, file_system_path_specs, force_parser=False):
"""Processes file systems within a source.
Args:
Expand All @@ -369,8 +373,6 @@ def ProcessSource(
the source file systems to process.
force_parser (Optional[bool]): True if a specified parser should be forced
to be used to extract events.
status_update_callback (Optional[function]): callback function for status
updates.
Returns:
ProcessingStatus: processing status.
Expand Down Expand Up @@ -411,7 +413,6 @@ def ProcessSource(
self._parser_mediator = parser_mediator
self._processing_configuration = processing_configuration
self._resolver_context = resolver_context
self._status_update_callback = status_update_callback
self._storage_writer = storage_writer

logger.debug('Processing started.')
Expand Down Expand Up @@ -501,7 +502,6 @@ def ProcessSource(
self._parser_mediator = None
self._processing_configuration = None
self._resolver_context = None
self._status_update_callback = None
self._storage_writer = None

return self._processing_status

0 comments on commit 57b26c1

Please sign in to comment.