#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import cgi
import logging
import mimetypes
import os

from pip._internal.download import PipSession
from pip._internal.models.link import Link
from pip._internal.utils.misc import splitext, consume, format_size
from pip._internal.utils.ui import DownloadProgressProvider
from pip._vendor import requests
from pip._vendor.requests.models import CONTENT_CHUNK_SIZE

logger = logging.getLogger(__name__)

# Responses whose declared size is at most this many bytes are downloaded
# without a progress indicator.
_PROGRESS_MIN_BYTES = 40 * 1000


def _filename_from_content_disposition(content_disposition, default_filename):
    # type: (str, str) -> str
    """
    Parse the "filename" value from a Content-Disposition header, and
    return the default filename if the result is empty.
    """
    _type, params = cgi.parse_header(content_disposition)
    filename = params.get('filename')
    if filename:
        # Keep only the final path component so that a header value such as
        # "../../x" cannot place the file outside the download directory.
        filename = os.path.basename(filename)
    # A header without a filename parameter, or one whose value is empty
    # after sanitizing, must not discard the fallback name.
    return filename or default_filename


def download_http_url(
    url,  # type: str
    download_dir,  # type: str
    hashes=None,  # type: Optional[Hashes]
    progress_bar='on'  # type: str
):
    # type: (...) -> Tuple[str, str]
    """Download ``url`` into ``download_dir`` using a fresh ``PipSession``.

    The target filename is taken from the Content-Disposition header when it
    carries a usable ``filename`` parameter, otherwise from the link itself.
    If the name has no extension, one is guessed from the Content-Type header
    or, failing that, from the final (possibly redirected) response URL.

    :param url: URL to fetch; any ``#fragment`` is stripped before the request.
    :param download_dir: Existing directory the file is written into.
    :param hashes: Optional object whose ``check_against_chunks`` is fed the
        downloaded chunks; when falsy the chunks are simply consumed.
    :param progress_bar: Passed through to ``DownloadProgressProvider``.
    :return: ``(file_path, content_type)`` where ``content_type`` is the raw
        Content-Type header value ('' when absent).
    :raises requests.HTTPError: re-raised after printing when the server
        answers with an error status.
    """
    logger.debug('download_http_url("%s", "%s")', url, download_dir)
    link = Link(url)

    def _download_url(
        resp,  # type: Response
        link,  # type: Link
        content_file,  # type: IO
        hashes,  # type: Optional[Hashes]
        progress_bar  # type: str
    ):
        # type: (...) -> None
        """Stream the body of ``resp`` into ``content_file``."""
        try:
            total_length = int(resp.headers['content-length'])
        except (ValueError, KeyError, TypeError):
            # Missing or malformed header: treat the size as unknown.
            total_length = 0

        cached_resp = getattr(resp, 'from_cache', False)
        # We don't show progress on cached responses. Otherwise show it for
        # large downloads and for downloads of unknown size.
        show_progress = not cached_resp and (
            total_length > _PROGRESS_MIN_BYTES or not total_length
        )

        def resp_read(chunk_size):
            try:
                # Special case for urllib3. decode_content=False keeps the
                # bytes exactly as sent on the wire.
                yield from resp.raw.stream(chunk_size, decode_content=False)
            except AttributeError:
                # Standard file-like object.
                while True:
                    chunk = resp.raw.read(chunk_size)
                    if not chunk:
                        break
                    yield chunk

        def written_chunks(chunks):
            # Write each chunk to disk and pass it on so the same stream can
            # also be hashed by the consumer.
            for chunk in chunks:
                content_file.write(chunk)
                yield chunk

        def _progress_indicator(iterable, *args, **kwargs):
            # No-op stand-in used when no progress display is wanted.
            return iterable

        progress_indicator = _progress_indicator

        if show_progress:
            progress_indicator = DownloadProgressProvider(progress_bar,
                                                          max=total_length)
            if total_length:
                print('Downloading {} ({})'.format(url, format_size(total_length)))
            else:
                print(f'Downloading {link.url}')
        elif cached_resp:
            print(f'Using cached {link.url}')
        else:
            print(f'Downloading {link.url}')

        print(f'Downloading from URL {link}')

        downloaded_chunks = written_chunks(
            progress_indicator(
                resp_read(CONTENT_CHUNK_SIZE),
                CONTENT_CHUNK_SIZE
            )
        )
        # The generator chain is lazy; one of these two calls must drain it
        # for anything to be written.
        if hashes:
            hashes.check_against_chunks(downloaded_chunks)
        else:
            consume(downloaded_chunks)

    target_url = link.url.split('#', 1)[0]

    # The session (and below, the streamed response) hold network
    # connections; make sure they are released on every exit path.
    with PipSession() as session:
        try:
            resp = session.get(
                target_url,
                # Ask for the unencoded payload so the bytes on disk (and the
                # bytes being hashed) are those of the file itself.
                headers={"Accept-Encoding": "identity"},
                stream=True,
            )
            resp.raise_for_status()
        except requests.HTTPError as exc:
            print('HTTP error {} while getting {}'.format(exc.response.status_code, link))
            raise

        try:
            content_type = resp.headers.get('content-type', '')
            filename = link.filename  # fallback
            # Have a look at the Content-Disposition header for a better guess
            content_disposition = resp.headers.get('content-disposition')
            if content_disposition:
                filename = _filename_from_content_disposition(
                    content_disposition, filename
                )

            ext = splitext(filename)[1]  # type: Optional[str]
            if not ext:
                ext = mimetypes.guess_extension(content_type)
                if ext:
                    filename += ext
            if not ext and link.url != resp.url:
                # We were redirected; the final URL may carry an extension.
                ext = os.path.splitext(resp.url)[1]
                if ext:
                    filename += ext

            file_path = os.path.join(download_dir, filename)
            with open(file_path, 'wb') as content_file:
                _download_url(resp, link, content_file, hashes, progress_bar)
        finally:
            resp.close()

    return file_path, content_type