Wait for message queue to drain before updating v2 groups.

master
Alan Evans 2020-07-20 11:09:42 -03:00 committed by GitHub
parent 6a9476c6d0
commit 575413cac9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 141 additions and 45 deletions

View File

@ -27,6 +27,7 @@ import org.thoughtcrime.securesms.groups.GroupsV2Authorization;
import org.thoughtcrime.securesms.groups.v2.ProfileKeySet;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.AvatarGroupsV2DownloadJob;
import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob;
import org.thoughtcrime.securesms.jobs.RetrieveProfileJob;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.thoughtcrime.securesms.logging.Log;
@ -192,7 +193,8 @@ public final class GroupsV2StateProcessor {
GlobalGroupState remainingWork = advanceGroupStateResult.getNewGlobalGroupState();
if (remainingWork.getServerHistory().size() > 0) {
Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, not applying at this time, V[%d..%d]", newLocalState.getRevision() + 1, remainingWork.getLatestRevisionNumber()));
Log.i(TAG, String.format(Locale.US, "There are more revisions on the server for this group, scheduling for later, V[%d..%d]", newLocalState.getRevision() + 1, remainingWork.getLatestRevisionNumber()));
ApplicationDependencies.getJobManager().add(new RequestGroupV2InfoJob(groupId, remainingWork.getLatestRevisionNumber()));
}
return new GroupUpdateResult(GroupState.GROUP_UPDATED, newLocalState);

View File

@ -98,6 +98,7 @@ public final class JobManagerFactories {
put(RequestGroupInfoJob.KEY, new RequestGroupInfoJob.Factory());
put(ResumableUploadSpecJob.KEY, new ResumableUploadSpecJob.Factory());
put(StorageAccountRestoreJob.KEY, new StorageAccountRestoreJob.Factory());
put(RequestGroupV2InfoWorkerJob.KEY, new RequestGroupV2InfoWorkerJob.Factory());
put(RequestGroupV2InfoJob.KEY, new RequestGroupV2InfoJob.Factory());
put(WakeGroupV2Job.KEY, new WakeGroupV2Job.Factory());
put(GroupV2UpdateSelfProfileKeyJob.KEY, new GroupV2UpdateSelfProfileKeyJob.Factory());

View File

@ -222,12 +222,16 @@ public final class PushProcessMessageJob extends BaseJob {
this.timestamp = timestamp;
}
static @NonNull String getQueueName(@NonNull RecipientId recipientId) {
return QUEUE_PREFIX + recipientId.toQueueKey();
}
@WorkerThread
private static @NonNull Parameters createParameters(@Nullable SignalServiceContent content, @Nullable ExceptionMetadata exceptionMetadata) {
Context context = ApplicationDependencies.getApplication();
String queueSuffix = "";
Parameters.Builder builder = new Parameters.Builder()
.setMaxAttempts(Parameters.UNLIMITED);
Context context = ApplicationDependencies.getApplication();
String queueName = QUEUE_PREFIX;
Parameters.Builder builder = new Parameters.Builder()
.setMaxAttempts(Parameters.UNLIMITED);
if (content != null) {
if (content.getDataMessage().isPresent() && content.getDataMessage().get().getGroupContext().isPresent()) {
@ -236,7 +240,7 @@ public final class PushProcessMessageJob extends BaseJob {
GroupId groupId = GroupUtil.idFromGroupContext(signalServiceGroupContext);
Recipient recipient = Recipient.externalGroup(context, groupId);
queueSuffix = recipient.getId().toQueueKey();
queueName = getQueueName(recipient.getId());
if (groupId.isV2()) {
int localRevision = DatabaseFactory.getGroupDatabase(context)
@ -251,15 +255,15 @@ public final class PushProcessMessageJob extends BaseJob {
Log.w(TAG, "Bad groupId! Using default queue.");
}
} else {
queueSuffix = RecipientId.fromHighTrust(content.getSender()).toQueueKey();
queueName = getQueueName(RecipientId.fromHighTrust(content.getSender()));
}
} else if (exceptionMetadata != null) {
Recipient recipient = exceptionMetadata.groupId != null ? Recipient.externalGroup(context, exceptionMetadata.groupId)
: Recipient.external(context, exceptionMetadata.sender);
queueSuffix = recipient.getId().toQueueKey();
queueName = getQueueName(recipient.getId());
}
builder.setQueue(QUEUE_PREFIX + queueSuffix);
builder.setQueue(queueName);
return builder.build();
}

View File

@ -2,25 +2,17 @@ package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.GroupDatabase;
import org.thoughtcrime.securesms.groups.GroupChangeBusyException;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.groups.GroupManager;
import org.thoughtcrime.securesms.groups.GroupNotAMemberException;
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.jobmanager.impl.WebsocketDrainedConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Schedules a {@link RequestGroupV2InfoWorkerJob} to happen after message queues are drained.
*/
public final class RequestGroupV2InfoJob extends BaseJob {
public static final String KEY = "RequestGroupV2InfoJob";
@ -34,11 +26,13 @@ public final class RequestGroupV2InfoJob extends BaseJob {
private final GroupId.V2 groupId;
private final int toRevision;
/**
* Get a particular group state revision for group after message queues are drained.
*/
public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId, int toRevision) {
this(new Parameters.Builder()
.setQueue("RequestGroupV2InfoJob::" + groupId)
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setQueue("RequestGroupV2InfoSyncJob")
.addConstraint(WebsocketDrainedConstraint.KEY)
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
groupId,
@ -46,7 +40,7 @@ public final class RequestGroupV2InfoJob extends BaseJob {
}
/**
* Get latest group state for group.
* Get latest group state for group after message queues are drained.
*/
public RequestGroupV2InfoJob(@NonNull GroupId.V2 groupId) {
this(groupId, GroupsV2StateProcessor.LATEST);
@ -72,29 +66,13 @@ public final class RequestGroupV2InfoJob extends BaseJob {
}
@Override
public void onRun() throws IOException, GroupNotAMemberException, GroupChangeBusyException {
if (!FeatureFlags.groupsV2()) {
Log.w(TAG, "Group update skipped due to feature flag " + groupId);
return;
}
Log.i(TAG, "Updating group to revision " + toRevision);
Optional<GroupDatabase.GroupRecord> group = DatabaseFactory.getGroupDatabase(context).getGroup(groupId);
if (!group.isPresent()) {
Log.w(TAG, "Group not found");
return;
}
GroupManager.updateGroupFromServer(context, group.get().requireV2GroupProperties().getGroupMasterKey(), toRevision, System.currentTimeMillis(), null);
public void onRun() {
ApplicationDependencies.getJobManager().add(new RequestGroupV2InfoWorkerJob(groupId, toRevision));
}
@Override
public boolean onShouldRetry(@NonNull Exception e) {
return e instanceof PushNetworkException ||
e instanceof NoCredentialForRedemptionTimeException ||
e instanceof GroupChangeBusyException;
return false;
}
@Override

View File

@ -0,0 +1,111 @@
package org.thoughtcrime.securesms.jobs;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.GroupDatabase;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.groups.GroupChangeBusyException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.groups.GroupManager;
import org.thoughtcrime.securesms.groups.GroupNotAMemberException;
import org.thoughtcrime.securesms.jobmanager.Data;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint;
import org.thoughtcrime.securesms.logging.Log;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.util.FeatureFlags;
import org.whispersystems.libsignal.util.guava.Optional;
import org.whispersystems.signalservice.api.groupsv2.NoCredentialForRedemptionTimeException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Scheduled by {@link RequestGroupV2InfoJob} after message queues are drained.
*/
final class RequestGroupV2InfoWorkerJob extends BaseJob {
public static final String KEY = "RequestGroupV2InfoWorkerJob";
private static final String TAG = Log.tag(RequestGroupV2InfoWorkerJob.class);
private static final String KEY_GROUP_ID = "group_id";
private static final String KEY_TO_REVISION = "to_revision";
private final GroupId.V2 groupId;
private final int toRevision;
@WorkerThread
RequestGroupV2InfoWorkerJob(@NonNull GroupId.V2 groupId, int toRevision) {
this(new Parameters.Builder()
.setQueue(PushProcessMessageJob.getQueueName(Recipient.externalGroup(ApplicationDependencies.getApplication(), groupId).getId()))
.addConstraint(NetworkConstraint.KEY)
.setLifespan(TimeUnit.DAYS.toMillis(1))
.setMaxAttempts(Parameters.UNLIMITED)
.build(),
groupId,
toRevision);
}
private RequestGroupV2InfoWorkerJob(@NonNull Parameters parameters, @NonNull GroupId.V2 groupId, int toRevision) {
super(parameters);
this.groupId = groupId;
this.toRevision = toRevision;
}
@Override
public @NonNull Data serialize() {
return new Data.Builder().putString(KEY_GROUP_ID, groupId.toString())
.putInt(KEY_TO_REVISION, toRevision)
.build();
}
@Override
public @NonNull String getFactoryKey() {
return KEY;
}
@Override
public void onRun() throws IOException, GroupNotAMemberException, GroupChangeBusyException {
if (!FeatureFlags.groupsV2()) {
Log.w(TAG, "Group update skipped due to feature flag " + groupId);
return;
}
Log.i(TAG, "Updating group to revision " + toRevision);
Optional<GroupDatabase.GroupRecord> group = DatabaseFactory.getGroupDatabase(context).getGroup(groupId);
if (!group.isPresent()) {
Log.w(TAG, "Group not found");
return;
}
GroupManager.updateGroupFromServer(context, group.get().requireV2GroupProperties().getGroupMasterKey(), toRevision, System.currentTimeMillis(), null);
}
@Override
public boolean onShouldRetry(@NonNull Exception e) {
return e instanceof PushNetworkException ||
e instanceof NoCredentialForRedemptionTimeException ||
e instanceof GroupChangeBusyException;
}
@Override
public void onFailure() {
}
public static final class Factory implements Job.Factory<RequestGroupV2InfoWorkerJob> {
@Override
public @NonNull RequestGroupV2InfoWorkerJob create(@NonNull Parameters parameters, @NonNull Data data) {
return new RequestGroupV2InfoWorkerJob(parameters,
GroupId.parseOrThrow(data.getString(KEY_GROUP_ID)).requireV2(),
data.getInt(KEY_TO_REVISION));
}
}
}