Skip to content

Commit

Permalink
Code review: 251910043: Added BDE volume support #109
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz committed Dec 31, 2015
1 parent ce62762 commit 8206067
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 33 deletions.
2 changes: 1 addition & 1 deletion config/dpkg/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ python-plaso (1.3.0-1) unstable; urgency=low

* Auto-generated

-- Log2Timeline <[email protected]> Wed, 01 Jul 2015 19:35:02 +0200
-- Log2Timeline <[email protected]> Wed, 01 Jul 2015 19:41:07 +0200
243 changes: 217 additions & 26 deletions plaso/cli/storage_media_tool.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
"""The storage media CLI tool."""

import logging
import getpass
import os
import sys

from dfvfs.credentials import manager as credentials_manager
from dfvfs.helpers import source_scanner
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
Expand All @@ -20,8 +21,14 @@ class StorageMediaTool(tools.CLITool):
"""Class that implements a storage media CLI tool."""

_DEFAULT_BYTES_PER_SECTOR = 512

_SOURCE_OPTION = u'source'

_BINARY_DATA_CREDENTIAL_TYPES = [u'key_data']

_SUPPORTED_CREDENTIAL_TYPES = [
u'key_data', u'password', u'recovery_password', u'startup_key']

# For context see: http://en.wikipedia.org/wiki/Byte
_UNITS_1000 = [u'B', u'kB', u'MB', u'GB', u'TB', u'EB', u'ZB', u'YB']
_UNITS_1024 = [u'B', u'KiB', u'MiB', u'GiB', u'TiB', u'EiB', u'ZiB', u'YiB']
Expand All @@ -39,6 +46,7 @@ def __init__(self, input_reader=None, output_writer=None):
"""
super(StorageMediaTool, self).__init__(
input_reader=input_reader, output_writer=output_writer)
self._credentials = []
self._filter_file = None
# TODO: refactor to partitions.
self._partition_string = None
Expand Down Expand Up @@ -123,7 +131,7 @@ def _GetTSKPartitionIdentifiers(
volume_identifiers = self._source_scanner.GetVolumeIdentifiers(
volume_system)
if not volume_identifiers:
logging.info(u'No partitions found.')
self._output_writer.Write(u'[WARNING] No partitions found.\n')
return

if partition_string == u'all':
Expand All @@ -146,17 +154,18 @@ def _GetTSKPartitionIdentifiers(
if volume:
return [u'p{0:d}'.format(partition_number)]

logging.warning(u'No such partition: {0:d}.'.format(partition_number))
self._output_writer.Write(
u'[WARNING] No such partition: {0:d}.\n'.format(partition_number))

if partition_offset is not None:
for volume in volume_system.volumes:
volume_extent = volume.extents[0]
if volume_extent.offset == partition_offset:
return [volume.identifier]

logging.warning(
u'No such partition with offset: {0:d} (0x{0:08x}).'.format(
partition_offset))
self._output_writer.Write((
u'[WARNING] No such partition with offset: {0:d} '
u'(0x{0:08x}).\n').format(partition_offset))

if len(volume_identifiers) == 1:
return volume_identifiers
Expand Down Expand Up @@ -207,6 +216,40 @@ def _GetVSSStoreIdentifiers(self, scan_node, vss_stores=None):

return selected_store_identifiers

def _ParseCredentialOptions(self, options):
"""Parses the credential options.
Args:
options: the command line arguments (instance of argparse.Namespace).
Raises:
BadConfigOption: if the options are invalid.
"""
credentials = getattr(options, u'credentials', [])
if not isinstance(credentials, list):
raise errors.BadConfigOption(u'Unsupported credentials value.')

for credential_string in credentials:
credential_type, _, credential_data = credential_string.partition(u':')
if not credential_type or not credential_data:
raise errors.BadConfigOption(
u'Badly formatted credential: {0:s}.'.format(credential_string))

if credential_type not in self._SUPPORTED_CREDENTIAL_TYPES:
raise errors.BadConfigOption(
u'Unsupported credential type for: {0:s}.'.format(
credential_string))

if credential_type in self._BINARY_DATA_CREDENTIAL_TYPES:
try:
credential_data = credential_data.decode(u'hex')
except TypeError:
raise errors.BadConfigOption(
u'Unsupported credential data for: {0:s}.'.format(
credential_string))

self._credentials.append((credential_type, credential_data))

def _ParseFilterOptions(self, options):
"""Parses the filter options.
Expand Down Expand Up @@ -358,6 +401,76 @@ def _ParseVSSStoresString(self, vss_stores):

return sorted(stores)

