385 lines
14 KiB
Python
385 lines
14 KiB
Python
"""
|
|
Geocoding service - Convert cell tower / WiFi AP data to lat/lon coordinates,
|
|
and reverse geocode coordinates to addresses.
|
|
|
|
Uses free APIs:
|
|
- Cell tower: Google Geolocation API (if key available) or unwiredlabs.com
|
|
- WiFi: Same APIs support WiFi AP lookup
|
|
- Reverse geocoding: 天地图 (Tianditu) - free, WGS84 native
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from collections import OrderedDict
|
|
from typing import Optional
|
|
from urllib.parse import quote
|
|
|
|
import aiohttp
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Google Geolocation API key (set to enable Google geocoding).
# While this is None, geocode_location skips the Google provider.
GOOGLE_API_KEY: Optional[str] = None

# Unwired Labs API token (free tier: 100 requests/day).
# Sign up at https://unwiredlabs.com/
# While this is None, geocode_location skips the Unwired Labs provider.
UNWIRED_API_TOKEN: Optional[str] = None

# Tianditu (天地图) API key (free tier: 10000 requests/day).
# Sign up at https://lbs.tianditu.gov.cn/
# Taken from the TIANDITU_API_KEY environment variable when it is set;
# otherwise the literal below is used.
# NOTE(review): the fallback looks like a real credential committed to source
# control - consider rotating it and requiring the environment variable.
TIANDITU_API_KEY: Optional[str] = os.environ.get("TIANDITU_API_KEY", "439fca3920a6f31584014424f89c3ecc")
|
|
|
|
# Upper bound on the number of entries each cache keeps before evicting.
_CACHE_MAX_SIZE = 10000


class LRUCache(OrderedDict):
    """Size-bounded mapping that drops its least-recently-used entry first.

    Recency is tracked through the ``OrderedDict`` ordering: the first key is
    the coldest entry and the last key is the most recently touched one.
    Only ``get_cached`` and ``put`` maintain that ordering; plain dict access
    (``cache[key]``) neither refreshes recency nor enforces the size limit.
    """

    def __init__(self, maxsize: int = _CACHE_MAX_SIZE):
        super().__init__()
        self._maxsize = maxsize

    def get_cached(self, key):
        """Return the value stored under *key* and mark it as recently used.

        Returns ``None`` when *key* is absent.
        """
        try:
            value = self[key]
        except KeyError:
            return None
        self.move_to_end(key)
        return value

    def put(self, key, value):
        """Store *value* under *key* as the most recent entry, then evict.

        The oldest entries are removed until at most ``maxsize`` remain.
        """
        self[key] = value
        # Assigning to an existing key keeps its old position, so refresh it
        # explicitly; for a brand-new key this is a no-op.
        self.move_to_end(key)
        for _ in range(len(self) - self._maxsize):
            self.popitem(last=False)
|
|
|
|
|
|
# Cache cell tower lookups to avoid redundant API calls.
# Keys are (mcc, mnc, lac, cell_id) tuples; values are (lat, lon) tuples.
_cell_cache: LRUCache = LRUCache()
# WiFi AP positions, keyed by the normalized MAC string; values are (lat, lon).
_wifi_cache: LRUCache = LRUCache()
# Cache reverse geocoding results (coord rounded to ~100m -> address)
_address_cache: LRUCache = LRUCache()
|
|
|
|
|
|
async def geocode_location(
    mcc: Optional[int] = None,
    mnc: Optional[int] = None,
    lac: Optional[int] = None,
    cell_id: Optional[int] = None,
    wifi_list: Optional[list[dict]] = None,
    neighbor_cells: Optional[list[dict]] = None,
) -> tuple[Optional[float], Optional[float]]:
    """
    Resolve cell tower and/or WiFi AP observations to a lat/lon pair.

    Providers are tried in order and the first one that yields a position
    wins: the cell cache, Google (when GOOGLE_API_KEY is set), Unwired Labs
    (when UNWIRED_API_TOKEN is set), Mylnikov cell lookup, and finally
    Mylnikov WiFi lookup for each access point in turn.

    Parameters
    ----------
    mcc : int - Mobile Country Code
    mnc : int - Mobile Network Code (treated as 0 in the cache key when missing)
    lac : int - Location Area Code
    cell_id : int - Cell Tower ID
    wifi_list : list[dict] - WiFi APs [{"mac": "AA:BB:CC:DD:EE:FF", "signal": -70}, ...]
    neighbor_cells : list[dict] - Neighbor cells [{"lac": ..., "cell_id": ..., "rssi": ...}, ...]

    Returns
    -------
    (latitude, longitude) or (None, None)
    """
    # A serving cell is only usable (and cacheable) when it is fully identified.
    has_cell = not (mcc is None or lac is None or cell_id is None)
    cell_key = (mcc, mnc or 0, lac, cell_id) if has_cell else None

    def _remember(position):
        # Store a successful fix under the serving cell, when there is one.
        if has_cell:
            _cell_cache.put(cell_key, position)
        return position

    if has_cell:
        hit = _cell_cache.get_cached(cell_key)
        if hit is not None:
            return hit

    # Keyed providers first: Google, then Unwired Labs.
    if GOOGLE_API_KEY:
        position = await _geocode_google(mcc, mnc, lac, cell_id, wifi_list, neighbor_cells)
        if position[0] is not None:
            return _remember(position)

    if UNWIRED_API_TOKEN:
        position = await _geocode_unwired(mcc, mnc, lac, cell_id, wifi_list, neighbor_cells)
        if position[0] is not None:
            return _remember(position)

    # Fallback: Mylnikov.org (free, no API key required)
    if has_cell:
        position = await _geocode_mylnikov_cell(mcc, mnc or 0, lac, cell_id)
        if position[0] is not None:
            return _remember(position)

    # Last resort: look up WiFi access points one by one via Mylnikov.
    for ap in wifi_list or ():
        mac = ap.get("mac", "")
        if not mac:
            continue
        position = await _geocode_mylnikov_wifi(mac)
        if position[0] is not None:
            return position

    return (None, None)
|
|
|
|
|
|
async def _geocode_google(
    mcc, mnc, lac, cell_id, wifi_list, neighbor_cells
) -> tuple[Optional[float], Optional[float]]:
    """Locate a device through the Google Geolocation API.

    Parameters
    ----------
    mcc, mnc : serving network codes; sent as the home network codes and
        reused for every tower entry (0 is substituted when missing).
    lac, cell_id : serving cell; included only when both are present.
    wifi_list : iterable of {"mac": ..., "signal": ...} dicts, or None.
        Entries without a MAC address are ignored.
    neighbor_cells : iterable of {"lac": ..., "cell_id": ..., "rssi": ...}
        dicts, or None.

    Returns
    -------
    (latitude, longitude) on success, otherwise (None, None). Network and
    parsing errors are logged and reported as (None, None).
    """
    towers = []
    if lac is not None and cell_id is not None:
        towers.append({
            "cellId": cell_id,
            "locationAreaCode": lac,
            "mobileCountryCode": mcc or 0,
            "mobileNetworkCode": mnc or 0,
        })
    # Signal strength is sent in dBm, i.e. as a non-positive number. Devices
    # may report either the magnitude (70) or the dBm value (-70), so
    # normalize with -abs() instead of blindly flipping the sign.
    towers.extend(
        {
            "cellId": nc.get("cell_id", 0),
            "locationAreaCode": nc.get("lac", 0),
            "mobileCountryCode": mcc or 0,
            "mobileNetworkCode": mnc or 0,
            "signalStrength": -abs(nc.get("rssi", 0)),
        }
        for nc in neighbor_cells or ()
    )

    # Skip access points without a MAC address; they carry no information.
    access_points = [
        {
            "macAddress": ap["mac"],
            "signalStrength": -abs(ap.get("signal", 0)),
        }
        for ap in wifi_list or ()
        if ap.get("mac")
    ]

    if not towers and not access_points:
        # Nothing to look up; do not spend a request on it.
        return (None, None)

    # considerIp defaults to true on Google's side, which would make the API
    # fall back to the IP address of *this server* when the radio data is
    # insufficient - a plausible-looking but wrong position for the device.
    body: dict = {"considerIp": False}
    if mcc is not None:
        body["homeMobileCountryCode"] = mcc
    if mnc is not None:
        body["homeMobileNetworkCode"] = mnc
    if towers:
        body["cellTowers"] = towers
    if access_points:
        body["wifiAccessPoints"] = access_points

    url = f"https://www.googleapis.com/geolocation/v1/geolocate?key={GOOGLE_API_KEY}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=body, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    loc = data.get("location", {})
                    lat = loc.get("lat")
                    lng = loc.get("lng")
                    if lat is not None and lng is not None:
                        logger.info("Google geocode: lat=%.6f, lon=%.6f", lat, lng)
                        return (lat, lng)
                elif resp.status == 404:
                    # Valid request, but Google has no position for these
                    # signals (expected now that the IP fallback is disabled).
                    logger.debug("Google geocode: no match for supplied signals")
                else:
                    text = await resp.text()
                    logger.warning("Google geocode failed: %d %s", resp.status, text[:200])
    except Exception as e:
        # Best effort: the caller falls through to the next provider.
        logger.warning("Google geocode error: %s", e)

    return (None, None)
|
|
|
|
|
|
async def _geocode_unwired(
    mcc, mnc, lac, cell_id, wifi_list, neighbor_cells
) -> tuple[Optional[float], Optional[float]]:
    """Locate a device through the Unwired Labs LocationAPI.

    Parameters
    ----------
    mcc, mnc : serving network codes (0 is substituted for a missing mnc, and
        for a missing mcc on neighbor entries).
    lac, cell_id : serving cell; included only when mcc, lac and cell_id are
        all present.
    wifi_list : iterable of {"mac": ..., "signal": ...} dicts, or None.
        Entries without a MAC address are ignored.
    neighbor_cells : iterable of {"lac": ..., "cell_id": ..., "rssi": ...}
        dicts, or None.

    Returns
    -------
    (latitude, longitude) on success, otherwise (None, None). Network and
    parsing errors are logged and reported as (None, None).
    """
    cells = []
    if mcc is not None and lac is not None and cell_id is not None:
        cells.append({
            "lac": lac,
            "cid": cell_id,
            "mcc": mcc,
            "mnc": mnc or 0,
        })
    # Signal is sent in dBm (non-positive). Accept both magnitude (70) and
    # dBm (-70) inputs by normalizing with -abs() rather than negating.
    cells.extend(
        {
            "lac": nc.get("lac", 0),
            "cid": nc.get("cell_id", 0),
            "mcc": mcc or 0,
            "mnc": mnc or 0,
            "signal": -abs(nc.get("rssi", 0)),
        }
        for nc in neighbor_cells or ()
    )

    # Skip access points without a MAC address; they carry no information.
    access_points = [
        {
            "bssid": ap["mac"],
            "signal": -abs(ap.get("signal", 0)),
        }
        for ap in wifi_list or ()
        if ap.get("mac")
    ]

    if not cells and not access_points:
        # Nothing to look up; do not burn a request from the daily quota.
        return (None, None)

    url = "https://us1.unwiredlabs.com/v2/process.php"
    body: dict = {"token": UNWIRED_API_TOKEN}
    if cells:
        body["cells"] = cells
    if access_points:
        body["wifi"] = access_points

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=body, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("status") == "ok":
                        lat = data.get("lat")
                        lon = data.get("lon")
                        if lat is not None and lon is not None:
                            logger.info("Unwired geocode: lat=%.6f, lon=%.6f", lat, lon)
                            return (lat, lon)
                    else:
                        logger.warning("Unwired geocode: %s", data.get("message", "unknown error"))
                else:
                    # Surface transport-level failures like the other providers do.
                    logger.warning("Unwired geocode HTTP %d", resp.status)
    except Exception as e:
        # Best effort: the caller falls through to the next provider.
        logger.warning("Unwired geocode error: %s", e)

    return (None, None)
|
|
|
|
|
|
async def _geocode_mylnikov_cell(
    mcc: int, mnc: int, lac: int, cell_id: int
) -> tuple[Optional[float], Optional[float]]:
    """Look up one cell tower through the key-less Mylnikov.org API.

    Returns (latitude, longitude) when the service knows the tower, and
    (None, None) otherwise. Request and parsing errors are logged and
    reported as (None, None).
    """
    no_fix = (None, None)
    endpoint = (
        "https://api.mylnikov.org/geolocation/cell?v=1.1&data=open"
        f"&mcc={mcc}&mnc={mnc}&lac={lac}&cellid={cell_id}"
    )
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status != 200:
                    logger.warning("Mylnikov cell API HTTP %d", resp.status)
                    return no_fix

                # content_type=None: decode as JSON whatever Content-Type
                # header the server sends.
                payload = await resp.json(content_type=None)
                if payload.get("result") != 200:
                    logger.debug("Mylnikov cell: no result for MCC=%d MNC=%d LAC=%d CellID=%d",
                                 mcc, mnc, lac, cell_id)
                    return no_fix

                coords = payload.get("data", {})
                lat, lon = coords.get("lat"), coords.get("lon")
                if lat is None or lon is None:
                    return no_fix

                logger.info("Mylnikov cell geocode: lat=%.6f, lon=%.6f", lat, lon)
                return (lat, lon)
    except Exception as e:
        logger.warning("Mylnikov cell geocode error: %s", e)

    return no_fix
|
|
|
|
|
|
async def _geocode_mylnikov_wifi(mac: str) -> tuple[Optional[float], Optional[float]]:
    """Look up one WiFi access point through the key-less Mylnikov.org API.

    Parameters
    ----------
    mac : access point MAC address; case and "-" separators are normalized
        to upper case with ":" before use.

    Returns
    -------
    (latitude, longitude) when the access point is known (served from
    ``_wifi_cache`` when it was resolved before), otherwise (None, None).
    Request and parsing errors are logged and reported as (None, None).
    """
    # Normalize MAC format (needs colons)
    mac = mac.upper().replace("-", ":")

    # Successful lookups are stored in _wifi_cache below; consult it first so
    # a known access point does not cost another network round trip.
    cached = _wifi_cache.get_cached(mac)
    if cached is not None:
        return cached

    url = f"https://api.mylnikov.org/geolocation/wifi?v=1.1&data=open&bssid={mac}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
                if resp.status == 200:
                    # content_type=None: decode as JSON whatever Content-Type
                    # header the server sends.
                    data = await resp.json(content_type=None)
                    if data.get("result") == 200:
                        coords = data.get("data", {})
                        lat = coords.get("lat")
                        lon = coords.get("lon")
                        if lat is not None and lon is not None:
                            logger.info("Mylnikov WiFi geocode: lat=%.6f, lon=%.6f (MAC=%s)", lat, lon, mac)
                            _wifi_cache.put(mac, (lat, lon))
                            return (lat, lon)
                else:
                    logger.debug("Mylnikov WiFi API HTTP %d for MAC=%s", resp.status, mac)
    except Exception as e:
        # Best effort: the caller moves on to the next access point.
        logger.warning("Mylnikov WiFi geocode error: %s", e)

    return (None, None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reverse Geocoding: coordinates -> address
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def reverse_geocode(
    lat: float, lon: float
) -> Optional[str]:
    """
    Turn a lat/lon pair into a human-readable address.

    Results are cached per coordinate rounded to three decimals, so nearby
    points share one lookup. Tianditu, which works in WGS84 natively, is the
    only backend; without TIANDITU_API_KEY, or when the lookup yields
    nothing, the function returns None.
    """
    # Round to ~100m for cache key to reduce API calls
    bucket = (round(lat, 3), round(lon, 3))

    known = _address_cache.get_cached(bucket)
    if known is not None:
        return known

    if not TIANDITU_API_KEY:
        return None

    address = await _reverse_geocode_tianditu(lat, lon)
    if not address:
        return None

    _address_cache.put(bucket, address)
    return address
|
|
|
|
|
|
async def _reverse_geocode_tianditu(
    lat: float, lon: float
) -> Optional[str]:
    """
    Reverse geocode a coordinate through the Tianditu API.

    API docs: http://lbs.tianditu.gov.cn/server/geocoding.html
    Coordinate system: WGS84 (same as our GPS data, no conversion needed).
    Free tier: 10,000 requests/day.

    Returns the formatted address, with the nearby POI appended in
    parentheses when one is reported and not already part of the address.
    Returns None on any HTTP, API or parsing failure (failures are logged).
    """
    # The API takes its arguments as a compact JSON document in "postStr".
    query = quote(json.dumps({"lon": lon, "lat": lat, "ver": 1}, separators=(",", ":")))
    # NOTE(review): this goes over plain HTTP, so the API key and the
    # coordinates travel unencrypted - consider HTTPS if the endpoint offers it.
    endpoint = f"http://api.tianditu.gov.cn/geocoder?postStr={query}&type=geocode&tk={TIANDITU_API_KEY}"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                endpoint, timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                if resp.status != 200:
                    logger.warning("Tianditu reverse geocode HTTP %d", resp.status)
                    return None

                # content_type=None: decode as JSON whatever Content-Type
                # header the server sends.
                payload = await resp.json(content_type=None)
                if payload.get("status") != "0":
                    logger.warning(
                        "Tianditu reverse geocode error: %s",
                        payload.get("msg", payload),
                    )
                    return None

                details = payload.get("result", {})
                base = details.get("formatted_address", "")
                if not base:
                    return None

                # Add nearby POI if available
                landmark = details.get("addressComponent", {}).get("poi", "")
                if landmark and landmark not in base:
                    address = f"{base} ({landmark})"
                else:
                    address = base

                logger.info(
                    "Tianditu reverse geocode: %.6f,%.6f -> %s",
                    lat, lon, address,
                )
                return address
    except Exception as e:
        logger.warning("Tianditu reverse geocode error: %s", e)

    return None
|