Improve detection of websocket drained status.

Will now work when you lose and regain network. Also removes the
unnecessary InitialMessageRetriever.
master
Greyson Parrelli 2020-07-21 10:38:42 -04:00 committed by GitHub
parent 96ce42ae91
commit 662f0b8fb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 106 additions and 266 deletions

View File

@ -51,7 +51,6 @@ import org.thoughtcrime.securesms.logging.CustomSignalProtocolLogger;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.SignalUncaughtExceptionHandler;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.migrations.ApplicationMigrations;
import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.providers.BlobProvider;
@ -61,7 +60,6 @@ import org.thoughtcrime.securesms.revealable.ViewOnceMessageManager;
import org.thoughtcrime.securesms.ringrtc.RingRtcLogger;
import org.thoughtcrime.securesms.service.DirectoryRefreshListener;
import org.thoughtcrime.securesms.service.ExpiringMessageManager;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.service.KeyCachingService;
import org.thoughtcrime.securesms.service.LocalBackupListener;
import org.thoughtcrime.securesms.service.RotateSenderCertificateListener;
@ -97,7 +95,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
private ViewOnceMessageManager viewOnceMessageManager;
private TypingStatusRepository typingStatusRepository;
private TypingStatusSender typingStatusSender;
private IncomingMessageObserver incomingMessageObserver;
private PersistentLogger persistentLogger;
private volatile boolean isAppVisible;
@ -157,7 +154,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
KeyCachingService.onAppForegrounded(this);
ApplicationDependencies.getFrameRateTracker().begin();
ApplicationDependencies.getMegaphoneRepository().onAppForegrounded();
catchUpOnMessages();
}
@Override
@ -234,7 +230,7 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
}
public void initializeMessageRetrieval() {
this.incomingMessageObserver = new IncomingMessageObserver(this);
ApplicationDependencies.getIncomingMessageObserver();
}
private void initializeAppDependencies() {
@ -382,36 +378,6 @@ public class ApplicationContext extends MultiDexApplication implements DefaultLi
});
}
private void catchUpOnMessages() {
InitialMessageRetriever retriever = ApplicationDependencies.getInitialMessageRetriever();
if (retriever.isCaughtUp()) {
return;
}
SignalExecutors.UNBOUNDED.execute(() -> {
long startTime = System.currentTimeMillis();
switch (retriever.begin(TimeUnit.SECONDS.toMillis(60))) {
case SUCCESS:
Log.i(TAG, "Successfully caught up on messages. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case FAILURE_TIMEOUT:
Log.w(TAG, "Did not finish catching up due to a timeout. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case FAILURE_ERROR:
Log.w(TAG, "Did not finish catching up due to an error. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case SKIPPED_ALREADY_CAUGHT_UP:
Log.i(TAG, "Already caught up. " + (System.currentTimeMillis() - startTime) + " ms");
break;
case SKIPPED_ALREADY_RUNNING:
Log.i(TAG, "Already in the process of catching up. " + (System.currentTimeMillis() - startTime) + " ms");
break;
}
});
}
@Override
protected void attachBaseContext(Context base) {
super.attachBaseContext(DynamicLanguageContextWrapper.updateContext(base, TextSecurePreferences.getLanguage(base)));

View File

@ -14,8 +14,6 @@ import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.keyvalue.KeyValueStore;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.recipients.LiveRecipientCache;
@ -48,6 +46,7 @@ public class ApplicationDependencies {
private static SignalServiceAccountManager accountManager;
private static SignalServiceMessageSender messageSender;
private static SignalServiceMessageReceiver messageReceiver;
private static IncomingMessageObserver incomingMessageObserver;
private static IncomingMessageProcessor incomingMessageProcessor;
private static BackgroundMessageRetriever backgroundMessageRetriever;
private static LiveRecipientCache recipientCache;
@ -59,7 +58,6 @@ public class ApplicationDependencies {
private static GroupsV2StateProcessor groupsV2StateProcessor;
private static GroupsV2Operations groupsV2Operations;
private static EarlyMessageCache earlyMessageCache;
private static InitialMessageRetriever initialMessageRetriever;
private static MessageNotifier messageNotifier;
@MainThread
@ -242,21 +240,21 @@ public class ApplicationDependencies {
return earlyMessageCache;
}
public static synchronized @NonNull InitialMessageRetriever getInitialMessageRetriever() {
assertInitialization();
if (initialMessageRetriever == null) {
initialMessageRetriever = provider.provideInitialMessageRetriever();
}
return initialMessageRetriever;
}
public static synchronized @NonNull MessageNotifier getMessageNotifier() {
assertInitialization();
return messageNotifier;
}
public static synchronized @NonNull IncomingMessageObserver getIncomingMessageObserver() {
assertInitialization();
if (incomingMessageObserver == null) {
incomingMessageObserver = provider.provideIncomingMessageObserver();
}
return incomingMessageObserver;
}
private static void assertInitialization() {
if (application == null || provider == null) {
throw new UninitializedException();
@ -277,8 +275,8 @@ public class ApplicationDependencies {
@NonNull KeyValueStore provideKeyValueStore();
@NonNull MegaphoneRepository provideMegaphoneRepository();
@NonNull EarlyMessageCache provideEarlyMessageCache();
@NonNull InitialMessageRetriever provideInitialMessageRetriever();
@NonNull MessageNotifier provideMessageNotifier();
@NonNull IncomingMessageObserver provideIncomingMessageObserver();
}
private static class UninitializedException extends IllegalStateException {

View File

@ -29,7 +29,6 @@ import org.thoughtcrime.securesms.jobs.JobManagerFactories;
import org.thoughtcrime.securesms.keyvalue.KeyValueStore;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.InitialMessageRetriever;
import org.thoughtcrime.securesms.notifications.DefaultMessageNotifier;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier;
@ -55,7 +54,6 @@ import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
import org.whispersystems.signalservice.api.websocket.ConnectivityListener;
import java.util.UUID;
import java.util.concurrent.Executors;
/**
* Implementation of {@link ApplicationDependencies.Provider} that provides real app dependencies.
@ -171,13 +169,13 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
}
@Override
public @NonNull InitialMessageRetriever provideInitialMessageRetriever() {
return new InitialMessageRetriever();
public @NonNull MessageNotifier provideMessageNotifier() {
return new OptimizedMessageNotifier(new DefaultMessageNotifier());
}
@Override
public @NonNull MessageNotifier provideMessageNotifier() {
return new OptimizedMessageNotifier(new DefaultMessageNotifier());
public @NonNull IncomingMessageObserver provideIncomingMessageObserver() {
return new IncomingMessageObserver(context);
}
private static class DynamicCredentialsProvider implements CredentialsProvider {

View File

@ -10,7 +10,7 @@ import org.thoughtcrime.securesms.jobmanager.Constraint;
/**
* A constraint that is met once we have pulled down all messages from the websocket during initial
* load. See {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever}.
* load. See {@link org.thoughtcrime.securesms.messages.IncomingMessageObserver}.
*/
public final class WebsocketDrainedConstraint implements Constraint {
@ -21,7 +21,7 @@ public final class WebsocketDrainedConstraint implements Constraint {
@Override
public boolean isMet() {
return ApplicationDependencies.getInitialMessageRetriever().isCaughtUp();
return ApplicationDependencies.getIncomingMessageObserver().isWebsocketDrained();
}
@Override

View File

@ -6,8 +6,8 @@ import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.jobmanager.ConstraintObserver;
/**
* An observer for {@link WebsocketDrainedConstraint}. Will fire when the
* {@link org.thoughtcrime.securesms.messages.InitialMessageRetriever} is caught up.
* An observer for {@link WebsocketDrainedConstraint}. Will fire when the websocket is drained
* (i.e. it has received an empty response).
*/
public class WebsocketDrainedConstraintObserver implements ConstraintObserver {
@ -16,7 +16,7 @@ public class WebsocketDrainedConstraintObserver implements ConstraintObserver {
private volatile Notifier notifier;
public WebsocketDrainedConstraintObserver() {
ApplicationDependencies.getInitialMessageRetriever().addListener(() -> {
ApplicationDependencies.getIncomingMessageObserver().addWebsocketDrainedListener(() -> {
if (notifier != null) {
notifier.onConstraintMet(REASON);
}

View File

@ -29,7 +29,6 @@ public class BackgroundMessageRetriever {
private static final Semaphore ACTIVE_LOCK = new Semaphore(2);
private static final long CATCHUP_TIMEOUT = TimeUnit.SECONDS.toMillis(60);
private static final long NORMAL_TIMEOUT = TimeUnit.SECONDS.toMillis(10);
/**
@ -64,21 +63,8 @@ public class BackgroundMessageRetriever {
Log.w(TAG, "We may be operating in a constrained environment. Doze: " + doze + " Network: " + network);
}
if (ApplicationDependencies.getInitialMessageRetriever().isCaughtUp()) {
Log.i(TAG, "Performing normal message fetch.");
return executeBackgroundRetrieval(context, startTime, strategies);
} else {
Log.i(TAG, "Performing initial message fetch.");
InitialMessageRetriever.Result result = ApplicationDependencies.getInitialMessageRetriever().begin(CATCHUP_TIMEOUT);
if (result == InitialMessageRetriever.Result.SUCCESS) {
Log.i(TAG, "Initial message request was completed successfully. " + logSuffix(startTime));
TextSecurePreferences.setNeedsMessagePull(context, false);
return true;
} else {
Log.w(TAG, "Initial message fetch returned result " + result + ", so doing a normal message fetch.");
return executeBackgroundRetrieval(context, System.currentTimeMillis(), strategies);
}
}
Log.i(TAG, "Performing normal message fetch.");
return executeBackgroundRetrieval(context, startTime, strategies);
} finally {
WakeLockUtil.release(wakeLock, WAKE_LOCK_TAG);
ACTIVE_LOCK.release();

View File

@ -4,8 +4,12 @@ import android.app.Service;
import androidx.lifecycle.DefaultLifecycleObserver;
import androidx.lifecycle.LifecycleOwner;
import androidx.lifecycle.ProcessLifecycleOwner;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.net.ConnectivityManager;
import android.os.IBinder;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
@ -25,13 +29,17 @@ import org.thoughtcrime.securesms.notifications.NotificationChannels;
import org.thoughtcrime.securesms.push.SignalServiceNetworkAccess;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.whispersystems.libsignal.InvalidVersionException;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.SignalServiceMessagePipe;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class IncomingMessageObserver implements ConstraintObserver.Notifier {
public class IncomingMessageObserver {
private static final String TAG = IncomingMessageObserver.class.getSimpleName();
@ -41,19 +49,19 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
private static SignalServiceMessagePipe pipe = null;
private static SignalServiceMessagePipe unidentifiedPipe = null;
private final Context context;
private final NetworkConstraint networkConstraint;
private final SignalServiceNetworkAccess networkAccess;
private final Context context;
private final SignalServiceNetworkAccess networkAccess;
private final List<Runnable> websocketDrainedListeners;
private boolean appVisible;
private volatile boolean websocketDrained;
public IncomingMessageObserver(@NonNull Context context) {
this.context = context;
this.networkConstraint = new NetworkConstraint.Factory(ApplicationContext.getInstance(context)).create();
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
this.context = context;
this.networkAccess = ApplicationDependencies.getSignalServiceNetworkAccess();
this.websocketDrainedListeners = new CopyOnWriteArrayList<>();
new NetworkConstraintObserver(ApplicationContext.getInstance(context)).register(this);
new MessageRetrievalThread().start();
if (TextSecurePreferences.isFcmDisabled(context)) {
@ -72,16 +80,32 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
}
});
ApplicationDependencies.getInitialMessageRetriever().addListener(this::onInitialRetrievalComplete);
context.registerReceiver(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
synchronized (IncomingMessageObserver.this) {
if (!NetworkConstraint.isMet(context)) {
Log.w(TAG, "Lost network connection. Shutting down our websocket connections and resetting the drained state.");
websocketDrained = false;
shutdown(pipe, unidentifiedPipe);
}
IncomingMessageObserver.this.notifyAll();
}
}
}, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}
@Override
public void onConstraintMet(@NonNull String reason) {
synchronized (this) {
notifyAll();
public synchronized void addWebsocketDrainedListener(@NonNull Runnable listener) {
websocketDrainedListeners.add(listener);
if (websocketDrained) {
listener.run();
}
}
public boolean isWebsocketDrained() {
return websocketDrained;
}
private synchronized void onAppForegrounded() {
appVisible = true;
notifyAll();
@ -92,21 +116,19 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
notifyAll();
}
private synchronized void onInitialRetrievalComplete() {
notifyAll();
}
private synchronized boolean isConnectionNecessary() {
boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context);
boolean registered = TextSecurePreferences.isPushRegistered(context);
boolean websocketRegistered = TextSecurePreferences.isWebsocketRegistered(context);
boolean isGcmDisabled = TextSecurePreferences.isFcmDisabled(context);
boolean hasNetwork = NetworkConstraint.isMet(context);
Log.d(TAG, String.format("Network requirement: %s, app visible: %s, gcm disabled: %b",
networkConstraint.isMet(), appVisible, isGcmDisabled));
Log.d(TAG, String.format("Network: %s, Foreground: %s, FCM: %s, Censored: %s, Registered: %s, Websocket Registered: %s",
hasNetwork, appVisible, !isGcmDisabled, networkAccess.isCensored(context), registered, websocketRegistered));
return TextSecurePreferences.isPushRegistered(context) &&
TextSecurePreferences.isWebsocketRegistered(context) &&
(appVisible || isGcmDisabled) &&
networkConstraint.isMet() &&
ApplicationDependencies.getInitialMessageRetriever().isCaughtUp() &&
return registered &&
websocketRegistered &&
(appVisible || isGcmDisabled) &&
hasNetwork &&
!networkAccess.isCensored(context);
}
@ -118,12 +140,21 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
}
}
private void shutdown(SignalServiceMessagePipe pipe, SignalServiceMessagePipe unidentifiedPipe) {
private void shutdown(@Nullable SignalServiceMessagePipe pipe, @Nullable SignalServiceMessagePipe unidentifiedPipe) {
try {
pipe.shutdown();
unidentifiedPipe.shutdown();
if (pipe != null) {
pipe.shutdown();
}
} catch (Throwable t) {
Log.w(TAG, t);
Log.w(TAG, "Closing normal pipe failed!", t);
}
try {
if (unidentifiedPipe != null) {
unidentifiedPipe.shutdown();
}
} catch (Throwable t) {
Log.w(TAG, "Closing unidentified pipe failed!", t);
}
}
@ -160,14 +191,22 @@ public class IncomingMessageObserver implements ConstraintObserver.Notifier {
try {
while (isConnectionNecessary()) {
try {
Log.i(TAG, "Reading message...");
localPipe.read(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES,
envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp());
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
}
});
Log.d(TAG, "Reading message...");
Optional<SignalServiceEnvelope> result = localPipe.readOrEmpty(REQUEST_TIMEOUT_MINUTES, TimeUnit.MINUTES, envelope -> {
Log.i(TAG, "Retrieved envelope! " + envelope.getTimestamp());
try (Processor processor = ApplicationDependencies.getIncomingMessageProcessor().acquire()) {
processor.processEnvelope(envelope);
}
});
if (!result.isPresent() && !websocketDrained) {
Log.i(TAG, "Websocket was newly-drained. Triggering listeners.");
websocketDrained = true;
for (Runnable listener : websocketDrainedListeners) {
listener.run();
}
}
} catch (TimeoutException e) {
Log.w(TAG, "Application level read timeout...");
} catch (InvalidVersionException e) {

View File

@ -1,151 +0,0 @@
package org.thoughtcrime.securesms.messages;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.ApplicationContext;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Fetches the first batch of messages, before anything else does.
*
* We have a separate process for fetching "initial" messages in order to have special behavior when
* catching up on a lot of messages after being offline for a while. It also gives us an opportunity
* to flag when we are "up-to-date" with our message queue.
*/
public class InitialMessageRetriever {
private static final String TAG = Log.tag(InitialMessageRetriever.class);
private static final int MAX_ATTEMPTS = 3;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private State state = State.NOT_CAUGHT_UP;
private final Object STATE_LOCK = new Object();
/**
* Only fires once. No need to remove. It will be called on an arbitrary worker thread.
*/
public void addListener(@NonNull Listener listener) {
synchronized (STATE_LOCK) {
if (state == State.CAUGHT_UP) {
listener.onCaughtUp();
} else {
listeners.add(listener);
}
}
}
/**
* Performs the initial fetch for messages (if necessary) with the requested timeout. The timeout
* is not just for the initial network request, but for the entire method call.
*
* @return A result describing how the operation completed.
*/
@WorkerThread
public @NonNull Result begin(long timeout) {
synchronized (STATE_LOCK) {
if (state == State.CAUGHT_UP) {
return Result.SKIPPED_ALREADY_CAUGHT_UP;
} else if (state == State.RUNNING) {
return Result.SKIPPED_ALREADY_RUNNING;
}
state = State.RUNNING;
}
long startTime = System.currentTimeMillis();
MessageRetrievalStrategy messageRetrievalStrategy = getRetriever();
CountDownLatch latch = new CountDownLatch(1);
SignalExecutors.UNBOUNDED.execute(() -> {
for (int i = 0; i < MAX_ATTEMPTS; i++) {
if (messageRetrievalStrategy.isCanceled()) {
Log.w(TAG, "Invalidated! Ending attempts.");
break;
}
boolean success = getRetriever().execute(timeout);
if (success) {
break;
} else {
Log.w(TAG, "Failed to catch up! Attempt " + (i + 1) + "/" + MAX_ATTEMPTS);
}
}
latch.countDown();
});
try {
boolean success = latch.await(timeout, TimeUnit.MILLISECONDS);
synchronized (STATE_LOCK) {
state = State.CAUGHT_UP;
for (Listener listener : listeners) {
listener.onCaughtUp();
}
listeners.clear();
}
ApplicationDependencies.getMessageNotifier().updateNotification(ApplicationDependencies.getApplication());
if (success) {
Log.i(TAG, "Successfully caught up in " + (System.currentTimeMillis() - startTime) + " ms.");
return Result.SUCCESS;
} else {
Log.i(TAG, "Could not catch up completely. Hit the timeout of " + timeout + " ms.");
messageRetrievalStrategy.cancel();
return Result.FAILURE_TIMEOUT;
}
} catch (InterruptedException e) {
Log.w(TAG, "Interrupted!", e);
return Result.FAILURE_ERROR;
}
}
public boolean isCaughtUp() {
synchronized (STATE_LOCK) {
return state == State.CAUGHT_UP;
}
}
private @NonNull MessageRetrievalStrategy getRetriever() {
Context context = ApplicationDependencies.getApplication();
if (ApplicationContext.getInstance(context).isAppVisible() &&
!ApplicationDependencies.getSignalServiceNetworkAccess().isCensored(context))
{
return new WebsocketStrategy();
} else {
return new RestStrategy();
}
}
private enum State {
NOT_CAUGHT_UP, RUNNING, CAUGHT_UP
}
public enum Result {
SUCCESS, FAILURE_TIMEOUT, FAILURE_ERROR, SKIPPED_ALREADY_CAUGHT_UP, SKIPPED_ALREADY_RUNNING
}
public interface Listener {
@WorkerThread
void onCaughtUp();
}
}

View File

@ -132,8 +132,10 @@ public class SignalServiceMessagePipe {
* {@link Optional#absent()} when an empty response is hit, which indicates the websocket is
* empty.
*
* Important: The empty response will only be hit once for each instance of {@link SignalServiceMessagePipe}.
* That means subsequent calls will block until an envelope is available.
* Important: The empty response will only be hit once for each connection. That means if you get
* an empty response and call readOrEmpty() again on the same instance, you will not get an empty
* response, and instead will block until you get an actual message. This will, however, reset if
* connection breaks (if, for instance, you lose and regain network).
*/
public Optional<SignalServiceEnvelope> readOrEmpty(long timeout, TimeUnit unit, MessagePipeCallback callback)
throws TimeoutException, IOException, InvalidVersionException

View File

@ -150,6 +150,8 @@ public class WebSocketConnection extends WebSocketListener {
keepAliveSender.shutdown();
keepAliveSender = null;
}
notifyAll();
}
public synchronized WebSocketRequestMessage readRequest(long timeoutMillis)