Skip to content

Commit

Permalink
Enqueues information about cloned repos for scheduled actions
Browse files Browse the repository at this point in the history
- Rakefile: extend queue tasks to clone reporting queue
- application/services/summarize_folder.rb: enqueing step includes clone report queue
- infrastructure/messaging/queue.rb: AWS SQS service gateway
  • Loading branch information
soumyaray committed Dec 24, 2017
1 parent 06583ee commit bb856ea
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 31 deletions.
13 changes: 12 additions & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ Metrics/BlockLength:
Enabled: true
Exclude:
- spec/**/*
- Rakefile

Security/YAMLLoad:
Enabled: true
Exclude:
- spec/**/*
- spec/**/*

Style/HashSyntax:
Enabled: true
Exclude:
- Rakefile

Style/SymbolArray:
Enabled: true
Exclude:
- Rakefile
55 changes: 30 additions & 25 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -127,42 +127,47 @@ namespace :quality do
end
end

namespace :queue do
namespace :queues do
require 'aws-sdk-sqs'

desc "Create SQS queue for Shoryuken"
desc 'Create SQS queue for Shoryuken'
task :create => :config do
sqs = Aws::SQS::Client.new(region: @config.AWS_REGION)

begin
queue = sqs.create_queue(
queue_name: @config.CLONE_QUEUE,
attributes: {
FifoQueue: 'true',
ContentBasedDeduplication: 'true'
}
)

q_url = sqs.get_queue_url(queue_name: @config.CLONE_QUEUE)
puts "Queue created:"
puts "Name: #{@config.CLONE_QUEUE}"
puts "Region: #{@config.AWS_REGION}"
puts "URL: #{q_url.queue_url}"
puts "Environment: #{@app.environment}"
rescue => e
puts "Error creating queue: #{e}"
puts "Environment: #{@app.environment}"
[@config.CLONE_QUEUE, @config.REPORT_QUEUE].each do |queue_name|
begin
sqs.create_queue(
queue_name: queue_name,
attributes: {
FifoQueue: 'true',
ContentBasedDeduplication: 'true'
}
)

q_url = sqs.get_queue_url(queue_name: queue_name).queue_url
puts 'Queue created:'
puts " Name: #{queue_name}"
puts " Region: #{@config.AWS_REGION}"
puts " URL: #{q_url}"
rescue StandardError => error
puts "Error creating queue: #{error}"
end
end
end

desc "Purge messages in SQS queue for Shoryuken"
desc 'Purge messages in SQS queue for Shoryuken'
task :purge => :config do
sqs = Aws::SQS::Client.new(region: @config.AWS_REGION)

begin
queue = sqs.purge_queue(queue_url: @config.CLONE_QUEUE_URL)
puts "Queue #{@config.CLONE_QUEUE} purged"
rescue => e
puts "Error purging queue: #{e}"
[@config.CLONE_QUEUE, @config.REPORT_QUEUE].each do |queue_name|
begin
q_url = sqs.get_queue_url(queue_name: queue_name).queue_url
sqs.purge_queue(queue_url: q_url)
puts "Queue #{queue_name} purged"
rescue StandardError => error
puts "Error purging queue: #{error}"
end
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion application/representers/clone_request_representer.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# frozen_string_literal: true

require_relative 'collaborator_representer'
require_relative 'repo_representer'

# Represents essential Repo information for API output
module CodePraise
# Representer object for repo clone requests
class CloneRequestRepresenter < Roar::Decorator
include Roar::JSON

Expand Down
15 changes: 12 additions & 3 deletions application/services/summarize_folder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ def clone_repo(input)
if input[:gitrepo].exists_locally?
Right(input)
else
clone_request = clone_request_json(input)
CloneRepoWorker.perform_async(clone_request.to_json)
clone_request_msg = clone_request_json(input)
# TODO:
# - send message to worker using notify_clone_listeners?
# - send repo to worker and let it find gitrepo
CloneRepoWorker.perform_async(clone_request_msg)
notify_clone_listeners(clone_request_msg)
Left(Result.new(:processing, { id: input[:id] }))
end
rescue StandardError => error
Expand All @@ -42,7 +46,12 @@ def summarize_folder(input)

def clone_request_json(input)
clone_request = CloneRequest.new(input[:repo], input[:id])
CloneRequestRepresenter.new(clone_request)
CloneRequestRepresenter.new(clone_request).to_json
end

def notify_clone_listeners(message)
report_queue = Messaging::Queue.new(app.config.REPORT_QUEUE_URL)
report_queue.send(message)
end
end
end
2 changes: 1 addition & 1 deletion infrastructure/init.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: false

folders = %w[github database/orm gitrepo]
folders = %w[github database/orm gitrepo messaging]
folders.each do |folder|
require_relative "#{folder}/init.rb"
end
5 changes: 5 additions & 0 deletions infrastructure/messaging/init.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# frozen_string_literal: false

Dir.glob("#{File.dirname(__FILE__)}/*.rb").each do |file|
require file
end
42 changes: 42 additions & 0 deletions infrastructure/messaging/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module CodePraise
module Messaging
## Queue wrapper for AWS SQS
# Requires: AWS credentials loaded in ENV or through config file
class Queue
GROUP_ID = 'codepraise_api'
IDLE_TIMEOUT = 5 # seconds

def initialize(queue_url)
@queue_url = queue_url
@queue = Aws::SQS::Queue.new(queue_url)
end

## Sends message to queue
# Usage:
# q = Messaging::Queue.new(app.config.REPORT_QUEUE_URL)
# q.send({data: "hello"}.to_json)
def send(message)
unique = message.hash.to_s + Time.now.hash.to_s

@queue.send_message(
message_body: message,
message_group_id: GROUP_ID,
message_deduplication_id: unique
)
end

## Polls queue, yielding each messge
# Usage:
# q = Messaging::Queue.new(app.config.REPORT_QUEUE_URL)
# q.poll { |msg| print msg.body.to_s }
def poll
poller = Aws::SQS::QueuePoller.new(@queue_url)
poller.poll(idle_timeout: IDLE_TIMEOUT) do |msg|
yield msg.body if block_given?
end
end
end
end
end

0 comments on commit bb856ea

Please sign in to comment.