Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change interface JobScheduler remove signatures to provide control me… #1322

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,25 +153,25 @@ public String getNextScheduleTime() throws Exception {

@Override
public void removeAllJobs() throws Exception {
this.jobScheduler.removeAllJobs();
this.jobScheduler.removeAllJobs(null);
}

@Override
public void removeAllJobs(String startTime, String finishTime) throws Exception {
long start = JobSupport.getDataTime(startTime);
long finish = JobSupport.getDataTime(finishTime);
this.jobScheduler.removeAllJobs(start, finish);
this.jobScheduler.removeAllJobs(start, finish, null);
}

@Override
public void removeAllJobsAtScheduledTime(String time) throws Exception {
long removeAtTime = JobSupport.getDataTime(time);
this.jobScheduler.remove(removeAtTime);
this.jobScheduler.remove(removeAtTime, null);
}

@Override
public void removeJob(String jobId) throws Exception {
this.jobScheduler.remove(jobId);
this.jobScheduler.remove(jobId, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;

import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteSequence;

public interface JobScheduler {
Expand Down Expand Up @@ -118,27 +119,34 @@ public interface JobScheduler {
*
* @param time
* The UTC time to use to remove a batch of scheduled Jobs.
* @param message
* The message from management queue, that triggered the removal action
*
* @throws Exception
*/
void remove(long time) throws Exception;
void remove(long time, Message message) throws Exception;

/**
* remove a job with the matching jobId
*
* @param jobId
* The unique Job Id to search for and remove from the scheduled set of jobs.
* @param message
* The message from management queue, that triggered the removal action
*
* @throws Exception if an error occurs while removing the Job.
*/
void remove(String jobId) throws Exception;
void remove(String jobId, Message message) throws Exception;

/**
* remove all the Jobs from the scheduler
*
* @param message
* The message from management queue, that triggered the removal action
*
* @throws Exception
*/
void removeAllJobs() throws Exception;
void removeAllJobs(Message message) throws Exception;

/**
* remove all the Jobs from the scheduler that are due between the start and finish times
Expand All @@ -147,9 +155,11 @@ public interface JobScheduler {
* time in milliseconds
* @param finish
* time in milliseconds
* @param message
* The message from management queue, that triggered the removal action
* @throws Exception
*/
void removeAllJobs(long start, long finish) throws Exception;
void removeAllJobs(long start, long finish, Message message) throws Exception;

/**
* Get the next time jobs will be fired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.List;

import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteSequence;

/**
Expand Down Expand Up @@ -89,34 +90,34 @@ public long getNextScheduleTime() throws Exception {
}

@Override
public void remove(long time) throws Exception {
public void remove(long time, Message message) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.remove(time);
js.remove(time, message);
}
}

@Override
public void remove(String jobId) throws Exception {
public void remove(String jobId, Message message) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.remove(jobId);
js.remove(jobId, message);
}
}

@Override
public void removeAllJobs() throws Exception {
public void removeAllJobs(Message message) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.removeAllJobs();
js.removeAllJobs(message);
}
}

@Override
public void removeAllJobs(long start, long finish) throws Exception {
public void removeAllJobs(long start, long finish, Message message) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
if (js != null) {
js.removeAllJobs(start, finish);
js.removeAllJobs(start, finish, message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,17 @@ public void send(ProducerBrokerExchange producerExchange, final Message messageS
}
}
if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
scheduler.remove(jobId);
scheduler.remove(jobId, messageSend);
} else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {

if (startTime != null && endTime != null) {

long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);

scheduler.removeAllJobs(start, finish);
scheduler.removeAllJobs(start, finish, messageSend);
} else {
scheduler.removeAllJobs();
scheduler.removeAllJobs(messageSend);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -129,22 +130,22 @@ public void schedule(String jobId, ByteSequence payload, String cronEntry, long
}

@Override
public void remove(long time) throws Exception {
public void remove(long time, Message message) throws Exception {
doRemoveRange(time, time);
}

@Override
public void remove(String jobId) throws Exception {
public void remove(String jobId, Message message) throws Exception {
doRemoveJob(jobId);
}

@Override
public void removeAllJobs() throws Exception {
public void removeAllJobs(Message message) throws Exception {
doRemoveRange(0, Long.MAX_VALUE);
}

@Override
public void removeAllJobs(long start, long finish) throws Exception {
public void removeAllJobs(long start, long finish, Message message) throws Exception {
doRemoveRange(start, finish);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.command.Message;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
Expand Down Expand Up @@ -101,22 +102,22 @@ public void schedule(final String jobId, final ByteSequence payload, final Strin
}

@Override
public void remove(final long time) throws IOException {
public void remove(final long time, Message message) throws IOException {
doRemoveRange(time, time);
}

@Override
public void remove(final String jobId) throws IOException {
public void remove(final String jobId, Message message) throws IOException {
doRemove(-1, jobId);
}

@Override
public void removeAllJobs() throws IOException {
public void removeAllJobs(Message message) throws IOException {
doRemoveRange(0, Long.MAX_VALUE);
}

@Override
public void removeAllJobs(final long start, final long finish) throws IOException {
public void removeAllJobs(final long start, final long finish, Message message) throws IOException {
doRemoveRange(start, finish);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ public void testRemoveLong() throws Exception {
assertEquals(size, COUNT);

long removeTime = scheduler.getNextScheduleTime();
scheduler.remove(removeTime);
scheduler.remove(removeTime, null);

// If all jobs are not started within the same second we need to call remove again
if (size != 0) {
removeTime = scheduler.getNextScheduleTime();
scheduler.remove(removeTime);
scheduler.remove(removeTime, null);
}

size = scheduler.getAllJobs().size();
Expand All @@ -172,7 +172,7 @@ public void testRemoveString() throws Exception {

int size = scheduler.getAllJobs().size();
assertEquals(size, COUNT + 1);
scheduler.remove(test);
scheduler.remove(test, null);
size = scheduler.getAllJobs().size();
assertEquals(size, COUNT);
}
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testRemoveAllJobsInRange() throws Exception {
}
start = System.currentTimeMillis();
long finish = start + 12000 + (COUNT * 1000);
scheduler.removeAllJobs(start, finish);
scheduler.removeAllJobs(start, finish, null);

assertTrue(scheduler.getAllJobs().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ public void testRemoveLong() throws Exception {
assertEquals(size, COUNT);

long removeTime = scheduler.getNextScheduleTime();
scheduler.remove(removeTime);
scheduler.remove(removeTime, null);

// If all jobs are not started within the same second we need to call remove again
if (size != 0) {
removeTime = scheduler.getNextScheduleTime();
scheduler.remove(removeTime);
scheduler.remove(removeTime, null);
}

size = scheduler.getAllJobs().size();
Expand All @@ -176,7 +176,7 @@ public void testRemoveString() throws Exception {

int size = scheduler.getAllJobs().size();
assertEquals(size, COUNT + 1);
scheduler.remove(test);
scheduler.remove(test, null);
size = scheduler.getAllJobs().size();
assertEquals(size, COUNT);
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void testRemoveAllJobsInRange() throws Exception {
}
start = System.currentTimeMillis();
long finish = start + 12000 + (COUNT * 1000);
scheduler.removeAllJobs(start, finish);
scheduler.removeAllJobs(start, finish, null);

assertTrue(scheduler.getAllJobs().isEmpty());
}
Expand Down