Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
added spec and bugfixes
  • Loading branch information
Aaronontheweb committed Sep 16, 2022
commit 0222be1d1e02187a6d7e3bb85764a29cc008a74a
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//-----------------------------------------------------------------------
// <copyright file="ShardRegionQueriesSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Tools.Singleton;
using Akka.Configuration;
using Akka.TestKit;
using Akka.TestKit.TestActors;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Cluster.Sharding.Tests
{
public class ShardRegionQueriesSpecs : AkkaSpec
{
private Cluster _cluster;
private ClusterSharding _clusterSharding;
private IActorRef _shardRegion;

private ActorSystem _proxySys;

public ShardRegionQueriesSpecs(ITestOutputHelper outputHelper) : base(GetConfig(), outputHelper)
{
_clusterSharding = ClusterSharding.Get(Sys);
_cluster = Cluster.Get(Sys);
_shardRegion = _clusterSharding.Start("entity", s => EchoActor.Props(this, true),
ClusterShardingSettings.Create(Sys).WithRole("shard"), ExtractEntityId, ExtractShardId);

var proxySysConfig = ConfigurationFactory.ParseString("akka.cluster.roles = [proxy]")
.WithFallback(Sys.Settings.Config);
_proxySys = ActorSystem.Create(Sys.Name, proxySysConfig);

_cluster.Join(_cluster.SelfAddress);
AwaitAssert(() =>
{
_cluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
});

// form a 2-node cluster
var proxyCluster = Cluster.Get(_proxySys);
proxyCluster.Join(_cluster.SelfAddress);
AwaitAssert(() =>
{
proxyCluster.SelfMember.Status.ShouldBe(MemberStatus.Up);
});
}

protected override void AfterAll()
{
Shutdown(_proxySys);
base.AfterAll();
}

private Option<(string, object)> ExtractEntityId(object message)
{
switch (message)
{
case int i:
return (i.ToString(), message);
}
throw new NotSupportedException();
}

private string ExtractShardId(object message)
{
switch (message)
{
case int i:
return (i % 10).ToString();
case ShardRegion.StartEntity se:
return se.EntityId;
}
throw new NotSupportedException();
}

private static Config GetConfig()
{
return ConfigurationFactory.ParseString(@"
akka.loglevel = WARNING
akka.actor.provider = cluster
akka.remote.dot-netty.tcp.port = 0
akka.cluster.roles = [shard]")

.WithFallback(Sharding.ClusterSharding.DefaultConfig())
.WithFallback(DistributedData.DistributedData.DefaultConfig())
.WithFallback(ClusterSingletonManager.DefaultConfig());
}

[Fact(DisplayName = "ShardRegion should support GetEntityLocation queries locally")]
public async Task ShardRegion_should_support_GetEntityLocation_query()
{
// arrange
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
await _shardRegion.Ask<int>(2, TimeSpan.FromSeconds(3));

// act
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
var q2 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("2", TimeSpan.FromSeconds(1)));
var q3 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("3", TimeSpan.FromSeconds(1)));

// assert
void AssertValidEntityLocation(EntityLocation e, string entityId)
{
e.EntityId.Should().Be(entityId);
e.EntityRef.Should().NotBe(Option<IActorRef>.None);
e.ShardId.Should().NotBeNullOrEmpty();
e.ShardRegion.Should().Be(_cluster.SelfAddress);
}

AssertValidEntityLocation(q1, "1");
AssertValidEntityLocation(q2, "2");

q3.EntityRef.Should().Be(Option<IActorRef>.None);
q3.ShardId.Should().NotBeNullOrEmpty(); // should still have computed a valid shard?
q3.ShardRegion.Should().Be(Address.AllSystems);
}

}
}
93 changes: 54 additions & 39 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardRegion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -936,56 +936,71 @@ Address GetNodeAddress(IActorRef shardOrRegionRef)
{
return shardOrRegionRef.Path.Address.HasGlobalScope ? shardOrRegionRef.Path.Address : Cluster.SelfAddress;
}

