Skip to content

Commit

Permalink
Merge pull request #195 from datacite/add-events-to-shoryuken-config
Browse files Browse the repository at this point in the history
Process event creation through SQS queue
  • Loading branch information
wendelfabianchinsamy authored Jan 23, 2025
2 parents 2d93ff3 + 6f0cc9e commit d0f0598
Show file tree
Hide file tree
Showing 24 changed files with 508 additions and 798 deletions.
59 changes: 23 additions & 36 deletions app/models/affiliation_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class AffiliationIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -102,43 +104,28 @@ def self.push_item(item)

# there can be one or more affiliation_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

push_items.length
Expand Down
15 changes: 5 additions & 10 deletions app/models/concerns/importable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ def import_from_api
def parse_record(sqs_msg: nil, data: nil)
id = "https://doi.org/#{data['id']}"
response = get_datacite_json(id)
related_identifiers = Array.wrap(response.fetch("relatedIdentifiers",
nil)).select do |r|
["DOI", "URL"].include?(r["relatedIdentifierType"])

related_identifiers = Array.wrap(
response.fetch("relatedIdentifiers", nil)).select do |r|
["DOI", "URL"].include?(r["relatedIdentifierType"])
end

if related_identifiers.any? { |r| r["relatedIdentifierType"] == "DOI" }
Expand All @@ -244,6 +245,7 @@ def parse_record(sqs_msg: nil, data: nil)
nil)).select do |f|
f.fetch("funderIdentifierType", nil) == "Crossref Funder ID"
end

if funding_references.present?
item = {
"doi" => data["id"],
Expand Down Expand Up @@ -304,13 +306,6 @@ def parse_record(sqs_msg: nil, data: nil)
OrcidAffiliation.push_item(item)
end

Rails.logger.info "[Event Data] #{related_identifiers.length} related_identifiers found for DOI #{data['id']}" if related_identifiers.present?
Rails.logger.info "[Event Data] #{name_identifiers.length} name_identifiers found for DOI #{data['id']}" if name_identifiers.present?
Rails.logger.info "[Event Data] #{affiliation_identifiers.length} affiliation_identifiers found for DOI #{data['id']}" if affiliation_identifiers.present?
Rails.logger.info "[Event Data] #{orcid_affiliation.length} orcid_affiliations found for DOI #{data['id']}" if affiliation_identifiers.present?
Rails.logger.info "[Event Data] #{funding_references.length} funding_references found for DOI #{data['id']}" if funding_references.present?
Rails.logger.info "No events found for DOI #{data['id']}" if related_identifiers.blank? && name_identifiers.blank? && funding_references.blank? && affiliation_identifiers.blank?

related_identifiers + name_identifiers + funding_references + affiliation_identifiers + orcid_affiliation
end

Expand Down
40 changes: 40 additions & 0 deletions app/models/concerns/queueable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Class-level helpers for publishing messages to an SQS queue.
#
# Including this concern gives the host class one public class method,
# +send_event_import_message+, plus private helpers that resolve the
# queue URL and build the SQS client.
module Queueable
  extend ActiveSupport::Concern

  require "aws-sdk-sqs"

  class_methods do
    # Sends +data+ to the "events" queue, tagging the message with the
    # "EventImportWorker" shoryuken_class attribute.
    #
    # @param data [#to_json] payload that is serialized as the message body
    # @return the result of the underlying SQS send_message call
    def send_event_import_message(data)
      send_message(data, shoryuken_class: "EventImportWorker", queue_name: "events")
    end

    private

    # Serializes +body+ to JSON and sends it to the queue named
    # "<prefix>_<options[:queue_name]>". The prefix is ENV["SQS_PREFIX"]
    # when that variable is present, otherwise the Rails environment name.
    #
    # @param body [#to_json] message payload
    # @param options [Hash] reads :queue_name (queue suffix) and
    #   :shoryuken_class (value of the "shoryuken_class" message attribute)
    def send_message(body, options = {})
      client = get_sqs_client
      prefix = ENV["SQS_PREFIX"].presence || Rails.env
      full_queue_name = "#{prefix}_#{options[:queue_name]}"

      # The queue URL is looked up by name on every call.
      resolved_url = client.get_queue_url(queue_name: full_queue_name).queue_url

      worker_attribute = {
        string_value: options[:shoryuken_class],
        data_type: "String",
      }

      payload = {
        queue_url: resolved_url,
        message_attributes: { "shoryuken_class" => worker_attribute },
        message_body: body.to_json,
      }

      client.send_message(payload)
    end

    # Builds a new SQS client. In the development environment the client
    # is created with the endpoint taken from ENV["AWS_ENDPOINT"]; in every
    # other environment it is created with the SDK defaults.
    def get_sqs_client
      return Aws::SQS::Client.new unless Rails.env.development?

      Aws::SQS::Client.new(endpoint: ENV["AWS_ENDPOINT"])
    end
  end
end
59 changes: 23 additions & 36 deletions app/models/funder_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class FunderIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -92,43 +94,28 @@ def self.push_item(item)

# there can be one or more funder_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data Query API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

push_items.length
Expand Down
55 changes: 21 additions & 34 deletions app/models/name_identifier.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class NameIdentifier < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -113,43 +115,28 @@ def self.push_item(item)

# there can be one or more name_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")
send_event_import_message(data)

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."

# send to Profiles service, which then pushes to ORCID
if ENV["STAFF_PROFILES_ADMIN_TOKEN"].present?
Expand Down
62 changes: 24 additions & 38 deletions app/models/orcid_affiliation.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class OrcidAffiliation < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/".freeze

include Queueable

def self.import_by_month(options = {})
from_date = (options[:from_date].present? ? Date.parse(options[:from_date]) : Date.current).beginning_of_month
until_date = (options[:until_date].present? ? Date.parse(options[:until_date]) : Date.current).end_of_month
Expand Down Expand Up @@ -51,8 +53,7 @@ def push_data(result, _options = {})

def self.push_item(item)
attributes = item.fetch("attributes", {})
related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers",
nil))
related_identifiers = Array.wrap(attributes.fetch("relatedIdentifiers", nil))
skip_doi = related_identifiers.any? do |related_identifier|
["IsIdenticalTo", "IsPartOf", "IsPreviousVersionOf",
"IsVersionOf"].include?(related_identifier["relatedIdentifierType"])
Expand Down Expand Up @@ -107,43 +108,28 @@ def self.push_item(item)

