"""
Geocoding service.

Convert cell tower / WiFi AP data to lat/lon coordinates, and reverse
geocode coordinates to addresses.

All services use the Amap (Gaode) API exclusively.
- Forward geocoding (cell/WiFi -> coords): Amap IoT positioning v5 API
- Reverse geocoding (coords -> address): Amap reverse-geocoding API
"""

import hashlib
import logging
import math
from collections import OrderedDict
from typing import Optional

import aiohttp

from app.config import settings as _settings

logger = logging.getLogger(__name__)

AMAP_KEY: Optional[str] = _settings.AMAP_KEY
AMAP_SECRET: Optional[str] = _settings.AMAP_SECRET
AMAP_HARDWARE_KEY: Optional[str] = _settings.AMAP_HARDWARE_KEY
AMAP_HARDWARE_SECRET: Optional[str] = _settings.AMAP_HARDWARE_SECRET

_CACHE_MAX_SIZE = _settings.GEOCODING_CACHE_SIZE

# ---------------------------------------------------------------------------
# WGS-84 -> GCJ-02 coordinate conversion (server-side)
# ---------------------------------------------------------------------------

_A = 6378245.0
_EE = 0.00669342162296594


def _out_of_china(lat: float, lon: float) -> bool:
    """Return True when (lat, lon) lies outside the bounding box used for GCJ-02."""
    return not (73.66 < lon < 135.05 and 3.86 < lat < 53.55)


def _transform_lat(x: float, y: float) -> float:
    """Latitude offset term of the GCJ-02 transform for the shifted point (x, y)."""
    ret = -100.0 + 2.0 * x + 3.0 * y + 0.2 * y * y + 0.1 * x * y + 0.2 * math.sqrt(abs(x))
    ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(y * math.pi) + 40.0 * math.sin(y / 3.0 * math.pi)) * 2.0 / 3.0
    ret += (160.0 * math.sin(y / 12.0 * math.pi) + 320.0 * math.sin(y * math.pi / 30.0)) * 2.0 / 3.0
    return ret


def _transform_lon(x: float, y: float) -> float:
    """Longitude offset term of the GCJ-02 transform for the shifted point (x, y)."""
    ret = 300.0 + x + 2.0 * y + 0.1 * x * x + 0.1 * x * y + 0.1 * math.sqrt(abs(x))
    ret += (20.0 * math.sin(6.0 * x * math.pi) + 20.0 * math.sin(2.0 * x * math.pi)) * 2.0 / 3.0
    ret += (20.0 * math.sin(x * math.pi) + 40.0 * math.sin(x / 3.0 * math.pi)) * 2.0 / 3.0
    ret += (150.0 * math.sin(x / 12.0 * math.pi) + 300.0 * math.sin(x / 30.0 * math.pi)) * 2.0 / 3.0
    return ret


def wgs84_to_gcj02(lat: float, lon: float) -> tuple[float, float]:
    """Convert WGS-84 to GCJ-02 (the datum used by Amap).

    Points outside the China bounding box are returned unchanged.
    """
    if _out_of_china(lat, lon):
        return (lat, lon)
    d_lat = _transform_lat(lon - 105.0, lat - 35.0)
    d_lon = _transform_lon(lon - 105.0, lat - 35.0)
    rad_lat = lat / 180.0 * math.pi
    magic = math.sin(rad_lat)
    magic = 1 - _EE * magic * magic
    sqrt_magic = math.sqrt(magic)
    d_lat = (d_lat * 180.0) / ((_A * (1 - _EE)) / (magic * sqrt_magic) * math.pi)
    d_lon = (d_lon * 180.0) / (_A / sqrt_magic * math.cos(rad_lat) * math.pi)
    return (lat + d_lat, lon + d_lon)


def gcj02_to_wgs84(lat: float, lon: float) -> tuple[float, float]:
    """Convert GCJ-02 to WGS-84 (reverse of wgs84_to_gcj02).

    This is a single-step approximation: the forward offset is evaluated at
    the GCJ-02 point itself and subtracted from it.
    Points outside the China bounding box are returned unchanged.
    """
    if _out_of_china(lat, lon):
        return (lat, lon)
    gcj_lat, gcj_lon = wgs84_to_gcj02(lat, lon)
    return (lat * 2 - gcj_lat, lon * 2 - gcj_lon)


