Skip to content

Commit 7bb5b6e

Browse files
cleaning pr
1 parent 4eb69f0 commit 7bb5b6e

File tree

17 files changed

+88
-126
lines changed

17 files changed

+88
-126
lines changed

bin/ingestion.js

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -281,31 +281,27 @@ function initAndStart(zkClient) {
281281
loadProcessors(zkClient);
282282

283283
async.series([
284-
done => {
285-
ingestionPopulator.open(done);
286-
},
284+
done => ingestionPopulator.open(done),
287285
done => {
288286
scheduler = schedule.scheduleJob(ingestionExtConfigs.cronRule,
289287
() => queueBatch(ingestionPopulator, log));
290288
return done();
291289
},
292-
done => {
293-
startProbeServer(ingestionExtConfigs.probeServer, (err, probeServer) => {
294-
if (err) {
295-
log.error('error starting probe server', { error: err });
296-
return done(err);
297-
}
298-
if (probeServer !== undefined) {
299-
// following the same pattern as other extensions, where liveness
300-
// and readiness are handled by the same handler
301-
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE], handleLiveness);
302-
// retaining the old route and adding support to new route, until
303-
// metrics handling is consolidated
304-
probeServer.addHandler(['/_/monitoring/metrics', DEFAULT_METRICS_ROUTE], handleMetrics);
305-
}
306-
return done();
307-
});
308-
},
290+
done => startProbeServer(ingestionExtConfigs.probeServer, (err, probeServer) => {
291+
if (err) {
292+
log.error('error starting probe server', { error: err });
293+
return done(err);
294+
}
295+
if (probeServer !== undefined) {
296+
// following the same pattern as other extensions, where liveness
297+
// and readiness are handled by the same handler
298+
probeServer.addHandler([DEFAULT_LIVE_ROUTE, DEFAULT_READY_ROUTE], handleLiveness);
299+
// retaining the old route and adding support to new route, until
300+
// metrics handling is consolidated
301+
probeServer.addHandler(['/_/monitoring/metrics', DEFAULT_METRICS_ROUTE], handleMetrics);
302+
}
303+
return done();
304+
}),
309305
], err => {
310306
if (err) {
311307
log.fatal('error during ingestion populator initialization', {

extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class LifecycleBucketProcessor {
139139
logFields: { value },
140140
actionFunc: (done, nbRetries) => task.processBucketEntry(
141141
rules, value, s3target, backbeatMetadataProxy, nbRetries, done),
142-
shouldRetryFunc: err => err.retryable || err.$retryable,
142+
shouldRetryFunc: err => err.retryable,
143143
log: this._log,
144144
}, cb);
145145
}, this._lcConfig.bucketProcessor.concurrency);
@@ -373,7 +373,7 @@ class LifecycleBucketProcessor {
373373
.then(data => done(null, data))
374374
.catch(done);
375375
},
376-
shouldRetryFunc: err => err.retryable || err.$retryable,
376+
shouldRetryFunc: err => err.retryable,
377377
log: this._log,
378378
}, cb);
379379
}

extensions/lifecycle/conductor/LifecycleConductor.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,16 +331,15 @@ class LifecycleConductor {
331331
unknownCanonicalIds,
332332
10,
333333
(canonicalId, done) => {
334-
log.info('AAAAA LC-1: requesting account ID from Vault', {
335-
method: 'LifecycleConductor._getAccountIds',
336-
canonicalId,
337-
});
338334
this.vaultClientWrapper.getAccountIds([canonicalId], (err, accountIds) => {
339335
// TODO: BB-344 fixes me
340336
// LifecycleMetrics.onVaultRequest(this.logger, 'getAccountIds', err);
341337
if (err) {
342338
if (err.NoSuchEntity) {
339+
log.error('canonical id does not exist', { error: err, canonicalId });
343340
this._accountIdCache.miss(canonicalId);
341+
} else {
342+
log.error('could not get account id', { error: err, canonicalId });
344343
}
345344

346345
// don't propagate the error, to avoid interrupting the whole cargo

extensions/lifecycle/management.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,14 @@ function putLifecycleConfiguration(bucketName, workflows, cb) {
8585
await getS3Client(endpoint).send(command);
8686
logger.debug('lifecycle configuration apply done', {
8787
bucket: bucketName });
88-
cb();
88+
return cb();
8989
} catch (err) {
9090
logger.debug('lifecycle configuration apply done', {
9191
bucket: bucketName, error: err });
9292
if (err.name === 'NoSuchBucket') {
93-
cb();
94-
} else {
95-
cb(err);
93+
return cb();
9694
}
95+
return cb(err);
9796
}
9897
})();
9998
}
@@ -113,15 +112,14 @@ function deleteLifecycleConfiguration(bucketName, cb) {
113112
await getS3Client(endpoint).send(command);
114113
logger.debug('lifecycle configuration deleted', {
115114
bucket: bucketName });
116-
cb();
115+
return cb();
117116
} catch (err) {
118117
logger.debug('lifecycle configuration deleted', {
119118
bucket: bucketName, error: err });
120119
if (err.name === 'NoSuchBucket') {
121-
cb();
122-
} else {
123-
cb(err);
120+
return cb();
124121
}
122+
return cb(err);
125123
}
126124
})();
127125
}

extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
5656
// In this case, instead of logging an error, it should be logged as a debug message,
5757
// to avoid causing unnecessary concern to the customer.
5858
// TODO: BB-612
59-
6059
const logLevel = err.code === 'InvalidBucketState' ? 'debug' : 'error';
6160
log[logLevel]('error getting metadata blob from S3', Object.assign({
6261
method: 'LifecycleDeleteObjectTask._getMetadata',
@@ -91,8 +90,6 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
9190
const bucket = entry.getAttribute('target.bucket');
9291
const key = entry.getAttribute('target.key');
9392
const lastModified = entry.getAttribute('details.lastModified');
94-
console.log('KKKKKKK 1', lastModified)
95-
console.log('KKKKKKK 2', typeof lastModified)
9693
if (lastModified) {
9794
const reqParams = {
9895
Bucket: bucket,
@@ -213,7 +210,7 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
213210
LifecycleMetrics.onLifecycleCompleted(log,
214211
actionType === 'deleteMPU' ? 'expiration:mpu' : 'expiration',
215212
location, Date.now() - entry.getAttribute('transitionTime'));
216-
if (err) {
213+
if (err) {
217214
log.error(
218215
`an error occurred on ${reqMethod} to S3`, Object.assign({
219216
method: 'LifecycleDeleteObjectTask._executeDelete',

extensions/lifecycle/tasks/LifecycleTask.js

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,18 +1028,9 @@ class LifecycleTask extends BackbeatTask {
10281028
* (i.e. transition) should not apply.
10291029
*/
10301030
_checkAndApplyExpirationRule(bucketData, obj, rules, log) {
1031-
console.log('KKKKKKK 3', obj.LastModified)
1032-
console.log('KKKKKKK 4.1', typeof obj.LastModified)
1033-
console.log('KKKKKKK 4.2', new Date(obj.LastModified))
1034-
// todo get transition timestamp check
10351031
const daysSinceInitiated = this._lifecycleDateTime.findDaysSince(
10361032
new Date(obj.LastModified)
10371033
);
1038-
console.log('KKKKKKK 4.3', daysSinceInitiated)
1039-
const tmp = this._lifecycleDateTime.getTransitionTimestamp(
1040-
rules.Expiration, obj.LastModified)
1041-
console.log('KKKKKKK 4.4', tmp)
1042-
10431034
const currentDate = this._lifecycleDateTime.getCurrentDate();
10441035

10451036
if (rules.Expiration.Date &&
@@ -1597,8 +1588,6 @@ class LifecycleTask extends BackbeatTask {
15971588
return this.s3target.send(command)
15981589
.then(data => {
15991590
LifecycleMetrics.onS3Request(log, 'headObject', 'bucket', null);
1600-
console.log('KKKKKKK 6', data.LastModified)
1601-
console.log('KKKKKKK 6.5', typeof data.LastModified)
16021591
const lastModified = data.LastModified instanceof Date
16031592
? data.LastModified.toISOString()
16041593
: data.LastModified;
@@ -1614,31 +1603,30 @@ class LifecycleTask extends BackbeatTask {
16141603
return done();
16151604
}
16161605
if (rules.Transition) {
1617-
return this._applyTransitionRule({
1618-
owner: bucketData.target.owner,
1619-
accountId: bucketData.target.accountId,
1606+
return this._applyTransitionRule({
1607+
owner: bucketData.target.owner,
1608+
accountId: bucketData.target.accountId,
1609+
bucket: bucketData.target.bucket,
1610+
objectKey: obj.Key,
1611+
eTag: obj.ETag,
1612+
lastModified: obj.LastModified,
1613+
site: rules.Transition.StorageClass,
1614+
transitionTime: this._lifecycleDateTime.getTransitionTimestamp(
1615+
rules.Transition, object.LastModified),
1616+
}, log, done);
1617+
}
1618+
return done();
1619+
})
1620+
.catch(err => {
1621+
LifecycleMetrics.onS3Request(log, 'headObject', 'bucket', err);
1622+
log.error('failed to get object', {
1623+
method: 'LifecycleTask._compareObject',
1624+
error: err,
16201625
bucket: bucketData.target.bucket,
16211626
objectKey: obj.Key,
1622-
eTag: obj.ETag,
1623-
lastModified: obj.LastModified,
1624-
site: rules.Transition.StorageClass,
1625-
transitionTime: this._lifecycleDateTime.getTransitionTimestamp(
1626-
rules.Transition, object.LastModified),
1627-
}, log, done);
1628-
}
1629-
1630-
return done();
1631-
})
1632-
.catch(err => {
1633-
LifecycleMetrics.onS3Request(log, 'headObject', 'bucket', err);
1634-
log.error('failed to get object', {
1635-
method: 'LifecycleTask._compareObject',
1636-
error: err,
1637-
bucket: bucketData.target.bucket,
1638-
objectKey: obj.Key,
1627+
});
1628+
return done(err);
16391629
});
1640-
return done(err);
1641-
});
16421630
}
16431631

16441632
/**
@@ -1757,22 +1745,15 @@ class LifecycleTask extends BackbeatTask {
17571745
const filteredRules = this._lifecycleUtils.filterRules(bucketLCRules, upload, noTags);
17581746
const aRules = this._lifecycleUtils.getApplicableRules(filteredRules, {});
17591747

1760-
console.log('UUUUUU 1', upload.Initiated);
1761-
console.log('UUUUUU 2', typeof upload.Initiated);
1762-
console.log('UUUUUU 3', upload.Initiated instanceof Date);
17631748
const daysSinceInitiated = this._lifecycleDateTime.findDaysSince(
17641749
new Date(upload.Initiated)
17651750
);
1766-
console.log('UUUUUU 4', daysSinceInitiated);
17671751
const abortRule = aRules.AbortIncompleteMultipartUpload;
1768-
console.log('UUUUUU 5', abortRule);
1769-
console.log('UUUUUU 6', abortRule ? abortRule.DaysAfterInitiation : 'no abort rule');
17701752

17711753
// NOTE: DaysAfterInitiation can be 0 in tests
17721754
const doesAbortRuleApply = (abortRule &&
17731755
abortRule.DaysAfterInitiation !== undefined &&
17741756
daysSinceInitiated >= abortRule.DaysAfterInitiation);
1775-
console.log('UUUUUU 7', doesAbortRuleApply);
17761757
if (doesAbortRuleApply) {
17771758
log.debug('send mpu upload for aborting', {
17781759
bucket: bucketData.target.bucket,
@@ -1795,7 +1776,6 @@ class LifecycleTask extends BackbeatTask {
17951776
this._lifecycleDateTime.getTransitionTimestamp(
17961777
{ Days: abortRule.DaysAfterInitiation }, upload.Initiated)
17971778
);
1798-
console.log('UUUUUU 8', 'Sending deleteMPU action for upload:', upload.Key);
17991779
this._sendObjectAction(entry, err => {
18001780
if (!err) {
18011781
log.debug('sent object entry for consumption',

extensions/replication/queueProcessor/QueueProcessor.js

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -865,16 +865,12 @@ class QueueProcessor extends EventEmitter {
865865
* @return {undefined}
866866
*/
867867
processReplicationEntry(kafkaEntry, done) {
868-
console.log('YYYYY 1: processReplicationEntry called');
869868
const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);
870-
console.log('YYYYY 2: sourceEntry created:', sourceEntry.getLogInfo ? sourceEntry.getLogInfo() : 'no log info');
871869
if (sourceEntry.error) {
872-
console.log('YYYYY 3: sourceEntry has error:', sourceEntry.error);
873870
this.logger.error('error processing replication entry', { error: sourceEntry.error });
874871
return process.nextTick(() => done(errors.InternalError));
875872
}
876873
if (sourceEntry.skip) {
877-
console.log('YYYYY 4: sourceEntry should be skipped');
878874
// skip message, noop
879875
return process.nextTick(done);
880876
}
@@ -898,12 +894,9 @@ class QueueProcessor extends EventEmitter {
898894
}
899895
}
900896
if (task) {
901-
console.log('YYYYY 5: task created, type:', task.constructor.name);
902897
this.logger.debug('replication entry is being pushed', { entry: sourceEntry.getLogInfo() });
903-
console.log('YYYYY 6: pushing task to scheduler');
904898
return this.taskScheduler.push({ task, entry: sourceEntry, kafkaEntry }, done);
905899
}
906-
console.log('YYYYY 7: no task created, skipping entry');
907900
this.logger.debug('skip replication entry', { entry: sourceEntry.getLogInfo() });
908901
return process.nextTick(done);
909902
}

extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ class ReplicationStatusProcessor {
448448
method: 'ReplicationStatusProcessor.stop',
449449
});
450450
return next();
451-
},
451+
}
452452
], done);
453453
}
454454

extensions/replication/tasks/CopyLocationTask.js

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,12 @@ class CopyLocationTask extends BackbeatTask {
229229
const doneOnce = jsutil.once(done);
230230
const size = objMD.getContentLength();
231231

232-
// Use abort controller to cancel requests on error
233232
const abortController = new AbortController();
234233
let sourceStreamAborted = false;
235234

236235
const performPutObject = incomingMsg => {
237236
log.debug('putting data', actionEntry.getLogInfo());
238237

239-
// Set up stream error handler if we have a stream
240238
if (incomingMsg) {
241239
incomingMsg.on('error', err => {
242240
if (!sourceStreamAborted) {
@@ -265,6 +263,7 @@ class CopyLocationTask extends BackbeatTask {
265263
if (err && incomingMsg && !sourceStreamAborted) {
266264
// Abort the source stream on PUT error
267265
sourceStreamAborted = true;
266+
abortController.abort();
268267
if (incomingMsg.destroy) {
269268
incomingMsg.destroy();
270269
}
@@ -327,8 +326,6 @@ class CopyLocationTask extends BackbeatTask {
327326
_sendMultipleBackendPutObject(actionEntry, objMD, size,
328327
incomingMsg, log, cb) {
329328
const { bucket, key, version } = actionEntry.getAttribute('target');
330-
const userMetadata = objMD.getUserMetadata();
331-
332329
const command = new MultipleBackendPutObjectCommand({
333330
Bucket: bucket,
334331
Key: key,
@@ -337,7 +334,7 @@ class CopyLocationTask extends BackbeatTask {
337334
StorageType: this.destType,
338335
StorageClass: this.site,
339336
VersionId: version,
340-
UserMetaData: userMetadata,
337+
UserMetaData: objMD.getUserMetadata(),
341338
ContentType: objMD.getContentType() || undefined,
342339
CacheControl: objMD.getCacheControl() || undefined,
343340
ContentDisposition:
@@ -415,7 +412,6 @@ class CopyLocationTask extends BackbeatTask {
415412
}, actionEntry.getLogInfo()));
416413
// A 0-byte object has no range, otherwise range is inclusive.
417414
const size = range ? range.end - range.start + 1 : 0;
418-
// Create AbortController even for 0-byte parts for consistency
419415
const abortController = new AbortController();
420416

421417
const { bucket, key, version } = actionEntry.getAttribute('target');
@@ -445,7 +441,7 @@ class CopyLocationTask extends BackbeatTask {
445441
return done(err);
446442
});
447443
}
448-
// For 0-byte parts, pass undefined body but still provide abortController
444+
449445
return this._putMPUPart(actionEntry, objMD, undefined, size,
450446
uploadId, partNumber, log, abortController, done);
451447
}
@@ -589,7 +585,6 @@ class CopyLocationTask extends BackbeatTask {
589585
const command = new MultipleBackendPutMPUPartCommand({
590586
Bucket: bucket,
591587
Key: key,
592-
ContentLength: size,
593588
StorageType: this.destType,
594589
StorageClass: this.site,
595590
PartNumber: partNumber,
@@ -806,11 +801,8 @@ class CopyLocationTask extends BackbeatTask {
806801
if (err) {
807802
return next(err);
808803
}
809-
const partNumber = typeof data.partNumber === 'string'
810-
? parseInt(data.partNumber, 10)
811-
: data.partNumber;
812804
const res = {
813-
PartNumber: [partNumber],
805+
PartNumber: [parseInt(data.partNumber, 10)],
814806
ETag: [data.ETag],
815807
Size: [ranges[n].end - ranges[n].start + 1],
816808
};

0 commit comments

Comments
 (0)