Prevent FCM bottlenecking.
parent
4cda267f3b
commit
fe25d941bb
|
@ -531,7 +531,9 @@
|
|||
|
||||
<service android:name=".service.GenericForegroundService"/>
|
||||
|
||||
<service android:name=".gcm.FcmService">
|
||||
<service android:name=".gcm.FcmFetchService" />
|
||||
|
||||
<service android:name=".gcm.FcmReceiveService">
|
||||
<intent-filter>
|
||||
<action android:name="com.google.firebase.MESSAGING_EVENT" />
|
||||
</intent-filter>
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
package org.thoughtcrime.securesms.gcm;
|
||||
|
||||
import android.app.Service;
|
||||
import android.content.Intent;
|
||||
import android.os.Build;
|
||||
import android.os.IBinder;
|
||||
|
||||
import androidx.annotation.Nullable;
|
||||
|
||||
import com.google.firebase.messaging.RemoteMessage;
|
||||
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
|
||||
import org.thoughtcrime.securesms.logging.Log;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* This service does the actual network fetch in response to an FCM message.
|
||||
*
|
||||
* Our goals with FCM processing are as follows:
|
||||
* (1) Ensure some service is active for the duration of the fetch and processing stages.
|
||||
* (2) Do not make unnecessary network requests.
|
||||
*
|
||||
* To fulfill goal 1, this service will not call {@link #stopSelf()} until there is no more running
|
||||
* requests.
|
||||
*
|
||||
* To fulfill goal 2, this service will not enqueue a fetch if there are already 2 active fetches
|
||||
* (or rather, 1 active and 1 waiting, since we use a single thread executor).
|
||||
*
|
||||
* Unfortunately we can't do this all in {@link FcmReceiveService} because it won't let us process
|
||||
* the next FCM message until {@link FcmReceiveService#onMessageReceived(RemoteMessage)} returns,
|
||||
* but as soon as that method returns, it could also destroy the service. By not letting us control
|
||||
* when the service is destroyed, we can't accomplish both goals within that service.
|
||||
*/
|
||||
public class FcmFetchService extends Service {
|
||||
|
||||
private static final String TAG = Log.tag(FcmFetchService.class);
|
||||
|
||||
private static final SerialMonoLifoExecutor EXECUTOR = new SerialMonoLifoExecutor(SignalExecutors.UNBOUNDED);
|
||||
|
||||
private final AtomicInteger activeCount = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public int onStartCommand(Intent intent, int flags, int startId) {
|
||||
boolean performedReplace = EXECUTOR.enqueue(this::fetch);
|
||||
|
||||
if (performedReplace) {
|
||||
Log.i(TAG, "Already have one running and one enqueued. Ignoring.");
|
||||
} else {
|
||||
int count = activeCount.incrementAndGet();
|
||||
Log.i(TAG, "Incrementing active count to " + count);
|
||||
}
|
||||
|
||||
return START_NOT_STICKY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDestroy() {
|
||||
Log.i(TAG, "onDestroy()");
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable IBinder onBind(Intent intent) {
|
||||
return null;
|
||||
}
|
||||
|
||||
private void fetch() {
|
||||
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
|
||||
boolean success = retriever.retrieveMessages(this, new RestStrategy(), new RestStrategy());
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.");
|
||||
} else {
|
||||
if (Build.VERSION.SDK_INT >= 26) {
|
||||
Log.w(TAG, "Failed to retrieve messages. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").");
|
||||
FcmJobService.schedule(this);
|
||||
} else {
|
||||
Log.w(TAG, "Failed to retrieve messages. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").");
|
||||
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob(this));
|
||||
}
|
||||
}
|
||||
|
||||
if (activeCount.decrementAndGet() == 0) {
|
||||
Log.e(TAG, "stopping");
|
||||
stopSelf();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -9,16 +9,14 @@ import android.content.Context;
|
|||
import androidx.annotation.NonNull;
|
||||
import androidx.annotation.RequiresApi;
|
||||
|
||||
import org.thoughtcrime.securesms.ApplicationContext;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.logging.Log;
|
||||
import org.thoughtcrime.securesms.notifications.MessageNotifier;
|
||||
import org.thoughtcrime.securesms.util.ServiceUtil;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
|
||||
|
||||
/**
|
||||
* Pulls down messages. Used when we fail to pull down messages in {@link FcmService}.
|
||||
* Pulls down messages. Used when we fail to pull down messages in {@link FcmReceiveService}.
|
||||
*/
|
||||
@RequiresApi(26)
|
||||
public class FcmJobService extends JobService {
|
||||
|
|
|
@ -1,25 +1,22 @@
|
|||
package org.thoughtcrime.securesms.gcm;
|
||||
|
||||
import android.content.Context;
|
||||
import android.os.Build;
|
||||
import android.content.Intent;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
|
||||
import com.google.firebase.messaging.FirebaseMessagingService;
|
||||
import com.google.firebase.messaging.RemoteMessage;
|
||||
|
||||
import org.thoughtcrime.securesms.ApplicationContext;
|
||||
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
|
||||
import org.thoughtcrime.securesms.jobs.FcmRefreshJob;
|
||||
import org.thoughtcrime.securesms.jobs.PushNotificationReceiveJob;
|
||||
import org.thoughtcrime.securesms.logging.Log;
|
||||
import org.thoughtcrime.securesms.registration.PushChallengeRequest;
|
||||
import org.thoughtcrime.securesms.util.TextSecurePreferences;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
|
||||
|
||||
public class FcmService extends FirebaseMessagingService {
|
||||
public class FcmReceiveService extends FirebaseMessagingService {
|
||||
|
||||
private static final String TAG = FcmService.class.getSimpleName();
|
||||
private static final String TAG = FcmReceiveService.class.getSimpleName();
|
||||
|
||||
@Override
|
||||
public void onMessageReceived(RemoteMessage remoteMessage) {
|
||||
|
@ -46,22 +43,7 @@ public class FcmService extends FirebaseMessagingService {
|
|||
}
|
||||
|
||||
private static void handleReceivedNotification(Context context) {
|
||||
MessageRetriever retriever = ApplicationDependencies.getMessageRetriever();
|
||||
boolean success = retriever.retrieveMessages(context, new RestStrategy(), new RestStrategy());
|
||||
|
||||
if (success) {
|
||||
Log.i(TAG, "Successfully retrieved messages.");
|
||||
} else {
|
||||
if (Build.VERSION.SDK_INT >= 26) {
|
||||
Log.w(TAG, "Failed to retrieve messages. Scheduling on the system JobScheduler (API " + Build.VERSION.SDK_INT + ").");
|
||||
FcmJobService.schedule(context);
|
||||
} else {
|
||||
Log.w(TAG, "Failed to retrieve messages. Scheduling on JobManager (API " + Build.VERSION.SDK_INT + ").");
|
||||
ApplicationDependencies.getJobManager().add(new PushNotificationReceiveJob(context));
|
||||
}
|
||||
}
|
||||
|
||||
Log.i(TAG, "Processing complete.");
|
||||
context.startService(new Intent(context, FcmFetchService.class));
|
||||
}
|
||||
|
||||
private static void handlePushChallenge(@NonNull String challenge) {
|
||||
|
@ -69,4 +51,4 @@ public class FcmService extends FirebaseMessagingService {
|
|||
|
||||
PushChallengeRequest.postChallengeResponse(challenge);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
package org.thoughtcrime.securesms.util.concurrent;
|
||||
|
||||
import androidx.annotation.NonNull;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
* Wraps another executor to make a new executor that only keeps around two tasks:
|
||||
* - The actively running task
|
||||
* - A single enqueued task
|
||||
*
|
||||
* If multiple tasks are enqueued while one is running, only the latest task is kept. The rest are
|
||||
* dropped.
|
||||
*
|
||||
* This is useful when you want to enqueue a bunch of tasks at unknown intervals, but only the most
|
||||
* recent one is relevant. For instance, running a query in response to changing user input.
|
||||
*
|
||||
* Based on SerialExecutor
|
||||
* https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
|
||||
*/
|
||||
public final class SerialMonoLifoExecutor implements Executor {
|
||||
private final Executor executor;
|
||||
|
||||
private Runnable next;
|
||||
private Runnable active;
|
||||
|
||||
public SerialMonoLifoExecutor(@NonNull Executor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void execute(@NonNull Runnable command) {
|
||||
enqueue(command);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if a pending task was replaced by this one, otherwise false.
|
||||
*/
|
||||
public synchronized boolean enqueue(@NonNull Runnable command) {
|
||||
boolean performedReplace = next != null;
|
||||
|
||||
next = () -> {
|
||||
try {
|
||||
command.run();
|
||||
} finally {
|
||||
scheduleNext();
|
||||
}
|
||||
};
|
||||
|
||||
if (active == null) {
|
||||
scheduleNext();
|
||||
}
|
||||
|
||||
return performedReplace;
|
||||
}
|
||||
|
||||
private synchronized void scheduleNext() {
|
||||
active = next;
|
||||
next = null;
|
||||
if (active != null) {
|
||||
executor.execute(active);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ import androidx.lifecycle.MutableLiveData;
|
|||
|
||||
import com.annimon.stream.function.Predicate;
|
||||
|
||||
import org.thoughtcrime.securesms.util.concurrent.SerialMonoLifoExecutor;
|
||||
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
|
||||
import org.whispersystems.libsignal.util.guava.Function;
|
||||
|
||||
|
@ -57,7 +58,7 @@ public final class LiveDataUtil {
|
|||
*/
|
||||
public static <A, B> LiveData<B> mapAsync(@NonNull Executor executor, @NonNull LiveData<A> source, @NonNull Function<A, B> backgroundFunction) {
|
||||
MediatorLiveData<B> outputLiveData = new MediatorLiveData<>();
|
||||
Executor liveDataExecutor = new SerialLiveDataExecutor(executor);
|
||||
Executor liveDataExecutor = new SerialMonoLifoExecutor(executor);
|
||||
|
||||
outputLiveData.addSource(source, currentValue -> liveDataExecutor.execute(() -> outputLiveData.postValue(backgroundFunction.apply(currentValue))));
|
||||
|
||||
|
@ -119,42 +120,4 @@ public final class LiveDataUtil {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor decorator that runs serially but enqueues just the latest task, dropping any pending task.
|
||||
* <p>
|
||||
* Based on SerialExecutor https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html
|
||||
* but modified to represent a queue of size one which is replaced by the latest call to {@link #execute(Runnable)}.
|
||||
*/
|
||||
private static final class SerialLiveDataExecutor implements Executor {
|
||||
private final Executor executor;
|
||||
private Runnable next;
|
||||
private Runnable active;
|
||||
|
||||
SerialLiveDataExecutor(@NonNull Executor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public synchronized void execute(@NonNull Runnable command) {
|
||||
next = () -> {
|
||||
try {
|
||||
command.run();
|
||||
} finally {
|
||||
scheduleNext();
|
||||
}
|
||||
};
|
||||
|
||||
if (active == null) {
|
||||
scheduleNext();
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void scheduleNext() {
|
||||
active = next;
|
||||
next = null;
|
||||
if (active != null) {
|
||||
executor.execute(active);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue