@@ -25,43 +25,43 @@ protected class TracingAmazonDynamoDB(delegate : AmazonDynamoDB, eventStream :
25
25
26
26
def deleteItem (deleteItemRequest : DeleteItemRequest ) = {
27
27
deleteItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
28
- val (res, duration) = time (delegate.deleteItem(deleteItemRequest))
28
+ val (res, duration) = time (delegate.deleteItem(deleteItemRequest), deleteItemRequest.getTableName )
29
29
pub(DynamoRequestExecuted (Operation (deleteItemRequest.getTableName, Write , " DeleteItem" ), writeUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
30
30
res
31
31
}
32
32
33
33
def getItem (getItemRequest : GetItemRequest ) = {
34
34
getItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
35
- val (res, duration) = time(delegate.getItem(getItemRequest))
35
+ val (res, duration) = time(delegate.getItem(getItemRequest), getItemRequest.getTableName )
36
36
pub(DynamoRequestExecuted (Operation (getItemRequest.getTableName, Read , " GetItem" ), readUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
37
37
res
38
38
}
39
39
40
40
def scan (scanRequest : ScanRequest ) = {
41
41
scanRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
42
- val (res, duration) = time(delegate.scan(scanRequest))
42
+ val (res, duration) = time(delegate.scan(scanRequest), scanRequest.getTableName )
43
43
pub(DynamoRequestExecuted (Operation (scanRequest.getTableName, Read , " Scan" ), readUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
44
44
res
45
45
}
46
46
47
47
48
48
def updateItem (updateItemRequest : UpdateItemRequest ) = {
49
49
updateItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
50
- val (res, duration) = time(delegate.updateItem(updateItemRequest))
50
+ val (res, duration) = time(delegate.updateItem(updateItemRequest), updateItemRequest.getTableName )
51
51
pub(DynamoRequestExecuted (Operation (updateItemRequest.getTableName, Write , " UpdateItem" ), writeUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
52
52
res
53
53
}
54
54
55
55
def query (queryRequest : QueryRequest ) = {
56
56
queryRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
57
- val (res, duration) = time(delegate.query(queryRequest))
57
+ val (res, duration) = time(delegate.query(queryRequest), queryRequest.getTableName )
58
58
pub(DynamoRequestExecuted (Operation (queryRequest.getTableName, Read , " Query" ), readUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
59
59
res
60
60
}
61
61
62
62
def putItem (putItemRequest : PutItemRequest ) = {
63
63
putItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
64
- val (res, duration) = time(delegate.putItem(putItemRequest))
64
+ val (res, duration) = time(delegate.putItem(putItemRequest), putItemRequest.getTableName )
65
65
pub(DynamoRequestExecuted (Operation (putItemRequest.getTableName, Write , " PutItem" ), writeUnits = Option (scala.Double .unbox(res.getConsumedCapacity.getCapacityUnits)), duration = duration))
66
66
res
67
67
}
@@ -70,30 +70,34 @@ protected class TracingAmazonDynamoDB(delegate : AmazonDynamoDB, eventStream :
70
70
71
71
def batchGetItem (batchGetItemRequest : BatchGetItemRequest ) = {
72
72
batchGetItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
73
- val (res, duration) = time(delegate.batchGetItem(batchGetItemRequest))
73
+ val (res, duration) = time(delegate.batchGetItem(batchGetItemRequest), batchGetItemRequest.getRequestItems.keySet().mkString( " , " ) )
74
74
75
75
res.getConsumedCapacity foreach {
76
- case consumedCapacity =>
76
+ case consumedCapacity =>
77
77
pub(DynamoRequestExecuted (Operation (consumedCapacity.getTableName(), Read , " BatchGetItem" ), readUnits = Option (scala.Double .unbox(consumedCapacity.getCapacityUnits)), duration = duration))
78
78
}
79
79
res
80
80
}
81
81
82
82
def batchWriteItem (batchWriteItemRequest : BatchWriteItemRequest ) = {
83
83
batchWriteItemRequest.setReturnConsumedCapacity(ReturnConsumedCapacity .TOTAL )
84
- val (res, duration) = time(delegate.batchWriteItem(batchWriteItemRequest))
84
+ val (res, duration) = time(delegate.batchWriteItem(batchWriteItemRequest),batchWriteItemRequest.getRequestItems.keySet().mkString( " , " ) )
85
85
res.getConsumedCapacity foreach {
86
- case consumedCapacity =>
86
+ case consumedCapacity =>
87
87
pub(DynamoRequestExecuted (Operation (consumedCapacity.getTableName(), Write , " BatchWriteItem" ), writeUnits = Option (scala.Double .unbox(consumedCapacity.getCapacityUnits)), duration = duration))
88
88
}
89
89
res
90
90
}
91
91
92
92
private def pub (op: DynamoRequestExecuted ) = eventStream.publish(op)
93
93
94
- def time [T ](f : => T ): (T , Long ) = {
94
+ def time [T ](f : => T , tables : => String ): (T , Long ) = {
95
95
val start = System .currentTimeMillis()
96
- val res = f
96
+ val res = try {
97
+ f
98
+ } finally {
99
+ case ptee : ProvisionedThroughputExceededException => new ProvisionedThroughputExceededException (s " provisioned throughput for the table(s) was exceeded: $tables" , ptee)
100
+ }
97
101
(res, System .currentTimeMillis() - start)
98
102
}
99
103
0 commit comments