# Copyright 2011 Google Inc. All Rights Reserved. # Copyright 2011, Nexenta Systems Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import boto import errno import gzip import hashlib import mimetypes import os import platform import re import subprocess import stat import sys import tempfile import threading import time from boto import config from boto.exception import GSResponseError from boto.exception import ResumableUploadException from boto.gs.resumable_upload_handler import ResumableUploadHandler from boto.s3.keyfile import KeyFile from boto.s3.resumable_download_handler import ResumableDownloadHandler from boto.storage_uri import BucketStorageUri from gslib.command import COMMAND_NAME from gslib.command import COMMAND_NAME_ALIASES from gslib.command import CONFIG_REQUIRED from gslib.command import Command from gslib.command import FILE_URIS_OK from gslib.command import MAX_ARGS from gslib.command import MIN_ARGS from gslib.command import PROVIDER_URIS_OK from gslib.command import SUPPORTED_SUB_ARGS from gslib.command import URIS_START_ARG from gslib.exception import CommandException from gslib.help_provider import HELP_NAME from gslib.help_provider import HELP_NAME_ALIASES from gslib.help_provider import HELP_ONE_LINE_SUMMARY from gslib.help_provider import HELP_TEXT from gslib.help_provider import HELP_TYPE from gslib.help_provider import HelpType from gslib.name_expansion import NameExpansionIterator from gslib.util import ExtractErrorDetail from gslib.util import IS_WINDOWS from gslib.util import MakeHumanReadable from gslib.util import NO_MAX from gslib.util import TWO_MB from gslib.wildcard_iterator import ContainsWildcard _detailed_help_text = (""" SYNOPSIS gsutil cp [OPTION]... src_uri dst_uri - or - gsutil cp [OPTION]... src_uri... dst_uri - or - gsutil cp [OPTION]... -I dst_uri DESCRIPTION The gsutil cp command allows you to copy data between your local file system and the cloud, copy data within the cloud, and copy data between cloud storage providers. For example, to copy all text files from the local directory to a bucket you could do: gsutil cp *.txt gs://my_bucket Similarly, you can download text files from a bucket by doing: gsutil cp gs://my_bucket/*.txt . If you want to copy an entire directory tree you need to use the -R option: gsutil cp -R dir gs://my_bucket If you have a large number of files to upload you might want to use the gsutil -m option, to perform a parallel (multi-threaded/multi-processing) copy: gsutil -m cp -R dir gs://my_bucket You can pass a list of URIs to copy on STDIN instead of as command line arguments by using the -I option. This allows you to use gsutil in a pipeline to copy files and objects as generated by a program, such as: some_program | gsutil -m cp -I gs://my_bucket The contents of STDIN can name files, cloud URIs, and wildcards of files and cloud URIs. HOW NAMES ARE CONSTRUCTED The gsutil cp command strives to name objects in a way consistent with how Linux cp works, which causes names to be constructed in varying ways depending on whether you're performing a recursive directory copy or copying individually named objects; and whether you're copying to an existing or non-existent directory. When performing recursive directory copies, object names are constructed that mirror the source directory structure starting at the point of recursive processing. For example, the command: gsutil cp -R dir1/dir2 gs://my_bucket will create objects named like gs://my_bucket/dir2/a/b/c, assuming dir1/dir2 contains the file a/b/c. In contrast, copying individually named files will result in objects named by the final path component of the source files. For example, the command: gsutil cp dir1/dir2/** gs://my_bucket will create objects named like gs://my_bucket/c. The same rules apply for downloads: recursive copies of buckets and bucket subdirectories produce a mirrored filename structure, while copying individually (or wildcard) named objects produce flatly named files. Note that in the above example the '**' wildcard matches all names anywhere under dir. The wildcard '*' will match names just one level deep. For more details see 'gsutil help wildcards'. There's an additional wrinkle when working with subdirectories: the resulting names depend on whether the destination subdirectory exists. For example, if gs://my_bucket/subdir exists as a subdirectory, the command: gsutil cp -R dir1/dir2 gs://my_bucket/subdir will create objects named like gs://my_bucket/subdir/dir2/a/b/c. In contrast, if gs://my_bucket/subdir does not exist, this same gsutil cp command will create objects named like gs://my_bucket/subdir/a/b/c. COPYING TO/FROM SUBDIRECTORIES; DISTRIBUTING TRANSFERS ACROSS MACHINES You can use gsutil to copy to and from subdirectories by using a command like: gsutil cp -R dir gs://my_bucket/data This will cause dir and all of its files and nested subdirectories to be copied under the specified destination, resulting in objects with names like gs://my_bucket/data/dir/a/b/c. Similarly you can download from bucket subdirectories by using a command like: gsutil cp -R gs://my_bucket/data dir This will cause everything nested under gs://my_bucket/data to be downloaded into dir, resulting in files with names like dir/data/a/b/c. Copying subdirectories is useful if you want to add data to an existing bucket directory structure over time. It's also useful if you want to parallelize uploads and downloads across multiple machines (often reducing overall transfer time compared with simply running gsutil -m cp on one machine). For example, if your bucket contains this structure: gs://my_bucket/data/result_set_01/ gs://my_bucket/data/result_set_02/ ... gs://my_bucket/data/result_set_99/ you could perform concurrent downloads across 3 machines by running these commands on each machine, respectively: gsutil -m cp -R gs://my_bucket/data/result_set_[0-3]* dir gsutil -m cp -R gs://my_bucket/data/result_set_[4-6]* dir gsutil -m cp -R gs://my_bucket/data/result_set_[7-9]* dir Note that dir could be a local directory on each machine, or it could be a directory mounted off of a shared file server; whether the latter performs acceptably may depend on a number of things, so we recommend you experiment and find out what works best for you. COPYING IN THE CLOUD AND METADATA PRESERVATION If both the source and destination URI are cloud URIs from the same provider, gsutil copies data "in the cloud" (i.e., without downloading to and uploading from the machine where you run gsutil). In addition to the performance and cost advantages of doing this, copying in the cloud preserves metadata (like Content-Type and Cache-Control). In contrast, when you download data from the cloud it ends up in a file, which has no associated metadata. Thus, unless you have some way to hold on to or re-create that metadata, downloading to a file will not retain the metadata. Note that by default, the gsutil cp command does not copy the object ACL to the new object, and instead will use the default bucket ACL (see "gsutil help setdefacl"). You can override this behavior with the -p option (see OPTIONS below). gsutil does not preserve metadata when copying objects between providers. RESUMABLE TRANSFERS gsutil automatically uses the Google Cloud Storage resumable upload feature whenever you use the cp command to upload an object that is larger than 2 MB. You do not need to specify any special command line options to make this happen. If your upload is interrupted you can restart the upload by running the same cp command that you ran to start the upload. Similarly, gsutil automatically performs resumable downloads (using HTTP standard Range GET operations) whenever you use the cp command to download an object larger than 2 MB. Resumable uploads and downloads store some state information in a file in ~/.gsutil named by the destination object or file. If you attempt to resume a transfer from a machine with a different directory, the transfer will start over from scratch. See also "gsutil help prod" for details on using resumable transfers in production. STREAMING TRANSFERS Use '-' in place of src_uri or dst_uri to perform a streaming transfer. For example: long_running_computation | gsutil cp - gs://my_bucket/obj Streaming transfers do not support resumable uploads/downloads. (The Google resumable transfer protocol has a way to support streaming transers, but gsutil doesn't currently implement support for this.) CHANGING TEMP DIRECTORIES gsutil writes data to a temporary directory in several cases: - when compressing data to be uploaded (see the -z option) - when decompressing data being downloaded (when the data has Content-Encoding:gzip, e.g., as happens when uploaded using gsutil cp -z) - when running integration tests (using the gsutil test command) In these cases it's possible the temp file location on your system that gsutil selects by default may not have enough space. If you find that gsutil runs out of space during one of these operations (e.g., raising "CommandException: Inadequate temp space available to compress " during a gsutil cp -z operation), you can change where it writes these temp files by setting the TMPDIR environment variable. On Linux and MacOS you can do this either by running gsutil this way: TMPDIR=/some/directory gsutil cp ... or by adding this line to your ~/.bashrc file and then restarting the shell before running gsutil: export TMPDIR=/some/directory On Windows 7 you can change the TMPDIR environment variable from Start -> Computer -> System -> Advanced System Settings -> Environment Variables. You need to reboot after making this change for it to take effect. (Rebooting is not necessary after running the export command on Linux and MacOS.) OPTIONS -a canned_acl Sets named canned_acl when uploaded objects created. See 'gsutil help acls' for further details. -c If an error occurrs, continue to attempt to copy the remaining files. -D Copy in "daisy chain" mode, i.e., copying between two buckets by hooking a download to an upload, via the machine where gsutil is run. By default, data are copied between two buckets "in the cloud", i.e., without needing to copy via the machine where gsutil runs. However, copy-in-the-cloud is not supported when copying between different locations (like US and EU) or between different storage classes (like STANDARD and DURABLE_REDUCED_AVAILABILITY). For these cases, you can use the -D option to copy data between buckets. Note: Daisy chain mode is automatically used when copying between providers (e.g., to copy data from Google Cloud Storage to another provider). -e Exclude symlinks. When specified, symbolic links will not be copied. -n No-clobber. When specified, existing files or objects at the destination will not be overwritten. Any items that are skipped by this option will be reported as being skipped. This option will perform an additional HEAD request to check if an item exists before attempting to upload the data. This will save retransmitting data, but the additional HTTP requests may make small object transfers slower and more expensive. This option can be combined with the -c option to build a script that copies a large number of objects, allowing retries when some failures occur from which gsutil doesn't automatically recover, using a bash script like the following: status=1 while [ $status -ne 0 ] ; do gsutil cp -c -n -R ./dir gs://bucket status=$? done The -c option will cause copying to continue after failures occur, and the -n option will cause objects already copied to be skipped on subsequent iterations. The loop will continue running as long as gsutil exits with a non-zero status (such a status indicates there was at least one failure during the gsutil run). -p Causes ACLs to be preserved when copying in the cloud. Note that this option has performance and cost implications, because it is essentially performing three requests (getacl, cp, setacl). (The performance issue can be mitigated to some degree by using gsutil -m cp to cause parallel copying.) You can avoid the additional performance and cost of using cp -p if you want all objects in the destination bucket to end up with the same ACL by setting a default ACL on that bucket instead of using cp -p. See "help gsutil setdefacl". Note that it's not valid to specify both the -a and -p options together. -q Causes copies to be performed quietly, i.e., without reporting progress indicators of files being copied. Errors are still reported. This option can be useful for running gsutil from a cron job that logs its output to a file, for which the only information desired in the log is failures. -R, -r Causes directories, buckets, and bucket subdirectories to be copied recursively. If you neglect to use this option for an upload, gsutil will copy any files it finds and skip any directories. Similarly, neglecting to specify -R for a download will cause gsutil to copy any objects at the current bucket directory level, and skip any subdirectories. -v Requests that the version-specific URI for each uploaded object be printed. Given this URI you can make future upload requests that are safe in the face of concurrent updates, because Google Cloud Storage will refuse to perform the update if the current object version doesn't match the version-specific URI. See 'gsutil help versioning' for more details. Note: at present this option does not work correctly for objects copied "in the cloud" (e.g., gsutil cp gs://bucket/obj1 gs://bucket/obj2). -z ext1,... Compresses file uploads with the given extensions. If you are uploading a large file with compressible content, such as a .js, .css, or .html file, you can gzip-compress the file during the upload process by specifying the -z option. Compressing data before upload saves on usage charges because you are uploading a smaller amount of data. When you specify the -z option, the data from your files is compressed before it is uploaded, but your actual files are left uncompressed on the local disk. The uploaded objects retain the original content type and name as the original files but are given a Content-Encoding header with the value "gzip" to indicate that the object data stored are compressed on the Google Cloud Storage servers. For example, the following command: gsutil cp -z html -a public-read cattypes.html gs://mycats will do all of the following: - Upload as the object gs://mycats/cattypes.html (cp command) - Set the Content-Type to text/html (based on file extension) - Compress the data in the file cattypes.html (-z option) - Set the Content-Encoding to gzip (-z option) - Set the ACL to public-read (-a option) - If a user tries to view cattypes.html in a browser, the browser will know to uncompress the data based on the Content-Encoding header, and to render it as HTML based on the Content-Type header. """) class CpCommand(Command): """ Implementation of gsutil cp command. Note that CpCommand is run for both gsutil cp and gsutil mv. The latter happens by MvCommand calling CpCommand and passing the hidden (undocumented) -M option. This allows the copy and remove needed for each mv to run together (rather than first running all the cp's and then all the rm's, as we originally had implemented), which in turn avoids the following problem with removing the wrong objects: starting with a bucket containing only the object gs://bucket/obj, say the user does: gsutil mv gs://bucket/* gs://bucket/d.txt If we ran all the cp's and then all the rm's and we didn't expand the wildcard first, the cp command would first copy gs://bucket/obj to gs://bucket/d.txt, and the rm command would then remove that object. In the implementation prior to gsutil release 3.12 we avoided this by building a list of objects to process and then running the copies and then the removes; but building the list up front limits scalability (compared with the current approach of processing the bucket listing iterator on the fly). """ # Set default Content-Type type. DEFAULT_CONTENT_TYPE = 'application/octet-stream' USE_MAGICFILE = boto.config.getbool('GSUtil', 'use_magicfile', False) # Command specification (processed by parent class). command_spec = { # Name of command. COMMAND_NAME : 'cp', # List of command name aliases. COMMAND_NAME_ALIASES : ['copy'], # Min number of args required by this command. MIN_ARGS : 1, # Max number of args required by this command, or NO_MAX. MAX_ARGS : NO_MAX, # Getopt-style string specifying acceptable sub args. # -t is deprecated but leave intact for now to avoid breakage. SUPPORTED_SUB_ARGS : 'a:cDeIMNnpqrRtvz:', # True if file URIs acceptable for this command. FILE_URIS_OK : True, # True if provider-only URIs acceptable for this command. PROVIDER_URIS_OK : False, # Index in args of first URI arg. URIS_START_ARG : 0, # True if must configure gsutil before running command. CONFIG_REQUIRED : True, } help_spec = { # Name of command or auxiliary help info for which this help applies. HELP_NAME : 'cp', # List of help name aliases. HELP_NAME_ALIASES : ['copy'], # Type of help: HELP_TYPE : HelpType.COMMAND_HELP, # One line summary of this help. HELP_ONE_LINE_SUMMARY : 'Copy files and objects', # The full help text. HELP_TEXT : _detailed_help_text, } def _CheckFinalMd5(self, key, file_name): """ Checks that etag from server agrees with md5 computed after the download completes. """ obj_md5 = key.etag.strip('"\'') file_md5 = None if hasattr(key, 'md5') and key.md5: file_md5 = key.md5 else: print 'Computing MD5 from scratch for resumed download' # Open file in binary mode to avoid surprises in Windows. fp = open(file_name, 'rb') try: file_md5 = key.compute_md5(fp)[0] finally: fp.close() if self.debug: print 'Checking file md5 against etag. (%s/%s)' % (file_md5, obj_md5) if file_md5 != obj_md5: # Checksums don't match - remove file and raise exception. os.unlink(file_name) raise CommandException( 'File changed during download: md5 signature doesn\'t match ' 'etag (incorrect downloaded file deleted)') def _CheckForDirFileConflict(self, exp_src_uri, dst_uri): """Checks whether copying exp_src_uri into dst_uri is not possible. This happens if a directory exists in local file system where a file needs to go or vice versa. In that case we print an error message and exits. Example: if the file "./x" exists and you try to do: gsutil cp gs://mybucket/x/y . the request can't succeed because it requires a directory where the file x exists. Note that we don't enforce any corresponding restrictions for buckets, because the flat namespace semantics for buckets doesn't prohibit such cases the way hierarchical file systems do. For example, if a bucket contains an object called gs://bucket/dir and then you run the command: gsutil cp file1 file2 gs://bucket/dir you'll end up with objects gs://bucket/dir, gs://bucket/dir/file1, and gs://bucket/dir/file2. Args: exp_src_uri: Expanded source StorageUri of copy. dst_uri: Destination URI. Raises: CommandException: if errors encountered. """ if dst_uri.is_cloud_uri(): # The problem can only happen for file destination URIs. return dst_path = dst_uri.object_name final_dir = os.path.dirname(dst_path) if os.path.isfile(final_dir): raise CommandException('Cannot retrieve %s because a file exists ' 'where a directory needs to be created (%s).' % (exp_src_uri, final_dir)) if os.path.isdir(dst_path): raise CommandException('Cannot retrieve %s because a directory exists ' '(%s) where the file needs to be created.' % (exp_src_uri, dst_path)) def _InsistDstUriNamesContainer(self, exp_dst_uri, have_existing_dst_container, command_name): """ Raises an exception if URI doesn't name a directory, bucket, or bucket subdir, with special exception for cp -R (see comments below). Args: exp_dst_uri: Wildcard-expanding dst_uri. have_existing_dst_container: bool indicator of whether exp_dst_uri names a container (directory, bucket, or existing bucket subdir). command_name: Name of command making call. May not be the same as self.command_name in the case of commands implemented atop other commands (like mv command). Raises: CommandException: if the URI being checked does not name a container. """ if exp_dst_uri.is_file_uri(): ok = exp_dst_uri.names_directory() else: if have_existing_dst_container: ok = True else: # It's ok to specify a non-existing bucket subdir, for example: # gsutil cp -R dir gs://bucket/abc # where gs://bucket/abc isn't an existing subdir. ok = exp_dst_uri.names_object() if not ok: raise CommandException('Destination URI must name a directory, bucket, ' 'or bucket\nsubdirectory for the multiple ' 'source form of the %s command.' % command_name) class _FileCopyCallbackHandler(object): """Outputs progress info for large copy requests.""" def __init__(self, upload): if upload: self.announce_text = 'Uploading' else: self.announce_text = 'Downloading' def call(self, total_bytes_transferred, total_size): sys.stderr.write('%s: %s/%s \r' % ( self.announce_text, MakeHumanReadable(total_bytes_transferred), MakeHumanReadable(total_size))) if total_bytes_transferred == total_size: sys.stderr.write('\n') class _StreamCopyCallbackHandler(object): """Outputs progress info for Stream copy to cloud. Total Size of the stream is not known, so we output only the bytes transferred. """ def call(self, total_bytes_transferred, total_size): sys.stderr.write('Uploading: %s \r' % ( MakeHumanReadable(total_bytes_transferred))) if total_size and total_bytes_transferred == total_size: sys.stderr.write('\n') def _GetTransferHandlers(self, dst_uri, size, upload): """ Selects upload/download and callback handlers. We use a callback handler that shows a simple textual progress indicator if size is above the configurable threshold. We use a resumable transfer handler if size is >= the configurable threshold and resumable transfers are supported by the given provider. boto supports resumable downloads for all providers, but resumable uploads are currently only supported by GS. Args: dst_uri: the destination URI. size: size of file (object) being uploaded (downloaded). upload: bool indication of whether transfer is an upload. """ config = boto.config resumable_threshold = config.getint('GSUtil', 'resumable_threshold', TWO_MB) transfer_handler = None cb = None num_cb = None # Checks whether the destination file is a "special" file, like /dev/null on # Linux platforms or null on Windows platforms, so we can disable resumable # download support since the file size of the destination won't ever be # correct. dst_is_special = False if dst_uri.is_file_uri(): # Check explicitly first because os.stat doesn't work on 'nul' in Windows. if dst_uri.object_name == os.devnull: dst_is_special = True try: mode = os.stat(dst_uri.object_name).st_mode if stat.S_ISCHR(mode): dst_is_special = True except OSError: pass if size >= resumable_threshold and not dst_is_special: if not self.quiet: cb = self._FileCopyCallbackHandler(upload).call num_cb = int(size / TWO_MB) resumable_tracker_dir = config.get( 'GSUtil', 'resumable_tracker_dir', os.path.expanduser('~' + os.sep + '.gsutil')) if not os.path.exists(resumable_tracker_dir): os.makedirs(resumable_tracker_dir) if upload: # Encode the dest bucket and object name into the tracker file name. res_tracker_file_name = ( re.sub('[/\\\\]', '_', 'resumable_upload__%s__%s.url' % (dst_uri.bucket_name, dst_uri.object_name))) else: # Encode the fully-qualified dest file name into the tracker file name. res_tracker_file_name = ( re.sub('[/\\\\]', '_', 'resumable_download__%s.etag' % (os.path.realpath(dst_uri.object_name)))) res_tracker_file_name = _hash_filename(res_tracker_file_name) tracker_file = '%s%s%s' % (resumable_tracker_dir, os.sep, res_tracker_file_name) if upload: if dst_uri.scheme == 'gs': transfer_handler = ResumableUploadHandler(tracker_file) else: transfer_handler = ResumableDownloadHandler(tracker_file) return (cb, num_cb, transfer_handler) def _LogCopyOperation(self, src_uri, dst_uri, headers): """ Logs copy operation being performed, including Content-Type if appropriate. """ if self.quiet: return if 'Content-Type' in headers and dst_uri.is_cloud_uri(): content_type_msg = ' [Content-Type=%s]' % headers['Content-Type'] else: content_type_msg = '' if src_uri.is_stream(): self.THREADED_LOGGER.info('Copying from %s...', content_type_msg) else: self.THREADED_LOGGER.info('Copying %s%s...', src_uri, content_type_msg) # We pass the headers explicitly to this call instead of using self.headers # so we can set different metadata (like Content-Type type) for each object. def _CopyObjToObjInTheCloud(self, src_key, src_uri, dst_uri, headers): """Performs copy-in-the cloud from specified src to dest object. Args: src_key: Source Key. src_uri: Source StorageUri. dst_uri: Destination StorageUri. headers: A copy of the headers dictionary. Returns: (elapsed_time, bytes_transferred, dst_uri) excluding overhead like initial HEAD. Note: At present copy-in-the-cloud doesn't return the generation of the created object, so the returned URI is actually not version-specific (unlike other cp cases). Raises: CommandException: if errors encountered. """ self._SetContentTypeHeader(src_uri, headers) self._LogCopyOperation(src_uri, dst_uri, headers) # Do Object -> object copy within same provider (uses # x--copy-source metadata HTTP header to request copying at the # server). src_bucket = src_uri.get_bucket(False, headers) preserve_acl = False canned_acl = None if self.sub_opts: for o, a in self.sub_opts: if o == '-a': canned_acls = dst_uri.canned_acls() if a not in canned_acls: raise CommandException('Invalid canned ACL "%s".' % a) canned_acl = a headers[dst_uri.get_provider().acl_header] = canned_acl if o == '-p': preserve_acl = True if preserve_acl and canned_acl: raise CommandException( 'Specifying both the -p and -a options together is invalid.') start_time = time.time() # Pass headers in headers param not metadata param, so boto will copy # existing key's metadata and just set the additional headers specified # in the headers param (rather than using the headers to override existing # metadata). In particular this allows us to copy the existing key's # Content-Type and other metadata users need while still being able to # set headers the API needs (like x-goog-project-id). Note that this means # you can't do something like: # gsutil cp -t Content-Type text/html gs://bucket/* gs://bucket2 # to change the Content-Type while copying. try: dst_key = dst_uri.copy_key( src_bucket.name, src_uri.object_name, preserve_acl=False, headers=headers, src_version_id=src_uri.version_id, src_generation=src_uri.generation) except GSResponseError as e: exc_name, error_detail = ExtractErrorDetail(e) if (exc_name == 'GSResponseError' and ('Copy-in-the-cloud disallowed' in error_detail)): raise CommandException('%s.\nNote: you can copy between locations ' 'and between storage classes by using the ' 'gsutil cp -D option.' % error_detail) else: raise end_time = time.time() return (end_time - start_time, src_key.size, dst_uri.clone_replace_key(dst_key)) def _CheckFreeSpace(self, path): """Return path/drive free space (in bytes).""" if platform.system() == 'Windows': from ctypes import c_int, c_uint64, c_wchar_p, windll, POINTER, WINFUNCTYPE, WinError try: GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_wchar_p, POINTER(c_uint64), POINTER(c_uint64), POINTER(c_uint64)) GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( ('GetDiskFreeSpaceExW', windll.kernel32), ( (1, 'lpszPathName'), (2, 'lpFreeUserSpace'), (2, 'lpTotalSpace'), (2, 'lpFreeSpace'),)) except AttributeError: GetDiskFreeSpaceEx = WINFUNCTYPE(c_int, c_char_p, POINTER(c_uint64), POINTER(c_uint64), POINTER(c_uint64)) GetDiskFreeSpaceEx = GetDiskFreeSpaceEx( ('GetDiskFreeSpaceExA', windll.kernel32), ( (1, 'lpszPathName'), (2, 'lpFreeUserSpace'), (2, 'lpTotalSpace'), (2, 'lpFreeSpace'),)) def GetDiskFreeSpaceEx_errcheck(result, func, args): if not result: raise WinError() return args[1].value GetDiskFreeSpaceEx.errcheck = GetDiskFreeSpaceEx_errcheck return GetDiskFreeSpaceEx(os.getenv('SystemDrive')) else: (_, f_frsize, _, _, f_bavail, _, _, _, _, _) = os.statvfs(path) return f_frsize * f_bavail def _PerformResumableUploadIfApplies(self, fp, dst_uri, canned_acl, headers): """ Performs resumable upload if supported by provider and file is above threshold, else performs non-resumable upload. Returns (elapsed_time, bytes_transferred, version-specific dst_uri). """ start_time = time.time() # Determine file size different ways for case where fp is actually a wrapper # around a Key vs an actual file. if isinstance(fp, KeyFile): file_size = fp.getkey().size else: file_size = os.path.getsize(fp.name) (cb, num_cb, res_upload_handler) = self._GetTransferHandlers( dst_uri, file_size, True) if dst_uri.scheme == 'gs': # Resumable upload protocol is Google Cloud Storage-specific. dst_uri.set_contents_from_file(fp, headers, policy=canned_acl, cb=cb, num_cb=num_cb, res_upload_handler=res_upload_handler) else: dst_uri.set_contents_from_file(fp, headers, policy=canned_acl, cb=cb, num_cb=num_cb) if res_upload_handler: # ResumableUploadHandler does not update upload_start_point from its # initial value of -1 if transferring the whole file, so clamp at 0 bytes_transferred = file_size - max( res_upload_handler.upload_start_point, 0) else: bytes_transferred = file_size end_time = time.time() return (end_time - start_time, bytes_transferred, dst_uri) def _PerformStreamingUpload(self, fp, dst_uri, headers, canned_acl=None): """ Performs a streaming upload to the cloud. Args: fp: The file whose contents to upload. dst_uri: Destination StorageUri. headers: A copy of the headers dictionary. canned_acl: Optional canned ACL to set on the object. Returns (elapsed_time, bytes_transferred, version-specific dst_uri). """ start_time = time.time() if self.quiet: cb = None else: cb = self._StreamCopyCallbackHandler().call dst_uri.set_contents_from_stream( fp, headers, policy=canned_acl, cb=cb) try: bytes_transferred = fp.tell() except: bytes_transferred = 0 end_time = time.time() return (end_time - start_time, bytes_transferred, dst_uri) def _SetContentTypeHeader(self, src_uri, headers): """ Sets content type header to value specified in '-h Content-Type' option (if specified); else sets using Content-Type detection. """ if 'Content-Type' in headers: # If empty string specified (i.e., -h "Content-Type:") set header to None, # which will inhibit boto from sending the CT header. Otherwise, boto will # pass through the user specified CT header. if not headers['Content-Type']: headers['Content-Type'] = None # else we'll keep the value passed in via -h option (not performing # content type detection). else: # Only do content type recognition is src_uri is a file. Object-to-object # copies with no -h Content-Type specified re-use the content type of the # source object. if src_uri.is_file_uri(): object_name = src_uri.object_name content_type = None # Streams (denoted by '-') are expected to be 'application/octet-stream' # and 'file' would partially consume them. if object_name != '-': if self.USE_MAGICFILE: p = subprocess.Popen(['file', '--mime-type', object_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = p.communicate() if p.returncode != 0 or error: raise CommandException( 'Encountered error running "file --mime-type %s" ' '(returncode=%d).\n%s' % (object_name, p.returncode, error)) # Parse output by removing line delimiter and splitting on last ": content_type = output.rstrip().rpartition(': ')[2] else: content_type = mimetypes.guess_type(object_name)[0] if not content_type: content_type = self.DEFAULT_CONTENT_TYPE headers['Content-Type'] = content_type def _UploadFileToObject(self, src_key, src_uri, dst_uri, headers, should_log=True): """Uploads a local file to an object. Args: src_key: Source StorageUri. Must be a file URI. src_uri: Source StorageUri. dst_uri: Destination StorageUri. headers: The headers dictionary. should_log: bool indicator whether we should log this operation. Returns: (elapsed_time, bytes_transferred, version-specific dst_uri), excluding overhead like initial HEAD. Raises: CommandException: if errors encountered. """ gzip_exts = [] canned_acl = None if self.sub_opts: for o, a in self.sub_opts: if o == '-a': canned_acls = dst_uri.canned_acls() if a not in canned_acls: raise CommandException('Invalid canned ACL "%s".' % a) canned_acl = a elif o == '-t': print('Warning: -t is deprecated, and will be removed in the future. ' 'Content type\ndetection is ' 'now performed by default, unless inhibited by specifying ' 'a\nContent-Type header via the -h option.') elif o == '-z': gzip_exts = a.split(',') self._SetContentTypeHeader(src_uri, headers) if should_log: self._LogCopyOperation(src_uri, dst_uri, headers) if 'Content-Language' not in headers: content_language = config.get_value('GSUtil', 'content_language') if content_language: headers['Content-Language'] = content_language fname_parts = src_uri.object_name.split('.') if len(fname_parts) > 1 and fname_parts[-1] in gzip_exts: if self.debug: print 'Compressing %s (to tmp)...' % src_key (gzip_fh, gzip_path) = tempfile.mkstemp() gzip_fp = None try: # Check for temp space. Assume the compressed object is at most 2x # the size of the object (normally should compress to smaller than # the object) if (self._CheckFreeSpace(gzip_path) < 2*int(os.path.getsize(src_key.name))): raise CommandException('Inadequate temp space available to compress ' '%s' % src_key.name) gzip_fp = gzip.open(gzip_path, 'wb') gzip_fp.writelines(src_key.fp) finally: if gzip_fp: gzip_fp.close() os.close(gzip_fh) headers['Content-Encoding'] = 'gzip' gzip_fp = open(gzip_path, 'rb') try: (elapsed_time, bytes_transferred, result_uri) = ( self._PerformResumableUploadIfApplies(gzip_fp, dst_uri, canned_acl, headers)) finally: gzip_fp.close() try: os.unlink(gzip_path) # Windows sometimes complains the temp file is locked when you try to # delete it. except Exception, e: pass elif (src_key.is_stream() and dst_uri.get_provider().supports_chunked_transfer()): (elapsed_time, bytes_transferred, result_uri) = ( self._PerformStreamingUpload(src_key.fp, dst_uri, headers, canned_acl)) else: if src_key.is_stream(): # For Providers that doesn't support chunked Transfers tmp = tempfile.NamedTemporaryFile() file_uri = self.suri_builder.StorageUri('file://%s' % tmp.name) try: file_uri.new_key(False, headers).set_contents_from_file( src_key.fp, headers) src_key = file_uri.get_key() finally: file_uri.close() try: (elapsed_time, bytes_transferred, result_uri) = ( self._PerformResumableUploadIfApplies(src_key.fp, dst_uri, canned_acl, headers)) finally: if src_key.is_stream(): tmp.close() else: src_key.close() return (elapsed_time, bytes_transferred, result_uri) def _DownloadObjectToFile(self, src_key, src_uri, dst_uri, headers, should_log=True): """Downloads an object to a local file. Args: src_key: Source StorageUri. Must be a file URI. src_uri: Source StorageUri. dst_uri: Destination StorageUri. headers: The headers dictionary. should_log: bool indicator whether we should log this operation. Returns: (elapsed_time, bytes_transferred, dst_uri), excluding overhead like initial HEAD. Raises: CommandException: if errors encountered. """ if should_log: self._LogCopyOperation(src_uri, dst_uri, headers) (cb, num_cb, res_download_handler) = self._GetTransferHandlers( dst_uri, src_key.size, False) file_name = dst_uri.object_name dir_name = os.path.dirname(file_name) if dir_name and not os.path.exists(dir_name): # Do dir creation in try block so can ignore case where dir already # exists. This is needed to avoid a race condition when running gsutil # -m cp. try: os.makedirs(dir_name) except OSError, e: if e.errno != errno.EEXIST: raise # For gzipped objects not named *.gz download to a temp file and unzip. if (hasattr(src_key, 'content_encoding') and src_key.content_encoding == 'gzip' and not file_name.endswith('.gz')): # We can't use tempfile.mkstemp() here because we need a predictable # filename for resumable downloads. download_file_name = '%s_.gztmp' % file_name need_to_unzip = True else: download_file_name = file_name need_to_unzip = False fp = None try: if res_download_handler: fp = open(download_file_name, 'ab') else: fp = open(download_file_name, 'wb') start_time = time.time() src_key.get_contents_to_file(fp, headers, cb=cb, num_cb=num_cb, res_download_handler=res_download_handler) # If a custom test method is defined, call it here. For the copy command, # test methods are expected to take one argument: an open file pointer, # and are used to perturb the open file during download to exercise # download error detection. if self.test_method: self.test_method(fp) end_time = time.time() finally: if fp: fp.close() # Discard the md5 if we are resuming a partial download. if res_download_handler and res_download_handler.download_start_point: src_key.md5 = None # Verify downloaded file checksum matched source object's checksum. self._CheckFinalMd5(src_key, download_file_name) if res_download_handler: bytes_transferred = ( src_key.size - res_download_handler.download_start_point) else: bytes_transferred = src_key.size if need_to_unzip: # Log that we're uncompressing if the file is big enough that # decompressing would make it look like the transfer "stalled" at the end. if not self.quiet and bytes_transferred > 10 * 1024 * 1024: self.THREADED_LOGGER.info('Uncompressing downloaded tmp file to %s...', file_name) # Downloaded gzipped file to a filename w/o .gz extension, so unzip. f_in = gzip.open(download_file_name, 'rb') f_out = open(file_name, 'wb') try: while True: data = f_in.read(8192) if not data: break f_out.write(data) finally: f_out.close() f_in.close() os.unlink(download_file_name) return (end_time - start_time, bytes_transferred, dst_uri) def _PerformDownloadToStream(self, src_key, src_uri, str_fp, headers): (cb, num_cb, res_download_handler) = self._GetTransferHandlers( src_uri, src_key.size, False) start_time = time.time() src_key.get_contents_to_file(str_fp, headers, cb=cb, num_cb=num_cb) end_time = time.time() bytes_transferred = src_key.size end_time = time.time() return (end_time - start_time, bytes_transferred) def _CopyFileToFile(self, src_key, src_uri, dst_uri, headers): """Copies a local file to a local file. Args: src_key: Source StorageUri. Must be a file URI. src_uri: Source StorageUri. dst_uri: Destination StorageUri. headers: The headers dictionary. Returns: (elapsed_time, bytes_transferred, dst_uri), excluding overhead like initial HEAD. Raises: CommandException: if errors encountered. """ self._LogCopyOperation(src_uri, dst_uri, headers) dst_key = dst_uri.new_key(False, headers) start_time = time.time() dst_key.set_contents_from_file(src_key.fp, headers) end_time = time.time() return (end_time - start_time, os.path.getsize(src_key.fp.name), dst_uri) def _CopyObjToObjDaisyChainMode(self, src_key, src_uri, dst_uri, headers): """Copies from src_uri to dst_uri in "daisy chain" mode. See -D OPTION documentation about what daisy chain mode is. Args: src_key: Source Key. src_uri: Source StorageUri. dst_uri: Destination StorageUri. headers: A copy of the headers dictionary. Returns: (elapsed_time, bytes_transferred, version-specific dst_uri) excluding overhead like initial HEAD. Raises: CommandException: if errors encountered. """ self._SetContentTypeHeader(src_uri, headers) self._LogCopyOperation(src_uri, dst_uri, headers) canned_acl = None if self.sub_opts: for o, a in self.sub_opts: if o == '-a': canned_acls = dst_uri.canned_acls() if a not in canned_acls: raise CommandException('Invalid canned ACL "%s".' % a) canned_acl = a elif o == '-p': # We don't attempt to preserve ACLs across providers because # GCS and S3 support different ACLs and disjoint principals. raise NotImplementedError('Cross-provider cp -p not supported') return self._PerformResumableUploadIfApplies(KeyFile(src_key), dst_uri, canned_acl, headers) def _PerformCopy(self, src_uri, dst_uri): """Performs copy from src_uri to dst_uri, handling various special cases. Args: src_uri: Source StorageUri. dst_uri: Destination StorageUri. Returns: (elapsed_time, bytes_transferred, version-specific dst_uri) excluding overhead like initial HEAD. Raises: CommandException: if errors encountered. """ # Make a copy of the input headers each time so we can set a different # content type for each object. if self.headers: headers = self.headers.copy() else: headers = {} src_key = src_uri.get_key(False, headers) if not src_key: raise CommandException('"%s" does not exist.' % src_uri) # On Windows, stdin is opened as text mode instead of binary which causes # problems when piping a binary file, so this switches it to binary mode. if IS_WINDOWS and src_uri.is_file_uri() and src_key.is_stream(): import msvcrt msvcrt.setmode(src_key.fp.fileno(), os.O_BINARY) if self.no_clobber: # There are two checks to prevent clobbering: # 1) The first check is to see if the item # already exists at the destination and prevent the upload/download # from happening. This is done by the exists() call. # 2) The second check is only relevant if we are writing to gs. We can # enforce that the server only writes the object if it doesn't exist # by specifying the header below. This check only happens at the # server after the complete file has been uploaded. We specify this # header to prevent a race condition where a destination file may # be created after the first check and before the file is fully # uploaded. # In order to save on unnecessary uploads/downloads we perform both # checks. However, this may come at the cost of additional HTTP calls. if dst_uri.exists(headers): if not self.quiet: self.THREADED_LOGGER.info('Skipping existing item: %s' % dst_uri.uri) return (0, 0, None) if dst_uri.is_cloud_uri() and dst_uri.scheme == 'gs': headers['x-goog-if-generation-match'] = '0' if src_uri.is_cloud_uri() and dst_uri.is_cloud_uri(): if src_uri.scheme == dst_uri.scheme and not self.daisy_chain: return self._CopyObjToObjInTheCloud(src_key, src_uri, dst_uri, headers) else: return self._CopyObjToObjDaisyChainMode(src_key, src_uri, dst_uri, headers) elif src_uri.is_file_uri() and dst_uri.is_cloud_uri(): return self._UploadFileToObject(src_key, src_uri, dst_uri, headers) elif src_uri.is_cloud_uri() and dst_uri.is_file_uri(): return self._DownloadObjectToFile(src_key, src_uri, dst_uri, headers) elif src_uri.is_file_uri() and dst_uri.is_file_uri(): return self._CopyFileToFile(src_key, src_uri, dst_uri, headers) else: raise CommandException('Unexpected src/dest case') def _ExpandDstUri(self, dst_uri_str): """ Expands wildcard if present in dst_uri_str. Args: dst_uri_str: String representation of requested dst_uri. Returns: (exp_dst_uri, have_existing_dst_container) where have_existing_dst_container is a bool indicating whether exp_dst_uri names an existing directory, bucket, or bucket subdirectory. Raises: CommandException: if dst_uri_str matched more than 1 URI. """ dst_uri = self.suri_builder.StorageUri(dst_uri_str) # Handle wildcarded dst_uri case. if ContainsWildcard(dst_uri): blr_expansion = list(self.WildcardIterator(dst_uri)) if len(blr_expansion) != 1: raise CommandException('Destination (%s) must match exactly 1 URI' % dst_uri_str) blr = blr_expansion[0] uri = blr.GetUri() if uri.is_cloud_uri(): return (uri, uri.names_bucket() or blr.HasPrefix() or blr.GetKey().endswith('/')) else: return (uri, uri.names_directory()) # Handle non-wildcarded dst_uri: if dst_uri.is_file_uri(): return (dst_uri, dst_uri.names_directory()) if dst_uri.names_bucket(): return (dst_uri, True) # For object URIs check 3 cases: (a) if the name ends with '/' treat as a # subdir; else, perform a wildcard expansion with dst_uri + "*" and then # find if (b) there's a Prefix matching dst_uri, or (c) name is of form # dir_$folder$ (and in both these cases also treat dir as a subdir). if dst_uri.is_cloud_uri() and dst_uri_str.endswith('/'): return (dst_uri, True) blr_expansion = list(self.WildcardIterator( '%s*' % dst_uri_str.rstrip(dst_uri.delim))) for blr in blr_expansion: if blr.GetRStrippedUriString().endswith('_$folder$'): return (dst_uri, True) if blr.GetRStrippedUriString() == dst_uri_str.rstrip(dst_uri.delim): return (dst_uri, blr.HasPrefix()) return (dst_uri, False) def _ConstructDstUri(self, src_uri, exp_src_uri, src_uri_names_container, src_uri_expands_to_multi, have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir): """ Constructs the destination URI for a given exp_src_uri/exp_dst_uri pair, using context-dependent naming rules that mimic Linux cp and mv behavior. Args: src_uri: src_uri to be copied. exp_src_uri: Single StorageUri from wildcard expansion of src_uri. src_uri_names_container: True if src_uri names a container (including the case of a wildcard-named bucket subdir (like gs://bucket/abc, where gs://bucket/abc/* matched some objects). Note that this is additional semantics tha src_uri.names_container() doesn't understand because the latter only understands StorageUris, not wildcards. src_uri_expands_to_multi: True if src_uri expanded to multiple URIs. have_multiple_srcs: True if this is a multi-source request. This can be true if src_uri wildcard-expanded to multiple URIs or if there were multiple source URIs in the request. exp_dst_uri: the expanded StorageUri requested for the cp destination. Final written path is constructed from this plus a context-dependent variant of src_uri. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. Returns: StorageUri to use for copy. Raises: CommandException if destination object name not specified for source and source is a stream. """ if self._ShouldTreatDstUriAsSingleton( have_multiple_srcs, have_existing_dest_subdir, exp_dst_uri): # We're copying one file or object to one file or object. return exp_dst_uri if exp_src_uri.is_stream(): if exp_dst_uri.names_container(): raise CommandException('Destination object name needed when ' 'source is a stream') return exp_dst_uri if not self.recursion_requested and not have_multiple_srcs: # We're copying one file or object to a subdirectory. Append final comp # of exp_src_uri to exp_dst_uri. src_final_comp = exp_src_uri.object_name.rpartition(src_uri.delim)[-1] return self.suri_builder.StorageUri('%s%s%s' % ( exp_dst_uri.uri.rstrip(exp_dst_uri.delim), exp_dst_uri.delim, src_final_comp)) # Else we're copying multiple sources to a directory, bucket, or a bucket # "sub-directory". # Ensure exp_dst_uri ends in delim char if we're doing a multi-src copy or # a copy to a directory. (The check for copying to a directory needs # special-case handling so that the command: # gsutil cp gs://bucket/obj dir # will turn into file://dir/ instead of file://dir -- the latter would cause # the file "dirobj" to be created.) # Note: need to check have_multiple_srcs or src_uri.names_container() # because src_uri could be a bucket containing a single object, named # as gs://bucket. if ((have_multiple_srcs or src_uri.names_container() or os.path.isdir(exp_dst_uri.object_name)) and not exp_dst_uri.uri.endswith(exp_dst_uri.delim)): exp_dst_uri = exp_dst_uri.clone_replace_name( '%s%s' % (exp_dst_uri.object_name, exp_dst_uri.delim) ) # Making naming behavior match how things work with local Linux cp and mv # operations depends on many factors, including whether the destination is a # container, the plurality of the source(s), and whether the mv command is # being used: # 1. For the "mv" command that specifies a non-existent destination subdir, # renaming should occur at the level of the src subdir, vs appending that # subdir beneath the dst subdir like is done for copying. For example: # gsutil rm -R gs://bucket # gsutil cp -R dir1 gs://bucket # gsutil cp -R dir2 gs://bucket/subdir1 # gsutil mv gs://bucket/subdir1 gs://bucket/subdir2 # would (if using cp naming behavior) end up with paths like: # gs://bucket/subdir2/subdir1/dir2/.svn/all-wcprops # whereas mv naming behavior should result in: # gs://bucket/subdir2/dir2/.svn/all-wcprops # 2. Copying from directories, buckets, or bucket subdirs should result in # objects/files mirroring the source directory hierarchy. For example: # gsutil cp dir1/dir2 gs://bucket # should create the object gs://bucket/dir2/file2, assuming dir1/dir2 # contains file2). # To be consistent with Linux cp behavior, there's one more wrinkle when # working with subdirs: The resulting object names depend on whether the # destination subdirectory exists. For example, if gs://bucket/subdir # exists, the command: # gsutil cp -R dir1/dir2 gs://bucket/subdir # should create objects named like gs://bucket/subdir/dir2/a/b/c. In # contrast, if gs://bucket/subdir does not exist, this same command # should create objects named like gs://bucket/subdir/a/b/c. # 3. Copying individual files or objects to dirs, buckets or bucket subdirs # should result in objects/files named by the final source file name # component. Example: # gsutil cp dir1/*.txt gs://bucket # should create the objects gs://bucket/f1.txt and gs://bucket/f2.txt, # assuming dir1 contains f1.txt and f2.txt. if (self.perform_mv and self.recursion_requested and src_uri_expands_to_multi and not have_existing_dest_subdir): # Case 1. Handle naming rules for bucket subdir mv. Here we want to # line up the src_uri against its expansion, to find the base to build # the new name. For example, running the command: # gsutil mv gs://bucket/abcd gs://bucket/xyz # when processing exp_src_uri=gs://bucket/abcd/123 # exp_src_uri_tail should become /123 # Note: mv.py code disallows wildcard specification of source URI. exp_src_uri_tail = exp_src_uri.uri[len(src_uri.uri):] dst_key_name = '%s/%s' % (exp_dst_uri.object_name.rstrip('/'), exp_src_uri_tail.strip('/')) return exp_dst_uri.clone_replace_name(dst_key_name) if src_uri_names_container and not exp_dst_uri.names_file(): # Case 2. Build dst_key_name from subpath of exp_src_uri past # where src_uri ends. For example, for src_uri=gs://bucket/ and # exp_src_uri=gs://bucket/src_subdir/obj, dst_key_name should be # src_subdir/obj. src_uri_path_sans_final_dir = _GetPathBeforeFinalDir(src_uri) dst_key_name = exp_src_uri.uri[ len(src_uri_path_sans_final_dir):].lstrip(src_uri.delim) # Handle case where dst_uri is a non-existent subdir. if not have_existing_dest_subdir: dst_key_name = dst_key_name.partition(src_uri.delim)[-1] # Handle special case where src_uri was a directory named with '.' or # './', so that running a command like: # gsutil cp -r . gs://dest # will produce obj names of the form gs://dest/abc instead of # gs://dest/./abc. if dst_key_name.startswith('.%s' % os.sep): dst_key_name = dst_key_name[2:] else: # Case 3. dst_key_name = exp_src_uri.object_name.rpartition(src_uri.delim)[-1] if (exp_dst_uri.is_file_uri() or self._ShouldTreatDstUriAsBucketSubDir( have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir)): if exp_dst_uri.object_name.endswith(exp_dst_uri.delim): dst_key_name = '%s%s%s' % ( exp_dst_uri.object_name.rstrip(exp_dst_uri.delim), exp_dst_uri.delim, dst_key_name) else: delim = exp_dst_uri.delim if exp_dst_uri.object_name else '' dst_key_name = '%s%s%s' % (exp_dst_uri.object_name, delim, dst_key_name) return exp_dst_uri.clone_replace_name(dst_key_name) def _FixWindowsNaming(self, src_uri, dst_uri): """ Rewrites the destination URI built by _ConstructDstUri() to translate Windows pathnames to cloud pathnames if needed. Args: src_uri: Source URI to be copied. dst_uri: The destination URI built by _ConstructDstUri(). Returns: StorageUri to use for copy. """ if (src_uri.is_file_uri() and src_uri.delim == '\\' and dst_uri.is_cloud_uri()): trans_uri_str = re.sub(r'\\', '/', dst_uri.uri) dst_uri = self.suri_builder.StorageUri(trans_uri_str) return dst_uri # Command entry point. def RunCommand(self): # Inner funcs. def _CopyExceptionHandler(e): """Simple exception handler to allow post-completion status.""" self.THREADED_LOGGER.error(str(e)) self.copy_failure_count += 1 def _CopyFunc(name_expansion_result): """Worker function for performing the actual copy (and rm, for mv).""" if self.perform_mv: cmd_name = 'mv' else: cmd_name = self.command_name src_uri = self.suri_builder.StorageUri( name_expansion_result.GetSrcUriStr()) exp_src_uri = self.suri_builder.StorageUri( name_expansion_result.GetExpandedUriStr()) src_uri_names_container = name_expansion_result.NamesContainer() src_uri_expands_to_multi = name_expansion_result.NamesContainer() have_multiple_srcs = name_expansion_result.IsMultiSrcRequest() have_existing_dest_subdir = ( name_expansion_result.HaveExistingDstContainer()) if src_uri.names_provider(): raise CommandException( 'The %s command does not allow provider-only source URIs (%s)' % (cmd_name, src_uri)) if have_multiple_srcs: self._InsistDstUriNamesContainer(exp_dst_uri, have_existing_dst_container, cmd_name) if self.perform_mv: if name_expansion_result.NamesContainer(): # Use recursion_requested when performing name expansion for the # directory mv case so we can determine if any of the source URIs are # directories (and then use cp -R and rm -R to perform the move, to # match the behavior of Linux mv (which when moving a directory moves # all the contained files). self.recursion_requested = True # Disallow wildcard src URIs when moving directories, as supporting it # would make the name transformation too complex and would also be # dangerous (e.g., someone could accidentally move many objects to the # wrong name, or accidentally overwrite many objects). if ContainsWildcard(src_uri): raise CommandException('The mv command disallows naming source ' 'directories using wildcards') if (exp_dst_uri.is_file_uri() and not os.path.exists(exp_dst_uri.object_name) and have_multiple_srcs): os.makedirs(exp_dst_uri.object_name) dst_uri = self._ConstructDstUri(src_uri, exp_src_uri, src_uri_names_container, src_uri_expands_to_multi, have_multiple_srcs, exp_dst_uri, have_existing_dest_subdir) dst_uri = self._FixWindowsNaming(src_uri, dst_uri) self._CheckForDirFileConflict(exp_src_uri, dst_uri) if self._SrcDstSame(exp_src_uri, dst_uri): raise CommandException('%s: "%s" and "%s" are the same file - ' 'abort.' % (cmd_name, exp_src_uri, dst_uri)) if dst_uri.is_cloud_uri() and dst_uri.is_version_specific: raise CommandException('%s: a version-specific URI\n(%s)\ncannot be ' 'the destination for gsutil cp - abort.' % (cmd_name, dst_uri)) elapsed_time = bytes_transferred = 0 try: (elapsed_time, bytes_transferred, result_uri) = ( self._PerformCopy(exp_src_uri, dst_uri)) except Exception, e: if self._IsNoClobberServerException(e): if not self.quiet: self.THREADED_LOGGER.info('Rejected (noclobber): %s' % dst_uri.uri) elif self.continue_on_error: if not self.quiet: self.THREADED_LOGGER.error('Error copying %s: %s' % (src_uri.uri, str(e))) self.copy_failure_count += 1 else: raise if self.print_ver: # Some cases don't return a version-specific URI (e.g., if destination # is a file). if hasattr(result_uri, 'version_specific_uri'): self.THREADED_LOGGER.info('Created: %s' % result_uri.version_specific_uri) else: self.THREADED_LOGGER.info('Created: %s' % result_uri.uri) # TODO: If we ever use -n (noclobber) with -M (move) (not possible today # since we call copy internally from move and don't specify the -n flag) # we'll need to only remove the source when we have not skipped the # destination. if self.perform_mv: if not self.quiet: self.THREADED_LOGGER.info('Removing %s...', exp_src_uri) exp_src_uri.delete_key(validate=False, headers=self.headers) stats_lock.acquire() self.total_elapsed_time += elapsed_time self.total_bytes_transferred += bytes_transferred stats_lock.release() # Start of RunCommand code. self._ParseArgs() self.total_elapsed_time = self.total_bytes_transferred = 0 if self.args[-1] == '-' or self.args[-1] == 'file://-': self._HandleStreamingDownload() return 0 if self.read_args_from_stdin: if len(self.args) != 1: raise CommandException('Source URIs cannot be specified with -I option') uri_strs = self._StdinIterator() else: if len(self.args) < 2: raise CommandException('Wrong number of arguments for "cp" command.') uri_strs = self.args[0:len(self.args)-1] (exp_dst_uri, have_existing_dst_container) = self._ExpandDstUri( self.args[-1]) name_expansion_iterator = NameExpansionIterator( self.command_name, self.proj_id_handler, self.headers, self.debug, self.bucket_storage_uri_class, uri_strs, self.recursion_requested or self.perform_mv, have_existing_dst_container) # Use a lock to ensure accurate statistics in the face of # multi-threading/multi-processing. stats_lock = threading.Lock() # Tracks if any copies failed. self.copy_failure_count = 0 # Start the clock. start_time = time.time() # Tuple of attributes to share/manage across multiple processes in # parallel (-m) mode. shared_attrs = ('copy_failure_count', 'total_bytes_transferred') # Perform copy requests in parallel (-m) mode, if requested, using # configured number of parallel processes and threads. Otherwise, # perform requests with sequential function calls in current process. self.Apply(_CopyFunc, name_expansion_iterator, _CopyExceptionHandler, shared_attrs) if self.debug: print 'total_bytes_transferred:' + str(self.total_bytes_transferred) end_time = time.time() self.total_elapsed_time = end_time - start_time # Sometimes, particularly when running unit tests, the total elapsed time # is really small. On Windows, the timer resolution is too small and # causes total_elapsed_time to be zero. try: float(self.total_bytes_transferred) / float(self.total_elapsed_time) except ZeroDivisionError: self.total_elapsed_time = 0.01 self.total_bytes_per_second = (float(self.total_bytes_transferred) / float(self.total_elapsed_time)) if self.debug == 3: # Note that this only counts the actual GET and PUT bytes for the copy # - not any transfers for doing wildcard expansion, the initial HEAD # request boto performs when doing a bucket.get_key() operation, etc. if self.total_bytes_transferred != 0: self.THREADED_LOGGER.info( 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)', self.total_bytes_transferred, self.total_elapsed_time, MakeHumanReadable(self.total_bytes_per_second)) if self.copy_failure_count: plural_str = '' if self.copy_failure_count > 1: plural_str = 's' raise CommandException('%d file%s/object%s could not be transferred.' % ( self.copy_failure_count, plural_str, plural_str)) return 0 def _ParseArgs(self): self.perform_mv = False self.exclude_symlinks = False self.quiet = False self.no_clobber = False self.continue_on_error = False self.daisy_chain = False self.read_args_from_stdin = False self.print_ver = False # self.recursion_requested initialized in command.py (so can be checked # in parent class for all commands). if self.sub_opts: for o, unused_a in self.sub_opts: if o == '-c': self.continue_on_error = True elif o == '-D': self.daisy_chain = True elif o == '-e': self.exclude_symlinks = True elif o == '-I': self.read_args_from_stdin = True elif o == '-M': # Note that we signal to the cp command to perform a move (copy # followed by remove) and use directory-move naming rules by passing # the undocumented (for internal use) -M option when running the cp # command from mv.py. self.perform_mv = True elif o == '-n': self.no_clobber = True elif o == '-q': self.quiet = True elif o == '-r' or o == '-R': self.recursion_requested = True elif o == '-v': self.print_ver = True def _HandleStreamingDownload(self): # Destination is . Manipulate sys.stdout so as to redirect all # debug messages to . stdout_fp = sys.stdout sys.stdout = sys.stderr did_some_work = False for uri_str in self.args[0:len(self.args)-1]: for uri in self.WildcardIterator(uri_str).IterUris(): did_some_work = True key = uri.get_key(False, self.headers) (elapsed_time, bytes_transferred) = self._PerformDownloadToStream( key, uri, stdout_fp, self.headers) self.total_elapsed_time += elapsed_time self.total_bytes_transferred += bytes_transferred if not did_some_work: raise CommandException('No URIs matched') if self.debug == 3: if self.total_bytes_transferred != 0: self.THREADED_LOGGER.info( 'Total bytes copied=%d, total elapsed time=%5.3f secs (%sps)', self.total_bytes_transferred, self.total_elapsed_time, MakeHumanReadable(float(self.total_bytes_transferred) / float(self.total_elapsed_time))) def _StdinIterator(self): """A generator function that returns lines from stdin.""" for line in sys.stdin: # Strip CRLF. yield line.rstrip() def _SrcDstSame(self, src_uri, dst_uri): """Checks if src_uri and dst_uri represent the same object or file. We don't handle anything about hard or symbolic links. Args: src_uri: Source StorageUri. dst_uri: Destination StorageUri. Returns: Bool indicator. """ if src_uri.is_file_uri() and dst_uri.is_file_uri(): # Translate a/b/./c to a/b/c, so src=dst comparison below works. new_src_path = os.path.normpath(src_uri.object_name) new_dst_path = os.path.normpath(dst_uri.object_name) return (src_uri.clone_replace_name(new_src_path).uri == dst_uri.clone_replace_name(new_dst_path).uri) else: return (src_uri.uri == dst_uri.uri and src_uri.generation == dst_uri.generation and src_uri.version_id == dst_uri.version_id) def _ShouldTreatDstUriAsBucketSubDir(self, have_multiple_srcs, dst_uri, have_existing_dest_subdir): """ Checks whether dst_uri should be treated as a bucket "sub-directory". The decision about whether something constitutes a bucket "sub-directory" depends on whether there are multiple sources in this request and whether there is an existing bucket subdirectory. For example, when running the command: gsutil cp file gs://bucket/abc if there's no existing gs://bucket/abc bucket subdirectory we should copy file to the object gs://bucket/abc. In contrast, if there's an existing gs://bucket/abc bucket subdirectory we should copy file to gs://bucket/abc/file. And regardless of whether gs://bucket/abc exists, when running the command: gsutil cp file1 file2 gs://bucket/abc we should copy file1 to gs://bucket/abc/file1 (and similarly for file2). Note that we don't disallow naming a bucket "sub-directory" where there's already an object at that URI. For example it's legitimate (albeit confusing) to have an object called gs://bucket/dir and then run the command gsutil cp file1 file2 gs://bucket/dir Doing so will end up with objects gs://bucket/dir, gs://bucket/dir/file1, and gs://bucket/dir/file2. Args: have_multiple_srcs: Bool indicator of whether this is a multi-source operation. dst_uri: StorageUri to check. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. Returns: bool indicator. """ return ((have_multiple_srcs and dst_uri.is_cloud_uri()) or (have_existing_dest_subdir)) def _ShouldTreatDstUriAsSingleton(self, have_multiple_srcs, have_existing_dest_subdir, dst_uri): """ Checks that dst_uri names a singleton (file or object) after dir/wildcard expansion. The decision is more nuanced than simply dst_uri.names_singleton()) because of the possibility that an object path might name a bucket sub-directory. Args: have_multiple_srcs: Bool indicator of whether this is a multi-source operation. have_existing_dest_subdir: bool indicator whether dest is an existing subdirectory. dst_uri: StorageUri to check. Returns: bool indicator. """ if have_multiple_srcs: # Only a file meets the criteria in this case. return dst_uri.names_file() return not have_existing_dest_subdir and dst_uri.names_singleton() def _IsNoClobberServerException(self, e): """ Checks to see if the server attempted to clobber a file after we specified in the header that we didn't want the file clobbered. Args: e: The Exception that was generated by a failed copy operation Returns: bool indicator - True indicates that the server did attempt to clobber an existing file. """ return self.no_clobber and ( (isinstance(e, GSResponseError) and e.status==412) or (isinstance(e, ResumableUploadException) and 'code 412' in e.message)) def _GetPathBeforeFinalDir(uri): """ Returns the part of the path before the final directory component for the given URI, handling cases for file system directories, bucket, and bucket subdirectories. Example: for gs://bucket/dir/ we'll return 'gs://bucket', and for file://dir we'll return file:// Args: uri: StorageUri. Returns: String name of above-described path, sans final path separator. """ sep = uri.delim assert not uri.names_file() if uri.names_directory(): past_scheme = uri.uri[len('file://'):] if past_scheme.find(sep) == -1: return 'file://' else: return 'file://%s' % past_scheme.rstrip(sep).rpartition(sep)[0] if uri.names_bucket(): return '%s://' % uri.scheme # Else it names a bucket subdir. return uri.uri.rstrip(sep).rpartition(sep)[0] def _hash_filename(filename): """ Apply a hash function (SHA1) to shorten the passed file name. The spec for the hashed file name is as follows: TRACKER__ where hash is a SHA1 hash on the original file name and trailing is the last 16 chars from the original file name. Max file name lengths vary by operating system so the goal of this function is to ensure the hashed version takes fewer than 100 characters. Args: filename: file name to be hashed. Returns: shorter, hashed version of passed file name """ if not isinstance(filename, unicode): filename = unicode(filename, 'utf8').encode('utf-8') m = hashlib.sha1(filename) return "TRACKER_" + m.hexdigest() + '.' + filename[-16:]