# Source listing: 523 lines, 15 KiB, Python

import math
import signal
import sys
import time
from dataclasses import dataclass
from typing import Tuple
import cv2
import numpy as np
import motor
# Camera frame geometry (pixels).
FRAME_WIDTH = 320
FRAME_HEIGHT = 240
# Pixel the controller tries to keep the target on (the frame center).
GUIDE_CENTER_X = 160
GUIDE_CENTER_Y = 120
# Motor control tuning: no motion inside the deadzone; speed ramps from
# base to max with error magnitude; lookahead leads the target along its
# velocity (see predicted_control_point).
CONTROL_DEADZONE = 15
CONTROL_BASE_SPEED = 0.55
CONTROL_MAX_SPEED = 1.0
CONTROL_LOOKAHEAD_FRAMES = 1.4
CONTROL_LOOKAHEAD_GAIN = 0.06
# Blob filtering: accepted contour area range, and border margin (pixels)
# zeroed in the motion mask and excluded from candidate selection.
MIN_CONTOUR_AREA = 90
MAX_CONTOUR_AREA = 6000
EDGE_IGNORE = 12
# Tracking: lockout duration (seconds) after a target is lost, number of
# frames a track may coast unmatched, pixel jump treated as "fast", and
# half-width of the window around the guide line where smoothing relaxes.
TARGET_LOCK_DURATION = 3.0
TRACK_MAX_MISSES = 4
TRACK_FAST_MOVE_PX = 24.0
TRACK_CENTER_WINDOW = 28
@dataclass
class TrackState:
    """Mutable state of the single currently-tracked moving object."""

    cx: int  # smoothed center x (pixels)
    cy: int  # smoothed center y (pixels)
    area: float  # contour area of the latest matched detection
    bbox: Tuple[int, int, int, int]  # (x, y, w, h) bounding box
    vx: float = 0.0  # smoothed per-frame velocity, x component
    vy: float = 0.0  # smoothed per-frame velocity, y component
    misses: int = 0  # consecutive frames with no matching detection
    visible: bool = True  # False while coasting on prediction only
def signal_handler(sig, frame):
    """SIGINT handler: halt the motors, then exit the process cleanly."""
    print("\nStop signal received, stopping motors...")
    motor.stop()
    print("Motors stopped, exiting.")
    sys.exit(0)


# Install the handler so Ctrl-C never leaves the motors running.
signal.signal(signal.SIGINT, signal_handler)
def estimate_camera_motion(prev_gray, gray):
    """Estimate global camera motion between two grayscale frames.

    Tracks corner features from ``prev_gray`` into ``gray`` with
    pyramidal Lucas-Kanade flow, then fits a partial affine transform
    with RANSAC.

    Returns a tuple ``(transform, motion_level, feature_count)`` where
    ``transform`` is a 2x3 float32 affine matrix (identity on failure),
    ``motion_level`` is a scalar combining translation and rotation, and
    ``feature_count`` is how many points supported the estimate.
    """
    no_motion = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]], dtype=np.float32)
    corners = cv2.goodFeaturesToTrack(
        prev_gray, maxCorners=180, qualityLevel=0.01, minDistance=8, blockSize=7
    )
    if corners is None or len(corners) < 8:
        return no_motion, 0.0, 0
    tracked, found, _ = cv2.calcOpticalFlowPyrLK(
        prev_gray,
        gray,
        corners,
        None,
        winSize=(21, 21),
        maxLevel=3,
        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
    )
    if tracked is None or found is None:
        return no_motion, 0.0, 0
    ok = found.reshape(-1) == 1
    src = corners[ok].reshape(-1, 2)
    dst = tracked[ok].reshape(-1, 2)
    if len(src) < 8:
        return no_motion, 0.0, len(src)
    affine, inlier_mask = cv2.estimateAffinePartial2D(
        src,
        dst,
        method=cv2.RANSAC,
        ransacReprojThreshold=3.0,
        maxIters=2000,
        confidence=0.99,
    )
    if affine is None:
        return no_motion, 0.0, len(src)
    shift_x = float(affine[0, 2])
    shift_y = float(affine[1, 2])
    turn_deg = math.degrees(math.atan2(affine[1, 0], affine[0, 0]))
    # Translation plus weighted rotation: a single "how hard is the camera
    # moving" scalar, later used to raise the motion-diff threshold.
    level = math.hypot(shift_x, shift_y) + abs(turn_deg) * 1.5
    supporters = int(inlier_mask.sum()) if inlier_mask is not None else len(src)
    return affine.astype(np.float32), level, supporters
