vprdb.reduction_methods
# Copyright (c) 2023, Ivan Moskalenko, Anastasiia Kornilova
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from vprdb.reduction_methods.cube_division import CubeDivision
from vprdb.reduction_methods.distance_vector import DistanceVector
from vprdb.reduction_methods.dominating_set import DominatingSet
from vprdb.reduction_methods.every_nth import EveryNth

__all__ = ["CubeDivision", "DistanceVector", "DominatingSet", "EveryNth"]
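All four reduction methods share the same interface: construct the method with its parameters and call reduce on a Database. A minimal usage sketch follows; the import path of Database and the way color_images, point_clouds and trajectory are obtained are assumptions, not part of this module.

from vprdb.core import Database  # assumed location of the Database class
from vprdb.reduction_methods import DistanceVector, EveryNth

# color_images, point_clouds and trajectory are placeholders for data
# loaded elsewhere; Database is constructed from these three sequences
db = Database(color_images, point_clouds, trajectory)

# Keep every 5th frame, then keep a pose only after at least 0.5 units
# of accumulated movement since the last kept pose
reduced = DistanceVector(distance_threshold=0.5).reduce(EveryNth(n=5).reduce(db))
print(len(db), "->", len(reduced))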
class CubeDivision(ReductionMethod):
    """
    The method divides the cuboid in which the trajectory is located
    into many small cubes, in each of which only one pose is chosen
    """

    def __init__(self, cube_size: float):
        """
        Constructs CubeDivision reduction method
        :param cube_size: The size of the cubes into which the cuboid will be divided
        """
        self.cube_size = cube_size

    def reduce(self, db: Database) -> Database:
        traj = np.asarray(db.trajectory)
        xyz_traj = traj[:, :3, 3]
        min_over_axes = np.amin(xyz_traj, axis=0)
        max_over_axes = np.amax(xyz_traj, axis=0)
        voxel_grid = VoxelGrid(min_over_axes, max_over_axes, self.cube_size)

        # Association of points with cubes
        cubes = dict()
        for i, trans_matrix in enumerate(traj):
            point = trans_matrix[:3, 3]
            cube_coordinates = voxel_grid.get_voxel_coordinates(point)

            point = np.append(point, i)
            if cube_coordinates in cubes:
                cubes[cube_coordinates] = np.append(
                    cubes[cube_coordinates], [point], axis=0
                )
            else:
                cubes[cube_coordinates] = np.asarray([point])

        # Finding the closest point to the center in each cube
        res_indices = []
        for (x, y, z), enum_points in cubes.items():
            indices = enum_points[:, 3]
            points = enum_points[:, :3]
            cube_center = (
                x + self.cube_size / 2,
                y + self.cube_size / 2,
                z + self.cube_size / 2,
            )
            distances = np.apply_along_axis(
                lambda p: np.linalg.norm(p - cube_center), axis=1, arr=points
            )
            res_indices.append(int(indices[np.argmin(distances)]))
        res_indices.sort()

        new_rgb = [db.color_images[i] for i in res_indices]
        new_point_clouds = [db.point_clouds[i] for i in res_indices]
        new_traj = [db.trajectory[i] for i in res_indices]
        return Database(new_rgb, new_point_clouds, new_traj)

    reduce.__doc__ = ReductionMethod.reduce.__doc__
The method divides the bounding cuboid of the trajectory into many small cubes and keeps a single pose per cube, namely the one closest to the cube's center.
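The bucketing step amounts to integer division of each camera position by the cube size. The sketch below illustrates the idea with plain NumPy; it is not the library's VoxelGrid, which additionally anchors the grid at the minimum bounds of the trajectory.

import numpy as np

cube_size = 0.5
# Camera positions taken from the translation part of the poses (toy values)
positions = np.array([
    [0.10, 0.20, 0.05],   # same cube as the next position
    [0.12, 0.22, 0.07],
    [1.30, 0.20, 0.05],   # a different cube
])

# Integer cube coordinates of each position; positions sharing a triple
# fall into the same cube, from which only one pose is kept
cube_coords = np.floor(positions / cube_size).astype(int)
print(cube_coords)  # rows: [0 0 0], [0 0 0], [2 0 0]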
    def __init__(self, cube_size: float):
        """
        Constructs CubeDivision reduction method
        :param cube_size: The size of the cubes into which the cuboid will be divided
        """
        self.cube_size = cube_size
Constructs the CubeDivision reduction method
Parameters
- cube_size: Edge length of the cubes into which the cuboid is divided
    def reduce(self, db: Database) -> Database:
        traj = np.asarray(db.trajectory)
        xyz_traj = traj[:, :3, 3]
        min_over_axes = np.amin(xyz_traj, axis=0)
        max_over_axes = np.amax(xyz_traj, axis=0)
        voxel_grid = VoxelGrid(min_over_axes, max_over_axes, self.cube_size)

        # Association of points with cubes
        cubes = dict()
        for i, trans_matrix in enumerate(traj):
            point = trans_matrix[:3, 3]
            cube_coordinates = voxel_grid.get_voxel_coordinates(point)

            point = np.append(point, i)
            if cube_coordinates in cubes:
                cubes[cube_coordinates] = np.append(
                    cubes[cube_coordinates], [point], axis=0
                )
            else:
                cubes[cube_coordinates] = np.asarray([point])

        # Finding the closest point to the center in each cube
        res_indices = []
        for (x, y, z), enum_points in cubes.items():
            indices = enum_points[:, 3]
            points = enum_points[:, :3]
            cube_center = (
                x + self.cube_size / 2,
                y + self.cube_size / 2,
                z + self.cube_size / 2,
            )
            distances = np.apply_along_axis(
                lambda p: np.linalg.norm(p - cube_center), axis=1, arr=points
            )
            res_indices.append(int(indices[np.argmin(distances)]))
        res_indices.sort()

        new_rgb = [db.color_images[i] for i in res_indices]
        new_point_clouds = [db.point_clouds[i] for i in res_indices]
        new_traj = [db.trajectory[i] for i in res_indices]
        return Database(new_rgb, new_point_clouds, new_traj)
Reduces the given Database
Parameters
- db: Database to reduce
Returns
Reduced Database
class DistanceVector(ReductionMethod):
    """
    The method calculates the distance between neighboring poses.
    Then using the calculated distances, it calculates the distance from the current pose to the last pose taken.
    If the distance is greater than the threshold value, the current pose is added to the final database.
    """

    def __init__(self, distance_threshold: float):
        """
        Constructs DistanceVector reduction method
        :param distance_threshold: Threshold value for distance
        """
        self.distance_threshold = distance_threshold

    def reduce(self, db: Database) -> Database:
        traj = np.asarray(db.trajectory)
        new_traj = [traj[0]]
        new_color = [db.color_images[0]]
        new_point_clouds = [db.point_clouds[0]]
        first_points = traj[:-1, :3, 3]
        last_points = traj[1:, :3, 3]
        distances = np.linalg.norm(last_points - first_points, axis=1)
        partial_distance = 0
        for i, cur_distance in enumerate(distances):
            partial_distance += cur_distance
            if partial_distance > self.distance_threshold:
                new_traj.append(traj[i + 1])
                new_color.append(db.color_images[i + 1])
                new_point_clouds.append(db.point_clouds[i + 1])
                partial_distance = 0

        return Database(new_color, new_point_clouds, new_traj)

    reduce.__doc__ = ReductionMethod.reduce.__doc__
The method computes the distances between consecutive poses and accumulates them along the trajectory. Once the accumulated distance from the last kept pose exceeds the threshold, the current pose is added to the reduced database and the accumulator is reset.
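A small numeric illustration of the accumulate-and-reset rule (toy step lengths, not taken from any real trajectory):

import numpy as np

# Distances between consecutive poses of a toy trajectory with 7 poses
step_lengths = np.array([0.2, 0.3, 0.1, 0.6, 0.05, 0.5])
distance_threshold = 0.5

kept = [0]           # the first pose is always kept
accumulated = 0.0
for i, step in enumerate(step_lengths):
    accumulated += step
    if accumulated > distance_threshold:
        kept.append(i + 1)   # the pose that pushed the sum over the threshold
        accumulated = 0.0
print(kept)  # [0, 3, 4, 6]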
    def __init__(self, distance_threshold: float):
        """
        Constructs DistanceVector reduction method
        :param distance_threshold: Threshold value for distance
        """
        self.distance_threshold = distance_threshold
Constructs the DistanceVector reduction method
Parameters
- distance_threshold: Distance threshold; once the accumulated distance from the last kept pose exceeds it, the current pose is kept
    def reduce(self, db: Database) -> Database:
        traj = np.asarray(db.trajectory)
        new_traj = [traj[0]]
        new_color = [db.color_images[0]]
        new_point_clouds = [db.point_clouds[0]]
        first_points = traj[:-1, :3, 3]
        last_points = traj[1:, :3, 3]
        distances = np.linalg.norm(last_points - first_points, axis=1)
        partial_distance = 0
        for i, cur_distance in enumerate(distances):
            partial_distance += cur_distance
            if partial_distance > self.distance_threshold:
                new_traj.append(traj[i + 1])
                new_color.append(db.color_images[i + 1])
                new_point_clouds.append(db.point_clouds[i + 1])
                partial_distance = 0

        return Database(new_color, new_point_clouds, new_traj)
Reduces the given Database
Parameters
- db: Database to reduce
Returns
Reduced Database
class DominatingSet(ReductionMethod):
    """
    Method constructs a graph where the images are vertices that
    are connected if the IoU is greater than the
    given threshold. Then it finds a minimal subset of graph vertices so
    that every vertex is either in the set or is connected to a
    vertex from it.
    """

    def __init__(self, threshold: float = 0.3, voxel_size: float = 0.3):
        """
        Constructs DominatingSet reduction method
        :param threshold: IoU value above which two point clouds are considered overlapping
        :param voxel_size: Size of the voxels used to down-sample the point clouds before computing IoU
        """
        self.threshold = threshold
        self.voxel_size = voxel_size

    def reduce(self, db: Database) -> Database:
        min_bounds, max_bounds = db.bounds
        voxel_grid = VoxelGrid(min_bounds, max_bounds, self.voxel_size)
        # Determine which frames cover a particular voxel
        voxel_to_frames_dict = dict()
        # Frame size is the number of voxels it covers
        frames_sizes = []
        for i, pose in enumerate(db.trajectory):
            pcd = db.point_clouds[i].point_cloud.transform(pose)
            down_sampled_pcd = voxel_grid.voxel_down_sample(pcd)
            points = down_sampled_pcd.points
            frames_sizes.append(len(points))
            for point in points:
                voxel_index = voxel_grid.get_voxel_index(point)
                if voxel_index in voxel_to_frames_dict:
                    voxel_to_frames_dict[voxel_index].append(i)
                else:
                    voxel_to_frames_dict[voxel_index] = [i]

        intersections = dict()
        for covering_frames in voxel_to_frames_dict.values():
            for i, frame_1 in enumerate(covering_frames):
                for frame_2 in covering_frames[i + 1 :]:
                    intersection = tuple(sorted((frame_1, frame_2)))
                    if intersection in intersections:
                        intersections[intersection] += 1
                    else:
                        intersections[intersection] = 1

        IoUs = dict.fromkeys(intersections.keys())
        for (frame_1, frame_2), intersection in intersections.items():
            IoUs[(frame_1, frame_2)] = intersection / (
                frames_sizes[frame_1] + frames_sizes[frame_2] - intersection
            )

        G = nx.Graph()
        G.add_nodes_from(range(len(db)))
        for (fr1, fr2), IoU in IoUs.items():
            if IoU > self.threshold:
                G.add_edge(fr1, fr2)
        result_indices = list(nx.dominating_set(G))
        result_indices.sort()
        new_rgb = [db.color_images[i] for i in result_indices]
        new_point_clouds = [db.point_clouds[i] for i in result_indices]
        new_traj = [db.trajectory[i] for i in result_indices]
        return Database(new_rgb, new_point_clouds, new_traj)

    reduce.__doc__ = ReductionMethod.reduce.__doc__
The method constructs a graph whose vertices are the frames; two vertices are connected if the IoU of their point clouds is greater than the given threshold. It then finds a small subset of vertices such that every vertex is either in the subset or connected to a vertex from it (a dominating set), and keeps only the corresponding frames.
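The domination step itself is delegated to networkx. A toy illustration of what a dominating set provides (frame count and edges are made up for the example):

import networkx as nx

# Five frames; an edge means the IoU of the two frames' point clouds
# is above the threshold, i.e. they observe roughly the same space
G = nx.Graph()
G.add_nodes_from(range(5))
G.add_edges_from([(0, 1), (1, 2), (3, 4)])

dominating = nx.dominating_set(G)
# Every frame is either in the set or overlaps with a frame in it,
# so the frames outside the set can be dropped without losing coverage
print(sorted(dominating))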
    def __init__(self, threshold: float = 0.3, voxel_size: float = 0.3):
        """
        Constructs DominatingSet reduction method
        :param threshold: IoU value above which two point clouds are considered overlapping
        :param voxel_size: Size of the voxels used to down-sample the point clouds before computing IoU
        """
        self.threshold = threshold
        self.voxel_size = voxel_size
Constructs the DominatingSet reduction method
Parameters
- threshold: IoU value above which two point clouds are considered overlapping
- voxel_size: Size of the voxels used to down-sample the point clouds before computing IoU
    def reduce(self, db: Database) -> Database:
        min_bounds, max_bounds = db.bounds
        voxel_grid = VoxelGrid(min_bounds, max_bounds, self.voxel_size)
        # Determine which frames cover a particular voxel
        voxel_to_frames_dict = dict()
        # Frame size is the number of voxels it covers
        frames_sizes = []
        for i, pose in enumerate(db.trajectory):
            pcd = db.point_clouds[i].point_cloud.transform(pose)
            down_sampled_pcd = voxel_grid.voxel_down_sample(pcd)
            points = down_sampled_pcd.points
            frames_sizes.append(len(points))
            for point in points:
                voxel_index = voxel_grid.get_voxel_index(point)
                if voxel_index in voxel_to_frames_dict:
                    voxel_to_frames_dict[voxel_index].append(i)
                else:
                    voxel_to_frames_dict[voxel_index] = [i]

        intersections = dict()
        for covering_frames in voxel_to_frames_dict.values():
            for i, frame_1 in enumerate(covering_frames):
                for frame_2 in covering_frames[i + 1 :]:
                    intersection = tuple(sorted((frame_1, frame_2)))
                    if intersection in intersections:
                        intersections[intersection] += 1
                    else:
                        intersections[intersection] = 1

        IoUs = dict.fromkeys(intersections.keys())
        for (frame_1, frame_2), intersection in intersections.items():
            IoUs[(frame_1, frame_2)] = intersection / (
                frames_sizes[frame_1] + frames_sizes[frame_2] - intersection
            )

        G = nx.Graph()
        G.add_nodes_from(range(len(db)))
        for (fr1, fr2), IoU in IoUs.items():
            if IoU > self.threshold:
                G.add_edge(fr1, fr2)
        result_indices = list(nx.dominating_set(G))
        result_indices.sort()
        new_rgb = [db.color_images[i] for i in result_indices]
        new_point_clouds = [db.point_clouds[i] for i in result_indices]
        new_traj = [db.trajectory[i] for i in result_indices]
        return Database(new_rgb, new_point_clouds, new_traj)
Reduces the given Database
Parameters
- db: Database to reduce
Returns
Reduced Database
class EveryNth(ReductionMethod):
    """The method returns every N-th item from DB"""

    def __init__(self, n: int):
        """
        Constructs EveryNth reduction method
        :param n: Step of method
        """
        self.n = n

    def reduce(self, db: Database) -> Database:
        new_traj = db.trajectory[:: self.n]
        new_color = db.color_images[:: self.n]
        new_point_clouds = db.point_clouds[:: self.n]
        return Database(new_color, new_point_clouds, new_traj)

    reduce.__doc__ = ReductionMethod.reduce.__doc__
The method keeps every N-th item of the database.
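This is plain stride slicing; a short illustration with toy frame indices:

frames = list(range(10))
print(frames[::3])  # [0, 3, 6, 9] - the first item is always kept, then every 3rd after it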
    def __init__(self, n: int):
        """
        Constructs EveryNth reduction method
        :param n: Step of method
        """
        self.n = n
Constructs the EveryNth reduction method
Parameters
- n: Sampling step; every n-th item is kept
    def reduce(self, db: Database) -> Database:
        new_traj = db.trajectory[:: self.n]
        new_color = db.color_images[:: self.n]
        new_point_clouds = db.point_clouds[:: self.n]
        return Database(new_color, new_point_clouds, new_traj)
Reduces the given Database
Parameters
- db: Database to reduce
Returns
Reduced Database