Signal-Android/app/src/main/java/org/thoughtcrime/securesms/jobmanager/JobManager.java

467 lines
16 KiB
Java

package org.thoughtcrime.securesms.jobmanager;
import android.app.Application;
import android.content.Intent;
import android.os.Build;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
import org.thoughtcrime.securesms.jobmanager.workmanager.WorkManagerMigrator;
import org.thoughtcrime.securesms.jobmanager.persistence.JobStorage;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.Debouncer;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.util.guava.Optional;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Allows the scheduling of durable jobs that will be run as early as possible.
*/
public class JobManager implements ConstraintObserver.Notifier {
private static final String TAG = JobManager.class.getSimpleName();
public static final int CURRENT_VERSION = 5;
private final Application application;
private final Configuration configuration;
private final ExecutorService executor;
private final JobController jobController;
private final JobTracker jobTracker;
private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();
public JobManager(@NonNull Application application, @NonNull Configuration configuration) {
this.application = application;
this.configuration = configuration;
this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager");
this.jobTracker = configuration.getJobTracker();
this.jobController = new JobController(application,
configuration.getJobStorage(),
configuration.getJobInstantiator(),
configuration.getConstraintFactories(),
configuration.getDataSerializer(),
configuration.getJobTracker(),
Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
: new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
new Debouncer(500),
this::onEmptyQueue);
executor.execute(() -> {
if (WorkManagerMigrator.needsMigration(application)) {
Log.i(TAG, "Detected an old WorkManager database. Migrating.");
WorkManagerMigrator.migrate(application, configuration.getJobStorage(), configuration.getDataSerializer());
}
JobStorage jobStorage = configuration.getJobStorage();
jobStorage.init();
int latestVersion = configuration.getJobMigrator().migrate(jobStorage, configuration.getDataSerializer());
TextSecurePreferences.setJobManagerVersion(application, latestVersion);
jobController.init();
for (ConstraintObserver constraintObserver : configuration.getConstraintObservers()) {
constraintObserver.register(this);
}
if (Build.VERSION.SDK_INT < 26) {
application.startService(new Intent(application, KeepAliveService.class));
}
});
}
/**
* Begins the execution of jobs.
*/
public void beginJobLoop() {
executor.execute(() -> {
for (int i = 0; i < configuration.getJobThreadCount(); i++) {
new JobRunner(application, i + 1, jobController).start();
}
wakeUp();
});
}
/**
* Convenience method for {@link #addListener(JobTracker.JobFilter, JobTracker.JobListener)} that
* takes in an ID to filter on.
*/
public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(new JobIdFilter(id), listener);
}
/**
* Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary
* background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid
* memory leaks.
*/
public void addListener(@NonNull JobTracker.JobFilter filter, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(filter, listener);
}
/**
* Unsubscribe the provided listener from all job updates.
*/
public void removeListener(@NonNull JobTracker.JobListener listener) {
jobTracker.removeListener(listener);
}
/**
* Enqueues a single job to be run.
*/
public void add(@NonNull Job job) {
new Chain(this, Collections.singletonList(job)).enqueue();
}
/**
* Enqueues a single job that depends on a collection of job ID's.
*/
public void add(@NonNull Job job, @NonNull Collection<String> dependsOn) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
executor.execute(() -> {
jobController.submitJobWithExistingDependencies(job, dependsOn);
wakeUp();
});
}
/**
* Begins the creation of a job chain with a single job.
* @see Chain
*/
public Chain startChain(@NonNull Job job) {
return new Chain(this, Collections.singletonList(job));
}
/**
* Begins the creation of a job chain with a set of jobs that can be run in parallel.
* @see Chain
*/
public Chain startChain(@NonNull List<? extends Job> jobs) {
return new Chain(this, jobs);
}
/**
* Attempts to cancel a job. This is best-effort and may not actually prevent a job from
* completing if it was already running. If this job is running, this can only stop jobs that
* bother to check {@link Job#isCanceled()}.
*
* When a job is canceled, {@link Job#onFailure()} will be triggered at the earliest possible
* moment. Just like a normal failure, all later jobs in the same chain will also be failed.
*/
public void cancel(@NonNull String id) {
executor.execute(() -> jobController.cancelJob(id));
}
/**
* Runs the specified job synchronously. Beware: All normal dependencies are respected, meaning
* you must take great care where you call this. It could take a very long time to complete!
*
* @return If the job completed, this will contain its completion state. If it timed out or
* otherwise didn't complete, this will be absent.
*/
@WorkerThread
public Optional<JobTracker.JobState> runSynchronously(@NonNull Job job, long timeout) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<JobTracker.JobState> resultState = new AtomicReference<>();
addListener(job.getId(), new JobTracker.JobListener() {
@Override
public void onStateChanged(@NonNull Job job, @NonNull JobTracker.JobState jobState) {
if (jobState.isComplete()) {
removeListener(this);
resultState.set(jobState);
latch.countDown();
}
}
});
add(job);
try {
if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
return Optional.absent();
}
} catch (InterruptedException e) {
Log.w(TAG, "Interrupted during runSynchronously()", e);
return Optional.absent();
}
return Optional.fromNullable(resultState.get());
}
/**
* Retrieves a string representing the state of the job queue. Intended for debugging.
*/
@WorkerThread
public @NonNull String getDebugInfo() {
Future<String> result = executor.submit(jobController::getDebugInfo);
try {
return result.get();
} catch (ExecutionException | InterruptedException e) {
Log.w(TAG, "Failed to retrieve Job info.", e);
return "Failed to retrieve Job info.";
}
}
/**
* Adds a listener that will be notified when the job queue has been drained.
*/
void addOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
executor.execute(() -> {
emptyQueueListeners.add(listener);
});
}
/**
* Removes a listener that was added via {@link #addOnEmptyQueueListener(EmptyQueueListener)}.
*/
void removeOnEmptyQueueListener(@NonNull EmptyQueueListener listener) {
executor.execute(() -> {
emptyQueueListeners.remove(listener);
});
}
@Override
public void onConstraintMet(@NonNull String reason) {
Log.i(TAG, "onConstraintMet(" + reason + ")");
wakeUp();
}
/**
* Pokes the system to take another pass at the job queue.
*/
void wakeUp() {
executor.execute(jobController::wakeUp);
}
private void enqueueChain(@NonNull Chain chain) {
for (List<Job> jobList : chain.getJobListChain()) {
for (Job job : jobList) {
jobTracker.onStateChange(job, JobTracker.JobState.PENDING);
}
}
executor.execute(() -> {
jobController.submitNewJobChain(chain.getJobListChain());
wakeUp();
});
}
private void onEmptyQueue() {
executor.execute(() -> {
for (EmptyQueueListener listener : emptyQueueListeners) {
listener.onQueueEmpty();
}
});
}
public interface EmptyQueueListener {
void onQueueEmpty();
}
public static class JobIdFilter implements JobTracker.JobFilter {
private final String id;
public JobIdFilter(@NonNull String id) {
this.id = id;
}
@Override
public boolean matches(@NonNull Job job) {
return id.equals(job.getId());
}
}
/**
* Allows enqueuing work that depends on each other. Jobs that appear later in the chain will
* only run after all jobs earlier in the chain have been completed. If a job fails, all jobs
* that occur later in the chain will also be failed.
*/
public static class Chain {
private final JobManager jobManager;
private final List<List<Job>> jobs;
private Chain(@NonNull JobManager jobManager, @NonNull List<? extends Job> jobs) {
this.jobManager = jobManager;
this.jobs = new LinkedList<>();
this.jobs.add(new ArrayList<>(jobs));
}
public Chain then(@NonNull Job job) {
return then(Collections.singletonList(job));
}
public Chain then(@NonNull List<? extends Job> jobs) {
if (!jobs.isEmpty()) {
this.jobs.add(new ArrayList<>(jobs));
}
return this;
}
public void enqueue() {
jobManager.enqueueChain(this);
}
private List<List<Job>> getJobListChain() {
return jobs;
}
}
public static class Configuration {
private final ExecutorFactory executorFactory;
private final int jobThreadCount;
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final List<ConstraintObserver> constraintObservers;
private final Data.Serializer dataSerializer;
private final JobStorage jobStorage;
private final JobMigrator jobMigrator;
private final JobTracker jobTracker;
private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory,
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@NonNull List<ConstraintObserver> constraintObservers,
@NonNull Data.Serializer dataSerializer,
@NonNull JobStorage jobStorage,
@NonNull JobMigrator jobMigrator,
@NonNull JobTracker jobTracker)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.constraintObservers = constraintObservers;
this.dataSerializer = dataSerializer;
this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker;
}
int getJobThreadCount() {
return jobThreadCount;
}
@NonNull ExecutorFactory getExecutorFactory() {
return executorFactory;
}
@NonNull JobInstantiator getJobInstantiator() {
return jobInstantiator;
}
@NonNull
ConstraintInstantiator getConstraintFactories() {
return constraintInstantiator;
}
@NonNull List<ConstraintObserver> getConstraintObservers() {
return constraintObservers;
}
@NonNull Data.Serializer getDataSerializer() {
return dataSerializer;
}
@NonNull JobStorage getJobStorage() {
return jobStorage;
}
@NonNull JobMigrator getJobMigrator() {
return jobMigrator;
}
@NonNull JobTracker getJobTracker() {
return jobTracker;
}
public static class Builder {
private ExecutorFactory executorFactory = new DefaultExecutorFactory();
private int jobThreadCount = Math.max(2, Math.min(Runtime.getRuntime().availableProcessors() - 1, 4));
private Map<String, Job.Factory> jobFactories = new HashMap<>();
private Map<String, Constraint.Factory> constraintFactories = new HashMap<>();
private List<ConstraintObserver> constraintObservers = new ArrayList<>();
private Data.Serializer dataSerializer = new JsonDataSerializer();
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();
public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount;
return this;
}
public @NonNull Builder setExecutorFactory(@NonNull ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
return this;
}
public @NonNull Builder setJobFactories(@NonNull Map<String, Job.Factory> jobFactories) {
this.jobFactories = jobFactories;
return this;
}
public @NonNull Builder setConstraintFactories(@NonNull Map<String, Constraint.Factory> constraintFactories) {
this.constraintFactories = constraintFactories;
return this;
}
public @NonNull Builder setConstraintObservers(@NonNull List<ConstraintObserver> constraintObservers) {
this.constraintObservers = constraintObservers;
return this;
}
public @NonNull Builder setDataSerializer(@NonNull Data.Serializer dataSerializer) {
this.dataSerializer = dataSerializer;
return this;
}
public @NonNull Builder setJobStorage(@NonNull JobStorage jobStorage) {
this.jobStorage = jobStorage;
return this;
}
public @NonNull Builder setJobMigrator(@NonNull JobMigrator jobMigrator) {
this.jobMigrator = jobMigrator;
return this;
}
public @NonNull Configuration build() {
return new Configuration(jobThreadCount,
executorFactory,
new JobInstantiator(jobFactories),
new ConstraintInstantiator(constraintFactories),
new ArrayList<>(constraintObservers),
dataSerializer,
jobStorage,
jobMigrator,
jobTracker);
}
}
}
}