def _PromptUserForEncryptedVolumeCredential(
self, scan_context, locked_scan_node, credentials):
"""Prompts the user to provide a credential for an encrypted volume.
Args:
scan_context: the source scanner context (instance of
SourceScannerContext).
locked_scan_node: the locked scan node (instance of SourceScanNode).
credentials: the credentials supported by the locked scan node (instance
of dfvfs.Credentials).
Returns:
A boolean value indicating the volume was unlocked.
"""
# TODO: print volume description.
if locked_scan_node.type_indicator == dfvfs_definitions.TYPE_INDICATOR_BDE:
self._output_writer.Write(u'Found a BitLocker encrypted volume.\n')
else:
self._output_writer.Write(u'Found an encrypted volume.\n')

credentials_list = list(credentials.CREDENTIALS)
credentials_list.append(u'skip')

self._output_writer.Write(u'Supported credentials:\n')
self._output_writer.Write(u'\n')
for index, name in enumerate(credentials_list):
self._output_writer.Write(u' {0:d}. {1:s}\n'.format(index, name))
self._output_writer.Write(u'\nNote that you can abort with Ctrl^C.\n\n')

result = False
while not result:
self._output_writer.Write(u'Select a credential to unlock the volume: ')
# TODO: add an input reader.
input_line = self._input_reader.Read()
input_line = input_line.strip()

if input_line in credentials_list:
credential_type = input_line
else:
try:
credential_type = int(input_line, 10)
credential_type = credentials_list[credential_type]
except (IndexError, ValueError):
self._output_writer.Write(
u'Unsupported credential: {0:s}\n'.format(input_line))
continue

if credential_type == u'skip':
break

credential_data = getpass.getpass(u'Enter credential data: ')
self._output_writer.Write(u'\n')

if credential_type in self._BINARY_DATA_CREDENTIAL_TYPES:
try:
credential_data = credential_data.decode(u'hex')
except TypeError:
self._output_writer.Write(u'Unsupported credential data.\n')
continue

result = self._source_scanner.Unlock(
scan_context, locked_scan_node.path_spec, credential_type,
credential_data)

if not result:
self._output_writer.Write(u'Unable to unlock volume.\n')
self._output_writer.Write(u'\n')

return result

def _PromptUserForPartitionIdentifier(
self, volume_system, volume_identifiers):
"""Prompts the user to provide a partition identifier.
Expand Down Expand Up @@ -522,10 +635,12 @@ def _PromptUserForVSSStoreIdentifiers(

return selected_vss_stores

def _ScanVolume(self, volume_scan_node):
def _ScanVolume(self, scan_context, volume_scan_node):
"""Scans the volume scan node for volume and file systems.
Args:
scan_context: the source scanner context (instance of
SourceScannerContext).
volume_scan_node: the volume scan node (instance of dfvfs.ScanNode).
Raises:
Expand All @@ -536,18 +651,20 @@ def _ScanVolume(self, volume_scan_node):
raise errors.SourceScannerError(u'Invalid or missing volume scan node.')

if len(volume_scan_node.sub_nodes) == 0:
self._ScanVolumeScanNode(volume_scan_node)
self._ScanVolumeScanNode(scan_context, volume_scan_node)

else:
# Some volumes contain other volume or file systems e.g. BitLocker ToGo
# has an encrypted and unencrypted volume.
for sub_scan_node in volume_scan_node.sub_nodes:
self._ScanVolumeScanNode(sub_scan_node)
self._ScanVolumeScanNode(scan_context, sub_scan_node)

