[tls-certificates] refactor tls certificate handler

This change refactors tls-certificates relation handler.
List of changes:
- Allow management of multiple tls certificates on the same relation
- Allow easier override of certificate signing request (csr)
- Enable certificate renewal on expiration / revokation
- Upgrade tls-certificates relation from v1 to v3

Change-Id: I4f6ac6a5570635388cc10131b34fbc6b422e1bca
Signed-off-by: Guillaume Boutry <guillaume.boutry@canonical.com>
This commit is contained in:
Guillaume Boutry 2024-06-26 19:17:19 +02:00
parent e31f1eb34c
commit ed4ed712bb
No known key found for this signature in database
GPG Key ID: E95E3326872E55DE
19 changed files with 1393 additions and 616 deletions

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.cert }}
{{ certificates.cert_main }}
{% endif -%}

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.key }}
{{ certificates.key_main }}
{% endif -%}

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.ca_cert }}
{{ certificates.ca_cert_main }}
{% endif -%}

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.ca_cert }}
{{ certificates.ca_cert_main }}
{% endif -%}

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.cert }}
{{ certificates.cert_main }}
{% endif -%}

View File

@ -1,3 +1,3 @@
{% if certificates -%}
{{ certificates.key }}
{{ certificates.key_main }}
{% endif -%}

View File

@ -72,111 +72,68 @@ MTLS_USAGES = {x509.OID_SERVER_AUTH, x509.OID_CLIENT_AUTH}
class MTlsCertificatesHandler(sunbeam_rhandlers.TlsCertificatesHandler):
"""Handler for certificates interface."""
def update_relation_data(self):
"""Update relation outside of relation context."""
relations = self.model.relations[self.relation_name]
if len(relations) != 1:
logger.debug(
f"Unit has wrong number of {self.relation_name!r} relations."
)
return
relation = relations[0]
csr = self._get_csr_from_relation_unit_data()
if not csr:
self._request_certificates()
return
certs = self._get_cert_from_relation_data(csr)
if "cert" not in certs or not self._has_certificate_mtls_extensions(
certs["cert"]
):
logger.info(
"Requesting new certificates, current is missing mTLS extensions."
)
relation.data[self.model.unit][
"certificate_signing_requests"
] = "[]"
self._request_certificates()
def csrs(self) -> dict[str, bytes]:
"""Return a dict of generated csrs for self.key_names().
def _has_certificate_mtls_extensions(self, certificate: str) -> bool:
"""Check current certificate has mTLS extensions."""
cert = x509.load_pem_x509_certificate(certificate.encode())
for extension in cert.extensions:
if extension.oid != x509.OID_EXTENDED_KEY_USAGE:
continue
extension_oids = {ext.dotted_string for ext in extension.value}
mtls_oids = {oid.dotted_string for oid in MTLS_USAGES}
if mtls_oids.issubset(extension_oids):
return True
return False
def _request_certificates(self):
"""Request certificates from remote provider."""
The method calling this method will ensure that all keys have a matching
csr.
"""
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
from charms.tls_certificates_interface.v3.tls_certificates import (
generate_csr,
)
if self.ready:
logger.debug("Certificate request already complete.")
return
main_key = self._private_keys.get("main")
if not main_key:
return {}
if self.private_key:
logger.debug("Private key found, requesting certificates")
else:
logger.debug("Cannot request certificates, private key not found")
return
csr = generate_csr(
private_key=self.private_key.encode(),
subject=socket.getfqdn(),
sans_dns=self.sans_dns,
sans_ip=self.sans_ips,
additional_critical_extensions=[
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=True,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
x509.ExtendedKeyUsage(MTLS_USAGES),
],
)
self.certificates.request_certificate_creation(
certificate_signing_request=csr
)
return {
"main": generate_csr(
private_key=main_key.encode(),
subject=socket.getfqdn(),
sans_dns=self.sans_dns,
sans_ip=self.sans_ips,
additional_critical_extensions=[
x509.KeyUsage(
digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=True,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False,
),
x509.ExtendedKeyUsage(MTLS_USAGES),
],
)
}
def context(self) -> dict:
"""Certificates context."""
csr_from_unit = self._get_csr_from_relation_unit_data()
if not csr_from_unit:
certs = self.interface.get_assigned_certificates()
if len(certs) != len(self.key_names()):
return {}
# openstack-hypervisor only has a main key
csr = self.store.get_csr("main")
if csr is None:
return {}
certs = self._get_cert_from_relation_data(csr_from_unit)
cert = certs["cert"]
ca_cert = certs["ca"]
ca_with_intermediates = certs["ca"] + "\n" + "\n".join(certs["chain"])
ctxt = {
"key": self.private_key,
"cert": cert,
"ca_cert": ca_cert,
"ca_with_intermediates": ca_with_intermediates,
}
return ctxt
@property
def ready(self) -> bool:
"""Whether handler ready for use."""
try:
return super().ready
except KeyError:
return False
for cert in certs:
if cert.csr == csr:
return {
"key": self._private_keys["main"],
"cert": cert.certificate,
"ca_cert": cert.ca,
"ca_with_intermediates": cert.ca
+ "\n"
+ "\n".join(cert.chain),
}
else:
logger.warning("No certificate found for CSR main")
return {}
class HypervisorOperatorCharm(sunbeam_charm.OSBaseOperatorCharm):

