"""
A submission script client for the GridMap API. This script takes a local data
folder that contains map files, and sees which ones are either new or contain
information that needs to be updated to the API.

This script has been created as an example for the Programmable Web Project
course at the University of Oulu.

License: MIT
Author: Mika Oja
"""

import argparse
import json
import os
from urllib.parse import urljoin

import requests
from packaging.version import Version
from slugify import slugify

BUILT_AGAINST = "1.0.0"

# Sentinel for "key not present" when comparing observer dictionaries.
_MISSING = object()


class APIDataSource:
    """
    Data source class that acts as a wrapper for making API calls with
    requests.

    All requests go through one requests.Session. The instance can be used in
    a with statement, which closes the session on exit. Unexpected HTTP status
    codes are reported by raising AssertionError.
    """

    def __init__(self, host, ca_cert=None):
        """
        Create a session for the API at *host*.

        host: base address of the API, must start with "http".
        ca_cert: optional path to a CA certificate file used to verify the
            server certificate.

        Raises AssertionError if the host address has no protocol.
        """
        # Raised explicitly instead of with an assert statement so that the
        # check is still performed when Python runs with -O.
        if not (isinstance(host, str) and host.startswith("http")):
            raise AssertionError("No protocol in host address")
        self.host = host
        # Version string reported by the API; set by check_api_version.
        self.api_version = None
        self.session = requests.Session()
        if ca_cert:
            self.session.verify = ca_cert

    # These two make the class compatible with with statements.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.session.close()

    @staticmethod
    def _expect_status(response, expected):
        """Raise AssertionError unless *response* has status *expected*."""
        if response.status_code != expected:
            raise AssertionError(
                f"{response.request.method} {response.url}: "
                f"expected status {expected}, got {response.status_code}"
            )

    def _get(self, uri):
        """GET *uri* and return the decoded JSON body (expects 200)."""
        response = self.session.get(urljoin(self.host, uri))
        self._expect_status(response, 200)
        return response.json()

    def _post(self, uri, data):
        """POST *data* as JSON and return the Location header (expects 201)."""
        response = self.session.post(urljoin(self.host, uri), json=data)
        self._expect_status(response, 201)
        return response.headers.get("Location")

    def _put(self, uri, data):
        """PUT *data* as JSON to *uri* (expects 204)."""
        response = self.session.put(urljoin(self.host, uri), json=data)
        self._expect_status(response, 204)

    def _delete(self, uri):
        """DELETE *uri* (expects 204)."""
        response = self.session.delete(urljoin(self.host, uri))
        self._expect_status(response, 204)

    def check_api_version(self, require_version):
        """
        Return True if the API reports exactly *require_version*.

        The version string reported by the API is stored in self.api_version
        so that callers can include it in their own messages.
        """
        response = self._get("/api/")
        self.api_version = response["api_version"]
        return Version(self.api_version) == Version(require_version)

    def get_maps(self):
        return self._get("/api/maps/")

    def get_map(self, map_slug):
        return self._get(f"/api/maps/{map_slug}/")

    def create_map(self, map_data):
        self._post("/api/maps/", map_data)

    def create_observer(self, map_slug, observer_data):
        self._post(f"/api/maps/{map_slug}/observers/", observer_data)

    def create_obstacle(self, map_slug, x, y):
        self._post(f"/api/maps/{map_slug}/obstacles/", {"x": x, "y": y})

    def update_map(self, map_slug, map_data):
        self._put(f"/api/maps/{map_slug}/", map_data)

    def update_observer(self, map_slug, observer_data):
        # The slug is pulled out first: reusing double quotes inside an
        # f-string replacement field is a syntax error before Python 3.12.
        observer_slug = observer_data["slug"]
        self._put(
            f"/api/maps/{map_slug}/observers/{observer_slug}/", observer_data
        )

    def delete_observer(self, map_slug, observer_slug):
        self._delete(f"/api/maps/{map_slug}/observers/{observer_slug}/")

    def delete_obstacle(self, map_slug, x, y):
        # The coordinates identify the obstacle, so they must be interpolated
        # into the URL.
        self._delete(f"/api/maps/{map_slug}/obstacles/{x}/{y}/")


