Skip to content

Commit 202c07b

Browse files
authored
run time watchdog (AliceO2Group#3927)
. stopd processing at time frame boundaries if expected run time for next time frame is expected to exceed a given run time limit . run time limit is given in seconds by command line option time-limit
1 parent 9355dc7 commit 202c07b

3 files changed

Lines changed: 61 additions & 3 deletions

File tree

Framework/Core/include/Framework/AODReaderHelpers.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include "Framework/TableBuilder.h"
1515
#include "Framework/AlgorithmSpec.h"
16+
#include <uv.h>
1617

1718
namespace o2
1819
{
@@ -21,6 +22,49 @@ namespace framework
2122
namespace readers
2223
{
2324

25+
struct RuntimeWatchdog {
26+
int numberTimeFrames;
27+
uint64_t startTime;
28+
uint64_t lastTime;
29+
double runTime;
30+
uint64_t runTimeLimit;
31+
32+
RuntimeWatchdog(Long64_t limit)
33+
{
34+
numberTimeFrames = -1;
35+
startTime = uv_hrtime();
36+
lastTime = startTime;
37+
runTime = 0.;
38+
runTimeLimit = limit;
39+
}
40+
41+
bool update()
42+
{
43+
numberTimeFrames++;
44+
if (runTimeLimit <= 0) {
45+
return true;
46+
}
47+
48+
auto nowTime = uv_hrtime();
49+
50+
// time spent to process the time frame
51+
double time_spent = numberTimeFrames < 1 ? (double)(nowTime - lastTime) / 1.E9 : 0.;
52+
runTime += time_spent;
53+
lastTime = nowTime;
54+
55+
return ((double)(lastTime - startTime) / 1.E9 + runTime / (numberTimeFrames + 1)) < runTimeLimit;
56+
}
57+
58+
void printOut()
59+
{
60+
LOGP(INFO, "RuntimeWatchdog");
61+
LOGP(INFO, " run time limit: {}", runTimeLimit);
62+
LOGP(INFO, " number of time frames: {}", numberTimeFrames);
63+
LOGP(INFO, " estimated run time per time frame: {}", (numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.);
64+
LOGP(INFO, " estimated total run time: {}", (double)(lastTime - startTime) / 1.E9 + ((numberTimeFrames >= 0) ? runTime / (numberTimeFrames + 1) : 0.));
65+
}
66+
};
67+
2468
struct AODReaderHelpers {
2569
static AlgorithmSpec rootFileReaderCallback();
2670
static AlgorithmSpec aodSpawnerCallback(std::vector<InputSpec> requested);

Framework/Core/src/AODReaderHelpers.cxx

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ AlgorithmSpec AODReaderHelpers::rootFileReaderCallback()
270270
}
271271
}
272272

273+
// get the run time watchdog
274+
auto* watchdog = new RuntimeWatchdog(options.get<int64_t>("time-limit"));
275+
273276
// analyze type of requested tables
274277
uint64_t readMask = calculateReadMask(spec.outputs, header::DataOrigin{"AOD"});
275278
std::vector<OutputRoute> unknowns;
@@ -280,14 +283,24 @@ AlgorithmSpec AODReaderHelpers::rootFileReaderCallback()
280283
auto counter = std::make_shared<int>(0);
281284
return adaptStateless([readMask,
282285
unknowns,
283-
counter,
286+
watchdog,
284287
didir](DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
288+
// check if RuntimeLimit is reached
289+
if (!watchdog->update()) {
290+
LOGP(INFO, "Run time exceeds run time limit of {} seconds!", watchdog->runTimeLimit);
291+
LOGP(INFO, "Stopping after time frame {}.", watchdog->numberTimeFrames - 1);
292+
didir->closeInputFiles();
293+
control.endOfStream();
294+
control.readyToQuit(QuitRequest::All);
295+
return;
296+
}
297+
285298
// Each parallel reader reads the files whose index is associated to
286299
// their inputTimesliceId
287300
assert(device.inputTimesliceId < device.maxInputTimeslices);
288-
size_t fi = (*counter * device.maxInputTimeslices) + device.inputTimesliceId;
289-
*counter += 1;
301+
size_t fi = (watchdog->numberTimeFrames * device.maxInputTimeslices) + device.inputTimesliceId;
290302

303+
// check if EoF is reached
291304
if (didir->atEnd(fi)) {
292305
LOGP(INFO, "All input files processed");
293306
didir->closeInputFiles();

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
205205
readers::AODReaderHelpers::rootFileReaderCallback(),
206206
{ConfigParamSpec{"aod-file", VariantType::String, "aod.root", {"Input AOD file"}},
207207
ConfigParamSpec{"json-file", VariantType::String, {"json configuration file"}},
208+
ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}},
208209
ConfigParamSpec{"start-value-enumeration", VariantType::Int64, 0ll, {"initial value for the enumeration"}},
209210
ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}},
210211
ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}}};

0 commit comments

Comments
 (0)