# ---------------------------------------------------------------------------
# LRU Cache
# ---------------------------------------------------------------------------


class LRUCache(OrderedDict):
    """Simple LRU cache based on OrderedDict."""

    def __init__(self, maxsize: int = _CACHE_MAX_SIZE):
        super().__init__()
        self._maxsize = maxsize

    def get_cached(self, key):
        """Return the cached value for *key* (marking it most recent), or None."""
        if key in self:
            self.move_to_end(key)
            return self[key]
        return None

    def put(self, key, value):
        """Store *value* under *key*, evicting least-recently-used entries over the limit."""
        if key in self:
            self.move_to_end(key)
        self[key] = value
        while len(self) > self._maxsize:
            self.popitem(last=False)


_cell_cache: LRUCache = LRUCache()
_wifi_cache: LRUCache = LRUCache()
_address_cache: LRUCache = LRUCache()


# ---------------------------------------------------------------------------
# Amap digital signature
# ---------------------------------------------------------------------------


def _amap_sign(params: dict, secret: Optional[str] = None) -> str:
    """Generate an Amap API digital signature (MD5).

    The signature is the MD5 hex digest of the ``k=v`` pairs, sorted by key
    and joined with ``&``, followed by the secret.

    Args:
        params: Request parameters to sign.
        secret: Secret to sign with. Defaults to ``AMAP_SECRET`` when None.

    Returns:
        The hex digest, or "" when no secret is configured.
    """
    signing_secret = AMAP_SECRET if secret is None else secret
    if not signing_secret:
        return ""
    sorted_str = "&".join(f"{k}={params[k]}" for k in sorted(params))
    return hashlib.md5((sorted_str + signing_secret).encode()).hexdigest()


def _negative_dbm(value, default: int):
    """Return *value* as a negative signal level, or *default* when missing/zero.

    Positive magnitudes are negated. Values that are already negative are
    kept negative instead of being flipped to a positive number.
    """
    if not value:
        return default
    return -abs(value)


# ===========================================================================
# Forward Geocoding: cell/WiFi -> lat/lon
# ===========================================================================


async def geocode_location(
    mcc: Optional[int] = None,
    mnc: Optional[int] = None,
    lac: Optional[int] = None,
    cell_id: Optional[int] = None,
    wifi_list: Optional[list[dict]] = None,
    neighbor_cells: Optional[list[dict]] = None,
    imei: Optional[str] = None,
    location_type: Optional[str] = None,
) -> tuple[Optional[float], Optional[float]]:
    """
    Convert cell tower and/or WiFi AP data to lat/lon.

    Uses the Amap IoT positioning v5 API (restapi.amap.com/v5/position/IoT).
    Falls back to the legacy API (apilocate.amap.com/position) if v5 fails
    and a hardware key is configured.

    location_type: "lbs"/"wifi" for 2G(GSM), "lbs_4g"/"wifi_4g" for 4G(LTE).

    Returns (lat, lon), or (None, None) when no position could be resolved
    or no API key is configured. Successful results are cached, keyed by the
    sorted WiFi MACs when WiFi data is given, otherwise by the serving cell.
    """
    # Pick the cache and key once; reused for both lookup and store.
    cache: Optional[LRUCache] = None
    cache_key = None
    if wifi_list:
        cache = _wifi_cache
        # `or ""` guards against an explicit None MAC, which cannot be sorted with str.
        cache_key = tuple(sorted((ap.get("mac") or "") for ap in wifi_list))
    elif mcc is not None and lac is not None and cell_id is not None:
        cache = _cell_cache
        cache_key = (mcc, mnc or 0, lac, cell_id)

    # An empty LRUCache is falsy, so compare against None explicitly.
    if cache is not None:
        cached = cache.get_cached(cache_key)
        if cached is not None:
            return cached

    api_key = AMAP_KEY
    if not api_key:
        return (None, None)

    # Determine network type from location_type
    is_4g = location_type in ("lbs_4g", "wifi_4g", "gps_4g")

    # Try v5 API first (POST restapi.amap.com/v5/position/IoT)
    result = await _geocode_amap_v5(
        mcc, mnc, lac, cell_id, wifi_list, neighbor_cells,
        imei=imei, api_key=api_key, is_4g=is_4g,
    )

    # Fallback to legacy API if v5 fails and hardware key is available
    if result[0] is None and AMAP_HARDWARE_KEY:
        result = await _geocode_amap_legacy(
            mcc, mnc, lac, cell_id, wifi_list, neighbor_cells,
            imei=imei, api_key=AMAP_HARDWARE_KEY,
        )

    if result[0] is not None and cache is not None:
        cache.put(cache_key, result)

    return result


