From cb1f742100489458ea046aa4f508ac895cd23351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 5 May 2022 12:20:36 +0300 Subject: [PATCH] eth/filters: annotate logs in a sub with block end markers --- eth/filters/api.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index 7196e90f9eb..3b393d90734 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -108,7 +108,6 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { pendingTxs = make(chan []common.Hash) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) - api.filtersMu.Lock() api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() @@ -130,7 +129,6 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { } } }() - return pendingTxSub.ID } @@ -141,7 +139,6 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - rpcSub := notifier.CreateSubscription() go func() { @@ -165,7 +162,6 @@ func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Su } } }() - return rpcSub, nil } @@ -178,7 +174,6 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { headers = make(chan *types.Header) headerSub = api.events.SubscribeNewHeads(headers) ) - api.filtersMu.Lock() api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub} api.filtersMu.Unlock() @@ -200,7 +195,6 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { } } }() - return headerSub.ID } @@ -210,7 +204,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - rpcSub := notifier.CreateSubscription() go func() { @@ -230,7 +223,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er } } }() - return rpcSub, nil } @@ -240,24 +232,32 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - var ( rpcSub = notifier.CreateSubscription() matchedLogs = make(chan []*types.Log) ) - logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs) if err != nil { return nil, err } - + // Create an ephemeral log wrapper that contains an extra marker to signal + // the last log in a block + type annLog struct { + *types.Log // Embed for flat json fields + Last bool `json:"last"` // Whether it's the last matched log in the block + } go func() { - for { select { case logs := <-matchedLogs: - for _, log := range logs { - notifier.Notify(rpcSub.ID, &log) + for i, log := range logs { + // If this is the last log of a block, annotate it as such + if i == len(logs)-1 || log.BlockHash != logs[i+1].BlockHash { + notifier.Notify(rpcSub.ID, &annLog{Log: log, Last: true}) + continue + } + // Otherwise, stream the log without annotation to avoid increasing the size + notifier.Notify(rpcSub.ID, log) } case <-rpcSub.Err(): // client send an unsubscribe request logsSub.Unsubscribe() @@ -268,7 +268,6 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc } } }() - return rpcSub, nil }