Skip to content

Commit

Permalink
update redundant adaptive broadcast channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Nguyen Anh Tu committed Oct 8, 2024
1 parent 6b4f6b0 commit c30a1e4
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import * as ServerMethod from './methods/server';
export { BroadcastChannel, enforceOptions, OPEN_BROADCAST_CHANNELS } from './broadcast-channel';
export * from './method-chooser';
export { AdaptiveBroadcastChannel } from './adaptive-broadcast-channel';
export { RedundantAdaptiveBroadcastChannel } from './redundant-adaptive-broadcast-channel';
export { NativeMethod, IndexedDbMethod, LocalstorageMethod, ServerMethod };
17 changes: 9 additions & 8 deletions src/redundant-adaptive-broadcast-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ export class RedundantAdaptiveBroadcastChannel {
}
}

handleMessage(event, method) {
if (event.data && event.data.nonce) {
if (this.processedNonces.has(event.data.nonce)) {
console.log(`Duplicate message received via ${method}, nonce: ${event.data.nonce}`);
handleMessage(event) {
if (event && event.nonce) {
if (this.processedNonces.has(event.nonce)) {
// console.log(`Duplicate message received via ${method}, nonce: ${event.nonce}`);
return;
}
this.processedNonces.add(event.data.nonce);
this.processedNonces.add(event.nonce);

// Cleanup old nonces (keeping last 1000 to prevent memory issues)
if (this.processedNonces.size > 1000) {
const oldestNonce = Math.min(...this.processedNonces);
this.processedNonces.delete(oldestNonce);
}

this.listeners.forEach(listener => listener(event.data.message));
this.listeners.forEach(listener => listener(event.message));
}
}

Expand All @@ -91,13 +91,14 @@ export class RedundantAdaptiveBroadcastChannel {
const postPromises = Array.from(this.channels.entries()).map(([method, channel]) =>
channel.postMessage(wrappedMessage).catch(error => {
console.warn(`Failed to send via ${method}: ${error.message}`);
throw error;
})
);

await Promise.allSettled(postPromises);
const result = await Promise.allSettled(postPromises);

// Check if at least one promise resolved successfully
const anySuccessful = postPromises.some(p => p.status === 'fulfilled');
const anySuccessful = result.some(p => p.status === 'fulfilled');
if (!anySuccessful) {
throw new Error('Failed to send message through any method');
}
Expand Down
239 changes: 238 additions & 1 deletion test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const assert = require('assert');
const isNode = require('detect-node');
const clone = require('clone');
const unload = require('unload');
const { AdaptiveBroadcastChannel, BroadcastChannel, OPEN_BROADCAST_CHANNELS, enforceOptions } = require('../');
const { AdaptiveBroadcastChannel, BroadcastChannel, RedundantAdaptiveBroadcastChannel, OPEN_BROADCAST_CHANNELS, enforceOptions } = require('../');

if (isNode) {
process.on('uncaughtException', (err, origin) => {
Expand All @@ -14,6 +14,9 @@ if (isNode) {
});
}

// eslint-disable-next-line no-undef
const sandbox = sinon.createSandbox();

/**
* we run this test once per method
*/
Expand Down Expand Up @@ -470,6 +473,240 @@ if (!isNode) {

useOptions.forEach((o) => runTest(o));

describe('RedundantAdaptiveBroadcastChannel', () => {
afterEach(function () {
sandbox.restore();
});

describe('.constructor()', () => {
it('log options', () => {
console.log('Started: ' + JSON.stringify({}));
});
it('should create a channel', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
await channel.close();
});
});

describe('.postMessage()', () => {
it('should post a message', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
await channel.postMessage('foobar');
await channel.close();
});
it('should throw if channel is already closed', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
await channel.close();
await AsyncTestUtil.assertThrows(() => channel.postMessage('foobar'), Error, 'closed');
});
});

describe('adaptive post message', () => {
it('should still receive message if 1 channel post fail with error', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

// native channel post message fail
const nativeChannel = channel.channels.get('native');
sandbox.stub(nativeChannel, 'postMessage').rejects(new Error('test'));

const emitted = [];
otherChannel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});
await AsyncTestUtil.waitUntil(() => emitted.length === 1);
assert.equal(emitted[0].foo, 'bar');
await channel.close();
await otherChannel.close();
});

it('should still receive message if multiple channels post fail with error', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

// fail these channels
const failChannels = ['native', 'idb', 'localstorage'];
for (const [type, c] of channel.channels.entries()) {
if (failChannels.includes(type)) {
sandbox.stub(c, 'postMessage').rejects(new Error('test'));
}
}

const emitted = [];
otherChannel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});
await AsyncTestUtil.waitUntil(() => emitted.length === 1);
assert.equal(emitted[0].foo, 'bar');
await channel.close();
await otherChannel.close();
});

it('should still receive message if 1 channel post fail silently', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

// native channel post message fail
const nativeChannel = channel.channels.get('native');
sandbox.stub(nativeChannel, 'postMessage').resolves(null);

const emitted = [];
otherChannel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});
await AsyncTestUtil.waitUntil(() => emitted.length === 1);
assert.equal(emitted[0].foo, 'bar');
await channel.close();
await otherChannel.close();
});

