|
1 | 1 | /* |
2 | | - * Copyright (c) 2008-2015 MongoDB, Inc. |
| 2 | + * Copyright 2008-2016 MongoDB, Inc. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
22 | 22 | import com.mongodb.diagnostics.logging.Logger; |
23 | 23 | import com.mongodb.diagnostics.logging.Loggers; |
24 | 24 | import com.mongodb.event.CommandListener; |
| 25 | +import org.bson.BsonBinaryReader; |
25 | 26 | import org.bson.BsonDocument; |
26 | | -import org.bson.BsonDocumentReader; |
27 | 27 | import org.bson.FieldNameValidator; |
28 | 28 | import org.bson.codecs.BsonDocumentCodec; |
29 | 29 | import org.bson.codecs.Decoder; |
30 | | -import org.bson.codecs.DecoderContext; |
| 30 | +import org.bson.codecs.RawBsonDocumentCodec; |
| 31 | +import org.bson.io.ByteBufferBsonInput; |
31 | 32 |
|
32 | 33 | import java.util.HashSet; |
33 | 34 | import java.util.Set; |
@@ -105,31 +106,41 @@ public T execute(final InternalConnection connection) { |
105 | 106 | long startTimeNanos = System.nanoTime(); |
106 | 107 | CommandMessage commandMessage = new CommandMessage(namespace.getFullName(), command, slaveOk, fieldNameValidator, |
107 | 108 | ProtocolHelper.getMessageSettings(connection.getDescription())); |
| 109 | + ResponseBuffers responseBuffers = null; |
108 | 110 | try { |
109 | 111 | sendMessage(commandMessage, connection); |
110 | | - ResponseBuffers responseBuffers = connection.receiveMessage(commandMessage.getId()); |
111 | | - ReplyMessage<BsonDocument> replyMessage; |
112 | | - try { |
113 | | - replyMessage = new ReplyMessage<BsonDocument>(responseBuffers, new BsonDocumentCodec(), commandMessage.getId()); |
114 | | - } finally { |
115 | | - responseBuffers.close(); |
| 112 | + responseBuffers = connection.receiveMessage(commandMessage.getId()); |
| 113 | + if (!ProtocolHelper.isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())))) { |
| 114 | + throw getCommandFailureException(getResponseDocument(responseBuffers, commandMessage, new BsonDocumentCodec()), |
| 115 | + connection.getDescription().getServerAddress()); |
116 | 116 | } |
117 | 117 |
|
118 | | - BsonDocument response = replyMessage.getDocuments().get(0); |
119 | | - if (!ProtocolHelper.isCommandOk(response)) { |
120 | | - throw getCommandFailureException(response, connection.getDescription().getServerAddress()); |
121 | | - } |
| 118 | + T retval = getResponseDocument(responseBuffers, commandMessage, commandResultDecoder); |
122 | 119 |
|
123 | | - T retval = commandResultDecoder.decode(new BsonDocumentReader(response), DecoderContext.builder().build()); |
124 | | - sendSucceededEvent(connection.getDescription(), startTimeNanos, commandMessage, response); |
| 120 | + if (commandListener != null) { |
| 121 | + sendSucceededEvent(connection.getDescription(), startTimeNanos, commandMessage, |
| 122 | + getResponseDocument(responseBuffers, commandMessage, new RawBsonDocumentCodec())); |
| 123 | + } |
125 | 124 | LOGGER.debug("Command execution completed"); |
126 | 125 | return retval; |
127 | 126 | } catch (RuntimeException e) { |
128 | 127 | sendFailedEvent(connection.getDescription(), startTimeNanos, commandMessage, e); |
129 | 128 | throw e; |
| 129 | + } finally { |
| 130 | + if (responseBuffers != null) { |
| 131 | + responseBuffers.close(); |
| 132 | + } |
130 | 133 | } |
131 | 134 | } |
132 | 135 |
|
| 136 | + private static <D> D getResponseDocument(final ResponseBuffers responseBuffers, final CommandMessage commandMessage, |
| 137 | + final Decoder<D> decoder) { |
| 138 | + responseBuffers.reset(); |
| 139 | + ReplyMessage<D> replyMessage = new ReplyMessage<D>(responseBuffers, decoder, commandMessage.getId()); |
| 140 | + |
| 141 | + return replyMessage.getDocuments().get(0); |
| 142 | + } |
| 143 | + |
133 | 144 | @Override |
134 | 145 | public void executeAsync(final InternalConnection connection, final SingleResultCallback<T> callback) { |
135 | 146 | long startTimeNanos = System.nanoTime(); |
@@ -222,42 +233,53 @@ private void sendFailedEvent(final ConnectionDescription connectionDescription, |
222 | 233 | } |
223 | 234 | } |
224 | 235 |
|
225 | | - class CommandResultCallback extends CommandResultBaseCallback<BsonDocument> { |
| 236 | + class CommandResultCallback extends ResponseCallback { |
226 | 237 | private final SingleResultCallback<T> callback; |
227 | 238 | private final CommandMessage message; |
228 | 239 | private final ConnectionDescription connectionDescription; |
229 | 240 | private final long startTimeNanos; |
230 | 241 |
|
231 | 242 | CommandResultCallback(final SingleResultCallback<T> callback, final CommandMessage message, |
232 | 243 | final ConnectionDescription connectionDescription, final long startTimeNanos) { |
233 | | - super(new BsonDocumentCodec(), message.getId(), connectionDescription.getServerAddress()); |
| 244 | + super(message.getId(), connectionDescription.getServerAddress()); |
234 | 245 | this.callback = callback; |
235 | 246 | this.message = message; |
236 | 247 | this.connectionDescription = connectionDescription; |
237 | 248 | this.startTimeNanos = startTimeNanos; |
238 | 249 | } |
239 | 250 |
|
240 | 251 | @Override |
241 | | - protected void callCallback(final BsonDocument response, final Throwable throwableFromCallback) { |
| 252 | + protected void callCallback(final ResponseBuffers responseBuffers, final Throwable throwableFromCallback) { |
242 | 253 | try { |
243 | 254 | if (throwableFromCallback != null) { |
244 | 255 | throw throwableFromCallback; |
245 | | - } else { |
246 | | - if (LOGGER.isDebugEnabled()) { |
247 | | - LOGGER.debug("Command execution completed with status " + ProtocolHelper.isCommandOk(response)); |
248 | | - } |
249 | | - if (!ProtocolHelper.isCommandOk(response)) { |
250 | | - throw getCommandFailureException(response, getServerAddress()); |
251 | | - } else { |
252 | | - sendSucceededEvent(connectionDescription, startTimeNanos, message, response); |
253 | | - callback.onResult(commandResultDecoder.decode(new BsonDocumentReader(response), DecoderContext.builder().build()), |
254 | | - null); |
255 | | - } |
256 | 256 | } |
| 257 | + |
| 258 | + if (LOGGER.isDebugEnabled()) { |
| 259 | + LOGGER.debug("Command execution completed"); |
| 260 | + } |
| 261 | + |
| 262 | + if (!ProtocolHelper.isCommandOk(new BsonBinaryReader(new ByteBufferBsonInput(responseBuffers.getBodyByteBuffer())))) { |
| 263 | + throw getCommandFailureException(getResponseDocument(responseBuffers, message, new BsonDocumentCodec()), |
| 264 | + connectionDescription.getServerAddress()); |
| 265 | + } |
| 266 | + |
| 267 | + if (commandListener != null) { |
| 268 | + sendSucceededEvent(connectionDescription, startTimeNanos, message, |
| 269 | + getResponseDocument(responseBuffers, message, new RawBsonDocumentCodec())); |
| 270 | + } |
| 271 | + callback.onResult(getResponseDocument(responseBuffers, message, commandResultDecoder), null); |
| 272 | + |
257 | 273 | } catch (Throwable t) { |
258 | 274 | sendFailedEvent(connectionDescription, startTimeNanos, message, t); |
259 | 275 | callback.onResult(null, t); |
| 276 | + } finally { |
| 277 | + if (responseBuffers != null) { |
| 278 | + responseBuffers.close(); |
| 279 | + } |
260 | 280 | } |
| 281 | + |
| 282 | + |
261 | 283 | } |
262 | 284 | } |
263 | 285 | } |
0 commit comments