Skip to content
Merged
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## [Unreleased](https://github.com/crim-ca/stac-populator) (latest)

<!-- insert list items of new changes here -->
* Add option to automatically update collection extents and summaries based on ingested items.

## [0.9.0](https://github.com/crim-ca/stac-populator/tree/0.9.0) (2025-08-26)

Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ STAC API on a server node.
It can also be used to export data from an existing STAC API or catalog to files on disk. These can then later
be used to populate a STAC API with the `DirectoryLoader` implementation.

It can also be used to update a STAC collection's extents and/or summaries based on the STAC items that already
are part of the collection. It does this by iterating through the items in the collection and updating the
relevant collection properties accordingly.

## Framework

The framework is centered around a Python Abstract Base Class: `STACpopulatorBase` that implements all the logic
Expand Down Expand Up @@ -67,6 +71,9 @@ stac-populator run [implementation] --help

# obtain general help about exporting STAC catalogs to a directory on disk
stac-populator export --help

# obtain general help about updating STAC collections based on their items
stac-populator update-collection --help
```

### CMIP6 extension: extra requirements
Expand Down
36 changes: 36 additions & 0 deletions STACpopulator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import sys
import warnings
from types import ModuleType
from typing import get_args

import pystac
import requests

from STACpopulator import __version__, implementations
from STACpopulator.collection_update import UpdateModes, UpdateModesOptional, update_api_collection
from STACpopulator.exceptions import STACPopulatorError
from STACpopulator.export import export_catalog
from STACpopulator.log import add_logging_options, setup_logging
Expand Down Expand Up @@ -43,6 +45,38 @@ def add_parser_args(parser: argparse.ArgumentParser) -> None:
for implementation_module_name, module in implementation_modules().items():
implementation_parser = populators_subparser.add_parser(implementation_module_name)
module.add_parser_args(implementation_parser)
implementation_parser.add_argument(
"--update-collection-mode",
dest="update_collection",
choices=get_args(UpdateModesOptional),
default="none",
help="Update collection information based on new items created or updated by this populator. "
"Only applies if --update is also set.",
)
implementation_parser.add_argument(
"--exclude-summary",
nargs="*",
action="extend",
default=[],
help="Exclude these properties when updating collection summaries. ",
)
update_parser = commands_subparser.add_parser(
"update-collection", description="Update collection information based on items in the collection"
)
update_parser.add_argument("stac-collection-uri", help="URI of collection to update from a STAC API instance")
update_parser.add_argument(
"--mode",
choices=get_args(UpdateModes),
default="all",
help="Choose whether to update summaries, extents, or all (both).",
)
update_parser.add_argument(
"--exclude-summary",
nargs="*",
action="extend",
default=[],
help="Exclude these properties when updating collection summaries. ",
)
export_parser = commands_subparser.add_parser("export", description="Export a STAC catalog to JSON files on disk.")
export_parser.add_argument("stac_host", help="STAC API URL")
export_parser.add_argument("directory", type=str, help="Path to a directory to write STAC catalog contents.")
Expand Down Expand Up @@ -81,6 +115,8 @@ def run(ns: argparse.Namespace) -> int:
if ns.stac_version:
pystac.set_stac_version(ns.stac_version)
return implementation_modules()[ns.populator].runner(ns, session) or 0
elif ns.command == "update_collection":
return update_api_collection(ns.mode, ns.stac_collection_uri, ns.exclude_summary) or 0
else:
return export_catalog(ns.directory, ns.stac_host, session, ns.resume, ns.ignore_duplicate_ids) or 0

Expand Down
217 changes: 217 additions & 0 deletions STACpopulator/collection_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
import logging
from collections.abc import Iterable
from datetime import datetime
from typing import Literal, TypeAlias

import pystac
import requests
from pystac_client.stac_api_io import StacApiIO

from STACpopulator.api_requests import post_stac_collection

LOGGER = logging.getLogger(__name__)

UpdateModes: TypeAlias = Literal["extents", "summaries", "all"]
UpdateModesOptional = Literal["extents", "summaries", "all", "none"]


def check_wgs84_compliance(bbox: list[int | float], stac_object_type: str, stac_object_id: str | None) -> None:
"""
Display a warning if the bbox does not conform to WGS84.

WGS84 requires longitude values to be between -180 and 180 (inclusive) and latitude values to be between
-90 and 90 (inclusive).
"""
longitude = (bbox[0], bbox[len(bbox) // 2])
latitude = (bbox[1], bbox[(len(bbox) // 2) + 1])
for lon in longitude:
if lon < -180 or lon > 180:
LOGGER.warning(
"STAC %s with id [%s] contains a bbox with a longitude outside of the accepted range of -180 and 180",
stac_object_type,
stac_object_id,
)
for lat in latitude:
if lat < -90 or lat > 90:
LOGGER.warning(
"STAC %s with id [%s] contains a bbox with a latitude outside of the accepted range of -90 and 90",
stac_object_type,
stac_object_id,
)


def sorted_bbox(bbox: list[int | float]) -> list[int | float]:
"""
Return the bbox but sorted so that dimensional values are in sorted order.

For example:

>>> bbox = [1, 3, 5, 2, 0, 4]
>>> sorted_bbox(bbox)
[1, 0, 4, 2, 3, 5]
"""
return [a for b in zip(*(sorted(axis) for axis in zip(bbox[: len(bbox) // 2], bbox[len(bbox) // 2 :]))) for a in b]


def update_collection_bbox(collection: dict, item: dict) -> None:
"""Update the spatial extent values in the collection based on the datetime properties in item."""
item_bbox = item.get("bbox")
if item_bbox is None:
# bbox can be missing if there is no geometry
return
bbox = sorted_bbox(item_bbox)
if item_bbox != bbox:
LOGGER.warning(
"STAC item with id [%s] contains a bbox with unsorted values: %s should be %s",
item.get("id"),
item_bbox,
bbox,
)
item_bbox = bbox
check_wgs84_compliance(item_bbox, "item", item.get("id"))
collection_bboxes = collection["extent"]["spatial"]["bbox"]
if collection_bboxes:
collection_bbox = collection_bboxes[0]
if len(item_bbox) == 4 and len(collection_bbox) == 6:
# collection bbox has a z axis and item bbox does not
item_bbox = [*item_bbox[:2], collection_bbox[2], item_bbox[2:], collection_bbox[5]]
elif len(item_bbox) == 6 and len(collection_bbox) == 4:
# item bbox has a z axis and collection bbox does not
collection_bbox.insert(2, item_bbox[2])
collection_bbox.append(item_bbox[5])
for i in range(len(item_bbox) // 2):
if item_bbox[i] < collection_bbox[i]:
collection_bbox[i] = item_bbox[i]
for i in range(len(item_bbox) // 2, len(item_bbox)):
if item_bbox[i] > collection_bbox[i]:
collection_bbox[i] = item_bbox[i]
elif item_bbox:
collection_bboxes.append(item_bbox)
check_wgs84_compliance(collection_bboxes[0], "collection", collection.get("id"))


def update_collection_interval(collection: dict, item: dict) -> None:
"""Update the temporal extent values in the collection based on the datetime properties in item."""
if (datetime := item["properties"].get("datetime")) is not None:
item_interval = [datetime, datetime]
else:
item_interval = [item["properties"][prop] for prop in ("start_datetime", "end_datetime")]
collection_intervals = collection["extent"]["temporal"]["interval"]
if collection_intervals:
collection_interval = collection_intervals[0]
if collection_interval[0] is not None and item_interval[0] < collection_interval[0]:
collection_interval[0] = item_interval[0]
if collection_interval[1] is not None and item_interval[1] > collection_interval[1]:
collection_interval[1] = item_interval[1]
else:
collection_intervals.append(item_interval)


def update_collection_summaries(collection: dict, item: dict, exclude_summaries: Iterable = ()) -> None:
"""
Update the summaries value in the collection based on the values in item.

This only creates summaries for simple types (strings, numbers, boolean) and does not
create summaries as JSON schema objects.
"""
if "summaries" not in collection:
collection["summaries"] = {}
elif "needs_summaries_update" in collection["summaries"]:
collection["summaries"].pop("needs_summaries_update")
summaries = collection["summaries"]
# the STAC spec does not recommend including summaries that are covered by the extent already
exclude_summaries = tuple(exclude_summaries) + ("datetime", "start_datetime", "end_datetime")
for name, value in item["properties"].items():
summary = summaries.get(name)
if name in exclude_summaries:
continue
elif isinstance(value, bool):
if summary is None:
summaries[name] = [value]
elif value not in summary:
summary.append(value)
elif isinstance(value, str):
try:
time_value = datetime.fromisoformat(value)
except ValueError:
if summary is None:
summaries[name] = [value]
elif isinstance(summary, list):
if value not in summary:
summary.append(value)
else:
if summary is None:
summaries[name] = {"minimum": value, "maximum": value}
elif summary.get("minimum") is not None and summary.get("maximum") is not None:
if time_value < datetime.fromisoformat(summary["minimum"]):
summary["minimum"] = value
elif time_value > datetime.fromisoformat(summary["maximum"]):
summary["maximum"] = value
elif isinstance(value, (int, float)):
if summary is None:
summaries[name] = {"minimum": value, "maximum": value}
elif isinstance(summary, list):
# this property does not necessarily contain all numeric values
if value not in summary:
summary.append(value)
elif summary.get("minimum") is not None and summary.get("maximum") is not None:
if value < summary["minimum"]:
summary["minimum"] = value
elif value > summary["maximum"]:
summary["maximum"] = value


def update_collection(mode: UpdateModes, collection: dict, item: dict, exclude_summaries: Iterable = ()) -> None:
"""
Update various values in the collection based on the values in item.

If mode is "extents", this will update temporal and spatial extents.

If mode is "summaries", this will update summary values based on the item's properties except for the
properties listed in exclude_summaries.

If mode is "all", both extents and summaries will be updated.
"""
if mode in ("extents", "all"):
LOGGER.info(
"Updating collection extents [%s] with data from item [%s]",
collection.get("id"),
item.get("id"),
)
update_collection_bbox(collection, item)
update_collection_interval(collection, item)
if mode in ("summaries", "all"):
LOGGER.info(
"Updating collection summaries [%s] with data from item [%s]",
collection.get("id"),
item.get("id"),
)
update_collection_summaries(collection, item, exclude_summaries)


def update_api_collection(
mode: UpdateModes, collection_uri: str, exclude_summaries: Iterable, session: requests.Session
) -> None:
"""
Update various values in the collection based on all of the items in the collection.

The collection will be updated to the STAC API where the collection exists.

If mode is "extents", this will update temporal and spatial extents.

If mode is "summaries", this will update summary values based on the item's properties except for the
properties listed in exclude_summaries.

If mode is "all", both extents and summaries will be updated.
"""
stac_api_io = StacApiIO()
stac_api_io.session = session
pystac_collection = pystac.Collection.from_file(collection_uri, stac_io=stac_api_io)
# transforming hrefs is unnecessary for this operation and pystac makes additional requests to the API if True
collection_dict = pystac_collection.to_dict(transform_hrefs=False)
LOGGER.info("Updating collection located at '%s' with mode '%s'.", collection_uri, mode)
for item in pystac_collection.get_items(recursive=True):
LOGGER.info("Updating collection (id='%s') with values from item (id='%s')")
update_collection(mode, collection_dict, item.to_dict(transform_hrefs=False), exclude_summaries)
stac_root = pystac_collection.get_root_link()
post_stac_collection(stac_root, collection_dict, update=True, session=session)
25 changes: 22 additions & 3 deletions STACpopulator/implementations/CMIP6_UofT/add_CMIP6.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import json
import logging
import os
from typing import Any, MutableMapping, Optional, Union
from typing import Any, Iterable, MutableMapping, Optional, Union

from pystac import STACValidationError
from pystac.extensions.datacube import DatacubeExtension
from requests.sessions import Session

from STACpopulator.collection_update import UpdateModesOptional
from STACpopulator.extensions.cmip6 import CMIP6Helper, CMIP6Properties
from STACpopulator.extensions.datacube import DataCubeHelper
from STACpopulator.extensions.thredds import THREDDSExtension, THREDDSHelper
Expand All @@ -31,8 +32,18 @@ def __init__(
update: Optional[bool] = False,
session: Optional[Session] = None,
config_file: Optional[Union[os.PathLike[str], str]] = None,
update_collection: UpdateModesOptional = "none",
exclude_summaries: Iterable[str] = (),
) -> None:
super().__init__(stac_host, data_loader, update=update, session=session, config_file=config_file)
super().__init__(
stac_host,
data_loader,
update=update,
session=session,
config_file=config_file,
update_collection=update_collection,
exclude_summaries=exclude_summaries,
)

def create_stac_item(
self, item_name: str, item_data: MutableMapping[str, Any]
Expand Down Expand Up @@ -109,6 +120,14 @@ def runner(ns: argparse.Namespace, session: Session) -> int:
# To be implemented
data_loader = ErrorLoader()

c = CMIP6populator(ns.stac_host, data_loader, update=ns.update, session=session, config_file=ns.config)
c = CMIP6populator(
ns.stac_host,
data_loader,
update=ns.update,
session=session,
config_file=ns.config,
update_collection=ns.update_collection,
exclude_summaries=ns.exclude_summary,
)
c.ingest()
return 0
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def runner(ns: argparse.Namespace, session: Session) -> int:
update=ns.update,
session=session,
config_file=ns.config,
update_collection=ns.update_collection,
exclude_summaries=ns.exclude_summary,
)
c.ingest()
return 0
Loading