Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions samples/basic_sample/consumer/sample_kcl_app.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ permissions and limitations under the License.

'use strict';


var fs = require('fs');
var path = require('path');
var util = require('util');
var kcl = require('../../..');
var logger = require('../../util/logger');
Expand Down Expand Up @@ -66,20 +63,20 @@ function recordProcessor() {
});
},

shutdownRequested: function(shutdownRequestedInput, completeCallback) {
shutdownRequestedInput.checkpointer.checkpoint(function (err) {
leaseLost: function(leaseLostInput, completeCallback) {
log.info(util.format('Lease was lost for ShardId: %s', shardId));
completeCallback();
},

shardEnded: function(shardEndedInput, completeCallback) {
log.info(util.format('ShardId: %s has ended. Will checkpoint now.', shardId));
shardEndedInput.checkpointer.checkpoint(function(err) {
completeCallback();
});
},

shutdown: function(shutdownInput, completeCallback) {
// Checkpoint should only be performed when shutdown reason is TERMINATE.
if (shutdownInput.reason !== 'TERMINATE') {
completeCallback();
return;
}
// Whenever checkpointing, completeCallback should only be invoked once checkpoint is complete.
shutdownInput.checkpointer.checkpoint(function(err) {
shutdownRequested: function(shutdownRequestedInput, completeCallback) {
shutdownRequestedInput.checkpointer.checkpoint(function (err) {
completeCallback();
});
}
Expand Down
22 changes: 13 additions & 9 deletions samples/click_stream_sample/consumer/click_stream_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,25 +162,29 @@ function clickStreamProcessor(emitter, cfg) {
/**
* Called by the KCL to indicate that this record processor should shut down.
* After the shutdown operation is complete, there will not be any more calls to
* any other functions of this record processor. Note that the shutdown reason
* could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not
* any other functions of this record processor. If lease is lost, clients should not
* checkpoint because there is possibly another record processor which has
* acquired the lease for this shard. If TERMINATE, then
* acquired the lease for this shard.
*/
leaseLost: function(leaseLostInput, completeCallback) {
completeCallback();
},

/**
* Called by the KCL to indicate that this record processor should shut down.
* After the shutdown operation is complete, there will not be any more calls to
* any other functions of this record processor. If shard has ended, then
* checkpointer.checkpoint() should be called to checkpoint at the end of
* the shard so that this processor will be shut down and new processors
* will be created for the children of this shard.
*/
shutdown: function(shutdownInput, completeCallback) {
if (shutdownInput.reason !== 'TERMINATE') {
completeCallback();
return;
}
shardEnded: function(shardEndedInput, completeCallback) {
// Make sure to emit all remaining buffered data to S3 before shutting down.
commitQueue.push({
key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(),
sequenceNumber: buffer.getLastSequenceNumber(),
data: buffer.readAndClearRecords(),
checkpointer: shutdownInput.checkpointer
checkpointer: shardEndedInput.checkpointer
}, function(error) {
if (error) {
log.error(util.format('Received error while shutting down: %s', error));
Expand Down