update elasticsearch index for users in the background
Martin Fenner committed Mar 28, 2020
1 parent 57ca6a8 commit e1db2a7
Showing 3 changed files with 83 additions and 3 deletions.
11 changes: 11 additions & 0 deletions app/jobs/user_import_by_id_job.rb
@@ -0,0 +1,11 @@
class UserImportByIdJob < ActiveJob::Base
  queue_as :volpino

  rescue_from ActiveJob::DeserializationError, Elasticsearch::Transport::Transport::Errors::BadRequest do |error|
    Rails.logger.error error.message
  end

  def perform(options={})
    User.import_by_id(options)
  end
end
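
A quick usage sketch for the new job (assuming a Rails console in this app and a worker listening on the :volpino queue; the id value is an example):

# Enqueue one 500-ID window for background indexing; the worker calls
# User.import_by_id(id: 3000) when it picks the job up.
UserImportByIdJob.perform_later(id: 3000)

# Or run it inline, bypassing the queue (handy when debugging):
UserImportByIdJob.perform_now(id: 3000)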
68 changes: 67 additions & 1 deletion app/models/user.rb
@@ -65,7 +65,7 @@ class User < ActiveRecord::Base
indexes :id, type: :keyword
indexes :uid, type: :keyword
indexes :name, type: :text, fields: { keyword: { type: "keyword" }, raw: { type: "text", "analyzer": "string_lowercase", "fielddata": true }}
indexes :given_name, type: :text, fields: { keyword: { type: "keyword" }, raw: { type: "text", "analyzer": "string_lowercase", "fielddata": true }}
indexes :given_name, type: :text, fields: { keyword: { type: "keyword" }, raw: { type: "text", "analyzer": "string_lowercase", "fielddata": true }}
indexes :family_name, type: :text, fields: { keyword: { type: "keyword" }, raw: { type: "text", "analyzer": "string_lowercase", "fielddata": true }}
indexes :email, type: :keyword
indexes :github, type: :keyword
@@ -122,6 +122,72 @@ def self.from_omniauth(auth, options={})
where(provider: options[:provider], uid: options[:uid] || auth.uid).first_or_create
end

def self.import_by_ids(options={})
  from_id = (options[:from_id] || User.minimum(:id)).to_i
  until_id = (options[:until_id] || User.maximum(:id)).to_i

  # queue one job per 500-ID window between from_id and until_id
  (from_id..until_id).step(500).each do |id|
    UserImportByIdJob.perform_later(options.merge(id: id))
    Rails.logger.info "Queued importing for users with IDs starting with #{id}." unless Rails.env.test?
  end

  (from_id..until_id).to_a.length
end

def self.import_by_id(options={})
  return nil if options[:id].blank?

  id = options[:id].to_i
  index = if Rails.env.test?
            "users-test"
          elsif options[:index].present?
            options[:index]
          else
            self.inactive_index
          end
  errors = 0
  count = 0

  User.where(id: id..(id + 499)).find_in_batches(batch_size: 500) do |users|
    response = User.__elasticsearch__.client.bulk \
      index: index,
      type: User.document_type,
      body: users.map { |user| { index: { _id: user.id, data: user.as_indexed_json } } }

    # count failures and re-queue the affected users individually
    failed = response['items'].select { |k, v| k.values.first['error'].present? }
    errors += failed.length
    failed.each do |item|
      Rails.logger.error "[Elasticsearch] " + item.inspect
      user_id = item.dig("index", "_id").to_i
      user = User.where(id: user_id).first
      IndexJob.perform_later(user) if user.present?
    end

    count += users.length
  end

  if errors > 0
    Rails.logger.error "[Elasticsearch] #{errors} errors importing #{count} users with IDs #{id} - #{(id + 499)}."
  elsif count > 0
    Rails.logger.info "[Elasticsearch] Imported #{count} users with IDs #{id} - #{(id + 499)}."
  end

  count
rescue Elasticsearch::Transport::Transport::Errors::RequestEntityTooLarge, Faraday::ConnectionFailed, ActiveRecord::LockWaitTimeout => error
  Rails.logger.info "[Elasticsearch] Error #{error.message} importing users with IDs #{id} - #{(id + 499)}."

  count = 0

  User.where(id: id..(id + 499)).find_each do |user|
    IndexJob.perform_later(user)
    count += 1
  end

  Rails.logger.info "[Elasticsearch] Imported #{count} users with IDs #{id} - #{(id + 499)}."

  count
end

def queue_user_job
  UserJob.perform_later(self)
end
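To make the chunking in import_by_ids concrete, here is a standalone sketch of the 500-ID windows it produces; the from_id/until_id values are made up:

# Each window maps to one UserImportByIdJob, whose import_by_id call then
# bulk-indexes User.where(id: id..(id + 499)) in a single Elasticsearch request.
from_id  = 1
until_id = 1200

(from_id..until_id).step(500).each do |id|
  puts "job for users with IDs #{id} - #{id + 499}"
end
# => job for users with IDs 1 - 500
# => job for users with IDs 501 - 1000
# => job for users with IDs 1001 - 1500
# (the last window may extend past until_id, matching the id..(id + 499) query)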
7 changes: 5 additions & 2 deletions lib/tasks/user.rake
@@ -47,8 +47,11 @@ namespace :user do
  end

  desc 'Import all users'
  task import: :environment do
    User.import(index: User.inactive_index)
  task :import => :environment do
    from_id = (ENV['FROM_ID'] || User.minimum(:id)).to_i
    until_id = (ENV['UNTIL_ID'] || User.maximum(:id)).to_i

    User.import_by_ids(from_id: from_id, until_id: until_id, index: ENV["INDEX"] || User.inactive_index)
  end

desc "Update all claims counts"
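A usage sketch for the updated rake task (the environment variable values and the users-v2 index name are examples, not part of this commit):

# Shell equivalent (hypothetical values):
#   FROM_ID=1 UNTIL_ID=5000 INDEX=users-v2 bundle exec rake user:import
#
# The task boils down to this call, which enqueues one UserImportByIdJob per
# 500-ID window and returns the number of IDs in the range:
User.import_by_ids(from_id: 1, until_id: 5000, index: "users-v2")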