class Map:
    """
    Common base for local and remote maps.

    Attributes:
        name, slug: display name and identifier of the map
        width, height: map dimensions
        observers: dict of observer slug -> observer data dict
        obstacles: set of (x, y) tuples
    """

    def __init__(self):
        self.name = ""
        self.slug = ""
        self.width = 0
        self.height = 0
        self.observers = {}
        self.obstacles = set()

    # These two make the class compatible with sets
    def __hash__(self):
        return hash(self.slug)

    def __eq__(self, other):
        # Maps are identified by slug only. Returning NotImplemented lets
        # comparisons with unrelated objects evaluate to False instead of
        # raising AttributeError.
        if not isinstance(other, Map):
            return NotImplemented
        return self.slug == other.slug

    def __str__(self):
        return self.name

    def serialize_json(self):
        """Return the map attributes as a JSON serializable dict."""
        return {
            "name": self.name,
            "slug": self.slug,
            "width": self.width,
            "height": self.height,
        }

    def attribute_difference(self, other):
        """Return True if the dimensions differ from those of *other*."""
        return self.width != other.width or self.height != other.height

    def observer_difference(self, other):
        """
        Compare this map's observers against *other*, a dict of observer
        slug -> observer data.

        Returns a tuple (new, changed, gone):
            new: observer dicts of this map whose slug is not in *other*
            changed: observer dicts of this map for which at least one of
                their own keys has a different value in *other*
            gone: slugs that are in *other* but not in this map
        """
        new = []
        changed = []
        for slug, observer_data in self.observers.items():
            if slug not in other:
                new.append(observer_data)
                continue
            counterpart = other[slug]
            # A key that the counterpart lacks counts as a difference rather
            # than raising KeyError.
            if any(
                counterpart.get(key, _MISSING) != value
                for key, value in observer_data.items()
            ):
                changed.append(observer_data)
        gone = [slug for slug in other if slug not in self.observers]
        return new, changed, gone

    def obstacle_difference(self, other):
        """
        Compare this map's obstacles against *other*, a set of (x, y) tuples.

        Returns a tuple (new, gone) of sets: obstacles only in this map, and
        obstacles only in *other*.
        """
        new = self.obstacles - other
        gone = other - self.obstacles
        return new, gone

    def update_from_api(self, api):
        """Fetch this map's document from *api* and load its observers and
        obstacles into this object."""
        document = api.get_map(self.slug)
        for observer in document["observers"]:
            self.observers[observer["slug"]] = observer
        for obstacle in document["obstacles"]:
            self.obstacles.add((obstacle["x"], obstacle["y"]))

    def serialize_observers(self):
        return self.observers.values()

    def serialize_obstacles(self):
        return self.obstacles

    def serialize_file(self):
        raise NotImplementedError


class LocalMap(Map):
    """
    A map read from a local map file.

    File layout as read by this class: the first line is the map name. It is
    followed by the grid rows, which all have the same width; "#" marks an
    obstacle, "." an empty cell, and any other character is an observer key
    placed at that cell. The first line whose length differs from the grid
    width ends the grid. The remaining non-blank lines describe observers as
    "key,name,vision".
    """

    def __init__(self, file_path):
        super().__init__()
        # observer key character -> {"x": column, "y": row}
        self.observer_positions = {}
        self._read_file(file_path)

    def _read_file(self, file_path):
        """
        Read the map name, grid and observers from *file_path*.

        Raises AssertionError for observer lines that cannot be parsed or that
        use a key not present in the grid. The number in the message is the
        1-based line number in the file.
        """
        with open(file_path) as source:
            self.name = source.readline().rstrip()
            self.slug = slugify(self.name)
            # The name is line 1 of the file, so the rest starts from line 2.
            # One shared iterator lets the observer loop continue from where
            # the grid loop stopped.
            numbered = enumerate(source, start=2)
            for line_no, line in numbered:
                row = line.rstrip()
                if self.height == 0:
                    # The first row defines the width of the grid.
                    self.width = len(row)
                elif len(row) != self.width:
                    # This line ends the grid. It has already been taken from
                    # the iterator, so it is handled here instead of being
                    # dropped; it is either blank or the first observer line.
                    self._read_observer_line(file_path, line_no, row)
                    break
                # Every grid row is parsed, including the first one.
                self._parse_line(row, self.height)
                self.height += 1
            for line_no, line in numbered:
                self._read_observer_line(file_path, line_no, line)

    def _read_observer_line(self, file_path, line_no, line):
        """Parse one observer line; blank lines are ignored."""
        line = line.rstrip()
        if not line:
            return
        try:
            self._parse_observer(line)
        except ValueError as e:
            raise AssertionError(
                f"{file_path} ({line_no}): Invalid observer line"
            ) from e
        except KeyError as e:
            raise AssertionError(
                f"{file_path} ({line_no}): Unknown observer key"
            ) from e

    def _parse_observer(self, line):
        """
        Store the observer described by a "key,name,vision" line.

        Raises ValueError if the line does not have exactly three fields or
        vision is not an integer, and KeyError if the key was not found in the
        grid.
        """
        observer_key, name, vision = line.split(",")
        slug = slugify(name)
        observer = {
            "name": name,
            "slug": slug,
            "vision": int(vision),
        }
        observer.update(self.observer_positions[observer_key])
        self.observers[slug] = observer

    def _parse_line(self, line, row_idx):
        """Record the obstacles and observer positions of one grid row."""
        for col_idx, char in enumerate(line):
            if char == "#":
                self.obstacles.add((col_idx, row_idx))
            elif char != ".":
                self.observer_positions[char] = {"x": col_idx, "y": row_idx}


