Skip to content

Commit

Permalink
fix operator feed regression
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jan 14, 2025
1 parent 0e5da1b commit 651eab9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 40 deletions.
2 changes: 0 additions & 2 deletions apps/gui/src/lib/components/lists/table/DataTable.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@
</script>

<pre>{JSON.stringify(Object.keys($config.tableFormatters).length, null, 2)}</pre>

<!-- **UI Layout with Resizable Panes** -->
<Resizable.PaneGroup direction="horizontal" class="min-h-[100%]">
<!-- **Main Table Pane** -->
Expand Down
27 changes: 11 additions & 16 deletions apps/gui/src/lib/services/UserService/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class UserService extends Service {
super(adapters)
this.addRelay('userMeta', 'wss://purplepag.es')
this.addRelay('userMeta', 'wss://user.kindpag.es')
this._ready = true;
}

get subIds(): string[] {
Expand Down Expand Up @@ -73,26 +74,20 @@ export class UserService extends Service {
return notes;
}

async unsubscribe(hash?: string): Promise<void> {
const $nip66: Nip66 = get(nip66)
await $nip66.adapters?.websocket?.unsubscribe(hash)
}
// async unsubscribe(hash?: string): Promise<void> {
// const $nip66: Nip66 = get(nip66)
// await $nip66.adapters?.websocket?.unsubscribe(hash)
// }

async unsubscribeAll(): Promise<void> {
const $nip66: Nip66 = get(nip66)
const promises: Promise<boolean>[] = []
for(const id of this._subIds) {
promises.push($nip66.adapters?.websocket?.unsubscribe(id))
}
await Promise.all(promises)
this._subIds = []
}
// async unsubscribeAll(): Promise<void> {
// const $nip66: Nip66 = get(nip66)
// const promises: Promise<boolean>[] = []
// this.unsubscribeMany(this._subIds)
// }

async feed(user: User, limit: number = 1, until?: number): Promise<UserFeedItem[]> {
const { relays } = user
const notes = await this.userNotes(user, limit, until)
const relatives: IEvent[][] = [[]]

return notes
.map((_note: IEvent, index: number) => {
const note = new NostrEvent(_note, { relays: relays ?? [] })
Expand All @@ -102,7 +97,7 @@ export class UserService extends Service {
const reactions = relatives.filter(rel => rel.kind === 7);
const zaps = relatives.filter(rel => rel.kind === 9735 || rel.kind === 9321);
const comments = relatives.filter(rel => rel.kind === 1 || rel.kind === 1111);
return { reactions, zaps, comments};
return { reactions, zaps, comments };
}
return { user, note, fetchRelatives };
})
Expand Down
22 changes: 5 additions & 17 deletions libraries/nip66/src/core/WebsocketAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,11 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
}

async shutdown(): Promise<void> {
console.log('shutting down websocket adapter')
console.log('unsubscribing all...')
await this.unsubscribeAll();
console.log('aborting...')
await this.abort();
console.log('terminating worker...')
await delay(1000);
this.terminate()
await delay(100);
}

async abort(): Promise<boolean> {
Expand Down Expand Up @@ -195,13 +192,15 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
}

async subscribe(args: WebsocketRequestBody = defaultWebsocketRequestBody, callbacks?: SubscribeHandlers): Promise<IEvent[] | boolean>{
let { hash } = args
if(callbacks && Object.keys(callbacks).length > 0) {
args.options.stream = true
}
const hash = this.request({
const hash_ = this.request({
action: 'subscribe',
args
})
if(!hash) hash = hash_
if(this.subscriptions.has(hash)) {
console.warn(`[WebsocketAdapter] Already subscribed to ${hash}`)
return true;
Expand Down Expand Up @@ -238,15 +237,10 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
message.args.hash = deterministicHash(message?.args?.filters ?? {})
}
const { hash } = message.args
//console.log('HASH', hash)
if(!this?.worker) {
console.warn('[WebsocketAdapter] Error sending command: no worker found')
return hash
}
//console.log('adding subscription', hash)


//console.log(`[WebsocketAdapter:${this.constructor.name}] o/o SEND: ${message.action} -> websocketWorker`, message.args.filters)
}
if(this.worker instanceof Worker)
this.worker.postMessage(message)
else if(this.worker instanceof SharedWorker)
Expand All @@ -255,11 +249,9 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
}

async response(hash: string, callbacks?: SubscribeHandlers): Promise<boolean | any[]>{
//console.log('response', hash)
return new Promise( resolve => {
const results: any[] = []
const responseHandler = (message: WebsocketResponseBody) => {
//console.log('websocket adapter response handler...')
let { result, type } = message
if(type === 'unsubscribed'){
return true
Expand All @@ -268,9 +260,7 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
return true
}
if(type === 'events') {
//console.log(`websocket adapter: events`)
if(callbacks?.onevents){
//console.log(`websocket adapter: onevents(${result.length})`)
callbacks.onevents(result)
return
}
Expand All @@ -292,9 +282,7 @@ export class WebsocketAdapter extends Adapter implements IWebsocketAdapter {
}
}
else if(type == 'complete'){
//console.log('deleting subscription:', hash)
this.subscriptions.delete(hash)
//console.log('complete: removing handler.', hash)
StateManager.off(hash)
if(callbacks?.onevent){
resolve(true)
Expand Down
20 changes: 16 additions & 4 deletions libraries/nip66/src/services/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,22 @@ export class Service {
}

async subscribe(args: FetchOptions, callbacks?: SubscribeHandlers): Promise<IEvent[]> {
console.log(`Service.subscribe()`)
await this.ready();
console.log(`Service: is ready`)
let { filters, relays, options, hash } = args;
if(filters) {
this.fetchFromCache(filters, callbacks);
}
if(!hash) {
hash = deterministicHash(args)
console.log('Service.subscribe:hash', hash)
}
const message: WebsocketRequestBody = { filters, relays, options, hash };
console.log('Service add hash', hash)
this.subscriptions.add(hash)
if(filters) {
this.fetchFromCache(filters, callbacks);
}
const message: WebsocketRequestBody = { filters, relays, options, hash };
const result = await this.websocketAdapter.subscribe(message, callbacks);
console.log('Service delete hash', hash)
this.subscriptions.delete(hash)
return typeof result === 'boolean'? []: result;
}
Expand All @@ -98,6 +103,13 @@ export class Service {
this.websocketAdapter.unsubscribe(hash);
}

async unsubscribeMany(hashes: string[]) {
await this.ready();
hashes.forEach((hash) => {
this.unsubscribe(hash);
})
}

async unsubscribeAllActive(){
await this.ready();
this.subscriptions.forEach((hash) => {
Expand Down
1 change: 0 additions & 1 deletion libraries/nocap/src/classes/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ export default class Base {
}
this.precheck(key)
.then(async () => {
console.log('seriously wtf fuck this.')
this.logger.debug(`${key}: precheck resolved`);
this.latency.start(key);
this.logger.debug(`${key}: this.adapters[${adapter}][${this.checkKey(key)}]()`);
Expand Down

0 comments on commit 651eab9

Please sign in to comment.