refactor bulk import. #690
Martin Fenner committed Dec 24, 2020
1 parent e77ab6a commit 409b7e0
Showing 20 changed files with 237 additions and 269 deletions.
14 changes: 0 additions & 14 deletions app/jobs/datacite_doi_import_by_id_job.rb

This file was deleted.

9 changes: 9 additions & 0 deletions app/jobs/datacite_doi_import_in_bulk_job.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+class DataciteDoiImportInBulkJob < ApplicationJob
+  queue_as :lupo_import
+
+  def perform(dois, options = {})
+    DataciteDoi.import_in_bulk(dois, options)
+  end
+end
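The job body is a thin wrapper; the interesting part is what gets queued. A minimal sketch of the enqueue side, mirroring the mapping code in DataciteDoi.import_by_ids further down (the id range and the "dois-datacite" index name are made up for illustration):

# Serialize each DOI up front so the queued payload is plain hashes:
mapped_dois = DataciteDoi.where(id: 1..100).map do |doi|
  { "id" => doi.id, "as_indexed_json" => doi.as_indexed_json }
end
DataciteDoiImportInBulkJob.perform_later(mapped_dois, index: "dois-datacite")

Serializing as_indexed_json before enqueueing means the worker never has to reload ActiveRecord objects.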
4 changes: 2 additions & 2 deletions app/jobs/{doi_not_indexed_job.rb → doi_import_by_client_job.rb}
@@ -1,9 +1,9 @@
# frozen_string_literal: true

-class DoiNotIndexedJob < ApplicationJob
+class DoiImportByClientJob < ApplicationJob
  queue_as :lupo_background

  def perform(client_id, _options = {})
-    Doi.import_by_client(client_id: client_id)
+    DataciteDoi.import_by_client(client_id)
  end
end
3 changes: 2 additions & 1 deletion app/jobs/index_background_job.rb
@@ -9,6 +9,7 @@ class IndexBackgroundJob < ApplicationJob
end

def perform(obj)
-    obj.__elasticsearch__.index_document
+    response = obj.__elasticsearch__.index_document
+    Rails.logger.error "[Elasticsearch] Error indexing id #{response["_id"]} in index #{response["_index"]}" if response["result"] != "created"
end
end
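For reference, the hash returned by index_document here is the standard Elasticsearch index API response; a sketch of the fields this log line reads (values invented), which applies equally to the identical change in IndexJob below:

response = {
  "_index" => "dois-datacite", # index the document went to
  "_id"    => "12345",         # document id
  "result" => "created",       # "updated" when the id already existed
}

One caveat worth knowing: re-indexing an existing document returns "updated", which this condition treats the same as a failure and logs.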
3 changes: 2 additions & 1 deletion app/jobs/index_job.rb
@@ -9,6 +9,7 @@ class IndexJob < ApplicationJob
end

def perform(obj)
-    obj.__elasticsearch__.index_document
+    response = obj.__elasticsearch__.index_document
+    Rails.logger.error "[Elasticsearch] Error indexing id #{response["_id"]} in index #{response["_index"]}" if response["result"] != "created"
end
end
14 changes: 0 additions & 14 deletions app/jobs/other_doi_import_by_id_job.rb

This file was deleted.

9 changes: 9 additions & 0 deletions app/jobs/other_doi_import_in_bulk_job.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+class OtherDoiImportInBulkJob < ApplicationJob
+  queue_as :lupo_import_other_doi
+
+  def perform(dois, options = {})
+    OtherDoi.import_in_bulk(dois, options)
+  end
+end
4 changes: 2 additions & 2 deletions app/models/activity.rb
@@ -76,7 +76,7 @@ def self.import_by_ids(options = {})
from_id = (options[:from_id] || Activity.minimum(:id)).to_i
until_id = (options[:until_id] || Activity.maximum(:id)).to_i

-    # get every id between from_id and end_id
+    # get every id between from_id and until_id
(from_id..until_id).step(500).each do |id|
ActivityImportByIdJob.perform_later(options.merge(id: id))
end
@@ -163,7 +163,7 @@ def self.convert_affiliations(options = {})
from_id = (options[:from_id] || Doi.minimum(:id)).to_i
until_id = (options[:until_id] || Doi.maximum(:id)).to_i

-    # get every id between from_id and end_id
+    # get every id between from_id and until_id
(from_id..until_id).step(500).each do |id|
ActivityConvertAffiliationByIdJob.perform_later(options.merge(id: id))
unless Rails.env.test?
20 changes: 16 additions & 4 deletions app/models/client.rb
@@ -360,9 +360,9 @@ def as_indexed_json(options = {})
"password" => password,
"cache_key" => cache_key,
"client_type" => client_type,
"created" => created,
"updated" => updated,
"deleted_at" => deleted_at,
"created" => created.try(:iso8601),
"updated" => updated.try(:iso8601),
"deleted_at" => deleted_at.try(:iso8601),
"cumulative_years" => cumulative_years,
"provider" =>
if options[:exclude_associations]
@@ -712,14 +712,26 @@ def self.export_doi_counts(query: nil)
csv.join("")
end

+  def self.import_dois(client_id)
+    client = self.where(deleted_at: nil).where(symbol: client_id).first
+    if client.nil?
+      logger.error "Client not found for client ID #{client_id}."
+      exit
+    end
+
+    # import DOIs for client
+    logger.info "#{client.dois.length} DOIs will be imported."
+    DoiImportByClientJob.perform_later(client.id)
+  end

# import all DOIs not indexed in Elasticsearch
def self.import_dois_not_indexed(query: nil)
table = CSV.parse(export_doi_counts(query: query), headers: true)

# loop through repositories that have DOIs not indexed in Elasticsearch
table.each do |row|
Rails.logger.info "Started to import #{row["DOIs in Database"]} DOIs (#{row["DOIs missing"]} missing) for repository #{row["Repository ID"]}."
-      DoiNotIndexedJob.perform_later(client_id: row["Repository ID"])
+      DoiImportByClientJob.perform_later(row["Repository ID"])
end
end

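A usage sketch for the new entry points, assuming a console or rake-task context and a made-up repository symbol of the usual PROVIDER.REPO form:

# Queue import of all DOIs for a single repository:
Client.import_dois("EXAMPLE.REPO")

# Queue imports for every repository with DOIs missing from the index:
Client.import_dois_not_indexed

Both paths funnel into DoiImportByClientJob, though import_dois passes the numeric client.id while import_dois_not_indexed passes the repository symbol from the CSV row.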
122 changes: 61 additions & 61 deletions app/models/datacite_doi.rb
@@ -22,6 +22,14 @@ class DataciteDoi < Doi

# TODO remove query for type once STI is enabled
def self.import_by_ids(options = {})
+    index =
+      if Rails.env.test?
+        index_name
+      elsif options[:index].present?
+        options[:index]
+      else
+        inactive_index
+      end
from_id =
(options[:from_id] || DataciteDoi.where(type: "DataciteDoi").minimum(:id)).
to_i
@@ -31,25 +39,37 @@ def self.import_by_ids(options = {})
DataciteDoi.where(type: "DataciteDoi").maximum(:id)
).
to_i
+    count = 0

-    # get every id between from_id and end_id
-    (from_id..until_id).step(500).each do |id|
-      DataciteDoiImportByIdJob.perform_later(options.merge(id: id))
-      unless Rails.env.test?
-        Rails.
-          logger.info "Queued importing for DataCite DOIs with IDs starting with #{
-            id
-          }."
+    # TODO remove query for type once STI is enabled
+    # SQS message size limit is 256 kB, up to 2 GB with S3
+    DataciteDoi.where(type: "DataciteDoi").where(id: from_id..until_id).
+      find_in_batches(batch_size: 100) do |dois|
+      mapped_dois = dois.map do |doi|
+        { "id" => doi.id, "as_indexed_json" => doi.as_indexed_json }
+      end
+      DataciteDoiImportInBulkJob.perform_later(mapped_dois, index: index)
+      count += dois.length
    end

-    (from_id..until_id).to_a.length
+    logger.info "Queued importing for DataCite DOIs with IDs #{from_id}-#{until_id}."
+    count
end

-  def self.import_by_id(options = {})
-    return nil if options[:id].blank?
+  def self.import_by_client(client_id)
+    return nil if client_id.blank?

+    # TODO remove query for type once STI is enabled
+    DataciteDoi.where(type: "DataciteDoi").where(datacentre: client_id).
+      find_in_batches(batch_size: 250) do |dois|
+      mapped_dois = dois.map do |doi|
+        { "id" => doi.id, "as_indexed_json" => doi.as_indexed_json }
+      end
+      DataciteDoiImportInBulkJob.perform_later(mapped_dois, index: self.active_index)
+    end
+  end

-    id = options[:id].to_i
+  def self.import_in_bulk(dois, options = {})
index =
if Rails.env.test?
index_name
@@ -61,68 +81,48 @@ def self.import_by_id(options = {})
errors = 0
count = 0

-    # TODO remove query for type once STI is enabled
-    DataciteDoi.where(type: "DataciteDoi").where(id: id..(id + 499)).
-      find_in_batches(batch_size: 500) do |dois|
-      response =
-        DataciteDoi.__elasticsearch__.client.bulk index: index,
-                                                  type:
-                                                    DataciteDoi.document_type,
-                                                  body:
-                                                    dois.map { |doi|
-                                                      {
-                                                        index: {
-                                                          _id: doi.id,
-                                                          data:
-                                                            doi.as_indexed_json,
-                                                        },
-                                                      }
+    response =
+      DataciteDoi.__elasticsearch__.client.bulk index: index,
+                                                type:
+                                                  DataciteDoi.document_type,
+                                                body:
+                                                  dois.map { |doi|
+                                                    {
+                                                      index: {
+                                                        _id: doi["id"],
+                                                        data:
+                                                          doi["as_indexed_json"],
+                                                      },
+                                                    }

-      # try to handle errors
-      errors_in_response =
-        response["items"].select { |k, _v| k.values.first["error"].present? }
-      errors += errors_in_response.length
-      errors_in_response.each do |item|
-        Rails.logger.error "[Elasticsearch] " + item.inspect
-        doi_id = item.dig("index", "_id").to_i
-        import_one(doi_id: doi_id) if doi_id > 0
-      end
-
-      count += dois.length
-    }

+    # report errors
+    errors_in_response =
+      response["items"].select { |k, _v| k.values.first["error"].present? }
+    errors += errors_in_response.length
+    errors_in_response.each do |item|
+      Rails.logger.error "[Elasticsearch] " + item.inspect
+    end
+
+    count += dois.length

if errors > 1
Rails.logger.error "[Elasticsearch] #{errors} errors importing #{
count
-      } DataCite DOIs with IDs #{id} - #{id + 499}."
+      } DataCite DOIs."
elsif count > 0
Rails.logger.info "[Elasticsearch] Imported #{
count
-      } DataCite DOIs with IDs #{id} - #{id + 499}."
+      } DataCite DOIs."
end

count
rescue Elasticsearch::Transport::Transport::Errors::RequestEntityTooLarge,
-           Faraday::ConnectionFailed,
-           ActiveRecord::LockWaitTimeout => e
-    Rails.logger.info "[Elasticsearch] Error #{
-      e.message
-    } importing DataCite DOIs with IDs #{id} - #{id + 499}."
+         Faraday::ConnectionFailed,
+         ActiveRecord::LockWaitTimeout => e

-    count = 0
-
-    # TODO remove query for type once STI is enabled
-    DataciteDoi.where(type: "DataciteDoi").where(id: id..(id + 499)).
-      find_each do |doi|
-      IndexJob.perform_later(doi)
-      count += 1
-    end
-
-    Rails.logger.info "[Elasticsearch] Imported #{
-      count
-    } DataCite DOIs with IDs #{id} - #{id + 499}."
-
-    count
+    Rails.logger.error "[Elasticsearch] Error #{
+      e.message
+    } importing DataCite DOIs."
end
end
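The batch_size of 100 in import_by_ids pairs with the SQS comment in the diff: the queued message carries fully serialized documents, so payload bytes, not row count, are the binding constraint. A rough sizing check, under the assumption (not from this commit) that you want to verify a batch before enqueueing:

require "json"

# Build one batch the same way import_by_ids does, then measure it:
batch = DataciteDoi.where(type: "DataciteDoi").limit(100).map do |doi|
  { "id" => doi.id, "as_indexed_json" => doi.as_indexed_json }
end
puts "#{batch.to_json.bytesize} bytes" # should stay well under the 256 kB SQS cap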
23 changes: 7 additions & 16 deletions app/models/doi.rb
@@ -528,10 +528,10 @@ def as_indexed_json(_options = {})
"reason" => reason,
"source" => source,
"cache_key" => cache_key,
"registered" => registered,
"created" => created,
"updated" => updated,
"published" => published,
"registered" => registered.try(:iso8601),
"created" => created.try(:iso8601),
"updated" => updated.try(:iso8601),
"published" => published.try(:iso8601),
"client" => client.try(:as_indexed_json, exclude_associations: true),
"provider" => provider.try(:as_indexed_json, exclude_associations: true),
"resource_type" => resource_type.try(:as_indexed_json),
@@ -1248,15 +1248,6 @@ def self.import_one(doi_id: nil, id: nil)
message
end

-  def self.import_by_client(client_id: nil)
-    client = ::Client.where(symbol: client_id).first
-    return nil if client.blank?
-
-    Doi.where(datacentre: client.id).find_each do |doi|
-      IndexJob.perform_later(doi)
-    end
-  end

def uid
doi.downcase
end
@@ -1404,7 +1395,7 @@ def self.convert_affiliations(options = {})
from_id = (options[:from_id] || Doi.minimum(:id)).to_i
until_id = (options[:until_id] || Doi.maximum(:id)).to_i

-    # get every id between from_id and end_id
+    # get every id between from_id and until_id
(from_id..until_id).step(500).each do |id|
DoiConvertAffiliationByIdJob.perform_later(options.merge(id: id))
Rails.logger.info "Queued converting affiliations for DOIs with IDs starting with #{id}." unless Rails.env.test?
@@ -1508,7 +1499,7 @@ def self.convert_containers(options = {})
from_id = (options[:from_id] || Doi.minimum(:id)).to_i
until_id = (options[:until_id] || Doi.maximum(:id)).to_i

-    # get every id between from_id and end_id
+    # get every id between from_id and until_id
(from_id..until_id).step(500).each do |id|
DoiConvertContainerByIdJob.perform_later(options.merge(id: id))
Rails.logger.info "Queued converting containers for DOIs with IDs starting with #{id}." unless Rails.env.test?
@@ -2231,7 +2222,7 @@ def self.add_index_type(options = {})
from_id = options[:from_id].to_i
until_id = (options[:until_id] || (from_id + 499)).to_i

-    # get every id between from_id and end_id
+    # get every id between from_id and until_id
count = 0

Rails.logger.info "[migration_index_types] adding type information for DOIs with IDs #{from_id} - #{until_id}."
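Several methods in this file share the fan-out idiom shown in these hunks: walk the primary-key range in 500-id steps and queue one background job per slice, with each job later re-querying only its own window. A condensed sketch of the pattern, with HypotheticalByIdJob standing in for the various *ByIdJob classes:

options  = {}
from_id  = (options[:from_id] || Doi.minimum(:id)).to_i
until_id = (options[:until_id] || Doi.maximum(:id)).to_i

(from_id..until_id).step(500).each do |id|
  # each job processes ids in id..(id + 499)
  HypotheticalByIdJob.perform_later(options.merge(id: id))
end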