Skip to content

Commit f5727e0

Browse files
authored
Merge pull request #794 from apache/master
[pull] master from apache:master
2 parents dde4484 + e42a221 commit f5727e0

30 files changed

Lines changed: 362 additions & 162 deletions

File tree

mailbox/api/src/main/java/org/apache/james/mailbox/events/MailboxEvents.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Optional;
2828
import java.util.SortedMap;
2929

30+
import jakarta.mail.Flags;
31+
3032
import org.apache.james.core.Username;
3133
import org.apache.james.core.quota.QuotaCountLimit;
3234
import org.apache.james.core.quota.QuotaCountUsage;
@@ -537,7 +539,7 @@ public final int hashCode() {
537539
}
538540

539541
record MessageContentDeletionEvent(EventId eventId, Username username, MailboxId mailboxId, MessageId messageId, long size,
540-
Instant internalDate, boolean hasAttachments, Optional<String> headerBlobId, Optional<String> headerContent,
542+
Instant internalDate, Flags flags, boolean hasAttachments, Optional<String> headerBlobId, Optional<String> headerContent,
541543
String bodyBlobId) implements Event {
542544

543545
@Override

mailbox/api/src/main/java/org/apache/james/mailbox/model/UpdatedFlags.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.james.mailbox.model;
2121

2222
import java.util.Arrays;
23+
import java.util.Date;
2324
import java.util.Iterator;
2425
import java.util.List;
2526
import java.util.Objects;
@@ -50,6 +51,7 @@ public static class Builder {
5051
private Flags oldFlags;
5152
private Flags newFlags;
5253
private Optional<ModSeq> modSeq = Optional.empty();
54+
private Optional<Date> internalDate = Optional.empty();
5355

5456
private Builder() {
5557
}
@@ -84,12 +86,22 @@ public Builder modSeq(ModSeq modSeq) {
8486
return this;
8587
}
8688

89+
public Builder internalDate(Date internalDate) {
90+
this.internalDate = Optional.of(internalDate);
91+
return this;
92+
}
93+
94+
public Builder internalDate(Optional<Date> internalDate) {
95+
this.internalDate = internalDate;
96+
return this;
97+
}
98+
8799
public UpdatedFlags build() {
88100
Preconditions.checkNotNull(uid);
89101
Preconditions.checkNotNull(newFlags);
90102
Preconditions.checkNotNull(oldFlags);
91103
Preconditions.checkState(modSeq.isPresent());
92-
return new UpdatedFlags(uid, messageId, modSeq.get(), oldFlags, newFlags);
104+
return new UpdatedFlags(uid, messageId, modSeq.get(), oldFlags, newFlags, internalDate);
93105
}
94106
}
95107

@@ -146,13 +158,18 @@ private static boolean isChanged(Flags original, Flags updated, String userFlag)
146158
private final Flags newFlags;
147159
private final Flags modifiedFlags;
148160
private final ModSeq modSeq;
161+
/**
162+
* The usage of Optional here is for backward compatibility (to be able to still dequeue older events)
163+
*/
164+
private final Optional<Date> internalDate;
149165

150-
private UpdatedFlags(MessageUid uid, Optional<MessageId> messageId, ModSeq modSeq, Flags oldFlags, Flags newFlags) {
166+
private UpdatedFlags(MessageUid uid, Optional<MessageId> messageId, ModSeq modSeq, Flags oldFlags, Flags newFlags, Optional<Date> internalDate) {
151167
this.uid = uid;
152-
this.messageId = messageId;
168+
this.messageId = Optional.ofNullable(messageId).orElse(Optional.empty());
153169
this.modSeq = modSeq;
154170
this.oldFlags = oldFlags;
155171
this.newFlags = newFlags;
172+
this.internalDate = Optional.ofNullable(internalDate).orElse(Optional.empty());
156173
this.modifiedFlags = new Flags();
157174
addModifiedSystemFlags(oldFlags, newFlags, modifiedFlags);
158175
addModifiedUserFlags(oldFlags, newFlags, modifiedFlags);
@@ -203,6 +220,10 @@ public Optional<MessageId> getMessageId() {
203220
return messageId;
204221
}
205222

223+
public Optional<Date> getInternalDate() {
224+
return internalDate;
225+
}
226+
206227
/**
207228
* Gets an iterator for the system flags changed.
208229
*
@@ -272,12 +293,13 @@ public final boolean equals(Object other) {
272293
Objects.equals(messageId, that.messageId) &&
273294
Objects.equals(oldFlags, that.oldFlags) &&
274295
Objects.equals(newFlags, that.newFlags) &&
275-
Objects.equals(modSeq, that.modSeq);
296+
Objects.equals(modSeq, that.modSeq) &&
297+
Objects.equals(internalDate, that.internalDate);
276298
}
277299

278300
@Override
279301
public final int hashCode() {
280-
return Objects.hash(uid, messageId, oldFlags, newFlags, modSeq);
302+
return Objects.hash(uid, messageId, oldFlags, newFlags, modSeq, internalDate);
281303
}
282304

283305
@Override
@@ -288,6 +310,7 @@ public String toString() {
288310
.add("oldFlags", oldFlags)
289311
.add("newFlags", newFlags)
290312
.add("modSeq", modSeq)
313+
.add("internalDate", internalDate)
291314
.toString();
292315
}
293316
}

mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import jakarta.inject.Inject;
2929
import jakarta.inject.Named;
30+
import jakarta.mail.Flags;
3031

3132
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
3233
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
@@ -169,7 +170,7 @@ private Mono<Void> handleMailboxDeletion(CassandraId mailboxId, MailboxPath path
169170
return Flux.mergeDelayError(prefetch,
170171
messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited())
171172
.concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(),
172-
metadata.getComposedMessageId().getThreadId(), mailboxId, path.getUser())
173+
metadata.getComposedMessageId().getThreadId(), metadata.getComposedMessageId().getFlags(), mailboxId, path.getUser())
173174
.then(imapUidDAO.delete((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), mailboxId))
174175
.then(messageIdDAO.delete(mailboxId, metadata.getComposedMessageId().getComposedMessageId().getUid()))),
175176
deleteAcl(mailboxId),
@@ -184,7 +185,7 @@ private Mono<Void> handleMailboxDeletion(CassandraId mailboxId, MailboxPath path
184185
private Mono<Void> handleMessageDeletion(Expunged expunged) {
185186
return Flux.fromIterable(expunged.getExpunged().values())
186187
.concatMap(metaData -> handleMessageDeletion((CassandraMessageId) metaData.getMessageId(),
187-
expunged.getMailboxId(), metaData.getThreadId(), expunged.getMailboxPath().getUser()))
188+
expunged.getMailboxId(), metaData.getThreadId(), metaData.getFlags(), expunged.getMailboxPath().getUser()))
188189
.then();
189190
}
190191

@@ -194,11 +195,11 @@ private Mono<Void> deleteAcl(CassandraId mailboxId) {
194195
.then(aclMapper.delete(mailboxId)));
195196
}
196197

197-
private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId mailboxId, ThreadId threadId, Username owner) {
198+
private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId mailboxId, ThreadId threadId, Flags flags, Username owner) {
198199
return Mono.just(messageId)
199200
.filterWhen(this::isReferenced)
200201
.flatMap(id -> readMessage(id)
201-
.flatMap(message -> dispatchMessageContentDeletionEvent(mailboxId, owner, message)
202+
.flatMap(message -> dispatchMessageContentDeletionEvent(mailboxId, owner, flags, message)
202203
.thenReturn(message))
203204
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
204205
.flatMap(this::deleteMessageBlobs)
@@ -209,7 +210,7 @@ private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId
209210
.then(threadLookupDAO.deleteOneRow(threadId, messageId)));
210211
}
211212

212-
private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId mailboxId, Username owner, MessageRepresentation message) {
213+
private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId mailboxId, Username owner, Flags flags, MessageRepresentation message) {
213214
AuditTrail.entry()
214215
.action("DELETION")
215216
.username(owner::asString)
@@ -226,18 +227,19 @@ private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId mailboxId, User
226227
.messageId(message.getMessageId())
227228
.size(message.getSize())
228229
.instant(message.getInternalDate().toInstant())
230+
.flags(flags)
229231
.hasAttachments(!message.getAttachments().isEmpty())
230232
.bodyBlobId(message.getBodyId().asString())
231233
.headerBlobId(message.getHeaderId().asString())
232234
.build(),
233235
ImmutableSet.of()));
234236
}
235237

236-
private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, ThreadId threadId, CassandraId excludedId, Username owner) {
238+
private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, ThreadId threadId, Flags flags, CassandraId excludedId, Username owner) {
237239
return Mono.just(messageId)
238240
.filterWhen(id -> isReferenced(id, excludedId))
239241
.flatMap(id -> readMessage(id)
240-
.flatMap(message -> dispatchMessageContentDeletionEvent(excludedId, owner, message)
242+
.flatMap(message -> dispatchMessageContentDeletionEvent(excludedId, owner, flags, message)
241243
.thenReturn(message)))
242244
.flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message))
243245
.flatMap(this::deleteMessageBlobs)

mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -299,11 +299,13 @@ private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState,
299299
.map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
300300
}
301301

302-
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
302+
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(CassandraMessageMetadata metadata, Flags oldFlags) {
303+
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = metadata.getComposedMessageId();
303304
return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
304305
UpdatedFlags.builder()
305306
.uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
306307
.messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId())
308+
.internalDate(metadata.getInternalDate())
307309
.modSeq(composedMessageIdWithMetaData.getModSeq())
308310
.oldFlags(oldFlags)
309311
.newFlags(composedMessageIdWithMetaData.getFlags())
@@ -316,35 +318,45 @@ private Mono<Pair<MailboxId, UpdatedFlags>> updateCounts(Pair<MailboxId, Updated
316318
.thenReturn(pair);
317319
}
318320

319-
private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
321+
private Mono<List<Pair<Flags, CassandraMessageMetadata>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
320322
CassandraId cassandraId = (CassandraId) mailboxId;
321323
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites())
322-
.map(CassandraMessageMetadata::getComposedMessageId)
323-
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
324+
.flatMap(oldMetadata -> updateFlags(newState, updateMode, cassandraId, oldMetadata), ReactorUtils.DEFAULT_CONCURRENCY)
324325
.switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
325326
.collectList();
326327
}
327328

328-
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
329+
private Mono<Pair<Flags, CassandraMessageMetadata>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, CassandraMessageMetadata oldMetadata) {
330+
ComposedMessageIdWithMetaData oldComposedId = oldMetadata.getComposedMessageId();
329331
Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags());
330332
if (identicalFlags(oldComposedId, newFlags)) {
331-
return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
333+
return Mono.just(Pair.of(oldComposedId.getFlags(), oldMetadata));
332334
} else {
333335
return modSeqProvider.nextModSeqReactive(cassandraId)
334336
.map(modSeq -> new ComposedMessageIdWithMetaData(
335337
oldComposedId.getComposedMessageId(),
336338
newFlags,
337339
modSeq,
338340
oldComposedId.getThreadId()))
339-
.flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId));
341+
.map(newComposedId -> CassandraMessageMetadata.builder()
342+
.ids(newComposedId)
343+
.internalDate(oldMetadata.getInternalDate())
344+
.saveDate(oldMetadata.getSaveDate())
345+
.bodyStartOctet(oldMetadata.getBodyStartOctet())
346+
.size(oldMetadata.getSize())
347+
.headerContent(oldMetadata.getHeaderContent())
348+
.build())
349+
.flatMap(newMetadata -> updateFlags(oldMetadata, newMetadata));
340350
}
341351
}
342352

343353
private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) {
344354
return oldComposedId.getFlags().equals(newFlags);
345355
}
346356

347-
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) {
357+
private Mono<Pair<Flags, CassandraMessageMetadata>> updateFlags(CassandraMessageMetadata oldMetadata, CassandraMessageMetadata newMetadata) {
358+
ComposedMessageIdWithMetaData oldComposedId = oldMetadata.getComposedMessageId();
359+
ComposedMessageIdWithMetaData newComposedId = newMetadata.getComposedMessageId();
348360
ComposedMessageId composedMessageId = newComposedId.getComposedMessageId();
349361
ModSeq previousModseq = oldComposedId.getModSeq();
350362
UpdatedFlags updatedFlags = UpdatedFlags.builder()
@@ -353,12 +365,13 @@ private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMes
353365
.oldFlags(oldComposedId.getFlags())
354366
.newFlags(newComposedId.getFlags())
355367
.uid(composedMessageId.getUid())
368+
.internalDate(newMetadata.getInternalDate())
356369
.build();
357370

358371
return imapUidDAO.updateMetadata(composedMessageId, updatedFlags, previousModseq)
359372
.filter(FunctionalUtils.identityPredicate())
360373
.flatMap(any -> messageIdDAO.updateMetadata(composedMessageId, updatedFlags)
361-
.thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)))
374+
.thenReturn(Pair.of(oldComposedId.getFlags(), newMetadata)))
362375
.single();
363376
}
364377
}

mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -652,37 +652,49 @@ private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUp
652652
Flags oldFlags = oldMetaData.getFlags();
653653
Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
654654

655-
if (identicalFlags(oldFlags, newFlags)) {
656-
return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
657-
.uid(oldMetaData.getComposedMessageId().getUid())
658-
.messageId(oldMetaData.getComposedMessageId().getMessageId())
659-
.modSeq(oldMetaData.getModSeq())
660-
.oldFlags(oldFlags)
661-
.newFlags(newFlags)
662-
.build()));
663-
}
664-
665-
return updateFlags(oldMetaData, newFlags, newModSeq)
666-
.map(success -> {
667-
if (success) {
668-
return FlagsUpdateStageResult.success(UpdatedFlags.builder()
655+
return retrieveInternalDate(oldMetaData)
656+
.flatMap(internalDate -> {
657+
if (identicalFlags(oldFlags, newFlags)) {
658+
return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder()
669659
.uid(oldMetaData.getComposedMessageId().getUid())
670660
.messageId(oldMetaData.getComposedMessageId().getMessageId())
671-
.modSeq(newModSeq)
661+
.internalDate(internalDate)
662+
.modSeq(oldMetaData.getModSeq())
672663
.oldFlags(oldFlags)
673664
.newFlags(newFlags)
674-
.build());
675-
} else {
676-
return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId());
665+
.build()));
677666
}
667+
668+
return updateFlags(oldMetaData, newFlags, newModSeq, internalDate)
669+
.map(success -> {
670+
if (success) {
671+
return FlagsUpdateStageResult.success(UpdatedFlags.builder()
672+
.uid(oldMetaData.getComposedMessageId().getUid())
673+
.messageId(oldMetaData.getComposedMessageId().getMessageId())
674+
.internalDate(internalDate)
675+
.modSeq(newModSeq)
676+
.oldFlags(oldFlags)
677+
.newFlags(newFlags)
678+
.build());
679+
} else {
680+
return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId());
681+
}
682+
});
678683
});
679684
}
680685

681686
private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
682687
return oldFlags.equals(newFlags);
683688
}
684689

685-
private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, ModSeq newModSeq) {
690+
private Mono<Optional<Date>> retrieveInternalDate(ComposedMessageIdWithMetaData oldMetaData) {
691+
CassandraId mailboxId = (CassandraId) oldMetaData.getComposedMessageId().getMailboxId();
692+
MessageUid uid = oldMetaData.getComposedMessageId().getUid();
693+
return messageIdDAO.retrieve(mailboxId, uid)
694+
.map(cassandraMessageMetadata -> cassandraMessageMetadata.flatMap(CassandraMessageMetadata::getInternalDate));
695+
}
696+
697+
private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, ModSeq newModSeq, Optional<Date> internalDate) {
686698
ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder()
687699
.composedMessageId(oldMetadata.getComposedMessageId())
688700
.modSeq(newModSeq)
@@ -694,6 +706,7 @@ private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Fla
694706
ModSeq previousModseq = oldMetadata.getModSeq();
695707
UpdatedFlags updatedFlags = UpdatedFlags.builder()
696708
.messageId(composedMessageId.getMessageId())
709+
.internalDate(internalDate)
697710
.modSeq(newMetadata.getModSeq())
698711
.oldFlags(oldMetadata.getFlags())
699712
.newFlags(newMetadata.getFlags())

mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public Mono<Task.Result> fix(Context context, CassandraMessageIdToImapUidDAO ima
127127
.modSeq(id.getModSeq())
128128
.messageId(id.getComposedMessageId().getMessageId())
129129
.uid(id.getComposedMessageId().getUid())
130+
.internalDate(messageFromImapUid.getInternalDate())
130131
.build())
131132
.doOnSuccess(any -> notifySuccess(context))
132133
.thenReturn(Task.Result.COMPLETED)

0 commit comments

Comments
 (0)