@@ -6,7 +6,7 @@ import Queue from 'p-queue'
66import { createTopology } from '@libp2p/topology'
77import { codes } from './errors.js'
88import { PeerStreams as PeerStreamsImpl } from './peer-streams.js'
9- import { toMessage , ensureArray , randomSeqno , noSignMsgId , msgId , toRpcMessage } from './utils.js'
9+ import { toMessage , ensureArray , noSignMsgId , msgId , toRpcMessage , randomSeqno } from './utils.js'
1010import {
1111 signMessage ,
1212 verifySignature
@@ -17,6 +17,7 @@ import type { Connection } from '@libp2p/interface-connection'
1717import type { PubSub , Message , StrictNoSign , StrictSign , PubSubInit , PubSubEvents , PeerStreams , PubSubRPCMessage , PubSubRPC , PubSubRPCSubscription , SubscriptionChangeData , PublishResult } from '@libp2p/interface-pubsub'
1818import { PeerMap , PeerSet } from '@libp2p/peer-collections'
1919import { Components , Initializable } from '@libp2p/components'
20+ import type { Uint8ArrayList } from 'uint8arraylist'
2021
2122const log = logger ( 'libp2p:pubsub' )
2223
@@ -284,7 +285,7 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
284285 /**
285286 * Responsible for processing each RPC message received by other peers.
286287 */
287- async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8Array > , peerStreams : PeerStreams ) {
288+ async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) {
288289 try {
289290 await pipe (
290291 stream ,
@@ -446,6 +447,10 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
446447 const signaturePolicy = this . globalSignaturePolicy
447448 switch ( signaturePolicy ) {
448449 case 'StrictSign' :
450+ if ( msg . type !== 'signed' ) {
451+ throw errcode ( new Error ( 'Message type should be "signed" when signature policy is StrictSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
452+ }
453+
449454 if ( msg . sequenceNumber == null ) {
450455 throw errcode ( new Error ( 'Need seqno when signature policy is StrictSign but it was missing' ) , codes . ERR_MISSING_SEQNO )
451456 }
@@ -474,19 +479,19 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
474479 * Decode Uint8Array into an RPC object.
475480 * This can be override to use a custom router protobuf.
476481 */
477- abstract decodeRpc ( bytes : Uint8Array ) : PubSubRPC
482+ abstract decodeRpc ( bytes : Uint8Array | Uint8ArrayList ) : PubSubRPC
478483
479484 /**
480485 * Encode RPC object into a Uint8Array.
481486 * This can be override to use a custom router protobuf.
482487 */
483- abstract encodeRpc ( rpc : PubSubRPC ) : Uint8Array
488+ abstract encodeRpc ( rpc : PubSubRPC ) : Uint8ArrayList
484489
485490 /**
486491 * Encode RPC object into a Uint8Array.
487492 * This can be override to use a custom router protobuf.
488493 */
489- abstract encodeMessage ( rpc : PubSubRPCMessage ) : Uint8Array
494+ abstract encodeMessage ( rpc : PubSubRPCMessage ) : Uint8ArrayList
490495
491496 /**
492497 * Send an rpc object to a peer
@@ -523,26 +528,42 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
523528 const signaturePolicy = this . globalSignaturePolicy
524529 switch ( signaturePolicy ) {
525530 case 'StrictNoSign' :
531+ if ( message . type !== 'unsigned' ) {
532+ throw errcode ( new Error ( 'Message type should be "unsigned" when signature policy is StrictNoSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
533+ }
534+
535+ // @ts -expect-error should not be present
526536 if ( message . signature != null ) {
527537 throw errcode ( new Error ( 'StrictNoSigning: signature should not be present' ) , codes . ERR_UNEXPECTED_SIGNATURE )
528538 }
539+
540+ // @ts -expect-error should not be present
529541 if ( message . key != null ) {
530542 throw errcode ( new Error ( 'StrictNoSigning: key should not be present' ) , codes . ERR_UNEXPECTED_KEY )
531543 }
544+
545+ // @ts -expect-error should not be present
532546 if ( message . sequenceNumber != null ) {
533547 throw errcode ( new Error ( 'StrictNoSigning: seqno should not be present' ) , codes . ERR_UNEXPECTED_SEQNO )
534548 }
535549 break
536550 case 'StrictSign' :
551+ if ( message . type !== 'signed' ) {
552+ throw errcode ( new Error ( 'Message type should be "signed" when signature policy is StrictSign but it was not' ) , codes . ERR_MISSING_SIGNATURE )
553+ }
554+
537555 if ( message . signature == null ) {
538556 throw errcode ( new Error ( 'StrictSigning: Signing required and no signature was present' ) , codes . ERR_MISSING_SIGNATURE )
539557 }
558+
540559 if ( message . sequenceNumber == null ) {
541- throw errcode ( new Error ( 'StrictSigning: Signing required and no seqno was present' ) , codes . ERR_MISSING_SEQNO )
560+ throw errcode ( new Error ( 'StrictSigning: Signing required and no sequenceNumber was present' ) , codes . ERR_MISSING_SEQNO )
542561 }
562+
543563 if ( ! ( await verifySignature ( message , this . encodeMessage . bind ( this ) ) ) ) {
544564 throw errcode ( new Error ( 'StrictSigning: Invalid message signature' ) , codes . ERR_INVALID_SIGNATURE )
545565 }
566+
546567 break
547568 default :
548569 throw errcode ( new Error ( 'Cannot validate message: unhandled signature policy' ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
@@ -559,14 +580,16 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
559580 * Normalizes the message and signs it, if signing is enabled.
560581 * Should be used by the routers to create the message to send.
561582 */
562- async buildMessage ( message : Message ) {
583+ async buildMessage ( message : { from : PeerId , topic : string , data : Uint8Array , sequenceNumber : bigint } ) : Promise < Message > {
563584 const signaturePolicy = this . globalSignaturePolicy
564585 switch ( signaturePolicy ) {
565586 case 'StrictSign' :
566- message . sequenceNumber = randomSeqno ( )
567587 return await signMessage ( this . components . getPeerId ( ) , message , this . encodeMessage . bind ( this ) )
568588 case 'StrictNoSign' :
569- return await Promise . resolve ( message )
589+ return await Promise . resolve ( {
590+ type : 'unsigned' ,
591+ ...message
592+ } )
570593 default :
571594 throw errcode ( new Error ( 'Cannot build message: unhandled signature policy' ) , codes . ERR_UNHANDLED_SIGNATURE_POLICY )
572595 }
@@ -603,10 +626,11 @@ export abstract class PubSubBaseProtocol<Events = PubSubEvents> extends EventEmi
603626 throw new Error ( 'Pubsub has not started' )
604627 }
605628
606- const message : Message = {
629+ const message = {
607630 from : this . components . getPeerId ( ) ,
608631 topic,
609- data : data ?? new Uint8Array ( 0 )
632+ data : data ?? new Uint8Array ( 0 ) ,
633+ sequenceNumber : randomSeqno ( )
610634 }
611635
612636 log ( 'publish topic: %s from: %p data: %m' , topic , message . from , message . data )
0 commit comments