Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
debug
  • Loading branch information
SylvainSenechal committed Nov 26, 2025
commit 4eb69f0b11d16c199808d92c36762981e199e877
7 changes: 7 additions & 0 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,16 @@ class QueueProcessor extends EventEmitter {
* @return {undefined}
*/
processReplicationEntry(kafkaEntry, done) {
console.log('YYYYY 1: processReplicationEntry called');
const sourceEntry = QueueEntry.createFromKafkaEntry(kafkaEntry);
console.log('YYYYY 2: sourceEntry created:', sourceEntry.getLogInfo ? sourceEntry.getLogInfo() : 'no log info');
if (sourceEntry.error) {
console.log('YYYYY 3: sourceEntry has error:', sourceEntry.error);
this.logger.error('error processing replication entry', { error: sourceEntry.error });
return process.nextTick(() => done(errors.InternalError));
}
if (sourceEntry.skip) {
console.log('YYYYY 4: sourceEntry should be skipped');
// skip message, noop
return process.nextTick(done);
}
Expand All @@ -894,9 +898,12 @@ class QueueProcessor extends EventEmitter {
}
}
if (task) {
console.log('YYYYY 5: task created, type:', task.constructor.name);
this.logger.debug('replication entry is being pushed', { entry: sourceEntry.getLogInfo() });
console.log('YYYYY 6: pushing task to scheduler');
return this.taskScheduler.push({ task, entry: sourceEntry, kafkaEntry }, done);
}
console.log('YYYYY 7: no task created, skipping entry');
this.logger.debug('skip replication entry', { entry: sourceEntry.getLogInfo() });
return process.nextTick(done);
}
Expand Down
6 changes: 5 additions & 1 deletion extensions/replication/tasks/CopyLocationTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const {
const { LifecycleMetrics } = require('../../lifecycle/LifecycleMetrics');
const ReplicationMetric = require('../ReplicationMetric');
const ReplicationMetrics = require('../ReplicationMetrics');
const { TIMEOUT_MS } = require('../../../lib/clients/utils');
const { isRetryableMiddleware, TIMEOUT_MS } = require('../../../lib/clients/utils');
const { getAccountCredentials } =
require('../../../lib/credentials/AccountCredentials');
const RoleCredentials =
Expand Down Expand Up @@ -90,6 +90,10 @@ class CopyLocationTask extends BackbeatTask {
maxAttempts: 1,
requestHandler,
});
this.backbeatClient.middlewareStack.add(isRetryableMiddleware(), {
Copy link
Contributor Author

@SylvainSenechal SylvainSenechal Nov 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because a lot of code relies on the boolean err.retryable being set for its retry logic, which is no longer the case with SDK v3.
Some of this retry logic is unit tested.

step: 'deserialize',
priority: 'high',
});
this.backbeatMetadataProxy = new BackbeatMetadataProxy(
`${transport}://${s3.host}:${s3.port}`, auth, this.sourceHTTPAgent);
this.backbeatMetadataProxy
Expand Down
28 changes: 27 additions & 1 deletion extensions/replication/tasks/ReplicateObject.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const {
} = require('@scality/cloudserverclient');

const mapLimitWaitPendingIfError = require('../../../lib/util/mapLimitWaitPendingIfError');
const { attachReqUids, TIMEOUT_MS } = require('../../../lib/clients/utils');
const { attachReqUids, isRetryableMiddleware, TIMEOUT_MS } = require('../../../lib/clients/utils');
const getExtMetrics = require('../utils/getExtMetrics');
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
const { getAccountCredentials } = require('../../../lib/credentials/AccountCredentials');
Expand Down Expand Up @@ -178,6 +178,7 @@ class ReplicateObject extends BackbeatTask {
}

