From 4b7fc3a23a3f63c696485037e9b3d3835aefb800 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 6 Mar 2019 10:13:06 -0800 Subject: [PATCH] Updating samples * Updating samples to use the new interfaces with LeaseLost and ShardEnded methods --- .../basic_sample/consumer/sample_kcl_app.js | 23 ++++++++----------- .../consumer/click_stream_consumer.js | 22 ++++++++++-------- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/samples/basic_sample/consumer/sample_kcl_app.js b/samples/basic_sample/consumer/sample_kcl_app.js index c338ef96..e886dd14 100644 --- a/samples/basic_sample/consumer/sample_kcl_app.js +++ b/samples/basic_sample/consumer/sample_kcl_app.js @@ -15,9 +15,6 @@ permissions and limitations under the License. 'use strict'; - -var fs = require('fs'); -var path = require('path'); var util = require('util'); var kcl = require('../../..'); var logger = require('../../util/logger'); @@ -66,20 +63,20 @@ function recordProcessor() { }); }, - shutdownRequested: function(shutdownRequestedInput, completeCallback) { - shutdownRequestedInput.checkpointer.checkpoint(function (err) { + leaseLost: function(leaseLostInput, completeCallback) { + log.info(util.format('Lease was lost for ShardId: %s', shardId)); + completeCallback(); + }, + + shardEnded: function(shardEndedInput, completeCallback) { + log.info(util.format('ShardId: %s has ended. Will checkpoint now.', shardId)); + shardEndedInput.checkpointer.checkpoint(function(err) { completeCallback(); }); }, - shutdown: function(shutdownInput, completeCallback) { - // Checkpoint should only be performed when shutdown reason is TERMINATE. - if (shutdownInput.reason !== 'TERMINATE') { - completeCallback(); - return; - } - // Whenever checkpointing, completeCallback should only be invoked once checkpoint is complete. - shutdownInput.checkpointer.checkpoint(function(err) { + shutdownRequested: function(shutdownRequestedInput, completeCallback) { + shutdownRequestedInput.checkpointer.checkpoint(function (err) { completeCallback(); }); } diff --git a/samples/click_stream_sample/consumer/click_stream_consumer.js b/samples/click_stream_sample/consumer/click_stream_consumer.js index cdcaf5ba..dee6be15 100644 --- a/samples/click_stream_sample/consumer/click_stream_consumer.js +++ b/samples/click_stream_sample/consumer/click_stream_consumer.js @@ -162,25 +162,29 @@ function clickStreamProcessor(emitter, cfg) { /** * Called by the KCL to indicate that this record processor should shut down. * After the shutdown operation is complete, there will not be any more calls to - * any other functions of this record processor. Note that the shutdown reason - * could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not + * any other functions of this record processor. If lease is lost, clients should not * checkpoint because there is possibly another record processor which has - * acquired the lease for this shard. If TERMINATE, then + * acquired the lease for this shard. + */ + leaseLost: function(leaseLostInput, completeCallback) { + completeCallback(); + }, + + /** + * Called by the KCL to indicate that this record processor should shut down. + * After the shutdown operation is complete, there will not be any more calls to + * any other functions of this record processor. If shard has ended, then * checkpointer.checkpoint() should be called to checkpoint at the end of * the shard so that this processor will be shut down and new processors * will be created for the children of this shard. */ - shutdown: function(shutdownInput, completeCallback) { - if (shutdownInput.reason !== 'TERMINATE') { - completeCallback(); - return; - } + shardEnded: function(shardEndedInput, completeCallback) { // Make sure to emit all remaining buffered data to S3 before shutting down. commitQueue.push({ key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(), sequenceNumber: buffer.getLastSequenceNumber(), data: buffer.readAndClearRecords(), - checkpointer: shutdownInput.checkpointer + checkpointer: shardEndedInput.checkpointer }, function(error) { if (error) { log.error(util.format('Received error while shutting down: %s', error));