View File

@ -15,7 +15,6 @@
"""Tests for Openstack hypervisor charm."""
import base64
import json
from unittest.mock import (
MagicMock,
)
@ -52,20 +51,9 @@ class TestCharm(test_utils.CharmTestCase):
def initial_setup(self):
"""Setting up relations."""
rel_id = self.harness.add_relation("certificates", "vault")
self.harness.add_relation_unit(rel_id, "vault/0")
self.harness.update_config({"snap-channel": "essex/stable"})
self.harness.begin_with_initial_hooks()
csr = {"certificate_signing_request": test_utils.TEST_CSR}
self.harness.update_relation_data(
rel_id,
self.harness.charm.unit.name,
{
"ingress-address": "10.0.0.34",
"certificate_signing_requests": json.dumps([csr]),
},
)
test_utils.add_certificates_relation_certs(self.harness, rel_id)
test_utils.add_complete_certificates_relation(self.harness)
ovs_rel_id = self.harness.add_relation("ovsdb-cms", "ovn-relay")
self.harness.add_relation_unit(ovs_rel_id, "ovn-relay/0")
self.harness.update_relation_data(

View File

@ -1,2 +1,2 @@
# {{ certificates }}
{{ certificates.cert }}
{{ certificates.cert_main }}

View File

@ -1 +1 @@
{{ certificates.key }}
{{ certificates.key_main }}

View File

@ -1 +1 @@
{{ certificates.ca_cert }}
{{ certificates.ca_cert_main }}

View File

@ -1,2 +1,2 @@
# {{ certificates }}
{{ certificates.cert }}
{{ certificates.cert_main }}

View File

@ -1 +1 @@
{{ certificates.key }}
{{ certificates.key_main }}

View File

@ -1 +1 @@
{{ certificates.ca_cert }}
{{ certificates.ca_cert_main }}

View File

@ -14,7 +14,7 @@ charmcraft fetch-lib charms.operator_libs_linux.v0.sysctl
charmcraft fetch-lib charms.operator_libs_linux.v2.snap
charmcraft fetch-lib charms.prometheus_k8s.v0.prometheus_scrape
charmcraft fetch-lib charms.rabbitmq_k8s.v0.rabbitmq
charmcraft fetch-lib charms.tls_certificates_interface.v1.tls_certificates
charmcraft fetch-lib charms.tls_certificates_interface.v3.tls_certificates
charmcraft fetch-lib charms.traefik_k8s.v2.ingress
charmcraft fetch-lib charms.traefik_route_k8s.v0.traefik_route
charmcraft fetch-lib charms.vault_k8s.v0.vault_kv

View File

@ -431,11 +431,11 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
if isinstance(event, RabbitMQGoneAwayEvent):
_is_broken = True
case "certificates":
from charms.tls_certificates_interface.v1.tls_certificates import (
CertificateExpiredEvent,
from charms.tls_certificates_interface.v3.tls_certificates import (
AllCertificatesInvalidatedEvent,
)
if isinstance(event, CertificateExpiredEvent):
if isinstance(event, AllCertificatesInvalidatedEvent):
_is_broken = True
case "ovsdb-cms":
from charms.ovn_central_k8s.v0.ovsdb import (

View File

@ -14,11 +14,13 @@
"""Base classes for defining a charm using the Operator framework."""
import abc
import hashlib
import json
import logging
import secrets
import string
import typing
from typing import (
Callable,
Dict,
@ -783,33 +785,118 @@ class CephClientHandler(RelationHandler):
return ctxt
class _StoreEntry(typing.TypedDict, total=False):
"""Type definition for a store entry."""
private_key: str
csr: str
class _Store(abc.ABC):
@abc.abstractmethod
def ready(self) -> bool:
"""Check if store is ready."""
...
@abc.abstractmethod
def get_entries(self) -> dict[str, _StoreEntry]:
"""Get store dict from relation data."""
...
@abc.abstractmethod
def save_entries(self, entries: dict[str, _StoreEntry]):
"""Save store dict to relation data."""
...
def get_entry(self, name: str) -> _StoreEntry | None:
"""Return store entry."""
if not self.ready():
logger.debug("Store not ready, cannot get entry.")
return None
return self.get_entries().get(name)
def save_entry(self, name: str, entry: _StoreEntry):
"""Save store entry."""
if not self.ready():
logger.debug("Store not ready, cannot set entry.")
return
store = self.get_entries()
store[name] = entry
self.save_entries(store)
def get_private_key(self, name: str) -> str | None:
"""Return private key."""
if entry := self.get_entry(name):
return entry.get("private_key")
return None
def get_csr(self, name: str) -> str | None:
"""Return csr."""
if entry := self.get_entry(name):
return entry.get("csr")
return None
def set_private_key(self, name: str, private_key: str):
"""Update private key."""
entry = self.get_entry(name) or {}
entry["private_key"] = private_key
self.save_entry(name, entry)
def set_csr(self, name: str, csr: bytes):
"""Update csr."""
entry = self.get_entry(name) or {}
entry["csr"] = csr.decode()
self.save_entry(name, entry)
def delete_csr(self, name: str):
"""Delete csr."""
entry = self.get_entry(name) or {}
entry.pop("csr", None)
self.save_entry(name, entry)
class TlsCertificatesHandler(RelationHandler):
"""Handler for certificates interface."""
class PeerKeyStore:
"""Store private key sercret id in peer storage relation."""
if typing.TYPE_CHECKING:
from charms.tls_certificates_interface.v3.tls_certificates import (
TLSCertificatesRequiresV3,
)
def __init__(self, relation, unit):
interface: TLSCertificatesRequiresV3
class PeerStore(_Store):
"""Store private key secret id in peer storage relation."""
STORE_KEY: str = "tls-store"
def __init__(
self, relation: ops.Relation, entity: ops.Unit | ops.Application
):
self.relation = relation
self.unit = unit
self.entity = entity
def store_ready(self) -> bool:
def ready(self) -> bool:
"""Check if store is ready."""
return bool(self.relation)
return bool(self.relation) and self.relation.active
def get_private_key(self) -> str:
"""Return private key."""
try:
key = self.relation.data[self.unit].get("private_key")
except AttributeError:
key = None
return key
def get_entries(self) -> dict[str, _StoreEntry]:
"""Get store dict from relation data."""
if not self.ready():
return {}
return json.loads(
self.relation.data[self.entity].get(self.STORE_KEY, "{}")
)
def set_private_key(self, value: str):
"""Update private key."""
self.relation.data[self.unit]["private_key"] = value
def save_entries(self, entries: dict[str, _StoreEntry]):
"""Save store dict to relation data."""
if self.ready():
self.relation.data[self.entity][self.STORE_KEY] = json.dumps(
entries
)
class LocalDBKeyStore:
class LocalDBStore(_Store):
"""Store private key sercret id in local unit db.
This is a fallback for when the peer relation is not
@ -819,56 +906,111 @@ class TlsCertificatesHandler(RelationHandler):
def __init__(self, state_db):
self.state_db = state_db
try:
self.state_db.private_key
self.state_db.tls_store
except AttributeError:
self.state_db.private_key = None
self.state_db.tls_store = "{}"
def store_ready(self) -> bool:
def ready(self) -> bool:
"""Check if store is ready."""
return True
def get_private_key(self) -> str:
"""Return private key."""
return self.state_db.private_key
def get_entries(self) -> dict[str, _StoreEntry]:
"""Get store dict from relation data."""
return json.loads(self.state_db.tls_store)
def set_private_key(self, value: str):
"""Update private key."""
self.state_db.private_key = value
def save_entries(self, entries: dict[str, _StoreEntry]):
"""Save store dict to relation data."""
self.state_db.tls_store = json.dumps(entries)
def __init__(
self,
charm: ops.charm.CharmBase,
charm: ops.CharmBase,
relation_name: str,
callback_f: Callable,
sans_dns: List[str] = None,
sans_ips: List[str] = None,
sans_dns: list[str] | None = None,
sans_ips: list[str] | None = None,
mandatory: bool = False,
) -> None:
"""Run constructor."""
self._private_key = None
self._private_keys: dict[str, str] = {}
self.sans_dns = sans_dns
self.sans_ips = sans_ips
super().__init__(charm, relation_name, callback_f, mandatory)
try:
self.store = self.PeerKeyStore(
self.model.get_relation("peers"), self.charm.model.unit
self.store = self.PeerStore(
self.model.get_relation("peers"), self.get_entity()
)
except KeyError:
self.store = self.LocalDBKeyStore(charm._state)
self.setup_private_key()
if self.app_managed_certificates():
raise RuntimeError(
"Application managed certificates require a peer relation"
)
self.store = self.LocalDBStore(charm._state)
self.setup_private_keys()
def setup_event_handler(self) -> None:
def get_entity(self) -> ops.Unit | ops.Application:
"""Return the entity for the key store.
Defaults to the unit.
"""
return self.charm.model.unit
def i_am_allowed(self) -> bool:
"""Whether this unit is allowed to modify the store."""
i_need_to_be_leader = self.app_managed_certificates()
if i_need_to_be_leader:
return self.charm.unit.is_leader()
return True
def app_managed_certificates(self) -> bool:
"""Whether the application manages its own certificates."""
return isinstance(self.get_entity(), ops.Application)
def key_names(self) -> list[str]:
"""Return the key names managed by this relation.
First key is considered as default key.
"""
return ["main"]
def csrs(self) -> dict[str, bytes]:
"""Return a dict of generated csrs for self.key_names().
The method calling this method will ensure that all keys have a matching
csr.
"""
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v3.tls_certificates import (
generate_csr,
)
main_key = self._private_keys.get("main")
if not main_key:
return {}
return {
"main": generate_csr(
private_key=main_key.encode(),
subject=self.get_entity().name.replace("/", "-"),
sans_dns=self.sans_dns,
sans_ip=self.sans_ips,
)
}
def setup_event_handler(self) -> ops.Object:
"""Configure event handlers for tls relation."""
logger.debug("Setting up certificates event handler")
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
TLSCertificatesRequiresV1,
from charms.tls_certificates_interface.v3.tls_certificates import (
TLSCertificatesRequiresV3,
)
self.certificates = TLSCertificatesRequiresV1(
self.certificates = TLSCertificatesRequiresV3(
self.charm, "certificates"
)
self.framework.observe(
self.charm.on.certificates_relation_joined,
self._on_certificates_relation_joined,
@ -886,26 +1028,25 @@ class TlsCertificatesHandler(RelationHandler):
self._on_certificate_expiring,
)
self.framework.observe(
self.certificates.on.certificate_expired,
self._on_certificate_expired,
self.certificates.on.certificate_invalidated,
self._on_certificate_invalidated,
)
self.framework.observe(
self.certificates.on.all_certificates_invalidated,
self._on_all_certificate_invalidated,
)
return self.certificates
def setup_private_key(self) -> None:
def _setup_private_key(self, key: str):
"""Create and store private key if needed."""
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
from charms.tls_certificates_interface.v3.tls_certificates import (
generate_private_key,
)
if not self.store.store_ready():
logger.debug("Store not ready, cannot generate key")
return
if self.store.get_private_key():
if private_key_secret_id := self.store.get_private_key(key):
logger.debug("Private key already present")
private_key_secret_id = self.store.get_private_key()
try:
private_key_secret = self.model.get_secret(
id=private_key_secret_id
@ -924,29 +1065,49 @@ class TlsCertificatesHandler(RelationHandler):
private_key_secret = self.model.get_secret(
id=private_key_secret_id
)
self._private_key = (
private_key_secret.get_content(refresh=True)
.get("private-key")
.encode()
self._private_keys[key] = private_key_secret.get_content(
refresh=True
)["private-key"]
return
self._private_keys[key] = generate_private_key().decode()
private_key_secret = self.get_entity().add_secret(
{"private-key": self._private_keys[key]},
label=f"{self.get_entity().name}-{key}-private-key",
)
self.store.set_private_key(
key, typing.cast(str, private_key_secret.id)
)
def setup_private_keys(self) -> None:
"""Create and store private key if needed."""
if not self.i_am_allowed():
logger.debug(
"Unit is not allow to handle private keys, skipping setup"
)
return
self._private_key = generate_private_key()
private_key_secret = self.model.unit.add_secret(
{"private-key": self._private_key.decode()},
label=f"{self.charm.model.unit}-private-key",
)
if not self.store.ready():
logger.debug("Store not ready, cannot generate key")
return
self.store.set_private_key(private_key_secret.id)
keys = self.key_names()
if not keys:
raise RuntimeError("No keys to generate, this is always a bug.")
for key in keys:
self._setup_private_key(key)
@property
def private_key(self):
"""Private key for certificates."""
if self._private_key:
return self._private_key.decode()
else:
# Private key has not been set yet
return None
def private_key(self) -> str | None:
"""Private key for certificates.
Return the first key from key_names.
"""
if private_key := self._private_keys.get(self.key_names()[0]):
return private_key
return None
def update_relation_data(self):
"""Request certificates outside of relation context."""
@ -957,120 +1118,131 @@ class TlsCertificatesHandler(RelationHandler):
"Not updating certificate request data, no relation found"
)
def _on_certificates_relation_joined(
self, event: ops.framework.EventBase
) -> None:
def _on_certificates_relation_joined(self, event: ops.EventBase) -> None:
"""Request certificates in response to relation join event."""
self._request_certificates()
def _request_certificates(self):
def _request_certificates(self, renew=False):
"""Request certificates from remote provider."""
# Lazy import to ensure this lib is only required if the charm
# has this relation.
from charms.tls_certificates_interface.v1.tls_certificates import (
generate_csr,
)
if not self.i_am_allowed():
logger.debug(
"Unit is not allow to handle private keys, skipping setup"
)
return
if self.ready:
logger.debug("Certificate request already complete.")
return
if self.private_key:
logger.debug("Private key found, requesting certificates")
else:
logger.debug("Cannot request certificates, private key not found")
keys = self.key_names()
if set(keys) != set(self._private_keys.keys()):
logger.debug("Not all private keys are setup, skipping request.")
return
csr = generate_csr(
private_key=self.private_key.encode(),
subject=self.charm.model.unit.name.replace("/", "-"),
sans_dns=self.sans_dns,
sans_ip=self.sans_ips,
)
self.certificates.request_certificate_creation(
certificate_signing_request=csr
)
csrs = self.csrs()
def _on_certificates_relation_broken(
self, event: ops.framework.EventBase
) -> None:
if set(keys) != set(csrs.keys()):
raise RuntimeError(
"Mismatch between keys and csrs, this is always a bug."
)
for name, csr in csrs.items():
previous_csr = self.store.get_csr(name)
csr = csr.strip()
if renew and previous_csr:
self.certificates.request_certificate_renewal(
old_certificate_signing_request=previous_csr.encode(),
new_certificate_signing_request=csr,
)
self.store.set_csr(name, csr)
elif previous_csr:
logger.debug(
"CSR already exists for %s, skipping request.", name
)
else:
self.certificates.request_certificate_creation(
certificate_signing_request=csr
)
self.store.set_csr(name, csr)
def _on_certificates_relation_broken(self, event: ops.EventBase) -> None:
if self.mandatory:
self.status.set(BlockedStatus("integration missing"))
def _on_certificate_available(
self, event: ops.framework.EventBase
) -> None:
def _on_certificate_available(self, event: ops.EventBase) -> None:
self.callback_f(event)
def _on_certificate_expiring(self, event: ops.framework.EventBase) -> None:
logger.warning("Certificate getting expired")
def _on_certificate_expiring(self, event: ops.EventBase) -> None:
self.status.set(ActiveStatus("Certificates are getting expired soon"))
logger.warning("Certificate getting expired, requesting new ones.")
self._request_certificates(renew=True)
self.callback_f(event)
def _on_certificate_expired(self, event: ops.framework.EventBase) -> None:
logger.warning("Certificate expired")
self.status.set(BlockedStatus("Certificates expired"))
def _on_certificate_invalidated(self, event: ops.EventBase) -> None:
logger.warning("Certificate invalidated, requesting new ones.")
if (
self.i_am_allowed()
and (relation := self.model.get_relation(self.relation_name))
and relation.active
):
self._request_certificates(renew=True)
self.callback_f(event)
def _get_csr_from_relation_unit_data(self) -> Optional[str]:
certificate_relations = list(self.model.relations[self.relation_name])
if not certificate_relations:
return None
def _on_all_certificate_invalidated(self, event: ops.EventBase) -> None:
logger.warning(
"Certificates invalidated, most likely a relation broken."
)
self.status.set(BlockedStatus("Certificates invalidated"))
if self.i_am_allowed():
for name in self.key_names():
self.store.delete_csr(name)
self.callback_f(event)
# unit_data format:
# {"certificate_signing_requests": "['certificate_signing_request': 'CSRTEXT']"}
unit_data = certificate_relations[0].data[self.charm.model.unit]
csr = json.loads(unit_data.get("certificate_signing_requests", "[]"))
if not csr:
return None
csr = csr[0].get("certificate_signing_request", None)
return csr
def _get_cert_from_relation_data(self, csr: str) -> dict:
certificate_relations = list(self.model.relations[self.relation_name])
if not certificate_relations:
return {}
# app data format:
# {"certificates": "['certificate_signing_request': 'CSR',
# 'certificate': 'CERT', 'ca': 'CA', 'chain': 'CHAIN']"}
certs = certificate_relations[0].data[certificate_relations[0].app]
certs = json.loads(certs.get("certificates", "[]"))
for certificate in certs:
csr_from_app = certificate.get("certificate_signing_request", "")
if csr.strip() == csr_from_app.strip():
return {
"cert": certificate.get("certificate", None),
"ca": certificate.get("ca", None),
"chain": certificate.get("chain", []),
}
return {}
def get_certs(self) -> list:
"""Return certificates."""
# If certificates are managed at the app level
# return all the certificates
if self.app_managed_certificates():
return self.interface.get_provider_certificates()
# If the certificates are managed at the unit level
# return the certificates for the unit
return self.interface.get_assigned_certificates()
@property
def ready(self) -> bool:
"""Whether handler ready for use."""
csr_from_unit = self._get_csr_from_relation_unit_data()
if not csr_from_unit:
return False
certs = self.get_certs()
certs = self._get_cert_from_relation_data(csr_from_unit)
return True if certs else False
if len(certs) != len(self.key_names()):
return False
return True
def context(self) -> dict:
"""Certificates context."""
csr_from_unit = self._get_csr_from_relation_unit_data()
if not csr_from_unit:
certs = self.get_certs()
if len(certs) != len(self.key_names()):
return {}
certs = self._get_cert_from_relation_data(csr_from_unit)
cert = certs["cert"]
ca_cert = certs["ca"] + "\n" + "\n".join(certs["chain"])
ctxt = {
"key": self.private_key,
"cert": cert,
"ca_cert": ca_cert,
}
ctxt = {}
for name, entry in self.store.get_entries().items():
csr = entry.get("csr")
key = self._private_keys.get(name)
if csr is None or key is None:
logger.warning("Tls Store Entry %s is incomplete", name)
continue
for cert in certs:
if cert.csr == csr:
ctxt.update(
{
"key_" + name: key,
"ca_cert_"
+ name: cert.ca
+ "\n"
+ "\n".join(cert.chain),
"cert_" + name: cert.certificate,
}
)
else:
logger.debug("No certificate found for CSR %s", name)
return ctxt

View File

@ -589,6 +589,7 @@ def add_base_certificates_relation(harness: Harness) -> str:
"certificate_signing_requests": json.dumps([csr]),
},
)
harness.charm.certs.store.set_csr("main", TEST_CSR.encode())
return rel_id