
Swift servers can now be seamlessly reloaded by sending them a SIGUSR1
(instead of a SIGHUP). The server forks off a synchronized child that waits
to close the old listen socket(s) until the new server has started up and
bound its own listen socket(s). The new server is exec'ed from the old one,
so its PID doesn't change. This keeps systemd happy, so an ExecReload=
stanza can now be used.

The "seamless" part means that incoming connections will always get
accepted, either by the old server or the new one. This eliminates
client-perceived "downtime" during server reloads while still allowing the
server to fully reload: re-reading configuration, becoming a fresh Python
interpreter instance, etc. The SO_REUSEPORT socket option was already in
use, so nothing had to change there.

This patch also includes a non-invasive fix for a current eventlet bug; see
https://github.com/eventlet/eventlet/pull/590 That bug prevents a SIGHUP
"reload" from properly servicing existing requests before old worker
processes close their sockets and exit. The existing probe tests missed
this, but the new ones in this patch caught it.

New probe tests cover both the old SIGHUP "reload" behavior and the new
SIGUSR1 seamless-reload behavior.

Change-Id: I3e5229d2fb04be67e53533ff65b0870038accbb7
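
For illustration, a seamless reload could be triggered by hand like this --
a minimal sketch, assuming a running server whose PID is already known (the
PID-file path below is hypothetical, not something this patch defines);
under systemd the equivalent would be an
ExecReload=/bin/kill -USR1 $MAINPID stanza:

    import os
    import signal

    # The PID-file location here is an assumption for this sketch.
    with open('/var/run/swift/proxy-server.pid') as fh:
        server_pid = int(fh.read().strip())

    # SIGUSR1 asks the server to re-exec itself in place: a forked child
    # holds the old listen socket(s) open until the new server has bound
    # its own, so no incoming connection is ever refused.
    os.kill(server_pid, signal.SIGUSR1)
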
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

from contextlib import contextmanager
import eventlet
import json
import os
import random
import shutil
import time
from uuid import uuid4

from six.moves import http_client as httplib

from swift.common.storage_policy import POLICIES
from swift.common.ring import Ring
from swift.common.manager import Manager

from test.probe.common import resetswift, ReplProbeTest, client


def putrequest(conn, method, path, headers):
    # Begin an HTTP request on `conn` without sending any body; skip_host
    # avoids a duplicate Host header when the caller supplies its own.
    conn.putrequest(method, path, skip_host=(headers and 'Host' in headers))
    if headers:
        for header, value in headers.items():
            conn.putheader(header, str(value))
    conn.endheaders()
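
# The helper above lets a test start a request (headers only) and send the
# body later -- the reload tests below use this to hold a request open
# across a server reload. A minimal sketch of the pattern (hypothetical
# address and path, not part of any test run):
#
#   conn = httplib.HTTPConnection('127.0.0.1:8080')
#   putrequest(conn, 'PUT', '/v1/AUTH_test/c/o',
#              headers={'Content-Length': '5'})
#   # ...the server may be reloaded here...
#   conn.send('hello')             # finish the body on the same connection
#   resp = conn.getresponse()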


def get_server_and_worker_pids(manager, old_workers=None):
    # Gets all the server parent pids, as well as the set of all worker PIDs
    # (i.e. any PID whose PPID is in the set of parent pids).
    server_pid_set = {pid for server in manager.servers
                      for (_, pid) in server.iter_pid_files()}
    children_pid_set = set()
    old_worker_pid_set = set(old_workers or [])
    all_pids = [int(f) for f in os.listdir('/proc') if f.isdigit()]
    for pid in all_pids:
        try:
            with open('/proc/%d/status' % pid, 'r') as fh:
                for line in fh:
                    if line.startswith('PPid:\t'):
                        ppid = int(line[6:])
                        if ppid in server_pid_set or \
                                pid in old_worker_pid_set:
                            children_pid_set.add(pid)
                        break
        except Exception:
            # No big deal, a process could have exited since we listed /proc,
            # so we just ignore errors
            pass
    return {'server': server_pid_set, 'worker': children_pid_set}


def wait_for_pids(manager, callback, timeout=15, old_workers=None):
    # Waits up to `timeout` seconds for the supplied callback to return True
    # when passed in the manager's pid set.
    start_time = time.time()

    pid_sets = get_server_and_worker_pids(manager, old_workers=old_workers)
    got = callback(pid_sets)
    while not got and time.time() - start_time < timeout:
        time.sleep(0.1)
        pid_sets = get_server_and_worker_pids(manager,
                                              old_workers=old_workers)
        got = callback(pid_sets)
    if time.time() - start_time >= timeout:
        raise AssertionError('timed out waiting for PID state; got %r' % (
            pid_sets))
    return pid_sets


class TestWSGIServerProcessHandling(ReplProbeTest):
    # Subclasses need to define SERVER_NAME
    HAS_INFO = False
    PID_TIMEOUT = 25

    def setUp(self):
        super(TestWSGIServerProcessHandling, self).setUp()
        self.container = 'container-%s' % uuid4()
        client.put_container(self.url, self.token, self.container,
                             headers={'X-Storage-Policy':
                                      self.policy.name})
        self.manager = Manager([self.SERVER_NAME])
        for server in self.manager.servers:
            self.assertTrue(server.get_running_pids(),
                            'No running PIDs for %s' % server.cmd)
        self.starting_pids = get_server_and_worker_pids(self.manager)

    def assert4xx(self, resp):
        self.assertEqual(resp.status // 100, 4)
        got_body = resp.read()
        try:
            self.assertIn('resource could not be found', got_body)
        except AssertionError:
            self.assertIn('Invalid path: blah', got_body)

    def get_conn(self):
        ip, port = self.get_ip_port()
        return httplib.HTTPConnection('%s:%s' % (ip, port))

    def _check_reload(self):
        conn = self.get_conn()
        self.addCleanup(conn.close)

        # sanity request
        self.start_write_req(conn, 'sanity')
        resp = self.finish_write_req(conn)
        self.check_write_resp(resp)

        if self.HAS_INFO:
            self.check_info_value(8192)

        # Start another write request before reloading...
        self.start_write_req(conn, 'across-reload')

        if self.HAS_INFO:
            self.swap_configs()  # new server's max_header_size == 8191

        self.do_reload()

        wait_for_pids(self.manager, self.make_post_reload_pid_cb(),
                      old_workers=self.starting_pids['worker'],
                      timeout=self.PID_TIMEOUT)

        # ... and make sure we can finish what we were doing
        resp = self.finish_write_req(conn)
        self.check_write_resp(resp)

        # After this, we're in a funny spot. With eventlet 0.22.0, the
        # connection's now closed, but with prior versions we could keep
        # going indefinitely. See
        # https://bugs.launchpad.net/swift/+bug/1792615

        # Close our connection, to make sure old eventlet shuts down
        conn.close()

        # sanity
        wait_for_pids(self.manager, self.make_post_close_pid_cb(),
                      old_workers=self.starting_pids['worker'],
                      timeout=self.PID_TIMEOUT)

        if self.HAS_INFO:
            self.check_info_value(8191)


class OldReloadMixin(object):
    def make_post_reload_pid_cb(self):
        def _cb(post_reload_pids):
            # We expect all old server PIDs to be gone, a new server present,
            # and for there to be exactly 1 old worker PID plus additional
            # new worker PIDs.
            old_servers_dead = not (self.starting_pids['server'] &
                                    post_reload_pids['server'])
            one_old_worker = 1 == len(self.starting_pids['worker'] &
                                      post_reload_pids['worker'])
            new_workers_present = (post_reload_pids['worker'] -
                                   self.starting_pids['worker'])
            return (post_reload_pids['server'] and old_servers_dead and
                    one_old_worker and new_workers_present)
        return _cb

    def make_post_close_pid_cb(self):
        def _cb(post_close_pids):
            # We expect all old server PIDs to be gone, a new server present,
            # no old worker PIDs, and additional new worker PIDs.
            old_servers_dead = not (self.starting_pids['server'] &
                                    post_close_pids['server'])
            old_workers_dead = not (self.starting_pids['worker'] &
                                    post_close_pids['worker'])
            new_workers_present = (post_close_pids['worker'] -
                                   self.starting_pids['worker'])
            return (post_close_pids['server'] and old_servers_dead and
                    old_workers_dead and new_workers_present)
        return _cb

    def do_reload(self):
        self.manager.reload()


class SeamlessReloadMixin(object):
    def make_post_reload_pid_cb(self):
        def _cb(post_reload_pids):
            # We expect all orig server PIDs to STILL BE PRESENT, no new
            # server present, and for there to be exactly 1 old worker PID
            # plus additional new worker PIDs.
            same_servers = (self.starting_pids['server'] ==
                            post_reload_pids['server'])
            one_old_worker = 1 == len(self.starting_pids['worker'] &
                                      post_reload_pids['worker'])
            new_workers_present = (post_reload_pids['worker'] -
                                   self.starting_pids['worker'])
            return (post_reload_pids['server'] and same_servers and
                    one_old_worker and new_workers_present)
        return _cb

    def make_post_close_pid_cb(self):
        def _cb(post_close_pids):
            # We expect all orig server PIDs to STILL BE PRESENT, no new
            # server present, no old worker PIDs, and additional new worker
            # PIDs.
            same_servers = (self.starting_pids['server'] ==
                            post_close_pids['server'])
            old_workers_dead = not (self.starting_pids['worker'] &
                                    post_close_pids['worker'])
            new_workers_present = (post_close_pids['worker'] -
                                   self.starting_pids['worker'])
            return (post_close_pids['server'] and same_servers and
                    old_workers_dead and new_workers_present)
        return _cb

    def do_reload(self):
        self.manager.reload_seamless()
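
# Note: with a seamless reload the manager sends SIGUSR1 and the server
# re-execs itself in place, so the parent server PID never changes -- that
# is why the "same_servers" checks above expect identical server PID sets,
# where the SIGHUP-based OldReloadMixin expects the old PIDs to be gone.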


class TestObjectServerReloadBase(TestWSGIServerProcessHandling):
    SERVER_NAME = 'object'
    PID_TIMEOUT = 35

    def get_ip_port(self):
        policy = random.choice(list(POLICIES))
        policy.load_ring('/etc/swift')
        self.ring_node = random.choice(policy.object_ring.get_part_nodes(1))
        return self.ring_node['ip'], self.ring_node['port']

    def start_write_req(self, conn, suffix):
        putrequest(conn, 'PUT', '/%s/123/%s/%s/blah-%s' % (
            self.ring_node['device'], self.account, self.container, suffix),
            headers={'X-Timestamp': str(time.time()),
                     'Content-Type': 'application/octet-stream',
                     'Content-Length': len(self.BODY)})

    def finish_write_req(self, conn):
        conn.send(self.BODY)
        return conn.getresponse()

    def check_write_resp(self, resp):
        got_body = resp.read()
        self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' %
                         (resp.status, got_body))
        self.assertEqual('', got_body)
        return resp


class TestObjectServerReload(OldReloadMixin, TestObjectServerReloadBase):
    BODY = 'test-object' * 10

    def test_object_reload(self):
        self._check_reload()


class TestObjectServerReloadSeamless(SeamlessReloadMixin,
                                     TestObjectServerReloadBase):
    BODY = 'test-object' * 10

    def test_object_reload_seamless(self):
        self._check_reload()


class TestProxyServerReloadBase(TestWSGIServerProcessHandling):
    SERVER_NAME = 'proxy-server'
    HAS_INFO = True

    def setUp(self):
        super(TestProxyServerReloadBase, self).setUp()
        self.swift_conf_path = '/etc/swift/swift.conf'
        self.new_swift_conf_path = self.swift_conf_path + '.new'
        self.saved_swift_conf_path = self.swift_conf_path + '.orig'
        shutil.copy(self.swift_conf_path, self.saved_swift_conf_path)
        shutil.copy(self.swift_conf_path, self.new_swift_conf_path)
        with open(self.new_swift_conf_path, 'a+') as fh:
            fh.seek(0, os.SEEK_END)
            fh.write('\n[swift-constraints]\nmax_header_size = 8191\n')
            fh.flush()

    def tearDown(self):
        shutil.move(self.saved_swift_conf_path, self.swift_conf_path)
        try:
            os.unlink(self.new_swift_conf_path)
        except OSError:
            pass
        super(TestProxyServerReloadBase, self).tearDown()

    def swap_configs(self):
        shutil.copy(self.new_swift_conf_path, self.swift_conf_path)

    def get_ip_port(self):
        return 'localhost', 8080

    def assertMaxHeaderSize(self, resp, exp_max_header_size):
        self.assertEqual(resp.status // 100, 2)
        info_dict = json.loads(resp.read())
        self.assertEqual(exp_max_header_size,
                         info_dict['swift']['max_header_size'])

    def check_info_value(self, expected_value):
        # hit /info to see which config the running server is using: the
        # original server reports the default max_header_size == 8192, the
        # reloaded one reports 8191
        conn2 = self.get_conn()
        putrequest(conn2, 'GET', '/info',
                   headers={'Content-Length': '0',
                            'Accept': 'application/json'})
        conn2.send('')
        resp = conn2.getresponse()
        self.assertMaxHeaderSize(resp, expected_value)
        conn2.close()

    def start_write_req(self, conn, suffix):
        putrequest(conn, 'PUT', '/v1/%s/%s/blah-%s' % (
            self.account, self.container, suffix),
            headers={'X-Auth-Token': self.token,
                     'Content-Length': len(self.BODY)})

    def finish_write_req(self, conn):
        conn.send(self.BODY)
        return conn.getresponse()

    def check_write_resp(self, resp):
        got_body = resp.read()
        self.assertEqual(resp.status // 100, 2, 'Got status %d; %r' %
                         (resp.status, got_body))
        self.assertEqual('', got_body)
        return resp


class TestProxyServerReload(OldReloadMixin, TestProxyServerReloadBase):
    BODY = 'proxy' * 10

    def test_proxy_reload(self):
        self._check_reload()


class TestProxyServerReloadSeamless(SeamlessReloadMixin,
                                    TestProxyServerReloadBase):
    BODY = 'proxy-seamless' * 10

    def test_proxy_reload_seamless(self):
        self._check_reload()


@contextmanager
def spawn_services(ip_ports, timeout=10):
    q = eventlet.Queue()

    def service(sock):
        try:
            conn, address = sock.accept()
            q.put(address)
            eventlet.sleep(timeout)
            conn.close()
        finally:
            sock.close()

    pool = eventlet.GreenPool()
    for ip, port in ip_ports:
        sock = eventlet.listen((ip, port))
        pool.spawn(service, sock)

    try:
        yield q
    finally:
        for gt in list(pool.coroutines_running):
            gt.kill()
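
# spawn_services stands in for backend servers that accept a connection,
# report the client address on the queue, then just sleep -- i.e. hang --
# so the test below can check that a daemon stuck talking to them can
# still be stopped cleanly.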


class TestHungDaemon(unittest.TestCase):

    def setUp(self):
        resetswift()
        self.ip_ports = [
            (dev['ip'], dev['port'])
            for dev in Ring('/etc/swift', ring_name='account').devs
            if dev
        ]

    def test_main(self):
        reconciler = Manager(['container-reconciler'])
        with spawn_services(self.ip_ports) as q:
            reconciler.start()
            # wait for the reconciler to connect
            q.get()
            # once it's hung on our connection, send it a SIGTERM
            print('Attempting to stop reconciler!')
            reconciler.stop()
        self.assertEqual(1, reconciler.status())


if __name__ == '__main__':
    unittest.main()