Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Breaking Changes

### Bugs Fixed
- Fixed an issue where polling for new blobs could miss recently created or updated blobs when the List Blobs/Get Blobs operation spans multiple requests. If a blob was added or modified after a LogScan started and appeared in a later segment of the listing, any blobs changed between the start time of the listing and that blob's last-modified timestamp were not flagged as new in the subsequent LogScan.
- Fixed an issue where the BlobTrigger log scan could target the wrong storage account in multi-account scenarios.

### Other Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ internal class ContainerScanInfo
{
public ICollection<ITriggerExecutor<BlobTriggerExecutorContext>> Registrations { get; set; }

public DateTime LastSweepCycleLatestModified { get; set; }
public DateTimeOffset PollingStartTime { get; set; }

public DateTime CurrentSweepCycleLatestModified { get; set; }
public DateTimeOffset LastSweepCycleLatestModified { get; set; }

public DateTimeOffset CurrentSweepCycleLatestModified { get; set; }

public string ContinuationToken { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal interface IBlobScanInfoManager
{
Task<DateTime?> LoadLatestScanAsync(string storageAccountName, string containerName);
Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTime scanInfo);
Task<DateTimeOffset?> LoadLatestScanAsync(string storageAccountName, string containerName);
Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTimeOffset scanInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ private static class Logger
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(3, nameof(ContainerDoesNotExist)),
"Container '{containerName}' does not exist.");

public static void InitializedScanInfo(ILogger<BlobListener> logger, string container, DateTime latestScanInfo) =>
public static void InitializedScanInfo(ILogger<BlobListener> logger, string container, DateTimeOffset latestScanInfo) =>
_initializedScanInfo(logger, container, latestScanInfo.ToString(Constants.DateTimeFormatString, CultureInfo.InvariantCulture), null);

public static void PollBlobContainer(ILogger<BlobListener> logger, string container, DateTime latestScanInfo, string clientRequestId, int blobCount, long latencyInMilliseconds, bool hasContinuationToken) =>
public static void PollBlobContainer(ILogger<BlobListener> logger, string container, DateTimeOffset latestScanInfo, string clientRequestId, int blobCount, long latencyInMilliseconds, bool hasContinuationToken) =>
_pollBlobContainer(logger, latestScanInfo.ToString(Constants.DateTimeFormatString, CultureInfo.InvariantCulture), container, clientRequestId, blobCount, latencyInMilliseconds, hasContinuationToken, null);

public static void ContainerDoesNotExist(ILogger<BlobListener> logger, string container) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain
if (!_scanInfo.TryGetValue(container, out ContainerScanInfo containerScanInfo))
{
// First, try to load serialized scanInfo for this container.
DateTime? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(blobServiceClient.AccountName, container.Name).ConfigureAwait(false);
DateTimeOffset? latestStoredScan = await _blobScanInfoManager.LoadLatestScanAsync(blobServiceClient.AccountName, container.Name).ConfigureAwait(false);

containerScanInfo = new ContainerScanInfo()
{
Expand Down Expand Up @@ -138,7 +138,7 @@ private async Task PollAndNotify(BlobContainerClient container, ContainerScanInf
List<BlobNotification> failedNotifications, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
DateTime lastScan = containerScanInfo.LastSweepCycleLatestModified;
DateTimeOffset lastScan = containerScanInfo.LastSweepCycleLatestModified;

// For tracking
string clientRequestId = Guid.NewGuid().ToString();
Expand All @@ -154,7 +154,7 @@ private async Task PollAndNotify(BlobContainerClient container, ContainerScanInf
// if the 'LatestModified' has changed, update it in the manager
if (containerScanInfo.LastSweepCycleLatestModified > lastScan)
{
DateTime latestScan = containerScanInfo.LastSweepCycleLatestModified;
DateTimeOffset latestScan = containerScanInfo.LastSweepCycleLatestModified;

// It's possible that we had some blobs that we failed to move to the queue. We want to make sure
// we continue to find these if the host needs to restart.
Expand Down Expand Up @@ -217,9 +217,10 @@ public async Task<IEnumerable<BlobBaseClient>> PollNewBlobsAsync(
string continuationToken = containerScanInfo.ContinuationToken;
Page<BlobItem> page;

// if starting the cycle, reset the sweep time
// if starting the cycle, reset the sweep time and set start time
if (continuationToken == null)
{
containerScanInfo.PollingStartTime = DateTimeOffset.UtcNow;
containerScanInfo.CurrentSweepCycleLatestModified = DateTime.MinValue;
}

Expand Down Expand Up @@ -266,14 +267,16 @@ public async Task<IEnumerable<BlobBaseClient>> PollNewBlobsAsync(
var properties = currentBlob.Properties;
DateTime lastModifiedTimestamp = properties.LastModified.Value.UtcDateTime;

if (lastModifiedTimestamp > containerScanInfo.CurrentSweepCycleLatestModified)
if (lastModifiedTimestamp > containerScanInfo.CurrentSweepCycleLatestModified &&
(continuationToken == null || lastModifiedTimestamp <= containerScanInfo.PollingStartTime))
{
containerScanInfo.CurrentSweepCycleLatestModified = lastModifiedTimestamp;
}

// Blob timestamps are rounded to the nearest second, so make sure we continue to check
// the previous timestamp to catch any blobs that came in slightly after our previous poll.
if (lastModifiedTimestamp >= containerScanInfo.LastSweepCycleLatestModified)
if (lastModifiedTimestamp >= containerScanInfo.LastSweepCycleLatestModified &&
lastModifiedTimestamp <= containerScanInfo.PollingStartTime)
{
newBlobs.Add(container.GetBlobClient(currentBlob.Name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public StorageBlobScanInfoManager(string hostId, BlobServiceClient blobClient)
_blobContainerClient = blobClient.GetBlobContainerClient(HostContainerNames.Hosts);
}

public async Task<DateTime?> LoadLatestScanAsync(string storageAccountName, string containerName)
public async Task<DateTimeOffset?> LoadLatestScanAsync(string storageAccountName, string containerName)
{
var scanInfoBlob = GetScanInfoBlobReference(storageAccountName, containerName);
DateTime? latestScan = null;
DateTimeOffset? latestScan = null;
try
{
string scanInfoLine = await scanInfoBlob.DownloadTextAsync(CancellationToken.None).ConfigureAwait(false);
Expand Down Expand Up @@ -74,7 +74,7 @@ public StorageBlobScanInfoManager(string hostId, BlobServiceClient blobClient)
}
}

public async Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTime latestScan)
public async Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTimeOffset latestScan)
{
string scanInfoLine;
ScanInfo scanInfo = new ScanInfo
Expand Down Expand Up @@ -109,7 +109,7 @@ private BlockBlobClient GetScanInfoBlobReference(string storageAccountName, stri

internal class ScanInfo
{
public DateTime LatestScan { get; set; }
public DateTimeOffset LatestScan { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Tests;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests;
using Microsoft.Azure.WebJobs.Host.Executors;
Expand Down Expand Up @@ -306,7 +307,7 @@ public async Task RegisterAsync_InitializesWithScanInfoManager()
// delay slightly so we guarantee a later timestamp
await Task.Delay(10);

await scanInfoManager.UpdateLatestScanAsync(AccountName, ContainerName, DateTime.UtcNow);
await scanInfoManager.UpdateLatestScanAsync(AccountName, ContainerName, DateTimeOffset.UtcNow);
await product.RegisterAsync(_blobClientMock.Object, container, executor, CancellationToken.None);

// delay slightly so we guarantee a later timestamp
Expand Down Expand Up @@ -422,7 +423,7 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure()
int testScanBlobLimitPerPoll = 6;

// we'll introduce multiple errors to make sure we take the earliest timestamp
DateTime earliestErrorTime = DateTime.UtcNow;
DateTimeOffset earliestErrorTime = DateTimeOffset.UtcNow.AddHours(-1);

var container = _blobContainerMock.Object;

Expand Down Expand Up @@ -455,13 +456,98 @@ public async Task ExecuteAsync_UpdatesScanInfo_WithEarliestFailure()

RunExecuteWithMultiPollingInterval(expectedNames, product, executor, testScanBlobLimitPerPoll);

DateTime? storedTime = await testScanInfoManager.LoadLatestScanAsync(accountName, ContainerName);
DateTimeOffset? storedTime = await testScanInfoManager.LoadLatestScanAsync(accountName, ContainerName);
Assert.True(storedTime < earliestErrorTime);
Assert.AreEqual(1, testScanInfoManager.UpdateCounts[accountName][ContainerName]);
_blobContainerMock.Verify(x => x.GetBlobsAsync(It.IsAny<BlobTraits>(), It.IsAny<BlobStates>(), It.IsAny<string>(), It.IsAny<CancellationToken>()),
Times.Exactly(2));
}

[Test]
public async Task ExecuteAsync_PollNewBlobsAsync_ContinuationTokenUpdatedBlobs()
{
    // Arrange: create the strategy under test and override its private per-poll blob limit.
    int testScanBlobLimitPerPoll = 3;
    IBlobListenerStrategy strategy = new ScanBlobScanLogHybridPollingStrategy(new TestBlobScanInfoManager(), _exceptionHandler, NullLogger<BlobListener>.Instance);
    var executor = new LambdaBlobTriggerExecutor();
    var scanLimitField = typeof(ScanBlobScanLogHybridPollingStrategy)
        .GetField("_scanBlobLimitPerPoll", BindingFlags.Instance | BindingFlags.NonPublic);
    scanLimitField.SetValue(strategy, testScanBlobLimitPerPoll);

    // A dedicated container mock whose GetBlobsAsync is invoked several times per polling,
    // which is how a listing that spans multiple requests (continuation tokens) is simulated.
    var containerUri = new Uri("https://fakeaccount.blob.core.windows.net/fakecontainer2");
    var containerMock = new Mock<BlobContainerClient>(containerUri, null);
    containerMock.Setup(c => c.Uri).Returns(containerUri);
    containerMock.Setup(c => c.Name).Returns(ContainerName);
    containerMock.Setup(c => c.AccountName).Returns(AccountName);

    // Page 1 of the first polling: five blobs with LastModified values spaced 10 minutes apart, going back from now.
    var firstPageItems = new List<BlobItem>();
    var firstPollingExpectedNames = new List<string>();
    for (int index = 0; index < 5; index++)
    {
        firstPollingExpectedNames.Add(
            CreateBlobAndUploadToContainer(containerMock, firstPageItems, lastModified: DateTimeOffset.UtcNow.AddMinutes(-10 * index)));
    }

    // Page 2 of the first polling: three blobs with LastModified values spaced 5 minutes apart, going back from now.
    var secondPageItems = new List<BlobItem>();
    for (int index = 0; index < 3; index++)
    {
        firstPollingExpectedNames.Add(
            CreateBlobAndUploadToContainer(containerMock, secondPageItems, lastModified: DateTimeOffset.UtcNow.AddMinutes(-5 * index)));
    }

    // Page 2 additionally carries one blob stamped 5 seconds into the future, i.e. with a LastModified
    // later than the moment the first polling starts. It is deliberately NOT expected in the first polling.
    DateTimeOffset lateBlobLastModified = DateTimeOffset.UtcNow.AddSeconds(5);
    string lateBlobName = CreateBlobAndUploadToContainer(containerMock, secondPageItems, lastModified: lateBlobLastModified);

    // Blobs served as the first page of the second polling. Their LastModified is 2 seconds earlier
    // than the late blob's, so it falls before the late blob's timestamp while still being ahead of
    // the clock at the time this setup runs.
    var updatedItems = new List<BlobItem>();
    var secondPollingExpectedNames = new List<string>();
    DateTimeOffset updatedLastModified = lateBlobLastModified.AddSeconds(-2);
    for (int index = 0; index < 4; index++)
    {
        secondPollingExpectedNames.Add(
            CreateBlobAndUploadToContainer(containerMock, updatedItems, lastModified: updatedLastModified));
    }

    // The late blob skipped by the first polling must surface in the second one.
    secondPollingExpectedNames.Add(lateBlobName);

    // Each polling consumes two GetBlobsAsync results: one with a continuation token, then a final one without.
    containerMock.SetupSequence(c => c.GetBlobsAsync(It.IsAny<BlobTraits>(), It.IsAny<BlobStates>(), It.IsAny<string>(), It.IsAny<CancellationToken>()))
        // First polling
        .Returns(() => new TestAsyncPageableWithContinuationToken(firstPageItems, true))
        .Returns(() => new TestAsyncPageableWithContinuationToken(secondPageItems, false))
        // Second polling
        .Returns(() => new TestAsyncPageableWithContinuationToken(updatedItems, true))
        .Returns(() => new TestAsyncPageableWithContinuationToken(secondPageItems, false));

    // Registering the container initializes the strategy's scan info for it.
    await strategy.RegisterAsync(_blobClientMock.Object, containerMock.Object, executor, CancellationToken.None);

    // Act / Assert - First Polling
    RunExecuteWithMultiPollingInterval(firstPollingExpectedNames, strategy, executor, firstPageItems.Count);

    // Let 5 seconds pass so the future-dated timestamps are in the past by the time the second
    // polling starts and the late blob can be reported as new.
    await Task.Delay(TimeSpan.FromSeconds(5));

    // Act / Assert - Second Polling
    // Every updated blob must be detected, together with the blob whose LastModified was later than
    // the start of the first polling and which therefore went unreported there.
    RunExecuteWithMultiPollingInterval(secondPollingExpectedNames, strategy, executor, updatedItems.Count);
}

private void RunExecuterWithExpectedBlobsInternal(IDictionary<string, int> blobNameMap, IBlobListenerStrategy product, LambdaBlobTriggerExecutor executor, int expectedCount)
{
if (blobNameMap.Count == 0)
Expand Down Expand Up @@ -585,23 +671,23 @@ public Task<FunctionResult> ExecuteAsync(BlobTriggerExecutorContext value, Cance

private class TestBlobScanInfoManager : IBlobScanInfoManager
{
private IDictionary<string, IDictionary<string, DateTime>> _latestScans;
private IDictionary<string, IDictionary<string, DateTimeOffset>> _latestScans;

public TestBlobScanInfoManager()
{
_latestScans = new Dictionary<string, IDictionary<string, DateTime>>();
_latestScans = new Dictionary<string, IDictionary<string, DateTimeOffset>>();
UpdateCounts = new Dictionary<string, IDictionary<string, int>>();
}

public IDictionary<string, IDictionary<string, int>> UpdateCounts { get; private set; }

public Task<DateTime?> LoadLatestScanAsync(string storageAccountName, string containerName)
public Task<DateTimeOffset?> LoadLatestScanAsync(string storageAccountName, string containerName)
{
DateTime? value = null;
IDictionary<string, DateTime> accounts;
DateTimeOffset? value = null;
IDictionary<string, DateTimeOffset> accounts;
if (_latestScans.TryGetValue(storageAccountName, out accounts))
{
DateTime latestScan;
DateTimeOffset latestScan;
if (accounts.TryGetValue(containerName, out latestScan))
{
value = latestScan;
Expand All @@ -611,20 +697,20 @@ public TestBlobScanInfoManager()
return Task.FromResult(value);
}

public Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTime latestScan)
public Task UpdateLatestScanAsync(string storageAccountName, string containerName, DateTimeOffset latestScan)
{
SetScanInfo(storageAccountName, containerName, latestScan);
IncrementCount(storageAccountName, containerName);
return Task.FromResult(0);
}

public void SetScanInfo(string storageAccountName, string containerName, DateTime latestScan)
public void SetScanInfo(string storageAccountName, string containerName, DateTimeOffset latestScan)
{
IDictionary<string, DateTime> containers;
IDictionary<string, DateTimeOffset> containers;

if (!_latestScans.TryGetValue(storageAccountName, out containers))
{
_latestScans[storageAccountName] = new Dictionary<string, DateTime>();
_latestScans[storageAccountName] = new Dictionary<string, DateTimeOffset>();
containers = _latestScans[storageAccountName];
}

Expand Down
Loading