diff --git a/README.md b/README.md index d96fcfa..cc55991 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,14 @@ js-libp2p-floodsub const FloodSub = require('libp2p-floodsub') const registrar = { - register: (multicodec, handlers) => { + handle: (multicodecs, handle) => { // register multicodec to libp2p + // handle function is called everytime a remote peer opens a stream to the peer. + }, + register: (multicodecs, handlers) => { // handlers will be used to notify pubsub of peer connection establishment or closing }, - unregister: (multicodec) => { + unregister: (id) => { } } diff --git a/package.json b/package.json index 26d1ab8..03a762b 100644 --- a/package.json +++ b/package.json @@ -50,13 +50,14 @@ "dirty-chai": "^2.0.1", "it-pair": "^1.0.0", "lodash": "^4.17.15", - "multiaddr": "^6.1.0", + "multiaddr": "^7.1.0", "p-defer": "^3.0.0", "peer-id": "~0.13.3", "peer-info": "~0.17.0", "sinon": "^7.5.0" }, "dependencies": { + "async.nexttick": "^0.5.2", "debug": "^4.1.1", "it-length-prefixed": "^2.0.0", "it-pipe": "^1.0.1", diff --git a/src/index.js b/src/index.js index b274517..62f7dbf 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ const pipe = require('it-pipe') const lp = require('it-length-prefixed') const pMap = require('p-map') const TimeCache = require('time-cache') +const nextTick = require('async.nexttick') const PeerInfo = require('peer-info') const BaseProtocol = require('libp2p-pubsub') @@ -27,6 +28,7 @@ class FloodSub extends BaseProtocol { /** * @param {PeerInfo} peerInfo instance of the peer's PeerInfo * @param {Object} registrar + * @param {function} registrar.handle * @param {function} registrar.register * @param {function} registrar.unregister * @param {Object} [options] @@ -37,6 +39,7 @@ class FloodSub extends BaseProtocol { assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') // registrar handling + assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar') assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar') assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') @@ -77,9 +80,10 @@ class FloodSub extends BaseProtocol { * @override * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer + * @returns {Promise} */ - _onPeerConnected (peerInfo, conn) { - super._onPeerConnected(peerInfo, conn) + async _onPeerConnected (peerInfo, conn) { + await super._onPeerConnected(peerInfo, conn) const idB58Str = peerInfo.id.toB58String() const peer = this.peers.get(idB58Str) @@ -105,7 +109,7 @@ class FloodSub extends BaseProtocol { await pipe( conn, lp.decode(), - async function collect (source) { + async function (source) { for await (const data of source) { const rpc = Buffer.isBuffer(data) ? data : data.slice() @@ -209,7 +213,7 @@ class FloodSub extends BaseProtocol { /** * Unmounts the floodsub protocol and shuts down every connection * @override - * @returns {Promise} + * @returns {Promise} */ async stop () { await super.stop() @@ -222,7 +226,7 @@ class FloodSub extends BaseProtocol { * @override * @param {Array|string} topics * @param {Array|any} messages - * @returns {Promise} + * @returns {Promise} */ async publish (topics, messages) { assert(this.started, 'FloodSub is not started') @@ -267,10 +271,10 @@ class FloodSub extends BaseProtocol { assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) - topics.forEach((topic) => this.subscriptions.add(topic)) this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) + // make sure that FloodSub is already mounted function sendSubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { @@ -303,6 +307,8 @@ class FloodSub extends BaseProtocol { function checkIfReady (peer) { if (peer && peer.isWritable) { peer.sendUnsubscriptions(topics) + } else { + nextTick(checkIfReady.bind(peer)) } } } diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 34b2f1a..456d0e7 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -9,15 +9,17 @@ const expect = chai.expect const pDefer = require('p-defer') const times = require('lodash/times') -const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') const { multicodec } = require('../src') -const { first, createPeerInfo, expectSet } = require('./utils') - -const defOptions = { - emitSelf: true -} +const { + defOptions, + first, + createPeerInfo, + createMockRegistrar, + expectSet, + ConnectionPair +} = require('./utils') function shouldNotHappen (_) { expect.fail() @@ -31,15 +33,6 @@ describe('basics between 2 nodes', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodecs, handlers) => { - registrarRecord[multicodecs[0]] = handlers - }, - unregister: (multicodecs) => { - delete registrarRecord[multicodecs[0]] - } - }) - // Mount pubsub protocol before(async () => { [peerInfoA, peerInfoB] = await Promise.all([ @@ -47,8 +40,8 @@ describe('basics between 2 nodes', () => { createPeerInfo() ]) - fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) expect(fsA.peers.size).to.be.eql(0) expect(fsA.subscriptions.size).to.eql(0) @@ -63,14 +56,19 @@ describe('basics between 2 nodes', () => { ])) // Connect floodsub nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect + const handleB = registrarRecordB[multicodec].handler // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + + await handleB({ + protocol: multicodec, + stream: c1.stream, + remotePeer: peerInfoA.id + }) expect(fsA.peers.size).to.be.eql(1) expect(fsB.peers.size).to.be.eql(1) @@ -236,15 +234,6 @@ describe('basics between 2 nodes', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - // Mount pubsub protocol before(async () => { [peerInfoA, peerInfoB] = await Promise.all([ @@ -252,8 +241,8 @@ describe('basics between 2 nodes', () => { createPeerInfo() ]) - fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions) - fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions) + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) }) // Start pubsub @@ -281,19 +270,18 @@ describe('basics between 2 nodes', () => { }) it('existing subscriptions are sent upon peer connection', async () => { + const dial = async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + await onConnectB(peerInfoA, c1) + } + await Promise.all([ - // nodeA.dial(nodeB.peerInfo), - new Promise((resolve) => { - const onConnectA = registrarRecordA[multicodec].onConnect - const onConnectB = registrarRecordB[multicodec].onConnect - - // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) - - resolve() - }), + dial(), new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)), new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve)) ]) diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index 058cbcb..522c875 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -7,11 +7,16 @@ chai.use(require('dirty-chai')) const expect = chai.expect const pDefer = require('p-defer') -const DuplexPair = require('it-pair/duplex') const FloodSub = require('../src') const { multicodec } = require('../src') -const { createPeerInfo, first, expectSet } = require('./utils') +const { + createPeerInfo, + createMockRegistrar, + first, + expectSet, + ConnectionPair +} = require('./utils') async function spawnPubSubNode (peerInfo, reg) { const ps = new FloodSub(peerInfo, reg, { emitSelf: true }) @@ -33,15 +38,6 @@ describe('multiple nodes (more than 2)', () => { const registrarRecordB = {} const registrarRecordC = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - before(async () => { [peerInfoA, peerInfoB, peerInfoC] = await Promise.all([ createPeerInfo(), @@ -50,26 +46,26 @@ describe('multiple nodes (more than 2)', () => { ]); [psA, psB, psC] = await Promise.all([ - spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), - spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), - spawnPubSubNode(peerInfoC, registrar(registrarRecordC)) + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)) ]) }) // connect nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect const onConnectB = registrarRecordB[multicodec].onConnect const onConnectC = registrarRecordC[multicodec].onConnect // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [d0, d1] = ConnectionPair() + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) - const [d2, d3] = DuplexPair() - onConnectB(peerInfoC, d2) - onConnectC(peerInfoB, d3) + const [d2, d3] = ConnectionPair() + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) }) after(() => Promise.all([ @@ -246,15 +242,6 @@ describe('multiple nodes (more than 2)', () => { const registrarRecordD = {} const registrarRecordE = {} - const registrar = (registrarRecord) => ({ - register: (multicodec, handlers) => { - registrarRecord[multicodec] = handlers - }, - unregister: (multicodec) => { - delete registrarRecord[multicodec] - } - }) - before(async () => { [peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE] = await Promise.all([ createPeerInfo(), @@ -265,16 +252,16 @@ describe('multiple nodes (more than 2)', () => { ]); [psA, psB, psC, psD, psE] = await Promise.all([ - spawnPubSubNode(peerInfoA, registrar(registrarRecordA)), - spawnPubSubNode(peerInfoB, registrar(registrarRecordB)), - spawnPubSubNode(peerInfoC, registrar(registrarRecordC)), - spawnPubSubNode(peerInfoD, registrar(registrarRecordD)), - spawnPubSubNode(peerInfoE, registrar(registrarRecordE)) + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)), + spawnPubSubNode(peerInfoD, createMockRegistrar(registrarRecordD)), + spawnPubSubNode(peerInfoE, createMockRegistrar(registrarRecordE)) ]) }) // connect nodes - before(() => { + before(async () => { const onConnectA = registrarRecordA[multicodec].onConnect const onConnectB = registrarRecordB[multicodec].onConnect const onConnectC = registrarRecordC[multicodec].onConnect @@ -282,21 +269,21 @@ describe('multiple nodes (more than 2)', () => { const onConnectE = registrarRecordE[multicodec].onConnect // Notice peers of connection - const [d0, d1] = DuplexPair() // A <-> B - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [d0, d1] = ConnectionPair() // A <-> B + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) - const [d2, d3] = DuplexPair() // B <-> C - onConnectB(peerInfoC, d2) - onConnectC(peerInfoB, d3) + const [d2, d3] = ConnectionPair() // B <-> C + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) - const [d4, d5] = DuplexPair() // C <-> D - onConnectC(peerInfoD, d4) - onConnectD(peerInfoC, d5) + const [d4, d5] = ConnectionPair() // C <-> D + await onConnectC(peerInfoD, d4) + await onConnectD(peerInfoC, d5) - const [d6, d7] = DuplexPair() // C <-> D - onConnectD(peerInfoE, d6) - onConnectE(peerInfoD, d7) + const [d6, d7] = ConnectionPair() // C <-> D + await onConnectD(peerInfoE, d6) + await onConnectE(peerInfoD, d7) }) after(() => Promise.all([ diff --git a/test/utils/index.js b/test/utils/index.js index 4d98473..fbb6643 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,6 +2,7 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const DuplexPair = require('it-pair/duplex') const { expect } = require('chai') @@ -18,10 +19,50 @@ exports.createPeerInfo = async () => { } exports.mockRegistrar = { - register: (multicodecs, handlers) => { + handle: () => {}, + register: () => {}, + unregister: () => {} +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } }, - unregister: (multicodecs) => { + register: (multicodecs, handlers) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + ...handlers + } + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] +} + +exports.defOptions = { + emitSelf: true }