diff --git a/bin/kcl-bootstrap b/bin/kcl-bootstrap index 4c24ed32..9643bb04 100755 --- a/bin/kcl-bootstrap +++ b/bin/kcl-bootstrap @@ -33,13 +33,13 @@ var MAVEN_PACKAGE_LIST = [ getMavenPackageInfo('commons-logging', 'commons-logging', '1.1.3'), getMavenPackageInfo('commons-lang', 'commons-lang', '2.6'), getMavenPackageInfo('joda-time', 'joda-time', '2.8.1'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-core', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-cloudwatch', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-dynamodb', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kinesis', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kms', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-s3', '1.11.14'), - getMavenPackageInfo('com.amazonaws', 'amazon-kinesis-client', '1.7.2'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-core', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-cloudwatch', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-dynamodb', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kinesis', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-kms', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk-s3', '1.11.151'), + getMavenPackageInfo('com.amazonaws', 'amazon-kinesis-client', '1.7.6'), getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-databind', '2.6.6'), getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-core', '2.6.6'), getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-annotations', '2.6.0'), diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index 2f4a107b..8f1753c0 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -74,6 +74,10 @@ var KCLStateMachine = BehavioralFsm.extend({ this.transition(context, 'Processing'); return true; }, + beginShutdownRequested: function(context) { + this.transition(context, 'ShutdownRequested'); + return true; + }, beginShutdown: function(context) { this.transition(context, 'ShuttingDown'); return true; @@ -107,6 +111,30 @@ var KCLStateMachine = BehavioralFsm.extend({ return false; } }, + ShutdownRequestedCheckpointing: { + finishCheckpoint: function(context) { + this.transition(context, 'ShutdownRequested'); + return true; + }, + '*': function(context) { + this.transition(context, 'Error'); + return false; + } + }, + ShutdownRequested: { + beginCheckpoint: function(context) { + this.transition(context, 'ShutdownRequestedCheckpointing'); + return true; + }, + finishShutdownRequested: function(context) { + this.transition(context, 'Ready'); + return true; + }, + '*': function(context) { + this.transition(context, 'Error'); + return false; + } + }, ShuttingDown: { beginCheckpoint: function(context) { this.transition(context, 'FinalCheckpointing'); @@ -212,12 +240,17 @@ KCLManager.prototype.checkpoint = function(sequenceNumber) { */ KCLManager.prototype._onAction = function(action) { var actionType = action.action; - if (actionType === 'initialize' || actionType === 'processRecords' || actionType === 'shutdown') { + if (actionType === 'initialize' || + actionType === 'processRecords' || + actionType === 'shutdown') { this._onRecordProcessorAction(action); } else if (actionType === 'checkpoint') { this._onCheckpointAction(action); } + else if (actionType === 'shutdownRequested') { + this._onShutdownRequested(action); + } else { this._reportError(util.format('Invalid action received: %j', action)); } @@ -309,6 +342,33 @@ KCLManager.prototype._onCheckpointAction = function(action) { checkpointer.onCheckpointerResponse.apply(checkpointer, [action.error, action.sequenceNumber]); }; +/** + * Gets invoked when shutdownRequested is called. + * @param {Object} action - RecordProcessor related action + * @private + */ +KCLManager.prototype._onShutdownRequested = function(action) { + var context = this._context; + var recordProcessor = context.recordProcessor; + var recordProcessorFunc = recordProcessor.shutdownRequested; + + if (typeof recordProcessorFunc === 'function') { + var recordProcessorFuncInput = cloneToInput(action); + var checkpointer = context.checkpointer; + + this._handleStateInput(context, 'beginShutdownRequested'); + var callbackFunc = function() { + this._recordProcessorCallback(context, action, 'finishShutdownRequested'); + }.bind(this); + + recordProcessorFuncInput.checkpointer = checkpointer; + recordProcessorFunc.apply(recordProcessor, [recordProcessorFuncInput, callbackFunc]); + } + else { + this._sendAction(context, {action: 'status', responseFor: action.action}); + } +}; + /** * Sends the given action to the MultiLangDaemon. * @param {object} context - Record processor context for which this action belongs to. diff --git a/samples/basic_sample/consumer/sample_kcl_app.js b/samples/basic_sample/consumer/sample_kcl_app.js index 8afd8ff6..4c3d89db 100644 --- a/samples/basic_sample/consumer/sample_kcl_app.js +++ b/samples/basic_sample/consumer/sample_kcl_app.js @@ -66,6 +66,12 @@ function recordProcessor() { }); }, + shutdownRequested: function(shutdownRequestedInput, completeCallback) { + shutdownRequestedInput.checkpointer.checkpoint(function (err) { + completeCallback(); + }); + }, + shutdown: function(shutdownInput, completeCallback) { // Checkpoint should only be performed when shutdown reason is TERMINATE. if (shutdownInput.reason !== 'TERMINATE') {