Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
chore: address review
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Oct 30, 2019
1 parent c670d50 commit d7ae2ed
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 104 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ js-libp2p-floodsub
const FloodSub = require('libp2p-floodsub')

const registrar = {
register: (multicodec, handlers) => {
handle: (multicodecs, handle) => {
// register multicodec to libp2p
// handle function is called everytime a remote peer opens a stream to the peer.
},
register: (multicodecs, handlers) => {
// handlers will be used to notify pubsub of peer connection establishment or closing
},
unregister: (multicodec) => {
unregister: (id) => {

}
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@
"dirty-chai": "^2.0.1",
"it-pair": "^1.0.0",
"lodash": "^4.17.15",
"multiaddr": "^6.1.0",
"multiaddr": "^7.1.0",
"p-defer": "^3.0.0",
"peer-id": "~0.13.3",
"peer-info": "~0.17.0",
"sinon": "^7.5.0"
},
"dependencies": {
"async.nexttick": "^0.5.2",
"debug": "^4.1.1",
"it-length-prefixed": "^2.0.0",
"it-pipe": "^1.0.1",
Expand Down
18 changes: 12 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const pMap = require('p-map')
const TimeCache = require('time-cache')
const nextTick = require('async.nexttick')

const PeerInfo = require('peer-info')
const BaseProtocol = require('libp2p-pubsub')
Expand All @@ -27,6 +28,7 @@ class FloodSub extends BaseProtocol {
/**
* @param {PeerInfo} peerInfo instance of the peer's PeerInfo
* @param {Object} registrar
* @param {function} registrar.handle
* @param {function} registrar.register
* @param {function} registrar.unregister
* @param {Object} [options]
Expand All @@ -37,6 +39,7 @@ class FloodSub extends BaseProtocol {
assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`')

// registrar handling
assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar')
assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar')
assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar')

Expand Down Expand Up @@ -77,9 +80,10 @@ class FloodSub extends BaseProtocol {
* @override
* @param {PeerInfo} peerInfo peer info
* @param {Connection} conn connection to the peer
* @returns {Promise<void>}
*/
_onPeerConnected (peerInfo, conn) {
super._onPeerConnected(peerInfo, conn)
async _onPeerConnected (peerInfo, conn) {
await super._onPeerConnected(peerInfo, conn)
const idB58Str = peerInfo.id.toB58String()
const peer = this.peers.get(idB58Str)

Expand All @@ -105,7 +109,7 @@ class FloodSub extends BaseProtocol {
await pipe(
conn,
lp.decode(),
async function collect (source) {
async function (source) {
for await (const data of source) {
const rpc = Buffer.isBuffer(data) ? data : data.slice()

Expand Down Expand Up @@ -209,7 +213,7 @@ class FloodSub extends BaseProtocol {
/**
* Unmounts the floodsub protocol and shuts down every connection
* @override
* @returns {Promise}
* @returns {Promise<void>}
*/
async stop () {
await super.stop()
Expand All @@ -222,7 +226,7 @@ class FloodSub extends BaseProtocol {
* @override
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @returns {Promise}
* @returns {Promise<void>}
*/
async publish (topics, messages) {
assert(this.started, 'FloodSub is not started')
Expand Down Expand Up @@ -267,10 +271,10 @@ class FloodSub extends BaseProtocol {
assert(this.started, 'FloodSub is not started')

topics = ensureArray(topics)

topics.forEach((topic) => this.subscriptions.add(topic))

this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer))

// make sure that FloodSub is already mounted
function sendSubscriptionsOnceReady (peer) {
if (peer && peer.isWritable) {
Expand Down Expand Up @@ -303,6 +307,8 @@ class FloodSub extends BaseProtocol {
function checkIfReady (peer) {
if (peer && peer.isWritable) {
peer.sendUnsubscriptions(topics)
} else {
nextTick(checkIfReady.bind(peer))
}
}
}
Expand Down
78 changes: 33 additions & 45 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ const expect = chai.expect

const pDefer = require('p-defer')
const times = require('lodash/times')
const DuplexPair = require('it-pair/duplex')

const FloodSub = require('../src')
const { multicodec } = require('../src')
const { first, createPeerInfo, expectSet } = require('./utils')

const defOptions = {
emitSelf: true
}
const {
defOptions,
first,
createPeerInfo,
createMockRegistrar,
expectSet,
ConnectionPair
} = require('./utils')

function shouldNotHappen (_) {
expect.fail()
Expand All @@ -31,24 +33,15 @@ describe('basics between 2 nodes', () => {
const registrarRecordA = {}
const registrarRecordB = {}

const registrar = (registrarRecord) => ({
register: (multicodecs, handlers) => {
registrarRecord[multicodecs[0]] = handlers
},
unregister: (multicodecs) => {
delete registrarRecord[multicodecs[0]]
}
})

// Mount pubsub protocol
before(async () => {
[peerInfoA, peerInfoB] = await Promise.all([
createPeerInfo(),
createPeerInfo()
])

fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions)
fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)

expect(fsA.peers.size).to.be.eql(0)
expect(fsA.subscriptions.size).to.eql(0)
Expand All @@ -63,14 +56,19 @@ describe('basics between 2 nodes', () => {
]))

// Connect floodsub nodes
before(() => {
before(async () => {
const onConnectA = registrarRecordA[multicodec].onConnect
const onConnectB = registrarRecordB[multicodec].onConnect
const handleB = registrarRecordB[multicodec].handler

// Notice peers of connection
const [d0, d1] = DuplexPair()
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)
const [c0, c1] = ConnectionPair()
await onConnectA(peerInfoB, c0)

await handleB({
protocol: multicodec,
stream: c1.stream,
remotePeer: peerInfoA.id
})

expect(fsA.peers.size).to.be.eql(1)
expect(fsB.peers.size).to.be.eql(1)
Expand Down Expand Up @@ -236,24 +234,15 @@ describe('basics between 2 nodes', () => {
const registrarRecordA = {}
const registrarRecordB = {}

const registrar = (registrarRecord) => ({
register: (multicodec, handlers) => {
registrarRecord[multicodec] = handlers
},
unregister: (multicodec) => {
delete registrarRecord[multicodec]
}
})

// Mount pubsub protocol
before(async () => {
[peerInfoA, peerInfoB] = await Promise.all([
createPeerInfo(),
createPeerInfo()
])

fsA = new FloodSub(peerInfoA, registrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, registrar(registrarRecordB), defOptions)
fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions)
fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions)
})

// Start pubsub
Expand Down Expand Up @@ -281,19 +270,18 @@ describe('basics between 2 nodes', () => {
})

it('existing subscriptions are sent upon peer connection', async () => {
const dial = async () => {
const onConnectA = registrarRecordA[multicodec].onConnect
const onConnectB = registrarRecordB[multicodec].onConnect

// Notice peers of connection
const [c0, c1] = ConnectionPair()
await onConnectA(peerInfoB, c0)
await onConnectB(peerInfoA, c1)
}

await Promise.all([
// nodeA.dial(nodeB.peerInfo),
new Promise((resolve) => {
const onConnectA = registrarRecordA[multicodec].onConnect
const onConnectB = registrarRecordB[multicodec].onConnect

// Notice peers of connection
const [d0, d1] = DuplexPair()
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)

resolve()
}),
dial(),
new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)),
new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve))
])
Expand Down
83 changes: 35 additions & 48 deletions test/multiple-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,16 @@ chai.use(require('dirty-chai'))
const expect = chai.expect

const pDefer = require('p-defer')
const DuplexPair = require('it-pair/duplex')

const FloodSub = require('../src')
const { multicodec } = require('../src')
const { createPeerInfo, first, expectSet } = require('./utils')
const {
createPeerInfo,
createMockRegistrar,
first,
expectSet,
ConnectionPair
} = require('./utils')

async function spawnPubSubNode (peerInfo, reg) {
const ps = new FloodSub(peerInfo, reg, { emitSelf: true })
Expand All @@ -33,15 +38,6 @@ describe('multiple nodes (more than 2)', () => {
const registrarRecordB = {}
const registrarRecordC = {}

const registrar = (registrarRecord) => ({
register: (multicodec, handlers) => {
registrarRecord[multicodec] = handlers
},
unregister: (multicodec) => {
delete registrarRecord[multicodec]
}
})

before(async () => {
[peerInfoA, peerInfoB, peerInfoC] = await Promise.all([
createPeerInfo(),
Expand All @@ -50,26 +46,26 @@ describe('multiple nodes (more than 2)', () => {
]);

[psA, psB, psC] = await Promise.all([
spawnPubSubNode(peerInfoA, registrar(registrarRecordA)),
spawnPubSubNode(peerInfoB, registrar(registrarRecordB)),
spawnPubSubNode(peerInfoC, registrar(registrarRecordC))
spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)),
spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)),
spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC))
])
})

// connect nodes
before(() => {
before(async () => {
const onConnectA = registrarRecordA[multicodec].onConnect
const onConnectB = registrarRecordB[multicodec].onConnect
const onConnectC = registrarRecordC[multicodec].onConnect

// Notice peers of connection
const [d0, d1] = DuplexPair()
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)
const [d0, d1] = ConnectionPair()
await onConnectA(peerInfoB, d0)
await onConnectB(peerInfoA, d1)

const [d2, d3] = DuplexPair()
onConnectB(peerInfoC, d2)
onConnectC(peerInfoB, d3)
const [d2, d3] = ConnectionPair()
await onConnectB(peerInfoC, d2)
await onConnectC(peerInfoB, d3)
})

after(() => Promise.all([
Expand Down Expand Up @@ -246,15 +242,6 @@ describe('multiple nodes (more than 2)', () => {
const registrarRecordD = {}
const registrarRecordE = {}

const registrar = (registrarRecord) => ({
register: (multicodec, handlers) => {
registrarRecord[multicodec] = handlers
},
unregister: (multicodec) => {
delete registrarRecord[multicodec]
}
})

before(async () => {
[peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE] = await Promise.all([
createPeerInfo(),
Expand All @@ -265,38 +252,38 @@ describe('multiple nodes (more than 2)', () => {
]);

[psA, psB, psC, psD, psE] = await Promise.all([
spawnPubSubNode(peerInfoA, registrar(registrarRecordA)),
spawnPubSubNode(peerInfoB, registrar(registrarRecordB)),
spawnPubSubNode(peerInfoC, registrar(registrarRecordC)),
spawnPubSubNode(peerInfoD, registrar(registrarRecordD)),
spawnPubSubNode(peerInfoE, registrar(registrarRecordE))
spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)),
spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)),
spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)),
spawnPubSubNode(peerInfoD, createMockRegistrar(registrarRecordD)),
spawnPubSubNode(peerInfoE, createMockRegistrar(registrarRecordE))
])
})

// connect nodes
before(() => {
before(async () => {
const onConnectA = registrarRecordA[multicodec].onConnect
const onConnectB = registrarRecordB[multicodec].onConnect
const onConnectC = registrarRecordC[multicodec].onConnect
const onConnectD = registrarRecordD[multicodec].onConnect
const onConnectE = registrarRecordE[multicodec].onConnect

// Notice peers of connection
const [d0, d1] = DuplexPair() // A <-> B
onConnectA(peerInfoB, d0)
onConnectB(peerInfoA, d1)
const [d0, d1] = ConnectionPair() // A <-> B
await onConnectA(peerInfoB, d0)
await onConnectB(peerInfoA, d1)

const [d2, d3] = DuplexPair() // B <-> C
onConnectB(peerInfoC, d2)
onConnectC(peerInfoB, d3)
const [d2, d3] = ConnectionPair() // B <-> C
await onConnectB(peerInfoC, d2)
await onConnectC(peerInfoB, d3)

const [d4, d5] = DuplexPair() // C <-> D
onConnectC(peerInfoD, d4)
onConnectD(peerInfoC, d5)
const [d4, d5] = ConnectionPair() // C <-> D
await onConnectC(peerInfoD, d4)
await onConnectD(peerInfoC, d5)

const [d6, d7] = DuplexPair() // C <-> D
onConnectD(peerInfoE, d6)
onConnectE(peerInfoD, d7)
const [d6, d7] = ConnectionPair() // C <-> D
await onConnectD(peerInfoE, d6)
await onConnectE(peerInfoD, d7)
})

after(() => Promise.all([
Expand Down
Loading

0 comments on commit d7ae2ed

Please sign in to comment.