diff --git a/plaso/cli/extraction_tool.py b/plaso/cli/extraction_tool.py index 84aa402a60..b13b294e53 100644 --- a/plaso/cli/extraction_tool.py +++ b/plaso/cli/extraction_tool.py @@ -159,11 +159,16 @@ def _CreateExtractionEngine(self, single_process_mode): Returns: BaseEngine: extraction engine. """ + status_update_callback = ( + self._status_view.GetExtractionStatusUpdateCallback()) + if single_process_mode: - extraction_engine = single_extraction_engine.SingleProcessEngine() + extraction_engine = single_extraction_engine.SingleProcessEngine( + status_update_callback=status_update_callback) else: extraction_engine = multi_extraction_engine.ExtractionMultiProcessEngine( number_of_worker_processes=self._number_of_extraction_workers, + status_update_callback=status_update_callback, worker_memory_limit=self._worker_memory_limit, worker_timeout=self._worker_timeout) @@ -542,17 +547,13 @@ def _ProcessSource(self, session, storage_writer): for system_configuration in system_configurations: storage_writer.AddAttributeContainer(system_configuration) - status_update_callback = ( - self._status_view.GetExtractionStatusUpdateCallback()) - if single_process_mode: logger.debug('Starting extraction in single process mode.') processing_status = extraction_engine.ProcessSource( storage_writer, self._resolver_context, configuration, system_configurations, self._file_system_path_specs, - force_parser=force_parser, - status_update_callback=status_update_callback) + force_parser=force_parser) else: logger.debug('Starting extraction in multi process mode.') @@ -564,7 +565,6 @@ def _ProcessSource(self, session, storage_writer): storage_writer, session.identifier, configuration, system_configurations, self._file_system_path_specs, enable_sigsegv_handler=self._enable_sigsegv_handler, - status_update_callback=status_update_callback, storage_file_path=self._storage_file_path) finally: diff --git a/plaso/multi_process/engine.py b/plaso/multi_process/engine.py index 7d174eec6b..b7e4afd2c4 100644 --- a/plaso/multi_process/engine.py +++ b/plaso/multi_process/engine.py @@ -53,7 +53,6 @@ def __init__(self): self._rpc_clients_per_pid = {} self._rpc_errors_per_pid = {} self._status_update_active = False - self._status_update_callback = None self._status_update_thread = None self._storage_writer = None self._worker_memory_limit = definitions.DEFAULT_WORKER_MEMORY_LIMIT diff --git a/plaso/multi_process/extraction_engine.py b/plaso/multi_process/extraction_engine.py index 4b6551b086..e74a254925 100644 --- a/plaso/multi_process/extraction_engine.py +++ b/plaso/multi_process/extraction_engine.py @@ -110,13 +110,16 @@ class ExtractionMultiProcessEngine(task_engine.TaskMultiProcessEngine): def __init__( self, maximum_number_of_tasks=None, number_of_worker_processes=0, - worker_memory_limit=None, worker_timeout=None): + status_update_callback=None, worker_memory_limit=None, + worker_timeout=None): """Initializes an engine. Args: maximum_number_of_tasks (Optional[int]): maximum number of concurrent tasks, where 0 represents no limit. number_of_worker_processes (Optional[int]): number of worker processes. + status_update_callback (Optional[function]): callback function for status + updates. worker_memory_limit (Optional[int]): maximum amount of memory a worker is allowed to consume, where None represents the default memory limit and 0 represents no limit. @@ -176,6 +179,7 @@ def __init__( self._path_spec_extractor = extractors.PathSpecExtractor() self._resolver_context = context.Context() self._status = definitions.STATUS_INDICATOR_IDLE + self._status_update_callback = status_update_callback self._task_manager = task_manager.TaskManager() self._task_merge_helper = None self._task_merge_helper_on_hold = None @@ -945,8 +949,7 @@ def _UpdateStatus(self): def ProcessSource( self, storage_writer, session_identifier, processing_configuration, system_configurations, file_system_path_specs, - enable_sigsegv_handler=False, status_update_callback=None, - storage_file_path=None): + enable_sigsegv_handler=False, storage_file_path=None): """Processes file systems within a source. Args: @@ -961,8 +964,6 @@ def ProcessSource( the source file systems to process. enable_sigsegv_handler (Optional[bool]): True if the SIGSEGV handler should be enabled. - status_update_callback (Optional[function]): callback function for status - updates. storage_file_path (Optional[str]): path to the session storage file. Returns: @@ -998,7 +999,6 @@ def ProcessSource( self._debug_output = processing_configuration.debug_output self._log_filename = processing_configuration.log_filename - self._status_update_callback = status_update_callback self._storage_file_path = storage_file_path self._storage_writer = storage_writer self._task_storage_format = processing_configuration.task_storage_format @@ -1096,7 +1096,6 @@ def ProcessSource( self._enable_sigsegv_handler = None self._event_data_timeliner = None self._processing_configuration = None - self._status_update_callback = None self._storage_file_path = None self._storage_writer = None self._system_configurations = None diff --git a/plaso/single_process/extraction_engine.py b/plaso/single_process/extraction_engine.py index 521a28fc7b..0876d8f3d7 100644 --- a/plaso/single_process/extraction_engine.py +++ b/plaso/single_process/extraction_engine.py @@ -32,8 +32,13 @@ class SingleProcessEngine(engine.BaseEngine): # Maximum number of dfVFS file system objects to cache. _FILE_SYSTEM_CACHE_SIZE = 3 - def __init__(self): - """Initializes a single process extraction engine.""" + def __init__(self, status_update_callback=None): + """Initializes a single process extraction engine. + + Args: + status_update_callback (Optional[function]): callback function for status + updates. + """ super(SingleProcessEngine, self).__init__() self._current_display_name = '' self._event_data_timeliner = None @@ -51,7 +56,7 @@ def __init__(self): self._resolver_context = None self._status = definitions.STATUS_INDICATOR_IDLE self._status_update_active = False - self._status_update_callback = None + self._status_update_callback = status_update_callback self._status_update_thread = None self._storage_writer = None @@ -354,8 +359,7 @@ def _CreateParserMediator( def ProcessSource( self, storage_writer, resolver_context, processing_configuration, - system_configurations, file_system_path_specs, force_parser=False, - status_update_callback=None): + system_configurations, file_system_path_specs, force_parser=False): """Processes file systems within a source. Args: @@ -369,8 +373,6 @@ def ProcessSource( the source file systems to process. force_parser (Optional[bool]): True if a specified parser should be forced to be used to extract events. - status_update_callback (Optional[function]): callback function for status - updates. Returns: ProcessingStatus: processing status. @@ -411,7 +413,6 @@ def ProcessSource( self._parser_mediator = parser_mediator self._processing_configuration = processing_configuration self._resolver_context = resolver_context - self._status_update_callback = status_update_callback self._storage_writer = storage_writer logger.debug('Processing started.') @@ -501,7 +502,6 @@ def ProcessSource( self._parser_mediator = None self._processing_configuration = None self._resolver_context = None - self._status_update_callback = None self._storage_writer = None return self._processing_status