Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions pallets/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod pallet {
fn set_send_message_task_reward() -> Weight;
fn cancel_task() -> Weight;
fn reset_tasks() -> Weight;
fn set_shard_task_limit() -> Weight;
fn unregister_gateways() -> Weight;
}

Expand Down Expand Up @@ -86,6 +87,10 @@ pub mod pallet {
Weight::default()
}

fn set_shard_task_limit() -> Weight {
Weight::default()
}

fn unregister_gateways() -> Weight {
Weight::default()
}
Expand Down Expand Up @@ -135,6 +140,11 @@ pub mod pallet {
pub type UnassignedTasks<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, NetworkId, Blake2_128Concat, TaskId, (), OptionQuery>;

#[pallet::storage]
#[pallet::getter(fn shard_task_limit)]
pub type ShardTaskLimit<T: Config> =
StorageMap<_, Blake2_128Concat, NetworkId, u32, OptionQuery>;

#[pallet::storage]
pub type ShardTasks<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ShardId, Blake2_128Concat, TaskId, (), OptionQuery>;
Expand Down Expand Up @@ -253,6 +263,8 @@ pub mod pallet {
WriteTaskRewardSet(NetworkId, BalanceOf<T>),
/// Send message task reward set for network
SendMessageTaskRewardSet(NetworkId, BalanceOf<T>),
/// Set the maximum number of assigned tasks for all shards on the network
ShardTaskLimitSet(NetworkId, u32),
}

#[pallet::error]
Expand Down Expand Up @@ -487,7 +499,7 @@ pub mod pallet {
Ok(())
}

#[pallet::call_index(10)]
#[pallet::call_index(9)]
#[pallet::weight(<T as Config>::WeightInfo::cancel_task())]
pub fn sudo_cancel_tasks(origin: OriginFor<T>) -> DispatchResult {
ensure_root(origin)?;
Expand All @@ -502,7 +514,7 @@ pub mod pallet {
Ok(())
}

#[pallet::call_index(9)]
#[pallet::call_index(10)]
#[pallet::weight(<T as Config>::WeightInfo::reset_tasks())]
pub fn reset_tasks(origin: OriginFor<T>) -> DispatchResult {
ensure_root(origin)?;
Expand All @@ -524,6 +536,19 @@ pub mod pallet {
}

#[pallet::call_index(11)]
#[pallet::weight(<T as Config>::WeightInfo::set_shard_task_limit())]
pub fn set_shard_task_limit(
origin: OriginFor<T>,
network: NetworkId,
limit: u32,
) -> DispatchResult {
ensure_root(origin)?;
ShardTaskLimit::<T>::insert(network, limit);
Self::deposit_event(Event::ShardTaskLimitSet(network, limit));
Ok(())
}

#[pallet::call_index(12)]
#[pallet::weight(<T as Config>::WeightInfo::unregister_gateways())]
pub fn unregister_gateways(origin: OriginFor<T>) -> DispatchResult {
ensure_root(origin)?;
Expand Down Expand Up @@ -829,10 +854,17 @@ pub mod pallet {
}

fn schedule_tasks_shard(network: NetworkId, shard_id: ShardId) {
let tasks = ShardTasks::<T>::iter_prefix(shard_id).count();
let tasks = ShardTasks::<T>::iter_prefix(shard_id)
.filter(|(t, _)| TaskOutput::<T>::get(t).is_none())
.count();
let shard_size = T::Shards::shard_members(shard_id).len() as u16;
let is_registered = ShardRegistered::<T>::get(shard_id).is_some();
let capacity = 10.saturating_sub(tasks);
let shard_task_limit = ShardTaskLimit::<T>::get(network).unwrap_or(10) as usize;
let capacity = shard_task_limit.saturating_sub(tasks);
if capacity.is_zero() {
// no new tasks assigned if capacity reached or exceeded
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already handle the zero case later no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We take(capacity) below after filtering for tasks that are assignable. I think returning early is better if capacity is 0 rather than going through all the tasks just to take 0 of them afterwards.

let tasks = UnassignedTasks::<T>::iter_prefix(network)
.filter(|(task_id, _)| {
let Some(task) = Tasks::<T>::get(task_id) else { return false };
Expand Down
59 changes: 57 additions & 2 deletions pallets/tasks/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::mock::*;
use crate::{
Error, Event, Gateway, NetworkReadReward, NetworkSendMessageReward, NetworkShards,
NetworkWriteReward, ShardRegistered, ShardTasks, SignerPayout, TaskHash, TaskIdCounter,
TaskOutput, TaskPhaseState, TaskRewardConfig, TaskSignature, TaskSigner, UnassignedTasks,
NetworkWriteReward, ShardRegistered, ShardTaskLimit, ShardTasks, SignerPayout, TaskHash,
TaskIdCounter, TaskOutput, TaskPhaseState, TaskRewardConfig, TaskSignature, TaskSigner,
UnassignedTasks,
};
use frame_support::traits::Get;
use frame_support::{assert_noop, assert_ok};
Expand Down Expand Up @@ -1591,6 +1592,24 @@ fn register_gateway_fails_previous_shard_registration_tasks() {
});
}

#[test]
fn set_shard_task_limit_updates_storage_and_emits_event() {
new_test_ext().execute_with(|| {
Shards::create_shard(
ETHEREUM,
[[0u8; 32].into(), [1u8; 32].into(), [2u8; 32].into()].to_vec(),
1,
);
assert_eq!(ShardTaskLimit::<Test>::get(ETHEREUM), None);
assert_ok!(Tasks::set_shard_task_limit(RawOrigin::Root.into(), ETHEREUM, 5));
assert_eq!(ShardTaskLimit::<Test>::get(ETHEREUM), Some(5));
System::assert_last_event(Event::<Test>::ShardTaskLimitSet(ETHEREUM, 5).into());
assert_ok!(Tasks::set_shard_task_limit(RawOrigin::Root.into(), ETHEREUM, 50));
assert_eq!(ShardTaskLimit::<Test>::get(ETHEREUM), Some(50));
System::assert_last_event(Event::<Test>::ShardTaskLimitSet(ETHEREUM, 50).into());
});
}

#[test]
fn cancel_task_sets_task_output_to_err() {
new_test_ext().execute_with(|| {
Expand Down Expand Up @@ -1618,6 +1637,42 @@ fn cancel_task_sets_task_output_to_err() {
});
}

#[test]
fn set_shard_task_limit_successfully_limits_task_assignment() {
new_test_ext().execute_with(|| {
Shards::create_shard(
ETHEREUM,
[[0u8; 32].into(), [1u8; 32].into(), [2u8; 32].into()].to_vec(),
1,
);
ShardState::<Test>::insert(0, ShardStatus::Online);
Tasks::shard_online(0, ETHEREUM);
for _ in 0..5 {
assert_ok!(Tasks::create_task(
RawOrigin::Signed([0; 32].into()).into(),
mock_task(ETHEREUM)
));
}
assert_eq!(ShardTasks::<Test>::iter_prefix(0).count(), 5);
assert_eq!(UnassignedTasks::<Test>::iter().collect::<Vec<_>>().len(), 0);
assert_ok!(Tasks::set_shard_task_limit(RawOrigin::Root.into(), ETHEREUM, 5));
assert_ok!(Tasks::create_task(
RawOrigin::Signed([0; 32].into()).into(),
mock_task(ETHEREUM)
));
assert_eq!(ShardTasks::<Test>::iter_prefix(0).count(), 5);
assert_eq!(UnassignedTasks::<Test>::iter().collect::<Vec<_>>().len(), 1);
assert_ok!(Tasks::set_shard_task_limit(RawOrigin::Root.into(), ETHEREUM, 6));
assert_ok!(Tasks::create_task(
RawOrigin::Signed([0; 32].into()).into(),
mock_task(ETHEREUM)
));
assert_eq!(ShardTasks::<Test>::iter_prefix(0).count(), 6);
assert_eq!(UnassignedTasks::<Test>::iter().collect::<Vec<_>>().len(), 1);
assert_ok!(Tasks::set_shard_task_limit(RawOrigin::Root.into(), ETHEREUM, 6));
});
}

#[test]
fn unregister_gateways_removes_all_gateways_and_shard_registrations() {
new_test_ext().execute_with(|| {
Expand Down
4 changes: 4 additions & 0 deletions runtime/src/weights/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,8 @@ impl<T: frame_system::Config> pallet_tasks::WeightInfo for WeightInfo<T> {
fn unregister_gateways() -> Weight {
Weight::from_parts(0, 0)
}

fn set_shard_task_limit() -> Weight {
Weight::from_parts(0, 0)
}
}