Skip to content

Commit

Permalink
fix: retry all streaming errors (#687)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwasniew authored Dec 19, 2024
1 parent 098551c commit 60807bf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
38 changes: 26 additions & 12 deletions src/repository/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ export default class Repository extends EventEmitter implements EventEmitter {

private eventSource: EventSource | undefined;

private initialEventSourceConnected: boolean = false;

constructor({
url,
appName,
Expand Down Expand Up @@ -130,26 +132,37 @@ export default class Repository extends EventEmitter implements EventEmitter {
this.segments = new Map();
this.eventSource = eventSource;
if (this.eventSource) {
this.eventSource.addEventListener('unleash-updated', (event: { data: string }) => {
try {
const data: ClientFeaturesResponse & { meta: { etag: string } } = JSON.parse(event.data);
const etag = data.meta.etag;
if (etag !== null) {
this.etag = etag;
} else {
this.etag = undefined;
}
this.save(data, true);
} catch (err) {
this.emit(UnleashEvents.Error, err);
// On re-connect it guarantees catching up with the latest state.
this.eventSource.addEventListener('unleash-connected', (event: { data: string }) => {
// reconnect
if (this.initialEventSourceConnected) {
this.handleFlagsFromStream(event);
} else {
this.initialEventSourceConnected = true;
}
});
this.eventSource.addEventListener('unleash-updated', this.handleFlagsFromStream.bind(this));
this.eventSource.addEventListener('error', (error: unknown) => {
this.emit(UnleashEvents.Warn, error);
});
}
}

private handleFlagsFromStream(event: { data: string }) {
try {
const data: ClientFeaturesResponse & { meta: { etag: string } } = JSON.parse(event.data);
const etag = data.meta.etag;
if (etag !== null) {
this.etag = etag;
} else {
this.etag = undefined;
}
this.save(data, true);
} catch (err) {
this.emit(UnleashEvents.Error, err);
}
}

timedFetch(interval: number) {
if (interval > 0 && !this.eventSource) {
this.timer = setTimeout(() => this.fetch(), interval);
Expand Down Expand Up @@ -180,6 +193,7 @@ export default class Repository extends EventEmitter implements EventEmitter {
}

async start(): Promise<void> {
// the first fetch is used as a fallback even when streaming is enabled
await Promise.all([this.fetch(), this.loadBackup(), this.loadBootstrap()]);
}

Expand Down
27 changes: 22 additions & 5 deletions src/test/repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1363,8 +1363,8 @@ test('Stopping repository should stop storage provider updates', async (t) => {
});

test('Streaming', async (t) => {
t.plan(6);
const url = 'irrelevant';
t.plan(7);
const url = 'http://unleash-test-streaming.app';
const feature = {
name: 'feature',
enabled: true,
Expand All @@ -1374,6 +1374,7 @@ test('Streaming', async (t) => {
},
],
};
setup(url, [{ ...feature, name: 'initialFetch' }]);
const storageProvider: StorageProvider<ClientFeaturesResponse> = new InMemStorageProvider();
const eventSource = {
eventEmitter: new EventEmitter(),
Expand Down Expand Up @@ -1402,16 +1403,24 @@ test('Streaming', async (t) => {
eventSource,
});

await repo.start();

// first connection is ignored, since we do regular fetch
eventSource.emit('unleash-connected', {
type: 'unleash-connected',
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'intialConnectedIgnored' }] }),
});

const before = repo.getToggles();
t.deepEqual(before, []);
t.deepEqual(before, [{ ...feature, name: 'initialFetch' }]);

// update with feature
eventSource.emit('unleash-updated', {
type: 'unleash-updated',
data: JSON.stringify({ meta: {}, features: [feature] }),
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'firstUpdate' }] }),
});
const firstUpdate = repo.getToggles();
t.deepEqual(firstUpdate, [feature]);
t.deepEqual(firstUpdate, [{ ...feature, name: 'firstUpdate' }]);
// @ts-expect-error
t.is(repo.etag, undefined);

Expand All @@ -1430,4 +1439,12 @@ test('Streaming', async (t) => {
t.is(msg, 'some error');
});
eventSource.emit('error', 'some error');

// re-connect simulation
eventSource.emit('unleash-connected', {
type: 'unleash-connected',
data: JSON.stringify({ meta: {}, features: [{ ...feature, name: 'reconnectUpdate' }] }),
});
const reconnectUpdate = repo.getToggles();
t.deepEqual(reconnectUpdate, [{ ...feature, name: 'reconnectUpdate' }]);
});
4 changes: 4 additions & 0 deletions src/unleash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ export class Unleash extends EventEmitter {
maxBackoffMillis: 30000,
retryResetIntervalMillis: 60000,
jitterRatio: 0.5,
errorFilter: function () {
// retry all errors
return true;
},
})
: undefined,
storageProvider: storageProvider || new FileStorageProvider(backupPath),
Expand Down

0 comments on commit 60807bf

Please sign in to comment.