diff --git a/app/jobs/delete_gbif_events_job.rb b/app/jobs/delete_gbif_events_job.rb index 2a75fd443..25c5bad33 100644 --- a/app/jobs/delete_gbif_events_job.rb +++ b/app/jobs/delete_gbif_events_job.rb @@ -3,11 +3,24 @@ class DeleteGbifEventsJob < ApplicationJob queue_as :lupo_background - def perform(id, options = {}) - event = Event.find_by(uuid: id) + def perform(ids, options = {}) + label = options[:label] + index = ENV["INDEX"] - event.destroy! if event.present? + if index.blank? + Rails.logger.error("#{label}: ENV['INDEX'] must be provided") + return + end + + # delete event records from mysql + result = Events.where(id: ids).delete_all + Rails.logger.info("#{label}: #{result} event records deleted") + + # delete event documents from elasticsearch + bulk_payload = ids.map { |id| { delete: { _index: index, _id: id } } } + response = Event.__elasticsearch__.client.bulk(body: bulk_payload) + Rails.logger.info("#{label}: #{response}") rescue => err - Rails.logger.info("#{options[:label]}: event delete error: #{err.message}") + Rails.logger.error("#{label}: #{are.message}") end end diff --git a/app/models/event.rb b/app/models/event.rb index b8fe8745c..4fec93971 100644 --- a/app/models/event.rb +++ b/app/models/event.rb @@ -880,6 +880,39 @@ def self.loop_through_events(options) Rails.logger.info("#{label}: task completed") end + def self.loop_through_gbif_events(options) + size = (options[:size] || 1_000).to_i + cursor = options[:cursor] || [] + filter = options[:filter] || {} + label = options[:label] || "" + job_name = options[:job_name] || "" + query = options[:query].presence + + response = Event.query(query, filter.merge(page: { size: 1, cursor: [] })) + + if response.size.positive? + while response.size.positive? + response = Event.query(query, filter.merge(page: { size: size, cursor: cursor })) + + break unless response.size.positive? + + Rails.logger.info("#{label}: #{response.size} events starting with _id #{response.results.to_a.first[:_id]}") + + cursor = response.results.to_a.last[:sort] + + Rails.logger.info "#{label}: cursor: #{cursor}" + + ids = response.results.map(&:_id).uniq + + DeleteGbifEventsJob.perform_later(ids, options) + + # Object.const_get(job_name).perform_later(ids, options) + end + end + + Rails.logger.info("#{label}: task completed") + end + def metric_type if /(requests|investigations)/.match?(relation_type_id.to_s) arr = relation_type_id.split("-", 4) diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake index 2e6fd5323..d20925ea1 100644 --- a/lib/tasks/event.rake +++ b/lib/tasks/event.rake @@ -88,16 +88,17 @@ namespace :gbif_events do desc "delete gbif events" task delete_gbif_events: :environment do options = { - size: 100, + size: 5, from_id: (ENV["FROM_ID"] || Event.minimum(:id)).to_i, until_id: (ENV["UNTIL_ID"] || Event.maximum(:id)).to_i, filter: {}, - query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", + # query: "+subj.registrantId:datacite.gbif.gbif +relation_type_id:references -source_doi:(\"10.15468/QJGWBA\" OR \"10.35035/GDWQ-3V93\" OR \"10.15469/3XSWXB\" OR \"10.15469/UBP6QO\" OR \"10.35000/TEDB-QD70\" OR \"10.15469/2YMQOZ\")", + query: "+relation_type_id:is-authored-by", label: "gbif_event_cleanup_#{Time.now.utc.strftime("%d%m%Y%H%M%S")}", job_name: "DeleteGbifEventsJob" } - Event.loop_through_events(options) + Event.loop_through_gbif_events(options) end desc "delete orphaned gbif_events"