Skip to content

Commit

Permalink
Prevent indexer from failing hard if something other than WARC file p…
Browse files Browse the repository at this point in the history
…assed. Refactored in anticipation of #129. Closes #83
  • Loading branch information
machawk1 committed Mar 13, 2017
1 parent bdc7813 commit 75cd0fd
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 78 deletions.
2 changes: 1 addition & 1 deletion ipwb/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2017.03.13.1253'
__version__ = '0.2017.03.13.1602'
177 changes: 100 additions & 77 deletions ipwb/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
from pywb.warc.archiveiterator import DefaultRecordParser
# from pywb.utils.canonicalize import canonicalize as surt
from requests.packages.urllib3.exceptions import NewConnectionError
from pywb.warc.recordloader import ArchiveLoadFailed
from ipfsapi.exceptions import ConnectionError
# from requests.exceptions import ConnectionError

# from warcio.archiveiterator import ArchiveIterator

import requests
import datetime

Expand Down Expand Up @@ -87,92 +91,29 @@ def indexFileAt(warcPaths, encryptionKey=None,
for warcPath in warcPaths:
verifyFileExists(warcPath)

textRecordParserOptions = {
'cdxj': True,
'include_all': False,
'surt_ordered': False}
cdxjLines = []

if encryptionKey is not None and len(encryptionKey) == 0:
encryptionKey = askUserForEncryptionKey()

encryptionAndCompressionSetting = {
'encryptTHENCompress': encryptTHENCompress,
'encryptionKey': encryptionKey,
'compressionLevel': compressionLevel
}

for warcPath in warcPaths:
warcFileFullPath = warcPath

with open(warcFileFullPath, 'rb') as warc:
iter = TextRecordParser(**textRecordParserOptions)

for entry in iter(warc):
# Only consider WARC resps records from reqs for web resources
''' TODO: Change conditional to return on non-HTTP responses
to reduce branch depth'''
if entry.record.rec_type != 'response' or \
entry.get('mime') in ('text/dns', 'text/whois'):
continue

hdrs = entry.record.status_headers
hstr = hdrs.protocol + ' ' + hdrs.statusline
for h in hdrs.headers:
hstr += "\n" + ': '.join(h)

statusCode = hdrs.statusline.split()[0]

if not entry.buffer:
return

entry.buffer.seek(0)
payload = entry.buffer.read()

httpHeaderIPFSHash = ''
payloadIPFSHash = ''
retryCount = 0

if encryptTHENCompress:
if encryptionKey is not None:
(hstr, payload) = encrypt(hstr, payload, encryptionKey)
if compressionLevel is not None:
hstr = zlib.compress(hstr, compressionLevel)
payload = zlib.compress(payload, compressionLevel)
else:
if compressionLevel is not None:
hstr = zlib.compress(hstr, compressionLevel)
payload = zlib.compress(payload, compressionLevel)
if encryptionKey is not None:
(hstr, payload) = encrypt(hstr, payload, encryptionKey)

# print('Adding {0} to IPFS'.format(entry.get('url')))
ipfsHashes = pushToIPFS(hstr, payload)

if ipfsHashes is None:
logError('Skipping ' + entry.get('url'))

continue

(httpHeaderIPFSHash, payloadIPFSHash) = ipfsHashes

uri = surt.surt(entry.get('url'),
path_strip_trailing_slash_unless_empty=False)
timestamp = entry.get('timestamp')
mime = entry.get('mime')

obj = {
'locator': 'urn:ipfs/{0}/{1}'.format(
httpHeaderIPFSHash, payloadIPFSHash),
'status_code': statusCode,
'mime_type': mime
}
if encryptionKey is not None:
obj['encryption_key'] = encryptionKey
obj['encryption_method'] = 'xor'
objJSON = json.dumps(obj)

cdxjLine = '{0} {1} {2}'.format(uri, timestamp, objJSON)
cdxjLines.append(cdxjLine) # + '\n'

# De-dupe
cdxjLines = list(set(cdxjLines))
try:
cdxjLines += getCDXJLinesFromFile(
warc, **encryptionAndCompressionSetting)
except ArchiveLoadFailed:
logError(warcPath + ' is not a valid WARC file.')

# Sort
# De-dupe and sort, needed for CDXJ adherence
cdxjLines = list(set(cdxjLines))
cdxjLines.sort()

# Prepend metadata
Expand All @@ -184,6 +125,89 @@ def indexFileAt(warcPaths, encryptionKey=None,
print('\n'.join(cdxjLines))


def getCDXJLinesFromFile(fh, **encCompOpts):
textRecordParserOptions = {
'cdxj': True,
'include_all': False,
'surt_ordered': False}
iter = TextRecordParser(**textRecordParserOptions)

cdxjLines = []
# Throws pywb.warc.recordloader.ArchiveLoadFailed if not a warc
for entry in iter(fh):
# Only consider WARC resps records from reqs for web resources
''' TODO: Change conditional to return on non-HTTP responses
to reduce branch depth'''
if entry.record.rec_type != 'response' or \
entry.get('mime') in ('text/dns', 'text/whois'):
continue

hdrs = entry.record.status_headers
hstr = hdrs.protocol + ' ' + hdrs.statusline
for h in hdrs.headers:
hstr += "\n" + ': '.join(h)

statusCode = hdrs.statusline.split()[0]

if not entry.buffer:
return

entry.buffer.seek(0)
payload = entry.buffer.read()

httpHeaderIPFSHash = ''
payloadIPFSHash = ''
retryCount = 0

if encCompOpts.get('encryptTHENCompress'):
if encCompOpts.get('encryptionKey') is not None:
(hstr, payload) = encrypt(hstr, payload,
encCompOpts.get('encryptionKey'))
if encCompOpts.get('compressionLevel') is not None:
hstr = zlib.compress(hstr, encCompOpts.get('compressionLevel'))
payload = zlib.compress(payload,
encCompOpts.get('compressionLevel'))
else:
if encCompOpts.get('compressionLevel') is not None:
hstr = zlib.compress(hstr,
encCompOpts.get('compressionLevel'))
payload = zlib.compress(payload,
encCompOpts.get('compressionLevel'))
if encCompOpts.get('encryptionKey') is not None:
(hstr, payload) = encrypt(hstr, payload,
encCompOpts.get('encryptionKey'))

# print('Adding {0} to IPFS'.format(entry.get('url')))
ipfsHashes = pushToIPFS(hstr, payload)

if ipfsHashes is None:
logError('Skipping ' + entry.get('url'))

continue

(httpHeaderIPFSHash, payloadIPFSHash) = ipfsHashes

uri = surt.surt(entry.get('url'),
path_strip_trailing_slash_unless_empty=False)
timestamp = entry.get('timestamp')
mime = entry.get('mime')

obj = {
'locator': 'urn:ipfs/{0}/{1}'.format(
httpHeaderIPFSHash, payloadIPFSHash),
'status_code': statusCode,
'mime_type': mime
}
if encCompOpts.get('encryptionKey') is not None:
obj['encryption_key'] = encCompOpts.get('encryptionKey')
obj['encryption_method'] = 'xor'
objJSON = json.dumps(obj)

cdxjLine = '{0} {1} {2}'.format(uri, timestamp, objJSON)
cdxjLines.append(cdxjLine) # + '\n'
return cdxjLines


def generateCDXJMetadata(cdxjLines=None):
metadata = ['!context ["http://oduwsdl.github.io/contexts/cdxj"]']
metaVals = {
Expand Down Expand Up @@ -258,7 +282,6 @@ def writeFile(filename, content):


class TextRecordParser(DefaultRecordParser):

def create_payload_buffer(self, entry):
return BytesIO()

Expand Down

0 comments on commit 75cd0fd

Please sign in to comment.