Skip to content

Commit 8211abc

Browse files
committed
Merge branch 'release/1.17.2'
2 parents 3479f62 + 75d0528 commit 8211abc

4 files changed

Lines changed: 61 additions & 81 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
### Fixed
1414

1515

16-
## [1.17.1]
16+
## [1.17.2]
1717
### Changed
18-
- Improved PlanScore error logging & double timeout [#1204](https://github.com/PublicMapping/districtbuilder/pull/1204)
19-
- Revert to using JSON for data serialization [#1206](https://github.com/PublicMapping/districtbuilder/pull/1206)
18+
- Reduce max cache size [#1213](https://github.com/PublicMapping/districtbuilder/pull/1213)
19+
20+
### Fixed
21+
- Fixed typos in process-geojson & serialize-topojson commands [#1209](https://github.com/PublicMapping/districtbuilder/pull/1209)
22+
- Recreate workers when they time out [#1214](https://github.com/PublicMapping/districtbuilder/pull/1214)
2023

24+
## [1.17.1]
25+
### Changed
26+
- Improved PlanScore error logging & double timeout [#1204](https://github.com/PublicMapping/districtbuilder/pull/1204)
27+
- Revert to using JSON for data serialization [#1206](https://github.com/PublicMapping/districtbuilder/pull/1206)
2128

2229
## [1.17.0]
2330

@@ -499,7 +506,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
499506

500507
- Initial release.
501508

502-
[unreleased]: https://github.com/publicmapping/districtbuilder/compare/1.17.1...HEAD
509+
[unreleased]: https://github.com/publicmapping/districtbuilder/compare/1.17.2...HEAD
510+
[1.17.2]: https://github.com/publicmapping/districtbuilder/compare/1.17.1...1.17.2
503511
[1.17.1]: https://github.com/publicmapping/districtbuilder/compare/1.17.0...1.17.1
504512
[1.17.0]: https://github.com/publicmapping/districtbuilder/compare/1.16.1...1.17.0
505513
[1.16.1]: https://github.com/publicmapping/districtbuilder/compare/1.16.0...1.16.1

src/manage/src/commands/process-geojson.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,8 @@ it when necessary (file sizes ~1GB+).
522522
) {
523523
const filteredTopojson = this.filterTopoJson(topology, demographics, voting);
524524
this.log("Writing topojson file");
525-
const path = join(dir, "topo.buf");
526-
const output = createWriteStream(path, { encoding: "binary" });
525+
const path = join(dir, "topo.json");
526+
const output = createWriteStream(path, { encoding: "utf-8" });
527527
output.write(JSON.stringify(filteredTopojson));
528528
output.close();
529529
}

src/manage/src/commands/serialize-topojson.ts

Lines changed: 20 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export default class SerializeTopojson extends Command {
4343
cli.action.start(`Reading base TopoJSON: ${s3URI}`);
4444
const baseTopojson = await (flags.input === "json"
4545
? this.readJson(s3URI)
46-
: flags.output === "buf"
46+
: flags.input === "buf"
4747
? this.readBuf(s3URI)
4848
: this.readPbf(s3URI));
4949
cli.action.stop();
@@ -61,15 +61,7 @@ export default class SerializeTopojson extends Command {
6161
// Reads a TopoJSON file from S3, given the S3 run directory
6262
async readJson(inputS3Dir: string): Promise<Topology<Objects<{}>>> {
6363
this.log("Reading topo.json");
64-
const uriComponents = inputS3Dir.split("/");
65-
const bucket = uriComponents[2];
66-
const key = `${uriComponents.slice(3).join("/")}topo.json`;
67-
const response: any = await new S3()
68-
.getObject({
69-
Bucket: bucket,
70-
Key: key
71-
})
72-
.promise();
64+
const response: any = await new S3().getObject(this.s3Options(inputS3Dir, "json")).promise();
7365

7466
const objects = await new Promise(resolve =>
7567
createReadStream(response.Body as Buffer)
@@ -120,76 +112,50 @@ export default class SerializeTopojson extends Command {
120112
} as Topology<Objects<{}>>;
121113
}
122114

123-
jsonOptions(inputS3Dir: string) {
124-
const uriComponents = inputS3Dir.split("/");
125-
return {
126-
Bucket: uriComponents[2],
127-
Key: `${uriComponents.slice(3).join("/")}topo.json`
128-
};
129-
}
130-
131-
bufOptions(inputS3Dir: string) {
132-
const uriComponents = inputS3Dir.split("/");
133-
return {
134-
Bucket: uriComponents[2],
135-
Key: `${uriComponents.slice(3).join("/")}topo.buf`
136-
};
115+
read(inputS3Dir: string, ext: string) {
116+
this.log(`Reading topo.${ext}`);
117+
const s3Client = new S3();
118+
return s3Client.getObject(this.s3Options(inputS3Dir, ext)).promise();
137119
}
138120

139-
pbfOptions(inputS3Dir: string) {
121+
s3Options(inputS3Dir: string, ext: string) {
140122
const uriComponents = inputS3Dir.split("/");
141123
return {
142124
Bucket: uriComponents[2],
143-
Key: `${uriComponents.slice(3).join("/")}topo.pbf`
125+
Key: `${uriComponents.slice(3).join("/")}topo.${ext}`
144126
};
145127
}
146128

147129
async readBuf(inputS3Dir: string) {
148-
this.log("Reading topo.buf");
149-
150-
const s3Client = new S3();
151-
const resp = await s3Client.getObject(this.bufOptions(inputS3Dir)).promise();
130+
const resp = await this.read(inputS3Dir, "buf");
152131
return deserialize(resp.Body as Buffer);
153132
}
154133

155134
async readPbf(inputS3Dir: string) {
156-
this.log("Reading topo.buf");
157-
158-
const s3Client = new S3();
159-
const resp = await s3Client.getObject(this.bufOptions(inputS3Dir)).promise();
135+
const resp = await this.read(inputS3Dir, "pbf");
160136
return decode(new Pbf(resp.Body as Buffer)) as Topology;
161137
}
162138

163-
writeJson(inputS3Dir: string, topology: Topology<Objects<{}>>) {
164-
this.log("Streaming topo.json");
139+
write(inputS3Dir: string, ext: string, body: string | Buffer | Uint8Array) {
140+
this.log(`Writing topo.${ext}`);
165141
const s3Client = new S3();
166142
return s3Client
167143
.upload({
168-
Body: JSON.stringify(topology),
169-
...this.jsonOptions(inputS3Dir)
144+
Body: body,
145+
...this.s3Options(inputS3Dir, ext)
170146
})
171147
.promise();
172148
}
173149

150+
writeJson(inputS3Dir: string, topology: Topology<Objects<{}>>) {
151+
return this.write(inputS3Dir, "json", JSON.stringify(topology));
152+
}
153+
174154
writeBuf(inputS3Dir: string, topology: Topology<Objects<{}>>) {
175-
this.log("Streaming topo.buf");
176-
const s3Client = new S3();
177-
return s3Client
178-
.upload({
179-
Body: serialize(topology),
180-
...this.bufOptions(inputS3Dir)
181-
})
182-
.promise();
155+
return this.write(inputS3Dir, "buf", serialize(topology));
183156
}
184157

185158
writePbf(inputS3Dir: string, topology: Topology<Objects<{}>>) {
186-
this.log("Streaming topo.pbf");
187-
const s3Client = new S3();
188-
return s3Client
189-
.upload({
190-
Body: encode(topology, new Pbf()),
191-
...this.pbfOptions(inputS3Dir)
192-
})
193-
.promise();
159+
return this.write(inputS3Dir, "pbf", encode(topology, new Pbf()));
194160
}
195161
}

src/server/src/districts/services/worker-pool.service.ts

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ const dockerMemLimit = existsSync("/sys/fs/cgroup/memory.max")
2525
const hostmem = os.totalmem();
2626
const totalmem = Math.min(hostmem, dockerMemLimit);
2727
// Targets:
28-
// 1.5Gb of cache for 12Gb total (dev)
29-
// 3Gb of cache for 15Gb total (16Gb instance w/ ~1Gb ECS agent)
30-
// 11Gb of cache for 31Gb total (32Gb instance w/ ~1Gb ECS agent)
31-
// 27Gb of cache for 63Gb total (64Gb instance w/ ~1Gb ECS agent)
32-
export const cacheSize = totalmem * 0.5 - 4.5 * 1024 * 1024 * 1024;
28+
// 1.8Gb of cache for 12Gb total (dev)
29+
// 3.0Gb of cache for 15Gb total (16Gb instance w/ ~1Gb ECS agent)
30+
// 9.5Gb of cache for 31Gb total (32Gb instance w/ ~1Gb ECS agent)
31+
export const cacheSize = totalmem * 0.4 - 3 * 1024 * 1024 * 1024;
3332
const maxWorkerCacheSize = Math.ceil(cacheSize / NUM_WORKERS);
3433

3534
// Not that important to limit small regions, but large regions in every worker will eat up our cache
@@ -97,7 +96,10 @@ export class WorkerPoolService {
9796
(!this.pendingWorkersByRegion[regionConfig.id] ||
9897
!this.pendingWorkersByRegion[regionConfig.id].includes(worker));
9998
const availableWorkerIndexes = [...this.workerQueues.keys()].filter(workerLacksRegion);
100-
const lastUsed = this.workersByRegion[regionConfig.id] || [];
99+
const lastUsed = [
100+
...(this.workersByRegion[regionConfig.id] || []),
101+
...(this.pendingWorkersByRegion[regionConfig.id] || [])
102+
];
101103

102104
const lastUsedSettled = getSettledQueues(this.workerQueues, lastUsed);
103105
const settledQueues = getSettledQueues(this.workerQueues, availableWorkerIndexes);
@@ -203,7 +205,7 @@ export class WorkerPoolService {
203205
const [workerIndex, addedToRegion] = this.findQueue(regionConfig);
204206
const task = this.workerQueues[workerIndex].add(() =>
205207
this.workers[workerIndex]
206-
.then((worker: undefined | WorkerThread | Promise<WorkerThread>) => {
208+
.then(async (worker: WorkerThread | undefined) => {
207209
// If this region wasn't already in this workers cache, update the worker size
208210
// This may trigger recreating the worker thread if we would exceed the max size
209211
if (addedToRegion) {
@@ -213,7 +215,7 @@ export class WorkerPoolService {
213215

214216
if (this.workerSizes[workerIndex] + size > maxWorkerCacheSize) {
215217
// eslint-disable-next-line no-param-reassign
216-
worker = this.createWorker(workerIndex, "OoM");
218+
worker = await this.createWorker(workerIndex, "OoM");
217219
// Reset tracking info for this worker (any pending data stays the same)
218220
this.workerSizes[workerIndex] = 0;
219221
this.workersByRegion = _.mapValues(this.workersByRegion, workers =>
@@ -231,26 +233,30 @@ export class WorkerPoolService {
231233
}
232234
return worker;
233235
})
234-
.then(worker =>
235-
// If we haven't created the worker yet, do so now
236-
worker ? cb(worker) : this.createWorker(workerIndex).then(cb)
236+
.then(
237+
worker =>
238+
new Promise<R>((resolve, reject) => {
239+
// Clear pre-existing timeouts if we're adding a new one before cleanup could happen
240+
this.clearQueueTimeout(workerIndex);
241+
if (timeout) {
242+
// eslint-disable-next-line functional/immutable-data
243+
this.timeouts[workerIndex] = setTimeout(() => {
244+
reject("Worker request timed out");
245+
}, timeout);
246+
}
247+
// If we haven't created the worker yet, do so now
248+
const result = worker ? cb(worker) : this.createWorker(workerIndex).then(cb);
249+
result.then(resolve).catch(reject);
250+
})
237251
)
238252
);
239253
resolve(
240254
new Promise((resolve, reject) => {
241-
// Clear pre-existing timeouts if we're adding a new one before cleanup could happen
242-
this.clearQueueTimeout(workerIndex);
243-
if (timeout) {
244-
// eslint-disable-next-line functional/immutable-data
245-
this.timeouts[workerIndex] = setTimeout(() => {
246-
this.terminateWorker(workerIndex, "timeout").finally(() => reject());
247-
}, timeout);
248-
}
249255
task
250-
.then(worker => {
256+
.then(result => {
251257
// Clear out any timeouts after the task completes
252258
this.clearQueueTimeout(workerIndex);
253-
resolve(worker);
259+
resolve(result);
254260
})
255261
.catch(error => {
256262
this.terminateWorker(workerIndex, "error").finally(() => reject(error));

0 commit comments

Comments
 (0)