def _build_bts(
    mcc: Optional[int], mnc: Optional[int], lac: Optional[int], cell_id: Optional[int]
) -> str:
    """Build bts (base station) parameter: mcc,mnc,lac,cellid,signal,cage.

    Returns "" when mcc, lac or cell_id is missing.
    """
    if mcc is not None and lac is not None and cell_id is not None:
        return f"{mcc},{mnc or 0},{lac},{cell_id},-65,0"
    return ""


def _build_nearbts(
    neighbor_cells: Optional[list[dict]], mcc: Optional[int], mnc: Optional[int]
) -> list[str]:
    """Build nearbts (neighbor cell) parts, one string per neighbor cell."""
    parts = []
    if neighbor_cells:
        for nc in neighbor_cells:
            nc_lac = nc.get("lac", 0)
            nc_cid = nc.get("cell_id", 0)
            nc_signal = _negative_dbm(nc.get("rssi", 0), -80)
            parts.append(f"{mcc or 460},{mnc or 0},{nc_lac},{nc_cid},{nc_signal},0")
    return parts


def _build_wifi_parts(wifi_list: Optional[list[dict]]) -> list[str]:
    """Build WiFi MAC parts: mac,signal,ssid,fresh."""
    parts = []
    if wifi_list:
        for ap in wifi_list:
            # v5 API requires colon-separated lowercase MAC
            mac = (ap.get("mac") or "").lower()
            if ":" not in mac:
                # Convert raw hex (optionally dash-separated) to colon-separated
                mac_clean = mac.replace("-", "")
                if len(mac_clean) == 12:
                    mac = ":".join(mac_clean[i:i + 2] for i in range(0, 12, 2))
            signal = _negative_dbm(ap.get("signal", 0), -70)
            ssid = ap.get("ssid", "")
            parts.append(f"{mac},{signal},{ssid},0")
    return parts


def _select_mmac(wifi_parts: list[str]) -> tuple[str, list[str]]:
    """Select strongest WiFi AP as mmac (connected WiFi), rest as macs.

    v5 API requires mmac when accesstype=2.
    Returns (mmac_str, remaining_macs_parts).
    """
    if not wifi_parts:
        return ("", [])

    # Signal values are negative, so the numerically largest value
    # (closest to zero) is the strongest signal.
    # Parts format: "mac,signal,ssid,fresh"
    best_idx = 0
    best_signal = -999
    for i, part in enumerate(wifi_parts):
        fields = part.split(",")
        if len(fields) >= 2:
            try:
                sig = int(fields[1])
            except ValueError:
                # Non-integer signal field: this AP cannot win the comparison.
                continue
            if sig > best_signal:
                best_signal = sig
                best_idx = i

    mmac = wifi_parts[best_idx]
    remaining = [p for i, p in enumerate(wifi_parts) if i != best_idx]
    return (mmac, remaining)


