Skip to content

Commit

Permalink
Update internal dependencies with actual versions
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Sep 11, 2024
1 parent 9085b96 commit 75a585e
Show file tree
Hide file tree
Showing 31 changed files with 813 additions and 1,195 deletions.
15 changes: 15 additions & 0 deletions .changeset/lazy-emus-warn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
'@nostrwatch/nostrings': minor
'@nostrwatch/trawler': minor
'@nostrwatch/nocapd': minor
'@nostrwatch/nocap-every-adapter-default': patch
'@nostrwatch/kit-adapter-idb': patch
'@nostrwatch/demo-kit-with-idb': patch
'@nostrwatch/sanitize': patch
'@nostrwatch/nip11': patch
'@nostrwatch/nocap': patch
'@nostrwatch/seed': patch
'@nostrwatch/idb': patch
---

nostrings + nocapd improvements
45 changes: 0 additions & 45 deletions apps/nocapd/src/classes/Persist.js

This file was deleted.

56 changes: 56 additions & 0 deletions apps/nocapd/src/classes/ShortBus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import Logger from '@nostrwatch/logger'
import { PersistQueue, BullMQ } from '@nostrwatch/controlflow'
import { delay, RedisConnectionDetails } from "@nostrwatch/utils"

const JOB_OPTS = {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: 'exponential',
delay: 60*1000,
}
}

export class ShortBus {

$ = null
key
worker = () => {}
workerFns = new Map()

constructor(slug){
this.key = `persist6/${slug}`
this.$ = PersistQueue(this.key)
this.worker = new BullMQ.Worker(this.key, this.work.bind(this), { concurrency: 1, connection: RedisConnectionDetails() } )
this.$.$Queue.resume()
this.log = new Logger('@nostrwatch/shortbus')
}

async addJob(payload){
return this.$.$Queue.add( 'persist', payload, JOB_OPTS )
}

setWorker(key, fn){
this.log.debug(`ShortBus::setWorker`, key)
if(fn instanceof Function)
this.workerFns.set(key, fn)
this.log.debug(`ShortBus: ${this.workerFns.size} workers registered`)
}

async work(job) {
await delay(1)
const { type } = job.data
for(let [key, fn] of this.workerFns) {
if( type !== key ) continue;
this.log.debug(`ShortBus: BEGIN ${key}`)
try {
await fn(job)?.catch(e => this.log.error(`Worker error ${e.message}`))
}
catch(e){
this.log.error(`Worker error ${e.message}`)
}
}
}

}
19 changes: 9 additions & 10 deletions apps/nocapd/src/classes/Worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@ import Publish from '@nostrwatch/publisher'
import { Nocap } from "@nostrwatch/nocap"
import nocapAdapters from "@nostrwatch/nocap-every-adapter-default"

import { Persist } from './Persist.js'

import util from 'util'

let errors = 0

