From 8334ec961b802ad7ef8571b776c5fc727206dc9b Mon Sep 17 00:00:00 2001 From: Simon Sawicki Date: Tue, 4 Jul 2023 21:41:04 +0200 Subject: [core] Process header cookies on loading --- youtube_dl/YoutubeDL.py | 182 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 154 insertions(+), 28 deletions(-) (limited to 'youtube_dl/YoutubeDL.py') diff --git a/youtube_dl/YoutubeDL.py b/youtube_dl/YoutubeDL.py index 1435754c2..98d080f43 100755 --- a/youtube_dl/YoutubeDL.py +++ b/youtube_dl/YoutubeDL.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals import collections import contextlib +import copy import datetime import errno import fileinput @@ -34,10 +35,12 @@ from string import ascii_letters from .compat import ( compat_basestring, - compat_cookiejar, + compat_collections_chain_map as ChainMap, compat_filter as filter, compat_get_terminal_size, compat_http_client, + compat_http_cookiejar_Cookie, + compat_http_cookies_SimpleCookie, compat_integer_types, compat_kwargs, compat_map as map, @@ -53,6 +56,7 @@ from .compat import ( from .utils import ( age_restricted, args_to_str, + bug_reports_message, ContentTooShortError, date_from_str, DateRange, @@ -97,6 +101,7 @@ from .utils import ( std_headers, str_or_none, subtitles_filename, + traverse_obj, UnavailableVideoError, url_basename, version_tuple, @@ -376,6 +381,9 @@ class YoutubeDL(object): self.params.update(params) self.cache = Cache(self) + self._header_cookies = [] + self._load_cookies_from_headers(self.params.get('http_headers')) + def check_deprecated(param, option, suggestion): if self.params.get(param) is not None: self.report_warning( @@ -870,8 +878,83 @@ class YoutubeDL(object): raise return wrapper + def _remove_cookie_header(self, http_headers): + """Filters out `Cookie` header from an `http_headers` dict + The `Cookie` header is removed to prevent leaks as a result of unscoped cookies. + See: https://github.com/yt-dlp/yt-dlp/security/advisories/GHSA-v8mc-9377-rwjj + + @param http_headers An `http_headers` dict from which any `Cookie` header + should be removed, or None + """ + return dict(filter(lambda pair: pair[0].lower() != 'cookie', (http_headers or {}).items())) + + def _load_cookies(self, data, **kwargs): + """Loads cookies from a `Cookie` header + + This tries to work around the security vulnerability of passing cookies to every domain. + + @param data The Cookie header as a string to load the cookies from + @param autoscope If `False`, scope cookies using Set-Cookie syntax and error for cookie without domains + If `True`, save cookies for later to be stored in the jar with a limited scope + If a URL, save cookies in the jar with the domain of the URL + """ + # autoscope=True (kw-only) + autoscope = kwargs.get('autoscope', True) + + for cookie in compat_http_cookies_SimpleCookie(data).values() if data else []: + if autoscope and any(cookie.values()): + raise ValueError('Invalid syntax in Cookie Header') + + domain = cookie.get('domain') or '' + expiry = cookie.get('expires') + if expiry == '': # 0 is valid so we check for `''` explicitly + expiry = None + prepared_cookie = compat_http_cookiejar_Cookie( + cookie.get('version') or 0, cookie.key, cookie.value, None, False, + domain, True, True, cookie.get('path') or '', bool(cookie.get('path')), + bool(cookie.get('secure')), expiry, False, None, None, {}) + + if domain: + self.cookiejar.set_cookie(prepared_cookie) + elif autoscope is True: + self.report_warning( + 'Passing cookies as a header is a potential security risk; ' + 'they will be scoped to the domain of the downloaded urls. ' + 'Please consider loading cookies from a file or browser instead.', + only_once=True) + self._header_cookies.append(prepared_cookie) + elif autoscope: + self.report_warning( + 'The extractor result contains an unscoped cookie as an HTTP header. ' + 'If you are specifying an input URL, ' + bug_reports_message(), + only_once=True) + self._apply_header_cookies(autoscope, [prepared_cookie]) + else: + self.report_unscoped_cookies() + + def _load_cookies_from_headers(self, headers): + self._load_cookies(traverse_obj(headers, 'cookie', casesense=False)) + + def _apply_header_cookies(self, url, cookies=None): + """This method applies stray header cookies to the provided url + + This loads header cookies and scopes them to the domain provided in `url`. + While this is not ideal, it helps reduce the risk of them being sent to + an unintended destination. + """ + parsed = compat_urllib_parse.urlparse(url) + if not parsed.hostname: + return + + for cookie in map(copy.copy, cookies or self._header_cookies): + cookie.domain = '.' + parsed.hostname + self.cookiejar.set_cookie(cookie) + @__handle_extraction_exceptions def __extract_info(self, url, ie, download, extra_info, process): + # Compat with passing cookies in http headers + self._apply_header_cookies(url) + ie_result = ie.extract(url) if ie_result is None: # Finished already (backwards compatibility; listformats and friends should be moved here) return @@ -897,7 +980,7 @@ class YoutubeDL(object): def process_ie_result(self, ie_result, download=True, extra_info={}): """ - Take the result of the ie(may be modified) and resolve all unresolved + Take the result of the ie (may be modified) and resolve all unresolved references (URLs, playlist items). It will also download the videos if 'download'. @@ -1468,23 +1551,45 @@ class YoutubeDL(object): parsed_selector = _parse_format_selection(iter(TokenIterator(tokens))) return _build_selector_function(parsed_selector) - def _calc_headers(self, info_dict): - res = std_headers.copy() - - add_headers = info_dict.get('http_headers') - if add_headers: - res.update(add_headers) + def _calc_headers(self, info_dict, load_cookies=False): + if load_cookies: # For --load-info-json + # load cookies from http_headers in legacy info.json + self._load_cookies(traverse_obj(info_dict, ('http_headers', 'Cookie'), casesense=False), + autoscope=info_dict['url']) + # load scoped cookies from info.json + self._load_cookies(info_dict.get('cookies'), autoscope=False) - cookies = self._calc_cookies(info_dict) + cookies = self.cookiejar.get_cookies_for_url(info_dict['url']) if cookies: - res['Cookie'] = cookies + # Make a string like name1=val1; attr1=a_val1; ...name2=val2; ... + # By convention a cookie name can't be a well-known attribute name + # so this syntax is unambiguous and can be parsed by (eg) SimpleCookie + encoder = compat_http_cookies_SimpleCookie() + values = [] + attributes = (('Domain', '='), ('Path', '='), ('Secure',), ('Expires', '='), ('Version', '=')) + attributes = tuple([x[0].lower()] + list(x) for x in attributes) + for cookie in cookies: + _, value = encoder.value_encode(cookie.value) + # Py 2 '' --> '', Py 3 '' --> '""' + if value == '': + value = '""' + values.append('='.join((cookie.name, value))) + for attr in attributes: + value = getattr(cookie, attr[0], None) + if value: + values.append('%s%s' % (''.join(attr[1:]), value if len(attr) == 3 else '')) + info_dict['cookies'] = '; '.join(values) + + res = std_headers.copy() + res.update(info_dict.get('http_headers') or {}) + res = self._remove_cookie_header(res) if 'X-Forwarded-For' not in res: x_forwarded_for_ip = info_dict.get('__x_forwarded_for_ip') if x_forwarded_for_ip: res['X-Forwarded-For'] = x_forwarded_for_ip - return res + return res or None def _calc_cookies(self, info_dict): pr = sanitized_Request(info_dict['url']) @@ -1663,10 +1768,13 @@ class YoutubeDL(object): format['protocol'] = determine_protocol(format) # Add HTTP headers, so that external programs can use them from the # json output - full_format_info = info_dict.copy() - full_format_info.update(format) - format['http_headers'] = self._calc_headers(full_format_info) - # Remove private housekeeping stuff + format['http_headers'] = self._calc_headers(ChainMap(format, info_dict), load_cookies=True) + + # Safeguard against old/insecure infojson when using --load-info-json + info_dict['http_headers'] = self._remove_cookie_header( + info_dict.get('http_headers') or {}) or None + + # Remove private housekeeping stuff (copied to http_headers in _calc_headers()) if '__x_forwarded_for_ip' in info_dict: del info_dict['__x_forwarded_for_ip'] @@ -1927,17 +2035,9 @@ class YoutubeDL(object): (sub_lang, error_to_compat_str(err))) continue - if self.params.get('writeinfojson', False): - infofn = replace_extension(filename, 'info.json', info_dict.get('ext')) - if self.params.get('nooverwrites', False) and os.path.exists(encodeFilename(infofn)): - self.to_screen('[info] Video description metadata is already present') - else: - self.to_screen('[info] Writing video description metadata as JSON to: ' + infofn) - try: - write_json_file(self.filter_requested_info(info_dict), infofn) - except (OSError, IOError): - self.report_error('Cannot write metadata to JSON file ' + infofn) - return + self._write_info_json( + 'video description', info_dict, + replace_extension(filename, 'info.json', info_dict.get('ext'))) self._write_thumbnails(info_dict, filename) @@ -1958,7 +2058,11 @@ class YoutubeDL(object): fd.add_progress_hook(ph) if self.params.get('verbose'): self.to_screen('[debug] Invoking downloader on %r' % info.get('url')) - return fd.download(name, info) + + new_info = dict((k, v) for k, v in info.items() if not k.startswith('__p')) + new_info['http_headers'] = self._calc_headers(new_info) + + return fd.download(name, new_info) if info_dict.get('requested_formats') is not None: downloaded = [] @@ -2484,7 +2588,7 @@ class YoutubeDL(object): opts_proxy = self.params.get('proxy') if opts_cookiefile is None: - self.cookiejar = compat_cookiejar.CookieJar() + self.cookiejar = YoutubeDLCookieJar() else: opts_cookiefile = expand_path(opts_cookiefile) self.cookiejar = YoutubeDLCookieJar(opts_cookiefile) @@ -2545,6 +2649,28 @@ class YoutubeDL(object): encoding = preferredencoding() return encoding + def _write_info_json(self, label, info_dict, infofn, overwrite=None): + if not self.params.get('writeinfojson', False): + return False + + def msg(fmt, lbl): + return fmt % (lbl + ' metadata',) + + if overwrite is None: + overwrite = not self.params.get('nooverwrites', False) + + if not overwrite and os.path.exists(encodeFilename(infofn)): + self.to_screen(msg('[info] %s is already present', label.title())) + return 'exists' + else: + self.to_screen(msg('[info] Writing %s as JSON to: ' + infofn, label)) + try: + write_json_file(self.filter_requested_info(info_dict), infofn) + return True + except (OSError, IOError): + self.report_error(msg('Cannot write %s to JSON file ' + infofn, label)) + return + def _write_thumbnails(self, info_dict, filename): if self.params.get('writethumbnail', False): thumbnails = info_dict.get('thumbnails') -- cgit 1.4.1