22using System . Text . Json . Nodes ;
33using System . Threading . Tasks ;
44using Azure . Core ;
5+ using Azure . Storage . Blobs ;
6+ using Azure . Storage . Blobs . Models ;
57using Microsoft . Azure . Functions . Worker ;
68using Microsoft . Extensions . Logging ;
79using Microsoft . OneFuzz . Service . OneFuzzLib . Orm ;
@@ -60,7 +62,11 @@ public async Async.Task Run(
6062 try {
6163 var result = await FileAdded ( storageAccount , fileChangeEvent ) ;
6264 if ( ! result . IsOk ) {
63- await RequeueMessage ( msg , result . ErrorV . Code == ErrorCode . ADO_WORKITEM_PROCESSING_DISABLED ? TimeSpan . FromDays ( 1 ) : null ) ;
65+ if ( result . ErrorV . Code == ErrorCode . ADO_WORKITEM_PROCESSING_DISABLED ) {
66+ await RequeueMessage ( msg , TimeSpan . FromDays ( 1 ) , incrementDequeueCount : false ) ;
67+ } else {
68+ await RequeueMessage ( msg ) ;
69+ }
6470 }
6571 } catch ( Exception e ) {
6672 _log . LogError ( e , "File Added failed" ) ;
@@ -83,21 +89,26 @@ private async Async.Task<OneFuzzResultVoid> FileAdded(ResourceIdentifier storage
8389
8490 _log . LogInformation ( "file added : {Container} - {Path}" , container . String , path ) ;
8591
92+ var account = await _storage . GetBlobServiceClientForAccount ( storageAccount ) ;
93+ var containerClient = account . GetBlobContainerClient ( container . String ) ;
94+ var containerProps = await containerClient . GetPropertiesAsync ( ) ;
95+
96+ if ( _context . NotificationOperations . ShouldPauseNotificationsForContainer ( containerProps . Value . Metadata ) ) {
97+ return Error . Create ( ErrorCode . ADO_WORKITEM_PROCESSING_DISABLED , $ "container { container } has a metadata tag set to pause notifications processing") ;
98+ }
99+
86100 var ( _, result ) = await (
87- ApplyRetentionPolicy ( storageAccount , container , path ) ,
101+ ApplyRetentionPolicy ( containerClient , containerProps , path ) ,
88102 _notificationOperations . NewFiles ( container , path ) ) ;
89103
90104 return result ;
91105 }
92106
93- private async Async . Task < bool > ApplyRetentionPolicy ( ResourceIdentifier storageAccount , Container container , string path ) {
107+ private async Async . Task < bool > ApplyRetentionPolicy ( BlobContainerClient containerClient , BlobContainerProperties containerProps , string path ) {
94108 if ( await _context . FeatureManagerSnapshot . IsEnabledAsync ( FeatureFlagConstants . EnableContainerRetentionPolicies ) ) {
95109 // default retention period can be applied to the container
96110 // if one exists, we will set the expiry date on the newly-created blob, if it doesn't already have one
97- var account = await _storage . GetBlobServiceClientForAccount ( storageAccount ) ;
98- var containerClient = account . GetBlobContainerClient ( container . String ) ;
99- var containerProps = await containerClient . GetPropertiesAsync ( ) ;
100- var retentionPeriod = RetentionPolicyUtils . GetContainerRetentionPeriodFromMetadata ( containerProps . Value . Metadata ) ;
111+ var retentionPeriod = RetentionPolicyUtils . GetContainerRetentionPeriodFromMetadata ( containerProps . Metadata ) ;
101112 if ( ! retentionPeriod . IsOk ) {
102113 _log . LogError ( "invalid retention period: {Error}" , retentionPeriod . ErrorV ) ;
103114 } else if ( retentionPeriod . OkV is TimeSpan period ) {
@@ -116,7 +127,7 @@ private async Async.Task<bool> ApplyRetentionPolicy(ResourceIdentifier storageAc
116127 return false ;
117128 }
118129
119- private async Async . Task RequeueMessage ( string msg , TimeSpan ? visibilityTimeout = null ) {
130+ private async Async . Task RequeueMessage ( string msg , TimeSpan ? visibilityTimeout = null , bool incrementDequeueCount = true ) {
120131 var json = JsonNode . Parse ( msg ) ;
121132
122133 // Messages that are 'manually' requeued by us as opposed to being requeued by the azure functions runtime
@@ -135,7 +146,9 @@ await _context.Queue.QueueObject(
135146 StorageType . Config )
136147 . IgnoreResult ( ) ;
137148 } else {
138- json ! [ "data" ] ! [ "customDequeueCount" ] = newCustomDequeueCount + 1 ;
149+ if ( incrementDequeueCount ) {
150+ json ! [ "data" ] ! [ "customDequeueCount" ] = newCustomDequeueCount + 1 ;
151+ }
139152 await _context . Queue . QueueObject (
140153 QueueFileChangesQueueName ,
141154 json ,
0 commit comments