def _ScanVolumeScanNode(self, volume_scan_node):
def _ScanVolumeScanNode(self, scan_context, volume_scan_node):
"""Scans an individual volume scan node for volume and file systems.
Args:
scan_context: the source scanner context (instance of
SourceScannerContext).
volume_scan_node: the volume scan node (instance of dfvfs.ScanNode).
Raises:
Expand All @@ -566,27 +683,100 @@ def _ScanVolumeScanNode(self, volume_scan_node):
# a credential to unlock the volume.
if scan_node.type_indicator in (
dfvfs_definitions.ENCRYPTED_VOLUME_TYPE_INDICATORS):
# TODO: ask for credentials to unlock volume.
logging.warning(u'Encrypted volume not yet supported.')
self._ScanVolumeScanNodeEncrypted(scan_context, scan_node)

elif scan_node.type_indicator == dfvfs_definitions.TYPE_INDICATOR_VSHADOW:
if self._process_vss:
vss_store_identifiers = self._GetVSSStoreIdentifiers(
scan_node, vss_stores=self._vss_stores)

self._vss_stores = vss_store_identifiers
for vss_store_identifier in vss_store_identifiers:
location = u'/vss{0:d}'.format(vss_store_identifier)
sub_scan_node = scan_node.GetSubNodeByLocation(location)
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=sub_scan_node.path_spec)
self._source_path_specs.append(path_spec)
self._ScanVolumeScanNodeVSS(scan_context, scan_node)

elif scan_node.type_indicator in (
dfvfs_definitions.FILE_SYSTEM_TYPE_INDICATORS):
self._source_path_specs.append(scan_node.path_spec)

def _ScanVolumeScanNodeEncrypted(self, scan_context, volume_scan_node):
"""Scans an encrypted volume scan node for volume and file systems.
Args:
scan_context: the source scanner context (instance of
SourceScannerContext).
volume_scan_node: the volume scan node (instance of dfvfs.ScanNode).
"""
result = not scan_context.IsLockedScanNode(volume_scan_node.path_spec)
if not result:
credentials = credentials_manager.CredentialsManager.GetCredentials(
volume_scan_node.path_spec)

result = False
for credential_type, credential_data in self._credentials:
if credential_type not in credentials.CREDENTIALS:
continue

result = self._source_scanner.Unlock(
scan_context, volume_scan_node.path_spec, credential_type,
credential_data)
if result:
break

if self._credentials and not result:
self._output_writer.Write(
u'[WARNING] Unable to unlock encrypted volume using the provided '
u'credentials.\n\n')

if not result:
result = self._PromptUserForEncryptedVolumeCredential(
scan_context, volume_scan_node, credentials)

if result:
# TODO: instead of hard coding TSK scan for the file system.
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=volume_scan_node.path_spec)
self._source_path_specs.append(path_spec)

def _ScanVolumeScanNodeVSS(self, unused_scan_context, volume_scan_node):
"""Scans a VSS volume scan node for volume and file systems.
Args:
scan_context: the source scanner context (instance of
SourceScannerContext).
volume_scan_node: the volume scan node (instance of dfvfs.ScanNode).
"""
if not self._process_vss:
return

vss_store_identifiers = self._GetVSSStoreIdentifiers(
volume_scan_node, vss_stores=self._vss_stores)

self._vss_stores = vss_store_identifiers
for vss_store_identifier in vss_store_identifiers:
location = u'/vss{0:d}'.format(vss_store_identifier)
sub_scan_node = volume_scan_node.GetSubNodeByLocation(location)

# TODO: instead of hard coding TSK scan for the file system.
path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location=u'/',
parent=sub_scan_node.path_spec)
self._source_path_specs.append(path_spec)

def AddCredentialOptions(self, argument_group):
"""Adds the credential options to the argument group.
The credential options are use to unlock encrypted volumes.
Args:
argument_group: The argparse argument group (instance of
argparse._ArgumentGroup).
"""
argument_group.add_argument(
u'--credential', action=u'append', default=[], type=unicode,
dest=u'credentials', metavar=u'TYPE:DATA', help=(
u'Define a credentials that can be used to unlock encrypted '
u'volumes e.g. BitLocker. The credential is defined as type:data '
u'e.g. "password:BDE-test". Supported credential types are: '
u'{0:s}. Binary key data is expected to be passed in BASE-16 '
u'encoding (hexadecimal). WARNING credentials passed via command '
u'line arguments can end up in logs, so use this option with '
u'care.').format(u', '.join(self._SUPPORTED_CREDENTIAL_TYPES)))

def AddFilterOptions(self, argument_group):
"""Adds the filter options to the argument group.
Expand Down Expand Up @@ -674,6 +864,7 @@ def ParseOptions(self, options):
super(StorageMediaTool, self).ParseOptions(options)
self._ParseStorageMediaImageOptions(options)
self._ParseVSSProcessingOptions(options)
self._ParseCredentialOptions(options)

self._source_path = getattr(options, self._SOURCE_OPTION, None)
if not self._source_path:
Expand Down Expand Up @@ -745,13 +936,13 @@ def ScanSource(self, front_end):
partition_offset=self._partition_offset)

if not partition_identifiers:
self._ScanVolume(scan_node)
self._ScanVolume(scan_context, scan_node)

else:
for partition_identifier in partition_identifiers:
location = u'/{0:s}'.format(partition_identifier)
sub_scan_node = scan_node.GetSubNodeByLocation(location)
self._ScanVolume(sub_scan_node)
self._ScanVolume(scan_context, sub_scan_node)

if not self._source_path_specs:
raise errors.SourceScannerError(
Expand Down
Binary file added test_data/bdetogo.raw
Binary file not shown.
Loading

0 comments on commit 8206067

Please sign in to comment.