Skip to content

Commit

Permalink
more background jobs for indexing crossref dois. datacite/levriero#56
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Fenner committed Jun 23, 2019
1 parent 12fdb12 commit 7a11da8
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 81 deletions.
62 changes: 62 additions & 0 deletions app/jobs/crossref_doi_by_id_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Background job that registers a single Crossref DOI with DataCite
# (via the local DOI API) unless a record for that DOI already exists.
class CrossrefDoiByIdJob < ActiveJob::Base
  queue_as :lupo_background

  # retry_on ActiveRecord::Deadlocked, wait: 10.seconds, attempts: 3
  # retry_on Faraday::TimeoutError, wait: 10.minutes, attempts: 3

  # discard_on ActiveJob::DeserializationError

  # Client the new DOI record is attached to.
  CLIENT_ID = "crossref.citations".freeze

  # Matches a bare DOI, a "doi:"-prefixed DOI, or a DOI URL on one of the
  # known resolver hosts. Dots in the host names are escaped so that only
  # the literal hosts match (an unescaped "." would match any character).
  DOI_URL_PATTERN = %r{\A(?:(http|https)://(dx\.)?(doi\.org|handle\.test\.datacite\.org)/)?(doi:)?(10\.\d{4,5}/.+)\z}.freeze

  # Creates the DOI record for the given identifier if it does not exist yet.
  #
  # @param id [String] DOI, "doi:"-prefixed DOI, or DOI URL
  # @return [Hash, Object] an empty hash when the id is not a DOI or the DOI
  #   already exists; otherwise the return value of the logger call
  def perform(id)
    logger = Logger.new(STDOUT)

    doi = doi_from_url(id)
    return {} unless doi.present?

    # check whether DOI has been registered with DataCite already
    result = Doi.find_by_id(doi).results.first
    return {} if result.present?

    # otherwise store Crossref metadata with DataCite
    # using client crossref.citations and DataCite XML
    # NOTE(review): the "xml" attribute carries the Base64-encoded id itself,
    # not an XML document; presumably the API resolves the metadata from it
    # (confirm against the receiving API).
    attributes = {
      "xml" => Base64.strict_encode64(id),
      "source" => "levriero",
      "event" => "publish"
    }

    data = {
      "data" => {
        "type" => "dois",
        "attributes" => attributes,
        "relationships" => {
          "client" => {
            "data" => {
              "type" => "clients",
              "id" => CLIENT_ID
            }
          }
        }
      }
    }

    url = "http://localhost/dois/#{doi}"
    response = Maremma.put(url, accept: 'application/vnd.api+json',
                                content_type: 'application/vnd.api+json',
                                data: data.to_json,
                                username: ENV["ADMIN_USERNAME"],
                                password: ENV["ADMIN_PASSWORD"])

    if [200, 201].include?(response.status)
      logger.info "DOI #{doi} created."
    else
      logger.warn response.body["errors"]
    end
  end

  # Extracts the lowercased DOI from a DOI, "doi:"-prefixed DOI, or DOI URL.
  #
  # @param url [String, nil] the identifier to inspect
  # @return [String, nil] the lowercased DOI, or nil when +url+ does not
  #   match DOI_URL_PATTERN
  def doi_from_url(url)
    # to_s lets nil and other non-String ids fall through as "no match"
    # instead of raising inside Regexp#match.
    candidate = url.to_s
    return unless DOI_URL_PATTERN.match(candidate)

    uri = Addressable::URI.parse(candidate)
    # Strip only the single leading slash of the path.
    uri.path.sub(%r{\A/}, '').downcase
  end
end
61 changes: 3 additions & 58 deletions app/jobs/crossref_doi_job.rb
Original file line number Diff line number Diff line change
@@ -1,62 +1,7 @@
class CrossrefDoiJob < ActiveJob::Base
queue_as :lupo_background

# retry_on ActiveRecord::Deadlocked, wait: 10.seconds, attempts: 3
# retry_on Faraday::TimeoutError, wait: 10.minutes, attempts: 3

# discard_on ActiveJob::DeserializationError

def perform(id)
logger = Logger.new(STDOUT)

doi = doi_from_url(id)
return {} unless doi.present?

# check whether DOI has been registered with DataCite already
result = Doi.find_by_id(doi).results.first
return {} unless result.blank?

# otherwise store Crossref metadata with DataCite
# using client crossref.citations and DataCite XML
xml = Base64.strict_encode64(id)
attributes = {
"xml" => xml,
"source" => "levriero",
"event" => "publish" }.compact

data = {
"data" => {
"type" => "dois",
"attributes" => attributes,
"relationships" => {
"client" => {
"data" => {
"type" => "clients",
"id" => "crossref.citations"
}
}
}
}
}

url = "http://localhost/dois/#{doi}"
response = Maremma.put(url, accept: 'application/vnd.api+json',
content_type: 'application/vnd.api+json',
data: data.to_json,
username: ENV["ADMIN_USERNAME"],
password: ENV["ADMIN_PASSWORD"])

if [200, 201].include?(response.status)
logger.info "DOI #{doi} created."
else
logger.warn response.body["errors"]
end
end

def doi_from_url(url)
if /\A(?:(http|https):\/\/(dx\.)?(doi.org|handle.test.datacite.org)\/)?(doi:)?(10\.\d{4,5}\/.+)\z/.match(url)
uri = Addressable::URI.parse(url)
uri.path.gsub(/^\//, '').downcase
end
def perform(ids)
ids.each { |id| CrossrefDoiByIdJob.perform_later(id) }
end
end
end
2 changes: 1 addition & 1 deletion app/jobs/event_import_by_id_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ class EventImportByIdJob < ActiveJob::Base
def perform(options={})
Event.import_by_id(options)
end
end
end
24 changes: 10 additions & 14 deletions app/models/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,24 +283,22 @@ def self.update_crossref(options={})
logger = Logger.new(STDOUT)

size = (options[:size] || 1000).to_i
cursor = (options[:cursor] || 0).to_i

response = Event.query(nil, source_id: "crossref", page: { size: 1, cursor: 0 })
response = Event.query(nil, source_id: "crossref", page: { size: 1, cursor: cursor })
logger.info "[Update] #{response.results.total} events for source crossref."

# walk through results using cursor
if response.results.total > 0
# walk through results using cursor
cursor = 0

while response.results.results.length > 0 do
response = Event.query(nil, source_id: "crossref", page: { size: size, cursor: cursor })
break unless response.results.results.length > 0

logger.info "[Update] Updating #{response.results.results.length} crossref events starting with _id #{cursor + 1}."
cursor = response.results.to_a.last[:sort].first.to_i

response.results.results.each do |e|
CrossrefDoiJob.perform_later(e.subj_id)
end
dois = response.results.results.map(&:subj_id).uniq
CrossrefDoiJob.perform_later(dois)
end
end

Expand All @@ -311,24 +309,22 @@ def self.update_datacite_crossref(options={})
logger = Logger.new(STDOUT)

size = (options[:size] || 1000).to_i
cursor = (options[:cursor] || 0).to_i

response = Event.query(nil, source_id: "datacite-crossref", page: { size: 1, cursor: 0 })
response = Event.query(nil, source_id: "datacite-crossref", page: { size: 1, cursor: cursor })
logger.info "[Update] #{response.results.total} events for source datacite-crossref."

# walk through results using cursor
if response.results.total > 0
# walk through results using cursor
cursor = 0

while response.results.results.length > 0 do
response = Event.query(nil, source_id: "datacite-crossref", page: { size: size, cursor: cursor })
break unless response.results.results.length > 0

logger.info "[Update] Updating #{response.results.results.length} datacite-crossref events starting with _id #{cursor + 1}."
cursor = response.results.to_a.last[:sort].first.to_i

response.results.results.each do |e|
CrossrefDoiJob.perform_later(e.obj_id)
end
dois = response.results.results.map(&:obj_id).uniq
CrossrefDoiJob.perform_later(dois)
end
end

Expand Down
14 changes: 6 additions & 8 deletions lib/tasks/event.rake
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,19 @@ namespace :event do
end

namespace :crossref do
desc 'Import dois for all events'
desc 'Import crossref dois for all events'
task :import_doi => :environment do
from_id = (ENV['FROM_ID'] || 1).to_i
until_id = (ENV['UNTIL_ID'] || Event.maximum(:id)).to_i
cursor = (ENV['CURSOR'] || Event.minimum(:id)).to_i

Event.update_crossref(from_id: from_id, until_id: until_id)
Event.update_crossref(cursor: cursor)
end
end

namespace :datacite_crossref do
desc 'Import dois for all events'
desc 'Import crossref dois for all events'
task :import_doi => :environment do
from_id = (ENV['FROM_ID'] || 1).to_i
until_id = (ENV['UNTIL_ID'] || Event.maximum(:id)).to_i
cursor = (ENV['CURSOR'] || Event.minimum(:id)).to_i

Event.update_datacite_crossref(from_id: from_id, until_id: until_id)
Event.update_datacite_crossref(cursor: cursor)
end
end

0 comments on commit 7a11da8

Please sign in to comment.