From 7a11da87d68cc3372fd019a473913a7ec0eee9b5 Mon Sep 17 00:00:00 2001
From: Martin Fenner
Date: Sun, 23 Jun 2019 21:22:38 +0200
Subject: [PATCH] more background jobs for indexing crossref dois. datacite/levriero#56

---
Review notes (added during review, not part of the original commit):
 * CrossrefDoiJob#perform now receives an array of ids and enqueues one
   CrossrefDoiByIdJob per id. The per-DOI logic (DOI extraction, lookup via
   Doi.find_by_id, PUT to http://localhost/dois/<doi>) moves verbatim into the
   new CrossrefDoiByIdJob class.
 * Event.update_crossref and Event.update_datacite_crossref read
   options[:cursor] (default 0) instead of always starting at 0, and enqueue
   one CrossrefDoiJob per result page with the de-duplicated subj_id / obj_id
   values of that page.
 * The rake tasks replace FROM_ID / UNTIL_ID with a single CURSOR variable.
 * NOTE(review): the rake tasks default CURSOR to Event.minimum(:id), while the
   model default is 0 and the log line reports "_id cursor + 1" as the first
   updated event. If the cursor is exclusive, the event with the minimum id
   would be skipped -- confirm against Event.query.
 * NOTE(review): in doi_from_url the dots in "doi.org" and
   "handle.test.datacite.org" are unescaped in the regular expression, so they
   match any character -- presumably unintended; verify before reuse.

 app/jobs/crossref_doi_by_id_job.rb | 62 ++++++++++++++++++++++++++++++
 app/jobs/crossref_doi_job.rb       | 61 ++---------------------------
 app/jobs/event_import_by_id_job.rb |  2 +-
 app/models/event.rb                | 24 +++++-------
 lib/tasks/event.rake               | 14 +++----
 5 files changed, 82 insertions(+), 81 deletions(-)
 create mode 100644 app/jobs/crossref_doi_by_id_job.rb

diff --git a/app/jobs/crossref_doi_by_id_job.rb b/app/jobs/crossref_doi_by_id_job.rb
new file mode 100644
index 000000000..122e94c23
--- /dev/null
+++ b/app/jobs/crossref_doi_by_id_job.rb
@@ -0,0 +1,62 @@
+class CrossrefDoiByIdJob < ActiveJob::Base
+  queue_as :lupo_background
+
+  # retry_on ActiveRecord::Deadlocked, wait: 10.seconds, attempts: 3
+  # retry_on Faraday::TimeoutError, wait: 10.minutes, attempts: 3
+
+  # discard_on ActiveJob::DeserializationError
+
+  def perform(id)
+    logger = Logger.new(STDOUT)
+
+    doi = doi_from_url(id)
+    return {} unless doi.present?
+
+    # check whether DOI has been registered with DataCite already
+    result = Doi.find_by_id(doi).results.first
+    return {} unless result.blank?
+
+    # otherwise store Crossref metadata with DataCite
+    # using client crossref.citations and DataCite XML
+    xml = Base64.strict_encode64(id)
+    attributes = {
+      "xml" => xml,
+      "source" => "levriero",
+      "event" => "publish" }.compact
+
+    data = {
+      "data" => {
+        "type" => "dois",
+        "attributes" => attributes,
+        "relationships" => {
+          "client" => {
+            "data" => {
+              "type" => "clients",
+              "id" => "crossref.citations"
+            }
+          }
+        }
+      }
+    }
+
+    url = "http://localhost/dois/#{doi}"
+    response = Maremma.put(url, accept: 'application/vnd.api+json',
+                                content_type: 'application/vnd.api+json',
+                                data: data.to_json,
+                                username: ENV["ADMIN_USERNAME"],
+                                password: ENV["ADMIN_PASSWORD"])
+
+    if [200, 201].include?(response.status)
+      logger.info "DOI #{doi} created."
+    else
+      logger.warn response.body["errors"]
+    end
+  end
+
+  def doi_from_url(url)
+    if /\A(?:(http|https):\/\/(dx\.)?(doi.org|handle.test.datacite.org)\/)?(doi:)?(10\.\d{4,5}\/.+)\z/.match(url)
+      uri = Addressable::URI.parse(url)
+      uri.path.gsub(/^\//, '').downcase
+    end
+  end
+end
\ No newline at end of file
diff --git a/app/jobs/crossref_doi_job.rb b/app/jobs/crossref_doi_job.rb
index 35acaf960..caf5f6bac 100644
--- a/app/jobs/crossref_doi_job.rb
+++ b/app/jobs/crossref_doi_job.rb
@@ -1,62 +1,7 @@
 class CrossrefDoiJob < ActiveJob::Base
   queue_as :lupo_background
 
-  # retry_on ActiveRecord::Deadlocked, wait: 10.seconds, attempts: 3
-  # retry_on Faraday::TimeoutError, wait: 10.minutes, attempts: 3
-
-  # discard_on ActiveJob::DeserializationError
-
-  def perform(id)
-    logger = Logger.new(STDOUT)
-
-    doi = doi_from_url(id)
-    return {} unless doi.present?
-
-    # check whether DOI has been registered with DataCite already
-    result = Doi.find_by_id(doi).results.first
-    return {} unless result.blank?
-
-    # otherwise store Crossref metadata with DataCite
-    # using client crossref.citations and DataCite XML
-    xml = Base64.strict_encode64(id)
-    attributes = {
-      "xml" => xml,
-      "source" => "levriero",
-      "event" => "publish" }.compact
-
-    data = {
-      "data" => {
-        "type" => "dois",
-        "attributes" => attributes,
-        "relationships" => {
-          "client" => {
-            "data" => {
-              "type" => "clients",
-              "id" => "crossref.citations"
-            }
-          }
-        }
-      }
-    }
-
-    url = "http://localhost/dois/#{doi}"
-    response = Maremma.put(url, accept: 'application/vnd.api+json',
-                                content_type: 'application/vnd.api+json',
-                                data: data.to_json,
-                                username: ENV["ADMIN_USERNAME"],
-                                password: ENV["ADMIN_PASSWORD"])
-
-    if [200, 201].include?(response.status)
-      logger.info "DOI #{doi} created."
-    else
-      logger.warn response.body["errors"]
-    end
-  end
-
-  def doi_from_url(url)
-    if /\A(?:(http|https):\/\/(dx\.)?(doi.org|handle.test.datacite.org)\/)?(doi:)?(10\.\d{4,5}\/.+)\z/.match(url)
-      uri = Addressable::URI.parse(url)
-      uri.path.gsub(/^\//, '').downcase
-    end
+  def perform(ids)
+    ids.each { |id| CrossrefDoiByIdJob.perform_later(id) }
   end
-end
\ No newline at end of file
+end
diff --git a/app/jobs/event_import_by_id_job.rb b/app/jobs/event_import_by_id_job.rb
index 4f7fc8334..3c17264d9 100644
--- a/app/jobs/event_import_by_id_job.rb
+++ b/app/jobs/event_import_by_id_job.rb
@@ -4,4 +4,4 @@ class EventImportByIdJob < ActiveJob::Base
   def perform(options={})
     Event.import_by_id(options)
   end
-end
\ No newline at end of file
+end
diff --git a/app/models/event.rb b/app/models/event.rb
index c4b8384c7..bf1933119 100644
--- a/app/models/event.rb
+++ b/app/models/event.rb
@@ -283,14 +283,13 @@ def self.update_crossref(options={})
     logger = Logger.new(STDOUT)
 
     size = (options[:size] || 1000).to_i
+    cursor = (options[:cursor] || 0).to_i
 
-    response = Event.query(nil, source_id: "crossref", page: { size: 1, cursor: 0 })
+    response = Event.query(nil, source_id: "crossref", page: { size: 1, cursor: cursor })
     logger.info "[Update] #{response.results.total} events for source crossref."
 
+    # walk through results using cursor
     if response.results.total > 0
-      # walk through results using cursor
-      cursor = 0
-
       while response.results.results.length > 0 do
         response = Event.query(nil, source_id: "crossref", page: { size: size, cursor: cursor })
         break unless response.results.results.length > 0
@@ -298,9 +297,8 @@ def self.update_crossref(options={})
         logger.info "[Update] Updating #{response.results.results.length} crossref events starting with _id #{cursor + 1}."
         cursor = response.results.to_a.last[:sort].first.to_i
 
-        response.results.results.each do |e|
-          CrossrefDoiJob.perform_later(e.subj_id)
-        end
+        dois = response.results.results.map(&:subj_id).uniq
+        CrossrefDoiJob.perform_later(dois)
       end
     end
 
@@ -311,14 +309,13 @@ def self.update_datacite_crossref(options={})
     logger = Logger.new(STDOUT)
 
     size = (options[:size] || 1000).to_i
+    cursor = (options[:cursor] || 0).to_i
 
-    response = Event.query(nil, source_id: "datacite-crossref", page: { size: 1, cursor: 0 })
+    response = Event.query(nil, source_id: "datacite-crossref", page: { size: 1, cursor: cursor })
     logger.info "[Update] #{response.results.total} events for source datacite-crossref."
 
+    # walk through results using cursor
     if response.results.total > 0
-      # walk through results using cursor
-      cursor = 0
-
       while response.results.results.length > 0 do
         response = Event.query(nil, source_id: "datacite-crossref", page: { size: size, cursor: cursor })
         break unless response.results.results.length > 0
@@ -326,9 +323,8 @@ def self.update_datacite_crossref(options={})
         logger.info "[Update] Updating #{response.results.results.length} datacite-crossref events starting with _id #{cursor + 1}."
         cursor = response.results.to_a.last[:sort].first.to_i
 
-        response.results.results.each do |e|
-          CrossrefDoiJob.perform_later(e.obj_id)
-        end
+        dois = response.results.results.map(&:obj_id).uniq
+        CrossrefDoiJob.perform_later(dois)
       end
     end
 
diff --git a/lib/tasks/event.rake b/lib/tasks/event.rake
index 049eb902d..b0ca30fd7 100644
--- a/lib/tasks/event.rake
+++ b/lib/tasks/event.rake
@@ -24,21 +24,19 @@ namespace :event do
 end
 
 namespace :crossref do
-  desc 'Import dois for all events'
+  desc 'Import crossref dois for all events'
   task :import_doi => :environment do
-    from_id = (ENV['FROM_ID'] || 1).to_i
-    until_id = (ENV['UNTIL_ID'] || Event.maximum(:id)).to_i
+    cursor = (ENV['CURSOR'] || Event.minimum(:id)).to_i
 
-    Event.update_crossref(from_id: from_id, until_id: until_id)
+    Event.update_crossref(cursor: cursor)
   end
 end
 
 namespace :datacite_crossref do
-  desc 'Import dois for all events'
+  desc 'Import crossref dois for all events'
   task :import_doi => :environment do
-    from_id = (ENV['FROM_ID'] || 1).to_i
-    until_id = (ENV['UNTIL_ID'] || Event.maximum(:id)).to_i
+    cursor = (ENV['CURSOR'] || Event.minimum(:id)).to_i
 
-    Event.update_datacite_crossref(from_id: from_id, until_id: until_id)
+    Event.update_datacite_crossref(cursor: cursor)
   end
 end