# modeling_interfuser.py
# ==============================================================================
# InterFuser End-to-End Control Model
# (Ready for Hugging Face Hub)
# ==============================================================================
import torch
from torch import nn
import torch.nn.functional as F
from transformers import PreTrainedModel, PretrainedConfig
from transformers.utils.generic import ModelOutput
from functools import partial
import math
import copy
from typing import Optional, Tuple, Union, List
from torch import Tensor
from dataclasses import dataclass
import numpy as np
from collections import deque
# Imports used by the rendering, dataset, and control utilities below
import cv2
import os
import json
from pathlib import Path
from torch.utils.data import Dataset
from torchvision import transforms
# --- Try to import timm; otherwise tell the user how to install it
try:
from timm.models.resnet import resnet50d, resnet26d, resnet18d
except ImportError:
raise ImportError("Please install timm to use this model: `pip install timm`")
class PIDController:
def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
self._K_P = K_P
self._K_I = K_I
self._K_D = K_D
self._window = deque([0 for _ in range(n)], maxlen=n)
self._max = 0.0
self._min = 0.0
def step(self, error):
self._window.append(error)
self._max = max(self._max, abs(error))
self._min = -abs(self._max)
if len(self._window) >= 2:
integral = np.mean(self._window)
derivative = self._window[-1] - self._window[-2]
else:
integral = 0.0
derivative = 0.0
return self._K_P * error + self._K_I * integral + self._K_D * derivative
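# Usage sketch (illustrative): one PID step on a normalized error signal.
#   pid = PIDController(K_P=1.0, K_I=0.1, K_D=0.1, n=20)
#   control = pid.step(0.05)  # P term + I (window mean) + D (last difference)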
# ================== 1. Settings ==================
SAVE_VIDEO = True
OUTPUT_FILENAME = 'simulation_output.mp4'
FPS = 10
WAYPOINT_SCALE_FACTOR = 5.0
T1_FUTURE_TIME = 1.0  # one second into the future
T2_FUTURE_TIME = 2.0  # two seconds into the future
TRACKER_FREQUENCY = 10
MERGE_PERCENT = 0.4
# Derived constants
PIXELS_PER_METER = 8
MAX_DISTANCE = 32
IMG_SIZE = MAX_DISTANCE * PIXELS_PER_METER * 2
EGO_CAR_X = IMG_SIZE // 2
EGO_CAR_Y = IMG_SIZE - (4.0 * PIXELS_PER_METER)
reweight_array = np.ones((20, 20, 7))  # detection re-weighting grid; adjust as needed
# Global state variables
last_valid_waypoints = None
last_valid_theta = 0.0
# ================== 2. Helper Functions ==================
def ensure_rgb(image):
"""تحويل الصورة إلى RGB إذا كانت grayscale."""
if len(image.shape) == 2 or image.shape[2] == 1:
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
return image
def process_camera_image(tensor_image):
"""تحويل صورة الكاميرا من Tensor إلى NumPy Array."""
image_np = tensor_image.permute(1, 2, 0).cpu().numpy()
image_np = (image_np * np.array([0.229, 0.224, 0.225])) + np.array([0.485, 0.456, 0.406])
image_np = np.clip(image_np, 0, 1)
return (image_np * 255).astype(np.uint8)[:, :, ::-1] # BGR
def convert_grid_to_xy(i, j):
"""تحويل الشبكة إلى إحداثيات x, y."""
return (j - 9.5) * 1.6, (19.5 - i) * 1.6
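# Example (illustrative): the 20x20 grid covers the BEV area at 1.6 m per cell,
# centered laterally on the ego vehicle:
#   convert_grid_to_xy(0, 0)   # -> (-15.2, 31.2): far left, far ahead
#   convert_grid_to_xy(19, 9)  # -> (-0.8, 0.8): just left of and ahead of ego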
def add_rect(img, loc, ori, box, value, color):
"""
إضافة مستطيل إلى الخريطة.
"""
center_x = int(loc[0] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
center_y = int(loc[1] * PIXELS_PER_METER + MAX_DISTANCE * PIXELS_PER_METER)
size_px = (
int(box[0] * PIXELS_PER_METER),
int(box[1] * PIXELS_PER_METER)
)
angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
box_points = np.int32(box_points)
adjusted_color = [int(x * value) for x in color]
cv2.fillConvexPoly(img, box_points, adjusted_color)
return img
def find_peak_box(data):
"""
اكتشاف القمم في البيانات وتصنيفها.
"""
det_data = np.zeros((22, 22, 7))
det_data[1:21, 1:21] = data
detected_objects = []
for i in range(1, 21):
for j in range(1, 21):
if det_data[i, j, 0] > 0.6 and (
det_data[i, j, 0] > det_data[i, j - 1, 0]
and det_data[i, j, 0] > det_data[i, j + 1, 0]
and det_data[i, j, 0] > det_data[i - 1, j, 0]
and det_data[i, j, 0] > det_data[i + 1, j, 0]
):
length = det_data[i, j, 4]
width = det_data[i, j, 5]
confidence = det_data[i, j, 0]
obj_type = 'unknown'
if length > 4.0:
obj_type = 'car'
elif length / width > 1.5:
obj_type = 'bike'
else:
obj_type = 'pedestrian'
detected_objects.append({
'coords': (i - 1, j - 1),
'type': obj_type,
'confidence': confidence,
'raw_data': det_data[i, j]
})
return detected_objects
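# Layout assumed by the peak detector (inferred from the code above): `data` is
# a 20x20x7 grid where channel 0 is confidence, 1-2 are in-cell x/y offsets,
# 3 is heading, 4-5 are box length/width, and 6 is speed. For example:
#   objs = find_peak_box(np.random.rand(20, 20, 7))
#   # -> list of dicts with 'coords', 'type', 'confidence', 'raw_data'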
def render(det_data, t=0):
"""
رسم كائنات الكشف على الخريطة BEV.
"""
CLASS_COLORS = {'car': (0, 0, 255), 'bike': (0, 255, 0), 'pedestrian': (255, 0, 0), 'unknown': (128, 128, 128)}
det_weighted = det_data * reweight_array
detected_objects = find_peak_box(det_weighted)
counts = {cls: 0 for cls in CLASS_COLORS.keys()}
    for obj in detected_objects:
        counts[obj['type']] = counts.get(obj['type'], 0) + 1
img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8)
for obj in detected_objects:
i, j = obj['coords']
obj_data = obj['raw_data']
speed = obj_data[6]
center_x, center_y = convert_grid_to_xy(i, j)
theta = obj_data[3] * np.pi
ori = np.array([math.cos(theta), math.sin(theta)])
loc_x = center_x + obj_data[1] + t * speed * ori[0]
loc_y = center_y + obj_data[2] - t * speed * ori[1]
box = np.array([obj_data[4], obj_data[5]])
if obj['type'] == 'pedestrian':
box *= 1.5
add_rect(
img,
loc=np.array([loc_x, loc_y]),
ori=ori,
box=box,
value=obj['confidence'],
color=CLASS_COLORS[obj['type']]
)
return img, counts
def render_self_car(loc, ori, box, pixels_per_meter=PIXELS_PER_METER):
"""
رسم السيارة الذاتية على الخريطة BEV.
Args:
loc: موقع السيارة [x, y] في النظام العالمي.
ori: اتجاه السيارة [cos(theta), sin(theta)].
box: أبعاد السيارة [طول, عرض].
pixels_per_meter: عدد البكسلات لكل متر.
Returns:
self_car_map: خريطة السيارة ذاتية القيادة (RGB - 3 قنوات).
"""
img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8)
center_x = int(loc[0] * pixels_per_meter + MAX_DISTANCE * pixels_per_meter)
center_y = int(loc[1] * pixels_per_meter + MAX_DISTANCE * pixels_per_meter)
size_px = (
int(box[0] * pixels_per_meter),
int(box[1] * pixels_per_meter)
)
angle_deg = -np.degrees(math.atan2(ori[1], ori[0]))
box_points = cv2.boxPoints(((center_x, center_y), size_px, angle_deg))
box_points = np.int32(box_points)
    ego_color = (0, 255, 255)  # yellow (BGR)
cv2.fillConvexPoly(img, box_points, ego_color)
    return img  # return the full image, not a crop
def render_waypoints(waypoints, pixels_per_meter=PIXELS_PER_METER):
global last_valid_waypoints
img = np.zeros((IMG_SIZE, IMG_SIZE, 3), np.uint8)
current_waypoints = waypoints
if waypoints is not None and len(waypoints) > 2:
last_valid_waypoints = waypoints
else:
current_waypoints = last_valid_waypoints
if current_waypoints is None:
return img
origin_x, origin_y = EGO_CAR_X, EGO_CAR_Y
for i, point in enumerate(current_waypoints):
px = int(origin_x + point[1] * pixels_per_meter)
py = int(origin_y - point[0] * pixels_per_meter)
color = (0, 0, 255) if i == len(current_waypoints) - 1 else (0, 255, 0)
cv2.circle(img, (px, py), 4, color, -1)
return img
def collision_detections(map1, map2, threshold=0.04):
"""
تحقق من وجود تداخل بين خريطة البيئة ونموذج السيارة.
"""
print("map1 shape:", map1.shape)
print("map2 shape:", map2.shape)
# تحويل map2 إلى grayscale إذا كانت تحتوي على 3 قنوات (RGB)
if len(map2.shape) == 3 and map2.shape[2] == 3:
map2 = cv2.cvtColor(map2, cv2.COLOR_BGR2GRAY)
# التأكد من أن map1 و map2 لها نفس الأبعاد
assert map1.shape == map2.shape
overlap_map = (map1 > 0.01) & (map2 > 0.01)
ratio = float(np.sum(overlap_map)) / np.sum(map2 > 0)
return ratio < threshold
def get_max_safe_distance(meta_data, downsampled_waypoints, t, collision_buffer, threshold):
"""
حساب أقصى مسافة آمنة قبل حدوث تصادم.
"""
surround_map = meta_data.reshape(20, 20, 7)[..., :3][..., 0]
if np.sum(surround_map) < 1:
return np.linalg.norm(downsampled_waypoints[-3])
hero_bounding_box = np.array([2.45, 1.0]) + collision_buffer
safe_distance = 0.0
for i in range(len(downsampled_waypoints) - 2):
aim = (downsampled_waypoints[i + 1] + downsampled_waypoints[i + 2]) / 2.0
loc = downsampled_waypoints[i]
ori = aim - loc
self_car_map = render_self_car(loc=loc, ori=ori, box=hero_bounding_box, pixels_per_meter=PIXELS_PER_METER)
        # Downscale the footprint map and convert it to grayscale
self_car_map_resized = cv2.resize(self_car_map, (20, 20))
self_car_map_gray = cv2.cvtColor(self_car_map_resized, cv2.COLOR_BGR2GRAY)
if not collision_detections(surround_map, self_car_map_gray, threshold):
break
safe_distance = max(safe_distance, np.linalg.norm(loc))
return safe_distance
def downsample_waypoints(waypoints, precision=0.2):
"""
تقليل عدد نقاط المسار.
"""
downsampled_waypoints = []
last_waypoint = np.array([0.0, 0.0])
for i in range(len(waypoints)):
now_waypoint = waypoints[i]
dis = np.linalg.norm(now_waypoint - last_waypoint)
if dis > precision:
interval = int(dis / precision)
move_vector = (now_waypoint - last_waypoint) / (interval + 1)
for j in range(interval):
downsampled_waypoints.append(last_waypoint + move_vector * (j + 1))
downsampled_waypoints.append(now_waypoint)
last_waypoint = now_waypoint
return downsampled_waypoints
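# Example (illustrative): two waypoints 1 m apart are densified to ~0.17 m steps.
#   wps = [np.array([0.0, 1.0]), np.array([0.0, 2.0])]
#   dense = downsample_waypoints(wps)  # 12 points including the originals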
# ================== 3. Tracking Classes ==================
class TrackedObject:
    def __init__(self):
        self.last_step = 0
        self.last_pos = [0, 0]
        self.historical_pos = []
        self.historical_steps = []
        self.historical_features = []
    def update(self, step, object_info):
        # Record the latest observation; Tracker._update relies on this method.
        self.last_step = step
        self.last_pos = object_info[:2]
        self.historical_pos.append(object_info[:2])
        self.historical_steps.append(step)
        if len(object_info) > 2: self.historical_features.append(object_info[2])
class Tracker:
def __init__(self, frequency=10):
self.tracks = []
self.alive_ids = []
self.frequency = frequency
def update_and_predict(self, det_data, pos, theta, frame_num):
det_data_weighted = det_data * reweight_array
detected_objects = find_peak_box(det_data_weighted)
objects_info = []
R = np.array([[np.cos(-theta), -np.sin(-theta)], [np.sin(-theta), np.cos(-theta)]])
for obj in detected_objects:
i, j = obj['coords']
obj_data = obj['raw_data']
center_y, center_x = convert_grid_to_xy(i, j)
center_x += obj_data[1]
center_y += obj_data[2]
loc = R.T.dot(np.array([center_x, center_y]))
objects_info.append([loc[0] + pos[0], loc[1] + pos[1], obj_data[1:]]) # [x, y, features...]
updates_ids = self._update(objects_info, frame_num)
speed_results, heading_results = self._predict(updates_ids)
for k, poi in enumerate(updates_ids):
i, j = poi
if heading_results[k] is not None:
factor = MERGE_PERCENT * 0.1
det_data[i, j, 3] = heading_results[k] * factor + det_data[i, j, 3] * (1 - factor)
if speed_results[k] is not None:
factor = MERGE_PERCENT * 0.1
det_data[i, j, 6] = speed_results[k] * factor + det_data[i, j, 6] * (1 - factor)
return det_data
def _update(self, objects_info, step):
latest_ids = []
if len(self.tracks) == 0:
for object_info in objects_info:
to = TrackedObject()
to.update(step, object_info)
self.tracks.append(to)
latest_ids.append(len(self.tracks) - 1)
else:
matched_ids = set()
for idx, object_info in enumerate(objects_info):
min_id, min_error = -1, float('inf')
pos_x, pos_y = object_info[:2]
for _id in self.alive_ids:
if _id in matched_ids:
continue
track_pos = self.tracks[_id].last_pos
distance = np.sqrt((track_pos[0] - pos_x)**2 + (track_pos[1] - pos_y)**2)
if distance < 2.0 and distance < min_error:
min_error = distance
min_id = _id
                if min_id != -1:
                    self.tracks[min_id].update(step, objects_info[idx])
                    latest_ids.append(min_id)
                    matched_ids.add(min_id)
                else:
                    to = TrackedObject()
                    to.update(step, objects_info[idx])  # seed the new track with this observation
                    self.tracks.append(to)
                    latest_ids.append(len(self.tracks) - 1)
        self.alive_ids = [i for i, track in enumerate(self.tracks) if track.last_step > step - 6]
return latest_ids
def _match(self, objects_info):
results = []
matched_ids = set()
for object_info in objects_info:
min_id, min_error = -1, float('inf')
pos_x, pos_y = object_info[:2]
for _id in self.alive_ids:
if _id in matched_ids:
continue
track_pos = self.tracks[_id].last_pos
distance = np.sqrt((track_pos[0] - pos_x)**2 + (track_pos[1] - pos_y)**2)
if distance < min_error:
min_error = distance
min_id = _id
results.append(min_id)
if min_id != -1:
matched_ids.add(min_id)
return results
def _predict(self, updates_ids):
speed_results, heading_results = [], []
for each_id in updates_ids:
to = self.tracks[each_id]
avg_speed, avg_heading = [], []
for feature in to.historical_features:
avg_speed.append(feature[2])
avg_heading.append(feature[:2])
if len(avg_speed) < 2:
speed_results.append(None)
heading_results.append(None)
continue
avg_speed = np.mean(avg_speed)
avg_heading = np.mean(np.stack(avg_heading), axis=0)
yaw_angle = get_yaw_angle(avg_heading)
heading_results.append((4 - yaw_angle / np.pi) % 2)
speed_results.append(avg_speed)
return speed_results, heading_results
def get_yaw_angle(forward_vector):
forward_vector = forward_vector / np.linalg.norm(forward_vector)
yaw = math.atan2(forward_vector[1], forward_vector[0])
return yaw
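# Usage sketch (illustrative): called once per frame with the 20x20x7 detection
# grid and the ego pose; heading/speed of matched tracks are smoothed in place.
#   tracker = Tracker(frequency=TRACKER_FREQUENCY)
#   det_data = tracker.update_and_predict(det_data, pos=(ego_x, ego_y),
#                                         theta=ego_theta, frame_num=frame_idx)
#   (ego_x, ego_y, ego_theta, frame_idx are placeholders for the caller's state)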
# ================== 4. Controller Class ==================
class InterfuserController:
def __init__(self, config):
self.turn_controller = PIDController(
K_P=config.turn_KP,
K_I=config.turn_KI,
K_D=config.turn_KD,
n=config.turn_n,
)
self.speed_controller = PIDController(
K_P=config.speed_KP,
K_I=config.speed_KI,
K_D=config.speed_KD,
n=config.speed_n,
)
self.config = config
self.collision_buffer = np.array(config.collision_buffer)
self.detect_threshold = config.detect_threshold
self.stop_steps = 0
self.forced_forward_steps = 0
self.red_light_steps = 0
self.block_red_light = 0
self.in_stop_sign_effect = False
self.block_stop_sign_distance = 0
self.stop_sign_timer = 0
self.stop_sign_trigger_times = 0
def run_step(
self, speed, waypoints, junction, traffic_light_state, stop_sign, meta_data
):
        # --- Update stop-state bookkeeping ---
if speed < 0.2:
self.stop_steps += 1
else:
self.stop_steps = max(0, self.stop_steps - 10)
if speed < 0.06 and self.in_stop_sign_effect:
self.in_stop_sign_effect = False
if junction < 0.3:
self.stop_sign_trigger_times = 0
if traffic_light_state > 0.7:
self.red_light_steps += 1
else:
self.red_light_steps = 0
if self.red_light_steps > 1000:
self.block_red_light = 80
self.red_light_steps = 0
if self.block_red_light > 0:
self.block_red_light -= 1
traffic_light_state = 0.01
if stop_sign < 0.6 and self.block_stop_sign_distance < 0.1:
self.in_stop_sign_effect = True
self.block_stop_sign_distance = 2.0
self.stop_sign_trigger_times = 3
self.block_stop_sign_distance = max(
0, self.block_stop_sign_distance - 0.05 * speed
)
if self.block_stop_sign_distance < 0.1:
if self.stop_sign_trigger_times > 0:
self.block_stop_sign_distance = 2.0
self.stop_sign_trigger_times -= 1
self.in_stop_sign_effect = True
        # --- Compute the steering angle ---
aim = (waypoints[1] + waypoints[0]) / 2.0
angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90
if speed < 0.01:
angle = 0
steer = self.turn_controller.step(angle)
steer = np.clip(steer, -1.0, 1.0)
brake = False
throttle = 0.0
desired_speed = 0.0
downsampled_waypoints = downsample_waypoints(waypoints)
d_0 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=0,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_05 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=0.5,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_075 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=0.75,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_1 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=1,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_15 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=1.5,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_2 = get_max_safe_distance(
meta_data,
downsampled_waypoints,
t=2,
collision_buffer=self.collision_buffer,
threshold=self.detect_threshold,
)
d_05 = min(d_0, d_05, d_075)
d_1 = min(d_05, d_075, d_15, d_2)
safe_dis = min(d_05, d_1)
d_0 = max(0, d_0 - 2.0)
d_05 = max(0, d_05 - 2.0)
d_1 = max(0, d_1 - 2.0)
        # --- Brake only on a red light or an active stop sign ---
if traffic_light_state > 0.5:
brake = True
desired_speed = 0.0
elif stop_sign > 0.6 and traffic_light_state <= 0.5:
if self.stop_sign_timer < 20:
brake = True
desired_speed = 0.0
self.stop_sign_timer += 1
else:
brake = False
desired_speed = max(0, min(self.config.max_speed, speed + 0.2))
else:
brake = False
desired_speed = max(0, min(self.config.max_speed, speed + 0.2))
delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta)
throttle = self.speed_controller.step(delta)
throttle = np.clip(throttle, 0.0, self.config.max_throttle)
        # --- Brake if speed exceeds brake_ratio x the desired speed ---
if speed > desired_speed * self.config.brake_ratio:
brake = True
        # --- Assemble diagnostic info ---
meta_info_1 = f"speed: {speed:.2f}, target_speed: {desired_speed:.2f}"
meta_info_2 = f"on_road_prob: {junction:.2f}, red_light_prob: {traffic_light_state:.2f}, stop_sign_prob: {1 - stop_sign:.2f}"
meta_info_3 = f"stop_steps: {self.stop_steps}, block_stop_sign_distance: {self.block_stop_sign_distance:.1f}"
        # --- Escape hatch after a long period of being stuck ---
if self.stop_steps > 1200:
self.forced_forward_steps = 12
self.stop_steps = 0
if self.forced_forward_steps > 0:
throttle = 0.8
brake = False
self.forced_forward_steps -= 1
if self.in_stop_sign_effect:
throttle = 0
brake = True
return steer, throttle, brake, (meta_info_1, meta_info_2, meta_info_3, safe_dis)
class ControllerConfig:
turn_KP, turn_KI, turn_KD, turn_n = 1.0, 0.1, 0.1, 20
speed_KP, speed_KI, speed_KD, speed_n = 0.5, 0.05, 0.1, 20
max_speed, max_throttle, clip_delta = 6.0, 0.75, 0.25
collision_buffer, detect_threshold = [0.0, 0.0], 0.04
brake_speed, brake_ratio = 0.4, 1.1
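# Usage sketch (illustrative): one control step from model outputs; the
# waypoint array and 20x20x7 traffic grid come from the model wrapper below.
#   controller = InterfuserController(ControllerConfig())
#   steer, throttle, brake, info = controller.run_step(
#       speed=2.0, waypoints=waypoints_np, junction=0.1,
#       traffic_light_state=0.0, stop_sign=1.0, meta_data=traffic_np)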
# ================== 5. Display Interface ==================
class DisplayInterface:
def __init__(self, width=1200, height=600):
self._width = width
self._height = height
def run_interface(self, data):
dashboard = np.zeros((self._height, self._width, 3), dtype=np.uint8)
font = cv2.FONT_HERSHEY_SIMPLEX
dashboard[:, :800] = cv2.resize(data.get('camera_view'), (800, 600))
dashboard[:400, 800:1200] = cv2.resize(data['map_t0'], (400, 400))
dashboard[400:600, 800:1000] = cv2.resize(data['map_t1'], (200, 200))
dashboard[400:600, 1000:1200] = cv2.resize(data['map_t2'], (200, 200))
        # separator lines
cv2.line(dashboard, (800, 0), (800, 600), (255, 255, 255), 2)
cv2.line(dashboard, (800, 400), (1200, 400), (255, 255, 255), 2)
cv2.line(dashboard, (1000, 400), (1000, 600), (255, 255, 255), 2)
y_pos = 40
for key, text in data['text_info'].items():
cv2.putText(dashboard, text, (820, y_pos), font, 0.6, (255, 255, 255), 1)
y_pos += 30
y_pos += 10
for t, counts in data['object_counts'].items():
count_str = f"{t}: C={counts['car']} B={counts['bike']} P={counts['pedestrian']}"
cv2.putText(dashboard, count_str, (820, y_pos), font, 0.5, (255, 255, 255), 1)
y_pos += 20
cv2.putText(dashboard, "t0", (1160, 30), font, 0.8, (0, 255, 255), 2)
cv2.putText(dashboard, "t1", (960, 430), font, 0.8, (0, 255, 255), 2)
cv2.putText(dashboard, "t2", (1160, 430), font, 0.8, (0, 255, 255), 2)
return dashboard
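# Dashboard layout (as composed above): camera view on the left (800x600),
# t0 BEV map top-right (400x400), t1/t2 maps bottom-right (200x200 each),
# with text diagnostics overlaid on the right-hand panel.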
# --- Define the image transforms ---
transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
lidar_transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((112, 112)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.5], std=[0.5]),
])
class LMDriveDataset(Dataset):
def __init__(self, data_dir, transform=None, lidar_transform=None):
self.data_dir = Path(data_dir)
self.transform = transform
self.lidar_transform = lidar_transform
self.samples = []
measurement_dir = self.data_dir / "measurements"
image_dir = self.data_dir / "rgb_full"
measurement_files = sorted([f for f in os.listdir(measurement_dir) if f.endswith(".json")])
image_files = sorted([f for f in os.listdir(image_dir) if f.endswith(".jpg")])
num_samples = min(len(measurement_files), len(image_files))
for i in range(num_samples):
frame_id = i
measurement_path = str(measurement_dir / f"{frame_id:04d}.json")
image_name = f"{frame_id:04d}.jpg"
image_path = str(image_dir / image_name)
if not os.path.exists(measurement_path) or not os.path.exists(image_path):
continue
with open(measurement_path, "r") as f:
measurements_data = json.load(f)
self.samples.append({
"image_path": image_path,
"measurement_path": measurement_path,
"frame_id": frame_id,
"measurements": measurements_data
})
def __len__(self):
return len(self.samples)
def __getitem__(self, idx):
sample = self.samples[idx]
        # Read the full stitched image (2400x800)
full_image = cv2.imread(sample["image_path"])
if full_image is None:
raise ValueError(f"Failed to load image: {sample['image_path']}")
full_image = cv2.cvtColor(full_image, cv2.COLOR_BGR2RGB)
        # Split the stitched image into four 600x800 views
        front_image = full_image[:600, :800]        # view 1: front
        left_image = full_image[600:1200, :800]     # view 2: left
        right_image = full_image[1200:1800, :800]   # view 3: right
        center_image = full_image[1800:2400, :800]  # view 4: center
        # Apply the transform to each view
front_image_tensor = self.transform(front_image)
left_image_tensor = self.transform(left_image)
right_image_tensor = self.transform(right_image)
center_image_tensor = self.transform(center_image)
        # Load the LiDAR projection
lidar_path = str(self.data_dir / "lidar" / f"{sample['frame_id']:04d}.png")
lidar = cv2.imread(lidar_path)
if lidar is None:
            lidar = np.zeros((112, 112, 3), dtype=np.uint8)  # empty placeholder
else:
if len(lidar.shape) == 2:
lidar = cv2.cvtColor(lidar, cv2.COLOR_GRAY2BGR)
lidar = cv2.cvtColor(lidar, cv2.COLOR_BGR2RGB)
lidar_tensor = self.lidar_transform(lidar)
        # Extract the measurements
measurements_data = sample["measurements"]
x = measurements_data.get("x", 0.0)
y = measurements_data.get("y", 0.0)
theta = measurements_data.get("theta", 0.0)
speed = measurements_data.get("speed", 0.0)
steer = measurements_data.get("steer", 0.0)
throttle = measurements_data.get("throttle", 0.0)
brake = int(measurements_data.get("brake", False))
command = measurements_data.get("command", 0)
is_junction = int(measurements_data.get("is_junction", False))
should_brake = int(measurements_data.get("should_brake", 0))
x_command = measurements_data.get("x_command", 0.0)
y_command = measurements_data.get("y_command", 0.0)
target_point = torch.tensor([x_command, y_command], dtype=torch.float32)
measurements = torch.tensor(
[x, y, theta, speed, steer, throttle, brake, command, is_junction, should_brake],
dtype=torch.float32
)
return {
"rgb": front_image_tensor,
"rgb_left": left_image_tensor,
"rgb_right": right_image_tensor,
"rgb_center": center_image_tensor,
"lidar": lidar_tensor,
"measurements": measurements,
"target_point": target_point
}
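# Usage sketch (assumes the directory layout read above: measurements/*.json,
# rgb_full/*.jpg, lidar/*.png under one episode folder):
#   dataset = LMDriveDataset("path/to/episode", transform=transform,
#                            lidar_transform=lidar_transform)
#   loader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False)
#   batch = next(iter(loader))  # keys: rgb, rgb_left, rgb_right, rgb_center,
#                               # lidar, measurements, target_point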
# ==============================================================================
# SECTION 1: HELPER CLASSES (Copied from original implementation)
# ==============================================================================
def to_2tuple(x):
if isinstance(x, tuple):
return x
return (x, x)
class HybridEmbed(nn.Module):
def __init__(self, backbone, img_size=224, patch_size=1, feature_size=None, in_chans=3, embed_dim=768):
super().__init__()
img_size = to_2tuple(img_size)
self.img_size = img_size
self.patch_size = to_2tuple(patch_size)
self.backbone = backbone
if feature_size is None:
with torch.no_grad():
training = backbone.training
if training:
backbone.eval()
o = self.backbone(torch.zeros(1, in_chans, img_size[0], img_size[1]))
if isinstance(o, (list, tuple)):
o = o[-1]
feature_size = o.shape[-2:]
feature_dim = o.shape[1]
backbone.train(training)
else:
feature_size = to_2tuple(feature_size)
if hasattr(self.backbone, 'feature_info'):
feature_dim = self.backbone.feature_info.channels()[-1]
else:
feature_dim = self.backbone.num_features
self.proj = nn.Conv2d(feature_dim, embed_dim, kernel_size=1)
def forward(self, x):
x = self.backbone(x)
if isinstance(x, (list, tuple)):
x = x[-1]
x = self.proj(x)
return x
class PositionEmbeddingSine(nn.Module):
def __init__(self, num_pos_feats=64, temperature=10000, normalize=False, scale=None):
super().__init__()
self.num_pos_feats = num_pos_feats
self.temperature = temperature
self.normalize = normalize
if scale is not None and normalize is False:
raise ValueError("normalize should be True if scale is passed")
if scale is None:
scale = 2 * math.pi
self.scale = scale
def forward(self, tensor):
x = tensor
bs, _, h, w = x.shape
not_mask = torch.ones((bs, h, w), device=x.device)
y_embed = not_mask.cumsum(1, dtype=torch.float32)
x_embed = not_mask.cumsum(2, dtype=torch.float32)
if self.normalize:
eps = 1e-6
y_embed = y_embed / (y_embed[:, -1:, :] + eps) * self.scale
x_embed = x_embed / (x_embed[:, :, -1:] + eps) * self.scale
dim_t = torch.arange(self.num_pos_feats, dtype=torch.float32, device=x.device)
dim_t = self.temperature ** (2 * (dim_t // 2) / self.num_pos_feats)
pos_x = x_embed[:, :, :, None] / dim_t
pos_y = y_embed[:, :, :, None] / dim_t
pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)
return pos
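# Shape sketch (illustrative): only the input's batch and spatial shape matter;
# the output channel count is 2 * num_pos_feats.
#   pe = PositionEmbeddingSine(num_pos_feats=384, normalize=True)
#   pos = pe(torch.zeros(1, 768, 7, 7))  # -> torch.Size([1, 768, 7, 7])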
class TransformerEncoder(nn.Module):
def __init__(self, encoder_layer, num_layers, norm=None):
super().__init__()
self.layers = _get_clones(encoder_layer, num_layers)
self.num_layers = num_layers
self.norm = norm
def forward(self, src, mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None):
output = src
for layer in self.layers:
output = layer(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask, pos=pos)
if self.norm is not None:
output = self.norm(output)
return output
class TransformerDecoder(nn.Module):
def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
super().__init__()
self.layers = _get_clones(decoder_layer, num_layers)
self.num_layers = num_layers
self.norm = norm
self.return_intermediate = return_intermediate
def forward(self, tgt, memory, tgt_mask: Optional[Tensor] = None, memory_mask: Optional[Tensor] = None, tgt_key_padding_mask: Optional[Tensor] = None, memory_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None, query_pos: Optional[Tensor] = None):
output = tgt
intermediate = []
for layer in self.layers:
output = layer(output, memory, tgt_mask=tgt_mask, memory_mask=memory_mask, tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask, pos=pos, query_pos=query_pos)
            if self.return_intermediate:
                intermediate.append(self.norm(output))
        if self.norm is not None:
            output = self.norm(output)
            if self.return_intermediate:
                intermediate.pop()
                intermediate.append(output)
        return torch.stack(intermediate) if self.return_intermediate else output.unsqueeze(0)
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation=nn.ReLU, normalize_before=False):
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.linear1 = nn.Linear(d_model, dim_feedforward); self.dropout = nn.Dropout(dropout); self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model); self.norm2 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout); self.dropout2 = nn.Dropout(dropout)
self.activation = activation(); self.normalize_before = normalize_before
def with_pos_embed(self, tensor, pos: Optional[Tensor]): return tensor if pos is None else tensor + pos
def forward(self, src, src_mask: Optional[Tensor] = None, src_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None):
q = k = self.with_pos_embed(src, pos)
src2 = self.self_attn(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
src = self.norm2(src)
return src
class TransformerDecoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation=nn.ReLU, normalize_before=False):
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.linear1 = nn.Linear(d_model, dim_feedforward); self.dropout = nn.Dropout(dropout); self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model); self.norm2 = nn.LayerNorm(d_model); self.norm3 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout); self.dropout2 = nn.Dropout(dropout); self.dropout3 = nn.Dropout(dropout)
self.activation = activation(); self.normalize_before = normalize_before
def with_pos_embed(self, tensor, pos: Optional[Tensor]): return tensor if pos is None else tensor + pos
def forward(self, tgt, memory, tgt_mask: Optional[Tensor] = None, memory_mask: Optional[Tensor] = None, tgt_key_padding_mask: Optional[Tensor] = None, memory_key_padding_mask: Optional[Tensor] = None, pos: Optional[Tensor] = None, query_pos: Optional[Tensor] = None):
q = k = self.with_pos_embed(tgt, query_pos)
tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0]
tgt = tgt + self.dropout1(tgt2)
tgt = self.norm1(tgt)
tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos), key=self.with_pos_embed(memory, pos), value=memory, attn_mask=memory_mask, key_padding_mask=memory_key_padding_mask)[0]
tgt = tgt + self.dropout2(tgt2)
tgt = self.norm2(tgt)
tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
tgt = tgt + self.dropout3(tgt2)
tgt = self.norm3(tgt)
return tgt
def _get_clones(module, N): return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
class LinearWaypointsPredictor(nn.Module):
def __init__(self, input_dim, cumsum=True):
super().__init__()
self.cumsum = cumsum
self.rank_embed = nn.Parameter(torch.zeros(1, 10, input_dim))
self.head_fc1_list = nn.ModuleList([nn.Linear(input_dim, 64) for _ in range(6)])
self.head_relu = nn.ReLU(inplace=True)
self.head_fc2_list = nn.ModuleList([nn.Linear(64, 2) for _ in range(6)])
def forward(self, x, measurements):
bs, n, dim = x.shape
x = (x + self.rank_embed).reshape(-1, dim)
mask = measurements[:, :6].unsqueeze(-1).repeat(n, 1, 2)
rs = [self.head_fc2_list[i](self.head_relu(self.head_fc1_list[i](x))) for i in range(6)]
x = torch.sum(torch.stack(rs, 1) * mask, dim=1).view(bs, n, 2)
return torch.cumsum(x, 1) if self.cumsum else x
class GRUWaypointsPredictor(nn.Module):
def __init__(self, input_dim, waypoints=10):
super().__init__()
self.gru = torch.nn.GRU(input_size=input_dim, hidden_size=64, batch_first=True)
self.encoder = nn.Linear(2, 64)
self.decoder = nn.Linear(64, 2)
self.waypoints = waypoints
def forward(self, x, target_point):
bs = x.shape[0]
z = self.encoder(target_point).unsqueeze(0)
output, _ = self.gru(x, z)
output = self.decoder(output.reshape(bs * self.waypoints, -1)).reshape(bs, self.waypoints, 2)
return torch.cumsum(output, 1)
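# Shape sketch (illustrative): both waypoint heads emit cumulative (x, y)
# offsets per waypoint query.
#   head = GRUWaypointsPredictor(input_dim=768, waypoints=10)
#   wps = head(torch.randn(2, 10, 768), torch.randn(2, 2))  # -> (2, 10, 2)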
# ==============================================================================
# SECTION 2: CONTROL LOGIC CLASSES (From 2nd script)
# NOTE: the simplified PIDController / ControllerConfig / InterfuserController
# below shadow the fuller versions defined earlier in this file; the Hugging
# Face wrapper in Section 5 uses these definitions.
# ==============================================================================
class PIDController:
def __init__(self, K_P=1.0, K_I=0.0, K_D=0.0, n=20):
self._K_P = K_P
self._K_I = K_I
self._K_D = K_D
self._window = deque([0 for _ in range(n)], maxlen=n)
def step(self, error):
self._window.append(error)
if len(self._window) >= 2:
integral = np.mean(self._window)
derivative = self._window[-1] - self._window[-2]
else:
integral = 0.0
derivative = 0.0
return self._K_P * error + self._K_I * integral + self._K_D * derivative
class ControllerConfig:
turn_KP, turn_KI, turn_KD, turn_n = 1.0, 0.1, 0.1, 20
speed_KP, speed_KI, speed_KD, speed_n = 0.5, 0.05, 0.1, 20
max_speed, max_throttle, clip_delta = 6.0, 0.75, 0.25
collision_buffer, detect_threshold = [0.0, 0.0], 0.04
brake_speed, brake_ratio = 0.4, 1.1
class InterfuserController:
def __init__(self, config: ControllerConfig):
self.turn_controller = PIDController(K_P=config.turn_KP, K_I=config.turn_KI, K_D=config.turn_KD, n=config.turn_n)
self.speed_controller = PIDController(K_P=config.speed_KP, K_I=config.speed_KI, K_D=config.speed_KD, n=config.speed_n)
self.config = config
self.stop_steps = 0
self.red_light_steps = 0
def run_step(self, speed, waypoints, junction, traffic_light_state, stop_sign, meta_data):
if speed < 0.2: self.stop_steps += 1
else: self.stop_steps = max(0, self.stop_steps - 10)
aim = (waypoints[1] + waypoints[0]) / 2.0
angle = np.degrees(np.pi / 2 - np.arctan2(aim[1], aim[0])) / 90
if speed < 0.01: angle = 0.0
steer = self.turn_controller.step(angle)
steer = np.clip(steer, -1.0, 1.0)
brake = False
# Simplified speed control logic for clarity
if traffic_light_state > 0.5 or stop_sign > 0.5:
desired_speed = 0.0
brake = True
else:
# A simple logic to move forward towards a target speed
desired_speed = self.config.max_speed
if speed > desired_speed:
brake = True
delta = np.clip(desired_speed - speed, 0.0, self.config.clip_delta)
throttle = self.speed_controller.step(delta)
throttle = np.clip(throttle, 0.0, self.config.max_throttle)
if brake:
throttle = 0.0
metadata = {
"speed": f"{speed:.2f}", "target_speed": f"{desired_speed:.2f}",
"steer": f"{steer:.2f}", "throttle": f"{throttle:.2f}", "brake": f"{brake}",
"junction": f"{junction:.2f}", "light_state": f"{traffic_light_state:.2f}", "stop_sign": f"{stop_sign:.2f}"
}
return steer, throttle, brake, metadata
# ==============================================================================
# SECTION 3: HUGGING FACE MODEL CONFIGURATION
# ==============================================================================
class InterfuserConfig(PretrainedConfig):
model_type = "interfuser"
def __init__(self, img_size=224, patch_size=8, in_chans=3, embed_dim=768, enc_depth=6, dec_depth=6,
dim_feedforward=2048, normalize_before=False, rgb_backbone_name="r26", lidar_backbone_name="r26",
num_heads=8, dropout=0.1, waypoints_pred_head="linear-sum", use_view_embed=True, **kwargs):
super().__init__(**kwargs)
self.img_size = img_size
self.patch_size = patch_size
self.in_chans = in_chans
self.embed_dim = embed_dim
self.enc_depth = enc_depth
self.dec_depth = dec_depth
self.dim_feedforward = dim_feedforward
self.normalize_before = normalize_before
self.rgb_backbone_name = rgb_backbone_name
self.lidar_backbone_name = lidar_backbone_name
self.num_heads = num_heads
self.dropout = dropout
self.waypoints_pred_head = waypoints_pred_head
self.use_view_embed = use_view_embed
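# Example (illustrative): a config with the defaults used in this file.
#   config = InterfuserConfig(rgb_backbone_name="r26", waypoints_pred_head="linear-sum")
#   model = InterfuserForEndToEndControl(config)  # wrapper defined in Section 5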
@dataclass
class InterfuserControlOutput(ModelOutput):
steer: torch.FloatTensor = None
throttle: torch.FloatTensor = None
brake: torch.FloatTensor = None
waypoints: Optional[torch.FloatTensor] = None
traffic_predictions: Optional[torch.FloatTensor] = None
metadata: Optional[dict] = None
# ==============================================================================
# SECTION 4: CORE PREDICTION MODEL
# ==============================================================================
class Interfuser(nn.Module):
def __init__(self, config: InterfuserConfig):
super().__init__()
self.config = config
embed_dim = config.embed_dim
act_layer = nn.GELU
backbone_map = {"r50": resnet50d, "r26": resnet26d, "r18": resnet18d}
rgb_backbone = backbone_map.get(config.rgb_backbone_name, resnet26d)(pretrained=True, in_chans=3, features_only=True, out_indices=[4])
lidar_backbone = backbone_map.get(config.lidar_backbone_name, resnet26d)(pretrained=False, in_chans=3, features_only=True, out_indices=[4])
self.rgb_patch_embed = HybridEmbed(backbone=rgb_backbone, img_size=config.img_size, patch_size=config.patch_size, in_chans=3, embed_dim=embed_dim)
self.lidar_patch_embed = HybridEmbed(backbone=lidar_backbone, img_size=config.img_size, patch_size=config.patch_size, in_chans=3, embed_dim=embed_dim)
self.use_view_embed = config.use_view_embed
if self.use_view_embed:
self.view_embed = nn.Parameter(torch.zeros(1, embed_dim, 5, 1))
self.global_embed = nn.Parameter(torch.zeros(1, embed_dim, 5))
nn.init.uniform_(self.view_embed)
nn.init.uniform_(self.global_embed)
self.query_pos_embed = nn.Parameter(torch.zeros(1, embed_dim, 11))
self.query_embed = nn.Parameter(torch.zeros(400 + 11, 1, embed_dim))
nn.init.uniform_(self.query_pos_embed)
nn.init.uniform_(self.query_embed)
self.waypoints_generator = LinearWaypointsPredictor(embed_dim, cumsum=True)
self.junction_pred_head = nn.Linear(embed_dim, 2)
self.traffic_light_pred_head = nn.Linear(embed_dim, 2)
self.stop_sign_head = nn.Linear(embed_dim, 2)
self.traffic_pred_head = nn.Sequential(nn.Linear(embed_dim + 32, 64), nn.ReLU(), nn.Linear(64, 7), nn.Sigmoid())
self.position_encoding = PositionEmbeddingSine(embed_dim // 2, normalize=True)
encoder_layer = TransformerEncoderLayer(embed_dim, config.num_heads, config.dim_feedforward, config.dropout, act_layer, config.normalize_before)
self.encoder = TransformerEncoder(encoder_layer, config.enc_depth, None)
decoder_layer = TransformerDecoderLayer(embed_dim, config.num_heads, config.dim_feedforward, config.dropout, act_layer, config.normalize_before)
decoder_norm = nn.LayerNorm(embed_dim)
self.decoder = TransformerDecoder(decoder_layer, config.dec_depth, decoder_norm, return_intermediate=False)
def forward_features(self, images, lidar):
features = []
token_embeds = [self.rgb_patch_embed(images[:, i]) for i in range(4)]
token_embeds.append(self.lidar_patch_embed(lidar))
for i, token_embed in enumerate(token_embeds):
pos_embed = self.position_encoding(token_embed)
if self.use_view_embed:
token_embed += self.view_embed[:, :, i:i+1, :]
spatial_token = (token_embed + pos_embed).flatten(2).permute(2, 0, 1)
global_token = torch.mean(token_embed, [2,3], keepdim=False)[:,:,None].permute(2,0,1)
if self.use_view_embed:
global_token += self.global_embed[:,:,i:i+1].permute(2,0,1)
features.extend([spatial_token, global_token])
return torch.cat(features, 0)
def forward(self, inputs):
images = torch.stack([inputs['rgb'], inputs['rgb_left'], inputs['rgb_right'], inputs['rgb_center']], dim=1)
lidar = inputs['lidar']
measurements = inputs['measurements']
target_point = inputs['target_point']
bs = images.shape[0]
features = self.forward_features(images, lidar)
tgt = self.position_encoding(torch.ones((bs, 1, 20, 20), device=images.device)).flatten(2)
tgt = torch.cat([tgt, self.query_pos_embed.repeat(bs, 1, 1)], 2)
tgt = tgt.permute(2, 0, 1)
memory = self.encoder(features)
hs = self.decoder(self.query_embed.repeat(1, bs, 1), memory, query_pos=tgt)[0].permute(1, 0, 2)
traffic_feature, is_junction_feature, waypoints_feature = hs[:, :400], hs[:, 400], hs[:, 401:411]
waypoints = self.waypoints_generator(waypoints_feature, measurements)
is_junction = self.junction_pred_head(is_junction_feature)
traffic_light_state = self.traffic_light_pred_head(is_junction_feature)
stop_sign = self.stop_sign_head(is_junction_feature)
velocity = measurements[:, 6:7].unsqueeze(-1).repeat(1, 400, 32)
traffic_feature_with_vel = torch.cat([traffic_feature, velocity], dim=2)
traffic = self.traffic_pred_head(traffic_feature_with_vel)
return traffic, waypoints, is_junction, traffic_light_state, stop_sign, traffic_feature
# ==============================================================================
# SECTION 5: THE FINAL INTEGRATED MODEL FOR HUGGING FACE
# ==============================================================================
class InterfuserForEndToEndControl(PreTrainedModel):
config_class = InterfuserConfig
def __init__(self, config: InterfuserConfig):
super().__init__(config)
self.model = Interfuser(config)
self.controller = InterfuserController(ControllerConfig())
self.WAYPOINT_SCALE_FACTOR = 5.0
def _init_weights(self, module):
if hasattr(module, 'reset_parameters'):
module.reset_parameters()
elif isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=0.02)
if module.bias is not None:
module.bias.data.zero_()
def forward(
self,
rgb: torch.FloatTensor,
rgb_left: torch.FloatTensor,
rgb_right: torch.FloatTensor,
rgb_center: torch.FloatTensor,
lidar: torch.FloatTensor,
measurements: torch.FloatTensor,
target_point: torch.FloatTensor,
return_dict: Optional[bool] = None,
) -> Union[Tuple, InterfuserControlOutput]:
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if self.training:
raise NotImplementedError("This end-to-end model is designed for inference only.")
if rgb.shape[0] > 1:
raise NotImplementedError("End-to-end control model currently supports batch_size=1 only.")
inputs_for_model = {"rgb": rgb, "rgb_left": rgb_left, "rgb_right": rgb_right, "rgb_center": rgb_center,
"lidar": lidar, "measurements": measurements, "target_point": target_point}
(traffic, waypoints, is_junction, traffic_light_state, stop_sign, _) = self.model(inputs_for_model)
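        # NOTE: index 6 of the measurements vector is read as the ego speed by
        # this wrapper (mirroring the velocity feature in Interfuser.forward).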
speed_mps = measurements[0, 6].item()
traffic_np = traffic[0].detach().cpu().numpy().reshape(20, 20, -1)
waypoints_np = waypoints[0].detach().cpu().numpy().reshape(-1, 2) * self.WAYPOINT_SCALE_FACTOR
steer, throttle, brake, metadata = self.controller.run_step(
speed=speed_mps,
waypoints=waypoints_np,
junction=is_junction.sigmoid()[0, 1].item(),
traffic_light_state=traffic_light_state.sigmoid()[0, 0].item(),
stop_sign=stop_sign.sigmoid()[0, 1].item(),
meta_data=traffic_np
)
if not return_dict:
return (steer, throttle, brake)
return InterfuserControlOutput(
steer=torch.tensor(steer, device=self.device),
throttle=torch.tensor(throttle, device=self.device),
brake=torch.tensor(brake, device=self.device),
waypoints=waypoints,
traffic_predictions=traffic,
metadata=metadata
)
def reset(self):
self.controller = InterfuserController(ControllerConfig())
print("Control logic has been reset.")