diff --git a/gclient-new-workdir.py b/gclient-new-workdir.py index c9d8bdc91a..1af5f32902 100755 --- a/gclient-new-workdir.py +++ b/gclient-new-workdir.py @@ -138,16 +138,64 @@ def support_copy_on_write(src, dest): return True -def try_btrfs_subvol_snapshot(src, dest): +def btrfs_subvol_snapshot(src, dest): + """Creates a Btrfs snapshot of src at dest. + Fails hard with detailed diagnostics if it fails.""" try: subprocess.check_call(['btrfs', 'subvol', 'snapshot', src, dest], stderr=subprocess.STDOUT) except (subprocess.CalledProcessError, OSError): + print(f"Error: Failed to create Btrfs snapshot of '{src}'.") + + # Diagnostics + readable = os.access(src, os.R_OK) + parent_dest = os.path.dirname(dest) + writable = os.access(parent_dest, os.W_OK) + + if not readable: + print( + f" [✗] Permission denied: Source repository '{src}' is not readable by you." + ) + print( + " Please ensure you have read permissions to the source subvolume." + ) + + if not writable: + print( + f" [✗] Permission denied: Destination parent directory '{parent_dest}' is not writable by you." + ) + print( + " Please ensure you have write permissions to the destination parent directory." + ) + + if readable and writable: + print( + " [?] Permissions appear OK. The failure might be due to other Btrfs restrictions." + ) + print( + " Consider checking if you have reached disk quota or if the filesystem is read-only." + ) + return False assert os.path.exists(dest) return True +def is_btrfs_subvolume(path): + """Returns True if the path is a valid Btrfs subvolume (root).""" + try: + # Check if filesystem is btrfs + fstype = subprocess.check_output( + ['findmnt', '-no', 'FSTYPE', '--target', path], + stderr=subprocess.DEVNULL).decode().strip() + if fstype != 'btrfs': + return False + # In Btrfs, the root of a subvolume always has inode 256. + return os.stat(path).st_ino == 256 + except (subprocess.CalledProcessError, OSError): + return False + + def real_git_dir(repo_path): relative_git_dir = ( subprocess.check_output( @@ -221,12 +269,12 @@ def main(): args.repository = os.path.realpath(args.repository) args.new_workdir = os.path.realpath(args.new_workdir) - used_btrfs_subvol_snapshot = False - if try_btrfs_subvol_snapshot(args.repository, args.new_workdir): + if is_btrfs_subvolume(args.repository): + if not btrfs_subvol_snapshot(args.repository, args.new_workdir): + sys.exit(1) # If btrfs is being used, reflink support is always present, and there's # no benefit to not using it. args.copy_on_write = True - used_btrfs_subvol_snapshot = True else: os.makedirs(args.new_workdir) @@ -315,7 +363,7 @@ def main(): except Exception as e: print(f'Error: {e}') print(f'Cleaning up {args.new_workdir}') - if used_btrfs_subvol_snapshot: + if is_btrfs_subvolume(args.new_workdir): subprocess.check_call( ['btrfs', 'subvol', 'delete', args.new_workdir] ) diff --git a/tests/gclient-new-workdir_test.py b/tests/gclient-new-workdir_test.py new file mode 100644 index 0000000000..72262315ef --- /dev/null +++ b/tests/gclient-new-workdir_test.py @@ -0,0 +1,216 @@ +import importlib.machinery +import importlib.util +import os +import subprocess +import sys +import unittest +from unittest.mock import MagicMock, patch + +# Load the script with hyphens in name as a module +test_dir = os.path.dirname(os.path.abspath(__file__)) +script_path = os.path.abspath( + os.path.join(test_dir, "..", "gclient-new-workdir.py")) + +# Ensure depot_tools root is in sys.path so gclient_utils can be imported +depot_tools_dir = os.path.dirname(script_path) +if depot_tools_dir not in sys.path: + sys.path.insert(0, depot_tools_dir) + +loader = importlib.machinery.SourceFileLoader( + "gclient_new_workdir", + script_path, +) +spec = importlib.util.spec_from_loader(loader.name, loader) +gclient_new_workdir = importlib.util.module_from_spec(spec) +loader.exec_module(gclient_new_workdir) + + +class TestGclientNewWorkdir(unittest.TestCase): + + @patch("subprocess.check_output") + @patch("os.stat") + @patch("os.access") + @patch("subprocess.check_call") + @patch("os.makedirs") + @patch("sys.exit") + def test_abort_on_btrfs_fail(self, mock_exit, mock_makedirs, + mock_check_call, mock_os_access, mock_os_stat, + mock_check_output): + # Setup mocks + mock_check_output.return_value = b'btrfs' + mock_stat_res = MagicMock() + mock_stat_res.st_ino = 256 + mock_os_stat.return_value = mock_stat_res + + def mock_cc(args, **kwargs): + _ = kwargs + if args[2] == 'snapshot': + raise OSError("Failed") + + mock_check_call.side_effect = mock_cc + + # Make mock_exit raise SystemExit to stop execution + mock_exit.side_effect = SystemExit(1) + + # Mock os.access to return True for diagnostics + mock_os_access.return_value = True + + # Mock parse_options + mock_args = MagicMock() + mock_args.repository = "repo" + mock_args.new_workdir = "dest" + + with patch.object(gclient_new_workdir, + "parse_options", + return_value=mock_args): + try: + gclient_new_workdir.main() + except SystemExit: + pass # Expected + + # Assertions + mock_exit.assert_called_with(1) + mock_makedirs.assert_not_called() + + @patch("subprocess.check_output") + @patch("os.stat") + @patch("os.access") + @patch("os.makedirs") + @patch("sys.exit") + @patch.object(gclient_new_workdir, "support_copy_on_write") + def test_fallback_on_non_subvolume(self, mock_support_cow, mock_exit, + mock_makedirs, mock_os_access, + mock_os_stat, mock_check_output): + # Setup mocks + mock_check_output.return_value = b'btrfs' + mock_stat_res = MagicMock() + mock_stat_res.st_ino = 123 # Not subvolume! + mock_os_stat.return_value = mock_stat_res + + # Mock os.access to return True for diagnostics + mock_os_access.return_value = True + + # Mock support_copy_on_write to stop execution after os.makedirs + mock_support_cow.side_effect = SystemExit(0) + + # Mock parse_options + mock_args = MagicMock() + mock_args.repository = "repo" + mock_args.new_workdir = "dest" + + with patch.object(gclient_new_workdir, + "parse_options", + return_value=mock_args): + try: + gclient_new_workdir.main() + except SystemExit: + pass + + # Assertions + mock_exit.assert_not_called() + # It should proceed to os.makedirs with resolved path + mock_makedirs.assert_called_with(os.path.realpath("dest")) + + @patch("subprocess.check_output") + @patch("os.stat") + @patch("os.access") + @patch("subprocess.check_call") + @patch("os.makedirs") + @patch("sys.exit") + def test_diagnostics_repo_not_readable(self, mock_exit, mock_makedirs, + mock_check_call, mock_os_access, + mock_os_stat, mock_check_output): + # Setup mocks + mock_check_output.return_value = b'btrfs' + mock_stat_res = MagicMock() + mock_stat_res.st_ino = 256 + mock_os_stat.return_value = mock_stat_res + + def mock_cc(args, **kwargs): + _ = kwargs + if args[2] == 'snapshot': + raise OSError("Failed") + + mock_check_call.side_effect = mock_cc + + # Make mock_exit raise SystemExit to stop execution + mock_exit.side_effect = SystemExit(1) + + # Mock os.access: False for repo (not readable) + def side_effect(path, mode): + if path == "repo" and mode == os.R_OK: + return False + return True + + mock_os_access.side_effect = side_effect + + # Mock parse_options + mock_args = MagicMock() + mock_args.repository = "repo" + mock_args.new_workdir = "dest" + + with patch.object(gclient_new_workdir, + "parse_options", + return_value=mock_args): + try: + gclient_new_workdir.main() + except SystemExit: + pass + + # Assertions + mock_exit.assert_called_with(1) + mock_makedirs.assert_not_called() + + @patch("subprocess.check_output") + @patch("os.stat") + @patch("os.access") + @patch("subprocess.check_call") + @patch("os.makedirs") + @patch("sys.exit") + def test_diagnostics_dest_not_writable(self, mock_exit, mock_makedirs, + mock_check_call, mock_os_access, + mock_os_stat, mock_check_output): + # Setup mocks + mock_check_output.return_value = b'btrfs' + mock_stat_res = MagicMock() + mock_stat_res.st_ino = 256 + mock_os_stat.return_value = mock_stat_res + + def mock_cc(args, **kwargs): + _ = kwargs + if args[2] == 'snapshot': + raise OSError("Failed") + + mock_check_call.side_effect = mock_cc + + # Make mock_exit raise SystemExit to stop execution + mock_exit.side_effect = SystemExit(1) + + # Mock os.access: False for dest parent (not writable) + def side_effect(path, mode): + if path == "dir" and mode == os.W_OK: + return False + return True + + mock_os_access.side_effect = side_effect + + # Mock parse_options + mock_args = MagicMock() + mock_args.repository = "repo" + mock_args.new_workdir = "dir/dest" + + with patch.object(gclient_new_workdir, + "parse_options", + return_value=mock_args): + try: + gclient_new_workdir.main() + except SystemExit: + pass + + # Assertions + mock_exit.assert_called_with(1) + mock_makedirs.assert_not_called() + + +if __name__ == "__main__": + unittest.main()