1- use std:: { collections:: HashMap , fmt} ;
1+ use std:: { collections:: HashMap , fmt, sync:: Arc } ;
2+ use tokio:: sync:: Mutex ;
23use tonic:: { transport:: Server , Request , Response , Status } ;
34use tracing:: info;
45
@@ -57,6 +58,7 @@ struct RollkitDock {
5758 client : ikura_rpc:: Client ,
5859 submit_key : Option < Keypair > ,
5960 namespace : Option < ikura_nmt:: Namespace > ,
61+ cur_nonce : Arc < Mutex < Option < u64 > > > ,
6062}
6163
6264impl RollkitDock {
@@ -69,6 +71,7 @@ impl RollkitDock {
6971 client,
7072 submit_key,
7173 namespace,
74+ cur_nonce : Arc :: new ( Mutex :: new ( None ) ) ,
7275 }
7376 }
7477}
@@ -91,18 +94,16 @@ impl da_service_server::DaService for RollkitDock {
9194 let mut cache = HashMap :: new ( ) ;
9295 let mut response = GetResponse { blobs : vec ! [ ] } ;
9396 for ( index, id) in ids. into_iter ( ) . enumerate ( ) {
94- let blob_id = BlobId :: try_from ( id )
95- . map_err ( |_| Status :: invalid_argument ( format ! ( "not a valid ID at { index}" ) ) ) ?;
97+ let blob_id =
98+ BlobId :: try_from ( id ) . map_err ( |_| RollkitDockError :: GetInvalidBlobId { index } ) ?;
9699 let block_number = blob_id. block_number ;
97100 if !cache. contains_key ( & block_number) {
98101 let block_hash = self . client . await_finalized_height ( block_number) . await ;
99102 let block = self
100103 . client
101104 . await_block_at ( Some ( block_hash) )
102105 . await
103- . map_err ( |_| {
104- Status :: internal ( "failed to retrieve block number {block_number}" )
105- } ) ?;
106+ . map_err ( |_| RollkitDockError :: GetRetrieveBlock { block_number } ) ?;
106107 cache. insert ( blob_id. block_number , block) ;
107108 }
108109 // unwrap: at this point we know the block is in the cache, because at this point
@@ -117,7 +118,7 @@ impl da_service_server::DaService for RollkitDock {
117118 value : needle. data . clone ( ) ,
118119 } ) ;
119120 } else {
120- return Err ( Status :: not_found ( format ! ( "blob not found at { blob_id}" ) ) ) ;
121+ return Err ( RollkitDockError :: CantResolveBlobId ( blob_id) . into ( ) ) ;
121122 }
122123 }
123124 Ok ( Response :: new ( response) )
@@ -158,57 +159,73 @@ impl da_service_server::DaService for RollkitDock {
158159 . submit_key
159160 . as_ref ( )
160161 . cloned ( )
161- . ok_or_else ( || Status :: failed_precondition ( "no key for signing blobs" ) ) ?;
162- let namespace = self . namespace . ok_or_else ( || {
163- Status :: failed_precondition ( "no namespace provided, and no default namespace set" )
164- } ) ?;
162+ . ok_or_else ( || RollkitDockError :: NoSigningKey ) ?;
163+ let namespace = self
164+ . namespace
165+ . ok_or_else ( || RollkitDockError :: NamespaceNotProvided ) ?;
165166 let SubmitRequest {
166167 blobs,
167168 gas_price : _,
168169 } = request. into_inner ( ) ;
169- let mut response = SubmitResponse {
170- ids : vec ! [ ] ,
171- proofs : vec ! [ ] ,
172- } ;
173170 let blob_n = blobs. len ( ) ;
171+
172+ // First, prepare a list of extrinsics to submit.
173+ let mut extrinsics = vec ! [ ] ;
174174 for ( i, blob) in blobs. into_iter ( ) . enumerate ( ) {
175175 let data_hash = sha2_hash ( & blob. value ) ;
176- info ! (
177- "submitting blob {i}/{blob_n} (0x{}) to namespace {}" ,
178- hex:: encode( & data_hash) ,
179- namespace,
180- ) ;
181- let ( block_hash, extrinsic_index) = self
182- . client
183- . submit_blob ( blob. value , namespace, submit_key. clone ( ) )
176+ let nonce = self
177+ . gen_nonce ( )
184178 . await
185- . map_err ( |err| Status :: internal ( format ! ( "failed to submit blob: {err}" ) ) ) ?;
186- // TODO: getting the whole block is a bit inefficient, consider optimizing.
187- let block_number = match self
179+ . map_err ( RollkitDockError :: NonceGeneration ) ?;
180+ let extrinsic = self
188181 . client
189- . await_block_at ( Some ( block_hash ) )
182+ . make_blob_extrinsic ( blob . value , namespace , & submit_key , nonce )
190183 . await
191- . map ( |block| block. number )
192- {
193- Ok ( block_number) => block_number,
194- Err ( err) => {
195- return Err ( Status :: internal ( format ! (
196- "failed to obtain block number for 0x{}: {:?}" ,
197- hex:: encode( & block_hash) ,
198- err,
199- ) ) ) ;
200- }
201- } ;
202- let blob_id = BlobId {
203- block_number,
204- extrinsic_index,
205- data_hash,
206- } ;
207- info ! ( "blob landed: {blob_id}" ) ;
208- response. ids . push ( blob_id. into ( ) ) ;
209- response. proofs . push ( pbda:: Proof { value : vec ! [ ] } ) ;
184+ . map_err ( RollkitDockError :: MakeSubmitBlobExtrinsic ) ?;
185+ extrinsics. push ( ( i, data_hash, extrinsic) ) ;
210186 }
211- Ok ( Response :: new ( response) )
187+
188+ // Then, submit the extrinsics in parallel and collect the results.
189+ let futs = extrinsics
190+ . into_iter ( )
191+ . map ( |( i, data_hash, extrinsic) | async move {
192+ info ! (
193+ "submitting blob {i}/{blob_n} (0x{}) to namespace {}" ,
194+ hex:: encode( & data_hash) ,
195+ namespace
196+ ) ;
197+ let ( block_hash, extrinsic_index) = self
198+ . client
199+ . submit_blob ( & extrinsic)
200+ . await
201+ . map_err ( RollkitDockError :: SubmitBlob ) ?;
202+ // TODO: getting the whole block is a bit inefficient, consider optimizing.
203+ let block_number = match self
204+ . client
205+ . await_block_at ( Some ( block_hash) )
206+ . await
207+ . map ( |block| block. number )
208+ {
209+ Ok ( block_number) => block_number,
210+ Err ( err) => {
211+ return Err ( RollkitDockError :: SubmitRetrieveBlockNumber {
212+ block_hash,
213+ err,
214+ } ) ;
215+ }
216+ } ;
217+ let blob_id = BlobId {
218+ block_number,
219+ extrinsic_index,
220+ data_hash,
221+ } ;
222+ info ! ( "blob landed: {blob_id}" ) ;
223+ Ok ( blob_id. into ( ) )
224+ } ) ;
225+
226+ let ids: Vec < _ > = futures:: future:: try_join_all ( futs) . await ?;
227+ let proofs = ids. iter ( ) . map ( |_| pbda:: Proof { value : vec ! [ ] } ) . collect ( ) ;
228+ Ok ( Response :: new ( SubmitResponse { proofs, ids } ) )
212229 }
213230
214231 async fn validate (
@@ -241,11 +258,81 @@ impl da_service_server::DaService for RollkitDock {
241258 }
242259}
243260
261+ impl RollkitDock {
262+ /// Generates a new nonce suitable for signing an extrinsic from the signer.
263+ async fn gen_nonce ( & self ) -> anyhow:: Result < u64 > {
264+ let submit_key = self
265+ . submit_key
266+ . as_ref ( )
267+ . ok_or_else ( || anyhow:: anyhow!( "no key for signing blobs" ) ) ?; // should be unreachable
268+ let mut cur_nonce = self . cur_nonce . lock ( ) . await ;
269+ let nonce = match * cur_nonce {
270+ Some ( nonce) => nonce,
271+ None => self . client . get_last_nonce ( & submit_key) . await ?,
272+ } ;
273+ cur_nonce. replace ( nonce + 1 ) ;
274+ Ok ( nonce)
275+ }
276+ }
277+
244278fn sha2_hash ( data : & [ u8 ] ) -> [ u8 ; 32 ] {
245279 use sha2:: Digest ;
246280 sha2:: Sha256 :: digest ( data) . into ( )
247281}
248282
283+ enum RollkitDockError {
284+ NoSigningKey ,
285+ MakeSubmitBlobExtrinsic ( anyhow:: Error ) ,
286+ SubmitBlob ( anyhow:: Error ) ,
287+ NonceGeneration ( anyhow:: Error ) ,
288+ GetInvalidBlobId {
289+ index : usize ,
290+ } ,
291+ GetRetrieveBlock {
292+ block_number : u64 ,
293+ } ,
294+ SubmitRetrieveBlockNumber {
295+ block_hash : [ u8 ; 32 ] ,
296+ err : anyhow:: Error ,
297+ } ,
298+ CantResolveBlobId ( BlobId ) ,
299+ NamespaceNotProvided ,
300+ }
301+
302+ impl From < RollkitDockError > for Status {
303+ fn from ( me : RollkitDockError ) -> Status {
304+ use RollkitDockError :: * ;
305+ match me {
306+ NoSigningKey => {
307+ Status :: failed_precondition ( "the key for signing blobs is not provided" )
308+ }
309+ MakeSubmitBlobExtrinsic ( err) => {
310+ Status :: internal ( format ! ( "failed to create a submit blob extrinsic: {err}" ) )
311+ }
312+ SubmitBlob ( err) => Status :: internal ( format ! ( "failed to submit blob: {err}" ) ) ,
313+ NonceGeneration ( err) => Status :: internal ( format ! ( "failed to generate a nonce: {err}" ) ) ,
314+ GetInvalidBlobId { index } => {
315+ Status :: invalid_argument ( format ! ( "not a valid blob ID at index {index}" ) )
316+ }
317+ GetRetrieveBlock { block_number } => {
318+ Status :: internal ( format ! ( "failed to retrieve block number {block_number}" ) )
319+ }
320+ SubmitRetrieveBlockNumber { block_hash, err } => Status :: internal ( format ! (
321+ "failed to obtain block number for 0x{}: {}" ,
322+ hex:: encode( block_hash) ,
323+ err,
324+ ) ) ,
325+ CantResolveBlobId ( blob_id) => {
326+ Status :: not_found ( format ! ( "cannot resolve blob ID: {blob_id}" ) )
327+ }
328+ NamespaceNotProvided => Status :: failed_precondition (
329+ "no namespace provided, and no default names
330+ pace set" ,
331+ ) ,
332+ }
333+ }
334+ }
335+
249336struct BlobId {
250337 /// The block number at which the blob in question has been landed.
251338 block_number : u64 ,
0 commit comments