Source code for lidar_prod.tasks.building_identification

import logging
import os
import os.path as osp
from typing import Union

import pdal

from lidar_prod.tasks.utils import get_pdal_writer, get_pipeline

log = logging.getLogger(__name__)


[docs] class BuildingIdentifier: """Logic of building validation. Points that were not found by rule-based algorithms but which have a high-enough probability of being a building are clustered into candidate groups of buildings. High enough probability means p>=min_building_proba """ def __init__( self, min_building_proba: float = 0.5, cluster=None, data_format=None, ): self.cluster = cluster self.data_format = data_format self.min_building_proba = min_building_proba self.pipeline: pdal.pipeline.Pipeline = None
[docs] def run( self, input_values: Union[str, pdal.pipeline.Pipeline], target_las_path: str = None, las_metadata: dict = None, ) -> dict: """Identify potential buildings in a new channel, excluding former candidates as well as already confirmed building (confirmed by either Validation or Completion). Args: input_values (str | pdal.pipeline.Pipeline): path or pipeline to input LAS file with a building probability channel target_las_path (str): output LAS las_metadata (dict): current pipeline metadata, used to propagate input metadata to the application output las (epsg, las version, etc) Returns: updated las_metadata """ # aliases _cid = self.data_format.las_dimensions.cluster_id _completion_flag = self.data_format.las_dimensions.completion_non_candidate_flag log.info("Clustering of points with high building proba.") pipeline, las_metadata = get_pipeline(input_values, self.data_format.epsg, las_metadata) # Considered for identification: non_candidates = f"({self.data_format.las_dimensions.candidate_buildings_flag} == 0)" not_already_confirmed = ( f"({self.data_format.las_dimensions.classification} " + f"!= {self.data_format.codes.building.final.building})" ) not_a_potential_completion = f"({_completion_flag} != 1)" p_heq_threshold = f"(building>={self.min_building_proba})" where = ( f"({non_candidates} && {not_already_confirmed} " + f"&& {not_a_potential_completion} && {p_heq_threshold})" ) pipeline |= pdal.Filter.cluster( min_points=self.cluster.min_points, tolerance=self.cluster.tolerance, is3d=self.cluster.is3d, where=where, ) # Increment ClusterID, so that points from building completion can become cluster 1 pipeline |= pdal.Filter.assign(value=f"{_cid} = {_cid} + 1", where=f"{_cid} != 0") pipeline |= pdal.Filter.assign(value=f"{_cid} = 1", where=f"{_completion_flag} == 1") # Duplicate ClusterID to have an explicit name for it for inspection. # Do not reset it to zero to have access to it at human inspection stage. pipeline |= pdal.Filter.ferry( dimensions=f"{_cid}=>{self.data_format.las_dimensions.ai_building_identified}" ) if target_las_path: pipeline |= get_pdal_writer(target_las_path, las_metadata) os.makedirs(osp.dirname(target_las_path), exist_ok=True) pipeline.execute() self.pipeline = pipeline return las_metadata