From 8334ec961b802ad7ef8571b776c5fc727206dc9b Mon Sep 17 00:00:00 2001
From: Simon Sawicki
Date: Tue, 4 Jul 2023 21:41:04 +0200
Subject: [core] Process header cookies on loading

---
Note (review): this copy of the patch had lost all of its line breaks and
indentation. The line structure below was reconstructed and checked against
every @@ hunk line count and against the diffstat. The indentation of
continuation lines inside multi-line calls is a best-effort reconstruction.

 test/test_YoutubeDL.py          | 185 ++++++++++++++++++++++++++++++++++++++--
 test/test_YoutubeDLCookieJar.py |  14 +++
 2 files changed, 194 insertions(+), 5 deletions(-)

(limited to 'test')

diff --git a/test/test_YoutubeDL.py b/test/test_YoutubeDL.py
index 60780b8a7..6cf555827 100644
--- a/test/test_YoutubeDL.py
+++ b/test/test_YoutubeDL.py
@@ -10,14 +10,30 @@ import unittest
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 import copy
+import json
 
-from test.helper import FakeYDL, assertRegexpMatches
+from test.helper import (
+    FakeYDL,
+    assertRegexpMatches,
+    try_rm,
+)
 from youtube_dl import YoutubeDL
-from youtube_dl.compat import compat_str, compat_urllib_error
+from youtube_dl.compat import (
+    compat_http_cookiejar_Cookie,
+    compat_http_cookies_SimpleCookie,
+    compat_kwargs,
+    compat_str,
+    compat_urllib_error,
+)
+
 from youtube_dl.extractor import YoutubeIE
 from youtube_dl.extractor.common import InfoExtractor
 from youtube_dl.postprocessor.common import PostProcessor
-from youtube_dl.utils import ExtractorError, match_filter_func
+from youtube_dl.utils import (
+    ExtractorError,
+    match_filter_func,
+    traverse_obj,
+)
 
 TEST_URL = 'http://localhost/sample.mp4'
 
@@ -29,11 +45,14 @@ class YDL(FakeYDL):
         self.msgs = []
 
     def process_info(self, info_dict):
-        self.downloaded_info_dicts.append(info_dict)
+        self.downloaded_info_dicts.append(info_dict.copy())
 
     def to_screen(self, msg):
         self.msgs.append(msg)
 
+    def dl(self, *args, **kwargs):
+        assert False, 'Downloader must not be invoked for test_YoutubeDL'
+
 
 def _make_result(formats, **kwargs):
     res = {
@@ -42,8 +61,9 @@ def _make_result(formats, **kwargs):
         'title': 'testttitle',
         'extractor': 'testex',
         'extractor_key': 'TestEx',
+        'webpage_url': 'http://example.com/watch?v=shenanigans',
     }
-    res.update(**kwargs)
+    res.update(**compat_kwargs(kwargs))
     return res
 
 
@@ -1011,5 +1031,160 @@ class TestYoutubeDL(unittest.TestCase):
         self.assertEqual(out_info['release_date'], '20210930')
 
 