_getAndPutPart(sourceEntry, destEntry, part, log, cb) {
console.log('ZZZZZ 8: _getAndPutPart called for part:', part);
const partLogger = this.logger.newRequestLogger(log.getUids());
this.retry({
actionDesc: 'stream part data',
Expand Down Expand Up @@ -262,6 +263,7 @@ class ReplicateObject extends BackbeatTask {
}

_setupRolesOnce(entry, log, cb) {
console.log('ZZZZZ 3: _setupRolesOnce called, about to call getBucketReplication');
log.debug('getting bucket replication',
{ entry: entry.getLogInfo() });
const entryRolesString = entry.getReplicationRoles();
Expand All @@ -287,8 +289,10 @@ class ReplicateObject extends BackbeatTask {
const command = new GetBucketReplicationCommand(
{ Bucket: entry.getBucket() });
attachReqUids(command, log);
console.log('ZZZZZ 4: About to send GetBucketReplicationCommand');
return this.S3source.send(command)
.then(data => {
console.log('ZZZZZ 5: GetBucketReplicationCommand succeeded');
const replicationEnabled = (
data.ReplicationConfiguration.Rules.some(
rule => entry.getObjectKey().startsWith(rule.Prefix)
Expand Down Expand Up @@ -338,6 +342,8 @@ class ReplicateObject extends BackbeatTask {
return cb(null, roles[0], roles[1]);
})
.catch(err => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: using util.callbackify() instead of promise.then() allows keeping the exact same structure as before... or you can at least put the .catch block before the .then

console.log('ZZZZZ 6: GetBucketReplicationCommand FAILED with error:', err);
console.log('ZZZZZ 6b: Error properties - retryable:', err.retryable, '$retryable:', err.$retryable, 'code:', err.code, 'name:', err.name);
// eslint-disable-next-line no-param-reassign
err.origin = 'source';
log.error('error getting replication ' +
Expand Down Expand Up @@ -443,6 +449,7 @@ class ReplicateObject extends BackbeatTask {
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
console.log('ZZZZZ 7: _getAndPutData called - about to replicate data');
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
const partObj = new ObjectMDLocation(part);
Expand Down Expand Up @@ -522,6 +529,7 @@ class ReplicateObject extends BackbeatTask {
}

_getAndPutPartOnce(sourceEntry, destEntry, part, log, done) {
console.log('ZZZZZ 9: _getAndPutPartOnce called - about to send GetObjectCommand');
const doneOnce = jsutil.once(done);
const partObj = new ObjectMDLocation(part);
const partNumber = partObj.getPartNumber();
Expand All @@ -541,8 +549,10 @@ class ReplicateObject extends BackbeatTask {
attachReqUids(command, log);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with an attribute in the command params?

const readStartTime = Date.now();

console.log('ZZZZZ 10: Sending GetObjectCommand to S3source');
this.S3source.send(command, { abortSignal: abortController.signal })
.then(response => {
console.log('ZZZZZ 11: GetObjectCommand succeeded, got response');
const incomingMsg = response.Body;
incomingMsg.on('error', err => {
if (!sourceStreamAborted && !destRequestAborted) {
Expand Down Expand Up @@ -798,6 +808,12 @@ class ReplicateObject extends BackbeatTask {
},
maxAttempts: 1,
});

this.S3source.middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});

const requestHandler = {
[this.sourceConfig.transport === 'https' ? 'httpsAgent' : 'httpAgent']: this.sourceHTTPAgent,
requestTimeout: TIMEOUT_MS,
Expand All @@ -813,6 +829,10 @@ class ReplicateObject extends BackbeatTask {
disableHostPrefix: true,
signingEscapePath: false,
});
this.backbeatSource.middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});
this.backbeatSourceProxy = new BackbeatMetadataProxy(
`${this.sourceConfig.transport}://` +
`${sourceS3.host}:${sourceS3.port}`,
Expand Down Expand Up @@ -862,12 +882,18 @@ class ReplicateObject extends BackbeatTask {
maxAttempts: 1,
requestHandler,
});
this.backbeatDest.middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});
}

processQueueEntry(sourceEntry, kafkaEntry, done) {
console.log('ZZZZZ 1: ReplicateObject.processQueueEntry called');
const log = this.logger.newRequestLogger();
const destEntry = sourceEntry.toReplicaEntry(this.site);

console.log('ZZZZZ 2: sourceEntry info:', sourceEntry.getLogInfo());
log.debug('processing entry',
{ entry: sourceEntry.getLogInfo() });

Expand Down
6 changes: 5 additions & 1 deletion lib/BackbeatMetadataProxy.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const errors = require('arsenal').errors;
const jsutil = require('arsenal').jsutil;
const { TIMEOUT_MS } = require('./clients/utils');
const { isRetryableMiddleware, TIMEOUT_MS } = require('./clients/utils');
const VaultClientCache = require('./clients/VaultClientCache');
const BackbeatTask = require('./tasks/BackbeatTask');
const RoleCredentials = require('./credentials/RoleCredentials');
Expand Down Expand Up @@ -351,6 +351,10 @@ class BackbeatMetadataProxy extends BackbeatTask {
maxAttempts: 1, // Disable retries, use our own retry policy
requestHandler,
});
this.backbeatSource.middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});
return this;
}

Expand Down
14 changes: 0 additions & 14 deletions lib/KafkaBacklogMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,20 +242,6 @@ class KafkaBacklogMetrics extends EventEmitter {
const zkPath = this._getOffsetZkPath(
topic, partition, offsetType, label);
if (offset === undefined || offset === null) {
console.log('OFFSET ERROR: offset value is undefined or null', {
topic,
partition,
offsetType,
label,
offset,
});
this._log.error('offset value is undefined or null', {
topic,
partition,
offsetType,
label,
offset,
});
return cb(new Error('Invalid offset value'));
}
const zkData = Buffer.from(offset.toString());
Expand Down
6 changes: 5 additions & 1 deletion lib/clients/ClientManager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const CredentialsManager = require('../credentials/CredentialsManager');
const BackbeatMetadataProxy = require('../BackbeatMetadataProxy');
const { createS3Client, TIMEOUT_MS } = require('./utils');
const { createS3Client, isRetryableMiddleware, TIMEOUT_MS } = require('./utils');
const { authTypeAssumeRole } = require('../constants');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
const {
Expand Down Expand Up @@ -160,6 +160,10 @@ class ClientManager {
maxAttempts: 1,
requestHandler,
});
this.backbeatClients[accountId].middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});

return this.backbeatClients[accountId];
}
Expand Down
47 changes: 47 additions & 0 deletions lib/clients/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,52 @@ function attachReqUids(s3req, log) {
);
}

// Error classification tables mirrored from smithy-typescript:
// https://github.com/smithy-lang/smithy-typescript/blob/main/packages/service-error-classification/src/constants.ts
const transientErrors = new Set([500, 502, 503, 504]);
const nodejsTimeoutErrorCodes = new Set(['ECONNRESET', 'ECONNREFUSED', 'EPIPE', 'ETIMEDOUT']);
const transientErrorCodes = new Set(['TimeoutError', 'RequestTimeout', 'RequestTimeoutException']);
const throttlingErrorCodes = new Set([
    'BandwidthLimitExceeded',
    'EC2ThrottledException',
    'LimitExceededException',
    'PriorRequestNotComplete',
    'ProvisionedThroughputExceededException',
    'RequestLimitExceeded',
    'RequestThrottled',
    'RequestThrottledException',
    'SlowDown',
    'ThrottledException',
    'Throttling',
    'ThrottlingException',
    'TooManyRequestsException',
    'TransactionInProgressException',
]);
// HTTP status that the smithy classifier treats as throttling on its own.
const throttlingStatusCode = 429;

/**
 * Tell whether a failed client call is worth retrying.
 *
 * Logic similar to:
 * https://github.com/smithy-lang/smithy-typescript/blob/main/packages/service-error-classification/src/index.ts
 * https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html
 *
 * @param {object} error - error object rejected by the client call
 * @return {boolean} true when the HTTP status, an already-present
 *   `$retryable` trait, or one of the error identifiers (`name`, `code`,
 *   `Code`) marks the failure as transient or throttled
 */
function isRetryableError(error) {
    const statusCode = error.$metadata?.httpStatusCode;
    if (transientErrors.has(statusCode) || statusCode === throttlingStatusCode) {
        return true;
    }
    // Honor a retryable trait attached upstream instead of overriding it.
    if (error.$retryable) {
        return true;
    }
    // The service error code may be exposed as `name`, while Node.js socket
    // errors use `code` and some parsed error bodies use `Code`: check them all.
    return [error.name, error.code, error.Code].some(id =>
        nodejsTimeoutErrorCodes.has(id) ||
        transientErrorCodes.has(id) ||
        throttlingErrorCodes.has(id));
}

/**
 * Build a client middleware that flags rejected calls as retryable or not.
 *
 * Every error object going through the middleware gets both `retryable`
 * and `$retryable` set to the same boolean before being rethrown, so that
 * retry logic reading `err.retryable` keeps working.
 *
 * @return {function} middleware `(next) => async (args) => result`; it
 *   resolves with the result of `next(args)` and rejects with the very
 *   same error `next` rejected with
 */
function isRetryableMiddleware() {
    return next => async args => {
        try {
            return await next(args);
        } catch (error) {
            // Flags can only be attached to objects: rethrow anything else
            // untouched rather than failing on the property assignment.
            if (error === null || (typeof error !== 'object' && typeof error !== 'function')) {
                throw error;
            }
            const retryable = isRetryableError(error);
            // eslint-disable-next-line no-param-reassign
            error.$retryable = retryable;
            // eslint-disable-next-line no-param-reassign
            error.retryable = retryable;
            throw error;
        }
    };
}

function createS3Client(params) {
const { transport, host, port, credentials, agent } = params;
// Determine what to pass as credentials to S3Client
Expand Down Expand Up @@ -54,5 +100,6 @@ function createS3Client(params) {
module.exports = {
attachReqUids,
createS3Client,
isRetryableMiddleware,
TIMEOUT_MS,
};
6 changes: 5 additions & 1 deletion lib/queuePopulator/IngestionProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const ObjectMD = require('arsenal').models.ObjectMD;
const VID_SEP = require('arsenal').versioning.VersioningConstants
.VersionId.Separator;

const { attachReqUids } = require('../clients/utils');
const { attachReqUids, isRetryableMiddleware } = require('../clients/utils');
const RaftLogEntry = require('../models/RaftLogEntry');
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
Expand Down Expand Up @@ -102,6 +102,10 @@ class IngestionProducer {
maxAttempts: 1,
requestHandler,
});
this._ringReader.middlewareStack.add(isRetryableMiddleware(), {
step: 'deserialize',
priority: 'high',
});
const s3endpoint = process.env.CI === 'true' ?
`${protocol}://${host}:8000` :
endpoint;
Expand Down
30 changes: 21 additions & 9 deletions tests/functional/replication/queueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -1193,21 +1193,33 @@ describe('queue processor functional tests with mocking', () => {
[errors.InternalError].forEach(error => {
it(`replication should retry on ${error.code} ` +
`(${error.message}) from source S3 on ${action}`, done => {
console.log('XXXXX 1: Test starting for action:', action);
console.log('XXXXX 2: Error to install:', error);
s3mock.installS3ErrorResponder(
`source.s3.${action}`, error, { once: true });

async.parallel([
done => {
s3mock.onPutSourceMd = done;
},
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
assert.ifError(err);
assert(s3mock.hasPutTargetData);
assert(s3mock.hasPutTargetMd);
assert.strictEqual(s3mock.partsDeleted.length, 0);
console.log('XXXXX 3: Waiting for onPutSourceMd callback');
s3mock.onPutSourceMd = () => {
console.log('XXXXX 7: onPutSourceMd called!');
done();
}),
};
},
done => {
console.log('XXXXX 4: Starting processReplicationEntry');
queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), err => {
console.log('XXXXX 5: processReplicationEntry callback, err:', err);
console.log('XXXXX 6: hasPutTargetData:', s3mock.hasPutTargetData);
console.log('XXXXX 6b: hasPutTargetMd:', s3mock.hasPutTargetMd);
assert.ifError(err);
assert(s3mock.hasPutTargetData);
assert(s3mock.hasPutTargetMd);
assert.strictEqual(s3mock.partsDeleted.length, 0);
done();
});
},
], done);
});
});
Expand Down
Loading