""" Geocoding service - Convert cell tower / WiFi AP data to lat/lon coordinates, and reverse geocode coordinates to addresses. Uses free APIs: - Cell tower: Google Geolocation API (if key available) or unwiredlabs.com - WiFi: Same APIs support WiFi AP lookup - Reverse geocoding: 天地图 (Tianditu) - free, WGS84 native """ import json import logging from collections import OrderedDict from typing import Optional from urllib.parse import quote import aiohttp logger = logging.getLogger(__name__) # Import keys from centralized config (no more hardcoded values here) from app.config import settings as _settings GOOGLE_API_KEY: Optional[str] = _settings.GOOGLE_API_KEY UNWIRED_API_TOKEN: Optional[str] = _settings.UNWIRED_API_TOKEN TIANDITU_API_KEY: Optional[str] = _settings.TIANDITU_API_KEY BAIDU_MAP_AK: Optional[str] = _settings.BAIDU_MAP_AK # Maximum cache entries (LRU eviction) — configurable via settings _CACHE_MAX_SIZE = _settings.GEOCODING_CACHE_SIZE class LRUCache(OrderedDict): """Simple LRU cache based on OrderedDict.""" def __init__(self, maxsize: int = _CACHE_MAX_SIZE): super().__init__() self._maxsize = maxsize def get_cached(self, key): if key in self: self.move_to_end(key) return self[key] return None def put(self, key, value): if key in self: self.move_to_end(key) self[key] = value while len(self) > self._maxsize: self.popitem(last=False) # Cache cell tower lookups to avoid redundant API calls _cell_cache: LRUCache = LRUCache() _wifi_cache: LRUCache = LRUCache() # Cache reverse geocoding results (coord rounded to ~100m -> address) _address_cache: LRUCache = LRUCache() async def geocode_location( mcc: Optional[int] = None, mnc: Optional[int] = None, lac: Optional[int] = None, cell_id: Optional[int] = None, wifi_list: Optional[list[dict]] = None, neighbor_cells: Optional[list[dict]] = None, ) -> tuple[Optional[float], Optional[float]]: """ Convert cell tower and/or WiFi AP data to lat/lon. Parameters ---------- mcc : int - Mobile Country Code mnc : int - Mobile Network Code lac : int - Location Area Code cell_id : int - Cell Tower ID wifi_list : list[dict] - WiFi APs [{"mac": "AA:BB:CC:DD:EE:FF", "signal": -70}, ...] neighbor_cells : list[dict] - Neighbor cells [{"lac": ..., "cell_id": ..., "rssi": ...}, ...] Returns ------- (latitude, longitude) or (None, None) """ # Check cache first (cell tower) if mcc is not None and lac is not None and cell_id is not None: cache_key = (mcc, mnc or 0, lac, cell_id) cached = _cell_cache.get_cached(cache_key) if cached is not None: return cached # Try Google Geolocation API first if GOOGLE_API_KEY: result = await _geocode_google(mcc, mnc, lac, cell_id, wifi_list, neighbor_cells) if result[0] is not None: if mcc is not None and lac is not None and cell_id is not None: _cell_cache.put((mcc, mnc or 0, lac, cell_id), result) return result # Try Unwired Labs API if UNWIRED_API_TOKEN: result = await _geocode_unwired(mcc, mnc, lac, cell_id, wifi_list, neighbor_cells) if result[0] is not None: if mcc is not None and lac is not None and cell_id is not None: _cell_cache.put((mcc, mnc or 0, lac, cell_id), result) return result # Fallback: Mylnikov.org (free, no API key required) if mcc is not None and lac is not None and cell_id is not None: result = await _geocode_mylnikov_cell(mcc, mnc or 0, lac, cell_id) if result[0] is not None: _cell_cache.put((mcc, mnc or 0, lac, cell_id), result) return result # Try WiFi via Mylnikov if wifi_list: for ap in wifi_list: mac = ap.get("mac", "") if mac: result = await _geocode_mylnikov_wifi(mac) if result[0] is not None: return result return (None, None) async def _geocode_google( mcc, mnc, lac, cell_id, wifi_list, neighbor_cells ) -> tuple[Optional[float], Optional[float]]: """Use Google Geolocation API.""" url = f"https://www.googleapis.com/geolocation/v1/geolocate?key={GOOGLE_API_KEY}" body: dict = {} if mcc is not None: body["homeMobileCountryCode"] = mcc if mnc is not None: body["homeMobileNetworkCode"] = mnc # Cell towers towers = [] if lac is not None and cell_id is not None: towers.append({ "cellId": cell_id, "locationAreaCode": lac, "mobileCountryCode": mcc or 0, "mobileNetworkCode": mnc or 0, }) if neighbor_cells: for nc in neighbor_cells: towers.append({ "cellId": nc.get("cell_id", 0), "locationAreaCode": nc.get("lac", 0), "mobileCountryCode": mcc or 0, "mobileNetworkCode": mnc or 0, "signalStrength": -(nc.get("rssi", 0)), }) if towers: body["cellTowers"] = towers # WiFi APs if wifi_list: aps = [] for ap in wifi_list: aps.append({ "macAddress": ap.get("mac", ""), "signalStrength": -(ap.get("signal", 0)), }) body["wifiAccessPoints"] = aps try: async with aiohttp.ClientSession() as session: async with session.post(url, json=body, timeout=aiohttp.ClientTimeout(total=5)) as resp: if resp.status == 200: data = await resp.json() loc = data.get("location", {}) lat = loc.get("lat") lng = loc.get("lng") if lat is not None and lng is not None: logger.info("Google geocode: lat=%.6f, lon=%.6f", lat, lng) return (lat, lng) else: text = await resp.text() logger.warning("Google geocode failed: %d %s", resp.status, text[:200]) except Exception as e: logger.warning("Google geocode error: %s", e) return (None, None) async def _geocode_unwired( mcc, mnc, lac, cell_id, wifi_list, neighbor_cells ) -> tuple[Optional[float], Optional[float]]: """Use Unwired Labs LocationAPI.""" url = "https://us1.unwiredlabs.com/v2/process.php" body: dict = {"token": UNWIRED_API_TOKEN} # Cell towers cells = [] if mcc is not None and lac is not None and cell_id is not None: cells.append({ "lac": lac, "cid": cell_id, "mcc": mcc, "mnc": mnc or 0, }) if neighbor_cells: for nc in neighbor_cells: cells.append({ "lac": nc.get("lac", 0), "cid": nc.get("cell_id", 0), "mcc": mcc or 0, "mnc": mnc or 0, "signal": -(nc.get("rssi", 0)), }) if cells: body["cells"] = cells # WiFi APs if wifi_list: aps = [] for ap in wifi_list: aps.append({ "bssid": ap.get("mac", ""), "signal": -(ap.get("signal", 0)), }) body["wifi"] = aps try: async with aiohttp.ClientSession() as session: async with session.post(url, json=body, timeout=aiohttp.ClientTimeout(total=5)) as resp: if resp.status == 200: data = await resp.json() if data.get("status") == "ok": lat = data.get("lat") lon = data.get("lon") if lat is not None and lon is not None: logger.info("Unwired geocode: lat=%.6f, lon=%.6f", lat, lon) return (lat, lon) else: logger.warning("Unwired geocode: %s", data.get("message", "unknown error")) except Exception as e: logger.warning("Unwired geocode error: %s", e) return (None, None) async def _geocode_mylnikov_cell( mcc: int, mnc: int, lac: int, cell_id: int ) -> tuple[Optional[float], Optional[float]]: """Use Mylnikov.org free cell tower geocoding API (no API key required).""" url = ( f"https://api.mylnikov.org/geolocation/cell" f"?v=1.1&data=open" f"&mcc={mcc}&mnc={mnc}&lac={lac}&cellid={cell_id}" ) try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp: if resp.status == 200: data = await resp.json(content_type=None) if data.get("result") == 200: lat = data.get("data", {}).get("lat") lon = data.get("data", {}).get("lon") if lat is not None and lon is not None: logger.info("Mylnikov cell geocode: lat=%.6f, lon=%.6f", lat, lon) return (lat, lon) else: logger.debug("Mylnikov cell: no result for MCC=%d MNC=%d LAC=%d CellID=%d", mcc, mnc, lac, cell_id) else: logger.warning("Mylnikov cell API HTTP %d", resp.status) except Exception as e: logger.warning("Mylnikov cell geocode error: %s", e) return (None, None) async def _geocode_mylnikov_wifi(mac: str) -> tuple[Optional[float], Optional[float]]: """Use Mylnikov.org free WiFi AP geocoding API.""" # Normalize MAC format (needs colons) mac = mac.upper().replace("-", ":") url = f"https://api.mylnikov.org/geolocation/wifi?v=1.1&data=open&bssid={mac}" try: async with aiohttp.ClientSession() as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp: if resp.status == 200: data = await resp.json(content_type=None) if data.get("result") == 200: lat = data.get("data", {}).get("lat") lon = data.get("data", {}).get("lon") if lat is not None and lon is not None: logger.info("Mylnikov WiFi geocode: lat=%.6f, lon=%.6f (MAC=%s)", lat, lon, mac) _wifi_cache.put(mac, (lat, lon)) return (lat, lon) else: logger.debug("Mylnikov WiFi API HTTP %d for MAC=%s", resp.status, mac) except Exception as e: logger.warning("Mylnikov WiFi geocode error: %s", e) return (None, None) # --------------------------------------------------------------------------- # Reverse Geocoding: coordinates -> address # --------------------------------------------------------------------------- async def reverse_geocode( lat: float, lon: float ) -> Optional[str]: """ Convert lat/lon to a human-readable address. Priority: Baidu Map > Tianditu (fallback). Both accept WGS84 coordinates natively (Baidu via coordtype=wgs84ll). Returns None if no reverse geocoding service is available. """ # Round to ~100m for cache key to reduce API calls cache_key = (round(lat, 3), round(lon, 3)) cached = _address_cache.get_cached(cache_key) if cached is not None: return cached # Try Baidu Map first (higher quality addresses for China) if BAIDU_MAP_AK: result = await _reverse_geocode_baidu(lat, lon) if result: _address_cache.put(cache_key, result) return result # Fallback to Tianditu if TIANDITU_API_KEY: result = await _reverse_geocode_tianditu(lat, lon) if result: _address_cache.put(cache_key, result) return result return None async def _reverse_geocode_baidu( lat: float, lon: float ) -> Optional[str]: """ Use Baidu Map reverse geocoding API. API docs: https://lbsyun.baidu.com/faq/api?title=webapi/guide/webservice-geocoding Input coordtype: wgs84ll (WGS-84, same as GPS data, no conversion needed). Free tier: 5,000 requests/day (personal developer). """ url = ( f"https://api.map.baidu.com/reverse_geocoding/v3/" f"?ak={BAIDU_MAP_AK}&output=json&coordtype=wgs84ll" f"&location={lat},{lon}" ) try: async with aiohttp.ClientSession() as session: async with session.get( url, timeout=aiohttp.ClientTimeout(total=5) ) as resp: if resp.status == 200: data = await resp.json(content_type=None) if data.get("status") == 0: result = data.get("result", {}) formatted = result.get("formatted_address", "") if formatted: # Add sematic_description for more context sematic = result.get("sematic_description", "") address = formatted if sematic and sematic not in formatted: address = f"{formatted} ({sematic})" logger.info( "Baidu reverse geocode: %.6f,%.6f -> %s", lat, lon, address, ) return address else: logger.warning( "Baidu reverse geocode error: status=%s, msg=%s", data.get("status"), data.get("message", ""), ) else: logger.warning("Baidu reverse geocode HTTP %d", resp.status) except Exception as e: logger.warning("Baidu reverse geocode error: %s", e) return None async def _reverse_geocode_tianditu( lat: float, lon: float ) -> Optional[str]: """ Use 天地图 (Tianditu) reverse geocoding API. API docs: http://lbs.tianditu.gov.cn/server/geocoding.html Coordinate system: WGS84 (same as our GPS data, no conversion needed). Free tier: 10,000 requests/day. """ post_str = json.dumps({"lon": lon, "lat": lat, "ver": 1}, separators=(",", ":")) url = ( f"http://api.tianditu.gov.cn/geocoder" f"?postStr={quote(post_str)}&type=geocode&tk={TIANDITU_API_KEY}" ) try: async with aiohttp.ClientSession() as session: async with session.get( url, timeout=aiohttp.ClientTimeout(total=5) ) as resp: if resp.status == 200: data = await resp.json(content_type=None) if data.get("status") == "0": result = data.get("result", {}) # Build address from components addr_comp = result.get("addressComponent", {}) formatted = result.get("formatted_address", "") if formatted: # Add nearby POI if available poi = addr_comp.get("poi", "") address = formatted if poi and poi not in formatted: address = f"{formatted} ({poi})" logger.info( "Tianditu reverse geocode: %.6f,%.6f -> %s", lat, lon, address, ) return address else: logger.warning( "Tianditu reverse geocode error: %s", data.get("msg", data), ) else: logger.warning("Tianditu reverse geocode HTTP %d", resp.status) except Exception as e: logger.warning("Tianditu reverse geocode error: %s", e) return None