Commit 106cb9f

Merge pull request #32 from datacite/refactor_usageupdate

Refactor usageupdate

kjgarza authored Sep 24, 2018
2 parents c31c0ac + 9b39e2c
Showing 30 changed files with 39,025 additions and 30 deletions.
7 changes: 7 additions & 0 deletions app/jobs/usage_update_import_by_month_job.rb
@@ -0,0 +1,7 @@
class UsageUpdateImportByMonthJob < ActiveJob::Base
queue_as :levriero

def perform(options={})
UsageUpdate.import(options)
end
end
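
A minimal usage sketch for this job. The from_date/until_date keys are an assumption mirroring what the usage_update rake tasks below pass to UsageUpdate.import; they are not defined in this diff.

# Hypothetical invocation from a Rails console or a scheduler; option keys
# mirror the usage_update rake tasks below (assumed, not shown in this file).
UsageUpdateImportByMonthJob.perform_later(from_date: "2018-09-01", until_date: "2018-09-30")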
18 changes: 18 additions & 0 deletions app/jobs/usage_update_import_job.rb
@@ -0,0 +1,18 @@
class UsageUpdateImportJob < ActiveJob::Base
queue_as :levriero

def perform(item, options={})
logger = Logger.new(STDOUT)
logger.info item
response = UsageUpdate.push_item(item, options)
item = JSON.parse(item)
if response.status == 201
logger.info "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} pushed to Event Data service."
elsif response.status == 200
logger.info "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} pushed to Event Data service for update."
elsif response.body["errors"].present?
logger.info "#{item['subj-id']} #{item['relation-type-id']} #{item['obj-id']} had an error:"
logger.info response.body['errors'].first['title']
end
end
end
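
For reference, the item argument is a JSON string produced by UsageUpdate.format_event (see app/models/usage_update.rb below). An illustrative payload, with values echoing the spec fixtures in this PR:

# Illustrative only; keys come from format_event below, values from the fixtures.
{
"uuid" => "11a2c4b6-6b33-4dcd-b6b1-6e03af0dcc3b",
"message-action" => "create",
"subj-id" => "https://api.test.datacite.org/reports/2018-3-Dash",
"obj-id" => "https://doi.org/10.7291/d1q94r",
"relation-type-id" => "total-dataset-investigations-regular",
"source-id" => "datacite-usage",
"total" => 25
}.to_json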
20 changes: 20 additions & 0 deletions app/jobs/usage_update_parse_job.rb
@@ -0,0 +1,20 @@
class UsageUpdateParseJob < ActiveJob::Base
queue_as :levriero

def perform(item, options={})
logger = Logger.new(STDOUT)
response = UsageUpdate.get_data(item, options)
unless response.blank?
data = UsageUpdate.parse_data(response, options)
message = data.respond_to?("each") ? "[Usage Report Parsing] Successfully parsed Report #{item}" : "[Usage Report Parsing] Error parsing Report #{item}"
logger.info message

options = options.merge(
report_meta: {
report_id: item,
created_by: response.body.dig("data", "report", "report-header", "created-by"),
reporting_period: response.body.dig("data", "report", "report-header", "reporting-period") })
UsageUpdate.push_data(data, options)
end
end
end
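
Note that Ruby's Hash#merge returns a new hash and leaves the receiver untouched, which is why the result is reassigned to options above:

h = { a: 1 }
h.merge(b: 2) # => { a: 1, b: 2 }
h             # => { a: 1 }, unchanged
h = h.merge(b: 2) # reassignment is required to keep the merged keys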
42 changes: 12 additions & 30 deletions app/models/base.rb
@@ -9,53 +9,33 @@ class Base
ICON_URL = "https://raw.githubusercontent.com/datacite/toccatore/master/lib/toccatore/images/toccatore.png"

def queue options={}
- puts "Queue name has not been specified" unless ENV['ENVIRONMENT'].present?
- puts "AWS_REGION has not been specified" unless ENV['AWS_REGION'].present?
+ Rails.logger.info "Queue name has not been specified" unless ENV['ENVIRONMENT'].present?
+ Rails.logger.info "AWS_REGION has not been specified" unless ENV['AWS_REGION'].present?
region = ENV['AWS_REGION'] ||= 'eu-west-1'
Aws::SQS::Client.new(region: region.to_s, stub_responses: false)
end

- # def get_total options={}
- #   req = @sqs.get_queue_attributes(
- #     {
- #       queue_url: queue_url, attribute_names:
- #       [
- #         'ApproximateNumberOfMessages',
- #         'ApproximateNumberOfMessagesNotVisible'
- #       ]
- #     }
- #   )
- #
- #   msgs_available = req.attributes['ApproximateNumberOfMessages']
- #   msgs_in_flight = req.attributes['ApproximateNumberOfMessagesNotVisible']
- #   msgs_available.to_i
- # end

def get_message options={}
- @sqs.receive_message(queue_url: queue_url, max_number_of_messages: 1, wait_time_seconds: 1)
+ sqs.receive_message(queue_url: queue_url, max_number_of_messages: 1, wait_time_seconds: 1)
end

- def delete_message options={}
- return 1 if options.messages.size < 1
- reponse = @sqs.delete_message({
+ def delete_message message
+ reponse = sqs.delete_message({
queue_url: queue_url,
- receipt_handle: options.messages[0][:receipt_handle]
+ receipt_handle: message[:receipt_handle]
})
if reponse.successful?
- puts "Message #{options.messages[0][:receipt_handle]} deleted"
- 0
+ Rails.logger.info "Message #{message[:receipt_handle]} deleted"
else
- puts "Could NOT delete Message #{options.messages[0][:receipt_handle]}"
- 1
+ Rails.logger.info "Could NOT delete Message #{message[:receipt_handle]}"
end
end

def queue_url options={}
options[:queue_name] ||= "#{ENV['ENVIRONMENT']}_usage"
queue_name = options[:queue_name]
- # puts "Using #{@sqs.get_queue_url(queue_name: queue_name).queue_url} queue"
- @sqs.get_queue_url(queue_name: queue_name).queue_url
+ # puts "Using #{sqs.get_queue_url(queue_name: queue_name).queue_url} queue"
+ sqs.get_queue_url(queue_name: queue_name).queue_url
end

def get_query_url(options={})
@@ -183,6 +163,8 @@ def self.get_datacite_metadata(id)
response = Maremma.get(url)

return {} if response.status != 200


attributes = response.body.dig("data", "attributes")

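The refactored helpers follow the usual SQS consume loop: receive a message, process it, then delete it by its receipt handle. A sketch, using the UsageUpdate subclass below (which supplies the sqs client) and assuming ENV['ENVIRONMENT'] and ENV['AWS_REGION'] are configured:

# Sketch only; assumes queue and credentials are configured.
updater = UsageUpdate.new
response = updater.get_message
response.messages.each do |message|
# process JSON.parse(message.body) here, then acknowledge:
updater.delete_message(message) # the new signature takes the message itself
end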
192 changes: 192 additions & 0 deletions app/models/usage_update.rb
@@ -0,0 +1,192 @@
class UsageUpdate < Base
LICENSE = "https://creativecommons.org/publicdomain/zero/1.0/"
LOGGER = Logger.new(STDOUT)


def self.import(_options={})
usage_update = UsageUpdate.new
usage_update.queue_jobs
end

def source_id
"usage_update"
end


def process_data options={}
messages = get_query_url options
return 0 unless messages.respond_to?(:each)

messages.each do |message|
body = JSON.parse(message.body)
report_id = body["report_id"]
UsageUpdateParseJob.perform_later(report_id, options)
delete_message message
end
messages.length
end

def get_query_url _options={}
queue_url = sqs.get_queue_url(queue_name: "#{Rails.env}_usage" ).queue_url
resp = sqs.receive_message(queue_url: queue_url, max_number_of_messages: 5, wait_time_seconds: 1)
resp.messages
end

def self.get_data report_id, _options={}
return OpenStruct.new(body: { "errors" => "No report given" }) if report_id.blank?

host = URI.parse(report_id).host.downcase
Maremma.get(report_id, timeout: 120, host: host)
end

def sqs
Aws::SQS::Client.new(region: ENV["AWS_REGION"])
end


def get_total _options={}
queue_url = sqs.get_queue_url(queue_name: "#{Rails.env}_usage" ).queue_url
req = sqs.get_queue_attributes(
{
queue_url: queue_url, attribute_names:
[
'ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesNotVisible'
]
}
)

msgs_available = req.attributes['ApproximateNumberOfMessages']
msgs_available.to_i
end

def queue_jobs(options={})
total = get_total(options)

if total < 1
text = "No works found in the Usage Reports Queue."
LOGGER.info text
return 0
end

queued = 0
num_messages = total
while num_messages > 0
processed = process_data(options)
break if processed == 0
queued += processed
num_messages -= processed
end

text = "#{queued} reports queued out of #{total} for Usage Reports Queue"
LOGGER.info text

# send slack notification
options[:level] = queued == 0 ? "warning" : "good"
options[:title] = "Report for #{source_id}"
send_notification_to_slack(text, options) if options[:slack_webhook_url].present?
queued
end

def self.parse_data report, options={}
return [{ "errors" => { "title" => "The report is blank" }}] if report.body.blank?
return report.body.fetch("errors") if report.body.fetch("errors", nil).present?

items = report.body.dig("data","report","report-datasets")
header = report.body.dig("data","report","report-header")
report_id = report.url

Array.wrap(items).reduce([]) do |x, item|
data = {
doi: item.dig("dataset-id").first.dig("value"),
id: normalize_doi(item.dig("dataset-id").first.dig("value")),
created: header.fetch("created"),
report_id: report.url,
created_at: header.fetch("created")
}
instances = item.dig("performance", 0, "instance")

return x += [OpenStruct.new(body: { "errors" => "There are too many instances in #{data[:doi]} for report #{report_id}. There can only be 8" })] if instances.size > 8

x += Array.wrap(instances).reduce([]) do |ssum, instance|
data[:count] = instance.dig("count")
event_type = "#{instance.dig("metric-type")}-#{instance.dig("access-method")}"
ssum << format_event(event_type, data, options)
ssum
end
end
end

def self.format_event type, data, options={}
fail "No type given. Report #{data[:report_id]} not processed" if type.blank?
fail "Access token missing." if ENV['DATACITE_USAGE_SOURCE_TOKEN'].blank?
fail "Report_id is missing" if data[:report_id].blank?
{ "uuid" => SecureRandom.uuid,
"message-action" => "create",
"subj-id" => data[:report_id],
"subj" => {
"id" => data[:report_id],
"issued" => data[:created]
},
"total" => data[:count],
"obj-id" => data[:id],
"relation-type-id" => type,
"source-id" => "datacite-usage",
"source-token" => ENV['DATACITE_USAGE_SOURCE_TOKEN'],
"occurred-at" => data[:created_at],
"license" => LICENSE
}
end

# enqueues one UsageUpdateImportJob per parsed event; logs when there is nothing to do
def self.push_data items, options={}
if items.empty?
LOGGER.info "No works found in the Queue."
else
Array.wrap(items).map do |item|
UsageUpdateImportJob.perform_later(item.to_json, options)
end
end
end

def self.push_item item, options={}
item = JSON.parse(item)

if item["subj-id"].blank?
LOGGER.info "There is no Subject for item #{item["uuid"]}"
return OpenStruct.new(body: { "errors" => [{ "title" => "There is no Subject" }] })
elsif ENV['LAGOTTINO_TOKEN'].blank?
LOGGER.info "Access token missing."
return OpenStruct.new(body: { "errors" => [{ "title" => "Access token missing." }] })
elsif item["errors"].present?
LOGGER.info item["errors"]["title"]
return OpenStruct.new(body: { "errors" => [{ "title" => item["errors"]["title"].to_s }] })
end

obj = cached_datacite_response(item["obj-id"])
subj = options[:report_meta]
push_url = ENV['LAGOTTINO_URL'] + "/events/" + item["uuid"].to_s
data = {
"data" => {
"id" => item["uuid"],
"type" => "events",
"attributes" => {
"message-action" => item["message-action"],
"subj-id" => item["subj-id"],
"obj-id" => item["obj-id"],
"relation-type-id" => item["relation-type-id"].to_s.dasherize,
"source-id" => item["source-id"].to_s.dasherize,
"source-token" => item["source-token"],
"occurred-at" => item["occurred-at"],
"timestamp" => item["timestamp"],
"license" => item["license"],
"subj" => subj,
"obj" => obj } }}

host = URI.parse(push_url).host.downcase
response = Maremma.put(push_url, data: data.to_json,
bearer: ENV['LAGOTTINO_TOKEN'],
content_type: 'json',
host: host)
response
end
end
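
A worked example of format_event: one COUNTER performance instance with metric-type total-dataset-investigations and access-method regular becomes one Event Data message. Values are illustrative, echoing the spec fixtures in this PR; DATACITE_USAGE_SOURCE_TOKEN must be set or format_event fails.

# Illustrative call only; values borrowed from the fixtures below.
event = UsageUpdate.format_event(
"total-dataset-investigations-regular",
{ report_id: "https://api.test.datacite.org/reports/2018-3-Dash",
id: "https://doi.org/10.7291/d1q94r",
doi: "10.7291/d1q94r",
count: 25,
created: "2018-04-01",
created_at: "2018-04-01" }
)
event["subj-id"] # => "https://api.test.datacite.org/reports/2018-3-Dash"
event["obj-id"]  # => "https://doi.org/10.7291/d1q94r"
event["total"]   # => 25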

1 change: 1 addition & 0 deletions config/application.rb
@@ -42,6 +42,7 @@
ENV['CROSSREF_QUERY_URL'] ||= "https://api.eventdata.crossref.org"
ENV['RE3DATA_URL'] ||= "https://www.re3data.org/api/beta"
ENV['TRUSTED_IP'] ||= "10.0.40.1"
ENV['SASHIMI_QUERY_URL'] ||= "https://api.test.datacite.org"

module Levriero
class Application < Rails::Application
19 changes: 19 additions & 0 deletions lib/tasks/usage_update.rake
@@ -0,0 +1,19 @@
namespace :usage_update do
desc 'Import all usage_updates by month'
task :import_by_month => :environment do
from_date = ENV['FROM_DATE'] || Date.current.beginning_of_month.strftime("%F")
until_date = ENV['UNTIL_DATE'] || Date.current.end_of_month.strftime("%F")

response = UsageUpdate.import_by_month(from_date: from_date, until_date: until_date)
puts response
end

desc 'Import all usage_updates'
task :import => :environment do
from_date = ENV['FROM_DATE'] || (Date.current - 1.day).strftime("%F")
until_date = ENV['UNTIL_DATE'] || Date.current.strftime("%F")

response = UsageUpdate.import(from_date: from_date, until_date: until_date)
puts "Queued import for #{response} Reports updated from #{from_date} - #{until_date}."
end
end
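
Typical invocations of these tasks, with the ENV overrides they read (a sketch; as coded above, dates default to the current month for import_by_month and to yesterday/today for import):

$ FROM_DATE=2018-09-01 UNTIL_DATE=2018-09-30 bundle exec rake usage_update:import_by_month
$ bundle exec rake usage_update:import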
17 changes: 17 additions & 0 deletions spec/factories/default.rb
@@ -13,4 +13,21 @@
skip_create
initialize_with { new(attributes) }
end

factory :event, class: OpenStruct do
sequence(:uuid) { |n| "#{SecureRandom.uuid}-#{n}" }
message_action { "create" }
sequence(:obj_id) { |n| "#{Faker::Internet.url}#{n}" }
sequence(:subj_id) { |n| "#{Faker::Internet.url}#{n}" }
total { Faker::Number.number(3) }
subj {{
"id" => SecureRandom.uuid,
"issued" => Faker::Time.between(DateTime.now - 2, DateTime.now),
}}
relation_type_id { ["total-dataset-investigations-regular", "unique-dataset-investigations-regular", "total-dataset-investigations-machine", "unique-dataset-investigations-machine"].sample }
source_id { "datacite-usage" }
sequence(:source_token) { |n| "#{SecureRandom.uuid}-#{n}" }
occurred_at { Faker::Time.between(DateTime.now - 2, DateTime.now) }
license { "https://creativecommons.org/publicdomain/zero/1.0/" }
end
end
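
A quick sketch of how this factory might be used in a spec, assuming FactoryBot is configured as in the rest of the suite:

# Sketch; builds an OpenStruct resembling a formatted usage event.
event = FactoryBot.build(:event)
event.source_id # => "datacite-usage"
event.license   # => "https://creativecommons.org/publicdomain/zero/1.0/"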
2 changes: 2 additions & 0 deletions spec/fixtures/event_data_resp_1
@@ -0,0 +1,2 @@
https://api.test.datacite.org/reports/2018-3-Dash total-dataset-investigations-regular https://doi.org/10.7291/d1q94r pushed to Event Data service.
https://api.test.datacite.org/reports/2018-3-Dash unique-dataset-investigations-regular https://doi.org/10.7291/d1q94r pushed to Event Data service.
4 changes: 4 additions & 0 deletions spec/fixtures/event_data_resp_2
@@ -0,0 +1,4 @@
https://api.test.datacite.org/reports/2018-3-Dash total-dataset-investigations-regular https://doi.org/10.7291/d1q94r had an error:\nthe server responded with status 500/\n
https://api.test.datacite.org/reports/2018-3-Dash unique-dataset-investigations-regular https://doi.org/10.7291/d1q94r had an error:\nthe server responded with status 500/\n
https://api.test.datacite.org/reports/2018-3-Dash Total-Dataset-Requests-Machine https://doi.org/10.6071/z7wc73 had an error:\nthe server responded with status 500/\n
https://api.test.datacite.org/reports/2018-3-Dash Unique-Dataset-Requests-Machine https://doi.org/10.6071/z7wc73 had an error:\nthe server responded with status 500/\n