Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
mongo java driver 3.4.2 support
  • Loading branch information
ipoletti committed Jul 7, 2017
commit 7f6b9c627c16b0b8eabaec7572284000a9825d3e
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Java message queue using MongoDB as a backend
Adheres to the 1.0.0 [specification](https://github.com/dominionenterprises/mongo-queue-specification).

###This a forked version from https://github.com/gaillard/mongo-queue-java Compatible with mongo-java-driver 3.

##Features

* Message selection and/or count via MongoDB query
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.11.2</version>
<version>3.4.2</version>
</dependency>
</dependencies>

Expand Down
128 changes: 71 additions & 57 deletions src/main/java/gaillard/mongo/Queue.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package gaillard.mongo;

import com.mongodb.BasicDBObject;
import com.mongodb.CommandFailureException;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import java.util.Calendar;
import java.util.Date;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;

import org.bson.Document;
import org.bson.types.ObjectId;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.ReturnDocument;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;

public final class Queue {

private final DBCollection collection;
private final MongoCollection<Document> collection;

public Queue(final DBCollection collection) {
public Queue(final MongoCollection<Document> collection) {
Objects.requireNonNull(collection);

this.collection = collection;
Expand All @@ -25,16 +32,16 @@ public Queue(final DBCollection collection) {
* Ensure index for get() method with no fields before or after sort fields
*/
public void ensureGetIndex() {
ensureGetIndex(new BasicDBObject());
ensureGetIndex(new Document());
}

/**
* Ensure index for get() method with no fields after sort fields
*
* @param beforeSort fields in get() call that should be before the sort fields in the index. Should not be null
*/
public void ensureGetIndex(final BasicDBObject beforeSort) {
ensureGetIndex(beforeSort, new BasicDBObject());
public void ensureGetIndex(final Document beforeSort) {
ensureGetIndex(beforeSort, new Document());
}

/**
Expand All @@ -43,12 +50,12 @@ public void ensureGetIndex(final BasicDBObject beforeSort) {
* @param beforeSort fields in get() call that should be before the sort fields in the index. Should not be null
* @param afterSort fields in get() call that should be after the sort fields in the index. Should not be null
*/
public void ensureGetIndex(final BasicDBObject beforeSort, final BasicDBObject afterSort) {
public void ensureGetIndex(final Document beforeSort, final Document afterSort) {
Objects.requireNonNull(beforeSort);
Objects.requireNonNull(afterSort);

//using general rule: equality, sort, range or more equality tests in that order for index
final BasicDBObject completeIndex = new BasicDBObject("running", 1);
final Document completeIndex = new Document("running", 1);

for (final Entry<String, Object> field : beforeSort.entrySet()) {
if (!Objects.equals(field.getValue(), 1) && !Objects.equals(field.getValue(), -1)) {
Expand All @@ -71,7 +78,7 @@ public void ensureGetIndex(final BasicDBObject beforeSort, final BasicDBObject a
completeIndex.append("earliestGet", 1);

ensureIndex(completeIndex);//main query in Get()
ensureIndex(new BasicDBObject("running", 1).append("resetTimestamp", 1));//for the stuck messages query in Get()
ensureIndex(new Document("running", 1).append("resetTimestamp", 1));//for the stuck messages query in Get()
}

/**
Expand All @@ -80,10 +87,10 @@ public void ensureGetIndex(final BasicDBObject beforeSort, final BasicDBObject a
* @param index fields in count() call. Should not be null
* @param includeRunning whether running was given to count() or not
*/
public void ensureCountIndex(final BasicDBObject index, final boolean includeRunning) {
public void ensureCountIndex(final Document index, final boolean includeRunning) {
Objects.requireNonNull(index);

final BasicDBObject completeIndex = new BasicDBObject();
final Document completeIndex = new Document();

if (includeRunning) {
completeIndex.append("running", 1);
Expand All @@ -108,7 +115,7 @@ public void ensureCountIndex(final BasicDBObject index, final boolean includeRun
* @param resetDuration duration in seconds before this message is considered abandoned and will be given with another call to get()
* @return message or null
*/
public BasicDBObject get(final BasicDBObject query, final int resetDuration) {
public Document get(final Document query, final int resetDuration) {
return get(query, resetDuration, 3000, 200);
}

Expand All @@ -121,7 +128,7 @@ public BasicDBObject get(final BasicDBObject query, final int resetDuration) {
* @param waitDuration duration in milliseconds to keep polling before returning null
* @return message or null
*/
public BasicDBObject get(final BasicDBObject query, final int resetDuration, final int waitDuration) {
public Document get(final Document query, final int resetDuration, final int waitDuration) {
return get(query, resetDuration, waitDuration, 200);
}

Expand All @@ -135,40 +142,49 @@ public BasicDBObject get(final BasicDBObject query, final int resetDuration, fin
* @param pollDuration duration in milliseconds between poll attempts
* @return message or null
*/
public BasicDBObject get(final BasicDBObject query, final int resetDuration, final int waitDuration, long pollDuration) {
public Document get(final Document query, final int resetDuration, final int waitDuration, long pollDuration) {
Objects.requireNonNull(query);

//reset stuck messages
collection.update(new BasicDBObject("running", true).append("resetTimestamp", new BasicDBObject("$lte", new Date())),
new BasicDBObject("$set", new BasicDBObject("running", false)),
false,
true);

final BasicDBObject builtQuery = new BasicDBObject("running", false);
// collection.update(new Document("running", true).append("resetTimestamp", new Document("$lte", new Date())),
// new Document("$set", new Document("running", false)),
// false,
// true);

collection.updateMany(Filters.and(Filters.eq("running", true),Filters.lte("resetTimestamp", new Date())),
Updates.set("running", false),
new UpdateOptions().upsert(false));


final Document builtQuery = new Document("running", false);
for (final Entry<String, Object> field : query.entrySet()) {
builtQuery.append("payload." + field.getKey(), field.getValue());
}

builtQuery.append("earliestGet", new BasicDBObject("$lte", new Date()));
builtQuery.append("earliestGet", new Document("$lte", new Date()));

final Calendar calendar = Calendar.getInstance();

calendar.add(Calendar.SECOND, resetDuration);
final Date resetTimestamp = calendar.getTime();

final BasicDBObject sort = new BasicDBObject("priority", 1).append("created", 1);
final BasicDBObject update = new BasicDBObject("$set", new BasicDBObject("running", true).append("resetTimestamp", resetTimestamp));
final BasicDBObject fields = new BasicDBObject("payload", 1);
// final Document sort = new Document("priority", 1).append("created", 1);
final Document update = new Document("$set", new Document("running", true).append("resetTimestamp", resetTimestamp));
// final Document fields = new Document("payload", 1);

calendar.setTimeInMillis(System.currentTimeMillis());
calendar.add(Calendar.MILLISECOND, waitDuration);
final Date end = calendar.getTime();

while (true) {
final BasicDBObject message = (BasicDBObject) collection.findAndModify(builtQuery, fields, sort, false, update, true, false);
// final Document message = (Document) collection.findAndModify(builtQuery, fields, sort, false, update, true, false);
final Document message = collection.findOneAndUpdate(builtQuery, update, new FindOneAndUpdateOptions().sort(Sorts.ascending("priority","created")).returnDocument(ReturnDocument.AFTER));
if (message != null) {
final ObjectId id = message.getObjectId("_id");
return ((BasicDBObject) message.get("payload")).append("id", id);
Document d = ((Document)message.get("payload")).append("id", id);
Document Document = new Document();
Document.putAll(d);
return Document;
}

if (new Date().compareTo(end) >= 0) {
Expand All @@ -192,10 +208,10 @@ public BasicDBObject get(final BasicDBObject query, final int resetDuration, fin
* invalid {$and: [{...}, {...}]}. Should not be null
* @return count
*/
public long count(final BasicDBObject query) {
public long count(final Document query) {
Objects.requireNonNull(query);

final BasicDBObject completeQuery = new BasicDBObject();
final Document completeQuery = new Document();

for (final Entry<String, Object> field : query.entrySet()) {
completeQuery.append("payload." + field.getKey(), field.getValue());
Expand All @@ -212,10 +228,10 @@ public long count(final BasicDBObject query) {
* @param running count running messages or not running
* @return count
*/
public long count(final BasicDBObject query, final boolean running) {
public long count(final Document query, final boolean running) {
Objects.requireNonNull(query);

final BasicDBObject completeQuery = new BasicDBObject("running", running);
final Document completeQuery = new Document("running", running);

for (final Entry<String, Object> field : query.entrySet()) {
completeQuery.append("payload." + field.getKey(), field.getValue());
Expand All @@ -229,14 +245,14 @@ public long count(final BasicDBObject query, final boolean running) {
*
* @param message message received from get(). Should not be null.
*/
public void ack(final BasicDBObject message) {
public void ack(final Document message) {
Objects.requireNonNull(message);
final Object id = message.get("id");
if (id.getClass() != ObjectId.class) {
throw new IllegalArgumentException("id must be an ObjectId");
}

collection.remove(new BasicDBObject("_id", id));
collection.deleteOne(new Document("_id", id));
}

/**
Expand All @@ -245,7 +261,7 @@ public void ack(final BasicDBObject message) {
* @param message message to ack received from get(). Should not be null
* @param payload payload to send. Should not be null
*/
public void ackSend(final BasicDBObject message, final BasicDBObject payload) {
public void ackSend(final Document message, final Document payload) {
ackSend(message, payload, new Date());
}

Expand All @@ -256,7 +272,7 @@ public void ackSend(final BasicDBObject message, final BasicDBObject payload) {
* @param payload payload to send. Should not be null
* @param earliestGet earliest instant that a call to get() can return message. Should not be null
*/
public void ackSend(final BasicDBObject message, final BasicDBObject payload, final Date earliestGet) {
public void ackSend(final Document message, final Document payload, final Date earliestGet) {
ackSend(message, payload, earliestGet, 0.0);
}

Expand All @@ -268,7 +284,7 @@ public void ackSend(final BasicDBObject message, final BasicDBObject payload, fi
* @param earliestGet earliest instant that a call to get() can return message. Should not be null
* @param priority priority for order out of get(). 0 is higher priority than 1. Should not be NaN
*/
public void ackSend(final BasicDBObject message, final BasicDBObject payload, final Date earliestGet, final double priority) {
public void ackSend(final Document message, final Document payload, final Date earliestGet, final double priority) {
Objects.requireNonNull(message);
Objects.requireNonNull(payload);
Objects.requireNonNull(earliestGet);
Expand All @@ -281,23 +297,24 @@ public void ackSend(final BasicDBObject message, final BasicDBObject payload, fi
throw new IllegalArgumentException("id must be an ObjectId");
}

final BasicDBObject newMessage = new BasicDBObject("payload", payload)
final Document newMessage = new Document("payload", payload)
.append("running", false)
.append("resetTimestamp", new Date(Long.MAX_VALUE))
.append("earliestGet", earliestGet)
.append("priority", priority)
.append("created", new Date());

//using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY) so we can just send
collection.update(new BasicDBObject("_id", id), newMessage, true, false);
// collection.update(new Document("_id", id), newMessage, true, false);
collection.replaceOne(new Document("_id", id), newMessage, new UpdateOptions().upsert(true));
}

/**
* Requeue message with earliestGet as Now and 0.0 priority. Same as ackSend() with the same message.
*
* @param message message to requeue received from get(). Should not be null
*/
public void requeue(final BasicDBObject message) {
public void requeue(final Document message) {
requeue(message, new Date());
}

Expand All @@ -307,7 +324,7 @@ public void requeue(final BasicDBObject message) {
* @param message message to requeue received from get(). Should not be null
* @param earliestGet earliest instant that a call to get() can return message. Should not be null
*/
public void requeue(final BasicDBObject message, final Date earliestGet) {
public void requeue(final Document message, final Date earliestGet) {
requeue(message, earliestGet, 0.0);
}

Expand All @@ -318,7 +335,7 @@ public void requeue(final BasicDBObject message, final Date earliestGet) {
* @param earliestGet earliest instant that a call to get() can return message. Should not be null
* @param priority priority for order out of get(). 0 is higher priority than 1. Should not be NaN
*/
public void requeue(final BasicDBObject message, final Date earliestGet, final double priority) {
public void requeue(final Document message, final Date earliestGet, final double priority) {
Objects.requireNonNull(message);
Objects.requireNonNull(earliestGet);
if (Double.isNaN(priority)) {
Expand All @@ -330,8 +347,8 @@ public void requeue(final BasicDBObject message, final Date earliestGet, final d
throw new IllegalArgumentException("id must be an ObjectId");
}

final BasicDBObject forRequeue = new BasicDBObject(message);
forRequeue.removeField("id");
final Document forRequeue = new Document(message);
forRequeue.remove("id");
ackSend(message, forRequeue, earliestGet, priority);
}

Expand All @@ -340,7 +357,7 @@ public void requeue(final BasicDBObject message, final Date earliestGet, final d
*
* @param payload payload. Should not be null
*/
public void send(final BasicDBObject payload) {
public void send(final Document payload) {
send(payload, new Date());
}

Expand All @@ -350,7 +367,7 @@ public void send(final BasicDBObject payload) {
* @param payload payload. Should not be null
* @param earliestGet earliest instant that a call to Get() can return message. Should not be null
*/
public void send(final BasicDBObject payload, final Date earliestGet) {
public void send(final Document payload, final Date earliestGet) {
send(payload, earliestGet, 0.0);
}

Expand All @@ -361,37 +378,34 @@ public void send(final BasicDBObject payload, final Date earliestGet) {
* @param earliestGet earliest instant that a call to Get() can return message. Should not be null
* @param priority priority for order out of Get(). 0 is higher priority than 1. Should not be NaN
*/
public void send(final BasicDBObject payload, final Date earliestGet, final double priority) {
public void send(final Document payload, final Date earliestGet, final double priority) {
Objects.requireNonNull(payload);
Objects.requireNonNull(earliestGet);
if (Double.isNaN(priority)) {
throw new IllegalArgumentException("priority was NaN");
}

final BasicDBObject message = new BasicDBObject("payload", payload)
final Document message = new Document("payload", payload)
.append("running", false)
.append("resetTimestamp", new Date(Long.MAX_VALUE))
.append("earliestGet", earliestGet)
.append("priority", priority)
.append("created", new Date());

collection.insert(message);
collection.insertOne(message);
}

private void ensureIndex(final BasicDBObject index) {
private void ensureIndex(final Document index) {
for (int i = 0; i < 5; ++i) {
for (String name = UUID.randomUUID().toString(); name.length() > 0; name = name.substring(0, name.length() - 1)) {
//creating an index with the same name and different spec does nothing.
//creating an index with different name and same spec does nothing.
//so we use any generated name, and then find the right spec after we have called, and just go with that name.

try {
collection.ensureIndex(index, new BasicDBObject("name", name).append("background", true));
} catch (final CommandFailureException e) {
//happens when name is too long
}
// collection.ensureIndex(index, new Document("name", name).append("background", true));
collection.createIndex(index, new IndexOptions().name(name).background(true));

for (final DBObject existingIndex : collection.getIndexInfo()) {
for (final Document existingIndex : collection.listIndexes()) {

if (existingIndex.get("key").equals(index)) {
return;
Expand Down
Loading