Add support for canceling Jobs.

master
Greyson Parrelli 2020-01-03 14:10:16 -05:00
parent b10ce080a9
commit 38597aea00
66 changed files with 137 additions and 79 deletions

View File

@ -32,8 +32,10 @@ public abstract class Job {
private final Parameters parameters;
private int runAttempt;
private long nextRunAttemptTime;
private int runAttempt;
private long nextRunAttemptTime;
private volatile boolean canceled;
protected Context context;
@ -75,12 +77,27 @@ public abstract class Job {
this.nextRunAttemptTime = nextRunAttemptTime;
}
/** Should only be invoked by {@link JobController} */
final void cancel() {
this.canceled = true;
}
@WorkerThread
final void onSubmit() {
Log.i(TAG, JobLogger.format(this, "onSubmit()"));
onAdded();
}
/**
* @return True if your job has been marked as canceled while it was running, otherwise false.
* If a job sees that it has been canceled, it should make a best-effort attempt at
* stopping it's work. This job will have {@link #onFailure()} called after {@link #run()}
* has finished.
*/
public final boolean isCanceled() {
return canceled;
}
/**
* Called when the job is first submitted to the {@link JobManager}.
*/
@ -112,10 +129,10 @@ public abstract class Job {
public abstract @NonNull Result run();
/**
* Called when your job has completely failed.
* Called when your job has completely failed and will not be run again.
*/
@WorkerThread
public abstract void onCanceled();
public abstract void onFailure();
public interface Factory<T extends Job> {
@NonNull T create(@NonNull Parameters parameters, @NonNull Data data);

View File

@ -17,11 +17,12 @@ import org.thoughtcrime.securesms.util.Debouncer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
* Manages the queue of jobs. This is the only class that should write to {@link JobStorage} to
@ -40,7 +41,7 @@ class JobController {
private final Scheduler scheduler;
private final Debouncer debouncer;
private final Callback callback;
private final Set<String> runningJobs;
private final Map<String, Job> runningJobs;
JobController(@NonNull Application application,
@NonNull JobStorage jobStorage,
@ -61,7 +62,7 @@ class JobController {
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
this.runningJobs = new HashSet<>();
this.runningJobs = new HashMap<>();
}
@WorkerThread
@ -96,6 +97,29 @@ class JobController {
notifyAll();
}
@WorkerThread
synchronized void cancelJob(@NonNull String id) {
Job runningJob = runningJobs.get(id);
if (runningJob != null) {
Log.w(TAG, JobLogger.format(runningJob, "Canceling while running."));
runningJob.cancel();
} else {
JobSpec jobSpec = jobStorage.getJobSpec(id);
if (jobSpec != null) {
Job job = createJob(jobSpec, jobStorage.getConstraintSpecs(id));
Log.w(TAG, JobLogger.format(job, "Canceling while inactive."));
Log.w(TAG, JobLogger.format(job, "Job failed."));
job.onFailure();
onFailure(job);
} else {
Log.w(TAG, "Tried to cancel JOB::" + id + ", but it could not be found.");
}
}
}
@WorkerThread
synchronized void onRetry(@NonNull Job job) {
int nextRunAttempt = job.getRunAttempt() + 1;
@ -177,7 +201,7 @@ class JobController {
}
jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.add(job.getId());
runningJobs.put(job.getId(), job);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING);
return job;
@ -333,7 +357,7 @@ class JobController {
return job;
} catch (RuntimeException e) {
Log.e(TAG, "Failed to instantiate job! Failing it and its dependencies without calling Job#onCanceled. Crash imminent.");
Log.e(TAG, "Failed to instantiate job! Failing it and its dependencies without calling Job#onFailure. Crash imminent.");
List<String> failIds = Stream.of(jobStorage.getDependencySpecsThatDependOnJob(jobSpec.getId()))
.map(DependencySpec::getJobId)

View File

@ -112,7 +112,6 @@ public class JobManager implements ConstraintObserver.Notifier {
jobTracker.removeListener(listener);
}
/**
* Enqueues a single job to be run.
*/
@ -136,9 +135,22 @@ public class JobManager implements ConstraintObserver.Notifier {
return new Chain(this, jobs);
}
/**
* Attempts to cancel a job. This is best-effort and may not actually prevent a job from
* completing if it was already running. If this job is running, this can only stop jobs that
* bother to check {@link Job#isCanceled()}.
*
* When a job is canceled, {@link Job#onFailure()} will be triggered at the earliest possible
* moment. Just like a normal failure, all later jobs in the same chain will also be failed.
*/
public void cancel(@NonNull String id) {
executor.execute(() -> jobController.cancelJob(id));
}
/**
* Retrieves a string representing the state of the job queue. Intended for debugging.
*/
@WorkerThread
public @NonNull String getDebugInfo() {
Future<String> result = executor.submit(jobController::getDebugInfo);
try {

View File

@ -54,8 +54,8 @@ class JobRunner extends Thread {
job.onRetry();
} else if (result.isFailure()) {
List<Job> dependents = jobController.onFailure(job);
job.onCanceled();
Stream.of(dependents).forEach(Job::onCanceled);
job.onFailure();
Stream.of(dependents).forEach(Job::onFailure);
if (result.getException() != null) {
throw result.getException();
@ -80,6 +80,11 @@ class JobRunner extends Thread {
try {
wakeLock = WakeLockUtil.acquire(application, PowerManager.PARTIAL_WAKE_LOCK, WAKE_LOCK_TIMEOUT, job.getId());
result = job.run();
if (job.isCanceled()) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing because the job was canceled."));
result = Job.Result.failure();
}
} catch (Exception e) {
Log.w(TAG, JobLogger.format(job, String.valueOf(id), "Failing due to an unexpected exception."), e);
return Job.Result.failure();

View File

@ -181,9 +181,7 @@ public final class Argon2TestJob extends BaseJob {
}
@Override
public void onCanceled() {
}
public void onFailure() {}
private static abstract class Argon2RuntimeException extends RuntimeException {
private Argon2RuntimeException(String message) {

View File

@ -129,7 +129,7 @@ public final class AttachmentCompressionJob extends BaseJob {
}
@Override
public void onCanceled() { }
public void onFailure() { }
@Override
protected boolean onShouldRetry(@NonNull Exception exception) {

View File

@ -83,7 +83,7 @@ public class AttachmentCopyJob extends BaseJob {
}
@Override
public void onCanceled() { }
public void onFailure() { }
public static final class Factory implements Job.Factory<AttachmentCopyJob> {
@Override

View File

@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.events.PartProgressEvent;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.mms.MmsException;
@ -138,8 +139,8 @@ public class AttachmentDownloadJob extends BaseJob {
}
@Override
public void onCanceled() {
Log.w(TAG, "onCanceled() messageId: " + messageId + " partRowId: " + partRowId + " partUniqueId: " + partUniqueId + " manual: " + manual);
public void onFailure() {
Log.w(TAG, JobLogger.format(this, "onFailure() messageId: " + messageId + " partRowId: " + partRowId + " partUniqueId: " + partUniqueId + " manual: " + manual));
final AttachmentId attachmentId = new AttachmentId(partRowId, partUniqueId);
markFailed(messageId, attachmentId);

View File

@ -115,7 +115,7 @@ public final class AttachmentUploadJob extends BaseJob {
}
@Override
public void onCanceled() { }
public void onFailure() { }
@Override
protected boolean onShouldRetry(@NonNull Exception exception) {

View File

@ -105,7 +105,7 @@ public class AvatarDownloadJob extends BaseJob {
}
@Override
public void onCanceled() {}
public void onFailure() {}
@Override
public boolean onShouldRetry(@NonNull Exception exception) {

View File

@ -96,7 +96,7 @@ public class CleanPreKeysJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to execute clean signed prekeys task.");
}

View File

@ -67,7 +67,7 @@ public class CreateSignedPreKeyJob extends BaseJob {
}
@Override
public void onCanceled() {}
public void onFailure() {}
@Override
public boolean onShouldRetry(@NonNull Exception exception) {

View File

@ -80,7 +80,7 @@ public class DirectoryRefreshJob extends BaseJob {
}
@Override
public void onCanceled() {}
public void onFailure() {}
public static final class Factory implements Job.Factory<DirectoryRefreshJob> {

View File

@ -34,7 +34,7 @@ public final class FailingJob extends Job {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<FailingJob> {

View File

@ -110,7 +110,7 @@ public class FcmRefreshJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "GCM reregistration failed after retry attempt exhaustion!");
}

View File

@ -107,7 +107,7 @@ public class LocalBackupJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static class Factory implements Job.Factory<LocalBackupJob> {

View File

@ -49,7 +49,7 @@ public final class MarkerJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<MarkerJob> {

View File

@ -157,7 +157,7 @@ public class MmsDownloadJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
MmsDatabase database = DatabaseFactory.getMmsDatabase(context);
database.markDownloadState(messageId, MmsDatabase.Status.DOWNLOAD_SOFT_FAILURE);

View File

@ -88,7 +88,7 @@ public class MmsReceiveJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
// TODO
}

View File

@ -30,6 +30,7 @@ import org.thoughtcrime.securesms.database.NoSuchMessageException;
import org.thoughtcrime.securesms.database.ThreadDatabase;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.JobLogger;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
@ -152,8 +153,8 @@ public final class MmsSendJob extends SendJob {
}
@Override
public void onCanceled() {
Log.i(TAG, "onCanceled() messageId: " + messageId);
public void onFailure() {
Log.i(TAG, JobLogger.format(this, "onFailure() messageId: " + messageId));
DatabaseFactory.getMmsDatabase(context).markAsSentFailed(messageId);
notifyMediaMessageDeliveryFailed(context, messageId);
}

View File

@ -95,7 +95,7 @@ public class MultiDeviceBlockedUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<MultiDeviceBlockedUpdateJob> {

View File

@ -101,7 +101,7 @@ public class MultiDeviceConfigurationUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "**** Failed to synchronize read receipts state!");
}

View File

@ -231,7 +231,7 @@ public class MultiDeviceContactUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -133,7 +133,7 @@ public class MultiDeviceGroupUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -79,7 +79,7 @@ public class MultiDeviceKeysUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<MultiDeviceKeysUpdateJob> {

View File

@ -61,7 +61,7 @@ public class MultiDeviceProfileContentUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Did not succeed!");
}

View File

@ -102,7 +102,7 @@ public class MultiDeviceProfileKeyUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Profile key sync failed!");
}

View File

@ -104,7 +104,7 @@ public class MultiDeviceReadUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -101,7 +101,7 @@ public class MultiDeviceStickerPackOperationJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to sync sticker pack operation!");
}

View File

@ -83,7 +83,7 @@ public class MultiDeviceStickerPackSyncJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to sync sticker pack operation!");
}

View File

@ -61,7 +61,7 @@ public class MultiDeviceStorageSyncRequestJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Did not succeed!");
}

View File

@ -130,7 +130,7 @@ public class MultiDeviceVerifiedUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -90,7 +90,7 @@ public class MultiDeviceViewOnceOpenJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -126,7 +126,7 @@ public final class PushDecryptMessageJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
private boolean needsMigration() {

View File

@ -225,7 +225,7 @@ public class PushGroupSendJob extends PushSendJob {
}
@Override
public void onCanceled() {
public void onFailure() {
DatabaseFactory.getMmsDatabase(context).markAsSentFailed(messageId);
}

View File

@ -133,7 +133,7 @@ public class PushGroupUpdateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<PushGroupUpdateJob> {

View File

@ -174,7 +174,7 @@ public class PushMediaSendJob extends PushSendJob {
}
@Override
public void onCanceled() {
public void onFailure() {
DatabaseFactory.getMmsDatabase(context).markAsSentFailed(messageId);
notifyMediaMessageDeliveryFailed(context, messageId);
}

View File

@ -63,7 +63,7 @@ public class PushNotificationReceiveJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "***** Failed to download pending message!");
// MessageNotifier.notifyMessagesPending(getContext());
}

View File

@ -242,7 +242,7 @@ public final class PushProcessMessageJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
private void handleMessage(@NonNull byte[] plaintextDataBuffer, @NonNull Optional<Long> smsMessageId) {

View File

@ -134,7 +134,7 @@ public class PushTextSendJob extends PushSendJob {
}
@Override
public void onCanceled() {
public void onFailure() {
DatabaseFactory.getSmsDatabase(context).markAsSentFailed(messageId);
long threadId = DatabaseFactory.getSmsDatabase(context).getThreadIdForMessage(messageId);

View File

@ -185,7 +185,7 @@ public class ReactionSendJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
if (recipients.size() < initialRecipientCount) {
Log.w(TAG, "Only sent a reaction to " + recipients.size() + "/" + initialRecipientCount + " recipients. Still, it sent to someone, so it stays.");
return;

View File

@ -71,7 +71,7 @@ public class RefreshAttributesJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to update account attributes!");
}

View File

@ -70,7 +70,7 @@ public class RefreshOwnProfileJob extends BaseJob {
}
@Override
public void onCanceled() { }
public void onFailure() { }
private void setProfileName(@Nullable String encryptedName) {
try {

View File

@ -87,7 +87,7 @@ public class RefreshPreKeysJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<RefreshPreKeysJob> {

View File

@ -91,7 +91,7 @@ public class RequestGroupInfoJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -131,7 +131,7 @@ public class RetrieveProfileAvatarJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static final class Factory implements Job.Factory<RetrieveProfileAvatarJob> {

View File

@ -95,7 +95,7 @@ public class RetrieveProfileJob extends BaseJob {
}
@Override
public void onCanceled() {}
public void onFailure() {}
private void handleIndividualRecipient(Recipient recipient) throws IOException {
if (recipient.hasServiceIdentifier()) handlePhoneNumberRecipient(recipient);

View File

@ -67,7 +67,7 @@ public class RotateCertificateJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to rotate sender certificate!");
}

View File

@ -59,7 +59,7 @@ public class RotateProfileKeyJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -68,7 +68,7 @@ public class RotateSignedPreKeyJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
TextSecurePreferences.setSignedPreKeyFailureCount(context, TextSecurePreferences.getSignedPreKeyFailureCount(context) + 1);
}

View File

@ -93,7 +93,7 @@ public class SendDeliveryReceiptJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to send delivery receipt to: " + recipientId);
}

View File

@ -108,7 +108,7 @@ public class SendReadReceiptJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to send read receipts to: " + recipientId);
}

View File

@ -86,7 +86,7 @@ public class ServiceOutageDetectionJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.i(TAG, "Service status check could not complete. Assuming success to avoid false positives due to bad network.");
TextSecurePreferences.setServiceOutage(context, false);
TextSecurePreferences.setLastOutageCheckTime(context, System.currentTimeMillis());

View File

@ -89,7 +89,7 @@ public class SmsReceiveJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}

View File

@ -98,8 +98,8 @@ public class SmsSendJob extends SendJob {
}
@Override
public void onCanceled() {
warn(TAG, "onCanceled() messageId: " + messageId);
public void onFailure() {
warn(TAG, "onFailure() messageId: " + messageId);
long threadId = DatabaseFactory.getSmsDatabase(context).getThreadIdForMessage(messageId);
Recipient recipient = DatabaseFactory.getThreadDatabase(context).getRecipientForThreadId(threadId);

View File

@ -84,7 +84,7 @@ public class SmsSentJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
private void handleDeliveredResult(long messageId, int result) {

View File

@ -98,7 +98,7 @@ public class StickerDownloadJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to download sticker!");
}

View File

@ -167,7 +167,7 @@ public class StickerPackDownloadJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Failed to download manifest! Uninstalling pack.");
DatabaseFactory.getStickerDatabase(context).uninstallPack(packId);
DatabaseFactory.getStickerDatabase(context).deleteOrphanedPacks();

View File

@ -118,7 +118,7 @@ public class StorageForcePushJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
private static @NonNull Map<RecipientId, byte[]> generateNewKeys(@NonNull Map<RecipientId, byte[]> oldKeys) {

View File

@ -102,7 +102,7 @@ public class StorageSyncJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
private boolean performSync() throws IOException, RetryLaterException, InvalidKeyException {

View File

@ -70,7 +70,7 @@ public class TrimThreadJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Canceling trim attempt: " + threadId);
}

View File

@ -101,7 +101,7 @@ public class TypingSendJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
@Override

View File

@ -105,7 +105,7 @@ public class UpdateApkJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
Log.w(TAG, "Update check failed");
}

View File

@ -48,7 +48,7 @@ public class MigrationCompleteJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
throw new AssertionError("This job should never fail.");
}

View File

@ -54,7 +54,7 @@ abstract class MigrationJob extends Job {
}
@Override
public void onCanceled() {
public void onFailure() {
throw new AssertionError("This job should never fail. " + getClass().getSimpleName());
}

View File

@ -99,7 +99,7 @@ public final class RegistrationPinV2MigrationJob extends BaseJob {
}
@Override
public void onCanceled() {
public void onFailure() {
}
public static class Factory implements Job.Factory<RegistrationPinV2MigrationJob> {