async def _geocode_amap_v5(
    mcc: Optional[int],
    mnc: Optional[int],
    lac: Optional[int],
    cell_id: Optional[int],
    wifi_list: Optional[list[dict]],
    neighbor_cells: Optional[list[dict]],
    *,
    imei: Optional[str] = None,
    api_key: str,
    is_4g: bool = False,
) -> tuple[Optional[float], Optional[float]]:
    """
    Use the Amap IoT positioning v5 API (POST restapi.amap.com/v5/position/IoT).

    Key differences from legacy:
    - POST method, key in URL params, data in body
    - accesstype: 0=unknown, 1=mobile network, 2=WiFi
    - WiFi requires mmac (connected WiFi) + macs (nearby, 2-30)
    - network: GSM(default)/LTE/WCDMA/NR - critical for 4G accuracy
    - diu replaces imei
    - No digital signature needed
    - show_fields can return address directly

    Returns WGS-84 (lat, lon), or (None, None) on any failure. Errors are
    logged rather than raised.
    """
    bts = _build_bts(mcc, mnc, lac, cell_id)
    nearbts_parts = _build_nearbts(neighbor_cells, mcc, mnc)
    wifi_parts = _build_wifi_parts(wifi_list)

    if not bts and not wifi_parts:
        return (None, None)

    # Determine accesstype: 2=WiFi (when we have WiFi data), 1=mobile network
    has_wifi = len(wifi_parts) >= 2  # v5 requires 2+ WiFi APs
    accesstype = "2" if has_wifi else "1"

    # Build POST body
    body: dict[str, str] = {
        "accesstype": accesstype,
        "cdma": "0",
        "network": "LTE" if is_4g else "GSM",
        "diu": imei or _settings.GEOCODING_DEFAULT_IMEI,
        "show_fields": "formatted_address",
    }

    if bts:
        body["bts"] = bts
    if nearbts_parts:
        body["nearbts"] = "|".join(nearbts_parts)

    if has_wifi:
        mmac, remaining_macs = _select_mmac(wifi_parts)
        body["mmac"] = mmac
        if remaining_macs:
            body["macs"] = "|".join(remaining_macs)
    elif wifi_parts:
        # Less than 2 WiFi APs: include as macs anyway, use accesstype=1
        body["macs"] = "|".join(wifi_parts)

    url = f"https://restapi.amap.com/v5/position/IoT?key={api_key}"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url, data=body, timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    if data.get("status") == "1":
                        position = data.get("position", {})
                        location = position.get("location", "") if isinstance(position, dict) else ""
                        if location and "," in location:
                            # Amap returns "lon,lat" in GCJ-02
                            lon_str, lat_str = location.split(",")
                            gcj_lat = float(lat_str)
                            gcj_lon = float(lon_str)
                            lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon)
                            radius = position.get("radius", "?") if isinstance(position, dict) else "?"
                            logger.info(
                                "Amap v5 geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f) radius=%s",
                                gcj_lat, gcj_lon, lat, lon, radius,
                            )
                            return (lat, lon)
                    else:
                        infocode = data.get("infocode", "")
                        logger.warning(
                            "Amap v5 geocode error: %s (code=%s)",
                            data.get("info", ""), infocode,
                        )
                else:
                    logger.warning("Amap v5 geocode HTTP %d", resp.status)
    except Exception as e:
        # Best-effort lookup: network/parse failures degrade to "no position".
        logger.warning("Amap v5 geocode error: %s", e)

    return (None, None)


