""" Trajectory Filtering Visualizer (x,y,time) with Preimage Visualization ===================================================================== """ import math import random # kept, though not required in current code; OK for future extensions import tkinter as tk from tkinter import messagebox import numpy as np import matplotlib matplotlib.use("TkAgg") from matplotlib import pyplot as plt from mpl_toolkits.mplot3d import Axes3D # noqa: F401 # ============================================================================= # 1) Environment: Square [-1,1] x [-1,1] # ============================================================================= SQUARE_HALF = 1.0 def in_env_square(x, y): return abs(x) <= SQUARE_HALF + 1e-12 and abs(y) <= SQUARE_HALF + 1e-12 def distance_to_boundary_square(x, y): """ Distance to closest boundary for the square [-1,1]x[-1,1]. """ return min(SQUARE_HALF - abs(x), SQUARE_HALF - abs(y)) # ============================================================================= # 3) Preimage helpers (for NEW sensor model) # ============================================================================= def preimage_boundary_points(d_target, tol, N=120): """ Sample the (x,y) plane and return points where |distance_to_boundary_square(x,y) - d_target| <= tol """ xs = np.linspace(-1, 1, N) ys = np.linspace(-1, 1, N) PX, PY = [], [] for x in xs: for y in ys: if abs(distance_to_boundary_square(x, y) - d_target) <= tol: PX.append(x) PY.append(y) return np.array(PX), np.array(PY) def preimage_x_region_points(x_target, tol_x, N=120): """ Vertical strip preimage: |x - x_target| <= tol_x """ xs = np.linspace(-1, 1, N) ys = np.linspace(-1, 1, N) PX, PY = [], [] for x in xs: if abs(x - x_target) <= tol_x: for y in ys: PX.append(x) PY.append(y) return np.array(PX), np.array(PY) def preimage_y_region_points(y_target, tol_y, N=120): """ Horizontal strip preimage: |y - y_target| <= tol_y """ xs = np.linspace(-1, 1, N) ys = np.linspace(-1, 1, N) PX, PY = [], [] for y in ys: if abs(y - y_target) <= tol_y: for x in xs: PX.append(x) PY.append(y) return np.array(PX), np.array(PY) def preimage_combined_points( d_boundary, tol_boundary, x_targets, tol_x, y_targets, tol_y, N=140 ): """ Compute a combined preimage for a set of constraints at a given time slice. Constraints: - If d_boundary is not None: |distance_to_boundary_square(x,y) - d_boundary| <= tol_boundary - For each x in x_targets (non-None): |x - x_target| <= tol_x - For each y in y_targets (non-None): |y - y_target| <= tol_y Returns: (PX, PY) arrays of points satisfying ALL active constraints. This is the most directly meaningful visualization for pruning: it shows the region that would "survive" based on measurements alone at that time slice. """ xs = np.linspace(-1, 1, N) ys = np.linspace(-1, 1, N) PX, PY = [], [] # Filter out None targets safely x_targets = [xt for xt in x_targets if xt is not None] y_targets = [yt for yt in y_targets if yt is not None] for x in xs: # x-strip constraints ok_x = True for xt in x_targets: if abs(x - xt) > tol_x: ok_x = False break if not ok_x: continue for y in ys: # y-strip constraints ok_y = True for yt in y_targets: if abs(y - yt) > tol_y: ok_y = False break if not ok_y: continue # boundary constraint if d_boundary is not None: if abs(distance_to_boundary_square(x, y) - d_boundary) > tol_boundary: continue PX.append(x) PY.append(y) return np.array(PX), np.array(PY) # ============================================================================= # 4) Particle filtering / trajectory sampling # ============================================================================= class ParticleSet: """ Each particle has state (x, y, theta, v) and a trajectory history of (x, y, t). IMPORTANT: - Even though sensors no longer use heading theta, theta remains in the state for motion propagation. """ def __init__(self): self.x = np.array([], dtype=float) self.y = np.array([], dtype=float) self.theta = np.array([], dtype=float) # radians self.v = np.array([], dtype=float) self.history = [] # list of np arrays shape (k,3) per particle def size(self): return len(self.x) def initialize(self, x0_list, y0_list, theta_list, v_list): # Cartesian product initialization particles = [] for x0 in x0_list: for y0 in y0_list: if not in_env_square(x0, y0): continue for th0 in theta_list: for v0 in v_list: particles.append((x0, y0, th0, v0)) if not particles: self.x = np.array([], dtype=float) self.y = np.array([], dtype=float) self.theta = np.array([], dtype=float) self.v = np.array([], dtype=float) self.history = [] return arr = np.array(particles, dtype=float) self.x = arr[:, 0].copy() self.y = arr[:, 1].copy() self.theta = arr[:, 2].copy() self.v = arr[:, 3].copy() self.history = [] for i in range(len(self.x)): self.history.append(np.array([[self.x[i], self.y[i], 0.0]], dtype=float)) def _reflect_if_outside(self, xnew, ynew): """ Legacy reflection function (currently disabled in your original code). You can re-enable reflection later if you want particles to stay inside. For now, to preserve behavior, we keep it as "no-op" exactly like your code. """ # Original reflection logic was commented out; we keep it off: xnew = min(max(xnew, -SQUARE_HALF), SQUARE_HALF) ynew = min(max(ynew, -SQUARE_HALF), SQUARE_HALF) return xnew, ynew def propagate( self, t_from, t_to, dt, allow_heading_change, allow_speed_change, max_speed, heading_noise_deg, speed_noise ): """ Propagate all particles from t_from to t_to using step dt. This matches your current code: - If heading change allowed: theta += uniform noise each step - If speed change allowed: v += uniform noise each step (clipped) (You previously mentioned a different noise model; you can adjust later, but here we keep your current implementation stable.) """ if self.size() == 0: return nsteps = max(1, int(round((t_to - t_from) / dt))) dt_eff = (t_to - t_from) / nsteps heading_noise = math.radians(heading_noise_deg) for step in range(nsteps): # random perturbations if allow_heading_change and heading_noise > 0: dth = np.random.uniform(-heading_noise, heading_noise, size=self.size()) self.theta = (self.theta + dth) % (2 * math.pi) if allow_speed_change and speed_noise > 0: dv = np.random.uniform(-speed_noise, speed_noise, size=self.size()) self.v = np.clip(self.v + dv, 0.0, max_speed) # motion update dx = self.v * np.cos(self.theta) * dt_eff dy = self.v * np.sin(self.theta) * dt_eff self.x = self.x + dx self.y = self.y + dy # reflect to stay inside (currently no-op) for i in range(self.size()): xn, yn = self._reflect_if_outside(self.x[i], self.y[i]) self.x[i], self.y[i] = xn, yn t_now = t_from + (step + 1) * dt_eff # append to history for i in range(self.size()): self.history[i] = np.vstack([self.history[i], [self.x[i], self.y[i], t_now]]) def prune( self, t_meas, d_boundary=None, x1=None, y1=None, tol_boundary=0.05, tol_x=0.05, tol_y=0.05 ): """ Keep only particles consistent with available measurements at time t_meas. Sensors: - Distance-to-boundary: |d_boundary(x,y) - d_boundary| <= tol_boundary - Region sensors: x1: |x - x1| <= tol_x y1: |y - y1| <= tol_y Any of x1,y1 can be None => unconstrained. """ if self.size() == 0: return keep = np.ones(self.size(), dtype=bool) # boundary distance constraint if d_boundary is not None: dw = np.array([distance_to_boundary_square(self.x[i], self.y[i]) for i in range(self.size())]) keep &= np.abs(dw - d_boundary) <= tol_boundary # x region constraints if x1 is not None: keep &= (np.abs(self.x - x1) <= tol_x) # y region constraints if y1 is not None: keep &= (np.abs(self.y - y1) <= tol_y) idx = np.where(keep)[0] self.x = self.x[idx] self.y = self.y[idx] self.theta = self.theta[idx] self.v = self.v[idx] self.history = [self.history[i] for i in idx] # ============================================================================= # 5) Visualization (matplotlib 3D): trajectories + time slice plane + preimage # ============================================================================= class Visualizer3D: def __init__(self): plt.ion() self.fig = plt.figure("Trajectory Filtering (𝑥₁,𝑥₂,time)", figsize=(8, 7)) self.ax = self.fig.add_subplot(111, projection="3d") self._last_plane = None def clear(self): self.ax.cla() def draw( self, particles: ParticleSet, t_current, t_next=None, title_extra="", preimages=None, intersection_xy=None ): """ Draw: - time slice plane z=t_current (wireframe) - optional preimage points (faint gray) on that plane - particle trajectories (lines) - particle endpoints at current time (scatter) """ self.clear() # Bounds self.ax.set_xlim(-1, 1) self.ax.set_ylim(-1, 1) self.ax.set_zlim(0, 4) self.ax.set_zticks(np.arange(0, 5, 1)) self.ax.set_xticks(np.arange(-1.0, 1.02, 0.4)) self.ax.set_yticks(np.arange(-1.0, 1.02, 0.4)) self.ax.set_xlabel("x₁") self.ax.set_ylabel("x₂") self.ax.set_zlabel("time") n = particles.size() # Plot trajectories if n == 0: self.ax.text2D(0.05, 0.95, "No trajectories (empty set).", transform=self.ax.transAxes) else: for hist in particles.history: self.ax.plot(hist[:, 0], hist[:, 1], hist[:, 2], linewidth=0.8) # Scatter current endpoints self.ax.scatter(particles.x, particles.y, np.full(n, t_current), s=10) # Draw time slice plane self._draw_time_plane(t_current) if preimages is not None: for name, (px, py) in preimages.items(): if len(px) == 0: continue pz = np.full_like(px, t_current) if name == "boundary": color, alpha = "tab:green", 0.06 elif name == "x1": color, alpha = "tab:blue", 0.06 elif name == "y1": color, alpha = "tab:orange", 0.06 else: color, alpha = "gray", 0.06 self.ax.scatter( px, py, pz, s=6, alpha=alpha, c=color, depthshade=False ) if intersection_xy is not None: px, py = intersection_xy if len(px) > 0: pz = np.full_like(px, t_current) self.ax.scatter( px, py, pz, s=8, alpha=0.1, c="purple", depthshade=False ) self.ax.set_title(f"Survivors: {n} | at t = {t_current:.2f} {title_extra}") self.fig.canvas.draw() self.fig.show() def _draw_time_plane(self, t): xs = np.linspace(-1, 1, 11) ys = np.linspace(-1, 1, 11) X, Y = np.meshgrid(xs, ys) Z = np.full_like(X, t) self.ax.plot_wireframe(X, Y, Z, rstride=1, cstride=1, linewidth=0.4) # ============================================================================= # 6) GUI / App logic # ============================================================================= class App: def reset_to_defaults(self): # -------- Initial conditions -------- self.e_x0.delete(0, tk.END) self.e_y0.delete(0, tk.END) self.e_th0.delete(0, tk.END) self.e_v0.delete(0, tk.END) # -------- Sampling -------- self._set_entry(self.e_xy_grid, "6") self._set_entry(self.e_th_samples, "4") self._set_entry(self.e_v_samples, "4") self._set_entry(self.e_v_max, "0.5") # -------- Motion model -------- self._set_entry(self.e_dt, "0.1") self._set_entry(self.e_heading_noise, "180") self._set_entry(self.e_speed_noise, "0.5") self.var_heading_change.set(0) self.var_speed_change.set(0) # -------- Measurements -------- for e_dw, e_x1, e_y1 in self.meas_entries: e_dw.delete(0, tk.END) e_x1.delete(0, tk.END) e_y1.delete(0, tk.END) # -------- Tolerances -------- self._set_entry(self.e_tol_boundary, "0.2") self._set_entry(self.e_tol_x, "0.2") self._set_entry(self.e_tol_y, "0.2") # -------- Clear particles & redraw -------- self.particles = ParticleSet() self.stage = 0 self.vis.draw( self.particles, t_current=0.0, title_extra="(defaults restored)", preimages=None ) def _set_entry(self, entry, value): entry.delete(0, tk.END) entry.insert(0, value) def __init__(self): self.root = tk.Tk() self.root.tk.call("tk", "scaling", 2.0) self.root.title("Controls: Triangulation + Filtering (Trajectories)") self.vis = Visualizer3D() self.particles = ParticleSet() # Times (fixed): t0=0 then t1..t5 self.measure_times = [0, 1, 2, 3, 4] self.stage = 0 # 0 means at t=0, next measurement is t=1 self._build_controls() # ---------- Helpers to parse optional entries ---------- def _parse_optional_float(self, entry): s = entry.get().strip() if s == "": return None return float(s) def _parse_required_float(self, entry, name): s = entry.get().strip() if s == "": raise ValueError(f"Missing value for {name}") return float(s) def _build_controls(self): # --------------------------------------------------------------------- # Initial conditions # --------------------------------------------------------------------- f_init = tk.LabelFrame(self.root, text="Initial conditions (leave blank = unknown)") f_init.pack(fill="x", padx=10, pady=6) self.e_x0 = self._labeled_entry(f_init, "𝑥₁,₀:", row=0, col=0, default="") self.e_th0 = self._labeled_entry(f_init, "heading 𝜃₀ (deg):", row=0, col=2, default="") self.e_y0 = self._labeled_entry(f_init, "𝑥₂,₀:", row=1, col=0, default="") self.e_v0 = self._labeled_entry(f_init, "speed 𝑠₀:", row=1, col=2, default="") # --------------------------------------------------------------------- # Sampling / discretization # --------------------------------------------------------------------- f_samp = tk.LabelFrame(self.root, text="Sampling (for unknown initial values)") f_samp.pack(fill="x", padx=10, pady=6) self.e_xy_grid = self._labeled_entry(f_samp, "𝑥₁/𝑥₂ grid points:", row=0, col=0, default="6") self.e_v_max = self._labeled_entry(f_samp, "max speed, 𝑠ₘₐₓ:", row=1, col=0, default="0.5") self.e_th_samples = self._labeled_entry(f_samp, "heading samples:", row=0, col=2, default="4") self.e_v_samples = self._labeled_entry(f_samp, "speed samples:", row=1, col=2, default="4") # --------------------------------------------------------------------- # Motion model # --------------------------------------------------------------------- f_motion = tk.LabelFrame(self.root, text="Motion model") f_motion.pack(fill="x", padx=10, pady=6) self.var_heading_change = tk.IntVar(value=0) self.var_speed_change = tk.IntVar(value=0) tk.Checkbutton( f_motion, text="Heading can change", variable=self.var_heading_change ).grid(row=1, column=0, sticky="w", padx=6, pady=2) tk.Checkbutton( f_motion, text="Speed can change", variable=self.var_speed_change ).grid(row=2, column=0, sticky="w", padx=6, pady=2) self.e_dt = self._labeled_entry(f_motion, "time step, Δ𝑡:", row=0, col=0, default="0.1") self.e_heading_noise = self._labeled_entry(f_motion, "max change, Δ𝜃/Δ𝑡:", row=1, col=1, default="180") self.e_speed_noise = self._labeled_entry(f_motion, "max change, Δ𝑠/Δ𝑡:", row=2, col=1, default="0.5") # --------------------------------------------------------------------- # Measurements (UPDATED): # At each time t=0..4, user can set: # - d_closest # - x1, y1 # Blank means unconstrained. # --------------------------------------------------------------------- f_meas = tk.LabelFrame(self.root, text="Measurements at time stages 𝑡 = {0, 1, 2, 3, 4} (leave blank = unconstrained)") f_meas.pack(fill="x", padx=10, pady=6) self.meas_entries = [] for i, t in enumerate(self.measure_times): tk.Label(f_meas, text=f"t={t}").grid(row=i+1, column=0, padx=6, pady=2, sticky="w") e_dw = tk.Entry(f_meas, width=10) # d_closest (distance to boundary) e_x1 = tk.Entry(f_meas, width=10) e_y1 = tk.Entry(f_meas, width=10) e_dw.grid(row=i+1, column=1, padx=4, pady=2) e_x1.grid(row=i+1, column=2, padx=4, pady=2) e_y1.grid(row=i+1, column=3, padx=4, pady=2) if i == 0: tk.Label(f_meas, text="ℎ₁ (dist)").grid(row=0, column=1, padx=4, pady=2, sticky="n") tk.Label(f_meas, text="ℎ₂ (x₁-strip)").grid(row=0, column=2, padx=4, pady=2, sticky="n") tk.Label(f_meas, text="ℎ₃ (x₂-strip)").grid(row=0, column=3, padx=4, pady=2, sticky="n") self.meas_entries.append((e_dw, e_x1, e_y1)) # --------------------------------------------------------------------- # Measurement tolerances (UPDATED): # - tol d_closest (boundary distance tolerance) # - tol_x (used for x1) # - tol_y (used for y1) # --------------------------------------------------------------------- f_tol = tk.LabelFrame(self.root, text="Measurement tolerances") f_tol.pack(fill="x", padx=10, pady=6) self.e_tol_boundary = self._labeled_entry(f_tol, "𝜀₁:", row=0, col=0, default="0.2") self.e_tol_x = self._labeled_entry(f_tol, "𝜀₂:", row=0, col=2, default="0.2") self.e_tol_y = self._labeled_entry(f_tol, "𝜀₃:", row=0, col=4, default="0.2") # --------------------------------------------------------------------- # Buttons # --------------------------------------------------------------------- f_btn = tk.Frame(self.root) f_btn.pack(fill="x", padx=10, pady=10) tk.Button(f_btn, text="Initialize", command=self.on_reset).pack(side="left", padx=5) tk.Button(f_btn, text="Next Stage (propagate + prune)", command=self.on_step).pack(side="left", padx=5) tk.Button(f_btn, text="Run All", command=self.on_run_all).pack(side="left", padx=5) tk.Button(f_btn, text="Quit", fg="red", command=self.root.destroy).pack(side="right", padx=5) # Initial draw self.vis.draw(self.particles, t_current=0.0, title_extra="(not initialized)", preimages=None) tk.Button( f_btn, text="Reset to Defaults", command=self.reset_to_defaults ).pack(side="left", padx=5) def _labeled_entry(self, parent, label, row, col, default=""): tk.Label(parent, text=label).grid(row=row, column=col, padx=6, pady=2, sticky="w") e = tk.Entry(parent, width=10) e.grid(row=row, column=col + 1, padx=4, pady=2, sticky="w") if default != "": e.insert(0, default) return e # ------------------------------------------------------------------------- # Core logic # ------------------------------------------------------------------------- def on_reset(self): try: # Required sampling params n_xy = int(self._parse_required_float(self.e_xy_grid, "x/y grid points")) n_th = int(self._parse_required_float(self.e_th_samples, "heading samples")) n_v = int(self._parse_required_float(self.e_v_samples, "speed samples")) v_max = self._parse_required_float(self.e_v_max, "max speed") # Optional initial conditions x0 = self._parse_optional_float(self.e_x0) y0 = self._parse_optional_float(self.e_y0) th0_deg = self._parse_optional_float(self.e_th0) v0 = self._parse_optional_float(self.e_v0) # Build candidate lists if x0 is None: xs = np.linspace(-1, 1, n_xy) else: xs = np.array([x0], dtype=float) if y0 is None: ys = np.linspace(-1, 1, n_xy) else: ys = np.array([y0], dtype=float) if th0_deg is None: thetas = np.linspace(0, 2 * math.pi, n_th, endpoint=False) else: thetas = np.array([math.radians(th0_deg) % (2 * math.pi)], dtype=float) if v0 is None: # include 0..v_max if n_v <= 1: vs = np.array([min(0.3, v_max)], dtype=float) else: vs = np.linspace(0.0, v_max, n_v) else: vs = np.array([v0], dtype=float) self.particles.initialize(xs, ys, thetas, vs) # --- NEW: prune at t = 0 --- dw, x1, y1 = self._get_measurement_at_stage(1) # row 0 tol_w = self._parse_required_float(self.e_tol_boundary, "tol d_closest") tol_x = self._parse_required_float(self.e_tol_x, "tol_x") tol_y = self._parse_required_float(self.e_tol_y, "tol_y") self.particles.prune( t_meas=0.0, d_boundary=dw, x1=x1, y1=y1, tol_boundary=tol_w, tol_x=tol_x, tol_y=tol_y ) self.stage = 1 # --- NEW: compute preimages for visualization at t = 0 --- any_measurement = (dw is not None) or (x1 is not None) or (y1 is not None) preimage_intersection = None if any_measurement: preimage_intersection = preimage_combined_points( d_boundary=dw, tol_boundary=tol_w, x_targets=[x1], tol_x=tol_x, y_targets=[y1], tol_y=tol_y, N=140 ) preimages = {} if dw is not None: preimages["boundary"] = preimage_boundary_points(dw, tol_w, N=140) if x1 is not None: preimages["x1"] = preimage_x_region_points(x1, tol_x, N=140) if y1 is not None: preimages["y1"] = preimage_y_region_points(y1, tol_y, N=140) self.vis.draw( self.particles, t_current=0.0, title_extra="(pruned at t=0)", preimages=preimages, intersection_xy=preimage_intersection ) #self.vis.draw(self.particles, t_current=0.0, title_extra="(initialized)", preimages=None) except Exception as e: messagebox.showerror("Initialization error", str(e)) def _get_measurement_at_stage(self, stage_index): """ stage_index: 0..4 (corresponding to t=0..4) returns: dw, x1, y1 Each is float or None. """ i = stage_index - 1 e_dw, e_x1, e_y1 = self.meas_entries[i] dw_s = e_dw.get().strip() x1_s = e_x1.get().strip() y1_s = e_y1.get().strip() dw = float(dw_s) if dw_s != "" else None x1 = float(x1_s) if x1_s != "" else None y1 = float(y1_s) if y1_s != "" else None return dw, x1, y1 def on_step(self): if self.particles.size() == 0: messagebox.showinfo("No particles", "Initialize first (Reset / Initialize).") return if self.stage >= len(self.measure_times): messagebox.showinfo("Done", "Already at final time.") return try: dt = self._parse_required_float(self.e_dt, "dt") v_max = self._parse_required_float(self.e_v_max, "max speed") heading_noise = self._parse_required_float(self.e_heading_noise, "heading noise") speed_noise = self._parse_required_float(self.e_speed_noise, "speed noise") tol_w = self._parse_required_float(self.e_tol_boundary, "tol d_closest") tol_x = self._parse_required_float(self.e_tol_x, "tol_x") tol_y = self._parse_required_float(self.e_tol_y, "tol_y") allow_heading_change = bool(self.var_heading_change.get()) allow_speed_change = bool(self.var_speed_change.get()) t_from = float(self.measure_times[self.stage - 1]) t_to = float(self.measure_times[self.stage]) # 1) Propagate self.particles.propagate( t_from=t_from, t_to=t_to, dt=dt, allow_heading_change=allow_heading_change, allow_speed_change=allow_speed_change, max_speed=v_max, heading_noise_deg=heading_noise, speed_noise=speed_noise ) # 2) Read measurements for this time step dw, x1, y1 = self._get_measurement_at_stage(self.stage + 1) # 3) Compute combined preimage for visualization (BEFORE pruning) x_targets = [x1] y_targets = [y1] # If no sensors are provided at this stage, we skip preimage any_measurement = (dw is not None) or (x1 is not None) or (y1 is not None) preimage_intersection = None if any_measurement: preimage_intersection = preimage_combined_points( d_boundary=dw, tol_boundary=tol_w, x_targets=x_targets, tol_x=tol_x, y_targets=y_targets, tol_y=tol_y, N=140 ) preimages = {} # name -> (PX, PY) if dw is not None: preimages["boundary"] = preimage_boundary_points(dw, tol_w, N=140) if x1 is not None: preimages["x1"] = preimage_x_region_points(x1, tol_x, N=140) if y1 is not None: preimages["y1"] = preimage_y_region_points(y1, tol_y, N=140) # 4) Prune self.particles.prune( t_meas=t_to, d_boundary=dw, x1=x1, y1=y1, tol_boundary=tol_w, tol_x=tol_x, tol_y=tol_y ) self.stage += 1 # 5) Title extras if any_measurement: extra = f"(pruned at t={t_to:g})" else: extra = f"(no measurement at t={t_to:g})" # 6) Draw with preimage self.vis.draw( self.particles, t_current=t_to, title_extra=extra, preimages=preimages, intersection_xy=preimage_intersection ) except Exception as e: messagebox.showerror("Step error", str(e)) def on_run_all(self): while self.stage < len(self.measure_times) and self.particles.size() > 0: self.on_step() def run(self): self.root.mainloop() # ============================================================================= # 7) Main # ============================================================================= if __name__ == "__main__": App().run()