Misc refactoring
This commit is contained in:
parent
0407b517d5
commit
f4fe94d0d8
@ -1,5 +1,236 @@
|
||||
options:
|
||||
debug:
|
||||
ceph-osd-replication-count:
|
||||
default: 3
|
||||
type: int
|
||||
description: |
|
||||
This value dictates the number of replicas ceph must make of any
|
||||
object it stores within the cinder rbd pool. Of course, this only
|
||||
applies if using Ceph as a backend store. Note that once the cinder
|
||||
rbd pool has been created, changing this value will not have any
|
||||
effect (although it can be changed in ceph by manually configuring
|
||||
your ceph cluster).
|
||||
ceph-pool-weight:
|
||||
type: int
|
||||
default: 40
|
||||
description: |
|
||||
Defines a relative weighting of the pool as a percentage of the total
|
||||
amount of data in the Ceph cluster. This effectively weights the number
|
||||
of placement groups for the pool created to be appropriately portioned
|
||||
to the amount of data expected. For example, if the ephemeral volumes
|
||||
for the OpenStack compute instances are expected to take up 20% of the
|
||||
overall configuration then this value would be specified as 20. Note -
|
||||
it is important to choose an appropriate value for the pool weight as
|
||||
this directly affects the number of placement groups which will be
|
||||
created for the pool. The number of placement groups for a pool can
|
||||
only be increased, never decreased - so it is important to identify the
|
||||
percent of data that will likely reside in the pool.
|
||||
volume-backend-name:
|
||||
default:
|
||||
type: string
|
||||
description: |
|
||||
Volume backend name for the backend. The default value is the
|
||||
application name in the Juju model, e.g. "cinder-ceph-mybackend"
|
||||
if it's deployed as `juju deploy cinder-ceph cinder-ceph-mybackend`.
|
||||
A common backend name can be set to multiple backends with the
|
||||
same characters so that those can be treated as a single virtual
|
||||
backend associated with a single volume type.
|
||||
backend-availability-zone:
|
||||
default:
|
||||
type: string
|
||||
description: |
|
||||
Availability zone name of this volume backend. If set, it will
|
||||
override the default availability zone. Supported for Pike or
|
||||
newer releases.
|
||||
use-syslog:
|
||||
type: boolean
|
||||
default: False
|
||||
description: Enable debug logging.
|
||||
type: boolean
|
||||
description: |
|
||||
Setting this to True will configure services to log to syslog.
|
||||
restrict-ceph-pools:
|
||||
default: False
|
||||
type: boolean
|
||||
description: |
|
||||
Optionally restrict Ceph key permissions to access pools as required.
|
||||
rbd-pool-name:
|
||||
default:
|
||||
type: string
|
||||
description: |
|
||||
Optionally specify an existing rbd pool that cinder should map to.
|
||||
rbd-flatten-volume-from-snapshot:
|
||||
default:
|
||||
type: boolean
|
||||
default: False
|
||||
description: |
|
||||
Flatten volumes created from snapshots to remove dependency from
|
||||
volume to snapshot. Supported on Queens+
|
||||
rbd-mirroring-mode:
|
||||
type: string
|
||||
default: pool
|
||||
description: |
|
||||
The RBD mirroring mode used for the Ceph pool. This option is only used
|
||||
with 'replicated' pool type, as it's not supported for 'erasure-coded'
|
||||
pool type - valid values: 'pool' and 'image'
|
||||
pool-type:
|
||||
type: string
|
||||
default: replicated
|
||||
description: |
|
||||
Ceph pool type to use for storage - valid values include ‘replicated’
|
||||
and ‘erasure-coded’.
|
||||
ec-profile-name:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Name for the EC profile to be created for the EC pools. If not defined
|
||||
a profile name will be generated based on the name of the pool used by
|
||||
the application.
|
||||
ec-rbd-metadata-pool:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Name of the metadata pool to be created (for RBD use-cases). If not
|
||||
defined a metadata pool name will be generated based on the name of
|
||||
the data pool used by the application. The metadata pool is always
|
||||
replicated, not erasure coded.
|
||||
ec-profile-k:
|
||||
type: int
|
||||
default: 1
|
||||
description: |
|
||||
Number of data chunks that will be used for EC data pool. K+M factors
|
||||
should never be greater than the number of available zones (or hosts)
|
||||
for balancing.
|
||||
ec-profile-m:
|
||||
type: int
|
||||
default: 2
|
||||
description: |
|
||||
Number of coding chunks that will be used for EC data pool. K+M factors
|
||||
should never be greater than the number of available zones (or hosts)
|
||||
for balancing.
|
||||
ec-profile-locality:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
(lrc plugin - l) Group the coding and data chunks into sets of size l.
|
||||
For instance, for k=4 and m=2, when l=3 two groups of three are created.
|
||||
Each set can be recovered without reading chunks from another set. Note
|
||||
that using the lrc plugin does incur more raw storage usage than isa or
|
||||
jerasure in order to reduce the cost of recovery operations.
|
||||
ec-profile-crush-locality:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
(lrc plugin) The type of the crush bucket in which each set of chunks
|
||||
defined by l will be stored. For instance, if it is set to rack, each
|
||||
group of l chunks will be placed in a different rack. It is used to
|
||||
create a CRUSH rule step such as step choose rack. If it is not set,
|
||||
no such grouping is done.
|
||||
ec-profile-durability-estimator:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
(shec plugin - c) The number of parity chunks each of which includes
|
||||
each data chunk in its calculation range. The number is used as a
|
||||
durability estimator. For instance, if c=2, 2 OSDs can be down
|
||||
without losing data.
|
||||
ec-profile-helper-chunks:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
(clay plugin - d) Number of OSDs requested to send data during
|
||||
recovery of a single chunk. d needs to be chosen such that
|
||||
k+1 <= d <= k+m-1. Larger the d, the better the savings.
|
||||
ec-profile-scalar-mds:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
(clay plugin) specifies the plugin that is used as a building
|
||||
block in the layered construction. It can be one of jerasure,
|
||||
isa, shec (defaults to jerasure).
|
||||
ec-profile-plugin:
|
||||
type: string
|
||||
default: jerasure
|
||||
description: |
|
||||
EC plugin to use for this applications pool. The following list of
|
||||
plugins acceptable - jerasure, lrc, isa, shec, clay.
|
||||
ec-profile-technique:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
EC profile technique used for this applications pool - will be
|
||||
validated based on the plugin configured via ec-profile-plugin.
|
||||
Supported techniques are ‘reed_sol_van’, ‘reed_sol_r6_op’,
|
||||
‘cauchy_orig’, ‘cauchy_good’, ‘liber8tion’ for jerasure,
|
||||
‘reed_sol_van’, ‘cauchy’ for isa and ‘single’, ‘multiple’
|
||||
for shec.
|
||||
ec-profile-device-class:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Device class from CRUSH map to use for placement groups for
|
||||
erasure profile - valid values: ssd, hdd or nvme (or leave
|
||||
unset to not use a device class).
|
||||
bluestore-compression-algorithm:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Compressor to use (if any) for pools requested by this charm.
|
||||
.
|
||||
NOTE: The ceph-osd charm sets a global default for this value (defaults
|
||||
to 'lz4' unless configured by the end user) which will be used unless
|
||||
specified for individual pools.
|
||||
bluestore-compression-mode:
|
||||
type: string
|
||||
default:
|
||||
description: |
|
||||
Policy for using compression on pools requested by this charm.
|
||||
.
|
||||
'none' means never use compression.
|
||||
'passive' means use compression when clients hint that data is
|
||||
compressible.
|
||||
'aggressive' means use compression unless clients hint that
|
||||
data is not compressible.
|
||||
'force' means use compression under all circumstances even if the clients
|
||||
hint that the data is not compressible.
|
||||
bluestore-compression-required-ratio:
|
||||
type: float
|
||||
default:
|
||||
description: |
|
||||
The ratio of the size of the data chunk after compression relative to the
|
||||
original size must be at least this small in order to store the
|
||||
compressed version on pools requested by this charm.
|
||||
bluestore-compression-min-blob-size:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Chunks smaller than this are never compressed on pools requested by
|
||||
this charm.
|
||||
bluestore-compression-min-blob-size-hdd:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Value of bluestore compression min blob size for rotational media on
|
||||
pools requested by this charm.
|
||||
bluestore-compression-min-blob-size-ssd:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Value of bluestore compression min blob size for solid state media on
|
||||
pools requested by this charm.
|
||||
bluestore-compression-max-blob-size:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Chunks larger than this are broken into smaller blobs sizing bluestore
|
||||
compression max blob size before being compressed on pools requested by
|
||||
this charm.
|
||||
bluestore-compression-max-blob-size-hdd:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Value of bluestore compression max blob size for rotational media on
|
||||
pools requested by this charm.
|
||||
bluestore-compression-max-blob-size-ssd:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
Value of bluestore compression max blob size for solid state media on
|
||||
pools requested by this charm.
|
||||
|
522
charms/cinder-ceph-k8s/lib/charms/ceph/v0/ceph_client.py
Executable file
522
charms/cinder-ceph-k8s/lib/charms/ceph/v0/ceph_client.py
Executable file
@ -0,0 +1,522 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import logging
|
||||
import json
|
||||
import sys
|
||||
sys.path.append('lib') # noqa
|
||||
|
||||
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
|
||||
import charmhelpers.contrib.network.ip as ch_ip
|
||||
|
||||
from ops.framework import (
|
||||
StoredState,
|
||||
EventBase,
|
||||
ObjectEvents,
|
||||
EventSource,
|
||||
Object
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BrokerAvailableEvent(EventBase):
|
||||
pass
|
||||
|
||||
|
||||
class PoolAvailableEvent(EventBase):
|
||||
pass
|
||||
|
||||
|
||||
class CephClientEvents(ObjectEvents):
|
||||
broker_available = EventSource(BrokerAvailableEvent)
|
||||
pools_available = EventSource(PoolAvailableEvent)
|
||||
|
||||
|
||||
class CephClientRequires(Object):
|
||||
|
||||
on = CephClientEvents()
|
||||
_stored = StoredState()
|
||||
|
||||
def __init__(self, charm, relation_name):
|
||||
super().__init__(charm, relation_name)
|
||||
self.name = relation_name
|
||||
self.this_unit = self.model.unit
|
||||
self.relation_name = relation_name
|
||||
self._stored.set_default(
|
||||
pools_available=False,
|
||||
broker_available=False,
|
||||
broker_req={})
|
||||
self.framework.observe(
|
||||
charm.on[relation_name].relation_joined,
|
||||
self.on_joined)
|
||||
self.framework.observe(
|
||||
charm.on[relation_name].relation_changed,
|
||||
self.on_changed)
|
||||
self.new_request = ch_ceph.CephBrokerRq()
|
||||
self.previous_requests = self.get_previous_requests_from_relations()
|
||||
|
||||
def on_joined(self, event):
|
||||
relation = self.model.relations[self.relation_name]
|
||||
if relation:
|
||||
logging.info("emiting broker_available")
|
||||
self._stored.broker_available = True
|
||||
self.on.broker_available.emit()
|
||||
|
||||
def request_osd_settings(self, settings):
|
||||
for relation in self.model.relations[self.relation_name]:
|
||||
relation.data[self.model.unit]['osd-settings'] = json.dumps(
|
||||
settings,
|
||||
sort_keys=True)
|
||||
|
||||
@property
|
||||
def pools_available(self):
|
||||
return self._stored.pools_available
|
||||
|
||||
@property
|
||||
def broker_available(self):
|
||||
return self._stored.broker_available
|
||||
|
||||
def mon_hosts(self, mon_ips):
|
||||
"""List of all monitor host public addresses"""
|
||||
hosts = []
|
||||
for ceph_addrs in mon_ips:
|
||||
# NOTE(jamespage): This looks odd but deals with
|
||||
# use with ceph-proxy which
|
||||
# presents all monitors in
|
||||
# a single space delimited field.
|
||||
for addr in ceph_addrs.split(' '):
|
||||
hosts.append(ch_ip.format_ipv6_addr(addr) or addr)
|
||||
hosts.sort()
|
||||
return hosts
|
||||
|
||||
def get_relation_data(self):
|
||||
data = {}
|
||||
mon_ips = []
|
||||
for relation in self.model.relations[self.relation_name]:
|
||||
for unit in relation.units:
|
||||
_data = {
|
||||
'key': relation.data[unit].get('key'),
|
||||
'auth': relation.data[unit].get('auth')}
|
||||
mon_ip = relation.data[unit].get('ceph-public-address')
|
||||
if mon_ip:
|
||||
mon_ips.append(mon_ip)
|
||||
if all(_data.values()):
|
||||
data = _data
|
||||
if data:
|
||||
data['mon_hosts'] = self.mon_hosts(mon_ips)
|
||||
return data
|
||||
|
||||
def existing_request_complete(self):
|
||||
rq = self.get_existing_request()
|
||||
if rq and self.is_request_complete(rq,
|
||||
self.model.relations[self.name]):
|
||||
return True
|
||||
return False
|
||||
|
||||
def on_changed(self, event):
|
||||
logging.info("ceph client on_changed")
|
||||
relation_data = self.get_relation_data()
|
||||
if relation_data:
|
||||
if self.existing_request_complete():
|
||||
logging.info("emiting pools available")
|
||||
self._stored.pools_available = True
|
||||
self.on.pools_available.emit()
|
||||
else:
|
||||
logging.info("incomplete request. broker_req not found")
|
||||
|
||||
def get_broker_rsp_key(self):
|
||||
return 'broker-rsp-{}'.format(self.this_unit.name.replace('/', '-'))
|
||||
|
||||
def get_existing_request(self):
|
||||
logging.info("get_existing_request")
|
||||
# json.dumps of the CephBrokerRq()
|
||||
rq = ch_ceph.CephBrokerRq()
|
||||
|
||||
if self._stored.broker_req:
|
||||
try:
|
||||
j = json.loads(self._stored.broker_req)
|
||||
logging.info(
|
||||
"Json request: {}".format(self._stored.broker_req))
|
||||
rq.set_ops(j['ops'])
|
||||
except ValueError as err:
|
||||
logging.info(
|
||||
"Unable to decode broker_req: %s. Error %s",
|
||||
self._stored.broker_req,
|
||||
err)
|
||||
return rq
|
||||
|
||||
def _handle_broker_request(self, request_method, **kwargs):
|
||||
"""Handle a broker request
|
||||
|
||||
Add a ceph broker request using `request_method` and the provided
|
||||
`kwargs`.
|
||||
|
||||
:param request_method: ch_ceph.CephBrokerRq method name to use for
|
||||
request.
|
||||
:type request_method,: str
|
||||
"""
|
||||
relations = self.model.relations[self.name]
|
||||
logging.info("%s: %s", request_method, relations)
|
||||
if not relations:
|
||||
return
|
||||
rq = self.new_request
|
||||
logging.info("Adding %s request", request_method)
|
||||
getattr(rq, request_method)(**kwargs)
|
||||
logging.info("Storing request")
|
||||
self._stored.broker_req = rq.request
|
||||
logging.info("Calling send_request_if_needed")
|
||||
self.send_request_if_needed(rq, relations)
|
||||
|
||||
def _handle_pool_create_broker_request(self, request_method, **kwargs):
|
||||
"""Process request to create a pool.
|
||||
|
||||
:param request_method: ch_ceph.CephBrokerRq method name to use for
|
||||
request.
|
||||
:type request_method: str
|
||||
:param app_name: Tag pool with application name. Note that there is
|
||||
certain protocols emerging upstream with regard to
|
||||
meaningful application names to use.
|
||||
Examples are 'rbd' and 'rgw'.
|
||||
:type app_name: Optional[str]
|
||||
:param compression_algorithm: Compressor to use, one of:
|
||||
('lz4', 'snappy', 'zlib', 'zstd')
|
||||
:type compression_algorithm: Optional[str]
|
||||
:param compression_mode: When to compress data, one of:
|
||||
('none', 'passive', 'aggressive', 'force')
|
||||
:type compression_mode: Optional[str]
|
||||
:param compression_required_ratio: Minimum compression ratio for data
|
||||
chunk, if the requested ratio is not
|
||||
achieved the compressed version will
|
||||
be thrown away and the original
|
||||
stored.
|
||||
:type compression_required_ratio: Optional[float]
|
||||
:param compression_min_blob_size: Chunks smaller than this are never
|
||||
compressed (unit: bytes).
|
||||
:type compression_min_blob_size: Optional[int]
|
||||
:param compression_min_blob_size_hdd: Chunks smaller than this are not
|
||||
compressed when destined to
|
||||
rotational media (unit: bytes).
|
||||
:type compression_min_blob_size_hdd: Optional[int]
|
||||
:param compression_min_blob_size_ssd: Chunks smaller than this are not
|
||||
compressed when destined to flash
|
||||
media (unit: bytes).
|
||||
:type compression_min_blob_size_ssd: Optional[int]
|
||||
:param compression_max_blob_size: Chunks larger than this are broken
|
||||
into N * compression_max_blob_size
|
||||
chunks before being compressed
|
||||
(unit: bytes).
|
||||
:type compression_max_blob_size: Optional[int]
|
||||
:param compression_max_blob_size_hdd: Chunks larger than this are
|
||||
broken into
|
||||
N * compression_max_blob_size_hdd
|
||||
chunks before being compressed
|
||||
when destined for rotational
|
||||
media (unit: bytes)
|
||||
:type compression_max_blob_size_hdd: Optional[int]
|
||||
:param compression_max_blob_size_ssd: Chunks larger than this are
|
||||
broken into
|
||||
N * compression_max_blob_size_ssd
|
||||
chunks before being compressed
|
||||
when destined for flash media
|
||||
(unit: bytes).
|
||||
:type compression_max_blob_size_ssd: Optional[int]
|
||||
:param group: Group to add pool to
|
||||
:type group: Optional[str]
|
||||
:param max_bytes: Maximum bytes quota to apply
|
||||
:type max_bytes: Optional[int]
|
||||
:param max_objects: Maximum objects quota to apply
|
||||
:type max_objects: Optional[int]
|
||||
:param namespace: Group namespace
|
||||
:type namespace: Optional[str]
|
||||
:param weight: The percentage of data that is expected to be contained
|
||||
in the pool from the total available space on the OSDs.
|
||||
Used to calculate number of Placement Groups to create
|
||||
for pool.
|
||||
:type weight: Optional[float]
|
||||
:raises: AssertionError
|
||||
"""
|
||||
self._handle_broker_request(
|
||||
request_method,
|
||||
**kwargs)
|
||||
|
||||
def create_replicated_pool(self, name, replicas=3, pg_num=None,
|
||||
**kwargs):
|
||||
"""Adds an operation to create a replicated pool.
|
||||
|
||||
See docstring of `_handle_pool_create_broker_request` for additional
|
||||
common pool creation arguments.
|
||||
|
||||
:param name: Name of pool to create
|
||||
:type name: str
|
||||
:param replicas: Number of copies Ceph should keep of your data.
|
||||
:type replicas: int
|
||||
:param pg_num: Request specific number of Placement Groups to create
|
||||
for pool.
|
||||
:type pg_num: int
|
||||
:raises: AssertionError if provided data is of invalid type/range
|
||||
"""
|
||||
self._handle_pool_create_broker_request(
|
||||
'add_op_create_replicated_pool',
|
||||
name=name,
|
||||
replica_count=replicas,
|
||||
pg_num=pg_num,
|
||||
**kwargs)
|
||||
|
||||
def create_erasure_pool(self, name, erasure_profile=None,
|
||||
allow_ec_overwrites=False, **kwargs):
|
||||
"""Adds an operation to create a erasure coded pool.
|
||||
|
||||
See docstring of `_handle_pool_create_broker_request` for additional
|
||||
common pool creation arguments.
|
||||
|
||||
:param name: Name of pool to create
|
||||
:type name: str
|
||||
:param erasure_profile: Name of erasure code profile to use. If not
|
||||
set the ceph-mon unit handling the broker
|
||||
request will set its default value.
|
||||
:type erasure_profile: str
|
||||
:param allow_ec_overwrites: allow EC pools to be overriden
|
||||
:type allow_ec_overwrites: bool
|
||||
:raises: AssertionError if provided data is of invalid type/range
|
||||
"""
|
||||
self._handle_pool_create_broker_request(
|
||||
'add_op_create_erasure_pool',
|
||||
name=name,
|
||||
erasure_profile=erasure_profile,
|
||||
allow_ec_overwrites=allow_ec_overwrites,
|
||||
**kwargs)
|
||||
|
||||
def create_erasure_profile(self, name,
|
||||
erasure_type='jerasure',
|
||||
erasure_technique=None,
|
||||
k=None, m=None,
|
||||
failure_domain=None,
|
||||
lrc_locality=None,
|
||||
shec_durability_estimator=None,
|
||||
clay_helper_chunks=None,
|
||||
device_class=None,
|
||||
clay_scalar_mds=None,
|
||||
lrc_crush_locality=None):
|
||||
"""Adds an operation to create a erasure coding profile.
|
||||
|
||||
:param name: Name of profile to create
|
||||
:type name: str
|
||||
:param erasure_type: Which of the erasure coding plugins should be used
|
||||
:type erasure_type: string
|
||||
:param erasure_technique: EC plugin technique to use
|
||||
:type erasure_technique: string
|
||||
:param k: Number of data chunks
|
||||
:type k: int
|
||||
:param m: Number of coding chunks
|
||||
:type m: int
|
||||
:param lrc_locality: Group the coding and data chunks into sets of size
|
||||
locality (lrc plugin)
|
||||
:type lrc_locality: int
|
||||
:param durability_estimator: The number of parity chuncks each of which
|
||||
includes a data chunk in its calculation
|
||||
range (shec plugin)
|
||||
:type durability_estimator: int
|
||||
:param helper_chunks: The number of helper chunks to use for recovery
|
||||
operations (clay plugin)
|
||||
:type: helper_chunks: int
|
||||
:param failure_domain: Type of failure domain from Ceph bucket types
|
||||
to be used
|
||||
:type failure_domain: string
|
||||
:param device_class: Device class to use for profile (ssd, hdd)
|
||||
:type device_class: string
|
||||
:param clay_scalar_mds: Plugin to use for CLAY layered construction
|
||||
(jerasure|isa|shec)
|
||||
:type clay_scaler_mds: string
|
||||
:param lrc_crush_locality: Type of crush bucket in which set of chunks
|
||||
defined by lrc_locality will be stored.
|
||||
:type lrc_crush_locality: string
|
||||
"""
|
||||
self._handle_broker_request(
|
||||
'add_op_create_erasure_profile',
|
||||
name=name,
|
||||
erasure_type=erasure_type,
|
||||
erasure_technique=erasure_technique,
|
||||
k=k, m=m,
|
||||
failure_domain=failure_domain,
|
||||
lrc_locality=lrc_locality,
|
||||
shec_durability_estimator=shec_durability_estimator,
|
||||
clay_helper_chunks=clay_helper_chunks,
|
||||
device_class=device_class,
|
||||
clay_scalar_mds=clay_scalar_mds,
|
||||
lrc_crush_locality=lrc_crush_locality
|
||||
)
|
||||
|
||||
def request_ceph_permissions(self, client_name, permissions):
|
||||
logging.info("request_ceph_permissions")
|
||||
relations = self.model.relations[self.name]
|
||||
if not relations:
|
||||
return
|
||||
rq = self.new_request
|
||||
rq.add_op({'op': 'set-key-permissions',
|
||||
'permissions': permissions,
|
||||
'client': client_name})
|
||||
self._stored.broker_req = rq.request
|
||||
# ch_ceph.send_request_if_needed(rq, relation=self.name)
|
||||
self.send_request_if_needed(rq, relations)
|
||||
|
||||
def request_access_to_group(self,
|
||||
name,
|
||||
object_prefix_permissions,
|
||||
permission):
|
||||
"""Request access to a specific group of pools
|
||||
|
||||
:param name: the group name to request access for
|
||||
:type name: string
|
||||
:param object_prefix_permissions: any hierarchy permissions neded
|
||||
:type object_prefix_permissions: dict
|
||||
:param permission: permissions for the specificed group of pools
|
||||
:type permission: string
|
||||
"""
|
||||
logging.info("request_access_to_group")
|
||||
self._handle_broker_request(
|
||||
'add_op_request_access_to_group',
|
||||
name, object_prefix_permissions, permission
|
||||
)
|
||||
|
||||
def get_previous_requests_from_relations(self):
|
||||
"""Get the previous requests.
|
||||
|
||||
:returns: The previous ceph requests.
|
||||
:rtype: Dict[str, ch_ceph.CephBrokerRq]
|
||||
"""
|
||||
requests = {}
|
||||
for relation in self.model.relations[self.relation_name]:
|
||||
broker_req = relation.data[self.this_unit].get('broker_req')
|
||||
rid = "{}:{}".format(relation.name, relation.id)
|
||||
if broker_req:
|
||||
request_data = json.loads(broker_req)
|
||||
request = ch_ceph.CephBrokerRq(
|
||||
api_version=request_data['api-version'],
|
||||
request_id=request_data['request-id'])
|
||||
request.set_ops(request_data['ops'])
|
||||
requests[rid] = request
|
||||
return requests
|
||||
|
||||
def get_request_states(self, request, relations):
|
||||
"""Get the existing requests and their states.
|
||||
|
||||
:param request: A CephBrokerRq object
|
||||
:type request: ch_ceph.CephBrokerRq
|
||||
:param relations: List of relations to check for existing request.
|
||||
:type relations: [ops.model.Relation, ...]
|
||||
:returns: Whether request is complete.
|
||||
:rtype: bool
|
||||
"""
|
||||
complete = []
|
||||
requests = {}
|
||||
for relation in relations:
|
||||
rid = "{}:{}".format(relation.name, relation.id)
|
||||
complete = False
|
||||
previous_request = self.previous_requests.get(
|
||||
rid,
|
||||
ch_ceph.CephBrokerRq())
|
||||
if request == previous_request:
|
||||
sent = True
|
||||
complete = self.is_request_complete_for_relation(
|
||||
previous_request,
|
||||
relation)
|
||||
else:
|
||||
sent = False
|
||||
complete = False
|
||||
|
||||
requests[rid] = {
|
||||
'sent': sent,
|
||||
'complete': complete,
|
||||
}
|
||||
|
||||
return requests
|
||||
|
||||
def is_request_complete_for_relation(self, request, relation):
|
||||
"""Check if a given request has been completed on the given relation
|
||||
|
||||
:param request: A CephBrokerRq object
|
||||
:type request: ch_ceph.CephBrokerRq
|
||||
:param relation: A relation to check for an existing request.
|
||||
:type relation: ops.model.Relation
|
||||
:returns: Whether request is complete.
|
||||
:rtype: bool
|
||||
"""
|
||||
broker_key = self.get_broker_rsp_key()
|
||||
for unit in relation.units:
|
||||
if relation.data[unit].get(broker_key):
|
||||
rsp = ch_ceph.CephBrokerRsp(relation.data[unit][broker_key])
|
||||
if rsp.request_id == request.request_id:
|
||||
if not rsp.exit_code:
|
||||
return True
|
||||
else:
|
||||
if relation.data[unit].get('broker_rsp'):
|
||||
logging.info('No response for this unit yet')
|
||||
return False
|
||||
|
||||
def is_request_complete(self, request, relations):
|
||||
"""Check a functionally equivalent request has already been completed
|
||||
|
||||
Returns True if a similair request has been completed
|
||||
|
||||
:param request: A CephBrokerRq object
|
||||
:type request: ch_ceph.CephBrokerRq
|
||||
:param relations: List of relations to check for existing request.
|
||||
:type relations: [ops.model.Relation, ...]
|
||||
:returns: Whether request is complete.
|
||||
:rtype: bool
|
||||
"""
|
||||
states = self.get_request_states(request, relations)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['complete']:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def is_request_sent(self, request, relations):
|
||||
"""Check if a functionally equivalent request has already been sent
|
||||
|
||||
Returns True if a similair request has been sent
|
||||
|
||||
:param request: A CephBrokerRq object
|
||||
:type request: ch_ceph.CephBrokerRq
|
||||
:param relations: List of relations to check for existing request.
|
||||
:type relations: [ops.model.Relation, ...]
|
||||
:returns: Whether equivalent request has been sent.
|
||||
:rtype: bool
|
||||
"""
|
||||
states = self.get_request_states(request, relations)
|
||||
for rid in states.keys():
|
||||
if not states[rid]['sent']:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def send_request_if_needed(self, request, relations):
|
||||
"""Send request if an equivalent request has not already been sent
|
||||
|
||||
:param request: A CephBrokerRq object
|
||||
:type request: ch_ceph.CephBrokerRq
|
||||
:param relations: List of relations to check for existing request.
|
||||
:type relations: [ops.model.Relation, ...]
|
||||
"""
|
||||
states = self.get_request_states(request, relations)
|
||||
for relation in relations:
|
||||
rid = "{}:{}".format(relation.name, relation.id)
|
||||
if states[rid]['sent']:
|
||||
logging.debug(
|
||||
('Request %s is a duplicate of the previous broker request'
|
||||
' %s. Restoring previous broker request'),
|
||||
request.request_id,
|
||||
self.previous_requests[rid].request_id)
|
||||
# The previous request was stored at the beggining. The ops of
|
||||
# the new request match that of the old. But as the new request
|
||||
# was constructed broker data may have been set on the relation
|
||||
# during the construction of this request. This is because the
|
||||
# interface has no explicit commit method. Every op request has
|
||||
# in implicit send which updates the relation data. So make
|
||||
# sure # the relation data matches the data at the beggining so
|
||||
# that a new request is not triggered.
|
||||
request = self.previous_requests[rid]
|
||||
else:
|
||||
logging.debug('Sending request %s', request.request_id)
|
||||
relation.data[self.this_unit]['broker_req'] = request.request
|
@ -1,10 +1,10 @@
|
||||
"""RabbitMQAMQPProvides and Requires module.
|
||||
"""AMQPProvides and Requires module.
|
||||
|
||||
|
||||
This library contains the Requires and Provides classes for handling
|
||||
the amqp interface.
|
||||
|
||||
Import `RabbitMQAMQPRequires` in your charm, with the charm object and the
|
||||
Import `AMQPRequires` in your charm, with the charm object and the
|
||||
relation name:
|
||||
- self
|
||||
- "amqp"
|
||||
@ -14,43 +14,56 @@ Also provide two additional parameters to the charm object:
|
||||
- vhost
|
||||
|
||||
Two events are also available to respond to:
|
||||
- has_amqp_servers
|
||||
- ready_amqp_servers
|
||||
- connected
|
||||
- ready
|
||||
- goneaway
|
||||
|
||||
A basic example showing the usage of this relation follows:
|
||||
|
||||
```
|
||||
from charms.sunbeam_rabbitmq_operator.v0.amqp import RabbitMQAMQPRequires
|
||||
from charms.sunbeam_rabbitmq_operator.v0.amqp import AMQPRequires
|
||||
|
||||
class AMQPClientCharm(CharmBase):
|
||||
def __init__(self, *args):
|
||||
super().__init__(*args)
|
||||
# AMQP Requires
|
||||
self.amqp_requires = RabbitMQAMQPRequires(
|
||||
self.amqp = AMQPRequires(
|
||||
self, "amqp",
|
||||
username = "amqp-client",
|
||||
vhost = "amqp-client-vhost"
|
||||
username="myusername",
|
||||
vhost="vhostname"
|
||||
)
|
||||
self.framework.observe(
|
||||
self.amqp_requires.on.has_amqp_servers, self._on_has_amqp_servers)
|
||||
self.amqp.on.connected, self._on_amqp_connected)
|
||||
self.framework.observe(
|
||||
self.amqp_requires.on.ready_amqp_servers, self._on_ready_amqp_servers)
|
||||
self.amqp.on.ready, self._on_amqp_ready)
|
||||
self.framework.observe(
|
||||
self.amqp.on.goneaway, self._on_amqp_goneaway)
|
||||
|
||||
def _on_has_amqp_servers(self, event):
|
||||
'''React to the AMQP relation joined.
|
||||
def _on_amqp_connected(self, event):
|
||||
'''React to the AMQP connected event.
|
||||
|
||||
The AMQP interface will use the provided username and vhost to commuicate
|
||||
with the.
|
||||
This event happens when n AMQP relation is added to the
|
||||
model before credentials etc have been provided.
|
||||
'''
|
||||
# Do something before the relation is complete
|
||||
pass
|
||||
|
||||
def _on_ready_amqp_servers(self, event):
|
||||
'''React to the AMQP relation joined.
|
||||
def _on_amqp_ready(self, event):
|
||||
'''React to the AMQP ready event.
|
||||
|
||||
The AMQP interface will use the provided username and vhost for the
|
||||
request to the rabbitmq server.
|
||||
'''
|
||||
# AMQP Relation is ready. Do something with the completed relation.
|
||||
pass
|
||||
|
||||
def _on_amqp_goneaway(self, event):
|
||||
'''React to the AMQP goneaway event.
|
||||
|
||||
This event happens when an AMQP relation is removed.
|
||||
'''
|
||||
# AMQP Relation has goneaway. shutdown services or suchlike
|
||||
pass
|
||||
```
|
||||
"""
|
||||
|
||||
@ -82,31 +95,37 @@ from typing import List
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HasAMQPServersEvent(EventBase):
|
||||
"""Has AMQPServers Event."""
|
||||
class AMQPConnectedEvent(EventBase):
|
||||
"""AMQP connected Event."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class ReadyAMQPServersEvent(EventBase):
|
||||
"""Ready AMQPServers Event."""
|
||||
class AMQPReadyEvent(EventBase):
|
||||
"""AMQP ready for use Event."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class RabbitMQAMQPServerEvents(ObjectEvents):
|
||||
class AMQPGoneAwayEvent(EventBase):
|
||||
"""AMQP relation has gone-away Event"""
|
||||
|
||||
pass
|
||||
|
||||
class AMQPServerEvents(ObjectEvents):
|
||||
"""Events class for `on`"""
|
||||
|
||||
has_amqp_servers = EventSource(HasAMQPServersEvent)
|
||||
ready_amqp_servers = EventSource(ReadyAMQPServersEvent)
|
||||
connected = EventSource(AMQPConnectedEvent)
|
||||
ready = EventSource(AMQPReadyEvent)
|
||||
goneaway = EventSource(AMQPGoneAwayEvent)
|
||||
|
||||
|
||||
class RabbitMQAMQPRequires(Object):
|
||||
class AMQPRequires(Object):
|
||||
"""
|
||||
RabbitMQAMQPRequires class
|
||||
AMQPRequires class
|
||||
"""
|
||||
|
||||
on = RabbitMQAMQPServerEvents()
|
||||
on = AMQPServerEvents()
|
||||
_stored = StoredState()
|
||||
|
||||
def __init__(self, charm, relation_name: str, username: str, vhost: str):
|
||||
@ -123,6 +142,10 @@ class RabbitMQAMQPRequires(Object):
|
||||
self.charm.on[relation_name].relation_changed,
|
||||
self._on_amqp_relation_changed,
|
||||
)
|
||||
self.framework.observe(
|
||||
self.charm.on[relation_name].relation_departed,
|
||||
self._on_amqp_relation_changed,
|
||||
)
|
||||
self.framework.observe(
|
||||
self.charm.on[relation_name].relation_broken,
|
||||
self._on_amqp_relation_broken,
|
||||
@ -131,19 +154,19 @@ class RabbitMQAMQPRequires(Object):
|
||||
def _on_amqp_relation_joined(self, event):
|
||||
"""AMQP relation joined."""
|
||||
logging.debug("RabbitMQAMQPRequires on_joined")
|
||||
self.on.has_amqp_servers.emit()
|
||||
self.on.connected.emit()
|
||||
self.request_access(self.username, self.vhost)
|
||||
|
||||
def _on_amqp_relation_changed(self, event):
|
||||
"""AMQP relation changed."""
|
||||
logging.debug("RabbitMQAMQPRequires on_changed")
|
||||
if self.password:
|
||||
self.on.ready_amqp_servers.emit()
|
||||
self.on.ready.emit()
|
||||
|
||||
def _on_amqp_relation_broken(self, event):
|
||||
"""AMQP relation broken."""
|
||||
# TODO clear data on the relation
|
||||
logging.debug("RabbitMQAMQPRequires on_departed")
|
||||
logging.debug("RabbitMQAMQPRequires on_broken")
|
||||
self.on.goneaway.emit()
|
||||
|
||||
@property
|
||||
def _amqp_rel(self) -> Relation:
|
||||
@ -188,19 +211,19 @@ class ReadyAMQPClientsEvent(EventBase):
|
||||
pass
|
||||
|
||||
|
||||
class RabbitMQAMQPClientEvents(ObjectEvents):
|
||||
class AMQPClientEvents(ObjectEvents):
|
||||
"""Events class for `on`"""
|
||||
|
||||
has_amqp_clients = EventSource(HasAMQPClientsEvent)
|
||||
ready_amqp_clients = EventSource(ReadyAMQPClientsEvent)
|
||||
|
||||
|
||||
class RabbitMQAMQPProvides(Object):
|
||||
class AMQPProvides(Object):
|
||||
"""
|
||||
RabbitMQAMQPProvides class
|
||||
AMQPProvides class
|
||||
"""
|
||||
|
||||
on = RabbitMQAMQPClientEvents()
|
||||
on = AMQPClientEvents()
|
||||
_stored = StoredState()
|
||||
|
||||
def __init__(self, charm, relation_name):
|
||||
|
@ -2,3 +2,4 @@
|
||||
jinja2
|
||||
git+https://github.com/canonical/operator@2875e73e#egg=ops
|
||||
git+https://opendev.org/openstack/charm-ops-openstack#egg=ops_openstack
|
||||
git+https://github.com/openstack-charmers/advanced-sunbeam-openstack#egg=advanced_sunbeam_openstack
|
||||
|
@ -9,75 +9,78 @@ import logging
|
||||
from ops.charm import CharmBase
|
||||
from ops.framework import StoredState
|
||||
from ops.main import main
|
||||
from ops.model import ActiveStatus
|
||||
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus
|
||||
|
||||
from charms.sunbeam_rabbitmq_operator.v0.amqp import RabbitMQAMQPRequires
|
||||
from charms.sunbeam_rabbitmq_operator.v0.amqp import AMQPRequires
|
||||
from charms.ceph.v0.ceph_client import CephClientRequires
|
||||
|
||||
from typing import List
|
||||
|
||||
# NOTE: rename sometime
|
||||
import advanced_sunbeam_openstack.core as core
|
||||
import advanced_sunbeam_openstack.adapters as adapters
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
CINDER_VOLUME_CONTAINER = 'cinder-volume'
|
||||
|
||||
class CinderCephOperatorCharm(CharmBase):
|
||||
"""Charm the service."""
|
||||
|
||||
_stored = StoredState()
|
||||
|
||||
def __init__(self, *args):
|
||||
super().__init__(*args)
|
||||
|
||||
self.framework.observe(
|
||||
self.on.cinder_volume_pebble_ready,
|
||||
self._on_cinder_volume_pebble_ready,
|
||||
)
|
||||
|
||||
self.framework.observe(self.on.config_changed, self._on_config_changed)
|
||||
|
||||
self._stored.set_default(amqp_ready=False)
|
||||
self._stored.set_default(ceph_ready=False)
|
||||
|
||||
self.amqp = RabbitMQAMQPRequires(
|
||||
self, "amqp", username="cinder", vhost="openstack"
|
||||
)
|
||||
self.framework.observe(
|
||||
self.amqp.on.ready_amqp_servers, self._on_amqp_ready
|
||||
)
|
||||
# TODO
|
||||
# State modelling
|
||||
# AMQP + Ceph -> +Volume
|
||||
class CinderCephAdapters(adapters.OPSRelationAdapters):
|
||||
|
||||
@property
|
||||
def _pebble_cinder_volume_layer(self):
|
||||
"""Pebble layer for Cinder volume"""
|
||||
return {
|
||||
"summary": "cinder layer",
|
||||
"description": "pebble configuration for cinder services",
|
||||
"services": {
|
||||
"cinder-volume": {
|
||||
"override": "replace",
|
||||
"summary": "Cinder Volume",
|
||||
"command": "cinder-volume --use-syslog",
|
||||
"startup": "enabled",
|
||||
}
|
||||
},
|
||||
}
|
||||
def interface_map(self):
|
||||
_map = super().interface_map
|
||||
_map.update({
|
||||
'rabbitmq': adapters.AMQPAdapter})
|
||||
return _map
|
||||
|
||||
def _on_cinder_volume_pebble_ready(self, event):
|
||||
"""Define and start a workload using the Pebble API."""
|
||||
container = event.workload
|
||||
container.add_layer(
|
||||
"cinder-volume", self._pebble_cinder_volume_layer, combine=True
|
||||
|
||||
class CinderCephOperatorCharm(core.OSBaseOperatorCharm):
|
||||
"""Cinder/Ceph Operator charm"""
|
||||
|
||||
# NOTE: service_name == container_name
|
||||
service_name = 'cinder-volume'
|
||||
|
||||
service_user = 'cinder'
|
||||
service_group = 'cinder'
|
||||
|
||||
cinder_conf = '/etc/cinder/cinder.conf'
|
||||
|
||||
def __init__(self, framework):
|
||||
super().__init__(
|
||||
framework,
|
||||
adapters=CinderCephAdapters(self)
|
||||
)
|
||||
container.autostart()
|
||||
|
||||
def _on_config_changed(self, _):
|
||||
"""Just an example to show how to deal with changed configuration"""
|
||||
# TODO
|
||||
# Set debug logging and restart services
|
||||
def get_relation_handlers(self) -> List[core.RelationHandler]:
|
||||
"""Relation handlers for the service."""
|
||||
self.amqp = core.AMQPHandler(
|
||||
self, "amqp", self.configure_charm
|
||||
)
|
||||
return [self.amqp]
|
||||
|
||||
@property
|
||||
def container_configs(self) -> List[core.ContainerConfigFile]:
|
||||
_cconfigs = super().container_configs
|
||||
_cconfigs.extend([
|
||||
core.ContainerConfigFile(
|
||||
[self.service_name],
|
||||
self.cinder_conf,
|
||||
self.service_user,
|
||||
self.service_group
|
||||
)
|
||||
])
|
||||
|
||||
def _do_bootstrap(self):
|
||||
"""No-op the bootstrap method as none required"""
|
||||
pass
|
||||
|
||||
def _on_amqp_ready(self, event):
|
||||
"""AMQP service ready for use"""
|
||||
self._stored.amqp_ready = True
|
||||
|
||||
|
||||
class CinderCephVictoriaOperatorCharm(CinderCephOperatorCharm):
|
||||
|
||||
openstack_relesae = 'victoria'
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(CinderCephOperatorCharm)
|
||||
main(CinderCephVictoriaOperatorCharm, use_juju_for_storage=True)
|
||||
|
Loading…
x
Reference in New Issue
Block a user