def build_motion_mask(prev_gray, gray, transform, motion_level):
    """Return a binary mask of pixels moving independently of the camera.

    The previous frame is warped onto the current one using the estimated
    camera transform, so the frame difference keeps genuinely moving
    objects and suppresses ego-motion. Border pixels and regions the warp
    pulled in from outside the previous frame are zeroed, then the mask
    is cleaned up morphologically.
    """
    h, w = gray.shape
    stabilized = cv2.warpAffine(
        prev_gray,
        transform,
        (w, h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    # Warp an all-white image the same way: black areas mark pixels with
    # no valid data from the previous frame.
    coverage = cv2.warpAffine(
        np.full_like(prev_gray, 255),
        transform,
        (w, h),
        flags=cv2.INTER_NEAREST,
        borderMode=cv2.BORDER_CONSTANT,
        borderValue=0,
    )
    delta = cv2.GaussianBlur(cv2.absdiff(stabilized, gray), (5, 5), 0)
    # Raise the threshold while the camera itself moves fast, capped at 40.
    cutoff = int(min(40, 18 + motion_level * 1.5))
    _, mask = cv2.threshold(delta, cutoff, 255, cv2.THRESH_BINARY)
    if EDGE_IGNORE > 0:
        mask[:EDGE_IGNORE, :] = 0
        mask[-EDGE_IGNORE:, :] = 0
        mask[:, :EDGE_IGNORE] = 0
        mask[:, -EDGE_IGNORE:] = 0
    mask = cv2.bitwise_and(mask, coverage)
    small = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    large = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7, 7))
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, small)  # drop speckle
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, large)  # fill holes
    return cv2.dilate(mask, small, iterations=1)
def contour_candidates(mask):
    """Extract plausible moving-object blobs from the motion mask.

    Filters external contours by area range, distance from the frame
    border, aspect ratio, and how well the contour fills its bounding
    box. Returns a list of dicts with keys ``contour``, ``bbox``,
    ``area``, ``cx``, ``cy``.
    """
    found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    height, width = mask.shape
    keep = []
    for blob in found:
        blob_area = cv2.contourArea(blob)
        if not (MIN_CONTOUR_AREA <= blob_area <= MAX_CONTOUR_AREA):
            continue
        x, y, w, h = cv2.boundingRect(blob)
        # Reject blobs touching the ignored border band.
        touches_border = (
            x <= EDGE_IGNORE
            or y <= EDGE_IGNORE
            or x + w >= width - EDGE_IGNORE
            or y + h >= height - EDGE_IGNORE
        )
        if touches_border:
            continue
        aspect = w / h if h else 0.0
        if not 0.25 <= aspect <= 4.0:
            continue
        box_area = float(w * h)
        fill = blob_area / box_area if box_area else 0.0
        if fill < 0.18:
            continue
        keep.append(
            {
                "contour": blob,
                "bbox": (x, y, w, h),
                "area": float(blob_area),
                "cx": x + w // 2,
                "cy": y + h // 2,
            }
        )
    return keep
