From 2c85c0d217056a5957b858fe1b5c2920a6c52ab7 Mon Sep 17 00:00:00 2001 From: Heiko Heiko Date: Mon, 12 Jan 2015 10:34:34 +0100 Subject: [PATCH] work in progress --- .editorconfig | 21 ++ .travis.yml | 15 ++ AUTHORS.rst | 13 + CONTRIBUTING.rst | 117 +++++++++ HISTORY.rst | 9 + LICENSE | 12 + MANIFEST.in | 11 + Makefile | 64 +++++ README.rst | 23 ++ devp2p/app.py | 86 +++++++ devp2p/crypto.py | 68 ++++++ devp2p/encryption.py | 285 ++++++++++++++++++++++ devp2p/jsonrpc.py | 59 +++++ devp2p/multiplexer.py | 519 ++++++++++++++++++++++++++++++++++++++++ devp2p/peer.py | 131 ++++++++++ devp2p/peermanager.py | 92 +++++++ devp2p/protocol.py | 318 ++++++++++++++++++++++++ devp2p/serialization.py | 101 ++++++++ devp2p/service.py | 36 +++ devp2p/slogging.py | 16 ++ requirements.txt | 1 + setup.cfg | 2 + setup.py | 53 ++++ tox.ini | 9 + 24 files changed, 2061 insertions(+) create mode 100644 .editorconfig create mode 100644 .travis.yml create mode 100644 AUTHORS.rst create mode 100644 CONTRIBUTING.rst create mode 100644 HISTORY.rst create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 Makefile create mode 100644 README.rst create mode 100644 devp2p/app.py create mode 100644 devp2p/crypto.py create mode 100644 devp2p/encryption.py create mode 100644 devp2p/jsonrpc.py create mode 100644 devp2p/multiplexer.py create mode 100644 devp2p/peer.py create mode 100644 devp2p/peermanager.py create mode 100644 devp2p/protocol.py create mode 100644 devp2p/serialization.py create mode 100644 devp2p/service.py create mode 100644 devp2p/slogging.py create mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100755 setup.py create mode 100644 tox.ini diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..f7cfe80 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,21 @@ +# http://editorconfig.org + +root = true + +[*] +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true +insert_final_newline = true +charset = utf-8 +end_of_line = lf + +[*.bat] +indent_style = tab +end_of_line = crlf + +[LICENSE] +insert_final_newline = false + +[Makefile] +indent_style = tab \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e9faee2 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,15 @@ +# Config file for automatic testing at travis-ci.org + +language: python + +python: + - "3.4" + - "3.3" + - "2.7" + - "pypy" + +# command to install dependencies, e.g. pip install -r requirements.txt --use-mirrors +install: pip install -r requirements.txt + +# command to run tests, e.g. python setup.py test +script: python setup.py test diff --git a/AUTHORS.rst b/AUTHORS.rst new file mode 100644 index 0000000..8215620 --- /dev/null +++ b/AUTHORS.rst @@ -0,0 +1,13 @@ +======= +Credits +======= + +Development Lead +---------------- + +* Heiko Heiko + +Contributors +------------ + +None yet. Why not be the first? \ No newline at end of file diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst new file mode 100644 index 0000000..f701bfc --- /dev/null +++ b/CONTRIBUTING.rst @@ -0,0 +1,117 @@ +============ +Contributing +============ + +Contributions are welcome, and they are greatly appreciated! Every +little bit helps, and credit will always be given. + +You can contribute in many ways: + +Types of Contributions +---------------------- + +Report Bugs +~~~~~~~~~~~ + +Report bugs at https://github.com/heikoheiko/pydevp2p/issues. + +If you are reporting a bug, please include: + +* Your operating system name and version. 
+* Any details about your local setup that might be helpful in troubleshooting. +* Detailed steps to reproduce the bug. + +Fix Bugs +~~~~~~~~ + +Look through the GitHub issues for bugs. Anything tagged with "bug" +is open to whoever wants to implement it. + +Implement Features +~~~~~~~~~~~~~~~~~~ + +Look through the GitHub issues for features. Anything tagged with "feature" +is open to whoever wants to implement it. + +Write Documentation +~~~~~~~~~~~~~~~~~~~ + +pydevp2p could always use more documentation, whether as part of the +official pydevp2p docs, in docstrings, or even on the web in blog posts, +articles, and such. + +Submit Feedback +~~~~~~~~~~~~~~~ + +The best way to send feedback is to file an issue at https://github.com/heikoheiko/pydevp2p/issues. + +If you are proposing a feature: + +* Explain in detail how it would work. +* Keep the scope as narrow as possible, to make it easier to implement. +* Remember that this is a volunteer-driven project, and that contributions + are welcome :) + +Get Started! +------------ + +Ready to contribute? Here's how to set up `pydevp2p` for local development. + +1. Fork the `pydevp2p` repo on GitHub. +2. Clone your fork locally:: + + $ git clone git@github.com:your_name_here/pydevp2p.git + +3. Install your local copy into a virtualenv. Assuming you have virtualenvwrapper installed, this is how you set up your fork for local development:: + + $ mkvirtualenv pydevp2p + $ cd pydevp2p/ + $ python setup.py develop + +4. Create a branch for local development:: + + $ git checkout -b name-of-your-bugfix-or-feature + + Now you can make your changes locally. + +5. When you're done making changes, check that your changes pass flake8 and the tests, including testing other Python versions with tox:: + + $ flake8 pydevp2p tests + $ python setup.py test + $ tox + + To get flake8 and tox, just pip install them into your virtualenv. + +6. Commit your changes and push your branch to GitHub:: + + $ git add . + $ git commit -m "Your detailed description of your changes." + $ git push origin name-of-your-bugfix-or-feature + +7. Submit a pull request through the GitHub website. + +Pull Request Guidelines +----------------------- + +Before you submit a pull request, check that it meets these guidelines: + +1. The pull request should include tests. +2. If the pull request adds functionality, the docs should be updated. Put + your new functionality into a function with a docstring, and add the + feature to the list in README.rst. +3. The pull request should work for Python 2.6, 2.7, 3.3, and 3.4, and for PyPy. Check + https://travis-ci.org/heikoheiko/pydevp2p/pull_requests + and make sure that the tests pass for all supported Python versions. + +Tips +---- + +To run a subset of tests:: + + $ python -m unittest tests.test_pydevp2p + +To do a release:: + +Check out these sources: + Checklist: https://gist.github.com/audreyr/5990987 + Helper: https://github.com/michaeljoseph/changes \ No newline at end of file diff --git a/HISTORY.rst b/HISTORY.rst new file mode 100644 index 0000000..16a5f7c --- /dev/null +++ b/HISTORY.rst @@ -0,0 +1,9 @@ +.. :changelog: + +History +------- + +0.0.1 (2014-01-11) +--------------------- + +* First release on PyPI. \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5705a42 --- /dev/null +++ b/LICENSE @@ -0,0 +1,12 @@ +Copyright (c) 2014, Heiko Heiko +All rights reserved. 
+ +Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. + +* Neither the name of pydevp2p nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..6fd9409 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,11 @@ +include AUTHORS.rst +include CONTRIBUTING.rst +include HISTORY.rst +include LICENSE +include README.rst + +recursive-include tests * +recursive-exclude * __pycache__ +recursive-exclude * *.py[co] + +recursive-include docs *.rst conf.py Makefile make.bat \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f7f185d --- /dev/null +++ b/Makefile @@ -0,0 +1,64 @@ +.PHONY: clean-pyc clean-build docs clean + +help: + @echo "clean - remove all build, test, coverage and Python artifacts" + @echo "clean-build - remove build artifacts" + @echo "clean-pyc - remove Python file artifacts" + @echo "clean-test - remove test and coverage artifacts" + @echo "lint - check style with flake8" + @echo "test - run tests quickly with the default Python" + @echo "test-all - run tests on every Python version with tox" + @echo "coverage - check code coverage quickly with the default Python" + @echo "docs - generate Sphinx HTML documentation, including API docs" + @echo "release - package and upload a release" + @echo "dist - package" + +clean: clean-build clean-pyc clean-test + +clean-build: + rm -fr build/ + rm -fr dist/ + rm -fr *.egg-info + +clean-pyc: + find . -name '*.pyc' -exec rm -f {} + + find . -name '*.pyo' -exec rm -f {} + + find . -name '*~' -exec rm -f {} + + find . 
-name '__pycache__' -exec rm -fr {} +
+
+clean-test:
+	rm -fr .tox/
+	rm -f .coverage
+	rm -fr htmlcov/
+
+lint:
+	flake8 pydevp2p tests
+
+test:
+	python setup.py test
+
+test-all:
+	tox
+
+coverage:
+	coverage run --source devp2p setup.py test
+	coverage report -m
+	coverage html
+	open htmlcov/index.html
+
+docs:
+	rm -f docs/pydevp2p.rst
+	rm -f docs/modules.rst
+	sphinx-apidoc -o docs/ pydevp2p
+	$(MAKE) -C docs clean
+	$(MAKE) -C docs html
+	open docs/_build/html/index.html
+
+release: clean
+	python setup.py sdist upload
+	python setup.py bdist_wheel upload
+
+dist: clean
+	python setup.py sdist
+	python setup.py bdist_wheel
+	ls -l dist
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..bcbe0a5
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,23 @@
+===============================
+pydevp2p
+===============================
+
+.. image:: https://badge.fury.io/py/pydevp2p.png
+    :target: http://badge.fury.io/py/pydevp2p
+
+.. image:: https://travis-ci.org/heikoheiko/pydevp2p.png?branch=master
+    :target: https://travis-ci.org/heikoheiko/pydevp2p
+
+.. image:: https://pypip.in/d/pydevp2p/badge.png
+    :target: https://pypi.python.org/pypi/pydevp2p
+
+
+Python implementation of the Ethereum P2P stack
+
+* Free software: BSD license
+* Documentation: https://pydevp2p.readthedocs.org.
+
+Features
+--------
+
+* TODO
\ No newline at end of file
diff --git a/devp2p/app.py b/devp2p/app.py
new file mode 100644
index 0000000..e0b4f4d
--- /dev/null
+++ b/devp2p/app.py
@@ -0,0 +1,86 @@
+from UserDict import UserDict
+from service import BaseService
+from slogging import get_logger
+log = get_logger('app')
+
+
+class BaseApp(object):
+
+    def __init__(self, config):
+        self.config = config
+        self.services = UserDict()
+
+    def register_service(self, service):
+        """
+        registers protocol with peer, which will be accessible as
+        peer. (e.g.
peer.p2p or peer.eth) + """ + assert isinstance(service, BaseService) + assert service.name not in self.services + log.info('registering service', service=service.name) + self.services[service.name] = service + setattr(self.services, service.name, service) + + def deregister_service(self, service): + assert isinstance(service, BaseService) + self._services.remove(service) + delattr(self.services, service.name) + + def start(self): + for service in self._services: + service.start() + + def stop(self): + for service in self._services: + service.stop() + + +if __name__ == '__main__': + # config + import ConfigParser + import io + import sys + import signal + import gevent + from peermanager import PeerManager + from jsonrpc import JSONRPCServer + + import slogging + log = slogging.get_logger('app') + + # read config + sample_config = """ +[p2p] +num_peers = 10 +bootstrap_host = localhost +bootstrap_port = 30303 +listen_host = 127.0.0.1 +listen_port = 30302 + """ + config = ConfigParser.ConfigParser() + if len(sys.argv) == 1: + config.readfp(io.BytesIO(sample_config)) + else: + fn = sys.argv[1] + log.info('loading config from', fn=fn) + config.readfp(open(fn)) + + # create app + app = BaseApp(config) + + # register services + PeerManager.register_with_app(app) + JSONRPCServer.register_with_app(app) + + # start app + app.start() + + # wait for interupt + evt = gevent.event.Event() + gevent.signal(signal.SIGQUIT, evt.set) + gevent.signal(signal.SIGTERM, evt.set) + gevent.signal(signal.SIGINT, evt.set) + evt.wait() + + # finally stop + app.stop() diff --git a/devp2p/crypto.py b/devp2p/crypto.py new file mode 100644 index 0000000..068d275 --- /dev/null +++ b/devp2p/crypto.py @@ -0,0 +1,68 @@ +#!/usr/bin/python + +CURVE = 'secp256k1' +CIPHERNAME = 'aes-256-ctr' + +# FIX PATH ON OS X () +# https://github.com/yann2192/pyelliptic/issues/11 +import os +import sys +_openssl_lib_paths = ['/usr/local/Cellar/openssl/'] +for p in _openssl_lib_paths: + if os.path.exists(p): + p = os.path.join(p, os.listdir(p)[-1], 'lib') + os.environ['DYLD_LIBRARY_PATH'] = p + import pyelliptic + if CIPHERNAME in pyelliptic.Cipher.get_all_cipher(): + break +if not CIPHERNAME in pyelliptic.Cipher.get_all_cipher(): + print 'required cipher %s not available in openssl library' % CIPHERNAME + if sys.platform == 'darwin': + print 'use homebrew to install newer openssl' + print '> brew install openssl' + sys.exit(1) + +import bitcoin +from sha3 import sha3_256 + + +class ECCx(pyelliptic.ECC): + + def __init__(self, pubkey=None, privkey=None): + assert len(pubkey) == 64 # 512bit + if pubkey: + pubkey_x = pubkey[:32] + pubkey_y = pubkey[32:] + else: + pubkey_x, pubkey_y = None, None + pyelliptic.ECC.__init__(self, pubkey_x=pubkey_x, pubkey_y=pubkey_y, + raw_privkey=privkey, curve=CURVE) + + def get_pubkey(self): + return self.pubkey_x + self.pubkey_y + pubkey = property(get_pubkey) + + def get_privkey(self): + return self.privkey + +def ecdsa_recover(msg, sig): + return bitcoin.encode_pubkey(bitcoin.ecdsa_raw_recover( + bitcoin.electrum_sig_hash(msg), + bitcoin.decode_sig(sig)), + 'bin_electrum') + +def sha3(seed): + return sha3_256(seed).digest() + + +def mk_privkey(seed): + return sha3(seed) + + +def privtopub(privkey): + r = bitcoin.encode_pubkey(bitcoin.privtopub(privkey), 'bin_electrum') + assert len(r) == 64 + return r + +def encrypt(data, pubkey): + return pyelliptic.encrypt(data, pubkey, ephemcurve=CURVE, ciphername=CIPHERNAME) diff --git a/devp2p/encryption.py b/devp2p/encryption.py new file mode 100644 index 
0000000..7a1122d --- /dev/null +++ b/devp2p/encryption.py @@ -0,0 +1,285 @@ +#!/usr/bin/python + +from crypto import privtopub, mk_privkey +from crypto import sha3 +from crypto import ECC +from crypto import encrypt +from crypto import ecdsa_recover +import pyelliptic +from pyethereum.rlp import big_endian_to_int as idec # integer decode +from pyethereum.rlp import int_to_big_endian as ienc # integer encode + +def sxor(s1,s2): + "string xor" + return ''.join(chr(ord(a) ^ ord(b)) for a,b in zip(s1,s2)) + + +class Transport(object): + + def __init__(self, sender, receiver): + assert isinstance(sender, Peer) + assert isinstance(receiver, Peer) + self.sender = sender + self.receiver = receiver + + def send(self, data): + self.receiver.receive(data) + + def receive(self, data): + self.sender.receive(data) + + + +class Peer(object): + "Peer carries the session with a connected remote node" + + def __init__(self, local_node, transport=None, receive_cb=None): + self.local_node = local_node + self.transport = transport + self.session = None + self.receive_cb = receive_cb + + def connect(self, remote_node): + self.session = RLPxSession(self) + self.session.send_authentication(remote_node) + + def send(self, data): + assert self.session + assert self.transport + self.transport.send(self.session.encode(data)) + + def receive(self, data): + if not self.session: + self.session = RLPxSession(self) + self.session.respond_authentication(data) + else: + data = self.session.decode(data) + if self.receive_cb: + self.receive_cb(self, data) + + +class LocalNode(object): + + def __init__(self, pubkey, privkey): + self.ecc = ECCx(pubkey=pubkey, privkey=privkey) + + @property + def pubkey(self): + return self.ecc.pubkey_x + self.ecc.pubkey_y + + def sign(self, data): + return self.ecc.sign(data) + + +class RemoteNode(object): + + def __init__(self, pubkey): + self.pubkey = pubkey + self.token = None + + + + + + +class RLPxSession(object): + + ephemeral_ecc = None + nonce = None + token = None + aes_secret = None + aes_enc = None + aes_dec = None + egress_mac = None + ingress_mac = None + remote_node = None + _authentication_sent = False + is_ready = False + + def __init__(self, peer): + # persisted peer data. keys are the nodeid + # session data + self.peer = peer + + def __repr__(self): + return '' % self.address.encode('hex') + + @property + def node(self): + return self.peer.local_node + + def encrypt(self, header, frame): + """ + header-mac: right128 of + egress-mac.update(aes(mac-secret,egress-mac)^header-ciphertext) + """ + assert self.is_ready is True + + def aes(data): + return self.aes_enc.update(data) + + def mac(data): + return self.egress_mac.update(data) + + # header + assert len(header) == 16 # zero padded to 16 bytes + header_ciphertext = aes(header) + assert len(header_ciphertext) <= 32 # must not be larger than mac + # FIXME mac-secret!? 
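+        # For illustration: per the docstring above, the header MAC is the right-most 16 bytes of the
+        # egress MAC after it absorbs AES(mac-secret, egress-mac-digest) XOR header-ciphertext;
+        # sxor() from the top of this module does the byte-wise XOR, e.g.
+        # sxor('\x01\x0f', '\x03\x01') == '\x02\x0e'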
+ header_mac = mac(sxor(aes(mac('')), header_ciphertext))[-16:] + # frame + frame_ciphertext = aes(frame) + frame_mac = self.egress_mac.update(frame_ciphertext) + return header_ciphertext + header_mac + frame_ciphertext + frame_mac + + + def decrypt(self, data): + assert self.is_ready is True + + def aes(data): + return self.aes_dec.update(data) + + def mac(data): + return self.egress_mac.update(data) + + header_ciphertext = data[:16] + header_mac = data[16:32] + + header = aes(header_ciphertext) + expected_header_mac = mac(sxor(aes(mac(''), header_ciphertext)))[-16:] + assert expected_header_mac == header_mac + + # FIXME check frame length in header + # assume datalen == framelen for now + frame_mac = self.egress_mac.update(frame_ciphertext) + data = aes(data[32:]) + + + + + + + def send_authentication(self, remote_node): + """ + 1. initiator generates ecdhe-random and nonce and creates auth + 2. initiator connects to remote and sends auth + + Handshake for connecting to Known Peer + eciesEncrypt(remote-pubk, sign(privkey, token^nonce) || 0x80 || ecdhe-random || nonce ) + + Handshake for connecting to New Peer + eciesEncrypt(remote-pubk, sign(privkey, nonce) || 0x80 || ecdhe-random || nonce ) + + The value 0x80 is a placeholder which maybe used in the future for versioning and/or + protocol handshake. + """ + self.ephemeral_ecc = ECCx() # FIXME, add seed + ecdhe_pubkey = self.ephemeral_ecc.get_pubkey() + assert len(ecdhe_pubkey) == 512/8 + token = remote_node.token + nonce = ienc(random.randint(0, 2**256-1)) + assert len(nonce) == 32 + token_or_nonce = token or nonce + signature = self.node.sign(ienc(token_or_nonce)) + assert len(signature) == 65 + payload = signature + '0x80' + ecdhe_pubkey + token_or_nonce + auth_message = crypto.encrypt(payload, remote.pubkey) + self.peer.send(auth_message) + self._authentication_sent = True + + + def receive_authentication(self, other, ciphertext): + """ + Verification (function, upon receive of PresetAuthentication): + 3. remote generates ecdhe-random and nonce and creates auth + 4. remote receives auth and decrypts (ECIES performs authentication before + decryption) + - If address is known, lookup token and public key to be authenticated + - derive signature-message = sha3(token || addr^addrRemote) + - success -> AcknowledgeAuthentication + 5. remote sends auth + 6. 
remote derives shared-secret, aes-secret, mac-secret, ingress-mac, egress-mac +""" + + # eciesEncrypt(remote-pubk, sign(privkey, token^nonce) || 0x80 || ecdhe-random || nonce ) + + data = self.node.decrypt(ciphertext) + assert len(data) == 64 + 1 + 64 + 32 + signature = data[:65] + assert data[65] == '0x80' + remote_ecdhe_pubkey = data[65:65+64] + token_or_nonce = idec(data[-32:]) + + # verify signature + if not self.node.verify(signature, token_or_nonce): + return self.disconnect() + + # recover remote pubkey + remote_pubkey = ecdsa_recover(token_or_nonce, signature) + + # lookup pubkey and related token + token_database = dict() # FIXME + token = token_database.get(remote_pubkey, None) + if token and token != token_or_nonce: + # something fishy + # FIXME reset node reputation + pass + + remote_nonce = token_or_nonce + + # send authentication if not yet + if not self._authentication_sent: + remote_node = RemoteNode(remote_pubkey) # FIXME LOOKUP + self.send_authentication(remote_node) + + # - success -> AcknowledgeAuthentication + self.acknowledge_authentication(other, remote_pubkey, remote_ecdhe_pubkey) + + # ecdhe_shared_secret = ecdh.agree(ecdhe-random, ecdhe-random-public) + # Compute public key with the local private key and return a 512bits shared key + ecdhe_shared_secret = self.ephemeral_ecc.get_ecdh_key(remote_pubkey) + ecdhe_pubkey = ephemeral_ecc.get_pubkey() + # shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || remote-nonce)) + shared_secret = sha3(ecdhe_shared_secret + sha3(ienc(self.nonce) + ienc(remote_nonce))) + + self.aes_secret = sha3_256(ecdhe_shared_secret + shared_secret) + self.mac_secret = sha3_256(ecdhe_shared_secret + self.aes_secret) + # egress-mac = sha3(mac-secret^nonce || auth) + self.egress_mac = sha3_256(sxor(self.mac_secret, self.nonce) + ciphertext) + # ingress-mac = sha3(mac-secret^remote-nonce || auth) + self.ingress_mac = sha3_256(sxor(self.mac_secret, remote_nonce) + ciphertext) + self.token = sha3(shared_secret) + + iv = pyelliptic.Cipher.gen_IV('aes-256-ctr') + self.aes_enc = pyelliptic.Cipher(self.aes_secret, iv, 1, ciphername='aes-256-ctr') + self.aes_dec = pyelliptic.Cipher(self.aes_secret, iv, 0, ciphername='aes-256-ctr') + + self.is_ready = True + + + +def main(): + alice_privkey = mk_privkey('secret1') + alice_pubkey = privtopub(alice_privkey) + alice = LocalNode(alice_pubkey, alice_privkey) + + bob_privkey = mk_privkey('secret2') + bob_pubkey = privtopub(bob_privkey) + bob = LocalNode(bob_pubkey, bob_privkey) + + def _receivecb(peer, data): + print peer, data + + # alice knows bob and connects + node_bob = RemoteNode(bob_pubkey) + peer_alice = Peer(alice, transport=None, _receivecb) + peer_bob = Peer(bob, transport=None, _receivecb) + peer_alice.transport = Transport(sender=peer_alice, receiver=peer_bob) + peer_bob.transport = Transport(sender=peer_bob, receiver=peer_alice) + + peer_alice.connect(peer_bob) + + +if __name__ == '__main__': + main() + diff --git a/devp2p/jsonrpc.py b/devp2p/jsonrpc.py new file mode 100644 index 0000000..32aa6b5 --- /dev/null +++ b/devp2p/jsonrpc.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import gevent +import gevent.wsgi +import gevent.queue +from tinyrpc.protocols.jsonrpc import JSONRPCProtocol +from tinyrpc.transports.wsgi import WsgiServerTransport +from tinyrpc.server.gevent import RPCServerGreenlets +from tinyrpc.dispatch import RPCDispatcher +from service import BaseService +import slogging +log = slogging.get_logger('jsonrpc') + + +class JSONRPCServer(BaseService): + + 
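The secret derivation performed in receive_authentication above can be restated as a minimal standalone sketch (assuming sha3() is the 32-byte Keccak-256 digest from devp2p/crypto.py and sxor() the XOR helper from devp2p/encryption.py; the session code additionally keeps running Keccak states for the two MACs):

    def derive_secrets(ecdhe_shared_secret, nonce, remote_nonce, auth_ciphertext):
        # shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || remote-nonce))
        shared_secret = sha3(ecdhe_shared_secret + sha3(nonce + remote_nonce))
        aes_secret = sha3(ecdhe_shared_secret + shared_secret)
        mac_secret = sha3(ecdhe_shared_secret + aes_secret)
        egress_mac_seed = sxor(mac_secret, nonce) + auth_ciphertext        # sha3(mac-secret^nonce || auth)
        ingress_mac_seed = sxor(mac_secret, remote_nonce) + auth_ciphertext
        token = sha3(shared_secret)
        return aes_secret, mac_secret, egress_mac_seed, ingress_mac_seed, token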
name = 'jsonrpc' + + def __init__(self, app): + log.debug('initializing JSONRPCServer') + BaseService.__init__(self, app) + self.app = app + self.dispatcher = RPCDispatcher() + transport = WsgiServerTransport(queue_class=gevent.queue.Queue) + + # start wsgi server as a background-greenlet + self.wsgi_server = gevent.wsgi.WSGIServer(('127.0.0.1', 5000), transport.handle) + + self.rpc_server = RPCServerGreenlets( + transport, + JSONRPCProtocol(), + self.dispatcher + ) + + def _run(self): + log.info('starting JSONRPCServer') + # in the main greenlet, run our rpc_server + self.wsgi_thread = gevent.spawn(self.wsgi_server.serve_forever) + self.rpc_server.serve_forever() + + def add_method(self, func, name=None): + self.dispatcher.add_method(func, name) + + def stop(self): + log.info('stopping JSONRPCServer') + self.wsgi_thread.kill() + +if __name__ == '__main__': + + def reverse_string(s): + return s[::-1] + + server = JSONRPCServer(app=None) + server.add_method(reverse_string) + server.start() +# server._run() + e = gevent.event.Event() + e.wait() diff --git a/devp2p/multiplexer.py b/devp2p/multiplexer.py new file mode 100644 index 0000000..26aec8c --- /dev/null +++ b/devp2p/multiplexer.py @@ -0,0 +1,519 @@ +from gevent.queue import Queue +from collections import OrderedDict +from pyethereum import rlp +import struct + +""" +Questions: + packet_type length and endcoding (bigendian signed?) : preliminary YES + is data in the payload really wrapped into rlp? : preliminary YES + how is a packet build (cmd_id, rlp(payload))? : preliminary Packet + total-packet-size == total_payload_size ??? : preliminary YES + +Improvements: + use memoryview + tests +""" + + + +class Frame(object): + """ + When sending a packet over RLPx, the packet will be framed. + The frame provides information about the size of the packet and the packet's + source protocol. There are three slightly different frames, depending on whether + or not the frame is delivering a multi-frame packet. A multi-frame packet is a + packet which is split (aka chunked) into multiple frames because it's size is + larger than the protocol window size (pws; see Multiplexing). When a packet is + chunked into multiple frames, there is an implicit difference between the first + frame and all subsequent frames. + Thus, the three frame types are + normal, chunked-0 (first frame of a multi-frame packet), + and chunked-n (subsequent frames of a multi-frame packet). + + + Single-frame packet: + header || header-mac || frame || mac + + Multi-frame packet: + header || header-mac || frame-0 || + [ header || header-mac || frame-n || ... 
|| ] + header || header-mac || frame-last || mac + """ + + header_size = 16 + header_mac_size = 16 + payload_mac_size = 32 + frame_base_size = header_size + header_mac_size + payload_mac_size + is_chunked_0 = False + total_payload_size = None # only used with chunked_0 + + def __init__(self, protocol_id, cmd_id, payload, sequence_id, window_size, + is_chunked_n=False, frames=None): + assert isinstance(window_size, int) + assert isinstance(cmd_id, int) and cmd_id < 256 + self.cmd_id = cmd_id + self.payload = payload + self.frames = frames or [] + assert protocol_id < 2**16 + self.protocol_id = protocol_id + assert sequence_id is None or sequence_id < 2**16 + self.sequence_id = sequence_id + self.is_chunked_n = is_chunked_n + self.frames.append(self) + + # chunk payloads resulting in frames exceeding window_size + fs = self.frame_size() + if fs > window_size: + if not is_chunked_n: + self.is_chunked_0 = True + self.total_payload_size = len(payload) + # chunk payload + self.payload = payload[:window_size-fs] + remain = payload[len(self.payload):] + assert len(remain) + len(self.payload) == len(payload) + assert self.frame_size() <= window_size + Frame(protocol_id, cmd_id, remain, sequence_id+1, window_size, + is_chunked_n=True, + frames=self.frames) + + def __repr__(self): + return '' % \ + (self._frame_type(), self.frame_size(), self.sequence_id) + + def _frame_type(self): + return 'normal' * self.is_normal or 'chunked_0' * self.is_chunked_0 or 'chunked_n' + + def frame_size(self, data_len=0): + # header16 || mac16 || dataN || mac32 + return self.frame_base_size + (data_len or len(self.body)) + + @property + def is_normal(self): + return not self.is_chunked_n and not self.is_chunked_0 + + @property + def header(self): + """ + header: frame-size || header-data || padding + frame-size: 3-byte integer size of frame, big endian encoded + header-data: + normal, chunked-n: rlp.list(protocol-type[, sequence-id]) + chunked-0: rlp.list(protocol-type, sequence-id, total-packet-size) + values: + protocol-type: < 2**16 + sequence-id: < 2**16 (this value is optional for normal frames) + total-packet-size: < 2**32 + padding: zero-fill to 16-byte boundary + """ + def i16(_int): + return struct.pack('>I', _int)[2:] + + assert self.protocol_id < 2**16 + assert self.sequence_id is None or self.sequence_id < 2**16 + l = [i16(self.protocol_id)] + if self.is_chunked_0: + assert self.sequence_id is not None + l.append(i16(self.sequence_id)) + l.append(struct.pack('>I', self.total_payload_size)) + elif self.sequence_id: # normal, chunked_n + l.append(i16(self.sequence_id)) + header_data = rlp.encode(l) + frame_size = self.frame_size() + assert frame_size < 256**3 + header = struct.pack('>I', frame_size)[1:] + header_data + header += '\x00' * (self.header_size - len(header)) + return header + + @property + def body(self): + """ + packet: + normal, chunked-0: rlp(packet-type) [|| rlp(packet-data)] || padding + chunked-n: packet-data || padding + + padding: zero-fill to 16-byte boundary + + Q: rlp(data) or rlp_data + """ + b = '' + # packet-type + if not self.is_chunked_n: + b += rlp.encode(struct.pack("B", self.cmd_id)) # unsigned byte + # payload + b += rlp.encode(self.payload) + else: + b += self.payload + # padding + if len(b) % 16: + b += '\0' * (1 - len(b) % 16) + return b + + def get_frames(self): + return self.frames + + def to_string(self): + return self.header + self.body + + +class Packet(object): + """ + Packets are emitted and received by subprotocols + """ + + def __init__(self, protocol_id=0, cmd_id=0, 
payload='', prioritize=False): + self.protocol_id = protocol_id + self.cmd_id = cmd_id + self.payload = payload + self.prioritize = prioritize + + def __repr__(self): + return 'Packet(%r)' % dict(protocol_id=self.protocol_id, + cmd_id=self.cmd_id, + payload_len=len(self.payload), + prioritize=self.prioritize) + + def __eq__(self, other): + s = dict(self.__dict__) + s.pop('prioritize') + o = dict(other.__dict__) + o.pop('prioritize') + return s == o + + +class Multiplexer(object): + """ + Multiplexing of protocols is performed via dynamic framing and fair queueing. + Dequeuing packets is performed in a cycle which dequeues one or more packets + from the queue(s) of each active protocol. The multiplexor determines the + amount of bytes to send for each protocol prior to each round of dequeuing packets. + + If the size of an RLP-encoded packet is less than 1 KB then the protocol may + request that the network layer prioritize the delivery of the packet. + This should be used if and only if the packet must be delivered before all other packets. + The network layer maintains two queues and three buffers per protocol: + a queue for normal packets, a queue for priority packets, + a chunked-frame buffer, a normal-frame buffer, and a priority-frame buffer. + + + Implemented Variant: + + each sub protocol has three queues + prio + normal + chunked + + protocols are queried round robin + + """ + + max_window_size = 8*1024 + max_priority_frame_size = 1024 + + def __init__(self): + self.queues = OrderedDict() # protocol_id : dict(normal=queue, chunked=queue, prio=queue) + self.sequence_id = 0 + self.last_protocol = None # last protocol, which sent data to the buffer + self.chunked_buffers = dict() # decode: next_expected_sequence_id > buffer + + + @property + def num_active_protocols(self): + "A protocol is considered active if it's queue contains one or more packets." 
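A condensed restatement of the header layout built by Frame.header above, for a normal frame carrying a sequence id (a sketch only; it assumes the pyethereum rlp module imported at the top of this file):

    import struct
    from pyethereum import rlp

    def example_header(frame_size, protocol_id, sequence_id):
        def i16(i):
            return struct.pack('>I', i)[2:]     # 2-byte big-endian
        # frame-size (3-byte big-endian) || header-data: rlp([protocol-type, sequence-id]) || zero padding
        header = struct.pack('>I', frame_size)[1:] + rlp.encode([i16(protocol_id), i16(sequence_id)])
        return header + '\x00' * (16 - len(header))     # zero-fill to the 16-byte header size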
+ return sum(1 for p_id in self.queues if self.is_active_protocol(p_id)) + + def is_active_protocol(self, protocol_id): + return True if sum(q.qsize() for q in self.queues[protocol_id].values()) else False + + def protocol_window_size(self, protocol_id=None): + """ + pws = protocol-window-size = window-size / active-protocol-count + initial pws = 8kb + """ + if protocol_id and not self.is_active_protocol(protocol_id): + return self.max_window_size / (1 + self.num_active_protocols) + else: + return self.max_window_size / max(1, self.num_active_protocols) + + def add_protocol(self, protocol_id): + assert protocol_id not in self.queues + self.queues[protocol_id] = dict(normal=Queue(), + chunked=Queue(), + priority=Queue()) + self.last_protocol = protocol_id + + @property + def next_protocol(self): + protocols = self.queues.keys() + if self.last_protocol == protocols[-1]: + next_protocol = protocols[0] + else: + next_protocol = protocols[protocols.index(self.last_protocol) + 1] + self.last_protocol = next_protocol + return next_protocol + + def add_packet(self, packet): + #protocol_id, cmd_id, rlp_data, prioritize=False + frames = Frame(packet.protocol_id, packet.cmd_id, packet.payload, + sequence_id=self.sequence_id, + window_size=self.protocol_window_size(packet.protocol_id) + ).frames + self.sequence_id = frames[-1].sequence_id + 1 + queues = self.queues[packet.protocol_id] + if packet.prioritize: + assert len(frames) == 1 + assert frames[0].frame_size() <= self.max_priority_frame_size + queues['priority'].put(frames[0]) + elif len(frames) == 1: + queues['normal'].put(frames[0]) + else: + for f in frames: + queues['chunked'].put(f) + + + def pop_frames_for_protocol(self, protocol_id): + """ + If priority packet and normal packet exist: + send up to pws/2 bytes from each (priority first!) + else if priority packet and chunked-frame exist: + send up to pws/2 bytes from each + else + if normal packet and chunked-frame exist: send up to pws/2 bytes from each + else + read pws bytes from active buffer + + If there are bytes leftover -- for example, if the bytes sent is < pws, + then repeat the cycle. + """ + + pws = self.protocol_window_size() + queues = self.queues[protocol_id] + frames = [] + # size = lambda: + size = 0 + while size < pws: + frames_added = 0 + for qn in ('priority', 'normal', 'chunked'): + q = queues[qn] + if q.qsize(): + fs = q.peek().frame_size() + if size + fs <= pws: + frames.append(q.get()) + size += fs + frames_added += 1 + # add no more than two in order to send normal and priority first + if frames_added == 2: + break # i.e. 
next is 'priority' again + # empty queues + if frames_added == 0: + break + # the following can not be guaranteed, as pws might have been different + # at the time where packets were framed and added to the queues + # assert sum(f.frame_size() for f in frames) <= pws + return frames + + def pop_frames(self): + protocols = self.queues.keys() + idx = protocols.index(self.next_protocol) + protocols = protocols[idx:] + protocols[:idx] + assert len(protocols) == len(self.queues.keys()) + for p in protocols: + frames = self.pop_frames_for_protocol(p) + if frames: + return frames + return [] + + def pop_all_frames(self): + frames = [] + while True: + r = self.pop_frames() + frames.extend(r) + if not r: + break + return frames + + def pop_all_frames_as_bytes(self): + return ''.join(f.to_string() for f in self.pop_all_frames()) + + def decode_frame(self, buffer): + """ + w/o encryption + peak into buffer for frame_size + + return None if buffer is not long enough to decode frame + """ + + if len(buffer) < Frame.header_size: + return None, buffer + + def d16(data): + return struct.unpack('>I', '\x00\x00' + data)[0] + + def garbage_collect(protocol_id): + """ + chunked packets of a sub protocol are send in order + thus if a new frame_0 of a subprotocol is received others must be removed + """ + for sid, packet in self.chunked_buffers.items(): + if packet.protocol_id == protocol_id: + del self.chunked_buffers[sid] + + # header: frame-size || header-data || padding + # frame-size: 3-byte integer size of frame, big endian encoded + frame_size = struct.unpack('>I', '\x00' + buffer[:3])[0] + + # FIXME: frames are calculated with MACs, which we don't have yet + real_no_mac_frame_size = frame_size - 16 - 32 + remain = buffer[real_no_mac_frame_size:] + if len(buffer) < real_no_mac_frame_size: + return None, buffer + buffer = buffer[:real_no_mac_frame_size] + # END FIXME + + header_data = rlp.decode(buffer[3:Frame.header_size]) + # normal, chunked-n: rlp.list(protocol-type[, sequence-id]) + # chunked-0: rlp.list(protocol-type, sequence-id, total-packet-size) + + if len(header_data) == 3: + chunked_0 = True + # total-packet-size: < 2**32 + total_payload_size = struct.unpack('>I', header_data[2])[0] + else: + chunked_0 = False + total_payload_size = None + + # protocol-type: < 2**16 + protocol_id = d16(header_data[0]) + # sequence-id: < 2**16 (this value is optional for normal frames) + if len(header_data) > 1: + sequence_id = d16(header_data[1]) + else: + sequence_id = None + + # build packet + body_offset = Frame.header_size + if sequence_id in self.chunked_buffers: + # body chunked-n: packet-data || padding + packet = self.chunked_buffers.pop(sequence_id) + packet.payload += buffer[body_offset:] + if packet.total_payload_size == len(packet.payload): + del packet.total_payload_size + return packet, remain + self.chunked_buffers[sequence_id + 1] = packet + else: + # body normal, chunked-0: rlp(packet-type) [|| rlp(packet-data)] || padding + cmd_id = rlp.big_endian_to_int(rlp.decode(buffer[body_offset])) + packet = Packet(protocol_id=protocol_id, + cmd_id=cmd_id, + payload=rlp.decode(buffer[body_offset+1:])) + if chunked_0: + garbage_collect(protocol_id) + assert sequence_id + packet.total_payload_size = total_payload_size + self.chunked_buffers[sequence_id + 1] = packet + else: # normal + return packet, remain + return None, remain # for chunked, not finished data + + def decode_frames(self, buffer): + packets = [] + remain = last_remain = buffer + while True: + packet, remain = self.decode_frame(remain) + if 
packet: + packets.append(packet) + elif remain == last_remain: + break + last_remain = remain + return packets, remain + + +if __name__ == '__main__': + import sys + + mux = Multiplexer() + p0, p1, p2 = 0, 1, 2 + mux.add_protocol(p0) + mux.add_protocol(p1) + mux.add_protocol(p2) + + assert mux.next_protocol == p0 + assert mux.next_protocol == p1 + assert mux.next_protocol == p2 + assert mux.next_protocol == p0 + + assert mux.pop_frames() == [] + assert mux.num_active_protocols == 0 + + def pws(): + print 'pws', mux.protocol_window_size(), 'n active', mux.num_active_protocols + + + # test normal packet + packet0 = Packet(p0, cmd_id=0, payload='\x00' * 100) + + mux.add_packet(packet0) + assert mux.num_active_protocols == 1 + + frames = mux.pop_frames() + assert len(frames) == 1 + f = frames[0] + assert len(f.to_string()) == f.frame_size() - 32 - 16 + + mux.add_packet(packet0) + assert mux.num_active_protocols == 1 + message = mux.pop_all_frames_as_bytes() + packets, remain = mux.decode_frames(message) + assert packets[0] == packet0 + + # nothing left to pop + assert len(mux.pop_frames()) == 0 + + packet1 = Packet(p1, cmd_id=0, payload='\x00' * mux.max_window_size * 2) + mux.add_packet(packet1) + + # decode packets from buffer + message = mux.pop_all_frames_as_bytes() + packets, remain = mux.decode_frames(message) + assert packets[0].payload == packet1.payload + assert packets[0] == packet1 + assert len(packets) == 1 + + # mix packet types + packet2 = Packet(p0, cmd_id=0, payload='\x00' * 200, prioritize=True) + mux.add_packet(packet1) + mux.add_packet(packet0) + mux.add_packet(packet2) + message = mux.pop_all_frames_as_bytes() + packets, remain = mux.decode_frames(message) + assert packets == [packet2, packet0, packet1] + + # packets with different protocols + packet3 = Packet(p1, cmd_id=0, payload='\x00' * 3000, prioritize=False) + mux.add_packet(packet1) + mux.add_packet(packet0) + mux.add_packet(packet2) + mux.add_packet(packet3) + mux.add_packet(packet3) + mux.add_packet(packet3) + assert mux.next_protocol == p0 + # thus next with data is p1 w/ packet3 + message = mux.pop_all_frames_as_bytes() + packets, remain = mux.decode_frames(message) + assert packets == [packet3, packet2, packet0, packet3, packet3, packet1] + + # test buffer remains, incomplete frames + packet1 = Packet(p1, cmd_id=0, payload='\x00' * 100) + mux.add_packet(packet1) + message = mux.pop_all_frames_as_bytes() + tail = message[:50] + message += tail + packets, remain = mux.decode_frames(message) + assert packets[0] == packet1 + assert len(packets) == 1 + assert len(remain) == len(tail) + + # test buffer decode with invalid data + message = message[1:] + packets, remain = mux.decode_frames(message) + diff --git a/devp2p/peer.py b/devp2p/peer.py new file mode 100644 index 0000000..f99fea7 --- /dev/null +++ b/devp2p/peer.py @@ -0,0 +1,131 @@ +import gevent +from collections import OrderedDict +from protocol import BaseProtocol +from protocol import decode_packet_header, header_length +import slogging + +log = slogging.get_logger('peer') + + +class QueueWorker(gevent.Greenlet): + # FIXME we need to queue send messages + def __init__(self, queue): + self.queue = queue + super(QueueWorker, self).__init__() + + def _run(self): + self.running = True + while self.running: + msg = self.queue.get() # block call + print('queue:', msg) + + +class Peer(gevent.Greenlet): + """ + After creation: + register peer protocol + send hello & encryption + receive hello & derive session key + register in common protocols + + receive data + 
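The __main__ block above doubles as a usage example; the core round trip, condensed (assuming the package layout from setup.py, i.e. devp2p/multiplexer.py):

    from devp2p.multiplexer import Multiplexer, Packet

    mux = Multiplexer()
    mux.add_protocol(0)
    mux.add_packet(Packet(protocol_id=0, cmd_id=0, payload='\x00' * 100))
    message = mux.pop_all_frames_as_bytes()        # framed bytes, ready for the wire
    packets, remain = mux.decode_frames(message)   # decodes back into Packet objects
    assert packets[0].payload == '\x00' * 100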
decrypt, check auth + decode packet id + lookup handling protocol + pass packet to protocol + + send packet + encrypt + """ + + def __init__(self, peermanager, connection): + super(Peer, self).__init__() + log.debug('peer init', peer=self) + self.peermanager = peermanager + self.connection = connection + self.protocols = OrderedDict() + self._buffer = '' + # learned and set on handshake + self.nodeid = None + self.client_version = None + self.listen_port = None + + def __repr__(self): + return '' % (self.connection.getpeername(), id(gevent.getcurrent())) + + @property + def ip_port(self): + return self.connection.getpeername() + + # protocols + def register_protocol(self, protocol): + """ + registeres protocol with peer, which will be accessible as + peer. (e.g. peer.p2p or peer.eth) + """ + assert isinstance(protocol, BaseProtocol) + assert protocol.name not in self.protocols + log.debug('registering protocol', protocol=protocol.name, peer=self) + self.protocols[protocol.name] = protocol + setattr(self.protocols, protocol.name, protocol) + + def deregister_protocol(self, protocol): + assert isinstance(protocol, BaseProtocol) + del self.protocols[protocol.name] + delattr(self.protocols, protocol.name) + + def protocol_by_cmd_id(self, cmd_id): + max_id = 0 + for p in self.protocols.values(): + max_id += len(p.cmd_map) + if cmd_id < max_id: + return p + raise Exception('no protocol for id %s' % cmd_id) + + def has_protocol(self, name): + assert isinstance(name, str) + return hasattr(self.protocol, name) + + # receiving p2p mesages + + def _handle_packet(self, cmd_id, payload): + log.debug('handling packet', cmd_id=cmd_id, peer=self) + protocol = self.protocol_by_cmd_id(cmd_id) + protocol.handle_message(cmd_id, payload) + + def _data_received(self, data): + # use memoryview(string)[offset:] instead copying data + self._buffer += data + while len(self._buffer): + # read packets from buffer + payload_len, cmd_id = decode_packet_header(self._buffer) + # check if we have a complete message + if len(self._buffer) >= payload_len + header_length: + payload = self._buffer[header_length:payload_len + header_length] + self._buffer = self._buffer[payload_len + header_length:] + self._handle_packet(cmd_id, payload) + else: + break + + def send(self, data): + log.debug('send', size=len(data)) + self.connection.sendall(data) + log.debug('send sent', size=len(data)) + + def _run(self): + while True: + log.debug('loop_socket.wait', peer=self) + data = self.connection.recv(4096) + log.debug('loop_socket.received', size=len(data), peer=self) + if not data: + log.debug('loop_socket.not_data', peer=self) + self.stop() + break + self._data_received(data) + + def stop(self): + log.debug('stopped', thread=gevent.getcurrent()) + for p in self.protocols.values(): + p.stop() + self.peermanager.peers.remove(self) + self.kill() diff --git a/devp2p/peermanager.py b/devp2p/peermanager.py new file mode 100644 index 0000000..75ac2c4 --- /dev/null +++ b/devp2p/peermanager.py @@ -0,0 +1,92 @@ +import gevent +import socket +from gevent.server import StreamServer +from gevent.socket import create_connection +from service import BaseService +from protocol import P2PProtocol +from peer import Peer + +import slogging +log = slogging.get_logger('peermgr') + + +class PeerManager(BaseService): + """ + todo: + on peer Hello adds services to peer + connects new peers if there are too few + selects peers based on a DHT + keeps track of peer reputation + saves/loads peers to disc + + """ + name = 'peermanager' + + def __init__(self, 
app): + BaseService.__init__(self, app) + log.info('PeerManager init') + self.peers = [] + + def __repr__(self): + return '' + + def on_hello_received(self, p2pprotocol, data): + log.debug('hello_reveived', peer=p2pprotocol.peer) + # register more protocols + + def _start_peer(self, connection, address, is_inititator=False): + log.debug('new connect', connection=connection) + # create peer + peer = Peer(self, connection) + log.debug('created new peer', peer=peer) + + # register p2p protocol + p2pprotocol = P2PProtocol(peer, cmd_offset=0, is_inititator=is_inititator) + peer.register_protocol(p2pprotocol) + self.peers.append(peer) + + # loop + peer.start() + log.debug('peer started', peer=peer) + + def connect(self, address): + log.debug('connecting', address=address) + """ + gevent.socket.create_connection(address, timeout=Timeout, source_address=None) + Connect to address (a 2-tuple (host, port)) and return the socket object. + Passing the optional timeout parameter will set the timeout + getdefaulttimeout() is default + """ + connection = create_connection(address) + self._start_peer(connection, address, is_inititator=True) + + def _bootstrap(self): + host = self.app.config.get('p2p', 'bootstrap_host', None) + if host: + log.info('connecting bootstrap server') + port = self.app.config.getint('p2p', 'bootstrap_port') + try: + self.connect((host, port)) + except socket.error: + log.warn('connecting bootstrap server failed') + + def start(self): + log.info('starting peermanager') + # start a listening server + host = self.app.config.get('p2p', 'listen_host') + port = self.app.config.getint('p2p', 'listen_port') + log.info('starting listener', host=host, port=port) + self.server = StreamServer((host, port), handle=self._start_peer) + self.server.start() + self._bootstrap() + super(PeerManager, self).start() + + def _run(self): + evt = gevent.event.Event() + evt.wait() + + def stop(self): + log.info('stopping peermanager') + self.server.stop() + for peer in self.peers: + peer.stop() diff --git a/devp2p/protocol.py b/devp2p/protocol.py new file mode 100644 index 0000000..153b48a --- /dev/null +++ b/devp2p/protocol.py @@ -0,0 +1,318 @@ +import time +import gevent +import struct +import json +from gevent import Greenlet +from pyethereum.utils import big_endian_to_int as idec +import serialization +import slogging +log = slogging.get_logger('protocol.p2p').warn + +# packetize + +header_length = 5 + + +def decode_packet_header(message): + header = message[:header_length] + payload_len, cmd_id = struct.unpack('>BL', header) + return payload_len, cmd_id + + +def encode_packet(cmd_id, data): + payload = json.dumps(data) + header = struct.pack('>BL', len(payload), cmd_id) + assert len(header) == header_length + return header + payload + + +class BaseProtocol(object): + """ + Component which translates between + messages from the p2p wire + and services + + Keeps necessary state for the peer + e.g. last ping, sent/received hashes, ... 
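For example, the two helpers above round-trip as follows (illustrative values):

    packet = encode_packet(cmd_id=1, data=dict(gossip='hello'))
    payload_len, cmd_id = decode_packet_header(packet)
    assert cmd_id == 1
    assert payload_len == len(packet) - header_length    # JSON payload follows the 5-byte header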
+ + + """ + name = '' + cmd_map = {} # cmd_name: cmd_id + + def __init__(self, peer, cmd_offset=0): + self.peer = peer + self.cmd_offset = cmd_offset + self.cmd_map = dict((k, v + cmd_offset) for k, v in self.cmd_map.items()) + self.rev_cmd_map = dict((v, k) for k, v in self.cmd_map.items()) + + def handle_message(self, cmd_id, payload): + data = json.loads(payload) + cmd_name = 'receive_%s' % self.rev_cmd_map[cmd_id] + cmd = getattr(self, cmd_name) + cmd(data) + + def stop(self): + "called when peer disconnects, use to cleanup" + pass + + +class ETHProtocol(BaseProtocol): + name = 'eth' + cmd_map = dict(status=0) + status_sent = False + status_received = False + + def send_status(self): + data = dict(head_number=1, + eth_version=49) + packet = encode_packet(self.cmd_map['status'], data) + self.peer._send_packet(packet) + self.status_sent = True + + def receive_status(self, data): + # tell peermanager about spoken protocols + if not self.status_sent: + self.send_status() + self.status_received = True + + +class SHHProtocol(BaseProtocol): + name = 'shh' + cmd_map = dict(gossip=0) + + def send_gossip(self, gossip=''): + data = dict(gossip=gossip) + packet = encode_packet(self.cmd_map['gossip'], data) + self.peer._send_packet(packet) + + def receive_gossip(self, data): + pass + + +class ConnectionMonitor(Greenlet): + ping_interval = 1. + response_delay_threshold = 2. + max_samples = 10 + + def __init__(self, p2pprotocol): + Greenlet.__init__(self) + self.p2pprotocol = p2pprotocol + self.samples = [] + self.last_request = time.time() + self.last_response = time.time() + + def __repr__(self): + return '' + + def track_request(self): + self.last_request = time.time() + + def track_response(self): + self.last_response = time.time() + dt = self.last_response - self.last_request + self.samples.append(dt) + if len(self.samples) > self.max_samples: + self.samples.pop(0) + + @property + def last_response_elapsed(self): + return time.time() - self.last_response + + @property + def latency(self, num_samples=0): + if not self.samples: + return None + num_samples = min(num_samples or self.max_samples, len(self.samples)) + return sum(self.samples[:num_samples])/num_samples + + def _run(self): + log('p2p.peer.monitor.started', monitor=self) + while True: + log('p2p.peer.monitor.pinging', monitor=self) + self.p2pprotocol.send_ping() + gevent.sleep(self.ping_interval) + log('p2p.peer.monitor.latency', monitor=self, latency=self.latency) + if self.last_response_elapsed > self.response_delay_threshold: + log('p2p.peer.monitor.unresponsive_peer', monitor=self) + self.stop() + + def stop(self): + self.kill() + + +class P2PProtocol(BaseProtocol): + name = 'p2p' + version = 2 + + # IF CHANGED, DO: git tag 0.6. + CLIENT_VERSION = 'Ethereum(py)/%s/%s' % (sys.platform, '0.7.0') + # the node s Unique Identifier and is the 512-bit hash that serves to + # identify the node. 
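+    # Note: sha3('test') below is only a 32-byte placeholder; the node id used on the wire is the
+    # 64-byte (512-bit) public key of the node, which is expected to come from the config.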
+ NODE_ID = sha3('test') # set in config + NETWORK_ID = 0 + SYNCHRONIZATION_TOKEN = 0x22400891 + + cmd_map = dict(hello=0, ping=1, pong=2, disconnect=3, getpeers=4, peers=5) + + def __init__(self, peer, cmd_offset, is_inititator=False): + """ + initiater sends initial hello + """ + super(P2PProtocol, self).__init__(peer, cmd_offset) + self.is_inititator = is_inititator + self.hello_received = False + self.connection_monitor = ConnectionMonitor(self) + self._handshake() + + def stop(self): + self.connection_monitor.stop() + + @property + def peermanager(self): + return self.peer.peermanager + + @property + def config(self): + return self.peermanager.app.config + + @property + def nodeid(self): + return self.config.get('network', 'node_id') + + def _handshake(self): + if self.is_inititator: + self.send_hello() + + def _send_packet(self, cmd_name, data): + assert isinstance(list, data) + cmd_id = self.cmd_map['cmd_name'] + self.cmd_offset + msg = serialization.Serializer.dump_packet([cmd_id] + data) + self.peer.send(msg) + + def send_ping(self): + log('p2p.send_ping', peer=self.peer) + self._send_packet('ping', []) + self.connection_monitor.track_request() + + def receive_ping(self, data): + log('p2p.receive_ping', peer=self.peer) + self.send_pong() + + def send_pong(self): + log('p2p.send_pong', peer=self.peer) + self._send_packet('pong') + + def receive_pong(self, data): + log('p2p.receive_pong', peer=self.peer) + self.connection_monitor.track_response() + + def send_disconnect(self, reason=''): + log('p2p.send_disconnect', peer=self.peer, reason=reason) + data = [] + if reason: + data.append(serialization.Serializer.disconnect_reasons_map[reason]) # FIXME + self._send_packet('disconnect', data) + self.peer.stop() + + def receive_disconnect(self, data): + reason = serialization.Serializer.disconnect_reasons_map_by_id[idec(data[0])] + log('p2p.receive_disconnect', peer=self.peer, reason=reason) + self.peer.stop() + + + def send_getpeers(self): + return self._send_packet('getpeers') + + def receive_getpeers(self): + self.send_peers() + + + def send_peers(self): + ''' + :param peers: a sequence of (ip, port, pid) + :return: None if no peers + ''' + data = [] + for peer in self.peermanager.peers: + ip, port = peer.ip_port + assert ip.count('.') == 3 + ip = ''.join(chr(int(x)) for x in ip.split('.')) + data.append([ip, port, peer.nodeid]) + return self._send_packet('peers', data) + + def receive_peers(self, data): + pass # FIXME + + + def send_hello(self): + """ + 0x01 Hello: [0x01: P, protocolVersion: P, clientVersion: B, [cap0: B, cap1: B, ...] + listenPort: P, id: B_64] + + protocolVersion: The underlying network protocol. 0 + clientVersion: The underlying client. A user-readable string. + capN: A peer-network capability code, readable ASCII and 3 letters. Currently only "eth + and "shh" are known. + listenPort: The port on which the peer is listening for an incoming connection. + id: The identity and public key of the peer. 
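+        In terms of the code below, the payload is assembled as
+            data = [protocol_version, client_version, [(name, version), ...], listen_port, node_id]
+        and goes out via _send_packet('hello', data).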
+ """ + log('p2p.send_hello', peer=self.peer) + capabilities = [(p.name, p.version) for p in self.peer.protocols] + + data = [ + self.version, + self.CLIENT_VERSION, + capabilities, + self.config.getint('network', 'listen_port'), + self.nodeid + ] + self._send_packet('hello', data) + + + def _recv_hello(self, data): + log('p2p.receive_hello', peer=self.peer) + # 0x01 Hello: [0x01: P, protocolVersion: P, clientVersion: B, [cap0: B, + # cap1: B, ...], listenPort: P, id: B_64] + _decode = (idec, str, list, idec, str) + try: + data = [_decode[i](x) for i, x in enumerate(data)] + network_protocol_version, client_version = data[0], data[1] + capabilities, listen_port, node_id = data[2], data[3], data[4] + self.capabilities = [(p, ord(v)) for p, v in capabilities] + except (IndexError, ValueError) as e: + log('could not decode hello', peer=self, error=e) + return self.send_Disconnect(reason='Incompatible network protocols') + + assert node_id + if node_id == self.nodeid: + log.critical('connected myself') + return self.send_Disconnect(reason='Incompatible network protocols') + + self.capabilities = [(p, ord(v)) for p, v in capabilities] + log('received hello', + peer=self, + network_protocol_version=network_protocol_version, + node_id=node_id.encode('hex'), + client_version=client_version, + capabilities=self.capabilities) + + if network_protocol_version != self.version: + log('incompatible network protocols', + peer=self, + expected=self.version, + received=network_protocol_version) + return self.send_Disconnect(reason='Incompatible network protocols') + + self.hello_received = True + self.peer.client_version = client_version + self.peer.nodeid = node_id + self.peer.listen_port = listen_port # replace connection port with listen port + + if not self.is_inititator: + self.send_hello() + + # tell peermanager about spoken protocols + self.peer.peermanager.on_hello_received(self, data) + self.connection_monitor.start() + diff --git a/devp2p/serialization.py b/devp2p/serialization.py new file mode 100644 index 0000000..d131965 --- /dev/null +++ b/devp2p/serialization.py @@ -0,0 +1,101 @@ + +import sys +from pyethereum import rlp +from pyethereum.utils import big_endian_to_int as idec +from pyethereum.utils import int_to_big_endian4 as ienc4 +from pyethereum.utils import recursive_int_to_big_endian +from pyethereum.slogging import get_logger +log = get_logger('serialization') + + +def lrlp_decode(data): + "always return a list" + d = rlp.decode(data) + if isinstance(d, str): + d = [d] + return d + + +class Serializer(object): + + disconnect_reasons_map = dict(( + ('Disconnect requested', 0x00), + ('TCP sub-system error', 0x01), + ('Bad protocol', 0x02), + ('Useless peer', 0x03), + ('Too many peers', 0x04), + ('Already connected', 0x05), + ('Wrong genesis block', 0x06), + ('Incompatible network protocols', 0x07), + ('Client quitting', 0x08))) + + disconnect_reasons_map_by_id = \ + dict((v, k) for k, v in disconnect_reasons_map.items()) + + @classmethod + def packet_size(cls, packet): + return idec(packet[4:8]) + 8 + + @classmethod + def packet_cmd(cls, packet): + try: + v = idec(rlp.descend(packet[8:200], 0)) + except rlp.DecodingError: + v = -1 + return v + + @classmethod + def load_packet(cls, packet): + ''' + Though TCP provides a connection-oriented medium, Ethereum nodes + communicate in terms of packets. 
These packets are formed as a 4-byte + synchronisation token (0x22400891), a 4-byte "payload size", to be + interpreted as a big-endian integer and finally an N-byte + RLP-serialised data structure, where N is the aforementioned + "payload size". To be clear, the payload size specifies the number of + bytes in the packet ''following'' the first 8. + + :return: (success, result), where result should be None when fail, + and (header, payload_len, cmd, data) when success + ''' + header = idec(packet[:4]) + if header != cls.SYNCHRONIZATION_TOKEN: + return dict(error='check header failed, skipping message,' + 'sync token was hex: %s' % hex(header)) + + try: + payload_len = idec(packet[4:8]) + except Exception as e: + return dict(error=str(e)) + + if len(packet) < payload_len + 8: + return dict(error='packet wrong length') + + try: + payload = lrlp_decode(packet[8:8 + payload_len]) + except Exception as e: + return dict(error=str(e)) + + if (not len(payload)) or (idec(payload[0]) not in cls.cmd_map): + return dict(error='check cmd %r failed' % idec(payload[0])) + + cmd_id = idec(payload[0]) + remain = packet[8 + payload_len:] + return dict(header=header, + payload_len=payload_len, + cmd_id=cmd_id, + data=payload[1:], + remain=remain) + + @classmethod + def dump_packet(cls, data): + """ + 4-byte synchronisation token, (0x22400891), + a 4-byte "payload size", to be interpreted as a big-endian integer + an N-byte RLP-serialised data structure + """ + payload = rlp.encode(recursive_int_to_big_endian(data)) + packet = ienc4(cls.SYNCHRONIZATION_TOKEN) + packet += ienc4(len(payload)) + packet += payload + return packet diff --git a/devp2p/service.py b/devp2p/service.py new file mode 100644 index 0000000..7ac3e5e --- /dev/null +++ b/devp2p/service.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +from gevent import Greenlet + + +class BaseService(Greenlet): + """ + service instances are added to the application under + app.services. + + app should be passed to the service in order to query other services + + services may be a greenlet or spawn greenlets. + both must implement a .stop() + if a services spawns additional greenlets, it's responsible to stop them. + """ + + name = '' + + def __init__(self, app): + Greenlet.__init__(self) + self.app = app + + def start(self): + Greenlet.start(self) + + def stop(self): + Greenlet.stop(self) + + @classmethod + def register_with_app(klass, app): + """ + services know best how to initiate themselfs. 
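A minimal sketch of a service plugging into an application (ExampleService is hypothetical; app is a BaseApp as constructed in devp2p/app.py):

    import gevent
    from devp2p.service import BaseService

    class ExampleService(BaseService):
        name = 'example'

        def _run(self):
            while True:
                gevent.sleep(1)      # periodic work goes here

    ExampleService.register_with_app(app)   # instantiates and registers under app.services.example
    app.services.example.start()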
+ create a service instance, propably based on + app.config and app.services + """ + app.register_service(klass(app)) diff --git a/devp2p/slogging.py b/devp2p/slogging.py new file mode 100644 index 0000000..e954668 --- /dev/null +++ b/devp2p/slogging.py @@ -0,0 +1,16 @@ +import logging +try: + from pyethereum.slogging import get_logger +except ImportError: + # patch logging to support kargs + _log_orig = logging.Logger._log + def _kargs_log(self, level, msg, args, exc_info=None, extra=None, **kargs): + msg += ' ' + ' '.join('%s=%r' % (k,v) for k,v in kargs.items()) + _log_orig(self, level, msg, args, exc_info, extra) + logging.Logger._log = _kargs_log + get_logger = logging.getLogger + +if __name__ == '__main__': + logging.basicConfig() + log = get_logger('test') + log.warn('miner.new_block', block_hash='abcdef123', nonce=2234231) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..edcc87e --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +wheel==0.23.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..0a8df87 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[wheel] +universal = 1 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..4c7698e --- /dev/null +++ b/setup.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + + +readme = open('README.rst').read() +history = open('HISTORY.rst').read().replace('.. :changelog:', '') + +requirements = [ + # TODO: put package requirements here +] + +test_requirements = [ + # TODO: put package test requirements here +] + +setup( + name='devp2p', + version='0.0.1', + description='Python implementation of the Ethereum P2P stack', + long_description=readme + '\n\n' + history, + author='Heiko Heiko', + author_email='heiko@ethdev.com', + url='https://github.com/heikoheiko/pydevp2p', + packages=[ + 'devp2p', + ], + package_dir={'devp2p': + 'devp2p'}, + include_package_data=True, + install_requires=requirements, + license="BSD", + zip_safe=False, + keywords='devp2p', + classifiers=[ + 'Development Status :: 2 - Pre-Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: BSD License', + 'Natural Language :: English', + "Programming Language :: Python :: 2", + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.3', + 'Programming Language :: Python :: 3.4', + ], + test_suite='tests', + tests_require=test_requirements +) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..e726ac8 --- /dev/null +++ b/tox.ini @@ -0,0 +1,9 @@ +[tox] +envlist = py27, py33, py34 + +[testenv] +setenv = + PYTHONPATH = {toxinidir}:{toxinidir}/pydevp2p +commands = python setup.py test +deps = + -r{toxinidir}/requirements.txt
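A short illustration of the keyword-style logging provided by devp2p/slogging.py (output shown is approximate; with pyethereum installed its structured logger is used instead):

    import logging
    from devp2p import slogging

    logging.basicConfig()
    log = slogging.get_logger('peer')
    log.warn('connection lost', peer='127.0.0.1:30303', reason='timeout')
    # -> WARNING:peer:connection lost peer='127.0.0.1:30303' reason='timeout'   (key order may vary)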