From c2c080c3490d70cd4d0ec49adc14c933a531191c Mon Sep 17 00:00:00 2001 From: remittor Date: Thu, 7 Nov 2024 09:30:23 +0300 Subject: [PATCH] [install_fw] Check images after flashing and before flashing --- activate_boot.py | 3 +- gateway.py | 75 +++++++++++++++++++++++++++++++++++++++++++---- install_fw.py | 76 ++++++++++++++++++++++++++++-------------------- read_info.py | 42 ++++++++++++++++++++++++++ 4 files changed, 158 insertions(+), 38 deletions(-) diff --git a/activate_boot.py b/activate_boot.py index 49d91cf..ab7f52a 100644 --- a/activate_boot.py +++ b/activate_boot.py @@ -80,8 +80,7 @@ def uboot_boot_change(gw, fw_num): cmd.append("nvram set flag_try_sys2_failed=0") cmd.append("nvram set flag_boot_rootfs={}".format(fw_num)) cmd.append("nvram commit") - gw.run_cmd(cmd) - return True + return gw.run_cmd(';'.join(cmd)) if __name__ == "__main__": diff --git a/gateway.py b/gateway.py index 21925ca..94de6c2 100644 --- a/gateway.py +++ b/gateway.py @@ -79,6 +79,7 @@ class Gateway(): self.login = 'root' # default username def __init__(self, timeout = 4, verbose = 2, detect_device = True, detect_ssh = True, load_cfg = True): + random.seed() self.__init_fields() self.verbose = verbose self.timeout = timeout @@ -798,6 +799,7 @@ class Gateway(): tn.read_until(tn.prompt, timeout = 4 if timeout is None else timeout) if not self.use_ssh: tn.write(b"exit\n") + ret = True return ret def download(self, fn_remote, fn_local, verbose = 1): @@ -826,11 +828,12 @@ class Gateway(): raise RuntimeError('FIXME') return True - def upload(self, fn_local, fn_remote, verbose = 1): - try: - file = open(fn_local, 'rb') - except Exception: - die('File "{}" not found.'.format(fn_local)) + def upload(self, fn_local, fn_remote, md5chk = True, verbose = 1): + if not os.path.exists(fn_local): + die(f'File "{fn_local}" not found.') + if md5chk: + md5_local = self.get_md5_for_local_file(fn_local) + file = open(fn_local, 'rb') if verbose and self.verbose: print('Upload file: "{}" ....'.format(fn_local)) if self.use_ssh: @@ -848,8 +851,70 @@ class Gateway(): else: raise RuntimeError('FIXME') file.close() + if md5chk: + md5_remote = self.get_md5_for_remote_file(fn_remote) + if md5_remote != md5_local: + if md5chk == 2: + die(f'File "{fn_local}" uploaded, but MD5 incorrect!') + #if verbose: + print(f'ERROR: File "{fn_local}" uploaded, but MD5 incorrect!') + return False return True + def get_md5_for_remote_file(self, fn_remote): + fname = os.path.basename(fn_remote) + num = str(random.randint(10000, 1000000)) + md5_local_fn = f"tmp/{fname}.{num}.md5" + md5_remote_fn = f"/tmp/{fname}.{num}.md5" + cmd = f'md5sum "{fn_remote}" &> "{md5_remote_fn}" ' + rc = self.run_cmd(cmd, timeout = 4) + if not rc: + return -5 + os.remove(md5_local_fn) if os.path.exists(md5_local_fn) else None + self.download(md5_remote_fn, md5_local_fn) + if not os.path.exists(md5_local_fn): + return -4 + with open(md5_local_fn, 'r', encoding = 'latin1') as file: + md5 = file.read() + os.remove(md5_local_fn) + if not md5: + return -3 + if md5.startswith('md5sum:'): + return -2 + md5 = md5.split(' ')[0] + md5 = md5.strip() + if len(md5) != 32: + return -1 + return md5.lower() + + def get_md5_for_local_file(self, fn_local, size = None): + hasher = hashlib.md5() + bs = 512*1024 + if size is None: + with open(fn_local, 'rb') as file: + for chunk in iter(lambda: file.read(bs), b''): + hasher.update(chunk) + elif size > 0: + tail_size = 0 + nsize = size + filesize = os.path.getsize(fn_local) + if size > filesize: + tail_size = size - filesize + nsize = filesize + readed = 0 + with open(fn_local, 'rb') as file: + while True: + if readed + bs > nsize: + bs = nsize - readed + chunk = file.read(bs) + hasher.update(chunk) + readed += bs + if readed >= nsize: + break + if tail_size: + hasher.update(b'\0' * tail_size) + return hasher.hexdigest() + #=============================================================================== if __name__ == "__main__": diff --git a/install_fw.py b/install_fw.py index 009f44b..72492df 100644 --- a/install_fw.py +++ b/install_fw.py @@ -1062,11 +1062,11 @@ class XqFlash(): gw.set_timeout(12) if fw_img.cmd: - gw.upload(fw_img.fn_local, fw_img.fn_remote) + gw.upload(fw_img.fn_local, fw_img.fn_remote, md5chk = 2) if kernel.cmd: - gw.upload(kernel.fn_local, kernel.fn_remote) + gw.upload(kernel.fn_local, kernel.fn_remote, md5chk = 2) if rootfs.cmd: - gw.upload(rootfs.fn_local, rootfs.fn_remote) + gw.upload(rootfs.fn_local, rootfs.fn_remote, md5chk = 2) if self.img_write: cmd = [ ] @@ -1076,52 +1076,66 @@ class XqFlash(): cmd.append("nvram set ssh_en=1") cmd.append("nvram set uart_en=1") cmd.append("nvram commit") - gw.run_cmd(cmd, timeout = 8) - - if self.install_fw_num is not None: - print("Run scripts for change NVRAM params...") - if self.img_write: - activate_boot.uboot_boot_change(gw, self.install_fw_num) - if hasattr(kernel, 'partname') and kernel.partname: - print(f'Boot from partition "{kernel.partname}" activated. ({self.install_fw_num})') - else: - print(f'Boot from firmware [{self.install_fw_num}] activated.') + rc = gw.run_cmd(';'.join(cmd), timeout = 8) + if not rc: + die(f'Cannot change nvram parameters!') if fw_img.cmd: - print('Writing firmware image to addr {} ...'.format("0x%08X" % fw_img.addr)) - print(" " + fw_img.cmd) - if self.img_write: - gw.run_cmd(fw_img.cmd, timeout = 60) - + self.flash_data_to_mtd('firmware', fw_img, timeout = 60) + if kernel.cmd: - print('Writing kernel image to addr {} ...'.format("0x%08X" % kernel.addr)) - print(" " + kernel.cmd) - if self.img_write: - gw.run_cmd(kernel.cmd, timeout = 34) + self.flash_data_to_mtd('kernel', kernel, timeout = 34) if rootfs.cmd: - print('Writing rootfs image to addr {} ...'.format("0x%08X" % rootfs.addr)) - print(" " + rootfs.cmd) - if self.img_write: - gw.run_cmd(rootfs.cmd, timeout = 60) + self.flash_data_to_mtd('rootfs', rootfs, timeout = 60) if not self.img_write: die('===== Flash TEST is over =====') + if self.install_fw_num is not None: + print("Run scripts for change NVRAM params...") + activate_boot.uboot_boot_change(gw, self.install_fw_num) + if hasattr(kernel, 'partname') and kernel.partname: + print(f'Boot from partition "{kernel.partname}" activated. [{self.install_fw_num}]') + else: + print(f'Boot from firmware [{self.install_fw_num}] activated.') + nvram = self.dev.get_nvram() + if 'flag_boot_rootfs' not in nvram: + die(f'Parameter "flag_boot_rootfs" not founeded into nvram') + flag_boot_rootfs = int(nvram['flag_boot_rootfs']) + if flag_boot_rootfs != self.install_fw_num: + die(f'Parameter flag_boot_rootfs = {flag_boot_rootfs} , but expected [{self.install_fw_num}]') + print("The firmware has been successfully flashed!") if self.install_method == 100: gw.run_cmd("sync ; umount -a", timeout = 5) print("Please, reboot router!") else: - import ssh2 print('Send command "reboot" via SSH/Telnet ...') - try: - gw.run_cmd("reboot -f") - except ssh2.exceptions.SocketRecvError as e: - pass + gw.run_cmd("reboot -f", die_on_error = False) print("Forced REBOOT activated!") + def flash_data_to_mtd(self, img_name, img: Image, timeout, check = True): + print(f'Writing {img_name} image to addr 0x{img.addr:08X} ...') + print(f" {img.cmd}") + partname = img.partname + size = os.path.getsize(img.fn_local) + size = (size // 4096) * 4096 + md5_orig = self.gw.get_md5_for_local_file(img.fn_local, size) + if not self.img_write: + return True + rc = self.gw.run_cmd(img.cmd, timeout = timeout, die_on_error = True) + if not rc: + print(f' ERROR: cannot flash data to partition "{partname}"') + return False + if check: + md5 = self.dev.get_md5_for_mtd_data(partname, offset = 0, size = size) + if md5 != md5_orig: + die(f'Flashed data corrupted! Partition "{partname}" md5: {md5}') + return False + return True + # ===================================================================== xf = XqFlash() diff --git a/read_info.py b/read_info.py index 09a96b4..ae32248 100644 --- a/read_info.py +++ b/read_info.py @@ -668,6 +668,48 @@ class DevInfo(): print("") return self.ver + def get_md5_for_mtd_data(self, partname, offset = 0, size = None): + if not self.partlist: + return -10 + mtd_num = self.get_part_num(partname) + if mtd_num < 0: + return -9 + mtd_part = self.partlist[mtd_num] + bs = 4096 + if not size: + size = mtd_part['size'] + if size > mtd_part['size']: + return -8 + if size % bs != 0: + return -7 + if offset % bs != 0: + return -6 + skip = f'skip={offset // bs}' if offset else '' + num = str(random.randint(10000, 1000000)) + md5_local_fn = f"tmp/mtd{mtd_num}_{offset}_{size}_{num}.md5" + md5_remote_fn = f"/tmp/mtd{mtd_num}_{offset}_{size}_{num}.md5" + count = size // bs + cmd = f'dd if=/dev/mtd{mtd_num} bs={bs} count={count} {skip} | md5sum > "{md5_remote_fn}" ' + try: + self.gw.run_cmd(cmd) + self.gw.download(md5_remote_fn, md5_local_fn) + except Exception: + return -5 + if not os.path.exists(md5_local_fn): + return -4 + with open(md5_local_fn, 'r', encoding = 'latin1') as file: + md5 = file.read() + os.remove(md5_local_fn) + if not md5: + return -3 + if md5.startswith('md5sum:'): + return -2 + md5 = md5.split(' ')[0] + md5 = md5.strip() + if len(md5) != 32: + return -1 + return md5.lower() + def get_bootloader(self, verbose = None): verbose = verbose if verbose is not None else self.verbose self.bl = Bootloader()