diff --git a/README.md b/README.md
index cc5cbbd..5084760 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,13 @@ http://www.bit-cluster.com
 magnet:?xt=urn:btih:6654FEEF6D7B4BCDC8DD1CCB89F0403108939BDF
 
 ## Dependencies
+Python 3, Flask, PyMongo, python-bitcoinlib
+
+## Installation procedure
+[See Wiki](../../wiki/Installation-procedure)
+
+## Database structure
+[See Wiki](../../wiki/Database-structure)
\ No newline at end of file
diff --git a/build_cluster.py b/build_cluster.py
index b535c24..00ca9b1 100644
--- a/build_cluster.py
+++ b/build_cluster.py
@@ -14,43 +14,39 @@ def start():
     start_block_id = int(sys.argv[1])
     block_id = start_block_id
     process = None
-    try:
-        while builder.crawl_block(block_id):
+    while builder.crawl_block(block_id):
+        if settings.debug or block_id % 100 == 0:
             print("Block %d crawled" % block_id)
-            if block_id - start_block_id > 0 and (block_id - start_block_id) % settings.block_crawling_limit == 0:
-                builder.network_graph.check_integrity()
-                while process is not None and process.is_alive():
-                    print("Waiting for insertion thread to complete...")
-                    process.join()
-
-                if process is not None and process.exitcode > 0 : #error
-                    raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
-                process = Process(target=builder.network_graph.synchronize_mongo_db)
-                process.start()
-                builder.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port) #Starting a new graph while other graph data is inserted.
-
-        if process is not None and not process.is_alive() and process.exitcode > 0 : #error
-            raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
-        block_id+=1
-
-        #Finished Crawling, Flushing to DB.
-        #Waiting for any previous DB Sync
-        while process is not None and process.is_alive():
-            print("Waiting for insertion thread to complete...")
-            process.join()
-
-        #Sync the rest
-        process = Process(target=builder.network_graph.synchronize_mongo_db)
-        process.start()
+        if block_id - start_block_id > 0 and (block_id - start_block_id) % settings.block_crawling_limit == 0:
+            builder.network_graph.check_integrity()
+            while process is not None and process.is_alive():
+                print("Waiting for insertion thread to complete...")
+                process.join()
+
+            if process is not None and process.exitcode > 0:  #error
+                raise Exception("Error code %d in DB sync process, aborting" % process.exitcode)
+            process = Process(target=builder.network_graph.synchronize_mongo_db)
+            process.start()
+            builder.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port)  #Start a new graph while the previous one is being inserted
+            builder.connect_to_bitcoind_rpc()
+
+        if process is not None and not process.is_alive() and process.exitcode > 0:  #error
+            raise Exception("Error code %d in DB sync process, aborting" % process.exitcode)
+        block_id += 1
+
+    #Finished crawling, flushing to DB
+    #Wait for any previous DB sync to finish
+    while process is not None and process.is_alive():
+        print("Waiting for insertion thread to complete...")
+        process.join()
-        #DONE!
-
-    #For Debugging purpose
-    except:
-        input("An exception will rise ")
-        raise
+
+    #Sync the rest
+    print("Inserting into the DB")
+    process = Process(target=builder.network_graph.synchronize_mongo_db)
+    process.start()
+    process.join()
+    #DONE!
 
 if __name__ == "__main__":
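Note: the rewritten loop overlaps crawling with database insertion. Each filled graph is handed to a `multiprocessing.Process` for the Mongo sync while a fresh `ClusterNetwork` (and a fresh RPC connection) takes over. A minimal sketch of that flush pattern, with the hypothetical `sync_to_db` and `run` standing in for the project's own methods:

```python
from multiprocessing import Process

def sync_to_db(batch):
    # Stand-in for ClusterNetwork.synchronize_mongo_db
    print("inserting %d items" % len(batch))

def run(batches):
    process = None
    for batch in batches:
        if process is not None:
            process.join()  # wait for the previous flush before starting the next
            if process.exitcode > 0:
                raise Exception("Error code %d in DB sync process" % process.exitcode)
        process = Process(target=sync_to_db, args=(batch,))
        process.start()  # flush this batch while the caller builds the next one
    if process is not None:
        process.join()  # final flush

if __name__ == "__main__":
    run([[1, 2], [3, 4, 5]])
```

Joining the previous process before launching the next bounds memory to two in-flight batches and surfaces a nonzero `exitcode` as an exception, mirroring the checks in the patch.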
diff --git a/crawler/address_utils.py b/crawler/address_utils.py
index 7564138..ced0e71 100644
--- a/crawler/address_utils.py
+++ b/crawler/address_utils.py
@@ -23,11 +23,12 @@ def get_hash160_from_cscript(self,script):
         elif script[0] == OP_HASH160 and script[-1] == OP_EQUAL: #P2SH
             script = script[1:]
             script = script[1:script[0]+1]
-            return self.convert_hash160_to_addr(script)
+            return self.convert_hash160_to_addr(script, network_id=b'\x05')  #P2SH address (commonly multisig)
         elif script[-1] == OP_CHECKSIG: #V1 Validation With Public Key
             return self.convert_hash160_to_addr(self.convert_public_key_to_hash160(script))
-        return None
+
+        raise AttributeError("CScript format not supported")
@@ -42,13 +43,12 @@ def convert_public_key_to_hash160(self,pub_key_data):
         return h.digest()
 
-    def convert_hash160_to_addr(self,hash_160):
-        hash_160 = b'\x00' + hash_160
+    def convert_hash160_to_addr(self,hash_160, network_id=b'\x00'):
+        hash_160 = network_id + hash_160
         hex_addr = (hash_160 + hashlib.sha256(hashlib.sha256(hash_160).digest()).digest()[:4])
         return bitcoin.base58.encode(hex_addr)
-
     '''
     Taken From pybitcointools
     Converted to Python3
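Note: the new `network_id` parameter makes the Base58Check version byte explicit: `b'\x00'` produces a `1…` P2PKH address, `b'\x05'` a `3…` P2SH address. A self-contained sketch of the encoding that `convert_hash160_to_addr` performs (pure `hashlib`, no project imports):

```python
import hashlib

B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"

def base58check(payload: bytes, version: bytes = b'\x00') -> str:
    data = version + payload
    data += hashlib.sha256(hashlib.sha256(data).digest()).digest()[:4]  # 4-byte checksum
    n = int.from_bytes(data, 'big')
    out = ""
    while n > 0:
        n, rem = divmod(n, 58)
        out = B58_ALPHABET[rem] + out
    # Each leading zero byte is encoded as a literal '1'
    pad = len(data) - len(data.lstrip(b'\x00'))
    return "1" * pad + out

print(base58check(bytes(20)))           # all-zero hash160 -> 1111111111111111111114oLvT2
print(base58check(bytes(20), b'\x05'))  # version 0x05 -> a '3...' P2SH address
```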
diff --git a/crawler/base_crawler.py b/crawler/base_crawler.py
index 49fccb5..6ab88e4 100644
--- a/crawler/base_crawler.py
+++ b/crawler/base_crawler.py
@@ -1,44 +1,71 @@
 import bitcoin
 import bitcoin.rpc
 import bitcoin.core.script
-from datetime import datetime
+import socket
 import binascii
+import http.client
+from crawler.address_utils import Addressutils
 from bitcoin.core import CTransaction
+from settings import settings
+from bitcoin.core.script import OP_FALSE
 
 class BaseCrawler:
     def __init__(self):
-        self.proxy = bitcoin.rpc.Proxy()
-        self.start = datetime.now()
         self.block_id = -1
+        self.proxy = None
+        self.connect_to_bitcoind_rpc()
+        self.address_utils = Addressutils()
 
-    def crawl_block(self,block_id):
-
+    def connect_to_bitcoind_rpc(self):
+        for i in range(1,settings.rcp_reconnect_max_retry+1):
             try:
-                self.block_id = block_id
-                hash = self.proxy.getblockhash(block_id)
-            except IndexError as ex:
-                print("Block not found")
-                return False
+                self.proxy = bitcoin.rpc.Proxy()
+                return
+            except http.client.HTTPException:
+                print("Caught a connection error from bitcoind RPC, reconnecting... (%d/%d)" % (i,settings.rcp_reconnect_max_retry))
+
+
+    def crawl_block(self,block_id):
+        for i in range(1,settings.rcp_reconnect_max_retry+1):
+            try:
+                try:
+                    self.block_id = block_id
+                    block_hash = self.proxy.getblockhash(block_id)
+                except IndexError:
+                    print("Block not found")
+                    return False
 
-            block = self.proxy.getblock(hash)
-            for tx in block.vtx[1:]: #ignore mining tx
-                self.parse_transaction(tx,block)
-            return True
+                block = self.proxy.getblock(block_hash)
+                for tx in block.vtx[1:]: #ignore mining tx
+                    self.parse_transaction(tx,block)
+                return True
+            except socket.error:
+                print("Caught an error from bitcoind RPC, reconnecting and retrying... (%d/%d)" % (i,settings.rcp_reconnect_max_retry))
+                self.connect_to_bitcoind_rpc()
 
     def parse_transaction(self,transaction,block):
         assert isinstance(transaction,CTransaction)
-        transaction.GetHash()
-        signed_script_input = []
-        try:
-            for vin in transaction.vin:
-                push_data_sig = vin.scriptSig[0]
-                signed_script = vin.scriptSig[1:]
-                signed_script = signed_script[push_data_sig:]
-                signed_script_input.append(signed_script)
-            self.do_work(signed_script_input, transaction.vout,block)
-        except:
-            print("WARNING : Unable to process transaction ", binascii.hexlify(transaction.GetHash()[::-1]))
-
-    def do_work(self,inputs,outputs,block):
+        input_addresses = set()
+        trx_hash = binascii.hexlify(transaction.GetHash()[::-1]).decode('utf-8')
+        for vin in transaction.vin:
+            try:
+                sign_script = vin.scriptSig
+                push_data_sig = sign_script[0]
+                sign_script = sign_script[1:]
+                sign_script = sign_script[push_data_sig:]
+
+                if len(sign_script) > 0:
+                    input_addresses.add(self.address_utils.convert_hash160_to_addr(self.address_utils.convert_public_key_to_hash160(sign_script)))
+                else:
+                    prevtxout = self.proxy.getrawtransaction(vin.prevout.hash).vout[vin.prevout.n]
+                    input_addresses.add(self.address_utils.get_hash160_from_cscript(prevtxout.scriptPubKey))
+            except Exception as ex:
+                if settings.debug:
+                    print("Transaction %s: unable to parse scriptSig %s" % (trx_hash,binascii.hexlify(vin.scriptSig)))
+                    print(ex)
+
+        self.do_work(input_addresses, transaction.vout,block,trx_hash)
+
+    def do_work(self,inputs_addresses,outputs_scripts,block,trx_hash):
         raise NotImplementedError("Not implemented method do_work")
\ No newline at end of file
diff --git a/crawler/cluster_crawler.py b/crawler/cluster_crawler.py
index b076102..141108c 100644
--- a/crawler/cluster_crawler.py
+++ b/crawler/cluster_crawler.py
@@ -6,9 +6,12 @@ class ClusterCrawler(base_crawler.BaseCrawler):
     def __init__(self):
         super().__init__()
-        self.to_crawl =[]
-        self.crawled = []
         self.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port)
 
-    def do_work(self,inputs, outputs,block):
-        self.network_graph.process_transaction_data(inputs, outputs)
\ No newline at end of file
+    def do_work(self,inputs_addresses, outputs,block,trx_hash):
+        if len(inputs_addresses) == 0:
+            return
+        self.network_graph.process_transaction_data(inputs_addresses, outputs)
+
+    def start_new_graph(self):
+        self.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port)
\ No newline at end of file
diff --git a/crawler/cluster_network.py b/crawler/cluster_network.py
index 071fab0..c6ede2e 100644
--- a/crawler/cluster_network.py
+++ b/crawler/cluster_network.py
@@ -30,26 +30,9 @@ def chunks(self,l, n):
         n = max(1, n)
         return [l[i:i + n] for i in range(0, len(l), n)]
 
-    def parse_addresses(self,inputs):
-        addresses_in = set()
-        for i in inputs:
-            try:
-                addresses_in.add(self.addr_utils.convert_hash160_to_addr(self.addr_utils.convert_public_key_to_hash160(i)))
-            except:
-                pass
-                #print("Unable to parse an input address. Ignoring this address")
-
-
-        if len(addresses_in) ==0:
-            return None
-
-        return addresses_in
 
     def process_transaction_data(self,inputs, outputs):
-        addresses_in = self.parse_addresses(inputs)
-        if addresses_in is None:
-            return
-        self.merge_into_graph(addresses_in)
+        self.merge_into_graph(inputs)
 
     def merge_into_graph(self,addresses_in):
         new_node_addresses = []
@@ -80,6 +63,10 @@ def synchronize_mongo_db(self):
         collection = db.addresses
         transactions = db.transactions
         db_next_node_id = 1
+
+        #Ensure index existence
+        collection.create_index([("n_id", DESCENDING)])
+
         for x in collection.find().sort("n_id",DESCENDING).limit(1):
             db_next_node_id = x['n_id'] +1
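Note: with address parsing moved into the crawler, `process_transaction_data` reduces to `merge_into_graph`, which applies the common-input-ownership heuristic: addresses spent together in one transaction are assumed to share an owner. A toy union-find sketch of that merging rule (illustrative only; the real implementation stores clusters as `n_id` nodes in MongoDB):

```python
# Map each address to a cluster representative; merging two clusters
# just re-points one representative at the other.
parent = {}

def find(addr):
    parent.setdefault(addr, addr)
    while parent[addr] != addr:
        parent[addr] = parent[parent[addr]]  # path halving keeps chains short
        addr = parent[addr]
    return addr

def union(addresses):
    addresses = iter(addresses)
    root = find(next(addresses))
    for a in addresses:
        parent[find(a)] = root

union({"addrA", "addrB"})  # tx1: A and B sign inputs of the same transaction
union({"addrB", "addrC"})  # tx2: B and C do too, so A, B, C share one cluster
assert find("addrA") == find("addrC")
```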
diff --git a/crawler/money_crawler.py b/crawler/money_crawler.py
index 5fc446f..8ccb9e4 100644
--- a/crawler/money_crawler.py
+++ b/crawler/money_crawler.py
@@ -2,7 +2,8 @@
 import json
 import urllib.request
-from pymongo import MongoClient
+
+from pymongo import MongoClient, ASCENDING, DESCENDING
 from crawler import address_utils, base_crawler
 from settings import settings
@@ -20,41 +21,59 @@ def __init__(self):
         self.conversion_table = json.loads(urllib.request.urlopen(url).read().decode('utf8'))['bpi']
         self.conversion_table[today] = json.loads(urllib.request.urlopen("https://api.coindesk.com/v1/bpi/currentprice/USD.json").read().decode('utf8'))['bpi']['USD']['rate_float']
+        self.cache_nodeid_addresses = dict()
 
-    def do_work(self,inputs, outputs, block):
-        if len(inputs) == 0: #No Valid Tx, an empty block with only one mining tx
+    def do_work(self,inputs_addresses, outputs, block, trx_hash):
+        if len(inputs_addresses) == 0: #No valid tx: an empty block with only the mining tx
             return
         try:
-            source = self.addr_utils.convert_hash160_to_addr(self.addr_utils.convert_public_key_to_hash160(inputs[0]))
+            source = inputs_addresses.pop()
 
-            cursor_source_n_id = self.client.bitcoin.addresses.find_one({"_id":source})
-            if cursor_source_n_id is not None:
-                source_n_id = cursor_source_n_id['n_id']
+            if source in self.cache_nodeid_addresses:
+                source_n_id = self.cache_nodeid_addresses[source]
             else:
-                source_n_id = -1
+                cursor_source_n_id = self.client.bitcoin.addresses.find_one({"_id":source})
+                if cursor_source_n_id is not None:
+                    source_n_id = cursor_source_n_id['n_id']
+                else:
+                    source_n_id = -1
+                self.cache_nodeid_addresses[source] = source_n_id
 
             for out in outputs:
                 dest = self.addr_utils.get_hash160_from_cscript(out.scriptPubKey)
-                cursor_destination_n_id = self.client.bitcoin.addresses.find_one({"_id":dest})
-                if cursor_destination_n_id is not None:
-                    destination_n_id = cursor_destination_n_id['n_id']
+                if dest in self.cache_nodeid_addresses:
+                    destination_n_id = self.cache_nodeid_addresses[dest]
                 else:
-                    destination_n_id = -1
+                    cursor_destination_n_id = self.client.bitcoin.addresses.find_one({"_id":dest})
+                    if cursor_destination_n_id is not None:
+                        destination_n_id = cursor_destination_n_id['n_id']
+                    else:
+                        destination_n_id = -1
+                    self.cache_nodeid_addresses[dest] = destination_n_id
 
                 amount_btc = (out.nValue/100000000)
                 date = datetime.datetime.fromtimestamp(block.nTime).strftime('%Y-%m-%d')
-                amount_usd = self.conversion_table[date] * amount_btc
-                entry = {'block_id':self.block_id,'source_n_id':source_n_id,'source':source,'destination_n_id':destination_n_id,'destination':dest,'amount':amount_btc, 'amount_usd':amount_usd, 'trx_date':date}
+                amount_usd = 0
+                if date in self.conversion_table:
+                    amount_usd = self.conversion_table[date] * amount_btc
+                elif settings.debug:
+                    print("Warning: conversion rate from BTC to USD not found for date %s" % date)
+
+                entry = {'block_id':self.block_id,'source_n_id':source_n_id,'source':source,'destination_n_id':destination_n_id,'destination':dest,'amount':amount_btc, 'amount_usd':amount_usd, 'trx_date':date, 'trx_hash':trx_hash}
                 self.money_movements.append(entry)
-        except Exception:
-            #print("Unable to parse Tx for Money : %s" % repr(outputs))
+        except Exception as ex:
+            if settings.debug:
+                print("Unable to parse Tx for Money : %s" % repr(outputs))
+                print(ex)
             return
 
     def insert_into_db(self):
         if len(self.money_movements) == 0:
-            print("Warning: no money movements to insert. Aborting.")
+            if settings.debug:
+                print("Warning: no money movements to insert. Aborting.")
             return
 
         db = self.client.bitcoin
@@ -62,4 +81,12 @@
         collection.insert_many(self.money_movements, ordered=False)
         print("DB Sync Finished")
-
+    def ensure_indexes(self):
+        #Ensure index existence
+        db = self.client.bitcoin
+        collection = db.transactions
+        collection.create_index([("source_n_id", ASCENDING)])
+        collection.create_index([("destination_n_id", ASCENDING)])
+        collection.create_index([("source", ASCENDING)])
+        collection.create_index([("destination", ASCENDING)])
+        collection.create_index([("block_id",DESCENDING)])
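Note: `do_work` now takes one representative input address as the source, converts each output's `nValue` from satoshis to BTC, and prices it at the CoinDesk BPI rate for the block's date, falling back to 0 USD when no rate exists. A condensed sketch of that conversion, where the `rates` dict and its sample value stand in for `self.conversion_table`:

```python
SATOSHI_PER_BTC = 100000000

def to_usd(n_value, date, rates):
    amount_btc = n_value / SATOSHI_PER_BTC  # nValue is denominated in satoshis
    # Mirror the patch: record 0 USD when no rate exists for that day
    return rates[date] * amount_btc if date in rates else 0

rates = {"2013-11-29": 1132.26}  # sample BPI close; real values come from CoinDesk
print(to_usd(50 * SATOSHI_PER_BTC, "2013-11-29", rates))  # 56613.0
print(to_usd(50 * SATOSHI_PER_BTC, "2009-01-09", rates))  # 0 (no rate for that day)
```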
diff --git a/map_money.py b/map_money.py
index 31d0cd1..349cfdc 100644
--- a/map_money.py
+++ b/map_money.py
@@ -1,7 +1,5 @@
 import sys
 
-import bitcoin.rpc
-
 from crawler.money_crawler import MoneyCrawler
 from settings import settings
@@ -12,17 +10,25 @@ def start(start_block_id, end_block_id):
     try:
         while block_id <= end_block_id and mapper.crawl_block(block_id):
-            print("Money of Block %d mapped" % block_id)
+            if settings.debug or block_id % 100 == 0:
+                print("Money of Block %d mapped" % block_id)
 
             if block_id - start_block_id > 0 and (block_id - start_block_id) % settings.block_crawling_limit == 0:
                 mapper.insert_into_db()
                 mapper.money_movements = []
-                mapper.proxy = bitcoin.rpc.Proxy()
+                mapper.cache_nodeid_addresses = dict()
+                mapper.connect_to_bitcoind_rpc()
             block_id+=1
 
         #Sync the rest
+        print("Inserting the last records into the DB")
         mapper.insert_into_db()
+
+        print("Ensuring indexes are created")
+        mapper.ensure_indexes()
+
+        print("Done!")
         mapper.client.close()
         #DONE!
diff --git a/settings/settings.py b/settings/settings.py
index 34905b1..8010a0d 100644
--- a/settings/settings.py
+++ b/settings/settings.py
@@ -1,4 +1,6 @@
 db_server = "127.0.0.1"
 db_port = 27017
 block_crawling_limit = 2500
-max_batch_insert = 10000
\ No newline at end of file
+max_batch_insert = 10000
+rcp_reconnect_max_retry = 10
+debug = False
\ No newline at end of file
diff --git a/start_website.py b/start_website.py
index 942434a..2e210ea 100644
--- a/start_website.py
+++ b/start_website.py
@@ -1,4 +1,24 @@
+#!/usr/bin/python3
 from web.web import app
+import optparse
 
 if __name__ == '__main__':
-    app.run(threaded=True)
\ No newline at end of file
+    default_ip = "127.0.0.1"
+    default_port = "5000"
+    parser = optparse.OptionParser()
+    parser.add_option("-i", "--ip",
+                      help="IP address (use 0.0.0.0 for all) [default %s]" % default_ip, default=default_ip)
+    parser.add_option("-p", "--port",
+                      help="Port to listen to [default %s]" % default_port, default=default_port)
+    parser.add_option("-d", "--debug",
+                      action="store_true", dest="debug",
+                      help="Enable debug mode")
+
+    options, _ = parser.parse_args()
+
+    app.run(
+        debug=options.debug,
+        host=options.ip,
+        port=int(options.port),
+        threaded=True
+    )
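Note: with these options the site can be exposed beyond localhost, e.g. `python3 start_website.py -i 0.0.0.0 -p 8080 -d`. Since `optparse` has been deprecated since Python 3.2, an `argparse` version would be a possible drop-in; a sketch, not part of this patch:

```python
import argparse

parser = argparse.ArgumentParser(description="Bit-Cluster web front end")
parser.add_argument("-i", "--ip", default="127.0.0.1",
                    help="IP address (use 0.0.0.0 for all) [default %(default)s]")
parser.add_argument("-p", "--port", type=int, default=5000,
                    help="Port to listen to [default %(default)s]")
parser.add_argument("-d", "--debug", action="store_true",
                    help="Enable debug mode")
args = parser.parse_args()
# app.run(debug=args.debug, host=args.ip, port=args.port, threaded=True)
```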