Unpin pydantic for openstack-hypervisor, cinder

Unpin pydantic for openstack-hypervisor, cinder
Update cos-agent librrary as latest version uses
pydantic 2.x
Update cosl package version

Change-Id: I90dfa1807222313b22b5c2e40447bf644bb87008
This commit is contained in:
Hemanth Nakkina 2024-08-29 08:27:10 +05:30 committed by Hemanth N
parent 41fc880836
commit 199619e6c5
4 changed files with 577 additions and 14 deletions

View File

@ -26,5 +26,5 @@ parts:
charm-binary-python-packages:
- cryptography
- jsonschema
- pydantic<2.0
- pydantic
- jinja2

View File

@ -22,4 +22,5 @@ parts:
charm-binary-python-packages:
- cryptography
- jsonschema
- pydantic
- jinja2

View File

@ -4,8 +4,8 @@ pyroute2
netifaces
jsonschema
jinja2
cosl==0.0.11 ; python_version >= "3.8"
pydantic==1.10.12 ; python_version >= "3.8"
cosl==0.0.24
pydantic
# This charm does not use lightkube* but ops_sunbeam requires it atm
lightkube

View File

@ -22,7 +22,7 @@ this charm library.
Using the `COSAgentProvider` object only requires instantiating it,
typically in the `__init__` method of your charm (the one which sends telemetry).
The constructor of `COSAgentProvider` has only one required and nine optional parameters:
The constructor of `COSAgentProvider` has only one required and ten optional parameters:
```python
def __init__(
@ -36,6 +36,7 @@ The constructor of `COSAgentProvider` has only one required and nine optional pa
log_slots: Optional[List[str]] = None,
dashboard_dirs: Optional[List[str]] = None,
refresh_events: Optional[List] = None,
tracing_protocols: Optional[List[str]] = None,
scrape_configs: Optional[Union[List[Dict], Callable]] = None,
):
```
@ -65,6 +66,8 @@ The constructor of `COSAgentProvider` has only one required and nine optional pa
- `refresh_events`: List of events on which to refresh relation data.
- `tracing_protocols`: List of requested tracing protocols that the charm requires to send traces.
- `scrape_configs`: List of standard scrape_configs dicts or a callable that returns the list in
case the configs need to be generated dynamically. The contents of this list will be merged
with the configs from `metrics_endpoints`.
@ -108,6 +111,7 @@ class TelemetryProviderCharm(CharmBase):
log_slots=["my-app:slot"],
dashboard_dirs=["./src/dashboards_1", "./src/dashboards_2"],
refresh_events=["update-status", "upgrade-charm"],
tracing_protocols=["otlp_http", "otlp_grpc"],
scrape_configs=[
{
"job_name": "custom_job",
@ -206,19 +210,34 @@ class GrafanaAgentMachineCharm(GrafanaAgentCharm)
```
"""
import enum
import json
import logging
import socket
from collections import namedtuple
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
List,
Literal,
MutableMapping,
Optional,
Set,
Tuple,
Union,
)
import pydantic
from cosl import GrafanaDashboard, JujuTopology
from cosl.rules import AlertRules
from ops.charm import RelationChangedEvent
from ops.framework import EventBase, EventSource, Object, ObjectEvents
from ops.model import Relation
from ops.model import ModelError, Relation
from ops.testing import CharmType
if TYPE_CHECKING:
@ -234,9 +253,9 @@ if TYPE_CHECKING:
LIBID = "dc15fa84cef84ce58155fb84f6c6213a"
LIBAPI = 0
LIBPATCH = 8
LIBPATCH = 11
PYDEPS = ["cosl", "pydantic < 2"]
PYDEPS = ["cosl", "pydantic"]
DEFAULT_RELATION_NAME = "cos-agent"
DEFAULT_PEER_RELATION_NAME = "peers"
@ -249,7 +268,207 @@ logger = logging.getLogger(__name__)
SnapEndpoint = namedtuple("SnapEndpoint", "owner, name")
class CosAgentProviderUnitData(pydantic.BaseModel):
# Note: MutableMapping is imported from the typing module and not collections.abc
# because subscripting collections.abc.MutableMapping was added in python 3.9, but
# most of our charms are based on 20.04, which has python 3.8.
_RawDatabag = MutableMapping[str, str]
class TransportProtocolType(str, enum.Enum):
"""Receiver Type."""
http = "http"
grpc = "grpc"
receiver_protocol_to_transport_protocol = {
"zipkin": TransportProtocolType.http,
"kafka": TransportProtocolType.http,
"tempo_http": TransportProtocolType.http,
"tempo_grpc": TransportProtocolType.grpc,
"otlp_grpc": TransportProtocolType.grpc,
"otlp_http": TransportProtocolType.http,
"jaeger_thrift_http": TransportProtocolType.http,
}
_tracing_receivers_ports = {
# OTLP receiver: see
# https://github.com/open-telemetry/opentelemetry-collector/tree/v0.96.0/receiver/otlpreceiver
"otlp_http": 4318,
"otlp_grpc": 4317,
# Jaeger receiver: see
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/jaegerreceiver
"jaeger_grpc": 14250,
"jaeger_thrift_http": 14268,
# Zipkin receiver: see
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/zipkinreceiver
"zipkin": 9411,
}
ReceiverProtocol = Literal["otlp_grpc", "otlp_http", "zipkin", "jaeger_thrift_http", "jaeger_grpc"]
class TracingError(Exception):
"""Base class for custom errors raised by tracing."""
class NotReadyError(TracingError):
"""Raised by the provider wrapper if a requirer hasn't published the required data (yet)."""
class ProtocolNotRequestedError(TracingError):
"""Raised if the user attempts to obtain an endpoint for a protocol it did not request."""
class DataValidationError(TracingError):
"""Raised when data validation fails on IPU relation data."""
class AmbiguousRelationUsageError(TracingError):
"""Raised when one wrongly assumes that there can only be one relation on an endpoint."""
# TODO we want to eventually use `DatabagModel` from cosl but it likely needs a move to common package first
if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore
class DatabagModel(pydantic.BaseModel): # type: ignore
"""Base databag model."""
class Config:
"""Pydantic config."""
# ignore any extra fields in the databag
extra = "ignore"
"""Ignore any extra fields in the databag."""
allow_population_by_field_name = True
"""Allow instantiating this class by field name (instead of forcing alias)."""
_NEST_UNDER = None
@classmethod
def load(cls, databag: MutableMapping):
"""Load this model from a Juju databag."""
if cls._NEST_UNDER:
return cls.parse_obj(json.loads(databag[cls._NEST_UNDER]))
try:
data = {
k: json.loads(v)
for k, v in databag.items()
# Don't attempt to parse model-external values
if k in {f.alias for f in cls.__fields__.values()}
}
except json.JSONDecodeError as e:
msg = f"invalid databag contents: expecting json. {databag}"
logger.error(msg)
raise DataValidationError(msg) from e
try:
return cls.parse_raw(json.dumps(data)) # type: ignore
except pydantic.ValidationError as e:
msg = f"failed to validate databag: {databag}"
logger.debug(msg, exc_info=True)
raise DataValidationError(msg) from e
def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
"""Write the contents of this model to Juju databag.
:param databag: the databag to write the data to.
:param clear: ensure the databag is cleared before writing it.
"""
if clear and databag:
databag.clear()
if databag is None:
databag = {}
if self._NEST_UNDER:
databag[self._NEST_UNDER] = self.json(by_alias=True)
return databag
dct = self.dict()
for key, field in self.__fields__.items(): # type: ignore
value = dct[key]
databag[field.alias or key] = json.dumps(value)
return databag
else:
from pydantic import ConfigDict
class DatabagModel(pydantic.BaseModel):
"""Base databag model."""
model_config = ConfigDict(
# ignore any extra fields in the databag
extra="ignore",
# Allow instantiating this class by field name (instead of forcing alias).
populate_by_name=True,
# Custom config key: whether to nest the whole datastructure (as json)
# under a field or spread it out at the toplevel.
_NEST_UNDER=None, # type: ignore
arbitrary_types_allowed=True,
)
"""Pydantic config."""
@classmethod
def load(cls, databag: MutableMapping):
"""Load this model from a Juju databag."""
nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore
if nest_under:
return cls.model_validate(json.loads(databag[nest_under])) # type: ignore
try:
data = {
k: json.loads(v)
for k, v in databag.items()
# Don't attempt to parse model-external values
if k in {(f.alias or n) for n, f in cls.__fields__.items()}
}
except json.JSONDecodeError as e:
msg = f"invalid databag contents: expecting json. {databag}"
logger.error(msg)
raise DataValidationError(msg) from e
try:
return cls.model_validate_json(json.dumps(data)) # type: ignore
except pydantic.ValidationError as e:
msg = f"failed to validate databag: {databag}"
logger.debug(msg, exc_info=True)
raise DataValidationError(msg) from e
def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
"""Write the contents of this model to Juju databag.
:param databag: the databag to write the data to.
:param clear: ensure the databag is cleared before writing it.
"""
if clear and databag:
databag.clear()
if databag is None:
databag = {}
nest_under = self.model_config.get("_NEST_UNDER")
if nest_under:
databag[nest_under] = self.model_dump_json( # type: ignore
by_alias=True,
# skip keys whose values are default
exclude_defaults=True,
)
return databag
dct = self.model_dump() # type: ignore
for key, field in self.model_fields.items(): # type: ignore
value = dct[key]
if value == field.default:
continue
databag[field.alias or key] = json.dumps(value)
return databag
class CosAgentProviderUnitData(DatabagModel):
"""Unit databag model for `cos-agent` relation."""
# The following entries are the same for all units of the same principal.
@ -267,13 +486,16 @@ class CosAgentProviderUnitData(pydantic.BaseModel):
metrics_scrape_jobs: List[Dict]
log_slots: List[str]
# Requested tracing protocols.
tracing_protocols: Optional[List[str]] = None
# when this whole datastructure is dumped into a databag, it will be nested under this key.
# while not strictly necessary (we could have it 'flattened out' into the databag),
# this simplifies working with the model.
KEY: ClassVar[str] = "config"
class CosAgentPeersUnitData(pydantic.BaseModel):
class CosAgentPeersUnitData(DatabagModel):
"""Unit databag model for `peers` cos-agent machine charm peer relation."""
# We need the principal unit name and relation metadata to be able to render identifiers
@ -304,6 +526,83 @@ class CosAgentPeersUnitData(pydantic.BaseModel):
return self.unit_name.split("/")[0]
if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore
class ProtocolType(pydantic.BaseModel): # type: ignore
"""Protocol Type."""
class Config:
"""Pydantic config."""
use_enum_values = True
"""Allow serializing enum values."""
name: str = pydantic.Field(
...,
description="Receiver protocol name. What protocols are supported (and what they are called) "
"may differ per provider.",
examples=["otlp_grpc", "otlp_http", "tempo_http"],
)
type: TransportProtocolType = pydantic.Field(
...,
description="The transport protocol used by this receiver.",
examples=["http", "grpc"],
)
else:
class ProtocolType(pydantic.BaseModel):
"""Protocol Type."""
model_config = pydantic.ConfigDict(
# Allow serializing enum values.
use_enum_values=True
)
"""Pydantic config."""
name: str = pydantic.Field(
...,
description="Receiver protocol name. What protocols are supported (and what they are called) "
"may differ per provider.",
examples=["otlp_grpc", "otlp_http", "tempo_http"],
)
type: TransportProtocolType = pydantic.Field(
...,
description="The transport protocol used by this receiver.",
examples=["http", "grpc"],
)
class Receiver(pydantic.BaseModel):
"""Specification of an active receiver."""
protocol: ProtocolType = pydantic.Field(..., description="Receiver protocol name and type.")
url: str = pydantic.Field(
...,
description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL.
Otherwise, it would be the service's fqdn or internal IP.
If the protocol type is grpc, the url will not contain a scheme.""",
examples=[
"http://traefik_address:2331",
"https://traefik_address:2331",
"http://tempo_public_ip:2331",
"https://tempo_public_ip:2331",
"tempo_public_ip:2331",
],
)
class CosAgentRequirerUnitData(DatabagModel): # noqa: D101
"""Application databag model for the COS-agent requirer."""
receivers: List[Receiver] = pydantic.Field(
...,
description="List of all receivers enabled on the tracing provider.",
)
class COSAgentProvider(Object):
"""Integration endpoint wrapper for the provider side of the cos_agent interface."""
@ -318,6 +617,7 @@ class COSAgentProvider(Object):
log_slots: Optional[List[str]] = None,
dashboard_dirs: Optional[List[str]] = None,
refresh_events: Optional[List] = None,
tracing_protocols: Optional[List[str]] = None,
*,
scrape_configs: Optional[Union[List[dict], Callable]] = None,
):
@ -336,6 +636,7 @@ class COSAgentProvider(Object):
in the form ["snap-name:slot", ...].
dashboard_dirs: Directory where the dashboards are stored.
refresh_events: List of events on which to refresh relation data.
tracing_protocols: List of protocols that the charm will be using for sending traces.
scrape_configs: List of standard scrape_configs dicts or a callable
that returns the list in case the configs need to be generated dynamically.
The contents of this list will be merged with the contents of `metrics_endpoints`.
@ -353,6 +654,8 @@ class COSAgentProvider(Object):
self._log_slots = log_slots or []
self._dashboard_dirs = dashboard_dirs
self._refresh_events = refresh_events or [self._charm.on.config_changed]
self._tracing_protocols = tracing_protocols
self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1
events = self._charm.on[relation_name]
self.framework.observe(events.relation_joined, self._on_refresh)
@ -377,6 +680,7 @@ class COSAgentProvider(Object):
dashboards=self._dashboards,
metrics_scrape_jobs=self._scrape_jobs,
log_slots=self._log_slots,
tracing_protocols=self._tracing_protocols,
)
relation.data[self._charm.unit][data.KEY] = data.json()
except (
@ -441,6 +745,103 @@ class COSAgentProvider(Object):
dashboards.append(dashboard)
return dashboards
@property
def relations(self) -> List[Relation]:
"""The tracing relations associated with this endpoint."""
return self._charm.model.relations[self._relation_name]
@property
def _relation(self) -> Optional[Relation]:
"""If this wraps a single endpoint, the relation bound to it, if any."""
if not self._is_single_endpoint:
objname = type(self).__name__
raise AmbiguousRelationUsageError(
f"This {objname} wraps a {self._relation_name} endpoint that has "
"limit != 1. We can't determine what relation, of the possibly many, you are "
f"referring to. Please pass a relation instance while calling {objname}, "
"or set limit=1 in the charm metadata."
)
relations = self.relations
return relations[0] if relations else None
def is_ready(self, relation: Optional[Relation] = None):
"""Is this endpoint ready?"""
relation = relation or self._relation
if not relation:
logger.debug(f"no relation on {self._relation_name !r}: tracing not ready")
return False
if relation.data is None:
logger.error(f"relation data is None for {relation}")
return False
if not relation.app:
logger.error(f"{relation} event received but there is no relation.app")
return False
try:
unit = next(iter(relation.units), None)
if not unit:
return False
databag = dict(relation.data[unit])
CosAgentRequirerUnitData.load(databag)
except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError):
logger.info(f"failed validating relation data for {relation}")
return False
return True
def get_all_endpoints(
self, relation: Optional[Relation] = None
) -> Optional[CosAgentRequirerUnitData]:
"""Unmarshalled relation data."""
relation = relation or self._relation
if not relation or not self.is_ready(relation):
return None
unit = next(iter(relation.units), None)
if not unit:
return None
return CosAgentRequirerUnitData.load(relation.data[unit]) # type: ignore
def _get_tracing_endpoint(
self, relation: Optional[Relation], protocol: ReceiverProtocol
) -> Optional[str]:
unit_data = self.get_all_endpoints(relation)
if not unit_data:
return None
receivers: List[Receiver] = [i for i in unit_data.receivers if i.protocol.name == protocol]
if not receivers:
logger.error(f"no receiver found with protocol={protocol!r}")
return None
if len(receivers) > 1:
logger.error(
f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}"
)
return None
receiver = receivers[0]
return receiver.url
def get_tracing_endpoint(
self, protocol: ReceiverProtocol, relation: Optional[Relation] = None
) -> Optional[str]:
"""Receiver endpoint for the given protocol."""
endpoint = self._get_tracing_endpoint(relation or self._relation, protocol=protocol)
if not endpoint:
requested_protocols = set()
relations = [relation] if relation else self.relations
for relation in relations:
try:
databag = CosAgentProviderUnitData.load(relation.data[self._charm.unit])
except DataValidationError:
continue
if databag.tracing_protocols:
requested_protocols.update(databag.tracing_protocols)
if protocol not in requested_protocols:
raise ProtocolNotRequestedError(protocol, relation)
return None
return endpoint
class COSAgentDataChanged(EventBase):
"""Event emitted by `COSAgentRequirer` when relation data changes."""
@ -554,6 +955,12 @@ class COSAgentRequirer(Object):
if not (provider_data := self._validated_provider_data(raw)):
return
# write enabled receivers to cos-agent relation
try:
self.update_tracing_receivers()
except ModelError:
raise
# Copy data from the cos_agent relation to the peer relation, so the leader could
# follow up.
# Save the originating unit name, so it could be used for topology later on by the leader.
@ -574,6 +981,37 @@ class COSAgentRequirer(Object):
# need to emit `on.data_changed`), so we're emitting `on.data_changed` either way.
self.on.data_changed.emit() # pyright: ignore
def update_tracing_receivers(self):
"""Updates the list of exposed tracing receivers in all relations."""
try:
for relation in self._charm.model.relations[self._relation_name]:
CosAgentRequirerUnitData(
receivers=[
Receiver(
url=f"{self._get_tracing_receiver_url(protocol)}",
protocol=ProtocolType(
name=protocol,
type=receiver_protocol_to_transport_protocol[protocol],
),
)
for protocol in self.requested_tracing_protocols()
],
).dump(relation.data[self._charm.unit])
except ModelError as e:
# args are bytes
msg = e.args[0]
if isinstance(msg, bytes):
if msg.startswith(
b"ERROR cannot read relation application settings: permission denied"
):
logger.error(
f"encountered error {e} while attempting to update_relation_data."
f"The relation must be gone."
)
return
raise
def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]:
try:
return CosAgentProviderUnitData(**json.loads(raw))
@ -586,6 +1024,55 @@ class COSAgentRequirer(Object):
# FIXME: Figure out what we should do here
self.on.data_changed.emit() # pyright: ignore
def _get_requested_protocols(self, relation: Relation):
# Coherence check
units = relation.units
if len(units) > 1:
# should never happen
raise ValueError(
f"unexpected error: subordinate relation {relation} "
f"should have exactly one unit"
)
unit = next(iter(units), None)
if not unit:
return None
if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)):
return None
if not (provider_data := self._validated_provider_data(raw)):
return None
return provider_data.tracing_protocols
def requested_tracing_protocols(self):
"""All receiver protocols that have been requested by our related apps."""
requested_protocols = set()
for relation in self._charm.model.relations[self._relation_name]:
try:
protocols = self._get_requested_protocols(relation)
except NotReadyError:
continue
if protocols:
requested_protocols.update(protocols)
return requested_protocols
def _get_tracing_receiver_url(self, protocol: str):
scheme = "http"
try:
if self._charm.cert.enabled: # type: ignore
scheme = "https"
# not only Grafana Agent can implement cos_agent. If the charm doesn't have the `cert` attribute
# using our cert_handler, it won't have the `enabled` parameter. In this case, we pass and assume http.
except AttributeError:
pass
# the assumption is that a subordinate charm will always be accessible to its principal charm under its fqdn
if receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc:
return f"{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}"
return f"{scheme}://{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}"
@property
def _remote_data(self) -> List[Tuple[CosAgentProviderUnitData, JujuTopology]]:
"""Return a list of remote data from each of the related units.
@ -721,8 +1208,18 @@ class COSAgentRequirer(Object):
@property
def snap_log_endpoints(self) -> List[SnapEndpoint]:
"""Fetch logging endpoints exposed by related snaps."""
endpoints = []
endpoints_with_topology = self.snap_log_endpoints_with_topology
for endpoint, _ in endpoints_with_topology:
endpoints.append(endpoint)
return endpoints
@property
def snap_log_endpoints_with_topology(self) -> List[Tuple[SnapEndpoint, JujuTopology]]:
"""Fetch logging endpoints and charm topology for each related snap."""
plugs = []
for data, _ in self._remote_data:
for data, topology in self._remote_data:
targets = data.log_slots
if targets:
for target in targets:
@ -733,15 +1230,16 @@ class COSAgentRequirer(Object):
"endpoints; this should not happen."
)
else:
plugs.append(target)
plugs.append((target, topology))
endpoints = []
for plug in plugs:
for plug, topology in plugs:
if ":" not in plug:
logger.error(f"invalid plug definition received: {plug}. Ignoring...")
else:
endpoint = SnapEndpoint(*plug.split(":"))
endpoints.append(endpoint)
endpoints.append((endpoint, topology))
return endpoints
@property
@ -804,3 +1302,67 @@ class COSAgentRequirer(Object):
)
return dashboards
def charm_tracing_config(
endpoint_requirer: COSAgentProvider, cert_path: Optional[Union[Path, str]]
) -> Tuple[Optional[str], Optional[str]]:
"""Utility function to determine the charm_tracing config you will likely want.
If no endpoint is provided:
disable charm tracing.
If https endpoint is provided but cert_path is not found on disk:
disable charm tracing.
If https endpoint is provided and cert_path is None:
ERROR
Else:
proceed with charm tracing (with or without tls, as appropriate)
Usage:
If you are using charm_tracing >= v1.9:
>>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm
>>> from lib.charms.tempo_k8s.v0.cos_agent import charm_tracing_config
>>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path")
>>> class MyCharm(...):
>>> _cert_path = "/path/to/cert/on/charm/container.crt"
>>> def __init__(self, ...):
>>> self.cos_agent = COSAgentProvider(...)
>>> self.my_endpoint, self.cert_path = charm_tracing_config(
... self.cos_agent, self._cert_path)
If you are using charm_tracing < v1.9:
>>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm
>>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config
>>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path")
>>> class MyCharm(...):
>>> _cert_path = "/path/to/cert/on/charm/container.crt"
>>> def __init__(self, ...):
>>> self.cos_agent = COSAgentProvider(...)
>>> self.my_endpoint, self.cert_path = charm_tracing_config(
... self.cos_agent, self._cert_path)
>>> @property
>>> def my_endpoint(self):
>>> return self._my_endpoint
>>> @property
>>> def cert_path(self):
>>> return self._cert_path
"""
if not endpoint_requirer.is_ready():
return None, None
endpoint = endpoint_requirer.get_tracing_endpoint("otlp_http")
if not endpoint:
return None, None
is_https = endpoint.startswith("https://")
if is_https:
if cert_path is None:
raise TracingError("Cannot send traces to an https endpoint without a certificate.")
if not Path(cert_path).exists():
# if endpoint is https BUT we don't have a server_cert yet:
# disable charm tracing until we do to prevent tls errors
return None, None
return endpoint, str(cert_path)
return endpoint, None