Source code for lidar_prod.tasks.building_validation

import logging
import os
import os.path as osp
import shutil
from dataclasses import dataclass
from tempfile import TemporaryDirectory, mkdtemp
from typing import Optional, Union

import geopandas
import numpy as np
import pdal
import yaml
from tqdm import tqdm

from lidar_prod.tasks.utils import (
    get_integer_bbox,
    get_pdal_writer,
    get_pipeline,
    request_bd_uni_for_building_shapefile,
    split_idx_by_dim,
)

log = logging.getLogger(__name__)


@dataclass
class BuildingValidationClusterInfo:
    """Elements needed to confirm, refute, or be uncertain about a cluster of candidate building points."""

    probabilities: np.ndarray
    overlays: np.ndarray
    entropies: np.ndarray

    # target is based on corrected labels - only needed for optimization of decision thresholds
    target: Optional[int] = None
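
# --- Example (illustrative, not part of the original module) -------------------
# A minimal sketch of how this dataclass groups the per-cluster arrays consumed by
# the decision functions of BuildingValidator below; the numeric values are invented.
_example_cluster_info = BuildingValidationClusterInfo(
    probabilities=np.array([0.92, 0.88, 0.15]),  # AI building probability per point
    overlays=np.array([1, 1, 0]),  # 1 if the point falls under a BDUni building vector
    entropies=np.array([0.05, 0.10, 0.70]),  # per-point prediction entropy
    target=None,  # corrected label, only used when optimizing decision thresholds
)
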

class BuildingValidator:
    """Logic of building validation.

    The candidate building points identified with a rule-based algorithm are clustered together.
    The BDUni building vectors are overlayed on the point clouds, and points that fall under a
    vector are flagged. Then, the classification dim is updated on a per-group basis, based on
    both AI probabilities and the BDUni flag.

    See `README.md` for the detailed process.
    """

    def __init__(
        self,
        shp_path: str = None,
        bd_uni_connection_params=None,
        cluster=None,
        bd_uni_request=None,
        data_format=None,
        thresholds=None,
        use_final_classification_codes: bool = True,
    ):
        self.shp_path = shp_path
        self.bd_uni_connection_params = bd_uni_connection_params
        self.cluster = cluster
        self.bd_uni_request = bd_uni_request
        self.use_final_classification_codes = use_final_classification_codes
        self.thresholds = thresholds  # default values
        self.data_format = data_format
        # For easier access
        self.codes = data_format.codes.building
        self.candidate_buildings_codes = data_format.codes.building.candidates
        self.pipeline: pdal.pipeline.Pipeline = None
        self.setup()

    def setup(self):
        """Setup. Defines useful variables."""
        self.detailed_to_final_map: dict = {
            detailed: final for detailed, final in self.codes.detailed_to_final
        }
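
    # --- Example (illustrative) -------------------------------------------------
    # `codes.detailed_to_final` is expected to be an iterable of (detailed, final)
    # code pairs coming from the configuration. With hypothetical pairs, the
    # comprehension above reduces to a plain lookup dict:
    #
    #     pairs = [(214, 6), (215, 6), (208, 1)]   # hypothetical code pairs
    #     {detailed: final for detailed, final in pairs}
    #     # -> {214: 6, 215: 6, 208: 1}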

    def run(
        self,
        input_values: Union[str, pdal.pipeline.Pipeline],
        target_las_path: str = None,
        las_metadata: dict = None,
    ) -> dict:
        """Runs application.

        Transforms the cloud at `input_values` following building validation logic,
        and saves it to `target_las_path`.

        Args:
            input_values (str | pdal.pipeline.Pipeline): path or pipeline to input LAS file with a
                building probability channel
            target_las_path (str): path for saving updated LAS file.
            las_metadata (dict): current pipeline metadata, used to propagate input metadata to the
                application output las (epsg, las version, etc.)

        Returns:
            dict: returns `las_metadata`: metadata of the input las, which contains information to
            pass to the writer so that the application output has the same header (las version,
            srs, ...) as the input.
        """
        self.pipeline, las_metadata = get_pipeline(input_values, self.data_format.epsg)
        with TemporaryDirectory() as td:
            log.info("Preparation : Clustering of candidates buildings & Import vectors")
            if isinstance(input_values, str):
                log.info(f"Applying Building Validation to file \n{input_values}")
                temp_f = osp.join(td, osp.basename(input_values))
            else:
                temp_f = ""
            las_metadata = self.prepare(input_values, temp_f, las_metadata)
            log.info("Using AI and Databases to update cloud Classification")
            las_metadata = self.update(target_las_path=target_las_path, las_metadata=las_metadata)
        return las_metadata
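
    # --- Example usage (illustrative) --------------------------------------------
    # In the lidar-prod application the validator is built from the application
    # configuration; the objects below are placeholders standing in for the
    # corresponding config nodes, and the file names are invented.
    #
    #     bv = BuildingValidator(
    #         shp_path=None,                     # or a path to a buildings shapefile
    #         bd_uni_connection_params=bd_uni_connection_params_cfg,
    #         cluster=cluster_cfg,
    #         bd_uni_request=bd_uni_request_cfg,
    #         data_format=data_format_cfg,
    #         thresholds=thresholds_cfg,
    #         use_final_classification_codes=True,
    #     )
    #     bv.run("cloud_with_building_probabilities.las", "validated_cloud.las")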

    def prepare(
        self,
        input_values: Union[str, pdal.pipeline.Pipeline],
        prepared_las_path: str,
        save_result: bool = False,
        las_metadata: dict = None,
    ) -> dict:
        f"""Prepare las for the later decision process.

        1. Cluster candidate points, in a new
        `{self.data_format.las_dimensions.ClusterID_candidate_building}` dimension where the
        index of clusters starts at 1 (0 means no cluster).
        2. Identify points overlayed by a BD Uni building, in a new
        `{self.data_format.las_dimensions.uni_db_overlay}` dimension (0/1 flag).

        In the process, a new dimension is created which identifies candidate buildings
        (0/1 flag): `{self.data_format.las_dimensions.candidate_buildings_flag}`, to ignore them
        in later building identification.

        The classification dimension should not be modified here, as the optimization step needs
        to run this preparation once before testing multiple decision parameters on the same
        prepared data.

        Args:
            input_values (str | pdal.pipeline.Pipeline): path or pipeline to input LAS file with a
                building probability channel
            prepared_las_path (str): path for saving prepared LAS file.
            save_result (bool): True to save a las instead of propagating a pipeline
            las_metadata (dict): current pipeline metadata, used to propagate input metadata to the
                application output las (epsg, las version, etc.)

        Returns:
            dict: updated las metadata
        """

        dim_candidate_flag = self.data_format.las_dimensions.candidate_buildings_flag
        dim_cluster_id_pdal = self.data_format.las_dimensions.cluster_id
        dim_cluster_id_candidates = self.data_format.las_dimensions.ClusterID_candidate_building
        dim_overlay = self.data_format.las_dimensions.uni_db_overlay

        self.pipeline, las_metadata = get_pipeline(input_values, self.data_format.epsg)

        # Identify candidate building points with a boolean flag
        self.pipeline |= pdal.Filter.ferry(dimensions=f"=>{dim_candidate_flag}")
        _is_candidate_building = (
            "("
            + " || ".join(
                f"Classification == {int(candidate_code)}"
                for candidate_code in self.candidate_buildings_codes
            )
            + ")"
        )
        self.pipeline |= pdal.Filter.assign(
            value=f"{dim_candidate_flag} = 1 WHERE {_is_candidate_building}"
        )
        # Cluster candidate building points. This creates a ClusterID dimension (int)
        # in which unclustered points have index 0.
        self.pipeline |= pdal.Filter.cluster(
            min_points=self.cluster.min_points,
            tolerance=self.cluster.tolerance,
            where=f"{dim_candidate_flag} == 1",
        )
        # Copy ClusterID into a new dim and reset it to 0 to avoid conflict with later tasks.
        self.pipeline |= pdal.Filter.ferry(
            dimensions=f"{dim_cluster_id_pdal}=>{dim_cluster_id_candidates}"
        )
        self.pipeline |= pdal.Filter.assign(value=f"{dim_cluster_id_pdal} = 0")

        self.pipeline.execute()
        bbox = get_integer_bbox(self.pipeline, buffer=self.bd_uni_request.buffer)

        self.pipeline |= pdal.Filter.ferry(dimensions=f"=>{dim_overlay}")

        if self.shp_path:
            # No need for a temporary directory to add the shapefile in: we already have
            # the shapefile.
            temp_dirpath = None
            _shp_p = self.shp_path
            log.info(f"Read shapefile\n {_shp_p}")
            gdf = geopandas.read_file(_shp_p)
            buildings_in_bd_topo = not len(gdf) == 0  # check if there are buildings in the shp
        else:
            temp_dirpath = mkdtemp()
            # TODO: extract coordinates from LAS directly using pdal.
            # Request BDUni to get a shapefile of the known buildings in the LAS
            _shp_p = os.path.join(temp_dirpath, "temp.shp")
            log.info("Request Bd Uni")
            buildings_in_bd_topo = request_bd_uni_for_building_shapefile(
                self.bd_uni_connection_params, _shp_p, bbox, self.data_format.epsg
            )

        # Create overlay dim
        # If there are some buildings in the database, create a BDTopoOverlay boolean
        # dimension to reflect it.
        if buildings_in_bd_topo:
            self.pipeline |= pdal.Filter.overlay(
                column="PRESENCE", datasource=_shp_p, dimension=dim_overlay
            )

        if save_result:
            self.pipeline |= get_pdal_writer(prepared_las_path, las_metadata)
            os.makedirs(osp.dirname(prepared_las_path), exist_ok=True)
        self.pipeline.execute()

        if temp_dirpath:
            shutil.rmtree(temp_dirpath)

        return las_metadata
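
    # --- Example (illustrative) -------------------------------------------------
    # Standalone sketch of the PDAL pattern used in `prepare`: create an empty flag
    # dimension with filters.ferry, set it with filters.assign, then cluster only
    # the flagged points. "example.las", the dimension name and the code 202 are
    # placeholders, not values from this module's configuration.
    #
    #     pipeline = pdal.Reader.las("example.las").pipeline()
    #     pipeline |= pdal.Filter.ferry(dimensions="=>CandidateFlag")
    #     pipeline |= pdal.Filter.assign(value="CandidateFlag = 1 WHERE Classification == 202")
    #     pipeline |= pdal.Filter.cluster(min_points=10, tolerance=0.5, where="CandidateFlag == 1")
    #     pipeline.execute()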

    def update(
        self, src_las_path: str = None, target_las_path: str = None, las_metadata: dict = None
    ) -> dict:
        """Updates the point cloud classification channel."""

        if src_las_path:
            self.pipeline, las_metadata = get_pipeline(src_las_path, self.data_format.epsg)

        points = self.pipeline.arrays[0]

        # 1) Map all candidate points to a single "not_building" class
        # to be sure that they will all be modified.
        dim_clf = self.data_format.las_dimensions.classification
        dim_flag = self.data_format.las_dimensions.candidate_buildings_flag
        candidates_mask = points[dim_flag] == 1
        points[dim_clf][candidates_mask] = self.codes.final.not_building

        # 2) Decide at the group level
        # TODO: check if this can be moved somewhere else.
        # WARNING: use_final_classification_codes may be modified in an unsafe manner during
        # optimization. Consider using a setter that will change decision_func alongside.

        # Decide level of detail of classification codes
        decision_func = self._make_detailed_group_decision
        if self.use_final_classification_codes:
            decision_func = self._make_group_decision

        # Get the indices of points of each cluster
        # Remove the unclustered group that has ClusterID = 0 (i.e. the first "group")
        cluster_id_dim = points[self.data_format.las_dimensions.ClusterID_candidate_building]
        split_idx = split_idx_by_dim(cluster_id_dim)
        split_idx = split_idx[1:]

        # Iterate over groups and update their classification
        for pts_idx in tqdm(split_idx, desc="Update cluster classification", unit="clusters"):
            infos = self._extract_cluster_info_by_idx(points, pts_idx)
            points[dim_clf][pts_idx] = decision_func(infos)

        self.pipeline = pdal.Pipeline(arrays=[points])
        if target_las_path:
            self.pipeline = get_pdal_writer(target_las_path, las_metadata).pipeline(points)
            os.makedirs(osp.dirname(target_las_path), exist_ok=True)
            self.pipeline.execute()

        return las_metadata
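
    # --- Example (illustrative) -------------------------------------------------
    # The grouping step in `update` relies on `split_idx_by_dim` from
    # lidar_prod.tasks.utils. A plausible numpy equivalent, shown only to illustrate
    # the expected behaviour on a toy ClusterID array:
    #
    #     cluster_ids = np.array([0, 2, 1, 1, 0, 2])
    #     order = np.argsort(cluster_ids, kind="stable")                 # [0, 4, 2, 3, 1, 5]
    #     boundaries = np.flatnonzero(np.diff(cluster_ids[order])) + 1   # [2, 4]
    #     groups = np.split(order, boundaries)
    #     # groups[0]  -> indices with ClusterID == 0 (unclustered, dropped by `update`)
    #     # groups[1:] -> one index array per candidate building cluster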

    def _extract_cluster_info_by_idx(
        self, las: np.ndarray, pts_idx: np.ndarray
    ) -> BuildingValidationClusterInfo:
        """Extracts all necessary information to make a decision based on points indices.

        Args:
            las (np.ndarray): point cloud of interest
            pts_idx (np.ndarray): indices of points in the considered cluster

        Returns:
            BuildingValidationClusterInfo: data necessary to make a decision at cluster level.
        """
        pts = las[pts_idx]
        probabilities = pts[self.data_format.las_dimensions.ai_building_proba]
        overlays = pts[self.data_format.las_dimensions.uni_db_overlay]
        entropies = pts[self.data_format.las_dimensions.entropy]
        targets = pts[self.data_format.las_dimensions.classification]
        return BuildingValidationClusterInfo(probabilities, overlays, entropies, targets)

    def _make_group_decision(self, *args, **kwargs) -> int:
        f"""Wrapper to simplify decision codes during LAS update.

        Signature follows the one of {self._make_detailed_group_decision.__name__}

        Returns:
            int: final classification code for the considered group.
        """
        detailed_code = self._make_detailed_group_decision(*args, **kwargs)
        return self.detailed_to_final_map[detailed_code]

    def _make_detailed_group_decision(self, infos: BuildingValidationClusterInfo) -> int:
        """Decision process at the cluster level.

        Confirm or refute candidate building groups based on the fraction of confirmed/refuted
        points and on the fraction of points overlayed by a building vector in BDUni.
        See the README for details of this group-level decision process.

        Args:
            infos (BuildingValidationClusterInfo): arrays describing the cluster of candidate
                building points

        Returns:
            int: detailed classification code for the considered group.
        """
        # HIGH ENTROPY
        high_entropy = (
            np.mean(infos.entropies >= self.thresholds.min_entropy_uncertainty)
            >= self.thresholds.min_frac_entropy_uncertain
        )

        # CONFIRMATION - threshold is relaxed under BDUni
        p_heq_threshold = infos.probabilities >= self.thresholds.min_confidence_confirmation

        relaxed_threshold = (
            self.thresholds.min_confidence_confirmation
            * self.thresholds.min_frac_confirmation_factor_if_bd_uni_overlay
        )
        p_heq_relaxed_threshold = infos.probabilities >= relaxed_threshold

        ia_confirmed_flag = np.logical_or(
            p_heq_threshold,
            np.logical_and(infos.overlays, p_heq_relaxed_threshold),
        )

        ia_confirmed = np.mean(ia_confirmed_flag) >= self.thresholds.min_frac_confirmation

        # REFUTATION
        ia_refuted = (
            np.mean((1 - infos.probabilities) >= self.thresholds.min_confidence_refutation)
            >= self.thresholds.min_frac_refutation
        )

        uni_overlayed = np.mean(infos.overlays) >= self.thresholds.min_uni_db_overlay_frac

        # If low entropy, we may trust the AI to confirm/refute
        if not high_entropy:
            if ia_refuted:
                if uni_overlayed:
                    return self.codes.detailed.ia_refuted_but_under_db_uni
                return self.codes.detailed.ia_refuted
            if ia_confirmed:
                if uni_overlayed:
                    return self.codes.detailed.both_confirmed
                return self.codes.detailed.ia_confirmed_only

        # Else, we may still use BDUni information
        if uni_overlayed:
            return self.codes.detailed.db_overlayed_only

        # Else, we are uncertain, and we can specify whether entropy was involved
        # in concluding to uncertainty.
        if high_entropy:
            return self.codes.detailed.unsure_by_entropy
        return self.codes.detailed.both_unsure
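
# --- Worked example (illustrative) ----------------------------------------------
# Hand-computed sketch of `_make_detailed_group_decision` on a tiny cluster, with
# hypothetical threshold values (the real ones come from the configuration).
#
#   probabilities = [0.95, 0.80, 0.20], overlays = [1, 1, 0], entropies = [0.1, 0.1, 0.2]
#   thresholds: min_entropy_uncertainty=0.5, min_frac_entropy_uncertain=0.5,
#               min_confidence_confirmation=0.9, min_frac_confirmation_factor_if_bd_uni_overlay=0.8,
#               min_frac_confirmation=0.6, min_confidence_refutation=0.9, min_frac_refutation=0.6,
#               min_uni_db_overlay_frac=0.5
#
#   high_entropy:  mean([0.1, 0.1, 0.2] >= 0.5) = 0.0 < 0.5            -> False
#   confirmation:  p >= 0.9 -> [True, False, False]; relaxed threshold = 0.9 * 0.8 = 0.72,
#                  overlay & (p >= 0.72) -> [True, True, False]; OR -> [True, True, False];
#                  mean = 0.67 >= 0.6                                  -> ia_confirmed = True
#   refutation:    (1 - p) >= 0.9 -> [False, False, False]             -> ia_refuted = False
#   uni_overlayed: mean([1, 1, 0]) = 0.67 >= 0.5                       -> True
#
#   -> low entropy, not refuted, confirmed and overlayed: codes.detailed.both_confirmed
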

@dataclass
class thresholds:
    """Decision thresholds for cluster-level decisions."""

    min_confidence_confirmation: float
    min_frac_confirmation: float
    min_frac_confirmation_factor_if_bd_uni_overlay: float
    min_uni_db_overlay_frac: float
    min_confidence_refutation: float
    min_frac_refutation: float
    min_entropy_uncertainty: float
    min_frac_entropy_uncertain: float

    def dump(self, filename: str):
        with open(filename, "w") as f:
            yaml.safe_dump(self.__dict__, f)

    @staticmethod
    def load(filename: str):
        with open(filename, "r") as f:
            data = yaml.safe_load(f)
        return thresholds(**data)
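
# --- Example usage (illustrative) -------------------------------------------------
# Round-trip of the thresholds dataclass through YAML; the values and the file name
# are made up.
_example_thresholds = thresholds(
    min_confidence_confirmation=0.6,
    min_frac_confirmation=0.7,
    min_frac_confirmation_factor_if_bd_uni_overlay=0.9,
    min_uni_db_overlay_frac=0.5,
    min_confidence_refutation=0.7,
    min_frac_refutation=0.8,
    min_entropy_uncertainty=0.6,
    min_frac_entropy_uncertain=0.5,
)
# _example_thresholds.dump("thresholds_example.yml")    # writes the fields as YAML
# reloaded = thresholds.load("thresholds_example.yml")  # reloaded == _example_thresholds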