Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(oracles): needed for sports-db
WIP
  • Loading branch information
reo101 authored and Owliie committed Dec 9, 2025
commit ce75cf1b0d11dba5e9b52e3214b85c1537b9a9c4
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async function getBlocksenseFeedsCompatibility(
}
const dataFeedId = dataFeed.id;

const { base, quote } = dataFeed.additional_feed_info.pair;
const { base, quote } = dataFeed.additional_feed_info.pair!;

const baseAddress = isSupportedCurrencySymbol(base)
? currencySymbolToDenominationAddress[base]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export async function addDataProviders(
const dataFeedsWithCryptoResources = await Promise.all(
filteredFeeds.map(async feed => {
const providers = getAllProvidersForPair(
feed.additional_feed_info.pair,
feed.additional_feed_info.pair!,
providersData,
);
return {
Expand All @@ -37,5 +37,5 @@ export async function addDataProviders(

// Filter feeds that have a quote
function filterFeedsWithQuotes(feeds: SimplifiedFeed[]): SimplifiedFeed[] {
return feeds.filter(feed => feed.additional_feed_info.pair.quote);
return feeds.filter(feed => feed.additional_feed_info.pair!.quote);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export function getUniqueDataFeeds(
const seenPairs = new Set<string>();

return dataFeeds.filter(feed => {
const pairKey = pairToString(feed.additional_feed_info.pair);
const pairKey = pairToString(feed.additional_feed_info.pair!);

if (seenPairs.has(pairKey)) {
return false;
Expand All @@ -27,7 +27,7 @@ export function addStableCoinVariants(
feeds: SimplifiedFeed[],
): SimplifiedFeed[] {
const stableCoinVariants = feeds.flatMap(feed => {
const { base, quote } = feed.additional_feed_info.pair;
const { base, quote } = feed.additional_feed_info.pair!;
if (quote in stableCoins) {
return stableCoins[quote as keyof typeof stableCoins]
.map(altStableCoin => createPair(base, altStableCoin))
Expand Down Expand Up @@ -65,7 +65,7 @@ export async function addMarketCapRank(
const asset = cmcMarketCap.find(
asset =>
asset.symbol.toLowerCase() ===
feed.additional_feed_info.pair.base.toLowerCase(),
feed.additional_feed_info.pair!.base.toLowerCase(),
);
return {
...feed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export async function updateExchangesArgumentConfig(): Promise<NewFeedsConfig> {
for (const feed of feedsConfig.feeds) {
const pair = feed.additional_feed_info.pair;
const initialExchangePrices = getExchangesPriceDataForPair(
pair,
pair!,
providersData,
);
const outlierExchanges = detectPriceOutliers(
Expand All @@ -44,14 +44,14 @@ export async function updateExchangesArgumentConfig(): Promise<NewFeedsConfig> {

normalizedProvidersData = removePriceOutliers(
providersData,
pair,
pair!,
outlierExchanges,
);
}

const updatedFeedConfig = feedsConfig.feeds.map(feed => {
const providers = getAllProvidersForPair(
feed.additional_feed_info.pair,
feed.additional_feed_info.pair!,
normalizedProvidersData,
);

Expand Down
8 changes: 4 additions & 4 deletions apps/e2e-tests/src/test-scenarios/general/e2e.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { rgSearchPattern } from '../../utils/utilities';
import { expectedPCStatuses03 } from './expected-service-status';

describe.sequential('E2E Tests with process-compose', () => {
const testEnvironment = `e2e-general`;
const testScenario = `general`;
const network = 'ink_sepolia';
const MAX_HISTORY_ELEMENTS_PER_FEED = 8192;

Expand All @@ -67,7 +67,7 @@ describe.sequential('E2E Tests with process-compose', () => {
const res = await pipe(
Effect.gen(function* () {
processCompose = yield* EnvironmentManager;
yield* processCompose.start(testEnvironment);
yield* processCompose.start(testScenario);
hasProcessComposeStarted = true;

if (!process.listenerCount('SIGINT')) {
Expand All @@ -94,7 +94,7 @@ describe.sequential('E2E Tests with process-compose', () => {

if (Exit.isFailure(res)) {
throw new Error(
`Failed to start test environment: ${testEnvironment}. Reason: ${res.cause}`,
`Failed to start test environment: ${testScenario}. Reason: ${res.cause}`,
);
}
});
Expand Down Expand Up @@ -336,7 +336,7 @@ describe.sequential('E2E Tests with process-compose', () => {

describe.sequential('Reporter behavior based on logs', () => {
const reporterLogsFile =
getProcessComposeLogsFiles(testEnvironment)['reporter-a'];
getProcessComposeLogsFiles(testScenario)['reporter-a'];

it.live('Reporter should NOT panic', () =>
Effect.gen(function* () {
Expand Down
10 changes: 10 additions & 0 deletions apps/e2e-tests/src/test-scenarios/wit/environment-setup.nix
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ in
default-exec-interval = 10;
secret-key-path = "${testKeysDir}/reporter_secret_key";
second-consensus-secret-key-path = "${testKeysDir}/reporter_second_consensus_secret_key";
api-keys = {
ALPHAVANTAGE_API_KEY = "${apiKeysDir}/ALPHAVANTAGE_API_KEY";
APCA_API_KEY_ID = "${apiKeysDir}/APCA_API_KEY_ID";
APCA_API_SECRET_KEY = "${apiKeysDir}/APCA_API_SECRET_KEY";
YAHOO_FINANCE_API_KEY = "${apiKeysDir}/YAHOO_FINANCE_API_KEY";
TWELVEDATA_API_KEY = "${apiKeysDir}/TWELVEDATA_API_KEY";
FMP_API_KEY = "${apiKeysDir}/FMP_API_KEY";
SPOUT_RWA_API_KEY = "${apiKeysDir}/SPOUT_RWA_API_KEY";
METALS_API_KEY = "${apiKeysDir}/METALS_API_KEY";
};
};
};

Expand Down
46 changes: 46 additions & 0 deletions apps/e2e-tests/src/test-scenarios/wit/expected-service-status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
export const expectedPCStatuses03 = {
'anvil-impersonate-and-fund-ink-sepolia': {
status: 'Completed',
exit_code: 0,
},
'anvil-ink-sepolia': {
status: 'Running',
exit_code: 0,
},
'blocksense-reporter-a': {
status: 'Running',
exit_code: 0,
},
'blocksense-sequencer': {
status: 'Running',
exit_code: 0,
},
};

export const expectedProcessesStatus = {
...expectedPCStatuses03,
'aggregate-consensus-reader': {
status: 'Running',
exit_code: 0,
},
'anvil-impersonate-and-fund-ethereum-sepolia': {
status: 'Completed',
exit_code: 0,
},
'anvil-ethereum-sepolia': {
status: 'Running',
exit_code: 0,
},
'blockchain-reader': {
status: 'Running',
exit_code: 0,
},
kafka: {
status: 'Running',
exit_code: 0,
},
blama: {
status: 'Running',
exit_code: 0,
},
};
32 changes: 32 additions & 0 deletions apps/e2e-tests/src/test-scenarios/wit/feeds_config_v2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"feeds": [
{
"id": "0",
"full_name": "Liverpool Matches Results",
"description": "Results of Liverpool matches",
"type": "sport-feed",
"oracle_id": "sports-db",
"value_type": "bytes",
"stride": 4,
"quorum": {
"percentage": 90,
"aggregation": "majority"
},
"schedule": {
"interval_ms": 10000,
"heartbeat_ms": 30000,
"deviation_percentage": 0.1,
"first_report_start_unix_time_ms": 0
},
"additional_feed_info": {
"decimals": 0,
"category": "Sports",
"arguments": {
"kind": "sports-db",
"team_id": 133602,
"sport_type": "Soccer"
}
}
}
]
}
22 changes: 15 additions & 7 deletions apps/e2e-tests/src/test-scenarios/wit/wit-e2e.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ import { getDataFeedsInfoFromNetwork } from '../../utils/services/onchain';
import type { SequencerService } from '../../utils/services/sequencer';
import { Sequencer } from '../../utils/services/sequencer';
import type { UpdatesToNetwork } from '../../utils/services/types';
import { expectedPCStatuses03 } from '../general/expected-service-status';
import { expectedPCStatuses03 } from './expected-service-status';

describe.sequential('E2E Tests with process-compose', () => {
const testEnvironment = `e2e-wit`;
const testScenario = `wit`;
const testEnvironment = `e2e-${testScenario}`;
const network = 'ink_sepolia';
const MAX_HISTORY_ELEMENTS_PER_FEED = 8192;

Expand All @@ -63,7 +64,7 @@ describe.sequential('E2E Tests with process-compose', () => {
const res = await pipe(
Effect.gen(function* () {
processCompose = yield* EnvironmentManager;
yield* processCompose.start(testEnvironment);
yield* processCompose.start(testScenario);
hasProcessComposeStarted = true;

if (!process.listenerCount('SIGINT')) {
Expand Down Expand Up @@ -99,7 +100,7 @@ describe.sequential('E2E Tests with process-compose', () => {
}
});

it.live('Test processes state shortly after start', () =>
it.live.only('Test processes state shortly after start', () =>
gateEffect(
failFastGateway,
Effect.gen(function* () {
Expand All @@ -125,7 +126,7 @@ describe.sequential('E2E Tests with process-compose', () => {
),
);

it.live('Test sequencer configs are available and in correct format', () =>
it.live.only('Test sequencer configs are available and in correct format', () =>
Effect.gen(function* () {
sequencerConfig = yield* sequencer.getConfig();
feedsConfig = yield* sequencer.getFeedsConfig();
Expand Down Expand Up @@ -162,15 +163,15 @@ describe.sequential('E2E Tests with process-compose', () => {
);

it.live(
'Test processes state after at least 2 updates of each feeds have been made',
'Test sports db yields metrics',
() =>
Effect.gen(function* () {
updatesToNetworks = yield* Effect.retry(
sequencer
.fetchUpdatesToNetworksMetric()
.pipe(
Effect.filterOrFail(updates =>
valuesOf(updates[network]).every(v => v > 2),
valuesOf(updates[network]).every(v => v >= 1),
),
),
{
Expand Down Expand Up @@ -203,6 +204,13 @@ describe.sequential('E2E Tests with process-compose', () => {
initialRounds,
);

console.log({
feedIds,
initialRounds,
initialFeedsInfoLocal,
updatesToNetwork: updatesToNetworks[network],
});

expect(initialFeedsInfo).toEqual(initialFeedsInfoLocal);

// If some feeds were not updated, we will log a warning
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Stream,
String as EString,
} from 'effect';
import { Command } from '@effect/platform';
import { Command, FileSystem } from '@effect/platform';
import { NodeContext } from '@effect/platform-node';

import { arrayToObject } from '@blocksense/base-utils/array-iter';
Expand All @@ -18,6 +18,17 @@ import { logTestEnvironmentInfo } from '../utilities';
import { EnvironmentError, EnvironmentManager } from './types';

export const GENERAL_SCENARIO_FEEDS_CONFIG_DIR = `${rootDir}/apps/e2e-tests/src/test-scenarios/general`;
export function getFeedsConfigForScenario(scenario: string) {
return Effect.gen(function* () {
const configDir = `${rootDir}/apps/e2e-tests/src/test-scenarios/${scenario}`;
const exists = yield* FileSystem.FileSystem.pipe(
Effect.flatMap(fs => fs.exists(configDir)),
Effect.provide(NodeContext.layer),
);

return exists ? configDir : GENERAL_SCENARIO_FEEDS_CONFIG_DIR;
});
}

export const ProcessComposeLive = Layer.succeed(
EnvironmentManager,
Expand Down Expand Up @@ -56,7 +67,7 @@ export const ProcessComposeLive = Layer.succeed(
}),
);

function startEnvironment(testEnvironment: string): Effect.Effect<void, Error> {
function startEnvironment(testScenario: string): Effect.Effect<void, Error> {
const runString = <E, R>(
stream: Stream.Stream<Uint8Array, E, R>,
): Effect.Effect<string, E, R> =>
Expand All @@ -65,16 +76,19 @@ function startEnvironment(testEnvironment: string): Effect.Effect<void, Error> {
Stream.runFold(EString.empty, EString.concat),
);

const testEnvironment = `e2e-${testScenario}`;

return Effect.gen(function* () {
yield* logTestEnvironmentInfo('Starting', testEnvironment);
yield* Effect.sync(() => {
process.env['FEEDS_CONFIG_DIR'] = GENERAL_SCENARIO_FEEDS_CONFIG_DIR;
});

process.env['FEEDS_CONFIG_DIR'] =
yield* getFeedsConfigForScenario(testScenario);

const program = Effect.gen(function* () {
const command = Command.make(
'just',
'start-environment',
'e2e-general',
testEnvironment,
'0',
'--detached',
);
Expand Down
36 changes: 35 additions & 1 deletion libs/feed_registry/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,46 @@ impl FeedAggregate {
.clone();
FeedType::Text(result)
} else if frequency_map_bytes.len() > 0 {
let result = frequency_map_bytes
let most_frequent = frequency_map_bytes
.into_iter()
.max_by_key(|&(_, count)| count)
.map(|(s, _)| s)
.unwrap()
.clone();

fn prefix_size(stride: u8) -> u8 {
// `2 ^ (n + 1)` divided by 8, rounded up
(stride + 5 + 7) / 8
}

fn right_align_truncate(dst: &mut [u8], src: &[u8]) {
let dst_len = dst.len();
let src_len = src.len();

let n = std::cmp::min(dst_len, src_len);
dst[dst_len - n..].copy_from_slice(&src[src_len - n..]);
}

let prefix_size = prefix_size(4) as usize;
let mut prefix = vec![0; prefix_size];

let length = most_frequent.len();

// length | prefix
// (in bytes) |
// ------------------------------------
// 0x 00 .. 00 12 34 | 0x 00 00 00 00

// NOTE: Zip together the pointers to the bytes of the prefix and the length, starting from the "right"
// std::iter::zip(prefix[..].iter_mut().rev(), length.to_le_bytes()).for_each(
// |(t, s)| {
// *t = s;
// },
// );
right_align_truncate(&mut prefix, &length.to_be_bytes());

let result = [prefix, most_frequent].concat();

FeedType::Bytes(result)
} else {
panic!("No valid types to aggregate in MajorityVoteAggregator!");
Expand Down
Loading