Skip to content

Commit

Permalink
Merge pull request #1 from mathieulavoie/master
Browse files Browse the repository at this point in the history
merge upstream
  • Loading branch information
cryptowest authored Sep 29, 2016
2 parents 1867ddf + e82db0a commit 0d23702
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 111 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ http://www.bit-cluster.com
magnet:?xt=urn:btih:6654FEEF6D7B4BCDC8DD1CCB89F0403108939BDF

## Dependencies
Python3
flask
pyMongo
python-bitcoinlib

## Installation procedure
[See Wiki](../../wiki/Installation-procedure)

## Database structure
[See Wiki](../../wiki/Database-structure)
62 changes: 29 additions & 33 deletions build_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,39 @@ def start():
start_block_id = int(sys.argv[1])
block_id = start_block_id
process = None
try:
while builder.crawl_block(block_id):
while builder.crawl_block(block_id):
if settings.debug or block_id % 100 == 0:
print("Block %d crawled" % block_id)

if block_id - start_block_id > 0 and (block_id - start_block_id) % settings.block_crawling_limit == 0:
builder.network_graph.check_integrity()
while process is not None and process.is_alive():
print("Waiting for insertion thread to complete...")
process.join()

if process is not None and process.exitcode > 0 : #error
raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
process = Process(target=builder.network_graph.synchronize_mongo_db)
process.start()
builder.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port) #Starting a new graph while other graph data is inserted.

if process is not None and not process.is_alive() and process.exitcode > 0 : #error
raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
block_id+=1

#Finished Crawling, Flushing to DB.
#Waiting for any previous DB Sync
while process is not None and process.is_alive():
print("Waiting for insertion thread to complete...")
process.join()

#Sync the rest
process = Process(target=builder.network_graph.synchronize_mongo_db)
process.start()
if block_id - start_block_id > 0 and (block_id - start_block_id) % settings.block_crawling_limit == 0:
builder.network_graph.check_integrity()
while process is not None and process.is_alive():
print("Waiting for insertion thread to complete...")
process.join()

if process is not None and process.exitcode > 0 : #error
raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
process = Process(target=builder.network_graph.synchronize_mongo_db)
process.start()
builder.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port) #Starting a new graph while other graph data is inserted.
builder.connect_to_bitcoind_rpc()

if process is not None and not process.is_alive() and process.exitcode > 0 : #error
raise Exception("Errorcode %d in DB Sync Thread, aborting" % process.exitcode)
block_id+=1

#Finished Crawling, Flushing to DB.
#Waiting for any previous DB Sync
while process is not None and process.is_alive():
print("Waiting for insertion thread to complete...")
process.join()

#DONE!

#For Debugging purpose
except:
input("An exception will rise ")
raise
#Sync the rest
print("Inserting into the DB")
process = Process(target=builder.network_graph.synchronize_mongo_db)
process.start()
process.join()
#DONE!


if __name__ == "__main__":
Expand Down
10 changes: 5 additions & 5 deletions crawler/address_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ def get_hash160_from_cscript(self,script):
elif script[0] == OP_HASH160 and script[-1] == OP_EQUAL:#P2SH
script = script[1:]
script = script[1:script[0]+1]
return self.convert_hash160_to_addr(script)
return self.convert_hash160_to_addr(script,network_id=b'\x05') #Multi-Sign Address

elif script[-1] == OP_CHECKSIG: #V1 Validation With Public Key
return self.convert_hash160_to_addr(self.convert_public_key_to_hash160(script))
return None

raise AttributeError("CScript Format not supported")


def convert_public_key_to_hash160(self,pub_key_data):
Expand All @@ -42,13 +43,12 @@ def convert_public_key_to_hash160(self,pub_key_data):
return h.digest()


def convert_hash160_to_addr(self,hash_160):
hash_160 = b'\x00' + hash_160
def convert_hash160_to_addr(self,hash_160, network_id=b'\x00'):
    """Base58Check-encode a HASH160 digest into a Bitcoin address string.

    hash_160   -- the raw HASH160 digest to encode
    network_id -- version byte prepended before encoding; defaults to b'\x00'
                  (caller passes b'\x05' for P2SH / multi-sig addresses)
    """
    payload = network_id + hash_160
    # Base58Check: payload followed by the first 4 bytes of double-SHA256(payload).
    checksum = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    return bitcoin.base58.encode(payload + checksum)



