Source code for cityImage.utilities

import pandas as pd
import numpy as np
import geopandas as gpd
import math
import pyproj
import osmnx as ox
import networkx as nx

from typing import List
from math import sqrt
from shapely.geometry import LineString, Point, Polygon, MultiPoint, mapping
from shapely.ops import unary_union, transform, nearest_points, split, linemerge
from shapely.affinity import scale
from shapely.geometry.base import BaseGeometry
import pyvista as pv
pd.set_option("display.precision", 3)

class Error(Exception):
    """Base class for other exceptions"""

class downloadError(Error):
    """Raised when a wrong download method is provided""" 
    
[docs]def downloader(place, download_method, tags = None, distance = 500.0, downloading_graph = False, network_type = None): """ The function downloads certain geometries from OSM, by means of OSMNX functions. It returns a GeoDataFrame, that could be empty when no geometries are found, with the provided tags. Parameters ---------- place: str, tuple, Shapely Polygon Name of cities or areas in OSM: - when using "distance_from_point" please provide a (lat, lon) tuple to create the bounding box around it; - when using "distance_from_address" provide an existing OSM address; - when using "OSMplace" provide an OSM place name. The query must be geocodable and OSM must have polygon boundaries for the geocode result. - when using "polygon" please provide a Shapely Polygon in unprojected latitude-longitude degrees (EPSG:4326) CRS; download_method: str, {"distance_from_address", "distance_from_point", "OSMplace", "polygon"} It indicates the method that should be used for downloading the data. tag: dict The desired OSMN tags. distance: float Used when download_method == "distance from address" or == "distance from point". downloading_graph: bool Download a graph, instead of, as by default, GeoDataFrame. network_type: str {"walk", "bike", "drive", "drive_service", "all", "all_private", "none"} It indicates type of street or other network to extract - from OSMNx paramaters. Returns ------- G, geometries_gdf: NetworkX Graph, GeoDataFrame The resulting Graph (when downloading_graph) or a GeoDataFrame. """ download_options = {"distance_from_address", "distance_from_point", "OSMplace", "polygon"} if download_method not in download_options: raise downloadError('Provide a download method amongst {}'.format(download_options)) download_method_dict = { 'distance_from_address': ox.features_from_address, 'distance_from_point': ox.features_from_point, 'OSMplace': ox.features_from_place, 'polygon': ox.features_from_polygon } if downloading_graph: download_method_dict = { 'distance_from_address': ox.graph_from_address, 'distance_from_point': ox.graph_from_point, 'OSMplace': ox.graph_from_place, 'polygon': ox.graph_from_polygon } download_func = download_method_dict.get(download_method) # using OSMNx to download data from OpenStreetMap try: if download_func: if download_method in ['distance_from_address', 'distance_from_point']: if downloading_graph: G = download_func(place, network_type = network_type, dist = distance, simplify = True) else: geometries_gdf = download_func(place, tags = tags, dist = distance) else: if downloading_graph: G = download_func(place, network_type = network_type, simplify = True) else: geometries_gdf = download_func(place, tags = tags) except ox._errors.InsufficientResponseError: # Handle the InsufficientResponseError error by returning an empty GeoDataFrame geometries_gdf = gpd.GeoDataFrame(columns=['id', 'geometry'], geometry='geometry').set_crs('EPSG:4326') if downloading_graph: G=nx.empty_graph() if downloading_graph: return G return geometries_gdf
[docs]def scaling_columnDF(series, inverse = False): """ Rescales the values in a dataframe's column from 0 to 1 or from 1 to 0. Parameters ---------- series: pd.Series The pd.Series to rescale. inverse: bool, optional If True, rescales from 1 to 0 instead of 0 to 1. Default is False. Returns ------- pd.Series The rescaled pd.Series. """ scaled = pd.Series((series-series.min())/(series.max()-series.min())) if inverse: scaled = 1-scaled return scaled
[docs]def dict_to_df(list_dict, list_col): """ It takes a list of dictionaries and creates from them a pandas DataFrame. Each dictionary becomes a Series, where keys are rows, values cell values. The column names are renamed trough a list of strings (list_col). Parameters ---------- list_dict: list of dict The list of dictionaries. list_col: list of string The corresponding column names to assign to the series. Returns: ---------- df: Pandas DataFrame The resulting DataFrame. """ df = pd.DataFrame(list_dict).T df.columns = ["d{}".format(i) for i, col in enumerate(df, 1)] df.columns = list_col return df
[docs]def center_line(line_geometryA, line_geometryB): """ Given two LineStrings, it derives the corresponding center line Parameters ---------- line_geometryA: LineString The first line. line_geometryB: LineString The second line. Returns: ---------- center_line: LineString The resulting center line. """ line_coordsA = list(line_geometryA.coords) line_coordsB = list(line_geometryB.coords) # If not, reverse the coordinates of B if line_coordsA[0] != line_coordsB[-1]: line_coordsB = line_coordsB[::-1] # Remove the middle point of the longer list until both lists have the same length while len(line_coordsA) != len(line_coordsB): if len(line_coordsA) > len(line_coordsB): del line_coordsA[int(len(line_coordsA)/2)] else: del line_coordsB[int(len(line_coordsB)/2)] # Calculate the center line coordinates center_line_coords = [[(a[0] + b[0])/2, (a[1] + b[1])/2] for a, b in zip(line_coordsA, line_coordsB)] # Assign the first and last point of the line A to the first and last point of the center line center_line_coords[0] = line_coordsA[0] center_line_coords[-1] = line_coordsA[-1] # Create a LineString object from the center line coordinates center_line = LineString([coor for coor in center_line_coords]) return center_line
[docs]def min_distance_geometry_gdf(geometry, gdf): """ Given a geometry and a GeoDataFrame, it returns the minimum distance between the geometry and the GeoDataFrame. It provides also the index of the closest geometry in the GeoDataFrame. Parameters ---------- geometry: Point, LineString or Polygon gdf: GeoDataFrame Returns: ---------- distance, index: tuple The closest distance from the geometry, and the index of the closest geometry in the gdf. """ sindex = gdf.sindex closest = sindex.nearest(geometry, return_distance = True) iloc = closest[0][1][0] distance = closest[1][0] index = gdf.iloc[iloc].name return distance, index
[docs]def split_line_at_MultiPoint(line_geometry, intersections, z = 0.0): """ The function checks whether the coordinates of one or more Points in a Point Collections are part of the sequence of coordinates of a LineString. When this has been ascerted or fixed, the LineString line_geometry is split at each of the intersecting points in the collection and a list of lines, each representing one of the sections resulting from the intersections is returned. Parameters ------- line_geometry: LineString The LineString which has to be split. intersections: MultiPoint The intersecting points. z: float The z-coordinate value to assign to each vertex of the resulting lines. Default is 0.0. Returns ------- lines: List of MultiLineString The resulting segments composing the original line_geometry. """ line_geometry_tmp = line_geometry # iterate over each point in the intersections for point in intersections: # create a new list of coordinates from the line_geometry new_line_coords = list(line_geometry_tmp.coords) # iterate over the new line coordinates; skip the first coordinate for n, coord in enumerate(new_line_coords): if n == 0: continue # create a LineString between the previous and current coordinates line_section = LineString([Point(new_line_coords[n-1]), Point(coord)]) # check if the point intersects this line section or if it is very close to it if ((point.intersects(line_section)) | (line_section.distance(point) < 1e-8)): # if so, insert the coordinates of the intersection point into the new_line_coords list new_line_coords.insert(n, point.coords[0]) break # create a new LineString from the updated coordinates line_geometry_tmp = LineString([coor for coor in new_line_coords]) # convert the intersections to a MultiPoint object intersections = MultiPoint(intersections) # split the line_geometry_tmp at each of the intersecting points lines = split(line_geometry_tmp, intersections) # create a list of individual LineString geometries from the split result lines_list = [line for line in lines.geoms] # create LineString objects with z-coordinate values for each vertex if z is not None: lineZ = [LineString([(coords[0], coords[1], z) for coords in line.coords]) for line in lines_list] # return the list of resulting line segments return lines_list
[docs]def merge_line_geometries(line_geometries): """ Given a list of LineString geometries, this function reorders the geometries in the correct sequence based on their starting and ending points, and returns a merged LineString feature. Parameters: ---------- line_geometries: List of LineString A list of LineString geometries to be merged. Returns: ---------- LineString: The merged LineString feature. """ if not all(isinstance(line, LineString) for line in line_geometries): raise ValueError("Input must be a list of LineString geometries") if not all(isinstance(line, BaseGeometry) for line in line_geometries): raise ValueError("Input must be a list of valid geometries") if len(line_geometries) < 2: raise ValueError("At least 2 LineStrings are required to merge") # create a dictionary to store the "from" and "to" vertexes of each LineString as keys lines = {(line.coords[0], line.coords[-1]): line for line in line_geometries} # sort the line geometries based on their starting and ending coordinates line_geometries = sorted(line_geometries, key=lambda line: (line.coords[0], line.coords[-1])) # initialize an empty list to store the merged line geometries merged = [] # rmove and store the first line geometry from the line_geometries list first_line = line_geometries.pop(0) # add the first line geometry to the merged list merged.append(first_line) # iterate over the remaining line geometries for line in line_geometries: # check if the last coordinate of the previously merged line is the same as the first coordinate of the current line if merged[-1].coords[-1] == line.coords[0]: # if so, add the current line to the merged list merged.append(line) # check if the last coordinate of the previously merged line is the same as the last coordinate of the current line elif merged[-1].coords[-1] == line.coords[-1]: # if so, add the reversed version of the current line to the merged list merged.append(LineString(list(reversed(line.coords)))) return LineString(list(merged))
[docs]def envelope_wgs(gdf): """ Given a GeoDataFrame it derives its envelope in the WGS coordinate system. Parameters ---------- gdf: GeoDataFrame Return ---------- envelope_wgs: Polygon The resulting envelope. """ envelope = gdf.unary_union.envelope.buffer(100) # Define a transformer for projecting from the GeoDataFrame's CRS to WGS84 transformer = pyproj.Transformer.from_crs(gdf.crs, 'epsg:4326', always_xy=True) envelope_wgs = transform(transformer.transform, envelope) return envelope_wgs
[docs]def convex_hull_wgs(gdf): """ Given a GeoDataFrame it derives its convex hull in the WGS coordinate system. Parameters ---------- gdf: GeoDataFrame Return ---------- convex_hull_wgs: Polygon The resulting hexagon. """ # Compute the convex hull without buffering convex_hull = gdf.unary_union.convex_hull # Define a transformer for projecting from the GeoDataFrame's CRS to WGS84 transformer = pyproj.Transformer.from_crs(gdf.crs, 'epsg:4326', always_xy=True) # Apply the transformation convex_hull_wgs = transform(transformer.transform, convex_hull) return convex_hull_wgs
[docs]def rescale_ranges(n, range1, range2): """ Given a value n and the range which it belongs to, the function rescale the value, between a different range. Parameters ---------- n: int, float A value. range1: tuple A certain range, e.g. (0, 1) or (10.5, 100). The value n should be within this range. range2: tuple A range, e.g. (0, 1) or (10.5, 100), that should used to rescale the value. Return ---------- value: float The rescaled value. """ delta1 = range1[1] - range1[0] delta2 = range2[1] - range2[0] value = (delta2 * (n - range1[0]) / delta1) + range2[0] return value
[docs]def gdf_from_geometries(geometries, crs): """ The function creates a GeoDataFrame from a list of geometries. Parameters ---------- geometries: list of LineString, Polygon or Points The geometries to be included in the GeoDataFrame. Returns ------- gdf: GeoDataFrame The resulting GeoDataFrame. """ df = pd.DataFrame({'geometry': geometries}) gdf = gpd.GeoDataFrame(df, geometry = df['geometry'], crs = crs) gdf['length'] = gdf['geometry'].length return gdf
[docs]def line_at_centroid(line_geometry, offset): """ Given a LineString, it creates a perpendicular LineString that intersects the given geometry at its centroid. The offset determines the distance from the original line. This fictional line can be used to count precisely the number of trajectories/features intersecting a segment. This function should be executed per row by means of the df.apply(lambda row : ..) function. Parameters ---------- line_geometry: LineString A street segment geometry. offset: float The offset from the geometry. Returns ------- LineString The perpendicular line intersecting the given one. """ left = line_geometry.parallel_offset(offset, 'left') right = line_geometry.parallel_offset(offset, 'right') if left.geom_type == 'MultiLineString': left = merge_disconnected_lines(left) if right.geom_type == 'MultiLineString': right = merge_disconnected_lines(right) if (left.is_empty) & (not right.is_empty): left = line_geometry if (right.is_empty) & (not left.is_empty): right = line_geometry left_centroid = left.interpolate(0.5, normalized = True) right_centroid = right.interpolate(0.5, normalized = True) return LineString([left_centroid, right_centroid])
[docs]def sum_at_centroid(line_geometry, lines_gdf, column): """ Given a LineString geometry, it sums the column values of all the features in a LineString GeoDataFrame intersecting the line. This function should be executed per row by means of the df.apply(lambda row : ..) function. Parameters ---------- line_geometry: LineString A street segment geometry. lines_gdf: LineString GeoDataFrame The GeoDataFrame. column: string The name of the column. Returns ------- int """ return lines_gdf[lines_gdf.geometry.intersects(line_geometry)][column].sum()
[docs]def polygons_gdf_multiparts_to_singleparts(polygons_gdf): """ The function extracts the individual parts of the multi-part geometries and creates a new GeoDataFrame with single part geometries. Parameters ---------- polygons_gdf: GeoDataFrame GeoDataFrame containing building footprint geometries. Returns ------- single_parts_gdf: Polygon GeoDataFrame The new GeoDataFrame with simplified single part building footprint geometries. """ polygons_gdf = polygons_gdf.copy() single_parts = gpd.geoseries.GeoSeries([geom for geom in polygons_gdf.unary_union.geoms]) single_parts_gdf = gpd.GeoDataFrame(geometry=single_parts, crs = polygons_gdf.crs) return single_parts_gdf
[docs]def fix_multiparts_LineString_gdf(gdf): """ Fixes MultiLineString geometries in a GeoDataFrame by merging them into LineString geometries. If there are still MultiLineString geometries present after merging, they are exploded into individual LineString geometries. Parameters ---------- gdf: GeoDataFrame The input GeoDataFrame with potentially problematic MultiLineString geometries. Returns ------- gdf: GeoDataFrame The fixed GeoDataFrame with MultiLineString geometries merged into LineString geometries. """ gdf = gdf.copy() # check if the GeoDataFrame contains MultiLineString geometries if 'MultiLineString' in gdf.geometry.type.unique(): # select the rows where the geometry type is MultiLineString condition = (gdf.geometry.type == 'MultiLineString') # merge MultiLineString geometries into LineString geometries gdf.loc[condition, 'geometry'] = gpd.GeoSeries([linemerge(geo) for geo in gdf[condition].geometry], index=gdf[condition].index) # check again if MultiLineString geometries are still present after merging if 'MultiLineString' in gdf.geometry.type.unique(): # create a separate GeoDataFrame with only the MultiLineString geometries multi_gdf = gdf[gdf.geometry.type == 'MultiLineString'].copy() # remove the MultiLineString geometries from the original GeoDataFrame gdf = gdf[gdf.geometry.type == 'LineString'] # explode the MultiLineString geometries into individual LineString geometries multi_gdf = multi_gdf.explode(ignore_index=True) # append the exploded LineString geometries back to the original GeoDataFrame gdf = gdf.append(multi_gdf, ignore_index=True) return gdf
[docs]def remove_lists_columns(df): """ For each cell in the DataFrame, if the cell contains a list, update it to keep only the first element of the list. Parameters ---------- df: DataFrame The input DataFrame. Returns ------- df: DataFrame A DataFrame with the transformation applied to the relevant cells. """ df = df.copy() for column in df.columns: df[column] = df[column].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else x) return df
[docs]def polygon_2d_to_3d(building_polygon, base, height): """ Convert a 2D polygon to a 3D polygon. This function takes a 2D polygon (building_polygon) and extrudes it into 3D space, creating a pv.PolyData, creating a 3D polygon with a base and a height elevation. Parameters ---------- building_polygon (shapely.geometry.Polygon): 2D polygon to be extruded. base (float): Base height of the 3D polygon. height (float): Height of the 3D polygon. Returns: ---------- pv.PolyData: A 3D polygon. """ def reorient_coords(xy): """ Reorient the coordinates of the polygon This function reorients the coordinates of a polygon if the polygon has counterclockwise orientation. Args ---------- xy: list List of tuples, each representing a coordinate of the polygon Returns ---------- list: Reoriented list of tuples representing the polygon's coordinates """ value = 0 for i in range(len(xy)): x1, y1 = xy[i] x2, y2 = xy[(i+1)%len(xy)] value += (x2-x1)*(y2+y1) if value > 0: return xy else: return xy[::-1] poly_points = building_polygon.exterior.coords # Reorient the coordinates of the polygon xy = reorient_coords(poly_points) # Create 3D coordinates with the base height xyz_base = [(x,y,base) for x,y in xy] # Create faces of the polygon faces = [len(xyz_base), *range(len(xyz_base))] # Create the 3D polygon using pyvista polygon = pv.PolyData(xyz_base, faces=faces) # Extrude the 3D polygon to the specified height return polygon.extrude((0, 0, height - base), capping=True)