import osmnx as ox
import pandas as pd
import numpy as np
import geopandas as gpd
import pyvista as pv
from shapely.geometry import Point, LineString, Polygon, MultiPolygon, mapping, MultiLineString
from shapely.ops import linemerge, unary_union
from scipy.sparse import linalg
pd.set_option("display.precision", 3)
import concurrent.futures
from .utilities import scaling_columnDF, polygon_2d_to_3d, downloader
from .angles import get_coord_angle
[docs]def get_buildings_fromFile(path, epsg, case_study_area = None, distance_from_center = 1000, height_field = None, base_field = None,
land_use_field = None):
"""
The function take a building footprint .shp or .gpkg, returns two GDFs of buildings: the case-study area, plus a larger area containing other
buildings, called "obstructions" (for analyses which include adjacent buildings). Otherise, the user can provide a "distance from center"
value; in this case, the buildings_gdf are extracted by selecting buildings within a buffer from the center, with a radius equal to the
distance_from_center value. If none are passed, the buildings_gdf and the obstructions_gdf will be the same.
Additionally, provide the fields containing height, base elevation, and land use information.
Parameters
----------
path: str
Path where the file is stored.
epsg: int
Epsg of the area considered; if None OSMNx is used for the projection.
case_study_area: Polygon
The Polygon to be use for clipping and identifying the case-study area, within the input .shp. If not available, use "distance_from_center".
distance_from_center: float
So to identify the case-study area on the basis of distance from the center of the input .shp.
height_field, base_field: str
Height and base elevation fields name in the original data-source.
land_use_field: str
Field that describes the land use of the buildings.
Returns
-------
buildings_gdf, obstructions_gdf: tuple of GeoDataFrames
The buildings and the obstructions GeoDataFrames.
"""
# trying reading buildings footprints shapefile from directory
crs = 'EPSG:'+str(epsg)
obstructions_gdf = gpd.read_file(path).to_crs(crs)
# computing area, reassigning columns
obstructions_gdf["area"] = obstructions_gdf["geometry"].area
if height_field is not None:
obstructions_gdf["height"] = obstructions_gdf[height_field]
if base_field is None:
obstructions_gdf["base"] = 0.0
else:
obstructions_gdf["base"] = obstructions_gdf[base_field]
if land_use_field is not None:
obstructions_gdf["land_use_raw"] = obstructions_gdf[land_use_field]
else:
obstructions_gdf["land_use_raw"] = None
# dropping small buildings and buildings with null height
obstructions_gdf = obstructions_gdf[(obstructions_gdf["area"] >= 50) & (obstructions_gdf["height"] >= 1)]
obstructions_gdf = obstructions_gdf[["height", "base","geometry", "area", "land_use_raw"]]
# assigning ID
obstructions_gdf["buildingID"] = obstructions_gdf.index.values.astype(int)
# if case-study area and distance not defined
if (case_study_area is None) and (distance_from_center is None):
buildings_gdf = obstructions_gdf.copy()
return buildings_gdf, obstructions_gdf
if case_study_area is None:
case_study_area = obstructions_gdf.geometry.unary_union.centroid.buffer(distance_from_center)
buildings_gdf = obstructions_gdf[obstructions_gdf.geometry.within(case_study_area)]
# clipping buildings in the case-study area
return buildings_gdf, obstructions_gdf
[docs]def get_buildings_fromOSM(place, download_method: str, epsg = None, distance = 1000):
"""
The function downloads and cleans building footprint geometries and create a buildings GeoDataFrames for the area of interest.
The function exploits OSMNx functions for downloading the data as well as for projecting it.
The land use classification for each building is extracted. Only relevant columns are kept.
Parameters
----------
place: str, tuple, Shapely Polygon
Name of cities or areas in OSM:
- when using "distance_from_point" please provide a (lat, lon) tuple to create the bounding box around it;
- when using "distance_from_address" provide an existing OSM address;
- when using "OSMplace" provide an OSM place name. The query must be geocodable and OSM must have polygon boundaries for the geocode result.
- when using "polygon" please provide a Shapely Polygon in unprojected latitude-longitude degrees (EPSG:4326) CRS;
download_method: str, {"distance_from_address", "distance_from_point", "OSMplace", "polygon"}
It indicates the method that should be used for downloading the data.
epsg: int
Epsg of the area considered; if None OSMNx is used for the projection.
distance: float
Used when download_method == "distance from address" or == "distance from point".
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The buildings GeoDataFrame.
"""
columns_to_keep = ['amenity', 'building', 'geometry', 'historic', 'land_use_raw']
tags = {"building": True}
buildings_gdf = downloader(place = place, download_method = download_method, tags = tags, distance = distance)
if epsg is None:
buildings_gdf = ox.projection.project_gdf(buildings_gdf)
else:
crs = 'EPSG:'+str(epsg)
buildings_gdf = buildings_gdf.to_crs(crs)
buildings_gdf['land_use_raw'] = None
buildings_gdf['land_use_raw'] = buildings_gdf.filter(regex='^building:use:').apply(lambda x: x.name[13:] if x.notnull().any() else None)
buildings_gdf.drop(columns=[col for col in buildings_gdf.columns if col not in columns_to_keep], inplace=True)
# remove the empty geometries
buildings_gdf = buildings_gdf[~buildings_gdf['geometry'].is_empty]
# replace 'yes' with NaN in 'building' column
buildings_gdf['building'] = buildings_gdf['building'].replace('yes', np.nan)
# fill missing values in 'building' column with 'amenity' values
buildings_gdf['building'] = buildings_gdf['building'].fillna(value=buildings_gdf['amenity'])
# fill missing values in 'land_use_raw' column with 'building' values
# Create a mask for rows where the 'building' column is NA
mask = buildings_gdf['building'].isna()
# Use the mask to assign values from 'amenity' to 'building' only where 'building' is NA
buildings_gdf.loc[mask, 'building'] = buildings_gdf.loc[mask, 'amenity']
buildings_gdf['land_use_raw'] = buildings_gdf['land_use_raw'].fillna(value=buildings_gdf['building'])
# fill remaining missing values in 'land_use_raw' column with 'residential'
buildings_gdf['land_use_raw'] = buildings_gdf['land_use_raw'].fillna(value='residential')
buildings_gdf = buildings_gdf[['geometry', 'historic', 'land_use_raw']]
buildings_gdf['area'] = buildings_gdf.geometry.area
buildings_gdf = buildings_gdf[buildings_gdf['area'] >= 50]
# reset index
buildings_gdf = buildings_gdf.reset_index(drop = True)
buildings_gdf['buildingID'] = buildings_gdf.index.values.astype('int')
return buildings_gdf
[docs]def structural_score(buildings_gdf, obstructions_gdf, edges_gdf, advance_vis_expansion_distance = 300, neighbours_radius = 150):
"""
The function computes the "Structural Landmark Component" sub-scores of each building.
It considers:
- distance from the street network:
- advance 2d visibility polygon;
- number of neighbouring buildings in a given radius.
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
Buildings GeoDataFrame - case study area.
edges_gdf: LineString GeoDataFrame
Street segmetns GeoDataFrame.
obstructions_gdf: Polygon GeoDataFrame
Obstructions GeoDataFrame.
advance_vis_expansion_distance: float
2d advance visibility - it indicates up to which distance from the building boundaries the 2dvisibility polygon can expand.
neighbours_radius: float
Neighbours - research radius for other adjacent buildings.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame.
"""
buildings_gdf = buildings_gdf.copy()
obstructions_gdf = buildings_gdf if obstructions_gdf is None else obstructions_gdf
sindex = obstructions_gdf.sindex
street_network = edges_gdf.geometry.unary_union
buildings_gdf["road"] = buildings_gdf.geometry.distance(street_network)
buildings_gdf["2dvis"] = buildings_gdf.geometry.apply(lambda row: visibility_polygon2d(row, obstructions_gdf, sindex, max_expansion_distance=
advance_vis_expansion_distance))
buildings_gdf["neigh"] = buildings_gdf.geometry.apply(lambda row: number_neighbours(row, obstructions_gdf, sindex, radius=neighbours_radius))
return buildings_gdf
[docs]def number_neighbours(geometry, obstructions_gdf, obstructions_sindex, radius):
"""
The function counts the number of neighbours, in a GeoDataFrame, around a given geometry, within a
research radius.
Parameters
----------
geometry: Shapely Geometry
The geometry for which neighbors are counted.
obstructions_gdf: GeoDataFrame
The GeoDataFrame containing the obstructions.
obstructions_sindex: Spatial Index
The spatial index of the obstructions GeoDataFrame.
radius: float
The research radius for neighboring buildings.
Returns
-------
int
The number of neighbors.
"""
buffer = geometry.buffer(radius)
possible_neigh_index = list(obstructions_sindex.intersection(buffer.bounds))
possible_neigh = obstructions_gdf.iloc[possible_neigh_index]
precise_neigh = possible_neigh[possible_neigh.intersects(buffer)]
return len(precise_neigh)
[docs]def visibility_polygon2d(building_geometry, obstructions_gdf, obstructions_sindex, max_expansion_distance = 600):
"""
It creates a 2d polygon of visibility around a polygon geometry (e.g. building footprint) and computes its area.
This can be considered as a measure of 2d advance visibility. Such a polygon is built by constructing lines around the centroid of the building,
breaking them at obstructions and connecting the new formed geometries to get the final polygon.
The "max_expansion_distance" parameter indicates up to which distance from the building boundaries the visibility polygon can expand.
Parameters
----------
building_geometry: Polygon
The building geometry.
obstructions_gdf: Polygon GeoDataFrame
Obstructions GeoDataFrame.
obstructions_sindex: Spatial Index
The spatial index of the obstructions GeoDataFrame.
max_expansion_distance: float
It indicates up to which distance from the building boundaries the 2dvisibility polygon can expand.
Returns
-------
float
The area of visibility.
"""
# creating buffer
distance_along = 10
origin = building_geometry.centroid
building_geometry = building_geometry.convex_hull if building_geometry.geom_type == 'MultiPolygon' else building_geometry
max_expansion_distance += origin.distance(building_geometry.envelope.exterior)
angles = np.arange(0, 360, distance_along)
coords = np.array([get_coord_angle([origin.x, origin.y], distance=max_expansion_distance, angle=i) for i in angles])
lines = [LineString([origin, Point(x)]) for x in coords]
obstacles = obstructions_gdf[obstructions_gdf.crosses(unary_union(lines))]
obstacles = obstacles[obstacles.geometry != building_geometry]
obstacles = obstacles[~obstacles.geometry.within(building_geometry.convex_hull)]
# creating lines all around the building till a defined distance
if len(obstacles) > 0:
ob = unary_union(obstacles.geometry)
intersections = [line.intersection(ob) for line in lines]
clipped_lines = [LineString([origin, Point(intersection.geoms[0].coords[0])])
if ((type(intersection) == MultiLineString) & (not intersection.is_empty))
else LineString([origin, Point(intersection.coords[0])])
if ((type(intersection) == LineString) & (not intersection.is_empty))
else LineString([origin, Point(intersection[0].coords[0])])
if ((type(intersection) == Point) & (not intersection.is_empty))
else line for intersection, line in zip(intersections, lines)]
# the line are not interrupted, keeping the original ones
else:
clipped_lines = lines
# creating a polygon of visibility based on the lines and their progression, taking into account the origin Point too
poly = Polygon([[p.x, p.y] for p in [origin] + [Point(line.coords[1]) for line in clipped_lines ] + [origin]])
poly_vis = poly.difference(building_geometry)
if poly_vis.is_empty:
poly_vis = poly.buffer(0).difference(building_geometry)
return poly_vis.area
[docs]def compute_3d_sight_lines(nodes_gdf, buildings_gdf, distance_along = 200, distance_min_observer_target = 300):
"""
Computes the 3D sight lines between observer points in a Point GeoDataFrame and target buildings, based on a given distance along the buildings' exterior and a
minimum distance between observer and target (e.g. lines will not be constructed for observer points and targets whose distance is lower
than "distance_min_observer_target")
Parameters
----------
nodes_gdf: Point GeoDataFrame
A GeoDataFrame containing the observer nodes, with at least a Point geometry column.
buildings_gdf: Polygon GeoDataFrame
A GeoDataFrame containing the target buildings, with at least a Polygon geometry column.
distance_along: float
The distance along the exterior of the buildings for selecting target points.
distance_min_observer_target: float
The minimum distance between observer and target.
Returns:
----------
sight_lines: LineString GeoDataFrame
The visibile sight lines GeoDataFrame.
"""
nodes_gdf = nodes_gdf.copy()
buildings_gdf = buildings_gdf.copy()
# add a 'base' column to the buildings GeoDataFrame with a default value of 0.0, if not provided
if 'base' not in buildings_gdf.columns:
buildings_gdf["base"] = 0.0
def add_height(exterior, base, height):
# create a new list of Point objects with the z-coordinate set to (height - base)
coords = exterior.coords
new_coords = [Point(x, y, height-base) for x, y in coords]
# return a LineString constructed from the new Point objects
return LineString(new_coords)
# create a Series with the height-extended LineString objects
building_exteriors_roof = buildings_gdf.apply(lambda row: add_height(row.geometry.exterior, row.base, row.height), axis=1)
# create a list with the number of intervals along the exterior LineString to divide the line into
num_intervals = [max(1, int(exterior.length / distance_along)) for exterior in building_exteriors_roof]
# create a new column with the list of targets along the exterior LineString
buildings_gdf['target'] = pd.Series([([(exterior.interpolate(min(exterior.length/2, distance_along)*i)) for i in range(x)])
for exterior, x in zip(building_exteriors_roof, num_intervals)], index=buildings_gdf.index)
# create a new column in the nodes GeoDataFrame with the Point objects representing the observer positions
nodes_gdf['observer'] = nodes_gdf.apply(lambda row: Point(row.geometry.x, row.geometry.y, row.z), axis=1)
# create a temporary dataframe with building targets
tmp_buildings = buildings_gdf.explode('target')
tmp_nodes = nodes_gdf[['nodeID', 'observer']].copy()
tmp_buildings = tmp_buildings[['buildingID', 'target']].copy()
# obtain the sight_lines dataframe by means of a cartesian product between GeoDataFrames
sight_lines = pd.merge(tmp_nodes.assign(key=1), tmp_buildings.assign(key=1), on='key').drop('key', axis=1)
# calculate the distance between observer and target and filter
sight_lines['distance']= [p1.distance(p2) for p1, p2 in zip(sight_lines.observer, sight_lines.target)]
sight_lines = sight_lines[sight_lines['distance'] >= distance_min_observer_target]
# add geometry column with LineString connecting observer and target
sight_lines['geometry'] = [LineString([p1, p2]) for p1, p2 in zip(sight_lines.observer, sight_lines.target)]
sight_lines.reset_index(drop = True, inplace = True)
# create pyvista 3d objects for buildings
buildings_gdf['building_3d'] = [polygon_2d_to_3d(geo, base, height) for geo,base, height in zip(buildings_gdf['geometry'], buildings_gdf['base'], buildings_gdf['height'])]
# extract tuples for starting and target points of the sight lines
sight_lines['start'] = [[observer.x, observer.y, observer.z] for observer in sight_lines.observer]
sight_lines['stop'] =[[target.x, target.y, target.z] for target in sight_lines.target]
buildings_sindex = buildings_gdf.sindex
# check intervisibility and filter out sight_lines that are obstructed
sight_lines['visible'] = sight_lines.apply(lambda row: intervisibility(row.geometry, row.buildingID, row.start, row.stop, buildings_gdf, buildings_sindex), axis =1)
sight_lines = sight_lines[sight_lines['visible'] == True]
sight_lines.drop(['start', 'stop', 'observer', 'target', 'visible'], axis = 1, inplace = True)
sight_lines = sight_lines.set_geometry('geometry').set_crs(buildings_gdf.crs)
return sight_lines
[docs]def intervisibility(line2d, buildingID, start, stop, buildings_gdf, buildings_sindex):
"""
Check if a 3d line of sight between two points is obstructed by any buildings.
Parameters
----------
line2d: Shapely LineString
The 2D line of sight to check for obstruction.
buildingID: int
The buildingID of the building that the line of sight points at.
start: tuple
The starting point of the line of sight in the form of (x, y, z).
stop: tuple
The ending point of the line of sight in the form of (x, y, z).
Returns:
----------
bool: True
When the line of sight is not obstructed, False otherwise.
"""
# first just check for 2d intersections and considers the ones that have a 2d intersection.
# if there is no 2d intersection, it is not necessary to check for the 3d one.
possible_matches_index = list(buildings_sindex.intersection(line2d.buffer(5).bounds)) # looking for possible candidates in the external GDF
possible_matches = buildings_gdf.iloc[possible_matches_index]
matches = possible_matches[possible_matches.intersects(line2d)]
matches = matches[matches.buildingID != buildingID]
if len(matches) == 0:
return True
# if there are 2d intersections, check for 3d ones
visible = True
for _, row_building in matches.iterrows():
building_3d = row_building.building_3d.extract_surface().triangulate()
points, intersections = building_3d.ray_trace(start, stop)
if len(intersections) > 0:
return False
return visible
[docs]def visibility_score(buildings_gdf, sight_lines = pd.DataFrame({'a': []}), method = 'longest'):
"""
The function calculates the sub-scores of the "Visibility Landmark Component".
- 3d visibility;
- facade area;
- (height).
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
Buildings GeoDataFrame - case study area.
sight_lines: LineString GeoDataFrame
The Sight Lines GeoDataFrame.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame.
"""
buildings_gdf = buildings_gdf.copy()
buildings_gdf["fac"] = 0.0
if ("height" not in buildings_gdf.columns) | (sight_lines.empty):
return buildings_gdf, sight_lines
sight_lines['nodeID'] = sight_lines['nodeID'].astype(int)
sight_lines['buildingID'] = sight_lines['buildingID'].astype(int)
buildings_gdf["fac"] = buildings_gdf.apply(lambda row: facade_area(row["geometry"], row["height"]), axis = 1)
sight_lines["length"] = sight_lines["geometry"].length
sight_lines = sight_lines.sort_values(['buildingID', 'nodeID', 'length'],ascending=[False, False, False]).drop_duplicates(['buildingID', 'nodeID'], keep='first')
sight_lines.reset_index(inplace = True, drop = True)
stats = sight_lines.groupby('buildingID').agg({'length': ['mean','max', 'count']})
stats.columns = stats.columns.droplevel(0)
stats.rename(columns = {"count": "nr_lines"}, inplace = True)
stats["max"].fillna((stats["max"].min()), inplace = True)
stats["mean"].fillna((stats["mean"].min()), inplace = True)
stats["nr_lines"].fillna((stats["nr_lines"].min()), inplace = True)
stats.reset_index(inplace = True)
columns = ["max", "mean", "nr_lines"]
for column in columns:
stats[column+"_sc"] = scaling_columnDF(stats[column])
if method == 'longest':
stats["3dvis"] = stats["max_sc"]
elif method == 'combined':
stats["3dvis"] = stats["max_sc"]*0.5+stats["mean_sc"]*0.25+stats["nr_lines_sc"]*0.25
buildings_gdf = pd.merge(buildings_gdf, stats[["buildingID", "3dvis"]], on = "buildingID", how = "left")
buildings_gdf['3dvis'] = buildings_gdf['3dvis'].where(pd.notnull(buildings_gdf['3dvis']), 0.0)
return buildings_gdf, sight_lines
[docs]def facade_area(building_geometry, building_height):
"""
Compute the approximate facade area of a building given its geometry and height.
Parameters
----------
building_geometry: Polygon
The geometry of the building.
building_height: float
The height of the building.
Returns
-------
float
The computed approximate facade area of the building.
"""
envelope = building_geometry.envelope
coords = mapping(envelope)["coordinates"][0]
d = [(Point(coords[0])).distance(Point(coords[1])), (Point(coords[1])).distance(Point(coords[2]))]
width = min(d)
return width*building_height
[docs]def get_historical_buildings_fromOSM(place, download_method, epsg = None, distance = 1000):
"""
The function downloads and cleans building footprint geometries and create a buildings GeoDataFrames for the area of interest.
However, it only keeps the buildings that are considered historical buildings or heritage buildings in OSM.
Parameters
----------
place: str, tuple, Shapely Polygon
Name of cities or areas in OSM:
- when using "distance_from_point" please provide a (lat, lon) tuple to create the bounding box around it;
- when using "distance_from_address" provide an existing OSM address;
- when using "OSMplace" provide an OSM place name. The query must be geocodable and OSM must have polygon boundaries for the geocode result.
- when using "polygon" please provide a Shapely Polygon in unprojected latitude-longitude degrees (EPSG:4326) CRS;
download_method: str, {"distance_from_address", "distance_from_point", "OSMplace", "polygon"}
It indicates the method that should be used for downloading the data.
epsg: int
Epsg of the area considered; if None OSMNx is used for the projection.
distance: float
Used when download_method == "distance from address" or == "distance from point".
Returns
-------
historic_buildings: Polygon GeoDataFrame
The historic buildings GeoDataFrame.
"""
columns = ['geometry', 'historic']
tags = {"building": True}
historic_buildings = downloader(place = place, download_method = download_method, tags = tags, distance = distance)
if 'heritage' in historic_buildings:
columns.append('heritage')
historic_buildings = historic_buildings[columns]
if 'heritage' in historic_buildings:
historic_buildings = historic_buildings[~(historic_buildings.historic.isnull() & historic_buildings.heritage.isnull())]
else:
historic_buildings = historic_buildings[~historic_buildings.historic.isnull()]
if epsg is None:
historic_buildings = ox.projection.project_gdf(historic_buildings)
else:
crs = 'EPSG:'+str(epsg)
historic_buildings = historic_buildings.to_crs(crs)
historic_buildings.loc[historic_buildings["historic"] != 0, "historic"] = 1
historic_buildings = historic_buildings[['geometry', 'historic']]
historic_buildings['area'] = historic_buildings.geometry.area
return historic_buildings
[docs]def cultural_score(buildings_gdf, historical_elements_gdf = pd.DataFrame({'a': []}), historical_score = None, from_OSM = False):
"""
The function computes the "Cultural Landmark Component" based on the number of features listed in historical/cultural landmarks datasets. It can be
obtained either on the basis of a score given by the data-provider or on the number of features intersecting the buildings object
of analysis.
"historical_score" indicates the attribute field containing scores assigned to historical buildings, if existing.
Alternatively, if the column has been already assigned to the buildings_gdf, one can use OSM historic categorisation (binbary).
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
Buildings GeoDataFrame - case study area.
historical_elements_gdf: Point or Polygon GeoDataFrame
The GeoDataFrame containing information about listed historical buildings or elements.
historical_score: str
The name of the column in the historical_elements_gdf that provides information on the classification of the historical listings.
from_OSM: boolean
If using the historic field from OSM. This column should be already in the buildings_gdf columns.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame.
"""
buildings_gdf = buildings_gdf.copy()
buildings_gdf["cult"] = 0
# using OSM binary value
if (from_OSM) & ("historic" in buildings_gdf.columns):
# Set 'historic' column to 0 where it is currently null
buildings_gdf.loc[buildings_gdf["historic"].isnull(), "historic"] = 0
# Set 'historic' column to 1 where it is not 0
buildings_gdf.loc[buildings_gdf["historic"] != 0, "historic"] = 1
buildings_gdf["cult"] = buildings_gdf["historic"]
return buildings_gdf
def _cultural_score_building(building_geometry, historical_elements_gdf, historical_elements_gdf_sindex, score = None):
"""
Compute the cultural score for a single building based on its intersection with historical elements.
Parameters
----------
building_geometry: Polygon
The geometry of the building.
historical_elements_gdf: GeoDataFrame
The GeoDataFrame containing historical elements.
historical_elements_gdf_sindex: Spatial Index
The spatial index of the historical elements GeoDataFrame.
score: str
The name of the score column in the historical elements GeoDataFrame, by default None.
Returns
-------
float
The computed cultural score for the building.
"""
possible_matches_index = list(historical_elements_gdf_sindex.intersection(building_geometry.bounds)) # looking for possible candidates in the external GDF
possible_matches = historical_elements_gdf.iloc[possible_matches_index]
matches = possible_matches[possible_matches.intersects(building_geometry)]
if (score is None):
cs = len(matches) # score only based on number of intersecting elements
elif len(matches) == 0:
cs = 0
else:
cs = matches[score].sum() # otherwise sum the scores of the intersecting elements
return cs
# spatial index
sindex = historical_elements_gdf.sindex
buildings_gdf["cult"] = buildings_gdf.geometry.apply(lambda row: _cultural_score_building(row, historical_elements_gdf, sindex, score = historical_score))
return buildings_gdf
[docs]def pragmatic_score(buildings_gdf, research_radius = 200):
"""
The function computes the "Pragmatic Landmark Component" based on the frequency, and therefore unexpectedness, of a land_use class in an area around a building.
The area is defined by the parameter "research_radius".
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
Buildings GeoDataFrame - case study area.
research_radius: float
The radius to be used around the given building to identify neighbours.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame.
"""
buildings_gdf = buildings_gdf.copy()
buildings_gdf["nr"] = 1 # to count
sindex = buildings_gdf.sindex # spatial index
if 'land_use' not in buildings_gdf.columns:
buildings_gdf['land_use'] = buildings_gdf['land_use_raw']
def _pragmatic_meaning_building(building_geometry, building_land_use, buildings_gdf, buildings_gdf_sindex, radius):
"""
Compute the pragmatic score for a single building based on its proximity to buildings with the same land use.
Parameters
----------
building_geometry: Polygon
The geometry of the building.
building_land_use: str
The land use category of the building.
buildings_gdf: Polygon GeoDataFrame
The buildings GeoDataFrame for the case study area.
buildings_gdf_sindex: Spatial Index
The spatial index of the buildings GeoDataFrame.
radius: float
The radius to consider for proximity calculation.
Returns
-------
float
The computed pragmatic score for the building.
"""
buffer = building_geometry.buffer(radius)
possible_matches_index = list(buildings_gdf_sindex.intersection(buffer.bounds))
possible_matches = buildings_gdf.iloc[possible_matches_index]
matches = possible_matches[possible_matches.intersects(buffer)]
neigh = matches.groupby(["land_use"], as_index = True)["nr"].sum()
Nj = neigh.loc[building_land_use] # nr of neighbours with same land_use
Pj = 1-(Nj/matches["nr"].sum()) # inverting the value # Pj = Nj/N
return Pj
buildings_gdf["prag"] = buildings_gdf.apply(lambda row: _pragmatic_meaning_building(row.geometry, row.land_use, buildings_gdf,
sindex, radius = research_radius), axis = 1)
buildings_gdf.drop('nr', axis = 1, inplace = True)
return buildings_gdf
[docs]def compute_global_scores(buildings_gdf, global_indexes_weights, global_components_weights):
"""
Computes the component and global scores for a buildings GeoDataFrame, rescaling values when necessary and assigning weights to the different properties measured.
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
The input GeoDataFrame containing buildings information.
global_indexes_weights: dict
Dictionary with index names (string) as keys and weights as values.
global_components_weights: dict
Dictionary with component names (string) as keys and weights as values.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame with computed scores.
Examples
--------
# Example 1: Computing global scores for a buildings GeoDataFrame
>>> import geopandas as gpd
>>> buildings_gdf = gpd.GeoDataFrame(...)
>>> global_indexes_weights = {"3dvis": 0.50, "fac": 0.30, "height": 0.20, "area": 0.30, "2dvis": 0.30, "neigh": 0.20, "road": 0.20}
>>> global_components_weights = {"vScore": 0.50, "sScore": 0.30, "cScore": 0.20, "pScore": 0.10}
>>> buildings_gdf_scores = compute_global_scores(buildings_gdf, global_indexes_weights, global_components_weights)
>>> print(buildings_gdf_scores)
"""
# scaling
col = ["3dvis", "fac", "height", "area","2dvis", "cult", "prag"]
col_inverse = ["neigh", "road"]
# keeping out visibility analysis if there is no information about buildings' elevation. Reassigning the weights to the other
# components if any was given to the visibility
if ("height" not in buildings_gdf.columns) or (("height" in buildings_gdf.columns) and (buildings_gdf.height.max() == 0.0)):
buildings_gdf[['height','3dvis', 'fac']] = 0.0
if global_components_weights['vScore'] != 0.0:
to_add = global_components_weights['vScore']/3
global_components_weights['sScore'] += to_add
global_components_weights['cScore'] += to_add
global_components_weights['pScore'] += to_add
global_components_weights['vScore'] = 0.0
# rescaling values from 0 to 1
for column in col + col_inverse:
if buildings_gdf[column].max() == 0.0:
buildings_gdf[column+"_sc"] = 0.0
else:
buildings_gdf[column+"_sc"] = scaling_columnDF(buildings_gdf[column], inverse = column in col_inverse)
# computing component scores
vScore_terms = [buildings_gdf["fac_sc"] * global_indexes_weights["fac"],
buildings_gdf["height_sc"] * global_indexes_weights["height"],
buildings_gdf["3dvis_sc"] * global_indexes_weights["3dvis"]]
buildings_gdf["vScore"] = sum(vScore_terms)
sScore_terms = [buildings_gdf["area_sc"] * global_indexes_weights["area"],
buildings_gdf["neigh_sc"] * global_indexes_weights["neigh"],
buildings_gdf["2dvis_sc"] * global_indexes_weights["2dvis"],
buildings_gdf["road_sc"] * global_indexes_weights["road"]]
buildings_gdf["sScore"] = sum(sScore_terms)
# rescaling them
for column in ["vScore", "sScore"]:
if buildings_gdf[column].max() == 0.0:
buildings_gdf[column+"_sc"] = 0.0
else:
buildings_gdf[column+"_sc"] = scaling_columnDF(buildings_gdf[column])
buildings_gdf["cScore"] = buildings_gdf["cult_sc"]
buildings_gdf["pScore"] = buildings_gdf["prag_sc"]
# final global score
buildings_gdf["gScore"] = (buildings_gdf["vScore_sc"]*global_components_weights["vScore"] + buildings_gdf["sScore_sc"]*global_components_weights["sScore"] +
buildings_gdf["cScore"]*global_components_weights["cScore"] + buildings_gdf["pScore"]*global_components_weights["pScore"])
buildings_gdf["gScore_sc"] = scaling_columnDF(buildings_gdf["gScore"])
return buildings_gdf
[docs]def compute_local_scores(buildings_gdf, local_indexes_weights, local_components_weights, rescaling_radius = 1500):
"""
The function computes landmarkness at the local level. The components' weights may be different from the ones used to calculate the
global score. The radius parameter indicates the extent of the area considered to rescale the landmarkness local score.
- local_indexes_weights: keys are index names (string), items are weights.
- local_components_weights: keys are component names (string), items are weights.
Parameters
----------
buildings_gdf: Polygon GeoDataFrame
The input GeoDataFrame containing buildings information.
local_indexes_weights: dict
Dictionary with index names (string) as keys and weights as values.
local_components_weights: dict
Dictionary with component names (string) as keys and weights as values.
Returns
-------
buildings_gdf: Polygon GeoDataFrame
The updated buildings GeoDataFrame.
Examples
--------
>>> # local landmarkness indexes weights, cScore and pScore have only 1 index each
>>> local_indexes_weights = {"3dvis": 0.50, "fac": 0.30, "height": 0.20, "area": 0.40, "2dvis": 0.00, "neigh": 0.30 , "road": 0.30}
>>> # local landmarkness components weights
>>> local_components_weights = {"vScore": 0.25, "sScore": 0.35, "cScore":0.10 , "pScore": 0.30}
"""
buildings_gdf = buildings_gdf.copy()
buildings_gdf.index = buildings_gdf.buildingID
buildings_gdf.index.name = None
sindex = buildings_gdf.sindex # spatial index
buildings_gdf["lScore"] = 0.0
buildings_gdf["vScore_l"], buildings_gdf["sScore_l"] = 0.0, 0.0
def _building_local_score(building_geometry, buildingID, buildings_gdf, buildings_gdf_sindex, local_components_weights, local_indexes_weights, radius):
"""
The function computes landmarkness at the local level for a single building.
Parameters
----------
building_geometry Polygon
The geometry of the building.
buildingID: int
The ID of the building.
buildings_gdf: Polygon GeoDataFrame
The GeoDataFrame containing the buildings.
buildings_gdf_sindex: Spatial Index
The spatial index of the buildings GeoDataFrame.
local_components_weights: dictionary
The weights assigned to local-level components.
local_indexes_weights: dictionary
The weights assigned to local-level indexes.
radius: float
The radius that regulates the area around the building within which the scores are recomputed.
Returns
-------
score : float
The computed local-level landmarkness score for the building.
"""
col = ["3dvis", "fac", "height", "area","2dvis", "cult","prag"]
col_inverse = ["neigh", "road"]
buffer = building_geometry.buffer(radius)
possible_matches_index = list(buildings_gdf_sindex.intersection(buffer.bounds))
possible_matches = buildings_gdf.iloc[possible_matches_index].copy()
matches = possible_matches[possible_matches.intersects(buffer)]
# rescaling the values
for column in col + col_inverse:
if matches[column].max() == 0.0:
matches[column+"_sc"] = 0.0
else:
matches[column+"_sc"] = scaling_columnDF(matches[column], inverse = column in col_inverse)
# recomputing scores
vScore_terms = [matches["fac_sc"] * local_indexes_weights["fac"],
matches["height_sc"] * local_indexes_weights["height"],
matches["3dvis"] * local_indexes_weights["3dvis"]]
matches["vScore_l"] = sum(vScore_terms)
sScore_terms = [matches["area_sc"] * local_indexes_weights["area"],
matches["neigh_sc"] * local_indexes_weights["neigh"],
matches["road_sc"] * local_indexes_weights["road"],
matches["2dvis_sc"] * local_indexes_weights["fac"]]
matches["sScore_l"] = sum(sScore_terms)
matches["cScore_l"] = matches["cult_sc"]
matches["pScore_l"] = matches["prag_sc"]
for column in ["vScore_l", "sScore_l"]:
if matches[column].max() == 0.0:
matches[column+"_sc"] = 0.0
else:
matches[column+"_sc"] = scaling_columnDF(matches[column])
lScore_terms = [matches["vScore_l_sc"]*local_components_weights["vScore"],
matches["sScore_l_sc"]*local_components_weights["sScore"],
matches["cScore_l"]*local_components_weights["cScore"],
matches["pScore_l"]*local_components_weights["pScore"]]
matches["lScore"] = sum(lScore_terms)
local_score = float("{0:.3f}".format(matches.loc[buildingID, "lScore"]))
return local_score
with concurrent.futures.ThreadPoolExecutor() as executor:
future_scores = {executor.submit(_building_local_score, row["geometry"], row["buildingID"], buildings_gdf, sindex, local_components_weights, local_indexes_weights,
rescaling_radius): row["buildingID"] for _, row in buildings_gdf.iterrows()}
for future in concurrent.futures.as_completed(future_scores):
buildingID = future_scores[future]
try:
score = future.result()
buildings_gdf.loc[buildingID, "lScore"] = score
except Exception as exc:
print(f'{buildingID} generated an exception: {exc}')
buildings_gdf["lScore_sc"] = scaling_columnDF(buildings_gdf["lScore"])
return buildings_gdf