+class TestYoutubeDLCookies(unittest.TestCase):
+
+    @staticmethod
+    def encode_cookie(cookie):
+        if not isinstance(cookie, dict):
+            cookie = vars(cookie)
+        for name, value in cookie.items():
+            yield name, compat_str(value)
+
+    @classmethod
+    def comparable_cookies(cls, cookies):
+        # Work around cookiejar cookies not being unicode strings
+        return sorted(map(tuple, map(sorted, map(cls.encode_cookie, cookies))))
+
+    def assertSameCookies(self, c1, c2, msg=None):
+        return self.assertEqual(
+            *map(self.comparable_cookies, (c1, c2)),
+            msg=msg)
+
+    def assertSameCookieStrings(self, c1, c2, msg=None):
+        return self.assertSameCookies(
+            *map(lambda c: compat_http_cookies_SimpleCookie(c).values(), (c1, c2)),
+            msg=msg)
+
+    def test_header_cookies(self):
+
+        ydl = FakeYDL()
+        ydl.report_warning = lambda *_, **__: None
+
+        def cookie(name, value, version=None, domain='', path='', secure=False, expires=None):
+            return compat_http_cookiejar_Cookie(
+                version or 0, name, value, None, False,
+                domain, bool(domain), bool(domain), path, bool(path),
+                secure, expires, False, None, None, rest={})
+
+        test_url, test_domain = (t % ('yt.dl',) for t in ('https://%s/test', '.%s'))
+
+        def test(encoded_cookies, cookies, headers=False, round_trip=None, error_re=None):
+            def _test():
+                ydl.cookiejar.clear()
+                ydl._load_cookies(encoded_cookies, autoscope=headers)
+                if headers:
+                    ydl._apply_header_cookies(test_url)
+                data = {'url': test_url}
+                ydl._calc_headers(data)
+                self.assertSameCookies(
+                    cookies, ydl.cookiejar,
+                    'Extracted cookiejar.Cookie is not the same')
+                if not headers:
+                    self.assertSameCookieStrings(
+                        data.get('cookies'), round_trip or encoded_cookies,
+                        msg='Cookie is not the same as round trip')
+                ydl.__dict__['_YoutubeDL__header_cookies'] = []
+
+            try:
+                _test()
+            except AssertionError:
+                raise
+            except Exception as e:
+                if not error_re:
+                    raise
+                assertRegexpMatches(self, e.args[0], error_re.join(('.*',) * 2))
+
+        test('test=value; Domain=' + test_domain, [cookie('test', 'value', domain=test_domain)])
+        test('test=value', [cookie('test', 'value')], error_re='Unscoped cookies are not allowed')
+        test('cookie1=value1; Domain={0}; Path=/test; cookie2=value2; Domain={0}; Path=/'.format(test_domain), [
+            cookie('cookie1', 'value1', domain=test_domain, path='/test'),
+            cookie('cookie2', 'value2', domain=test_domain, path='/')])
+        cookie_kw = compat_kwargs(
+            {'domain': test_domain, 'path': '/test', 'secure': True, 'expires': '9999999999', })
+        test('test=value; Domain={domain}; Path={path}; Secure; Expires={expires}'.format(**cookie_kw), [
+            cookie('test', 'value', **cookie_kw)])
+        test('test="value; "; path=/test; domain=' + test_domain, [
+            cookie('test', 'value; ', domain=test_domain, path='/test')],
+            round_trip='test="value\\073 "; Domain={0}; Path=/test'.format(test_domain))
+        test('name=; Domain=' + test_domain, [cookie('name', '', domain=test_domain)],
+             round_trip='name=""; Domain=' + test_domain)
+        test('test=value', [cookie('test', 'value', domain=test_domain)], headers=True)
+        test('cookie1=value; Domain={0}; cookie2=value'.format(test_domain), [],
+             headers=True, error_re='Invalid syntax')
+        ydl.report_warning = ydl.report_error
+        test('test=value', [], headers=True, error_re='Passing cookies as a header is a potential security risk')
+
+    def test_infojson_cookies(self):
+        TEST_FILE = 'test_infojson_cookies.info.json'
+        TEST_URL = 'https://example.com/example.mp4'
+        COOKIES = 'a=b; Domain=.example.com; c=d; Domain=.example.com'
+        COOKIE_HEADER = {'Cookie': 'a=b; c=d'}
+
+        ydl = FakeYDL()
+        ydl.process_info = lambda x: ydl._write_info_json('test', x, TEST_FILE)
+
+        def make_info(info_header_cookies=False, fmts_header_cookies=False, cookies_field=False):
+            fmt = {'url': TEST_URL}
+            if fmts_header_cookies:
+                fmt['http_headers'] = COOKIE_HEADER
+            if cookies_field:
+                fmt['cookies'] = COOKIES
+            return _make_result([fmt], http_headers=COOKIE_HEADER if info_header_cookies else None)
+
+        def test(initial_info, note):
+
+            def failure_msg(why):
+                return ' when '.join((why, note))
+
+            result = {}
+            result['processed'] = ydl.process_ie_result(initial_info)
+            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+                            msg=failure_msg('No cookies set in cookiejar after initial process'))
+            ydl.cookiejar.clear()
+            with open(TEST_FILE) as infojson:
+                result['loaded'] = ydl.sanitize_info(json.load(infojson), True)
+            result['final'] = ydl.process_ie_result(result['loaded'].copy(), download=False)
+            self.assertTrue(ydl.cookiejar.get_cookies_for_url(TEST_URL),
+                            msg=failure_msg('No cookies set in cookiejar after final process'))
+            ydl.cookiejar.clear()
+            for key in ('processed', 'loaded', 'final'):
+                info = result[key]
+                self.assertIsNone(
+                    traverse_obj(info, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False),
+                    msg=failure_msg('Cookie header not removed in {0} result'.format(key)))
+                self.assertSameCookieStrings(
+                    traverse_obj(info, ((None, ('formats', 0)), 'cookies'), get_all=False), COOKIES,
+                    msg=failure_msg('No cookies field found in {0} result'.format(key)))
+
+        test({'url': TEST_URL, 'http_headers': COOKIE_HEADER, 'id': '1', 'title': 'x'}, 'no formats field')
+        test(make_info(info_header_cookies=True), 'info_dict header cokies')
+        test(make_info(fmts_header_cookies=True), 'format header cookies')
+        test(make_info(info_header_cookies=True, fmts_header_cookies=True), 'info_dict and format header cookies')
+        test(make_info(info_header_cookies=True, fmts_header_cookies=True, cookies_field=True), 'all cookies fields')
+        test(make_info(cookies_field=True), 'cookies format field')
+        test({'url': TEST_URL, 'cookies': COOKIES, 'id': '1', 'title': 'x'}, 'info_dict cookies field only')
+
+        try_rm(TEST_FILE)
+
+    def test_add_headers_cookie(self):
+        def check_for_cookie_header(result):
+            return traverse_obj(result, ((None, ('formats', 0)), 'http_headers', 'Cookie'), casesense=False, get_all=False)
+
+        ydl = FakeYDL({'http_headers': {'Cookie': 'a=b'}})
+        ydl._apply_header_cookies(_make_result([])['webpage_url'])  # Scope to input webpage URL: .example.com
+
+        fmt = {'url': 'https://example.com/video.mp4'}
+        result = ydl.process_ie_result(_make_result([fmt]), download=False)
+        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies in result info_dict')
+        self.assertEqual(result.get('cookies'), 'a=b; Domain=.example.com', msg='No cookies were set in cookies field')
+        self.assertIn('a=b', ydl.cookiejar.get_cookie_header(fmt['url']), msg='No cookies were set in cookiejar')
+
+        fmt = {'url': 'https://wrong.com/video.mp4'}
+        result = ydl.process_ie_result(_make_result([fmt]), download=False)
+        self.assertIsNone(check_for_cookie_header(result), msg='http_headers cookies for wrong domain')
+        self.assertFalse(result.get('cookies'), msg='Cookies set in cookies field for wrong domain')
+        self.assertFalse(ydl.cookiejar.get_cookie_header(fmt['url']), msg='Cookies set in cookiejar for wrong domain')
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/test_YoutubeDLCookieJar.py b/test/test_YoutubeDLCookieJar.py
index 05f48bd74..4f9dd71ae 100644
--- a/test/test_YoutubeDLCookieJar.py
+++ b/test/test_YoutubeDLCookieJar.py
@@ -46,6 +46,20 @@ class TestYoutubeDLCookieJar(unittest.TestCase):
         # will be ignored
         self.assertFalse(cookiejar._cookies)
 
+    def test_get_cookie_header(self):
+        cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/httponly_cookies.txt')
+        cookiejar.load(ignore_discard=True, ignore_expires=True)
+        header = cookiejar.get_cookie_header('https://www.foobar.foobar')
+        self.assertIn('HTTPONLY_COOKIE', header)
+
+    def test_get_cookies_for_url(self):
+        cookiejar = YoutubeDLCookieJar('./test/testdata/cookies/session_cookies.txt')
+        cookiejar.load(ignore_discard=True, ignore_expires=True)
+        cookies = cookiejar.get_cookies_for_url('https://www.foobar.foobar/')
+        self.assertEqual(len(cookies), 2)
+        cookies = cookiejar.get_cookies_for_url('https://foobar.foobar/')
+        self.assertFalse(cookies)
+
 
 if __name__ == '__main__':
     unittest.main()
-- 
cgit 1.4.1