var shardId = this.ExtractShardId(getEntityLocation.EntityId);
if (string.IsNullOrEmpty(shardId))
{
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, Option<IActorRef>.None));
return;
}

if(!Shards.TryGetValue(shardId, out var shardActorRef))

try
{
// shard is not homed yet, so try looking up the ShardRegion
if (!RegionByShard.TryGetValue(shardId, out var shardRegionRef))
var shardId = ExtractShardId(new StartEntity(getEntityLocation.EntityId));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that is awkward about this query is that the ShardExtractor has to support the ShardRegion.StartEntity type in order for this to work - the reason being is that even though we already know what the entity id is, the extractors that resolve the ShardId are looking for an input message to generate the ShardId, not the EntityId. Therefore I have to smuggle the EnttityId back through a ShardRegion.StartEntity message. That makes this a bit hacky but it's the best compromise I could make.

if (string.IsNullOrEmpty(shardId))
{
// shardRegion isn't allocated either
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems, Option<IActorRef>.None));
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
return;
}
else

if (!Shards.TryGetValue(shardId, out var shardActorRef))
{
// ShardRegion exists, but shard is not homed
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, GetNodeAddress(shardRegionRef), Option<IActorRef>.None));
// shard is not homed yet, so try looking up the ShardRegion
if (!RegionByShard.TryGetValue(shardId, out var shardRegionRef))
{
// shardRegion isn't allocated either
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, Address.AllSystems,
Option<IActorRef>.None));
}
else
{
// ShardRegion exists, but shard is not homed
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId,
GetNodeAddress(shardRegionRef), Option<IActorRef>.None));
}

return;
}

return;
}

var destinationAddress = GetNodeAddress(shardActorRef);

async Task ResolveEntityRef()
{
// we have a ShardRef - now we just need to check to see if an entity ref exists
// we are going to
var destinationAddress = GetNodeAddress(shardActorRef);

var entityPath = shardActorRef.Path / shardId;

try
{
var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout);
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, new Option<IActorRef>(entityRef)));
}
catch (ActorNotFoundException ex)
async Task ResolveEntityRef()
{
// entity does not exist
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress, Option<IActorRef>.None));
// we have a ShardRef - now we just need to check to see if an entity ref exists
// we are going to

var entityPath = shardActorRef.Path / shardId;

try
{
var entityRef = await Context.ActorSelection(entityPath).ResolveOne(getEntityLocation.Timeout);
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
new Option<IActorRef>(entityRef)));
}
catch (ActorNotFoundException ex)
{
// entity does not exist
sender.Tell(new EntityLocation(getEntityLocation.EntityId, shardId, destinationAddress,
Option<IActorRef>.None));
}
}
}

#pragma warning disable CS4014
ResolveEntityRef(); // needs to run as a detached task
#pragma warning disable CS4014
ResolveEntityRef(); // needs to run as a detached task
#pragma warning restore CS4014
}
catch (Exception ex)
{
_log.Error(ex, "Error while trying to resolve GetEntityLocation query for entityId [{0}]", getEntityLocation.EntityId);
// unsupported entityId - could only happen in highly customized extractors
sender.Tell(new EntityLocation(getEntityLocation.EntityId, string.Empty, Address.AllSystems,
Option<IActorRef>.None));
}

}

Expand Down
3 changes: 3 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/ShardingMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ private GetCurrentRegions()
/// </summary>
/// <remarks>
/// This is used primarily for testing and telemetry purposes.
///
/// In order for this query to work, the <see cref="MessageExtractor"/> must support <see cref="ShardRegion.StartEntity"/>,
/// which is also used when remember-entities=on.
/// </remarks>
public sealed class GetEntityLocation : IShardRegionQuery
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message never gets sent over the wire, so it doesn't require any changes to the wire type nor does it need to support IClusterShardingSerializable.

{
Expand Down