From 575413cac99bf0f11c04acfe8652eda2cde193f5 Mon Sep 17 00:00:00 2001 From: Alan Evans Date: Mon, 20 Jul 2020 11:09:42 -0300 Subject: [PATCH] Wait for message queue to drain before updating v2 groups. --- .../v2/processing/GroupsV2StateProcessor.java | 4 +- .../securesms/jobs/JobManagerFactories.java | 1 + .../securesms/jobs/PushProcessMessageJob.java | 20 ++-- .../securesms/jobs/RequestGroupV2InfoJob.java | 50 +++----- .../jobs/RequestGroupV2InfoWorkerJob.java | 111 ++++++++++++++++++ 5 files changed, 141 insertions(+), 45 deletions(-) create mode 100644 app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoWorkerJob.java diff --git a/app/src/main/java/org/thoughtcrime/securesms/groups/v2/processing/GroupsV2StateProcessor.java b/app/src/main/java/org/thoughtcrime/securesms/groups/v2/processing/GroupsV2StateProcessor.java index 720c19cba..3752ef43d 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/groups/v2/processing/GroupsV2StateProcessor.java +++ b/app/src/main/java/org/thoughtcrime/securesms/groups/v2/processing/GroupsV2StateProcessor.java @@ -27,6 +27,7 @@ import org.thoughtcrime.securesms.groups.GroupsV2Authorization; import org.thoughtcrime.securesms.groups.v2.ProfileKeySet; import org.thoughtcrime.securesms.jobmanager.JobManager; import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob; +import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob; import org.thoughtcrime.securesms.jobs.RetrieveProfileJob; import org.thoughtcrime.securesms.keyvalue.SignalStore; import org.thoughtcrime.securesms.logging.Log; @@ -192,7 +193,8 @@ public final class GroupsV2StateProcessor { GlobalGroupState remainingWork = advanceGroupStateResult.getNewGlobalGroupState(); if (remainingWork.getServerHistory().size() > 0) { - Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, not applying at this time, V[%d..%d]", newLocalState.getRevision() + 1, remainingWork.getLatestRevisionNumber())); + Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, scheduling for later, V[%d..%d]", newLocalState.getRevision() + 1, remainingWork.getLatestRevisionNumber())); + ApplicationDependencies.getJobManager().add(new RequestGroupV2InfoJob(groupId, remainingWork.getLatestRevisionNumber())); } return new GroupUpdateResult(GroupState.GROUP_UPDATED, newLocalState); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java index 0c7e5d6b7..e070e7ccc 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/JobManagerFactories.java @@ -98,6 +98,7 @@ public final class JobManagerFactories { put(RequestGroupInfoJob.KEY, new RequestGroupInfoJob.Factory()); put(ResumableUploadSpecJob.KEY, new ResumableUploadSpecJob.Factory()); put(StorageAccountRestoreJob.KEY, new StorageAccountRestoreJob.Factory()); + put(RequestGroupV2InfoWorkerJob.KEY, new RequestGroupV2InfoWorkerJob.Factory()); put(RequestGroupV2InfoJob.KEY, new RequestGroupV2InfoJob.Factory()); put(WakeGroupV2Job.KEY, new WakeGroupV2Job.Factory()); put(GroupV2UpdateSelfProfileKeyJob.KEY, new GroupV2UpdateSelfProfileKeyJob.Factory()); diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJob.java index 6e0949c8d..35f7b95e4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/PushProcessMessageJob.java @@ -222,12 +222,16 @@ public final class PushProcessMessageJob extends BaseJob { this.timestamp = timestamp; } + static @NonNull String getQueueName(@NonNull RecipientId recipientId) { + return QUEUE_PREFIX + recipientId.toQueueKey(); + } + @WorkerThread private static @NonNull Parameters createParameters(@Nullable SignalServiceContent content, @Nullable ExceptionMetadata exceptionMetadata) { - Context context = ApplicationDependencies.getApplication(); - String queueSuffix = ""; - Parameters.Builder builder = new Parameters.Builder() - .setMaxAttempts(Parameters.UNLIMITED); + Context context = ApplicationDependencies.getApplication(); + String queueName = QUEUE_PREFIX; + Parameters.Builder builder = new Parameters.Builder() + .setMaxAttempts(Parameters.UNLIMITED); if (content != null) { if (content.getDataMessage().isPresent() && content.getDataMessage().get().getGroupContext().isPresent()) { @@ -236,7 +240,7 @@ public final class PushProcessMessageJob extends BaseJob { GroupId groupId = GroupUtil.idFromGroupContext(signalServiceGroupContext); Recipient recipient = Recipient.externalGroup(context, groupId); - queueSuffix = recipient.getId().toQueueKey(); + queueName = getQueueName(recipient.getId()); if (groupId.isV2()) { int localRevision = DatabaseFactory.getGroupDatabase(context) @@ -251,15 +255,15 @@ public final class PushProcessMessageJob extends BaseJob { Log.w(TAG, "Bad groupId! Using default queue."); } } else { - queueSuffix = RecipientId.fromHighTrust(content.getSender()).toQueueKey(); + queueName = getQueueName(RecipientId.fromHighTrust(content.getSender())); } } else if (exceptionMetadata != null) { Recipient recipient = exceptionMetadata.groupId != null ? Recipient.externalGroup(context, exceptionMetadata.groupId) : Recipient.external(context, exceptionMetadata.sender); - queueSuffix = recipient.getId().toQueueKey(); + queueName = getQueueName(recipient.getId()); } - builder.setQueue(QUEUE_PREFIX + queueSuffix); + builder.setQueue(queueName); return builder.build(); } diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java index 4cac95af0..687d5623a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoJob.java @@ -2,25 +2,17 @@ package org.thoughtcrime.securesms.jobs; import androidx.annotation.NonNull; -import org.thoughtcrime.securesms.database.DatabaseFactory; -import org.thoughtcrime.securesms.database.GroupDatabase; -import org.thoughtcrime.securesms.groups.GroupChangeBusyException; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; import org.thoughtcrime.securesms.groups.GroupId; -import org.thoughtcrime.securesms.groups.GroupManager; -import org.thoughtcrime.securesms.groups.GroupNotAMemberException; import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor; import org.thoughtcrime.securesms.jobmanager.Data; import org.thoughtcrime.securesms.jobmanager.Job; -import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; +import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint; import org.thoughtcrime.securesms.logging.Log; -import org.thoughtcrime.securesms.util.FeatureFlags; -import org.whispersystems.libsignal.util.guava.Optional; -import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException; -import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; +/** + * Schedules a {@link RequestGroupV2InfoWorkerJob} to happen after message queues are drained. + */ public final class RequestGroupV2InfoJob extends BaseJob { public static final String KEY = "RequestGroupV2InfoJob"; @@ -34,11 +26,13 @@ public final class RequestGroupV2InfoJob extends BaseJob { private final GroupId.V2 groupId; private final int toRevision; + /** + * Get a particular group state revision for group after message queues are drained. + */ public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId, int toRevision) { this(new Parameters.Builder() - .setQueue("RequestGroupV2InfoJob::" + groupId) - .addConstraint(NetworkConstraint.KEY) - .setLifespan(TimeUnit.DAYS.toMillis(1)) + .setQueue("RequestGroupV2InfoSyncJob") + .addConstraint(WebsocketDrainedConstraint.KEY) .setMaxAttempts(Parameters.UNLIMITED) .build(), groupId, @@ -46,7 +40,7 @@ public final class RequestGroupV2InfoJob extends BaseJob { } /** - * Get latest group state for group. + * Get latest group state for group after message queues are drained. */ public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId) { this(groupId, GroupsV2StateProcessor.LATEST); @@ -72,29 +66,13 @@ public final class RequestGroupV2InfoJob extends BaseJob { } @Override - public void onRun() throws IOException, GroupNotAMemberException, GroupChangeBusyException { - if (!FeatureFlags.groupsV2()) { - Log.w(TAG, "Group update skipped due to feature flag " + groupId); - return; - } - - Log.i(TAG, "Updating group to revision " + toRevision); - - Optional group = DatabaseFactory.getGroupDatabase(context).getGroup(groupId); - - if (!group.isPresent()) { - Log.w(TAG, "Group not found"); - return; - } - - GroupManager.updateGroupFromServer(context, group.get().requireV2GroupProperties().getGroupMasterKey(), toRevision, System.currentTimeMillis(), null); + public void onRun() { + ApplicationDependencies.getJobManager().add(new RequestGroupV2InfoWorkerJob(groupId, toRevision)); } @Override public boolean onShouldRetry(@NonNull Exception e) { - return e instanceof PushNetworkException || - e instanceof NoCredentialForRedemptionTimeException || - e instanceof GroupChangeBusyException; + return false; } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoWorkerJob.java b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoWorkerJob.java new file mode 100644 index 000000000..60ac892e1 --- /dev/null +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RequestGroupV2InfoWorkerJob.java @@ -0,0 +1,111 @@ +package org.thoughtcrime.securesms.jobs; + +import androidx.annotation.NonNull; +import androidx.annotation.WorkerThread; + +import org.thoughtcrime.securesms.database.DatabaseFactory; +import org.thoughtcrime.securesms.database.GroupDatabase; +import org.thoughtcrime.securesms.dependencies.ApplicationDependencies; +import org.thoughtcrime.securesms.groups.GroupChangeBusyException; +import org.thoughtcrime.securesms.groups.GroupId; +import org.thoughtcrime.securesms.groups.GroupManager; +import org.thoughtcrime.securesms.groups.GroupNotAMemberException; +import org.thoughtcrime.securesms.jobmanager.Data; +import org.thoughtcrime.securesms.jobmanager.Job; +import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint; +import org.thoughtcrime.securesms.logging.Log; +import org.thoughtcrime.securesms.recipients.Recipient; +import org.thoughtcrime.securesms.util.FeatureFlags; +import org.whispersystems.libsignal.util.guava.Optional; +import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException; +import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Scheduled by {@link RequestGroupV2InfoJob} after message queues are drained. + */ +final class RequestGroupV2InfoWorkerJob extends BaseJob { + + public static final String KEY = "RequestGroupV2InfoWorkerJob"; + + private static final String TAG = Log.tag(RequestGroupV2InfoWorkerJob.class); + + private static final String KEY_GROUP_ID = "group_id"; + private static final String KEY_TO_REVISION = "to_revision"; + + private final GroupId.V2 groupId; + private final int toRevision; + + @WorkerThread + RequestGroupV2InfoWorkerJob(@NonNull GroupId.V2 groupId, int toRevision) { + this(new Parameters.Builder() + .setQueue(PushProcessMessageJob.getQueueName(Recipient.externalGroup(ApplicationDependencies.getApplication(), groupId).getId())) + .addConstraint(NetworkConstraint.KEY) + .setLifespan(TimeUnit.DAYS.toMillis(1)) + .setMaxAttempts(Parameters.UNLIMITED) + .build(), + groupId, + toRevision); + } + + private RequestGroupV2InfoWorkerJob(@NonNull Parameters parameters, @NonNull GroupId.V2 groupId, int toRevision) { + super(parameters); + + this.groupId = groupId; + this.toRevision = toRevision; + } + + @Override + public @NonNull Data serialize() { + return new Data.Builder().putString(KEY_GROUP_ID, groupId.toString()) + .putInt(KEY_TO_REVISION, toRevision) + .build(); + } + + @Override + public @NonNull String getFactoryKey() { + return KEY; + } + + @Override + public void onRun() throws IOException, GroupNotAMemberException, GroupChangeBusyException { + if (!FeatureFlags.groupsV2()) { + Log.w(TAG, "Group update skipped due to feature flag " + groupId); + return; + } + + Log.i(TAG, "Updating group to revision " + toRevision); + + Optional group = DatabaseFactory.getGroupDatabase(context).getGroup(groupId); + + if (!group.isPresent()) { + Log.w(TAG, "Group not found"); + return; + } + + GroupManager.updateGroupFromServer(context, group.get().requireV2GroupProperties().getGroupMasterKey(), toRevision, System.currentTimeMillis(), null); + } + + @Override + public boolean onShouldRetry(@NonNull Exception e) { + return e instanceof PushNetworkException || + e instanceof NoCredentialForRedemptionTimeException || + e instanceof GroupChangeBusyException; + } + + @Override + public void onFailure() { + } + + public static final class Factory implements Job.Factory { + + @Override + public @NonNull RequestGroupV2InfoWorkerJob create(@NonNull Parameters parameters, @NonNull Data data) { + return new RequestGroupV2InfoWorkerJob(parameters, + GroupId.parseOrThrow(data.getString(KEY_GROUP_ID)).requireV2(), + data.getInt(KEY_TO_REVISION)); + } + } +}