async def _geocode_amap_legacy(
    mcc: Optional[int],
    mnc: Optional[int],
    lac: Optional[int],
    cell_id: Optional[int],
    wifi_list: Optional[list[dict]],
    neighbor_cells: Optional[list[dict]],
    *,
    imei: Optional[str] = None,
    api_key: str,
) -> tuple[Optional[float], Optional[float]]:
    """
    Legacy Amap smart-hardware positioning API (GET apilocate.amap.com/position).

    Used as fallback when v5 API fails.

    Returns WGS-84 (lat, lon), or (None, None) on any failure. Errors are
    logged rather than raised.
    """
    bts = _build_bts(mcc, mnc, lac, cell_id)
    nearbts_parts = _build_nearbts(neighbor_cells, mcc, mnc)

    # Build macs (legacy format without fresh field, MAC without separators)
    macs_parts = []
    if wifi_list:
        for ap in wifi_list:
            mac = (ap.get("mac") or "").lower().replace(":", "")
            signal = _negative_dbm(ap.get("signal", 0), -70)
            ssid = ap.get("ssid", "")
            macs_parts.append(f"{mac},{signal},{ssid}")

    if not bts and not macs_parts:
        return (None, None)

    params: dict[str, str] = {
        "accesstype": "0",
        "imei": imei or _settings.GEOCODING_DEFAULT_IMEI,
        "key": api_key,
    }

    if bts:
        params["bts"] = bts
    if nearbts_parts:
        params["nearbts"] = "|".join(nearbts_parts)
    if macs_parts:
        params["macs"] = "|".join(macs_parts)

    # Only sign if the hardware key has its own secret; never fall back to
    # AMAP_SECRET, which belongs to a different key.
    if AMAP_HARDWARE_SECRET:
        params["sig"] = _amap_sign(params, AMAP_HARDWARE_SECRET)

    url = "https://apilocate.amap.com/position"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, params=params, timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    if data.get("status") == "1" and data.get("result"):
                        result = data["result"]
                        location = result.get("location", "")
                        if location and "," in location:
                            # Amap returns "lon,lat" in GCJ-02
                            lon_str, lat_str = location.split(",")
                            gcj_lat = float(lat_str)
                            gcj_lon = float(lon_str)
                            lat, lon = gcj02_to_wgs84(gcj_lat, gcj_lon)
                            logger.info(
                                "Amap legacy geocode: GCJ-02(%.6f,%.6f) -> WGS-84(%.6f,%.6f)",
                                gcj_lat, gcj_lon, lat, lon,
                            )
                            return (lat, lon)
                    else:
                        infocode = data.get("infocode", "")
                        if infocode == "10012":
                            logger.debug("Amap legacy geocode: insufficient permissions (enterprise cert needed)")
                        else:
                            logger.warning("Amap legacy geocode error: %s (code=%s)", data.get("info", ""), infocode)
                else:
                    logger.warning("Amap legacy geocode HTTP %d", resp.status)
    except Exception as e:
        # Best-effort lookup: network/parse failures degrade to "no position".
        logger.warning("Amap legacy geocode error: %s", e)

    return (None, None)


# ===========================================================================
# Reverse Geocoding: coordinates -> address
# ===========================================================================


async def reverse_geocode(
    lat: float, lon: float
) -> Optional[str]:
    """
    Convert lat/lon (WGS-84) to a human-readable address.

    Uses the Amap reverse-geocoding API exclusively. Results are cached on
    the coordinates rounded to 3 decimal places. Returns None when no API
    key is configured or no address was resolved.
    """
    cache_key = (round(lat, 3), round(lon, 3))
    cached = _address_cache.get_cached(cache_key)
    if cached is not None:
        return cached

    if AMAP_KEY:
        result = await _reverse_geocode_amap(lat, lon)
        if result:
            _address_cache.put(cache_key, result)
        return result

    return None


async def _reverse_geocode_amap(
    lat: float, lon: float
) -> Optional[str]:
    """
    Use the Amap reverse-geocoding API.

    API: https://restapi.amap.com/v3/geocode/regeo
    Input: GCJ-02 coordinates (need to convert from WGS-84).
    Free tier: 5,000 requests/day (personal), 1,000,000/day (enterprise).

    Returns the formatted address, or None on any failure. Errors are logged
    rather than raised.
    """
    gcj_lat, gcj_lon = wgs84_to_gcj02(lat, lon)

    params = {
        "key": AMAP_KEY,
        "location": f"{gcj_lon:.6f},{gcj_lat:.6f}",
        "extensions": "base",
        "output": "json",
    }

    # Signature is only added when AMAP_SECRET is configured.
    sig = _amap_sign(params)
    if sig:
        params["sig"] = sig

    url = "https://restapi.amap.com/v3/geocode/regeo"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url, params=params, timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                if resp.status == 200:
                    data = await resp.json(content_type=None)
                    if data.get("status") == "1":
                        regeocode = data.get("regeocode", {})
                        formatted = regeocode.get("formatted_address", "")
                        if formatted and formatted != "[]":
                            logger.info(
                                "Amap reverse geocode: %.6f,%.6f -> %s",
                                lat, lon, formatted,
                            )
                            return formatted
                    else:
                        logger.warning(
                            "Amap reverse geocode error: info=%s, infocode=%s",
                            data.get("info", ""), data.get("infocode", ""),
                        )
                else:
                    logger.warning("Amap reverse geocode HTTP %d", resp.status)
    except Exception as e:
        # Best-effort lookup: network/parse failures degrade to "no address".
        logger.warning("Amap reverse geocode error: %s", e)

    return None