diff --git a/Cargo.lock b/Cargo.lock index 7750fe9..4b5e652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,9 +523,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.46" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94e2ef8dbfc347b10c094890f778ee2e36ca9bb4262e86dc99cd217e35f3470b" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" dependencies = [ "unicode-ident", ] @@ -628,9 +628,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.101" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e90cde112c4b9690b8cbe810cba9ddd8bc1d7472e2cae317b69e9438c1cba7d2" +checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908" dependencies = [ "proc-macro2", "quote", diff --git a/orchestra/Cargo.toml b/orchestra/Cargo.toml index e17b415..0949da5 100644 --- a/orchestra/Cargo.toml +++ b/orchestra/Cargo.toml @@ -29,7 +29,7 @@ name = "duo" crate-type = ["bin"] [[example]] -name = "solo" +name = "dig" crate-type = ["bin"] [features] diff --git a/orchestra/examples/solo.rs b/orchestra/examples/dig.rs similarity index 67% rename from orchestra/examples/solo.rs rename to orchestra/examples/dig.rs index 13f2df1..6bc4795 100644 --- a/orchestra/examples/solo.rs +++ b/orchestra/examples/dig.rs @@ -24,36 +24,57 @@ mod misc; pub use self::misc::*; #[orchestra(signal=SigSigSig, event=EvX, error=Yikes, gen=AllMessages)] -struct Solo { - #[subsystem(consumes: Plinko, sends: [MsgStrukt])] +struct Dig { + #[subsystem(consumes: Plinko)] goblin_tower: GoblinTower, + + #[subsystem(sends: [Plinko])] + goldmine: Goldmine, } +use self::messages::*; + #[derive(Default)] pub struct Fortified; #[orchestra::subsystem(GoblinTower, error=Yikes)] impl Fortified { fn start(self, mut ctx: Context) -> SpawnedSubsystem { - let mut sender = ctx.sender().clone(); - ctx.spawn( - "GoblinTower", - Box::pin(async move { - sender.send_message(MsgStrukt(8u8)).await; + SpawnedSubsystem { + name: "GoblinTower", + future: Box::pin(async move { + while let Ok(FromOrchestra::Communication { msg: _ }) = ctx.recv().await { + println!("Look a plinko!") + } + Ok(()) }), - ) - .unwrap(); - unimplemented!("welcum") + } + } +} + +#[derive(Default)] +pub struct DragonsLair; + +#[orchestra::subsystem(Goldmine, error=Yikes)] +impl DragonsLair { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + let future = Box::pin(async move { + sender.send_message(Plinko).await; + Ok(()) + }); + + SpawnedSubsystem { name: "RedThorntail", future } } } async fn setup() { - let builder = Solo::builder(); + let builder = Dig::builder(); let builder = builder.goblin_tower(Fortified::default()); - + let builder = builder.goldmine(DragonsLair::default()); let builder = builder.spawner(DummySpawner); - let (orchestra, _handle): (Solo<_>, _) = builder.build().unwrap(); + let (orchestra, _handle) = builder.build().unwrap(); let orchestra_fut = orchestra .running_subsystems diff --git a/orchestra/proc-macro/Cargo.toml b/orchestra/proc-macro/Cargo.toml index d6184ac..2968ea7 100644 --- a/orchestra/proc-macro/Cargo.toml +++ b/orchestra/proc-macro/Cargo.toml @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"] proc-macro = true [dependencies] -syn = { version = "1.0.95", features = ["full", "extra-traits"] } +syn = { version = "1.0.105", features = ["full", "extra-traits"] } quote = "1.0.20" -proc-macro2 = "1.0.43" +proc-macro2 = { version = "1.0.47", features = ["span-locations"] } proc-macro-crate = "1.1.3" expander = { version = "0.0.6", default-features = false } petgraph = "0.6.0" diff --git a/orchestra/proc-macro/src/graph.rs b/orchestra/proc-macro/src/graph.rs index 13406ef..aeb4807 100644 --- a/orchestra/proc-macro/src/graph.rs +++ b/orchestra/proc-macro/src/graph.rs @@ -60,10 +60,12 @@ impl<'a> ConnectionGraph<'a> { for outgoing in ssf.messages_to_send.iter() { outgoing_lut.entry(outgoing).or_default().push((&ssf.generic, node_index)); } - if let Some(_first_consument) = - consuming_lut.insert(&ssf.message_to_consume, (&ssf.generic, node_index)) - { - // bail, two subsystems consuming the same message + if let Some(ref consumes) = ssf.message_to_consume { + if let Some(_first_consument) = + consuming_lut.insert(consumes, (&ssf.generic, node_index)) + { + // bail, two subsystems consuming the same message + } } } diff --git a/orchestra/proc-macro/src/impl_builder.rs b/orchestra/proc-macro/src/impl_builder.rs index 1a88dc9..d96261f 100644 --- a/orchestra/proc-macro/src/impl_builder.rs +++ b/orchestra/proc-macro/src/impl_builder.rs @@ -40,7 +40,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { let subsystem_generics = &info.subsystem_generic_types(); let consumes = &info.consumes_without_wip(); - let channel_name = &info.channel_names_without_wip(""); + let channel_name = &info.channel_names_without_wip(None); let channel_name_unbounded = &info.channel_names_without_wip("_unbounded"); let channel_name_tx = &info.channel_names_without_wip("_tx"); @@ -107,7 +107,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { info.subsystems().iter().filter(|ssf| !ssf.wip).enumerate().map(|(idx, ssf)| { let field_name = &ssf.name; let field_type = &ssf.generic; - let subsystem_consumes = &ssf.message_to_consume; + let subsystem_consumes = &ssf.message_to_consume(); // Remove state generic for the item to be replaced. It sufficient to know `field_type` for // that since we always move from `Init<#field_type>` to `Init`. let impl_subsystem_state_generics = recollect_without_idx(&subsystem_passthrough_state_generics[..], idx); @@ -314,7 +314,7 @@ pub(crate) fn impl_builder(info: &OrchestraInfo) -> proc_macro2::TokenStream { .iter() .map(|ssf| { let field_type = &ssf.generic; - let consumes = &ssf.message_to_consume; + let consumes = &ssf.message_to_consume(); let subsystem_sender_trait = format_ident!("{}SenderTrait", ssf.generic); let subsystem_ctx_trait = format_ident!("{}ContextTrait", ssf.generic); quote! { diff --git a/orchestra/proc-macro/src/impl_channels_out.rs b/orchestra/proc-macro/src/impl_channels_out.rs index 5b694d1..1eb3771 100644 --- a/orchestra/proc-macro/src/impl_channels_out.rs +++ b/orchestra/proc-macro/src/impl_channels_out.rs @@ -22,7 +22,7 @@ use super::*; pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result { let message_wrapper = info.message_wrapper.clone(); - let channel_name = &info.channel_names_without_wip(""); + let channel_name = &info.channel_names_without_wip(None); let channel_name_unbounded = &info.channel_names_without_wip("_unbounded"); let consumes = &info.consumes_without_wip(); diff --git a/orchestra/proc-macro/src/impl_message_wrapper.rs b/orchestra/proc-macro/src/impl_message_wrapper.rs index 3c2f999..25d8d75 100644 --- a/orchestra/proc-macro/src/impl_message_wrapper.rs +++ b/orchestra/proc-macro/src/impl_message_wrapper.rs @@ -13,6 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + +use itertools::Itertools; use quote::quote; use syn::{spanned::Spanned, Result}; @@ -50,7 +53,7 @@ pub(crate) fn impl_message_wrapper_enum(info: &OrchestraInfo) -> Result Result::from_iter( + info.subsystems().iter().map(|ssf| ssf.messages_to_send.iter()).flatten(), + ); + let incoming = HashSet::<&Path>::from_iter( + info.subsystems().iter().filter_map(|ssf| ssf.message_to_consume.as_ref()), + ); + + // Try to maintain the ordering according to the span start in the declaration. + fn cmp<'p, 'q>(a: &'p &&Path, b: &'q &&Path) -> std::cmp::Ordering { + a.span() + .start() + .partial_cmp(&b.span().start()) + .unwrap_or(std::cmp::Ordering::Equal) + } + + // sent but not received + for sbnr in outgoing.difference(&incoming).sorted_by(cmp) { + ts.extend( + syn::Error::new( + sbnr.span(), + format!( + "Message `{}` is sent but never received", + sbnr.get_ident() + .expect("Message is a path that must end in an identifier. qed") + ), + ) + .to_compile_error(), + ); + } + + // received but not sent + for rbns in incoming.difference(&outgoing).sorted_by(cmp) { + ts.extend( + syn::Error::new( + rbns.span(), + format!( + "Message `{}` is received but never sent", + rbns.get_ident() + .expect("Message is a path that must end in an identifier. qed") + ), + ) + .to_compile_error(), + ); + } + Ok(ts) } diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index 58e95ca..1fb3f49 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -114,6 +114,7 @@ pub(crate) fn impl_subsystem_types_all(info: &OrchestraInfo) -> Result Result, /// Types of messages to be sent by the subsystem. pub(crate) messages_to_send: Vec, /// If the subsystem implementation is blocking execution and hence @@ -107,6 +107,41 @@ pub(crate) struct SubSysField { pub(crate) wip: bool, } +impl SubSysField { + pub(crate) fn dummy_msg_name(&self) -> Ident { + Ident::new(format!("{}Message", self.generic).as_str(), self.name.span()) + } + + /// Returns either the specified to be consumed messsage + /// or the generated dummy message. + pub(crate) fn message_to_consume(&self) -> Path { + if let Some(ref consumes) = self.message_to_consume { + consumes.clone() + } else { + Path::from(self.dummy_msg_name()) + } + } + + /// Generate the dummy message type if the subsystem does not consume one + /// + /// Note: Only required to the internal structure anchoring everything to + /// the consuming message type. See #11 for a full solution. + pub(crate) fn gen_dummy_message_ty(&self) -> TokenStream { + if self.message_to_consume.is_none() { + let dummy_msg_ident = self.dummy_msg_name(); + quote! { + #[doc = + r###"A dummy implementation to satisfy the current internal structure + and cannot be constructed delibarately, since it's not meant to be sent or used at all"###] + #[derive(Debug, Clone, Copy)] + pub enum #dummy_msg_ident {} + } + } else { + TokenStream::new() + } + } +} + // Converts a type enum to a path if this type is a TypePath fn try_type_to_path(ty: &Type, span: Span) -> Result { match ty { @@ -425,13 +460,14 @@ impl OrchestraInfo { } pub(crate) fn any_message(&self) -> Vec { - self.subsystems - .iter() - .map(|ssf| ssf.message_to_consume.clone()) - .collect::>() + self.subsystems.iter().map(|ssf| ssf.message_to_consume()).collect::>() } - pub(crate) fn channel_names_without_wip(&self, suffix: &'static str) -> Vec { + pub(crate) fn channel_names_without_wip( + &self, + suffix: impl Into>, + ) -> Vec { + let suffix = suffix.into().unwrap_or(""); self.subsystems .iter() .filter(|ssf| !ssf.wip) @@ -443,7 +479,7 @@ impl OrchestraInfo { self.subsystems .iter() .filter(|ssf| !ssf.wip) - .map(|ssf| ssf.message_to_consume.clone()) + .map(|ssf| ssf.message_to_consume()) .collect::>() } } @@ -487,8 +523,8 @@ impl OrchestraGuts { ) })?; - // a `#[subsystem(..)]` annotation exists if let Some((attr_tokens, span)) = subsystem_attr.next() { + // a `#[subsystem(..)]` annotation exists if let Some((_attr_tokens2, span2)) = subsystem_attr.next() { return Err({ let mut err = Error::new(span, "The first subsystem annotation is at"); @@ -528,12 +564,7 @@ impl OrchestraGuts { } else { vec![] }; - // messages deemed for consumption - let consumes = if let Some(consumes) = consumes { - consumes.consumes - } else { - return Err(Error::new(span, "Must provide exactly one consuming message type")) - }; + let consumes = consumes.map(|consumes| consumes.consumes); subsystems.push(SubSysField { name: ident, @@ -544,6 +575,7 @@ impl OrchestraGuts { blocking, }); } else { + // collect the "baggage" let flattened = flatten_type(&ty, ident.span())?; let generic_types = flattened .iter()