class RemoteMap(Map):
    """
    A map created from one item of the API's map collection. Observers and
    obstacles stay empty until update_from_api is called.
    """

    def __init__(self, document):
        super().__init__()
        self._read_map_info(document)

    def _read_map_info(self, document):
        # Every key of the document becomes an attribute of the same name.
        for key, value in document.items():
            setattr(self, key, value)


def read_local_maps(folder):
    """
    Return a list of LocalMap objects, one for each file in *folder*.

    Files are read in sorted name order. Entries that are not regular files
    (e.g. subfolders) are skipped because they cannot be opened as map files.
    """
    paths = (os.path.join(folder, fname) for fname in sorted(os.listdir(folder)))
    return [LocalMap(path) for path in paths if os.path.isfile(path)]


def read_remote_maps(api):
    """Return a list of RemoteMap objects for the maps listed by the API."""
    document = api.get_maps()
    return [RemoteMap(map_data) for map_data in document["maps"]]


def compare_observers(api, local, remote):
    """
    Make the observers of the *remote* map match those of the *local* map by
    creating, updating and deleting observers through *api*.
    """
    new, changed, gone = local.observer_difference(remote.observers)
    for observer_data in new:
        print(f"[{local.name}] Creating new observer {observer_data['name']}")
        api.create_observer(local.slug, observer_data)
    for observer_data in changed:
        print(f"[{local.name}] Updating observer {observer_data['name']}")
        api.update_observer(local.slug, observer_data)
    for slug in gone:
        print(f"[{local.name}] Deleting observer {slug}")
        api.delete_observer(local.slug, slug)


def compare_obstacles(api, local, remote):
    """
    Make the obstacles of the *remote* map match those of the *local* map by
    creating and deleting obstacles through *api*.
    """
    new, gone = local.obstacle_difference(remote.obstacles)
    for x, y in new:
        print(f"[{local.name}] Creating new obstacle at ({x}, {y})")
        api.create_obstacle(local.slug, x, y)
    for x, y in gone:
        print(f"[{local.name}] Deleting obstacle at ({x}, {y})")
        api.delete_obstacle(local.slug, x, y)


def compare_maps(api, local, remote):
    """
    Synchronize the *local* maps to the API.

    local: list of LocalMap objects
    remote: list of RemoteMap objects

    A local map whose slug exists remotely is updated in place (attributes,
    observers, obstacles). Any other local map is created together with all of
    its observers and obstacles.
    """
    # Maps compare equal by slug, so a slug index finds the same object as
    # list.index() did, without scanning the list for every local map.
    remote_by_slug = {}
    for remote_obj in remote:
        # setdefault keeps the first map for a slug, like list.index().
        remote_by_slug.setdefault(remote_obj.slug, remote_obj)

    for map_obj in local:
        remote_obj = remote_by_slug.get(map_obj.slug)
        if remote_obj is None:
            api.create_map(map_obj.serialize_json())
            for observer_data in map_obj.serialize_observers():
                api.create_observer(map_obj.slug, observer_data)
            for x, y in map_obj.serialize_obstacles():
                api.create_obstacle(map_obj.slug, x, y)
            continue

        remote_obj.update_from_api(api)
        if map_obj.attribute_difference(remote_obj):
            # The request body must be JSON serializable, so the serialized
            # attributes are sent instead of the Map object itself.
            api.update_map(map_obj.slug, map_obj.serialize_json())
        compare_observers(api, map_obj, remote_obj)
        compare_obstacles(api, map_obj, remote_obj)


def main():
    """Parse command line arguments and synchronize the map folder."""
    parser = argparse.ArgumentParser()
    parser.add_argument("folder", help="Path to folder containing map files")
    # The host is needed for every request, so it is a required option.
    parser.add_argument(
        "--host", dest="host", required=True, help="API host address"
    )
    parser.add_argument(
        "--ca", dest="ca", default=None, help="CA certificate file"
    )
    args = parser.parse_args()

    with APIDataSource(args.host, args.ca) as api:
        if not api.check_api_version(BUILT_AGAINST):
            print(
                "Warning: API version has changed. "
                f"Current: {api.api_version} - Built against: {BUILT_AGAINST}"
            )
            print("Review API changes and update the script accordingly")
        else:
            local_maps = read_local_maps(args.folder)
            remote_maps = read_remote_maps(api)
            compare_maps(api, local_maps, remote_maps)


if __name__ == "__main__":
    main()