Switch spec from docker to locally run azurite instance#207
Switch spec from docker to locally run azurite instance#207Aaronontheweb merged 9 commits intopetabridge:devfrom
Conversation
| tags: tags); | ||
|
|
||
| batchItems = batchItems.Add(newItem); | ||
| batchItems.Add(new TableTransactionAction(TableTransactionActionType.Add, newItem.WriteEntity())); |
There was a problem hiding this comment.
New SDK way of building a batch data CRUD, everything is encapsulated inside a transaction-like batch
| filter: $"PartitionKey eq '{AllPersistenceIdsEntry.PartitionKeyValue}'", | ||
| maxPerPage: maxPerPage, | ||
| @select: new[] { "RowKey" }, | ||
| cancellationToken: cancellationToken |
There was a problem hiding this comment.
The biggest change in the SDK, it doesn't use an awkward expression builder anymore, it actually takes an OData query directly. We could also use a strongly typed Expression<Func<bool>> linq-like builder, but that turns out to be problematic for building string comparison queries.
There was a problem hiding this comment.
A little cheat-sheet on OData:
eqis equalsneis not equalsltis less thanleis less than or equalgtis greater thangeis greater than or equal
| $"RowKey le '{toSequenceNr.ToJournalRowKey()}'", | ||
| maxPerPage: maxPerPage, | ||
| @select: null, | ||
| cancellationToken: cancellationToken); |
There was a problem hiding this comment.
Comparing this and the old way of building queries, you can see that this is a lot more clear and concise.
| } | ||
| } | ||
|
|
||
| private async Task<bool> IsTableExist(string name, CancellationToken token) |
There was a problem hiding this comment.
The Table.Exists method was removed by design in the new SDK, so we need to roll our own here.
|
|
||
| namespace Akka.Persistence.Azure.Journal | ||
| { | ||
| internal sealed class HighestSequenceNrEntry |
There was a problem hiding this comment.
This class is a duplicate of Akka.Persistence.Azure.TableEntities.HighestSequenceNrEntry, deleting it so that it doesn't cause any bug in the future.
|
|
||
| // In order to use this in a TableQuery a parameterless constructor is required | ||
| public AllPersistenceIdsEntry() | ||
| public AllPersistenceIdsEntry(TableEntity entity) |
There was a problem hiding this comment.
Instead of relying on the SDK deserialization code, we will use the EventHubs generic TableEntity serialization and populate our own data from it.
By doing this, we can convert all of the table entity data model back to immutable classes.
There was a problem hiding this comment.
I think you mean Table Storage generic serialization, but your point is well taken.
| public string[] Tags { get; set; } | ||
|
|
||
| public DateTimeOffset Timestamp { get; set; } | ||
| public string[] Tags { get; } |
There was a problem hiding this comment.
This property is the main reason why I opted not to use the built-in serializer. In the strongly typed json based serializer, this would be saved as a JSON array, while we traditionally have been saving it as a CSV. Trying to coerce the internal JSON serializer to serialize this as CSV would be a mess.
Aaronontheweb
left a comment
There was a problem hiding this comment.
Not quite done reviewing yet, but left some thoughts / questions.
Very interested in seeing if we can get #129 (comment) resolved as well.
| Initialize(); | ||
| } | ||
|
|
||
| [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] |
|
|
||
| public static Config AzureConfig(Func<string, Config> configTemplate) | ||
| { | ||
| var connString = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); |
| foreach (var r in eventTagsResponse.Value) | ||
| _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); | ||
|
|
||
| if (HasTagSubscribers && taggedEntries.Count != 0) |
There was a problem hiding this comment.
Should we get rid of the tag subscribers stuff inside the write loop? Seems out of place given that we still have to poll for changes on the table to detect changes that happen exclusively on other nodes in the network. This is something that got copied and pasted into a lot of journals but it was only ever really appropriate for SQLite / in-memory.
There was a problem hiding this comment.
We should, should we open another issue and PR for this?
| * Either everything fails or everything succeeds is the idea I guess. | ||
| */ | ||
| return exceptions.Any(ex => ex != null) ? exceptions : null; | ||
| return exceptions.Any(ex => ex != null) ? exceptions.ToImmutableList() : null; |
The query will grab all persistence id entries filtered to their last row key, then the code will compare all of those entries for the highest sequence number. In theory, this will be slow for journal tables with a lot of persistence ids. |
Yes, the hack shaves some query over HTTP latency for the next page in the query. |
| var returnValue = new TableQuery<HighestSequenceNrEntry>().Where(filter); | ||
|
|
||
| return returnValue; | ||
| return Table.QueryAsync<TableEntity>( |
There was a problem hiding this comment.
This is a constant time query since it looks up a specific table entry for each entity that stores the highestSequenceNr
| get | ||
| { | ||
| if (_tableStorage_DoNotUseDirectly == null) | ||
| throw new Exception("Table storage has not been initialized yet. PreStart() has not been invoked"); |
| foreach (var r in eventTagsResponse.Value) | ||
| _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); | ||
|
|
||
| if (HasTagSubscribers && taggedEntries.Count != 0) |
| var encodedKey = PartitionKeyEscapeHelper.Escape(x.Key); | ||
| allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey)); | ||
| }); | ||
| allPersistenceIdsBatch.Add(new TableTransactionAction( |
There was a problem hiding this comment.
Should add a comment here explaining that this is updating a specific object that stores the highest seqNo used in recovery
|
|
||
| // In order to use this in a TableQuery a parameterless constructor is required | ||
| public AllPersistenceIdsEntry() | ||
| public AllPersistenceIdsEntry(TableEntity entity) |
There was a problem hiding this comment.
I think you mean Table Storage generic serialization, but your point is well taken.
| Timestamp = Timestamp, | ||
| [PayloadKeyName] = Payload, | ||
| [SeqNoKeyName] = SeqNo, | ||
| [TagsKeyName] = string.Join(";", Tags), |
There was a problem hiding this comment.
Looks the same as before, good job
Windows.Azure.Cosmos.Tablepackage toAzure.Data.TablesNOTE:
Will fail until akkadotnet/akka.net#5849 is merged and released