export class NWWorker {
key = 'relay-check'
$
rcache
pubkey
persist
bus

constructor(pubkey, $q, rcache, config){
constructor(pubkey, $q, rcache, bus, config){
this.pubkey = pubkey
this.$ = $q
this.rcache = rcache
this.config = config
this.persist = new Persist(config?.monitor?.slug)
this.persist.setWorker(this.persist_result.bind(this))
this.setup()
this.log.info(`${this.id()} initialized`)
this.bus = bus
}

setup(){
Expand Down Expand Up @@ -151,15 +149,16 @@ export class NWWorker {
this.log.debug(`after_completed(): ${result.url}`)
const concurrency = this.config?.nocapd?.bullmq?.worker?.concurrency
if(!concurrency || concurrency <= 1) {
await this.persist_result({ data: result })
await this.bus_result({ data: { result, type: this.key }})
}
else {
await this.persist.addJob(result)
await this.bus.addJob({ result, type: this.key })
}
}

async persist_result(job){
const {data:result} = job
const {result} = job.data
this.log.debug(`inside persist_result()`)
let err = false
const fail = result?.open?.data? false: true
const cacheError = (from, e) => err = { from, e}
Expand Down Expand Up @@ -464,7 +463,7 @@ export class NWWorker {
}

if(errors > 0)
console.log('DATA INTEGRITY ERRORS', errors)
this.log.debug(`DATA INTEGRITY ERRORS #: ${errors}`)

expiredRelays = expiredRelays.sort((a, b) => a.retries - b.retries).map(r => r.url);

Expand Down
61 changes: 40 additions & 21 deletions apps/nocapd/src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { bootstrap } from '@nostrwatch/seed'
import { parseRelayNetwork, delay, loadConfig, RedisConnectionDetails } from "@nostrwatch/utils"

import { NWWorker } from './classes/Worker.js'
import { ShortBus } from './classes/ShortBus.js'
import { NocapdQueues } from './classes/NocapdQueues.js'

import util from 'util'
Expand All @@ -26,6 +27,7 @@ const log = new Logger('@nostrwatch/nocapd')
let rcache,
config,
$q,
bus,
jobs

const populateJobQueue = async () => {
Expand All @@ -48,17 +50,22 @@ const setSchedules = async () => {
return { relayPopulator, jobPopulator }
}

const initWorker = async () => {
const initBus = () => {
bus = new ShortBus(config?.monitor?.slug)
}

const initQueue = async () => {

const connection = RedisConnectionDetails()
log.info(`initWorker(): connecting to redis at`, connection)
log.info(`initQueue(): connecting to redis at`, connection)
const concurrency = config?.nocapd?.bullmq?.worker?.concurrency? config.nocapd.bullmq.worker.concurrency: 1
const ncdq = NocapdQueue(`nocapd/${config?.monitor?.slug}` || null)

$q = new NocapdQueues({ pubkey: PUBKEY, logger: new Logger('@nostrwatch/nocapd:queue-control'), redis: connection })
await $q
.set( 'queue' , ncdq.$Queue )
.set( 'events' , ncdq.$QueueEvents )
.set( 'checker', new NWWorker(PUBKEY, $q, rcache, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) )
.set( 'checker', new NWWorker(PUBKEY, $q, rcache, bus, {...config, logger: new Logger('@nostrwatch/nocapd:worker'), pubkey: PUBKEY }) )
.set( 'worker' , new BullMQ.Worker($q.queue.name, $q.route_work.bind($q), { concurrency, connection, ...queueOpts() } ) )
await $q.checker.drainSmart()

Expand All @@ -73,7 +80,7 @@ const initWorker = async () => {

$q.resume()
log.info(`initialized: ${$q.queue.name}`)
return $q

}

const stop = async(signal) => {
Expand Down Expand Up @@ -162,11 +169,20 @@ const scheduleJobPopulator = () =>{
return scheduleSeconds(name, seconds, job)
}

const relayPopulatorOnTheShortBus = () => {
bus.setWorker('relay-import', persistRelays)
}

const relayCheckerOnTheShortBus = () => {
bus.setWorker($q.checker.key, $q.checker.persist_result.bind($q.checker))
}

const scheduleRelayPopulator = () =>{
const name = "scheduleRelayPopulator()"
const seedOpts = config?.nocapd?.seed
if(!seedOpts || !config?.nocapd?.seed?.sources?.length) return
const seconds = timestring(seedOpts.interval, "s")

const job = async () => {
log.debug(`Scheduled: populateRelays()`)
await populateRelays()
Expand All @@ -187,8 +203,8 @@ const pause = async (caller = "unknown") => {
log.info(`${caller} pausing: all queues/workers`)
await $q.queue.pause()
await $q.worker.pause()
await $q.checker.persist.$.$Queue.pause()
await $q.checker.persist.worker.pause()
await bus.$.$Queue.pause()
await bus.worker.pause()
await delay(1000)
log.info(`${caller} paused: all queues/workers`)
}
Expand All @@ -197,31 +213,33 @@ const resume = async (caller = "unknown") => {
log.info(`${caller} resuming: all queues/workers`)
await $q.queue.resume()
await $q.worker.resume()
await $q.checker.persist.$.$Queue.resume()
await $q.checker.persist.worker.resume()
await bus.$.$Queue.resume()
await bus.worker.resume()
await delay(1000)
log.info(`${caller} resumed: all queues/workers`)
}

const populateRelays = async () => {
if($q?.queue) await pause('populateRelays()')

log.debug(`populateRelays(): begin`)
const syncData = await bootstrap('nocapd')
await delay(1)

log.debug(`populateRelays(): found ${syncData[0].length} *maybe new* relays`)
const relays = syncData[0].map(r => { return { url: normalizeUrl(r), online: null, network: parseRelayNetwork(r) } })

log.debug(`populateRelays(): Persisting ${relays.length} relays`, relays)
const persisted = await rcache.relay.batch.insertIfNotExists(relays).catch(console.error)

if($q?.queue) await resume('populateRelays()')
bus.addJob({ relays, type: 'relay-import' })
}

if(persisted.length === 0) return 0
const persistRelays = async (job) => {
log.debug('persistRelays(): begin')

log.info(chalk.yellow.bold(`Persisted ${persisted.length} new relays`))
return persisted
const { relays } = job.data
log.debug(`populateRelays(): Persisting ${relays.length} relays`, relays)
const persisted = await rcache.relay.batch.insertIfNotExists(relays).catch(console.error)

if(persisted.length === 0) return 0

log.info(chalk.yellow.bold(`Persisted ${persisted.length} new relays`))
}

const queueOpts = () => {
Expand Down Expand Up @@ -282,11 +300,12 @@ export const Nocapd = async () => {
await maybeAnnounce()
if(await maybeBootstrap())
log.info('Bootstrapped')
// else
// await populateRelays()

$q = await initWorker()
// $q.worker.on('drained', populateJobQueue)
initBus()
await initQueue()

relayPopulatorOnTheShortBus()
relayCheckerOnTheShortBus()

globalHandlers()
return {
Expand Down
Loading

0 comments on commit 75a585e

Please sign in to comment.