def bbox_iou(box_a, box_b):
    """Intersection-over-union of two (x, y, w, h) boxes; 0.0 if disjoint."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    overlap_w = min(ax + aw, bx + bw) - max(ax, bx)
    overlap_h = min(ay + ah, by + bh) - max(ay, by)
    if overlap_w <= 0 or overlap_h <= 0:
        return 0.0
    shared = overlap_w * overlap_h
    combined = aw * ah + bw * bh - shared
    return shared / combined if combined > 0 else 0.0
def select_target(candidates, track_state):
    """Pick the candidate blob that best matches the current track.

    Without an existing track, prefers large blobs near the vertical
    guide line. With a track, scores candidates by area, bbox overlap,
    distance from the predicted position, and size consistency, after
    rejecting anything outside a search radius that grows with misses.
    Returns the winning candidate dict or None.
    """
    if not candidates:
        return None
    if track_state is None:
        def fresh_score(cand):
            return cand["area"] - 0.6 * abs(cand["cx"] - GUIDE_CENTER_X)

        return max(candidates, key=fresh_score)
    expected_x = track_state.cx + track_state.vx
    expected_y = track_state.cy + track_state.vy
    # Base radius scales with target size; misses widen the search.
    radius = max(55.0, math.sqrt(max(track_state.area, 1.0)) * 2.5)
    radius += min(track_state.misses * 18.0, 60.0)
    best = None
    top_score = -1e9
    for cand in candidates:
        offset = math.hypot(cand["cx"] - expected_x, cand["cy"] - expected_y)
        overlap = bbox_iou(cand["bbox"], track_state.bbox)
        # Far away and no bbox overlap: cannot be the same object.
        if offset > radius and overlap < 0.02:
            continue
        growth = cand["area"] / max(track_state.area, 1.0)
        size_penalty = abs(math.log(max(growth, 1e-6)))
        score = (
            cand["area"] * 0.08
            + overlap * 60.0
            - offset * 1.4
            - size_penalty * 22.0
        )
        if score > top_score:
            top_score = score
            best = cand
    return best
def update_track_state(track_state, candidate):
    """Fold a matched detection into the track, smoothing position/velocity.

    Creates a fresh TrackState on first detection. Smoothing weights are
    relaxed after misses, on fast jumps, and while the target crosses the
    vertical guide line, so the track re-centers quickly in those cases.
    Returns the (mutated) track state, or None when ``candidate`` is None.
    """
    if candidate is None:
        return None
    if track_state is None:
        return TrackState(
            cx=candidate["cx"],
            cy=candidate["cy"],
            area=candidate["area"],
            bbox=candidate["bbox"],
        )
    jump = math.hypot(
        candidate["cx"] - track_state.cx, candidate["cy"] - track_state.cy
    )
    crossing = (track_state.cx - GUIDE_CENTER_X) * (candidate["cx"] - GUIDE_CENTER_X) < 0
    near_line = (
        abs(track_state.cx - GUIDE_CENTER_X) < TRACK_CENTER_WINDOW
        or abs(candidate["cx"] - GUIDE_CENTER_X) < TRACK_CENTER_WINDOW
    )
    # Fraction of the previous position to keep: lower after misses,
    # on fast jumps, and around a guide-line crossing.
    keep_pos = 0.30 if track_state.misses > 0 else 0.58
    if jump > TRACK_FAST_MOVE_PX:
        keep_pos = min(keep_pos, 0.34)
    if crossing and near_line:
        keep_pos = min(keep_pos, 0.18)
    cx = int(keep_pos * track_state.cx + (1.0 - keep_pos) * candidate["cx"])
    cy = int(keep_pos * track_state.cy + (1.0 - keep_pos) * candidate["cy"])
    step_x = cx - track_state.cx
    step_y = cy - track_state.cy
    # Same idea for velocity: an exponential moving average whose memory
    # shortens in the fast/crossing cases.
    keep_vel = 0.60
    if jump > TRACK_FAST_MOVE_PX:
        keep_vel = 0.42
    if crossing and near_line:
        keep_vel = 0.30
    track_state.vx = keep_vel * track_state.vx + (1.0 - keep_vel) * step_x
    track_state.vy = keep_vel * track_state.vy + (1.0 - keep_vel) * step_y
    track_state.cx = cx
    track_state.cy = cy
    track_state.area = candidate["area"]
    track_state.bbox = candidate["bbox"]
    track_state.misses = 0
    track_state.visible = True
    return track_state
def predict_track_state(track_state):
    """Coast the track forward one frame when no detection matched it.

    Advances the center and bounding box along the current velocity,
    clamps them inside the frame, decays the velocity, and counts the
    miss. Returns the (mutated) track state, or None if there is none.
    """
    if track_state is None:
        return None
    x, y, w, h = track_state.bbox
    track_state.cx = int(np.clip(track_state.cx + track_state.vx, 0, FRAME_WIDTH - 1))
    track_state.cy = int(np.clip(track_state.cy + track_state.vy, 0, FRAME_HEIGHT - 1))
    track_state.bbox = (
        int(np.clip(x + track_state.vx, 0, max(0, FRAME_WIDTH - w))),
        int(np.clip(y + track_state.vy, 0, max(0, FRAME_HEIGHT - h))),
        w,
        h,
    )
    # Bleed off speed so a lost target does not drift forever.
    track_state.vx *= 0.82
    track_state.vy *= 0.82
    track_state.misses += 1
    track_state.visible = False
    return track_state
def predicted_control_point(track_state):
    """Return the (x, y) pixel the controller should steer toward.

    Leads the target along its velocity; faster targets get extra
    lookahead (capped at +1.3 frames). Falls back to the guide center
    when there is no track.
    """
    if track_state is None:
        return GUIDE_CENTER_X, GUIDE_CENTER_Y
    pace = math.hypot(track_state.vx, track_state.vy)
    horizon = CONTROL_LOOKAHEAD_FRAMES + min(1.3, pace * CONTROL_LOOKAHEAD_GAIN)
    aim_x = int(np.clip(track_state.cx + track_state.vx * horizon, 0, FRAME_WIDTH - 1))
    aim_y = int(np.clip(track_state.cy + track_state.vy * horizon, 0, FRAME_HEIGHT - 1))
    return aim_x, aim_y
def control_speed(error, velocity):
    """Map a pixel error (with velocity feed-forward) to a motor speed.

    Returns 0.0 inside the deadzone; otherwise a speed ramping linearly
    from CONTROL_BASE_SPEED to CONTROL_MAX_SPEED over roughly 90 px of
    combined error.
    """
    demand = abs(error) + 1.3 * abs(velocity)
    if demand <= CONTROL_DEADZONE:
        return 0.0
    ramp = min(1.0, (demand - CONTROL_DEADZONE) / 90.0)
    return CONTROL_BASE_SPEED + ramp * (CONTROL_MAX_SPEED - CONTROL_BASE_SPEED)
def draw_debug(frame, mask, track_state, motion_level, inlier_count, lock_remaining):
    """Overlay tracking diagnostics on ``frame`` (modified in place).

    Draws the guide point, camera-motion stats, the lock countdown when
    active, the tracked box with its predicted control point, and a small
    preview of the motion mask pasted into the top-right corner.
    """
    width = frame.shape[1]
    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.circle(frame, (GUIDE_CENTER_X, GUIDE_CENTER_Y), 5, (255, 0, 0), -1)
    cv2.putText(
        frame,
        f"cam:{motion_level:4.1f} feat:{inlier_count:3d}",
        (8, 18),
        font,
        0.45,
        (0, 255, 255),
        1,
        cv2.LINE_AA,
    )
    if lock_remaining > 0:
        cv2.putText(
            frame,
            f"LOCK {lock_remaining:0.1f}s",
            (8, 38),
            font,
            0.55,
            (0, 0, 255),
            2,
            cv2.LINE_AA,
        )
    if track_state is not None:
        x, y, w, h = track_state.bbox
        # Green while visible, orange while coasting on prediction.
        box_color = (0, 255, 0) if track_state.visible else (0, 165, 255)
        cv2.rectangle(frame, (x, y), (x + w, y + h), box_color, 2)
        cv2.circle(frame, (track_state.cx, track_state.cy), 5, (0, 0, 255), -1)
        aim_x, aim_y = predicted_control_point(track_state)
        cv2.circle(frame, (aim_x, aim_y), 4, (0, 255, 255), -1)
        label = "track" if track_state.visible else f"hold:{track_state.misses}"
        cv2.putText(
            frame, label, (x, max(15, y - 8)), font, 0.45, box_color, 1, cv2.LINE_AA
        )
    # Paste a shrunken copy of the motion mask into the top-right corner.
    preview = cv2.resize(cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR), (80, 60))
    frame[0:60, width - 80 : width] = preview
def drive_to_target(track_state, lock_active=False):
    """Command the motors to re-center the predicted target point.

    Stops when the lockout is active or there is no track. Otherwise
    converts the pixel error at the predicted control point into motor
    commands. NOTE(review): the image-axis to motor-direction mapping
    (x error -> backward/forward, y error -> right/left) presumably
    reflects a rotated camera mount — confirm against the motor module.
    """
    if lock_active or track_state is None:
        motor.stop()
        return
    aim_x, aim_y = predicted_control_point(track_state)
    err_x = aim_x - GUIDE_CENTER_X
    err_y = aim_y - GUIDE_CENTER_Y
    pace_x = control_speed(err_x, track_state.vx)
    pace_y = control_speed(err_y, track_state.vy)
    if pace_x == 0.0 and pace_y == 0.0:
        motor.stop()
    elif pace_x == 0.0:
        # Only the vertical image error is outside the deadzone.
        if err_y > 0:
            motor.move_right(speed=pace_y)
        else:
            motor.move_left(speed=pace_y)
    elif pace_y == 0.0:
        # Only the horizontal image error is outside the deadzone.
        if err_x > 0:
            motor.backward(speed=pace_x)
        else:
            motor.forward(speed=pace_x)
    else:
        # Both axes active: move diagonally at the larger of the speeds.
        pace = max(pace_x, pace_y)
        if err_x > 0 and err_y > 0:
            motor.move_right_backward(speed=pace)
        elif err_x > 0 and err_y < 0:
            motor.move_left_backward(speed=pace)
        elif err_x < 0 and err_y > 0:
            motor.move_right_forward(speed=pace)
        else:
            motor.move_left_forward(speed=pace)
# --- Main tracking loop (script entry; runs at import time) ---
print("Starting camera tracking...")
print(f"OpenCV version: {cv2.__version__}")
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Unable to open camera.")
    sys.exit(1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, FRAME_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, FRAME_HEIGHT)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
print(f"Camera ready: {cap.get(cv2.CAP_PROP_FRAME_WIDTH)}x{cap.get(cv2.CAP_PROP_FRAME_HEIGHT)}")
# Warm-up: discard the first frames so the sensor settles.
success_count = 0
for _ in range(10):
    ret, _ = cap.read()
    if ret:
        success_count += 1
    time.sleep(0.05)
print(f"Warm-up finished, successful reads: {success_count}/10")
ret, prev_frame = cap.read()
if not ret:
    print("Unable to read initial frame.")
    cap.release()
    sys.exit(1)
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
prev_gray = cv2.GaussianBlur(prev_gray, (5, 5), 0)
track_state = None
lock_until = 0.0
# try/finally guarantees the camera is released and — critically — the
# motors are stopped even if an exception escapes the loop. (The original
# code skipped cleanup on any unexpected error, leaving the robot driving.)
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (5, 5), 0)
        transform, motion_level, inlier_count = estimate_camera_motion(prev_gray, gray)
        motion_mask = build_motion_mask(prev_gray, gray, transform, motion_level)
        candidates = contour_candidates(motion_mask)
        now = time.monotonic()
        lock_remaining = max(0.0, lock_until - now)
        lock_active = lock_remaining > 0.0
        if lock_active:
            # During the lockout window, drop the track and ignore detections.
            track_state = None
            selected = None
        else:
            selected = select_target(candidates, track_state)
        if selected is not None:
            track_state = update_track_state(track_state, selected)
        elif track_state is not None:
            # No match this frame: coast on prediction until too many misses,
            # then stop the motors and enter the lockout window.
            track_state = predict_track_state(track_state)
            if track_state.misses > TRACK_MAX_MISSES:
                motor.stop()
                track_state = None
                lock_until = now + TARGET_LOCK_DURATION
                lock_remaining = TARGET_LOCK_DURATION
                lock_active = True
        else:
            track_state = None
        draw_debug(frame, motion_mask, track_state, motion_level, inlier_count, lock_remaining)
        drive_to_target(track_state, lock_active=lock_active)
        if lock_active:
            print(f"track locked {lock_remaining:0.1f}s, cam={motion_level:4.1f}")
        elif track_state is not None:
            print(
                f"track=({track_state.cx:3d},{track_state.cy:3d}) "
                f"area={track_state.area:6.1f} cam={motion_level:4.1f}"
            )
        else:
            print(f"waiting target, cam={motion_level:4.1f}")
        cv2.imshow("tracking", frame)
        prev_gray = gray.copy()
        # ESC exits the loop.
        if cv2.waitKey(1) & 0xFF == 27:
            break
finally:
    # Always release the camera, close windows, and halt the motors.
    cap.release()
    cv2.destroyAllWindows()
    motor.stop()