diff --git a/charms/openstack-hypervisor/src/charm.py b/charms/openstack-hypervisor/src/charm.py index 32f2c928..b25697bb 100755 --- a/charms/openstack-hypervisor/src/charm.py +++ b/charms/openstack-hypervisor/src/charm.py @@ -35,29 +35,14 @@ import ops_sunbeam.charm as sunbeam_charm import ops_sunbeam.guard as sunbeam_guard import ops_sunbeam.ovn.relation_handlers as ovn_relation_handlers import ops_sunbeam.relation_handlers as sunbeam_rhandlers -from netifaces import AF_INET, gateways, ifaddresses from ops.charm import ActionEvent from ops.main import main +from utils import get_local_ip_by_default_route + logger = logging.getLogger(__name__) -def _get_local_ip_by_default_route() -> str: - """Get IP address of host associated with default gateway.""" - interface = "lo" - ip = "127.0.0.1" - - # TOCHK: Gathering only IPv4 - if "default" in gateways(): - interface = gateways()["default"][AF_INET][1] - - ip_list = ifaddresses(interface)[AF_INET] - if len(ip_list) > 0 and "addr" in ip_list[0]: - ip = ip_list[0]["addr"] - - return ip - - class HypervisorOperatorCharm(sunbeam_charm.OSBaseOperatorCharm): """Charm the service.""" @@ -179,7 +164,7 @@ class HypervisorOperatorCharm(sunbeam_charm.OSBaseOperatorCharm): self.check_relation_handlers_ready() config = self.model.config.get self.ensure_snap_present() - local_ip = _get_local_ip_by_default_route() + local_ip = get_local_ip_by_default_route() try: contexts = self.contexts() sb_connection_strs = list(contexts.ovsdb_cms.db_ingress_sb_connection_strs) diff --git a/charms/openstack-hypervisor/src/utils.py b/charms/openstack-hypervisor/src/utils.py new file mode 100644 index 00000000..94631852 --- /dev/null +++ b/charms/openstack-hypervisor/src/utils.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 + +# Copyright 2023 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility helper functions.""" + + +import logging +from typing import Optional + +import netifaces + +logger = logging.getLogger(__name__) + + +def _get_default_gw_iface_fallback() -> Optional[str]: + """Returns the default gateway interface. + + Parses the /proc/net/route table to determine the interface with a default + route. The interface with the default route will have a destination of 0x000000, + a mask of 0x000000 and will have flags indicating RTF_GATEWAY and RTF_UP. + + :return Optional[str, None]: the name of the interface the default gateway or + None if one cannot be found. + """ + # see include/uapi/linux/route.h in kernel source for more explanation + RTF_UP = 0x1 # noqa - route is usable + RTF_GATEWAY = 0x2 # noqa - destination is a gateway + + iface = None + with open("/proc/net/route", "r") as f: + contents = [line.strip() for line in f.readlines() if line.strip()] + logger.debug(contents) + + entries = [] + # First line is a header line of the table contents. Note, we skip blank entries + # by default there's an extra column due to an extra \t character for the table + # contents to line up. This is parsing the /proc/net/route and creating a set of + # entries. Each entry is a dict where the keys are table header and the values + # are the values in the table rows. + header = [col.strip().lower() for col in contents[0].split("\t") if col] + for row in contents[1:]: + cells = [col.strip() for col in row.split("\t") if col] + entries.append(dict(zip(header, cells))) + + def is_up(flags: str) -> bool: + return int(flags, 16) & RTF_UP == RTF_UP + + def is_gateway(flags: str) -> bool: + return int(flags, 16) & RTF_GATEWAY == RTF_GATEWAY + + # Check each entry to see if it has the default gateway. The default gateway + # will have destination and mask set to 0x00, will be up and is noted as a + # gateway. + for entry in entries: + if int(entry.get("destination", 0xFF), 16) != 0: + continue + if int(entry.get("mask", 0xFF), 16) != 0: + continue + flags = entry.get("flags", 0x00) + if is_up(flags) and is_gateway(flags): + iface = entry.get("iface", None) + break + + return iface + + +def get_ifaddresses_by_default_route() -> dict: + """Get address configuration from interface associated with default gateway.""" + interface = "lo" + ip = "127.0.0.1" + netmask = "255.0.0.0" + + # TOCHK: Gathering only IPv4 + default_gateways = netifaces.gateways().get("default", {}) + if default_gateways and netifaces.AF_INET in default_gateways: + interface = netifaces.gateways()["default"][netifaces.AF_INET][1] + else: + # There are some cases where netifaces doesn't return the machine's default + # gateway, but it does exist. Let's check the /proc/net/route table to see + # if we can find the proper gateway. + interface = _get_default_gw_iface_fallback() or "lo" + + ip_list = netifaces.ifaddresses(interface)[netifaces.AF_INET] + if len(ip_list) > 0 and "addr" in ip_list[0]: + return ip_list[0] + + return {"addr": ip, "netmask": netmask} + + +def get_local_ip_by_default_route() -> str: + """Get IP address of host associated with default gateway.""" + return get_ifaddresses_by_default_route()["addr"] diff --git a/charms/openstack-hypervisor/tests/unit/test_charm.py b/charms/openstack-hypervisor/tests/unit/test_charm.py index d08e94c3..8391ff90 100644 --- a/charms/openstack-hypervisor/tests/unit/test_charm.py +++ b/charms/openstack-hypervisor/tests/unit/test_charm.py @@ -33,7 +33,7 @@ class _HypervisorOperatorCharm(charm.HypervisorOperatorCharm): class TestCharm(test_utils.CharmTestCase): - PATCHES = ["socket", "snap", "_get_local_ip_by_default_route"] + PATCHES = ["socket", "snap", "get_local_ip_by_default_route", "os"] def setUp(self): """Setup OpenStack Hypervisor tests.""" @@ -79,7 +79,7 @@ class TestCharm(test_utils.CharmTestCase): def test_all_relations(self): """Test all the charms relations.""" - self._get_local_ip_by_default_route.return_value = "10.0.0.10" + self.get_local_ip_by_default_route.return_value = "10.0.0.10" hypervisor_snap_mock = mock.MagicMock() hypervisor_snap_mock.present = False self.snap.SnapState.Latest = "latest"