'''
Taken From pybitcointools
Converted to Python3
Expand Down
81 changes: 54 additions & 27 deletions crawler/base_crawler.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,71 @@
import bitcoin
import bitcoin.rpc
import bitcoin.core.script
from datetime import datetime
import socket
import binascii
import http.client
from crawler.address_utils import Addressutils
from bitcoin.core import CTransaction
from settings import settings
from bitcoin.core.script import OP_FALSE


class BaseCrawler:
def __init__(self):
self.proxy = bitcoin.rpc.Proxy()
self.start = datetime.now()
self.block_id = -1
self.proxy = None
self.connect_to_bitcoind_rpc()
self.address_utils = Addressutils()

def crawl_block(self,block_id):

def connect_to_bitcoind_rpc(self):
for i in range(1,settings.rcp_reconnect_max_retry+1):
try:
self.block_id = block_id
hash = self.proxy.getblockhash(block_id)
except IndexError as ex:
print("Block not found")
return False
self.proxy = bitcoin.rpc.Proxy()
return
except http.client.HTTPException:
print("Caught a connection error from Bitcoind RCP, Reconnecting...(%d/%d)" %(i,settings.rcp_reconnect_max_retry))


def crawl_block(self,block_id):
for i in range(1,settings.rcp_reconnect_max_retry+1):
try:
try:
self.block_id = block_id
block_hash = self.proxy.getblockhash(block_id)
except IndexError:
print("Block not found")
return False

block = self.proxy.getblock(hash)
for tx in block.vtx[1:]: #ignore mining tx
self.parse_transaction(tx,block)
return True
block = self.proxy.getblock(block_hash)
for tx in block.vtx[1:]: #ignore mining tx
self.parse_transaction(tx,block)
return True
except socket.error:
print("Caught an error from Bitcoind RCP, Reconnecting and retrying...(%d/%d)" %(i,settings.rcp_reconnect_max_retry))
self.connect_to_bitcoind_rpc()

def parse_transaction(self,transaction,block):
assert isinstance(transaction,CTransaction)
transaction.GetHash()
signed_script_input = []
try:
for vin in transaction.vin:
push_data_sig = vin.scriptSig[0]
signed_script = vin.scriptSig[1:]
signed_script = signed_script[push_data_sig:]
signed_script_input.append(signed_script)
self.do_work(signed_script_input, transaction.vout,block)
except:
print("WARNING : Unable to process transaction ", binascii.hexlify(transaction.GetHash()[::-1]) )

def do_work(self,inputs,outputs,block):
input_addresses = set()
trx_hash = binascii.hexlify(transaction.GetHash()[::-1]).decode('utf-8')
for vin in transaction.vin:
try:
sign_script = vin.scriptSig
push_data_sig = sign_script[0]
sign_script = sign_script[1:]
sign_script = sign_script[push_data_sig:]

if len(sign_script) > 0:
input_addresses.add(self.address_utils.convert_hash160_to_addr(self.address_utils.convert_public_key_to_hash160(sign_script)))
else:
prevtxout = self.proxy.getrawtransaction(vin.prevout.hash).vout[vin.prevout.n]
input_addresses.add(self.address_utils.get_hash160_from_cscript(prevtxout.scriptPubKey))
except Exception as ex:
if settings.debug:
print("Transaction %s Unable To Parse SigScript %s"%(trx_hash,binascii.hexlify(vin.scriptSig)))
print(ex)

self.do_work(input_addresses, transaction.vout,block,trx_hash)

def do_work(self,inputs_addresses,outputs_scripts,block,trx_hash):
    #Template hook invoked once per parsed transaction; subclasses implement the
    #actual processing (e.g. graph clustering, money-flow recording).
    raise NotImplementedError("Not implemented method do_work")
11 changes: 7 additions & 4 deletions crawler/cluster_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
class ClusterCrawler(base_crawler.BaseCrawler):
    #Crawler that feeds transaction input addresses into an address-clustering graph.
    def __init__(self):
        super().__init__()
        self.to_crawl =[]  # NOTE(review): not referenced in this fragment -- confirm it is used elsewhere
        self.crawled = []  # NOTE(review): not referenced in this fragment -- confirm it is used elsewhere
        self.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port)

def do_work(self,inputs, outputs,block):
self.network_graph.process_transaction_data(inputs, outputs)
def do_work(self,inputs_addresses, outputs,block,trx_hash):
    """Forward one transaction's recovered input addresses to the cluster graph.

    Transactions whose input addresses could not be recovered are skipped.
    """
    if not inputs_addresses:
        return
    self.network_graph.process_transaction_data(inputs_addresses, outputs)

def start_new_graph(self):
    #Replace the in-memory graph with a fresh one -- presumably called after the
    #previous graph has been handed off for DB insertion; confirm against caller.
    self.network_graph = cluster_network.ClusterNetwork(settings.db_server, settings.db_port)
23 changes: 5 additions & 18 deletions crawler/cluster_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,9 @@ def chunks(self,l, n):
n = max(1, n)
return [l[i:i + n] for i in range(0, len(l), n)]

def parse_addresses(self,inputs):
addresses_in = set()
for i in inputs:
try:
addresses_in.add(self.addr_utils.convert_hash160_to_addr(self.addr_utils.convert_public_key_to_hash160(i)))
except:
pass
#print("Unable to parse an input address. Ignoring this address")


if len(addresses_in) ==0:
return None

return addresses_in

def process_transaction_data(self,inputs, outputs):
addresses_in = self.parse_addresses(inputs)
if addresses_in is None:
return
self.merge_into_graph(addresses_in)
self.merge_into_graph(inputs)

def merge_into_graph(self,addresses_in):
new_node_addresses = []
Expand Down Expand Up @@ -80,6 +63,10 @@ def synchronize_mongo_db(self):
collection = db.addresses
transactions = db.transactions
db_next_node_id = 1

#Ensure index existence
collection.create_index([("n_id", DESCENDING)])

for x in collection.find().sort("n_id",DESCENDING).limit(1):
db_next_node_id = x['n_id'] +1

Expand Down
63 changes: 45 additions & 18 deletions crawler/money_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import json
import urllib.request

from pymongo import MongoClient

from pymongo import MongoClient, ASCENDING, DESCENDING

from crawler import address_utils, base_crawler
from settings import settings
Expand All @@ -20,46 +21,72 @@ def __init__(self):
self.conversion_table = json.loads(urllib.request.urlopen(url).read().decode('utf8'))['bpi']
self.conversion_table[today] = json.loads(urllib.request.urlopen("https://api.coindesk.com/v1/bpi/currentprice/USD.json").read().decode('utf8'))['bpi']['USD']['rate_float']

self.cache_nodeid_addresses = dict()


def do_work(self,inputs, outputs, block):
if len(inputs) == 0: #No Valid Tx, an empty block with only one mining tx
def do_work(self,inputs_addresses, outputs, block, trx_hash):
if len(inputs_addresses) == 0: #No Valid Tx, an empty block with only one mining tx
return
try:
source = self.addr_utils.convert_hash160_to_addr(self.addr_utils.convert_public_key_to_hash160(inputs[0]))
source = inputs_addresses.pop()

cursor_source_n_id = self.client.bitcoin.addresses.find_one({"_id":source})
if cursor_source_n_id is not None:
source_n_id = cursor_source_n_id['n_id']
if source in self.cache_nodeid_addresses:
source_n_id = self.cache_nodeid_addresses[source]
else:
source_n_id = -1
cursor_source_n_id = self.client.bitcoin.addresses.find_one({"_id":source})
if cursor_source_n_id is not None:
source_n_id = cursor_source_n_id['n_id']
else:
source_n_id = -1
self.cache_nodeid_addresses[source] = source_n_id


for out in outputs:
dest = self.addr_utils.get_hash160_from_cscript(out.scriptPubKey)
cursor_destination_n_id = self.client.bitcoin.addresses.find_one({"_id":dest})
if cursor_destination_n_id is not None:
destination_n_id = cursor_destination_n_id['n_id']
if dest in self.cache_nodeid_addresses:
destination_n_id = self.cache_nodeid_addresses[dest]
else:
destination_n_id = -1
cursor_destination_n_id = self.client.bitcoin.addresses.find_one({"_id":dest})
if cursor_destination_n_id is not None:
destination_n_id = cursor_destination_n_id['n_id']
else:
destination_n_id = -1
self.cache_nodeid_addresses[dest] = destination_n_id

amount_btc = (out.nValue/100000000)
date = datetime.datetime.fromtimestamp(block.nTime).strftime('%Y-%m-%d')
amount_usd = self.conversion_table[date] * amount_btc
entry = {'block_id':self.block_id,'source_n_id':source_n_id,'source':source,'destination_n_id':destination_n_id,'destination':dest,'amount':amount_btc, 'amount_usd':amount_usd, 'trx_date':date}
amount_usd = 0
if date in self.conversion_table:
amount_usd = self.conversion_table[date] * amount_btc
elif settings.debug:
print("Warning. Conversion rate from BTC to USD not found for date %s:"%date)

entry = {'block_id':self.block_id,'source_n_id':source_n_id,'source':source,'destination_n_id':destination_n_id,'destination':dest,'amount':amount_btc, 'amount_usd':amount_usd, 'trx_date':date, 'trx_hash':trx_hash}
self.money_movements.append(entry)
except Exception:
#print("Unable to parse Tx for Money : %s" % repr(outputs))
except Exception as ex:
if settings.debug:
print("Unable to parse Tx for Money : %s" % repr(outputs))
print(ex)
return


def insert_into_db(self):
    """Bulk-insert the accumulated money movements into the transactions collection.

    No-op (with a warning in debug mode) when nothing was collected.
    """
    #Fix: the scraped diff left both the pre-change unconditional print and the
    #post-change debug-guarded print in place; keep only the post-change form.
    if len(self.money_movements) == 0:
        if settings.debug:
            print("Warning: no money movements to insert. Aborting.")
        return

    db = self.client.bitcoin
    collection = db.transactions
    #ordered=False lets the remaining documents insert even if one document fails.
    collection.insert_many(self.money_movements, ordered=False)
    print("DB Sync Finished")


def ensure_indexes(self):
    """Create the secondary indexes on the transactions collection.

    MongoDB's create_index is idempotent, so repeated calls are safe.
    """
    #Ensure index existence
    collection = self.client.bitcoin.transactions
    index_specs = (
        ("source_n_id", ASCENDING),
        ("destination_n_id", ASCENDING),
        ("source", ASCENDING),
        ("destination", ASCENDING),
        ("block_id", DESCENDING),
    )
    for field, direction in index_specs:
        collection.create_index([(field, direction)])
Loading

0 comments on commit 0d23702

Please sign in to comment.