# there can be one or more affiliation_identifier per DOI
Array.wrap(push_items).each do |iiitem|
# send to DataCite Event Data API
if ENV["STAFF_ADMIN_TOKEN"].present?
push_url = "#{ENV['LAGOTTINO_URL']}/events"

data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
data = {
"data" => {
"type" => "events",
"attributes" => {
"messageAction" => iiitem["message_action"],
"subjId" => iiitem["subj_id"],
"objId" => iiitem["obj_id"],
"relationTypeId" => iiitem["relation_type_id"].to_s.dasherize,
"sourceId" => iiitem["source_id"].to_s.dasherize,
"sourceToken" => iiitem["source_token"],
"occurredAt" => iiitem["occurred_at"],
"timestamp" => iiitem["timestamp"],
"license" => iiitem["license"],
"subj" => iiitem["subj"],
"obj" => iiitem["obj"],
},
}

response = Maremma.post(push_url, data: data.to_json,
bearer: ENV["STAFF_ADMIN_TOKEN"],
content_type: "application/vnd.api+json",
accept: "application/vnd.api+json; version=2")

if [200, 201].include?(response.status)
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} pushed to Event Data service."
elsif response.status == 409
Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} already pushed to Event Data service."
elsif response.body["errors"].present?
Rails.logger.error "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} had an error: #{response.body['errors']}"
Rails.logger.error data.inspect
end
end
},
}

send_event_import_message(data)

Rails.logger.info "[Event Data] #{iiitem['subj_id']} #{iiitem['relation_type_id']} #{iiitem['obj_id']} sent to the events queue."
end

total_push_items += push_items
Expand Down
Loading

0 comments on commit d0f0598

Please sign in to comment.