600 lines
28 KiB
Python
600 lines
28 KiB
Python
# Copyright (c) 2010 OpenStack, LLC.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from __future__ import with_statement
|
|
import unittest
|
|
import os
|
|
from shutil import rmtree
|
|
from StringIO import StringIO
|
|
from uuid import uuid4
|
|
from logging import StreamHandler
|
|
|
|
from webob import Request
|
|
|
|
from swift.auth import server as auth_server
|
|
from swift.common.db import DatabaseConnectionError
|
|
from swift.common.utils import get_logger
|
|
|
|
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
|
|
def fake_http_connect(*code_iter, **kwargs):
|
|
class FakeConn(object):
|
|
def __init__(self, status):
|
|
self.status = status
|
|
self.reason = 'Fake'
|
|
self.host = '1.2.3.4'
|
|
self.port = '1234'
|
|
def getresponse(self):
|
|
if 'slow' in kwargs:
|
|
sleep(0.2)
|
|
if 'raise_exc' in kwargs:
|
|
raise kwargs['raise_exc']
|
|
return self
|
|
def getheaders(self):
|
|
return {'x-account-bytes-used': '20'}
|
|
def read(self, amt=None):
|
|
return ''
|
|
def getheader(self, name):
|
|
return self.getheaders().get(name.lower())
|
|
code_iter = iter(code_iter)
|
|
def connect(*args, **ckwargs):
|
|
if 'give_content_type' in kwargs:
|
|
if len(args) >= 7 and 'content_type' in args[6]:
|
|
kwargs['give_content_type'](args[6]['content-type'])
|
|
else:
|
|
kwargs['give_content_type']('')
|
|
return FakeConn(code_iter.next())
|
|
return connect
|
|
|
|
|
|
class FakeRing(object):
|
|
def get_nodes(self, path):
|
|
return 1, [{'ip': '10.0.0.%s' % x, 'port': 1000+x, 'device': 'sda'}
|
|
for x in xrange(3)]
|
|
|
|
|
|
class TestAuthServer(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.testdir = os.path.join(os.path.dirname(__file__),
|
|
'auth_server')
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
os.mkdir(self.testdir)
|
|
self.conf = {'swift_dir': self.testdir, 'log_name': 'auth'}
|
|
self.controller = auth_server.AuthController(self.conf, FakeRing())
|
|
|
|
def tearDown(self):
|
|
rmtree(self.testdir, ignore_errors=1)
|
|
|
|
def test_get_conn(self):
|
|
with self.controller.get_conn() as conn:
|
|
pass
|
|
exc = False
|
|
try:
|
|
with self.controller.get_conn() as conn:
|
|
raise TestException('test')
|
|
except TestException:
|
|
exc = True
|
|
self.assert_(exc)
|
|
# We allow reentrant calls for the auth-server
|
|
with self.controller.get_conn() as conn1:
|
|
exc = False
|
|
try:
|
|
with self.controller.get_conn() as conn2:
|
|
self.assert_(conn1 is not conn2)
|
|
except DatabaseConnectionError:
|
|
exc = True
|
|
self.assert_(not exc)
|
|
self.controller.conn = None
|
|
with self.controller.get_conn() as conn:
|
|
self.assert_(conn is not None)
|
|
|
|
def test_validate_token_non_existant_token(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
self.assertEquals(self.controller.validate_token(token + 'bad',
|
|
cfaccount), False)
|
|
|
|
def test_validate_token_non_existant_cfaccount(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
self.assertEquals(self.controller.validate_token(token,
|
|
cfaccount + 'bad'), False)
|
|
|
|
def test_validate_token_good(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_validate_token_expired(self):
|
|
orig_time = auth_server.time
|
|
try:
|
|
auth_server.time = lambda: 1
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account('test', 'tester',
|
|
'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(
|
|
token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
auth_server.time = lambda: 1 + self.controller.token_life
|
|
self.assertEquals(self.controller.validate_token(
|
|
token, cfaccount), False)
|
|
finally:
|
|
auth_server.time = orig_time
|
|
|
|
def test_create_account_no_new_account(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
result = self.controller.create_account('', 'tester', 'testing')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_account_no_new_user(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
result = self.controller.create_account('test', '', 'testing')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_account_no_new_password(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
result = self.controller.create_account('test', 'tester', '')
|
|
self.assertFalse(result)
|
|
|
|
def test_create_account_good(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test', 'tester', 'testing')
|
|
self.assert_(url)
|
|
self.assertEquals('/'.join(url.split('/')[:-1]),
|
|
self.controller.default_cluster_url.rstrip('/'), repr(url))
|
|
|
|
def test_recreate_accounts_none(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '0', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_one(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
self.controller.create_account('test', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '1', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_several(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
self.controller.create_account('test1', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
self.controller.create_account('test2', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
self.controller.create_account('test3', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
self.controller.create_account('test4', 'tester', 'testing')
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201,
|
|
201, 201, 201,
|
|
201, 201, 201,
|
|
201, 201, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[]', repr(rv))
|
|
|
|
def test_recreate_accounts_one_fail(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test', 'tester', 'testing')
|
|
cfaccount = url.split('/')[-1]
|
|
auth_server.http_connect = fake_http_connect(500, 500, 500)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '1', repr(rv))
|
|
self.assertEquals(rv.split()[-1], '[%s]' % repr(cfaccount),
|
|
repr(rv))
|
|
|
|
def test_recreate_accounts_several_fail(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test1', 'tester', 'testing')
|
|
cfaccounts = [url.split('/')[-1]]
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test2', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test3', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test4', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(500, 500, 500,
|
|
500, 500, 500,
|
|
500, 500, 500,
|
|
500, 500, 500)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
failed = rv.split('[', 1)[-1][:-1].split(', ')
|
|
self.assertEquals(failed, [repr(a) for a in cfaccounts])
|
|
|
|
def test_recreate_accounts_several_fail_some(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test1', 'tester', 'testing')
|
|
cfaccounts = [url.split('/')[-1]]
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test2', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test3', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test4', 'tester', 'testing')
|
|
cfaccounts.append(url.split('/')[-1])
|
|
auth_server.http_connect = fake_http_connect(500, 500, 500,
|
|
201, 201, 201,
|
|
500, 500, 500,
|
|
201, 201, 201)
|
|
rv = self.controller.recreate_accounts()
|
|
self.assertEquals(rv.split()[0], '4', repr(rv))
|
|
failed = rv.split('[', 1)[-1][:-1].split(', ')
|
|
expected = []
|
|
for i, value in enumerate(cfaccounts):
|
|
if not i % 2:
|
|
expected.append(repr(value))
|
|
self.assertEquals(failed, expected)
|
|
|
|
def test_auth_bad_path(self):
|
|
self.assertRaises(ValueError, self.controller.handle_auth,
|
|
Request.blank('', environ={'REQUEST_METHOD': 'GET'}))
|
|
res = self.controller.handle_auth(Request.blank('/bad',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 400)
|
|
|
|
def test_auth_SOSO_missing_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_account(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/testbad/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1//auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_user(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'testerbad',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': '',
|
|
'X-Storage-Pass': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_bad_password(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testingbad'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': ''}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_SOSO_good(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_SOSO_good_Mosso_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_SOSO_bad_Mosso_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing',).split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test2:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': ':tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/v1/test/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_missing_headers(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_header_format(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'badformat',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': '',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_account(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'testbad:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': ':tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_user(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:testerbad',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:',
|
|
'X-Auth-Key': 'testing'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_bad_password(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testingbad'}))
|
|
self.assertEquals(res.status_int, 401)
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': ''}))
|
|
self.assertEquals(res.status_int, 401)
|
|
|
|
def test_auth_Mosso_good(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Auth-User': 'test:tester',
|
|
'X-Auth-Key': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_auth_Mosso_good_SOSO_header_names(self):
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
cfaccount = self.controller.create_account(
|
|
'test', 'tester', 'testing').split('/')[-1]
|
|
res = self.controller.handle_auth(Request.blank('/auth',
|
|
environ={'REQUEST_METHOD': 'GET'},
|
|
headers={'X-Storage-User': 'test:tester',
|
|
'X-Storage-Pass': 'testing'}))
|
|
token = res.headers['x-storage-token']
|
|
ttl = self.controller.validate_token(token, cfaccount)
|
|
self.assert_(ttl > 0, repr(ttl))
|
|
|
|
def test_basic_logging(self):
|
|
log = StringIO()
|
|
log_handler = StreamHandler(log)
|
|
logger = get_logger(self.conf, 'auth')
|
|
logger.logger.addHandler(log_handler)
|
|
try:
|
|
auth_server.http_connect = fake_http_connect(201, 201, 201)
|
|
url = self.controller.create_account('test', 'tester', 'testing')
|
|
self.assertEquals(log.getvalue().rsplit(' ', 1)[0],
|
|
"auth SUCCESS create_account('test', 'tester', _) = %s" %
|
|
repr(url))
|
|
log.truncate(0)
|
|
def start_response(*args):
|
|
pass
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'QUERY_STRING': 'test=True',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_FORWARDED_FOR': 'testhost',
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
logsegs = log.getvalue().split(' [', 1)
|
|
logsegs[1:] = logsegs[1].split('] ', 1)
|
|
logsegs[1] = '[01/Jan/2001:01:02:03 +0000]'
|
|
logsegs[2:] = logsegs[2].split(' ')
|
|
logsegs[-1] = '0.1234'
|
|
self.assertEquals(' '.join(logsegs), 'auth testhost - - '
|
|
'[01/Jan/2001:01:02:03 +0000] "GET /v1/test/auth?test=True '
|
|
'HTTP/1.0" 204 - "-" "-" - - - - - - - - - "-" "None" "-" '
|
|
'0.1234')
|
|
self.controller.log_headers = True
|
|
log.truncate(0)
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
logsegs = log.getvalue().split(' [', 1)
|
|
logsegs[1:] = logsegs[1].split('] ', 1)
|
|
logsegs[1] = '[01/Jan/2001:01:02:03 +0000]'
|
|
logsegs[2:] = logsegs[2].split(' ')
|
|
logsegs[-1] = '0.1234'
|
|
self.assertEquals(' '.join(logsegs), 'auth None - - [01/Jan/2001:'
|
|
'01:02:03 +0000] "GET /v1/test/auth HTTP/1.0" 204 - "-" "-" - '
|
|
'- - - - - - - - "-" "None" "Content-Length: 0\n'
|
|
'X-Storage-User: tester\nX-Storage-Pass: testing" 0.1234')
|
|
finally:
|
|
logger.logger.handlers.remove(log_handler)
|
|
|
|
def test_unhandled_exceptions(self):
|
|
def request_causing_exception(*args, **kwargs):
|
|
pass
|
|
def start_response(*args):
|
|
pass
|
|
orig_Request = auth_server.Request
|
|
log = StringIO()
|
|
log_handler = StreamHandler(log)
|
|
logger = get_logger(self.conf, 'auth')
|
|
logger.logger.addHandler(log_handler)
|
|
try:
|
|
auth_server.Request = request_causing_exception
|
|
self.controller.handleREST({'REQUEST_METHOD': 'GET',
|
|
'SCRIPT_NAME': '',
|
|
'PATH_INFO': '/v1/test/auth',
|
|
'SERVER_NAME': '127.0.0.1',
|
|
'SERVER_PORT': '8080',
|
|
'SERVER_PROTOCOL': 'HTTP/1.0',
|
|
'CONTENT_LENGTH': '0',
|
|
'wsgi.version': (1, 0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': StringIO(),
|
|
'wsgi.errors': StringIO(),
|
|
'wsgi.multithread': False,
|
|
'wsgi.multiprocess': False,
|
|
'wsgi.run_once': False,
|
|
'HTTP_X_STORAGE_USER': 'tester',
|
|
'HTTP_X_STORAGE_PASS': 'testing'},
|
|
start_response)
|
|
self.assert_(log.getvalue().startswith(
|
|
'auth ERROR Unhandled exception in ReST request'),
|
|
log.getvalue())
|
|
log.truncate(0)
|
|
finally:
|
|
auth_server.Request = orig_Request
|
|
logger.logger.handlers.remove(log_handler)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|