Add new class SysLog into read_info.py

pull/3/head
remittor 3 years ago
parent 6db1a85337
commit 0d5bcefea8

@ -603,19 +603,49 @@ class DevInfo():
print("") print("")
return self.env.fw return self.env.fw
def download_syslog(self, timeout = 4):
self.syslog = [] class SysLog():
if self.gw is not None: gw = None # Gateway()
verbose = 1
timeout = 10
files = []
mtdlist = []
bdata = None # EnvBuffer()
def __init__(self, gw, timeout = 10, verbose = 1, infolevel = 1):
self.gw = gateway.Gateway() if gw is None else gw
self.verbose = verbose
self.timeout = timeout
os.makedirs('outdir', exist_ok = True)
os.makedirs('tmp', exist_ok = True)
if infolevel > 0:
self.update(infolevel)
def update(self, infolevel):
if infolevel >= 1:
self.download_syslog()
if infolevel >= 2:
self.parse_mtdlist()
if infolevel >= 3:
self.parse_bdata()
def download_syslog(self, timeout = None):
timeout = timeout if timeout is not None else self.timeout
self.files = []
if not self.gw:
gw = gateway.Gateway()
gw.web_login()
else:
gw = self.gw gw = self.gw
if gw.status < 1: if gw.status < 1:
gw.detect_device() gw.detect_device()
else: if not gw.stok:
gw = gateway.Gateway(timeout = timeout) gw.web_login()
if gw.status < 1: if gw.status < 1:
die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr)) die("Xiaomi Mi Wi-Fi device not found (IP: {})".format(gw.ip_addr))
stok = gw.web_login() if self.verbose > 0:
print("Start generating syslog...") print("Start generating syslog...")
r2 = requests.get(gw.apiurl + "misystem/sys_log") r2 = requests.get(gw.apiurl + "misystem/sys_log", timeout = self.timeout)
if r2.text.find('"code":0') < 0: if r2.text.find('"code":0') < 0:
die("SysLog not generated!") die("SysLog not generated!")
try: try:
@ -623,31 +653,89 @@ class DevInfo():
path = path.group(1).strip() path = path.group(1).strip()
except Exception: except Exception:
die("SysLog not generated! (2)") die("SysLog not generated! (2)")
#fn_local = 'syslog.tar.gz'
url = "http://" + path url = "http://" + path
print('Downloading SysLog from file "{}" ...'.format(url)) if self.verbose > 0:
print('Downloading SysLog from file "{}" ...'.format(url))
zip = b'' zip = b''
with requests.get(url, stream=True) as r3: with requests.get(url, stream=True, timeout = self.timeout) as r3:
r3.raise_for_status() r3.raise_for_status()
for chunk in r3.iter_content(chunk_size=8192): for chunk in r3.iter_content(chunk_size=8192):
zip += chunk zip += chunk
file = io.BytesIO(zip) fn_local = 'outdir/syslog.tar.gz'
tar = tarfile.open(fileobj = file, mode='r:gz') with open(fn_local, "wb") as file:
file.write(zip)
if os.path.exists("syslog_test.tar.gz"): # TEST
fn_local = "syslog_test.tar.gz"
tar = tarfile.open(fn_local, mode='r:gz')
for member in tar.getmembers(): for member in tar.getmembers():
if not member.isfile() or not member.name: if not member.isfile() or not member.name:
continue continue
if member.name.find('usr/log/') >= 0: # skip syslog files if member.name.find('usr/log/') >= 0: # skip raw syslog files
continue continue
file = types.SimpleNamespace() item = types.SimpleNamespace()
file.name = member.name item.name = member.name
file.size = member.size item.size = member.size
file.data = tar.extractfile(member).read() item.data = tar.extractfile(member).read()
self.syslog.append(file) self.files.append(item)
#print('name = "{}", size = {} ({})'.format(file.name, file.size, len(file.data))) if self.verbose >= 2:
#if len(file.data) < 200: print('name = "{}", size = {} ({})'.format(item.name, item.size, len(item.data)))
# print(file.data) if len(item.data) < 200:
tar.close() print(item.data)
return self.syslog tar.close()
return self.files
def get_file_by_name(self, filename, fatal_error = False):
if self.files:
for i, item in enumerate(self.files):
if os.path.basename(item.name) == filename:
return item
if fatal_error:
die('File "{}" not found in syslog!'.format(filename))
return None
def parse_mtdlist(self):
self.mtdlist = []
file = self.get_file_by_name('xiaoqiang.log', fatal_error = True)
txt = file.data.decode('ascii')
x = txt.find("\nMTD table:\n")
if x <= 0:
die('MTD table not found into syslog!')
mtdtbl = re.findall(r'mtd([0-9]+): ([0-9a-fA-F]+) ([0-9a-fA-F]+) "(.*?)"', txt)
if len(mtdtbl) <= 0:
return []
mtdlist = []
if self.verbose:
print("SysLog MTD table:")
for i, mtd in enumerate(mtdtbl):
item = types.SimpleNamespace()
item.id = int(mtd[0])
item.size = int(mtd[1], 16)
item.name = mtd[3]
mtdlist.append(item)
if self.verbose:
print(' %2d > size: 0x%08X name: "%s"' % (item.id, item.size, item.name))
self.mtdlist = mtdlist
return mtdlist
def get_mtd_by_name(self, name):
if self.mtdlist:
name = name.lower()
for i, mtd in enumerate(self.mtdlist):
if mtd.name.lower().endswith(name):
return mtd
return None
def parse_bdata(self):
self.bdata = None
file = self.get_file_by_name('bdata.txt', fatal_error = True)
env = EnvBuffer(file.data.decode('ascii'), '\n')
if self.verbose >= 2:
print('SysLog BData List:')
for i, (k, v) in enumerate(env.var.items()):
v = '' if (v is None) else ('=' + v)
print(" " + k + v)
self.bdata = env
return env
if __name__ == "__main__": if __name__ == "__main__":

Loading…
Cancel
Save