Source code for terracatalogueclient.client

import requests
import requests.adapters
import datetime as dt
import os
import boto3
import botocore.session
import botocore.handlers
from urllib.parse import urljoin, urlparse, parse_qs
from shapely.geometry import shape
from shapely.geometry.base import BaseGeometry
import shapely.wkt as wkt
import humanfriendly
import enum
from typing import (
    Iterator,
    List,
    Optional,
    Union,
    Dict,
    Iterable,
    Tuple,
    Callable,
    TypeVar,
)
import logging

from terracatalogueclient import auth, __title__, __version__
from terracatalogueclient.config import CatalogueConfig
from terracatalogueclient.exceptions import (
    TooManyResultsException,
    ProductDownloadException,
    ParameterParserException,
    SearchException,
)

T = TypeVar("T")

_DEFAULT_REQUEST_HEADERS = {"User-Agent": f"{__title__}/{__version__}"}
_SEARCH_TIMEOUT = 60
_DOWNLOAD_TIMEOUT = 300

logger = logging.getLogger(__name__)


class Collection:
    """
    Collection returned from a catalogue search.

    :ivar id: collection identifier
    :vartype id: str
    :ivar geojson: GeoJSON representation of the collection
    :vartype geojson: dict
    :ivar geometry: collection geometry as a Shapely geometry
    :vartype geometry: BaseGeometry
    :ivar bbox: bounding box
    :vartype bbox: List[float]
    :ivar properties: collection properties
    :vartype properties: dict
    """

    def __init__(
        self,
        id: str,
        geojson: dict,
        geometry: BaseGeometry,
        bbox: List[float],
        properties: dict,
    ):
        self.id = id
        self.geojson = geojson
        self.geometry = geometry
        self.bbox = bbox
        self.properties = properties

    def __str__(self):
        return self.id
class ProductFileType(enum.Flag):
    """
    Enum flag to indicate the type of a product file.
    """

    DATA = enum.auto()  #: Data files
    RELATED = enum.auto()  #: Related files (eg. cloud mask)
    PREVIEWS = enum.auto()  #: Previews
    ALTERNATES = enum.auto()  #: Metadata description in an alternative format
    ALL = DATA | RELATED | PREVIEWS | ALTERNATES  #: Matches all types of files
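# Illustrative sketch (not part of the original module): because ProductFileType
# is an enum.Flag, individual file types can be combined with the bitwise OR
# operator and tested for membership with ``in``. The helper name below is
# hypothetical and only demonstrates how such a selection could be built.
def _example_select_file_types() -> ProductFileType:
    selection = ProductFileType.DATA | ProductFileType.RELATED
    assert ProductFileType.DATA in selection
    assert ProductFileType.PREVIEWS not in selection
    return selection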
class ProductFile:
    """
    File that belongs to a product.

    :ivar href: URI locator of the product file
    :vartype href: str
    :ivar length: content length in bytes
    :vartype length: Optional[int]
    :ivar title: title of the product file
    :vartype title: Optional[str]
    :ivar type: content type
    :vartype type: Optional[str]
    :ivar category: category, only applicable for previews or related files
    :vartype category: Optional[str]
    """

    def __init__(
        self,
        href: str,
        length: Optional[int],
        title: Optional[str] = None,
        type: Optional[str] = None,
        category: Optional[str] = None,
    ):
        self.href = href
        self.length = length
        self.title = title
        self.type = type
        self.category = category

    def __str__(self):
        return self.href

    def get_protocol(self):
        return self.href[: self.href.find("://")].lower()
class Product:
    """
    Product entry returned from a catalogue search.

    :ivar id: product identifier
    :vartype id: str
    :ivar title: product title
    :vartype title: str
    :ivar geojson: GeoJSON representation of the product
    :vartype geojson: dict
    :ivar geometry: product geometry as a Shapely geometry
    :vartype geometry: BaseGeometry
    :ivar bbox: bounding box
    :vartype bbox: List[float]
    :ivar beginningDateTime: acquisition start date time
    :vartype beginningDateTime: dt.datetime
    :ivar endingDateTime: acquisition end date time
    :vartype endingDateTime: dt.datetime
    :ivar properties: product properties
    :vartype properties: dict
    :ivar data: product data files
    :vartype data: List[ProductFile]
    :ivar related: related resources (eg. cloud mask)
    :vartype related: List[ProductFile]
    :ivar previews: previews or quicklooks of the product
    :vartype previews: List[ProductFile]
    :ivar alternates: metadata description in an alternative format
    :vartype alternates: List[ProductFile]
    """

    def __init__(
        self,
        id: str,
        title: str,
        geojson: dict,
        geometry: BaseGeometry,
        bbox: List[float],
        beginningDateTime: Optional[dt.datetime],
        endingDateTime: Optional[dt.datetime],
        properties: dict,
        data: List[ProductFile],
        related: List[ProductFile],
        previews: List[ProductFile],
        alternates: List[ProductFile],
    ):
        self.id = id
        self.title = title
        self.geojson = geojson
        self.geometry = geometry
        self.bbox = bbox
        self.beginningDateTime = beginningDateTime
        self.endingDateTime = endingDateTime
        self.properties = properties
        # product file references
        self.data = data
        self.related = related
        self.previews = previews
        self.alternates = alternates

    def __str__(self):
        return self.id
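# Illustrative sketch (not part of the original module): a Product groups its
# files into the ``data``, ``related``, ``previews`` and ``alternates`` lists.
# The hypothetical helper below only shows how those attributes can be
# inspected, e.g. to report the total size of the data files.
def _example_describe_product(product: Product) -> str:
    total_data_bytes = sum(f.length for f in product.data if f.length is not None)
    return (
        f"{product.id}: {len(product.data)} data file(s), "
        f"{humanfriendly.format_size(total_data_bytes)}, "
        f"acquired {product.beginningDateTime} - {product.endingDateTime}"
    )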
class Catalogue:
    """Connection to a catalogue endpoint, which allows for searching and downloading EO products."""

    def __init__(self, config: CatalogueConfig = None):
        """
        :param config: catalogue configuration. If none is supplied, the default Terrascope config is used.
        """
        self.config = config if config else CatalogueConfig.get_default_config()
        self._auth = None
        self.s3 = None

        adapter = requests.adapters.HTTPAdapter(
            max_retries=requests.adapters.Retry(
                total=5, backoff_factor=2, status_forcelist=[429, 500, 502, 503, 504]
            )
        )

        self._session_search = requests.Session()
        self._session_search.headers.update(_DEFAULT_REQUEST_HEADERS)
        self._session_search.headers.update(
            {"Accept": "application/json, application/geo+json"}
        )
        self._session_search.mount("http://", adapter)
        self._session_search.mount("https://", adapter)

        self._session_download = requests.Session()
        self._session_download.headers.update(_DEFAULT_REQUEST_HEADERS)
        self._session_download.headers.update({"Accept": "application/json"})
        self._session_download.mount("http://", adapter)
        self._session_download.mount("https://", adapter)

    def authenticate(self) -> "Catalogue":
        """
        Authenticate to the catalogue in an interactive way.
        A browser window will open to handle the sign-in procedure.

        :return: the catalogue object
        """
        if not self.config.oidc_interactive_supported:
            raise ProductDownloadException(
                "Interactive authentication is not supported for this catalogue endpoint"
            )
        self._auth = auth.authorization_code_grant(
            authorization_url=self.config.oidc_authorization_endpoint,
            token_url=self.config.oidc_token_endpoint,
            client_id=self.config.oidc_client_id,
        )
        self._session_search.auth = self._auth
        self._session_download.auth = self._auth
        return self

    def authenticate_non_interactive(
        self, username: str, password: str
    ) -> "Catalogue":
        """
        Authenticate to the catalogue in a non-interactive way.
        This requires you to pass your user credentials directly in the code.

        :param username: username
        :param password: password
        :return: the catalogue object
        """
        if not self.config.oidc_non_interactive_supported:
            raise ProductDownloadException(
                "Non interactive authentication is not supported for this catalogue endpoint"
            )
        self._auth = auth.resource_owner_password_credentials_grant(
            username=username,
            password=password,
            client_id=self.config.oidc_client_id,
            client_secret=self.config.oidc_client_secret,
            token_url=self.config.oidc_token_endpoint,
        )
        self._session_search.auth = self._auth
        self._session_download.auth = self._auth
        return self

    def get_collections(
        self,
        start: Optional[Union[str, dt.date, dt.datetime]] = None,
        end: Optional[Union[str, dt.date, dt.datetime]] = None,
        bbox: Optional[
            Union[str, List[Union[int, float]], Dict[str, Union[int, float]]]
        ] = None,
        geometry: Optional[Union[str, BaseGeometry]] = None,
        platform: Optional[str] = None,
        **kwargs,
    ) -> Iterator[Collection]:
        """
        Get the collections in the catalogue.

        :param start: start of the temporal interval to search
        :param end: end of the temporal interval to search
        :param bbox: geographic bounding box as list or dict (west, south, east, north)
        :param geometry: geometry as WKT string or Shapely geometry
        :param platform: acquisition platform
        :param \**kwargs: additional query parameters can be provided as keyword arguments
        """
        url = urljoin(self.config.catalogue_url, "collections")
        if start:
            kwargs["start"] = start
        if end:
            kwargs["end"] = end
        if bbox:
            kwargs["bbox"] = bbox
        if geometry:
            kwargs["geometry"] = geometry
        if platform:
            kwargs["platform"] = platform
        self._convert_parameters(kwargs)
        return self._get_paginated_feature_generator(
            url, kwargs, self._build_collection
        )
    def get_products(
        self,
        collection: str,
        start: Optional[Union[str, dt.date, dt.datetime]] = None,
        end: Optional[Union[str, dt.date, dt.datetime]] = None,
        bbox: Optional[
            Union[str, List[Union[int, float]], Dict[str, Union[int, float]]]
        ] = None,
        geometry: Optional[Union[str, BaseGeometry]] = None,
        title: Optional[str] = None,
        productType: Optional[str] = None,
        relativeOrbitNumber: Optional[Union[int, str]] = None,
        orbitDirection: Optional[str] = None,
        cloudCover: Optional[
            Union[
                Tuple[Union[float, int, None], Union[float, int, None]], float, int, str
            ]
        ] = None,
        tileId: Optional[str] = None,
        productGroupId: Optional[str] = None,
        publicationDate: Optional[
            Union[
                Tuple[
                    Union[dt.date, dt.datetime, str, None],
                    Union[dt.date, dt.datetime, str, None],
                ],
                str,
            ]
        ] = None,
        modificationDate: Optional[
            Union[
                Tuple[
                    Union[dt.date, dt.datetime, str, None],
                    Union[dt.date, dt.datetime, str, None],
                ],
                str,
            ]
        ] = None,
        accessedFrom: Optional[str] = None,
        limit: Optional[int] = None,
        **kwargs,
    ) -> Iterator[Product]:
        """Get the products matching the query.

        :param collection: collection to query
        :param start: start of the temporal interval to search
        :param end: end of the temporal interval to search
        :param bbox: geographic bounding box as list or dict (west, south, east, north)
        :param geometry: geometry as WKT string or Shapely geometry
        :param title: title of the product
        :param productType: product type
        :param relativeOrbitNumber: relative acquisition orbit number
        :param orbitDirection: acquisition orbit direction
        :param cloudCover: maximum cloud cover percentage as int/float; cloud cover percentage interval as tuple; or number, set or interval of cloud cover percentages as a str
        :param tileId: tile identifier
        :param productGroupId: string identifying the particular group to which a product belongs
        :param publicationDate: date of publication, as a date range in a date/datetime tuple (you can use None to have an unbounded interval) or as a str
        :param modificationDate: date of modification, as a date range in a date/datetime tuple (you can use None to have an unbounded interval) or as a str
        :param accessedFrom: information on the origin of the request
        :param limit: limit the number of requested products
        :param \**kwargs: additional query parameters can be provided as keyword arguments
        """
        url = urljoin(self.config.catalogue_url, "products")
        kwargs["collection"] = collection
        if start:
            kwargs["start"] = start
        if end:
            kwargs["end"] = end
        if bbox:
            kwargs["bbox"] = bbox
        if geometry:
            kwargs["geometry"] = geometry
        if title:
            kwargs["title"] = title
        if productType:
            kwargs["productType"] = productType
        if relativeOrbitNumber:
            kwargs["relativeOrbitNumber"] = relativeOrbitNumber
        if orbitDirection:
            kwargs["orbitDirection"] = orbitDirection
        if cloudCover:
            kwargs["cloudCover"] = cloudCover
        if tileId:
            kwargs["tileId"] = tileId
        if productGroupId:
            kwargs["productGroupId"] = productGroupId
        if publicationDate:
            kwargs["publicationDate"] = publicationDate
        if modificationDate:
            kwargs["modificationDate"] = modificationDate
        if accessedFrom:
            kwargs["accessedFrom"] = accessedFrom
        if limit:
            kwargs["limit"] = limit
        self._convert_parameters(kwargs)
        return self._get_paginated_feature_generator(url, kwargs, self._build_product)
    def get_product_count(self, collection: str, **kwargs):
        """Get the count of products matching the query.
        This is significantly more efficient than loading all results and then counting.

        :param collection: collection to query
        :param \**kwargs: query parameters, check :meth:`~terracatalogueclient.client.Catalogue.get_products` for more information on query parameters
        """
        url = urljoin(self.config.catalogue_url, "products")
        kwargs["collection"] = collection
        kwargs["count"] = 0
        self._convert_parameters(kwargs)
        response = self._session_search.get(url, params=kwargs, timeout=_SEARCH_TIMEOUT)
        logger.debug(f"{response.request.url} - {response.status_code}")
        if response.status_code == requests.codes.ok:
            response_json = response.json()
            return response_json["totalResults"]
        else:
            raise SearchException(response)

    def _get_total_file_size(
        self, products: Iterable[Product], file_types: ProductFileType
    ) -> int:
        """Get the total file size of the given products.

        :param products: iterable of products
        :param file_types: product file types
        :return: total file size in bytes
        """
        return sum(
            [
                product_file.length
                for product in products
                for product_file in self._get_product_files_matching_file_types(
                    product, file_types
                )
                if product_file.length is not None
            ]
        )

    def _get_product_files_matching_file_types(
        self, product: Product, file_types: ProductFileType
    ) -> List[ProductFile]:
        """
        Get the product files matching the given file types.

        :param product: product
        :param file_types: product file types
        :return: product files matching the given file types
        """
        files = []
        if ProductFileType.DATA in file_types:
            files += product.data
        if ProductFileType.RELATED in file_types:
            files += product.related
        if ProductFileType.PREVIEWS in file_types:
            files += product.previews
        if ProductFileType.ALTERNATES in file_types:
            files += product.alternates
        return files

    def download_products(
        self,
        products: Iterable[Product],
        path: str,
        file_types: ProductFileType = ProductFileType.ALL,
        force: bool = False,
        raise_on_failure: bool = True,
    ):
        """
        Download the given products. This will download the files belonging to the
        given products matching the provided file types.

        :param products: iterable of products to download
        :param path: output directory to write files to
        :param file_types: type of product files to download
        :param force: skip download confirmation
        :param raise_on_failure: raise an exception on a failure or silently continue
        """
        products = list(products)
        if not force:
            confirmed = False
            while not confirmed:
                in_confirmation = input(
                    f"You are about to download {humanfriendly.format_size(self._get_total_file_size(products, file_types))}, do you want to continue? [Y/n] "
                )
                if any(in_confirmation.lower() == s for s in ["y", ""]):
                    confirmed = True
                elif in_confirmation.lower() == "n":
                    return
        for product in products:
            self.download_product(product, path, file_types, raise_on_failure)

    def download_product(
        self,
        product: Product,
        path: str,
        file_types: ProductFileType = ProductFileType.ALL,
        raise_on_failure: bool = True,
    ):
        """Download a single product. This will download all files belonging to the given product.

        :param product: product to download
        :param path: output directory to write files to
        :param file_types: type of product files to download
        :param raise_on_failure: raise an exception on a failure or silently continue
        """
        for product_file in self._get_product_files_matching_file_types(
            product, file_types
        ):
            try:
                self.download_file(product_file, self._get_product_dir(path, product))
            except ConnectionError as e:
                logger.error(e)
                if raise_on_failure:
                    raise e

    def download_file(self, product_file: ProductFile, path: str):
        """Download a single product file.

        :param product_file: product file to download
        :param path: output directory to write the file to
        """
        protocol = product_file.get_protocol()
        if protocol.startswith("http"):
            self._download_file_http(product_file, path)
        elif protocol == "s3":
            self._download_file_s3(product_file, path)
        else:
            raise ProductDownloadException(
                f"Could not download product file, {product_file.href} is not a downloadable path."
            )
    def _download_file_http(self, product_file: ProductFile, path: str):
        """Download a single product file over HTTP.
        Assumes that the href in the product file contains a valid HTTP address.

        :param product_file: product file to download
        :param path: output directory to write the file to
        """
        if not self._is_authorized_to_download_http(product_file):
            raise ProductDownloadException(
                "You are not authorized to download this product. Make sure you are authenticated to the catalogue."
            )
        # create output directory if it doesn't exist
        if not os.path.exists(path):
            logger.debug(f"Creating output directory {path}")
            os.makedirs(path)

        filename = os.path.basename(product_file.href)
        out_path = os.path.join(path, filename)
        logger.info(f"Downloading {product_file.href} to {out_path}")
        with self._session_download.get(
            product_file.href,
            stream=True,
            allow_redirects=False,
            timeout=_DOWNLOAD_TIMEOUT,
        ) as r:
            r.raise_for_status()
            with open(out_path, "wb") as f:
                for chunk in r.iter_content(
                    chunk_size=self.config.http_download_chunk_size
                ):
                    if chunk:
                        f.write(chunk)

    def _download_file_s3(self, product_file: ProductFile, path: str):
        """Download a single product over S3.
        Assumes that the href in the product file contains a valid S3 link.

        :param product_file: product file to download
        :param path: output directory to write the file to
        """
        if not os.path.exists(path):
            logger.debug(f"Creating output directory {path}")
            os.makedirs(path)

        filename = os.path.basename(product_file.href)
        out_path = os.path.join(path, filename)

        if not self.s3:
            self._init_s3_client()

        logger.info(f"Downloading {product_file.href} to {out_path}")
        _, tmp = product_file.href.split("://", 1)
        bucket_name, key = tmp.split("/", 1)
        bucket = self.s3.Bucket(bucket_name)
        bucket.download_file(key, out_path)

    def _init_s3_client(self):
        """Set up the S3 resource service client using the configuration values."""
        # disable bucket name validation: https://creodias.eu/-/bucket-sharing-using-s3-bucket-policy
        logger.info("Initializing S3 client")
        botocore_session = botocore.session.Session()
        botocore_session.unregister(
            "before-parameter-build.s3", botocore.handlers.validate_bucket_name
        )
        boto3.setup_default_session(botocore_session=botocore_session)
        if not (
            self.config.s3_endpoint_url
            and self.config.s3_access_key
            and self.config.s3_secret_key
        ):
            raise ProductDownloadException(
                "Please provide S3 endpoint and credentials in the configuration file "
                "or using environment variables in order to use S3 as a download method."
            )
        self.s3 = boto3.resource(
            "s3",
            endpoint_url=self.config.s3_endpoint_url,
            aws_access_key_id=self.config.s3_access_key,
            aws_secret_access_key=self.config.s3_secret_key,
        )

    def _is_authorized_to_download_http(self, product_file: ProductFile) -> bool:
        """Check if the authenticated user has authorization to download the product file.
        If the user is not authenticated, a redirect will take place.

        :param product_file: product file
        """
        r = self._session_download.head(product_file.href, timeout=_DOWNLOAD_TIMEOUT)
        return r.ok

    @staticmethod
    def _convert_parameters(params):
        parameter_time = ["start", "end"]
        parameter_time_interval = ["publicationDate", "modificationDate"]

        for p in parameter_time:
            if p in params:
                params[p] = _date_to_str(params[p])
        for p in parameter_time_interval:
            if p in params:
                if type(params[p]) == tuple:
                    params[p] = _tuple_to_interval_str(params[p], p, _date_to_str)
        if "geometry" in params:
            p = "geometry"
            if isinstance(params[p], str):
                pass
            elif isinstance(params[p], BaseGeometry):
                params[p] = wkt.dumps(params[p], trim=True)
        if "bbox" in params:
            p = "bbox"
            if isinstance(params[p], str):
                pass
            elif isinstance(params[p], list):
                params[p] = ",".join(str(i) for i in params[p])
            elif isinstance(params[p], dict):
                params[
                    p
                ] = f"{params[p]['west']},{params[p]['south']},{params[p]['east']},{params[p]['north']}"
        if "cloudCover" in params:
            p = "cloudCover"
            if type(params[p]) == tuple:
                params[p] = _tuple_to_interval_str(params[p], p, str)
            elif isinstance(params[p], int) or isinstance(params[p], float):
                params[p] = f"{params[p]}]"
        return params

    @staticmethod
    def _can_get_all_features(response: dict, limit: Optional[int] = None) -> bool:
        """
        Check if all features can be retrieved using pagination.
        If too many results are found for the query, it may not be possible due to a limitation on the pagination depth.

        :param response: FeatureCollection as a dict
        :param limit: limit the number of requested features
        :return: boolean indicating whether all features can be retrieved
        """
        page_size = response["itemsPerPage"]
        total_results = response["totalResults"]
        requested_results = (
            total_results if limit is None else min(limit, total_results)
        )
        if (
            "last" in response["properties"]["links"]
            and len(response["properties"]["links"]["last"]) == 1
        ):
            last_href = response["properties"]["links"]["last"][0]["href"]
            last_start_index = int(parse_qs(urlparse(last_href).query)["startIndex"][0])
            return requested_results <= last_start_index + page_size - 1
        else:
            return requested_results <= page_size

    def _is_authenticated(self):
        """
        Checks if the user is authenticated.

        :return: true if the user is authenticated.
        """
        return self._auth is not None

    def _get_paginated_feature_generator(
        self, url: str, url_params: dict, builder
    ) -> Iterator:
        limit = url_params.pop("limit", None)
        feature_count = 0

        response = self._session_search.get(
            url, params=url_params, timeout=_SEARCH_TIMEOUT
        )
        logger.debug(f"{response.request.url} - {response.status_code}")

        if response.status_code == requests.codes.ok:
            response_json = response.json()
            if not Catalogue._can_get_all_features(response_json, limit):
                raise TooManyResultsException(
                    f"Too many results: {response_json['totalResults']} found. "
                    f"Please narrow down your search."
                )

            for f in response_json["features"]:
                feature_count += 1
                yield builder(f)
                if limit is not None and feature_count >= limit:
                    return

            while "next" in response_json["properties"]["links"]:
                url = response_json["properties"]["links"]["next"][0]["href"]
                response = self._session_search.get(url, timeout=_SEARCH_TIMEOUT)
                logger.debug(f"{response.request.url} - {response.status_code}")

                if response.status_code == requests.codes.ok:
                    response_json = response.json()
                    for f in response_json["features"]:
                        feature_count += 1
                        yield builder(f)
                        if limit is not None and feature_count >= limit:
                            return
                else:
                    response.raise_for_status()
        else:
            raise SearchException(response)

    @staticmethod
    def _build_collection(feature: dict) -> Collection:
        """Build collection object from the JSON response.

        :param feature: feature as a JSON dict
        """
        return Collection(
            feature["id"],
            feature,
            shape(feature["geometry"]),
            feature["bbox"],
            feature["properties"],
        )

    @staticmethod
    def _build_product(feature: dict) -> Product:
        """Build product object from the JSON response.

        :param feature: feature as a JSON dict
        """
        id = feature["id"]
        title = feature["properties"]["title"]
        geometry = shape(feature["geometry"])
        bbox = feature["bbox"]

        # get first acquisitionParameters block, if available
        acquisitionParameters = next(
            iter(
                [
                    i["acquisitionParameters"]
                    for i in feature["properties"]["acquisitionInformation"]
                    if "acquisitionParameters" in i
                ]
            ),
            None,
        )
        beginningDateTime = (
            _parse_date(acquisitionParameters["beginningDateTime"])
            if acquisitionParameters and "beginningDateTime" in acquisitionParameters
            else None
        )
        endingDateTime = (
            _parse_date(acquisitionParameters["endingDateTime"])
            if acquisitionParameters and "endingDateTime" in acquisitionParameters
            else None
        )

        # build product files
        links = feature["properties"]["links"]
        data = Catalogue._build_files(links["data"]) if "data" in links else []
        related = Catalogue._build_files(links["related"]) if "related" in links else []
        previews = (
            Catalogue._build_files(links["previews"]) if "previews" in links else []
        )
        alternates = (
            Catalogue._build_files(links["alternates"]) if "alternates" in links else []
        )

        return Product(
            id,
            title,
            feature,
            geometry,
            bbox,
            beginningDateTime,
            endingDateTime,
            feature["properties"],
            data,
            related,
            previews,
            alternates,
        )

    @staticmethod
    def _build_files(links: list) -> List[ProductFile]:
        return [Catalogue._build_file(link) for link in links]

    @staticmethod
    def _build_file(link: dict) -> ProductFile:
        href = link.get("href")
        length = link.get("length", None)
        title = link.get("title", None)
        type = link.get("type", None)
        category = link.get("category", None)
        return ProductFile(href, length, title, type, category)

    @staticmethod
    def _get_product_dir(path: str, product: Product):
        try:
            return os.path.join(path, product.id[product.id.rindex(":") + 1 :])
        except ValueError:
            return os.path.join(path, product.id)
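# Illustrative end-to-end sketch (not part of the original module): querying and
# downloading products with the Catalogue defined above. The collection id, tile
# id, dates, output directory and environment variable names below are
# hypothetical assumptions; network calls only run when this function is invoked
# explicitly.
def _example_catalogue_usage() -> None:
    catalogue = Catalogue()  # default Terrascope configuration

    # Non-interactive authentication with hypothetical environment variables.
    catalogue.authenticate_non_interactive(
        username=os.environ.get("TERRASCOPE_USERNAME", ""),
        password=os.environ.get("TERRASCOPE_PASSWORD", ""),
    )

    # List available collections for a bounding box (west, south, east, north).
    for collection in catalogue.get_collections(bbox=[4.0, 50.0, 5.0, 51.0]):
        logger.info("collection: %s", collection)

    # Count products before fetching them; cheaper than iterating all results.
    collection_id = "urn:eop:VITO:TERRASCOPE_S2_TOC_V2"  # hypothetical id
    count = catalogue.get_product_count(
        collection_id, start=dt.date(2021, 2, 1), end=dt.date(2021, 2, 28)
    )
    logger.info("%d products found", count)

    # Fetch a limited number of products; cloudCover=(None, 60) is converted to
    # the interval string "60]" by _convert_parameters/_tuple_to_interval_str.
    products = list(
        catalogue.get_products(
            collection_id,
            start=dt.date(2021, 2, 1),
            end=dt.date(2021, 2, 28),
            tileId="31UFS",  # hypothetical tile id
            cloudCover=(None, 60),
            limit=10,
        )
    )

    # Download only the data files, skipping the interactive confirmation.
    catalogue.download_products(
        products, "/tmp/products", file_types=ProductFileType.DATA, force=True
    )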
def _parse_date(datestr: str) -> dt.datetime:
    # remove the milliseconds
    # eg. 2021-04-16T16:15:14.243Z --> 2021-04-16T16:15:14
    datestr = datestr[: datestr.find(".")][: datestr.find("Z")]
    return dt.datetime.strptime(datestr, "%Y-%m-%dT%H:%M:%S")


def _date_to_str(date: Union[str, dt.datetime, dt.date]) -> str:
    if isinstance(date, str):
        return date
    elif isinstance(date, dt.datetime):
        return dt.datetime.strftime(date, "%Y-%m-%dT%H:%M:%SZ")
    elif isinstance(date, dt.date):
        return dt.date.strftime(date, "%Y-%m-%d")


def _tuple_to_interval_str(
    tuple: Tuple[T, T], param_name: str, formatter: Callable[[T], str]
) -> str:
    if len(tuple) != 2:
        raise ParameterParserException(
            f"Failed to parse the value of the '{param_name}' parameter. "
            f"A tuple of length {len(tuple)} is not supported. "
            f"To use an interval, the tuple should be of length 2."
        )
    t1, t2 = tuple
    if t1 is None and t2 is None:
        # filtering doesn't have any effect, remove it
        return ""
    elif t1 is None:
        # left unbounded
        return f"{formatter(t2)}]"
    elif t2 is None:
        # right unbounded
        return f"[{formatter(t1)}"
    else:
        # both sides bounded
        return f"[{formatter(t1)},{formatter(t2)}]"
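# Illustrative sketch (not part of the original module): the behaviour of the
# private date and interval helpers above, shown as simple assertions. These
# only run when this file is executed directly.
if __name__ == "__main__":
    assert _date_to_str(dt.date(2021, 4, 16)) == "2021-04-16"
    assert _date_to_str(dt.datetime(2021, 4, 16, 16, 15, 14)) == "2021-04-16T16:15:14Z"
    assert _parse_date("2021-04-16T16:15:14.243Z") == dt.datetime(2021, 4, 16, 16, 15, 14)

    # Interval notation used for parameters such as cloudCover and publicationDate:
    assert _tuple_to_interval_str((10, 50), "cloudCover", str) == "[10,50]"
    assert _tuple_to_interval_str((None, 80), "cloudCover", str) == "80]"  # left unbounded
    assert _tuple_to_interval_str((80, None), "cloudCover", str) == "[80"  # right unbounded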