|
|
from ultralytics import SAM |
|
|
import cv2 |
|
|
from shapely.geometry import shape |
|
|
from rapidfuzz import process, fuzz |
|
|
from huggingface_hub import hf_hub_download |
|
|
from config import OUTPUT_DIR |
|
|
from pathlib import Path |
|
|
from PIL import Image |
|
|
import spaces |
|
|
import numpy as np |
|
|
import os |
|
|
import json |
|
|
from PIL import Image |
|
|
|
|
|
|
|
|
def box_inside_global(box, global_box): |
|
|
x1, y1, x2, y2 = box |
|
|
gx1, gy1, gx2, gy2 = global_box |
|
|
return (x1 >= gx1 and y1 >= gy1 and x2 <= gx2 and y2 <= gy2) |
|
|
|
|
|
def nms_iou(box1, box2): |
|
|
x1 = max(box1[0], box2[0]) |
|
|
y1 = max(box1[1], box2[1]) |
|
|
x2 = min(box1[2], box2[2]) |
|
|
y2 = min(box1[3], box2[3]) |
|
|
|
|
|
inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
|
|
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
|
|
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
|
|
union_area = box1_area + box2_area - inter_area |
|
|
|
|
|
return inter_area / union_area if union_area > 0 else 0 |
|
|
|
|
|
def non_max_suppression(boxes, scores, iou_threshold=0.5): |
|
|
idxs = np.argsort(scores)[::-1] |
|
|
keep = [] |
|
|
|
|
|
while len(idxs) > 0: |
|
|
current = idxs[0] |
|
|
keep.append(current) |
|
|
idxs = idxs[1:] |
|
|
idxs = np.array([i for i in idxs if nms_iou(boxes[current], boxes[i]) < iou_threshold]) |
|
|
|
|
|
return keep |
|
|
|
|
|
|
|
|
|
|
|
def tile_image_with_overlap(image_path, tile_size=1024, overlap=256): |
|
|
"""Tile image into overlapping RGB tiles.""" |
|
|
image = cv2.imread(image_path) |
|
|
height, width, _ = image.shape |
|
|
|
|
|
step = tile_size - overlap |
|
|
tile_list = [] |
|
|
seen = set() |
|
|
|
|
|
for y in range(0, height, step): |
|
|
if y + tile_size > height: |
|
|
y = height - tile_size |
|
|
for x in range(0, width, step): |
|
|
if x + tile_size > width: |
|
|
x = width - tile_size |
|
|
|
|
|
|
|
|
x_start = max(0, x) |
|
|
y_start = max(0, y) |
|
|
x_end = x_start + tile_size |
|
|
y_end = y_start + tile_size |
|
|
|
|
|
coords = (x_start, y_start) |
|
|
if coords in seen: |
|
|
continue |
|
|
seen.add(coords) |
|
|
|
|
|
tile = image[y_start:y_end, x_start:x_end, :] |
|
|
tile_list.append((tile, coords)) |
|
|
|
|
|
return tile_list, image.shape |
|
|
|
|
|
|
|
|
|
|
|
def compute_iou(box1, box2): |
|
|
"""Compute Intersection over Union for two boxes.""" |
|
|
x1 = max(box1[0], box2[0]) |
|
|
y1 = max(box1[1], box2[1]) |
|
|
x2 = min(box1[2], box2[2]) |
|
|
y2 = min(box1[3], box2[3]) |
|
|
|
|
|
inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
|
|
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1]) |
|
|
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1]) |
|
|
union_area = area1 + area2 - inter_area |
|
|
|
|
|
return inter_area / union_area if union_area > 0 else 0 |
|
|
|
|
|
|
|
|
def merge_boxes(boxes, iou_threshold=0.8): |
|
|
"""Merge overlapping boxes based on IoU.""" |
|
|
merged = [] |
|
|
used = [False] * len(boxes) |
|
|
|
|
|
for i, box in enumerate(boxes): |
|
|
if used[i]: |
|
|
continue |
|
|
group = [box] |
|
|
used[i] = True |
|
|
for j in range(i + 1, len(boxes)): |
|
|
if used[j]: |
|
|
continue |
|
|
if compute_iou(box, boxes[j]) > iou_threshold: |
|
|
group.append(boxes[j]) |
|
|
used[j] = True |
|
|
|
|
|
|
|
|
x1 = min(b[0] for b in group) |
|
|
y1 = min(b[1] for b in group) |
|
|
x2 = max(b[2] for b in group) |
|
|
y2 = max(b[3] for b in group) |
|
|
merged.append([x1, y1, x2, y2]) |
|
|
|
|
|
return merged |
|
|
|
|
|
|
|
|
def box_area(box): |
|
|
return max(0, box[2] - box[0]) * max(0, box[3] - box[1]) |
|
|
|
|
|
def is_contained(box1, box2, containment_threshold=0.9): |
|
|
|
|
|
x1 = max(box1[0], box2[0]) |
|
|
y1 = max(box1[1], box2[1]) |
|
|
x2 = min(box1[2], box2[2]) |
|
|
y2 = min(box1[3], box2[3]) |
|
|
|
|
|
inter_area = max(0, x2 - x1) * max(0, y2 - y1) |
|
|
area1 = box_area(box1) |
|
|
area2 = box_area(box2) |
|
|
|
|
|
|
|
|
smaller_area = min(area1, area2) |
|
|
if smaller_area == 0: |
|
|
return False |
|
|
return (inter_area / smaller_area) >= containment_threshold |
|
|
|
|
|
def merge_boxes_iterative(boxes, iou_threshold=0.25, containment_threshold=0.75): |
|
|
boxes = boxes.copy() |
|
|
changed = True |
|
|
|
|
|
while changed: |
|
|
changed = False |
|
|
merged = [] |
|
|
used = [False] * len(boxes) |
|
|
|
|
|
for i, box in enumerate(boxes): |
|
|
if used[i]: |
|
|
continue |
|
|
group = [box] |
|
|
used[i] = True |
|
|
for j in range(i + 1, len(boxes)): |
|
|
if used[j]: |
|
|
continue |
|
|
iou = compute_iou(box, boxes[j]) |
|
|
contained = is_contained(box, boxes[j], containment_threshold) |
|
|
if iou > iou_threshold or contained: |
|
|
group.append(boxes[j]) |
|
|
used[j] = True |
|
|
|
|
|
|
|
|
x1 = min(b[0] for b in group) |
|
|
y1 = min(b[1] for b in group) |
|
|
x2 = max(b[2] for b in group) |
|
|
y2 = max(b[3] for b in group) |
|
|
merged.append([x1, y1, x2, y2]) |
|
|
|
|
|
if len(merged) < len(boxes): |
|
|
changed = True |
|
|
boxes = merged |
|
|
|
|
|
return boxes |
|
|
|
|
|
|
|
|
def get_corner_points(box): |
|
|
x1, y1, x2, y2 = box |
|
|
return [ |
|
|
[x1, y1], |
|
|
[x2, y1], |
|
|
[x1, y2], |
|
|
[x2, y2], |
|
|
] |
|
|
|
|
|
|
|
|
def sample_negative_points_outside_boxes(mask, num_points): |
|
|
points = [] |
|
|
tries = 0 |
|
|
max_tries = num_points * 20 |
|
|
while len(points) < num_points and tries < max_tries: |
|
|
x = np.random.randint(0, mask.shape[1]) |
|
|
y = np.random.randint(0, mask.shape[0]) |
|
|
if not mask[y, x]: |
|
|
points.append([x, y]) |
|
|
tries += 1 |
|
|
return np.array(points) |
|
|
|
|
|
def get_inset_corner_points(box, margin=5): |
|
|
x1, y1, x2, y2 = box |
|
|
|
|
|
|
|
|
x1i = min(x1 + margin, x2) |
|
|
y1i = min(y1 + margin, y2) |
|
|
x2i = max(x2 - margin, x1) |
|
|
y2i = max(y2 - margin, y1) |
|
|
|
|
|
return [ |
|
|
[x1i, y1i], |
|
|
[x2i, y1i], |
|
|
[x1i, y2i], |
|
|
[x2i, y2i], |
|
|
] |
|
|
|
|
|
|
|
|
def processYOLOBoxes(iou): |
|
|
|
|
|
BOXES_PATH = os.path.join(OUTPUT_DIR,"boxes.json") |
|
|
with open(BOXES_PATH, "r") as f: |
|
|
box_data = json.load(f) |
|
|
|
|
|
|
|
|
boxes = np.array([item["bbox"] for item in box_data]) |
|
|
scores = np.array([item["score"] for item in box_data]) |
|
|
|
|
|
keep_indices = non_max_suppression(boxes, scores, iou) |
|
|
|
|
|
box_data = [box_data[i] for i in keep_indices] |
|
|
|
|
|
|
|
|
boxes_full = [b["bbox"] for b in box_data] |
|
|
return boxes_full |
|
|
|
|
|
def prepare_tiles(image_path, boxes_full, tile_size=1024, overlap=50, iou=0.5, c_th=0.75, edge_margin=10): |
|
|
""" |
|
|
Tiles the image and prepares per-tile metadata including filtered boxes and point prompts. |
|
|
Returns full image size H, W. |
|
|
""" |
|
|
tiles, (H, W, _) = tile_image_with_overlap(image_path, tile_size, overlap) |
|
|
os.makedirs("tmp/tiles", exist_ok=True) |
|
|
meta = [] |
|
|
|
|
|
for idx, (tile_array, (x_offset, y_offset)) in enumerate(tiles): |
|
|
tile_path = f"tmp/tiles/tile_{idx}.png" |
|
|
tile_array = cv2.cvtColor(tile_array, cv2.COLOR_BGR2RGB) |
|
|
Image.fromarray(tile_array).save(tile_path) |
|
|
|
|
|
tile_h, tile_w, _ = tile_array.shape |
|
|
|
|
|
|
|
|
candidate_boxes = [] |
|
|
for x1, y1, x2, y2 in boxes_full: |
|
|
if (x2 > x_offset) and (x1 < x_offset + tile_w) and (y2 > y_offset) and (y1 < y_offset + tile_h): |
|
|
candidate_boxes.append([x1, y1, x2, y2]) |
|
|
|
|
|
if not candidate_boxes: |
|
|
meta.append({ |
|
|
"idx": idx, |
|
|
"x_off": x_offset, |
|
|
"y_off": y_offset, |
|
|
"local_boxes": [], |
|
|
"point_coords": [], |
|
|
"point_labels": [] |
|
|
}) |
|
|
continue |
|
|
|
|
|
|
|
|
merged_boxes = merge_boxes_iterative(candidate_boxes, iou_threshold=iou, containment_threshold=c_th) |
|
|
|
|
|
|
|
|
local_boxes = [] |
|
|
for x1, y1, x2, y2 in merged_boxes: |
|
|
new_x1 = max(0, x1 - x_offset) |
|
|
new_y1 = max(0, y1 - y_offset) |
|
|
new_x2 = min(tile_w, x2 - x_offset) |
|
|
new_y2 = min(tile_h, y2 - y_offset) |
|
|
local_boxes.append([new_x1, new_y1, new_x2, new_y2]) |
|
|
|
|
|
|
|
|
filtered_local_boxes = [] |
|
|
for box in local_boxes: |
|
|
x1, y1, x2, y2 = box |
|
|
if (x1 > edge_margin and y1 > edge_margin and (tile_w - x2) > edge_margin and (tile_h - y2) > edge_margin): |
|
|
filtered_local_boxes.append(box) |
|
|
|
|
|
if not filtered_local_boxes: |
|
|
meta.append({ |
|
|
"idx": idx, |
|
|
"x_off": x_offset, |
|
|
"y_off": y_offset, |
|
|
"local_boxes": [], |
|
|
"point_coords": [], |
|
|
"point_labels": [] |
|
|
}) |
|
|
continue |
|
|
|
|
|
|
|
|
centroids = [((bx1 + bx2) / 2, (by1 + by2) / 2) for bx1, by1, bx2, by2 in filtered_local_boxes] |
|
|
negative_points_per_box = [get_inset_corner_points(box, margin=2) for box in filtered_local_boxes] |
|
|
|
|
|
point_coords = [] |
|
|
point_labels = [] |
|
|
for centroid, neg_points in zip(centroids, negative_points_per_box): |
|
|
if not isinstance(neg_points, list): |
|
|
neg_points = neg_points.tolist() |
|
|
all_points = [centroid] + neg_points |
|
|
all_labels = [1] + [0] * len(neg_points) |
|
|
point_coords.append(all_points) |
|
|
point_labels.append(all_labels) |
|
|
|
|
|
meta.append({ |
|
|
"idx": idx, |
|
|
"x_off": x_offset, |
|
|
"y_off": y_offset, |
|
|
"local_boxes": filtered_local_boxes, |
|
|
"point_coords": point_coords, |
|
|
"point_labels": point_labels |
|
|
}) |
|
|
|
|
|
|
|
|
os.makedirs("tmp", exist_ok=True) |
|
|
with open("tmp/tiles_meta.json", "w") as f: |
|
|
json.dump(meta, f) |
|
|
|
|
|
return H, W |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def merge_tile_masks(H, W): |
|
|
""" |
|
|
Merge predicted tile masks into a full-size image. |
|
|
|
|
|
Args: |
|
|
H (int): full image height |
|
|
W (int): full image width |
|
|
|
|
|
Returns: |
|
|
full_mask (np.ndarray): merged mask array |
|
|
""" |
|
|
full_mask = np.zeros((H, W), dtype=np.uint16) |
|
|
instance_id = 1 |
|
|
|
|
|
|
|
|
with open("tmp/tiles_meta.json", "r") as f: |
|
|
tiles_meta = json.load(f) |
|
|
|
|
|
for tile in tiles_meta: |
|
|
tile_idx = tile["idx"] |
|
|
x_off = tile["x_off"] |
|
|
y_off = tile["y_off"] |
|
|
|
|
|
mask_path = f"tmp/masks/tile_{tile_idx}.npy" |
|
|
if not Path(mask_path).exists(): |
|
|
continue |
|
|
|
|
|
|
|
|
tile_masks = np.load(mask_path) |
|
|
|
|
|
if tile_masks.ndim == 2: |
|
|
tile_masks = tile_masks[None, :, :] |
|
|
|
|
|
for mask in tile_masks: |
|
|
mask = mask.astype(bool) |
|
|
|
|
|
|
|
|
pad_h = 1024 - mask.shape[0] |
|
|
pad_w = 1024 - mask.shape[1] |
|
|
if pad_h > 0 or pad_w > 0: |
|
|
mask = np.pad(mask, ((0, pad_h), (0, pad_w)), mode='constant', constant_values=0) |
|
|
|
|
|
|
|
|
h_end = min(y_off + mask.shape[0], H) |
|
|
w_end = min(x_off + mask.shape[1], W) |
|
|
|
|
|
region = full_mask[y_off:h_end, x_off:w_end] |
|
|
mask = mask[:h_end - y_off, :w_end - x_off] |
|
|
|
|
|
region[mask & (region == 0)] = instance_id |
|
|
instance_id += 1 |
|
|
|
|
|
|
|
|
final_mask = Image.fromarray(full_mask) |
|
|
MASK_PATH = os.path.join(OUTPUT_DIR,"mask.tif") |
|
|
final_mask.save(MASK_PATH) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunkify(lst, n): |
|
|
"""Yield successive n-sized chunks from lst.""" |
|
|
for i in range(0, len(lst), n): |
|
|
yield lst[i:i + n] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def img_shape(image_path): |
|
|
img = cv2.imread(image_path) |
|
|
return img.shape |
|
|
|
|
|
|
|
|
|
|
|
def best_street_match(point, query_name, edges_gdf, max_distance=100): |
|
|
buffer = point.buffer(max_distance) |
|
|
nearby_edges = edges_gdf[edges_gdf.intersects(buffer)] |
|
|
|
|
|
if nearby_edges.empty: |
|
|
return None, 0 |
|
|
|
|
|
candidate_names = nearby_edges['name'].tolist() |
|
|
best_match = process.extractOne(query_name, candidate_names, scorer=fuzz.ratio) |
|
|
return best_match |