it('should still receive message if multiple channels post fail silently', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

// fail these channels
const failChannels = ['native', 'idb', 'localstorage'];
for (const [type, c] of channel.channels.entries()) {
if (failChannels.includes(type)) {
sandbox.stub(c, 'postMessage').resolves(null);
}
}

const emitted = [];
otherChannel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});
await AsyncTestUtil.waitUntil(() => emitted.length === 1);
assert.equal(emitted[0].foo, 'bar');
await channel.close();
await otherChannel.close();
});
});

describe('.onmessage', () => {
/**
* the window.BroadcastChannel
* does not emit postMessage to own subscribers,
* if you want to do that, you have to create another channel
*/
it('should NOT receive the message on own', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);

const emitted = [];
channel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});

await AsyncTestUtil.wait(100);
assert.equal(emitted.length, 0);

await channel.close();
});
it('should receive the message on other channel', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

const emitted = [];
otherChannel.onmessage = (msg) => emitted.push(msg);
await channel.postMessage({
foo: 'bar',
});
await AsyncTestUtil.waitUntil(() => emitted.length === 1);
assert.equal(emitted[0].foo, 'bar');
await channel.close();
await otherChannel.close();
});
});

describe('.close()', () => {
it('should have resolved all processed message promises when close() resolves', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);

channel.postMessage({});
channel.postMessage({});
channel.postMessage({});

await channel.close();
for (const c in channel.channels.values()) {
assert.strictEqual(c.isClosed, true);
assert.strictEqual(c._uMP.size, 0);
}
});
});

describe('.addEventListener()', () => {
it('should emit events to all subscribers', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

const emitted1 = [];
const emitted2 = [];

otherChannel.addEventListener('message', (msg) => emitted1.push(msg));
otherChannel.addEventListener('message', (msg) => emitted2.push(msg));

const msg = {
foo: 'bar',
};
await channel.postMessage(msg);

await AsyncTestUtil.waitUntil(() => emitted1.length === 1);
await AsyncTestUtil.waitUntil(() => emitted2.length === 1);

assert.deepEqual(msg, emitted1[0]);
assert.deepEqual(msg, emitted2[0]);

await channel.close();
await otherChannel.close();
});
});

describe('.removeEventListener()', () => {
it('should no longer emit the message', async () => {
const channelName = AsyncTestUtil.randomString(12);
const channel = new RedundantAdaptiveBroadcastChannel(channelName);
const otherChannel = new RedundantAdaptiveBroadcastChannel(channelName);

const emitted = [];
const fn = (msg) => emitted.push(msg);
otherChannel.addEventListener('message', fn);

const msg = {
foo: 'bar',
};
await channel.postMessage(msg);

await AsyncTestUtil.waitUntil(() => emitted.length === 1);

otherChannel.removeEventListener('message', fn);

await channel.postMessage(msg);
await AsyncTestUtil.wait(100);

assert.equal(emitted.length, 1);

await channel.close();
await otherChannel.close();
});
});
});

describe('AdaptiveBroadcastChannel', () => {
describe('.constructor()', () => {
it('log options', () => {
Expand Down

0 comments on commit c30a1e4

Please sign in to comment.