certwrangler.models module

This module contains all the models used in certwrangler’s config and state. Note that the name field is populated automatically when the config is loaded, based on the key the object is defined under.

class certwrangler.models.NamedModel[source]#

Bases: BaseModel

Base class for models that have a name.

The _name attribute is set by the Config class as part of Config.__post_populate() based on the key that the model was defined under.
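As a rough illustration of the behavior described above, the sketch below copies each dictionary key onto a private _name attribute of the object stored under it. NamedThing and populate_names are hypothetical stand-ins built on plain classes; they are not certwrangler’s pydantic models or its actual Config.__post_populate() logic.

```python
from typing import Dict


class NamedThing:
    """Hypothetical stand-in for a model that carries a private name."""

    def __init__(self) -> None:
        self._name = ""

    @property
    def name(self) -> str:
        # Read-only view of the name assigned when the config is loaded.
        return self._name


def populate_names(objects: Dict[str, NamedThing]) -> None:
    """Copy each dictionary key onto the object stored under it."""
    for key, obj in objects.items():
        obj._name = key


accounts = {"prod": NamedThing(), "staging": NamedThing()}
populate_names(accounts)
print(accounts["prod"].name)
```

Keeping the name out of the object’s own fields means it only has to be written once, as the key in the config.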

_name: str#
property name: str#
model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class certwrangler.models.StateModel[source]#

Bases: BaseModel

Base class for models representing state.

The _migrated attribute is set if the model schema was migrated.

schema_migrations: ClassVar[List[Callable[[Dict[str, Any]], Dict[str, Any]]]]#
_migrated: bool#
property _schema_version: int#

The version of the schema, which is based on how many schema migrations are defined on the model.

classmethod _handle_schema_migrations(data: Any, handler: ModelWrapValidatorHandler[Self]) Any[source]#

Iterates through the defined schema_migrations callables and performs any needed migrations. The schema version recorded in the incoming data determines which migrations are applied. The method also instantiates the class and sets the _migrated variable to True if any of the migration callables were applied.

The callables should mutate the data dict as needed to migrate the schema then return it.
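The migration flow described above can be sketched with plain dictionaries. This is a minimal sketch under stated assumptions: the "schema_version" key, the two example migrations, and apply_migrations are all hypothetical, and the real method runs as a pydantic wrap validator rather than a free function.

```python
from typing import Any, Callable, Dict, List, Tuple

Migration = Callable[[Dict[str, Any]], Dict[str, Any]]


def rename_cert_field(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical migration 0 -> 1: rename "certificate" to "cert".
    data["cert"] = data.pop("certificate", None)
    return data


def add_status(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical migration 1 -> 2: add a default status.
    data.setdefault("status", "new")
    return data


SCHEMA_MIGRATIONS: List[Migration] = [rename_cert_field, add_status]


def apply_migrations(
    data: Dict[str, Any], migrations: List[Migration]
) -> Tuple[Dict[str, Any], bool]:
    """Run every migration newer than the version recorded in the data."""
    # Assumed key name; data without it is treated as version 0.
    current = data.pop("schema_version", 0)
    pending = migrations[current:]
    for migrate in pending:
        data = migrate(data)
    # The schema version equals the number of defined migrations.
    data["schema_version"] = len(migrations)
    return data, bool(pending)


old_state = {"certificate": "PEM DATA", "schema_version": 0}
new_state, migrated = apply_migrations(old_state, SCHEMA_MIGRATIONS)
print(new_state, migrated)
```

Because the version is just the length of the list, adding a migration means appending one callable; existing entries should not be reordered or removed.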

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class certwrangler.models.Solver(*, driver: str, zones: List[Domain])[source]#

Bases: NamedModel

Base class for ACME challenge solver drivers.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

driver: str#
zones: List[Domain]#
abstractmethod create(name: str, domain: str, content: str) None[source]#

This should handle the logic of creating a TXT record.

abstractmethod delete(name: str, domain: str, content: str) None[source]#

This should handle the logic of deleting a TXT record.

initialize() None[source]#

Any driver-specific initialization steps (creating resources, setting up clients, etc.) should be placed here.

classmethod __validate_zones(values: List[Domain]) List[Domain]#

Validate that the configured zones have valid SOA records.

Returns:

A list of valid zones.

Raises:

ValueError – Raised if a configured zone doesn’t have an SOA record.

class certwrangler.models.Encryptor(fernets: Iterable[Fernet])[source]#

Bases: MultiFernet

Extends MultiFernet with the ability to generate a fingerprint of a Fernet key.

property fingerprint: str#

Returns the fingerprint of the active encryption key.

Returns:

A string representing the fingerprint of the active key.
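The source does not say how the fingerprint is computed. The sketch below assumes a hex SHA-256 digest of the encoded key, purely for illustration; generate_key and fingerprint are hypothetical helpers that use only the standard library, not the cryptography package or certwrangler’s own code.

```python
import base64
import hashlib
import os


def generate_key() -> bytes:
    # A Fernet key is 32 random bytes, URL-safe base64 encoded.
    return base64.urlsafe_b64encode(os.urandom(32))


def fingerprint(key: bytes) -> str:
    """Hypothetical fingerprint: hex SHA-256 digest of the encoded key."""
    return hashlib.sha256(key).hexdigest()


keys = [generate_key(), generate_key()]
# MultiFernet encrypts with the first key in its list, so that key is
# treated as the active one here.
active_fingerprint = fingerprint(keys[0])
print(active_fingerprint)
```

A fingerprint of this kind lets stored state record which key encrypted it without exposing the key itself.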

class certwrangler.models.StateManager(*, driver: str, encryption_keys: List[FernetKey] = <factory>)[source]#

Bases: BaseModel

Base class for state manager drivers.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

driver: str#
encryption_keys: List[FernetKey]#
_config: Config#
_encryptor: Encryptor | None#
property encryptor: Encryptor | None#

This sets up and returns an Encryptor if encryption_keys are defined.

Returns:

The initialized Encryptor if encryption_keys are defined, otherwise returns None.

initialize() None[source]#

Any driver-specific initialization steps (creating resources, setting up clients, etc.) should be placed here.

abstractmethod list() Dict[str, Dict[str, Any]][source]#

Lists all the saved states for the given entity_class including encryption fingerprint.

abstractmethod save(entity: Account | Cert, encrypt: bool = True) None[source]#

Saves the state of the given entity.

abstractmethod load(entity: Account | Cert) None[source]#

Loads the state of the given entity to memory.

abstractmethod delete(entity_class: Literal['account', 'cert'], entity_name: str) None[source]#

Deletes the given entity_name from state.

class certwrangler.models.Store(*, driver: str)[source]#

Bases: NamedModel

Base class for store drivers.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

driver: str#
abstractmethod publish(cert: Cert) None[source]#

This should handle the logic of publishing the cert to the store.

initialize() None[source]#

Any driver-specific initialization steps (creating resources, setting up clients, etc.) should be placed here.

static _join_certs(*certs: str) str[source]#

Joins multiple certs together into a bundle.
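A bundle of this kind is typically just the PEM blocks concatenated in order. The sketch below shows one way to do that; join_certs is a hypothetical helper, and the whitespace normalization is an assumption, not necessarily what _join_certs does.

```python
def join_certs(*certs: str) -> str:
    """Concatenate PEM certs in order, each ending in a single newline."""
    return "".join(cert.strip() + "\n" for cert in certs)


leaf = "-----BEGIN CERTIFICATE-----\nLEAF\n-----END CERTIFICATE-----\n"
intermediate = "-----BEGIN CERTIFICATE-----\nINTERMEDIATE\n-----END CERTIFICATE-----"
bundle = join_certs(leaf, intermediate)
print(bundle)
```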

class certwrangler.models.AccountStatus(*values)[source]#

Bases: str, Enum

new#
active#
class certwrangler.models.AccountState(*, registration: Registration | None = None, key: JWKRSAKey | None = None, key_size: int | None = None, status: AccountStatus = AccountStatus.new)[source]#

Bases: StateModel

Managed ACME account state.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

schema_migrations#
registration: Registration | None#
key: JWKRSAKey | None#
key_size: int | None#
status: AccountStatus#
class certwrangler.models.Account(*, emails: List[EmailStr], server: HttpUrl = HttpUrl('https://acme-v02.api.letsencrypt.org/directory'), key_size: int = 2048)[source]#

Bases: NamedModel

Managed ACME account definition.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

emails: List[EmailStr]#
server: HttpUrl#
key_size: int#
_state: AccountState#
property state: AccountState#
classmethod __validate_unique_emails(values: List[EmailStr]) List[EmailStr]#

Validates that all the configured emails are unique.
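A uniqueness check like this one can be sketched with collections.Counter. validate_unique is a hypothetical free function, not the pydantic field validator itself. The source does not say that the email check raises ValueError; this sketch assumes it, and the error message is also an assumption.

```python
from collections import Counter
from typing import List


def validate_unique(values: List[str], label: str = "emails") -> List[str]:
    """Return the values unchanged, or raise ValueError naming duplicates."""
    duplicates = sorted(
        item for item, count in Counter(values).items() if count > 1
    )
    if duplicates:
        raise ValueError(f"Duplicate {label}: {', '.join(duplicates)}")
    return values


print(validate_unique(["ops@example.com", "security@example.com"]))
```

The same shape would fit any other list-valued field that must not repeat entries.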

class certwrangler.models.Subject(*, country: CountryNameOID | None = None, state_or_province: StateOrProvinceOID | None = None, locality: LocalityOID | None = None, organization: OrganizationOID | None = None, organizational_unit: OrganizationalUnitOID | None = None)[source]#

Bases: NamedModel

Cert subject.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

country: CountryNameOID | None#
state_or_province: StateOrProvinceOID | None#
locality: LocalityOID | None#
organization: OrganizationOID | None#
organizational_unit: OrganizationalUnitOID | None#
class certwrangler.models.CertStatus(*values)[source]#

Bases: str, Enum

new#
active#
renewing#
class certwrangler.models.CertState(*, url: str | None = None, key: RSAKey | None = None, key_size: int | None = None, cert: X509Certificate | None = None, chain: List[X509Certificate] | None = None, csr: X509CSR | None = None, order: Order | None = None, status: CertStatus = CertStatus.new)[source]#

Bases: StateModel

Managed cert state.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

schema_migrations#
url: str | None#
key: RSAKey | None#
key_size: int | None#
cert: X509Certificate | None#
chain: List[X509Certificate] | None#
csr: X509CSR | None#
order: Order | None#
status: CertStatus#
property fullchain: List[X509Certificate] | None#

The full chain of trust including the leaf cert.

class certwrangler.models.Cert(*, common_name: Domain, account_name: str, store_names: List[str], store_key: str | None = None, subject_name: str = 'default', alt_names: List[Domain] = <factory>, wait_timeout: timedelta = datetime.timedelta(seconds=300), key_size: int = 2048, follow_cnames: bool = True, renewal_threshold: Days = datetime.timedelta(days=30))[source]#

Bases: NamedModel

Managed cert definition.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

common_name: Domain#
account_name: str#
store_names: List[str]#
store_key: str | None#
subject_name: str#
alt_names: List[Domain]#
wait_timeout: timedelta#
key_size: int#
follow_cnames: bool#
renewal_threshold: Days#
_state: CertState#
_config: Config#
property state: CertState#
property account: Account#

Returns the account object configured for the cert.

Raises:

ValueError – Raised if the account can’t be found in the config.

property stores: List[Store]#

Returns a list of the configured store objects.

Raises:

ValueError – Raised if a store can’t be found in the config.

property solvers: Dict[str, Solver]#

Returns the available solvers.

property subject: Subject#

Returns the subject object configured for the cert.

Raises:

ValueError – Raised if a subject can’t be found in the config.

get_solver_for_zone(zone: str) Solver[source]#

Finds a solver for a given zone name.

Raises:

ValueError – Raised if a solver for the zone can’t be found in the config.
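A zone lookup of this kind might look like the sketch below. FakeSolver is a hypothetical stand-in with only the two fields used here, and exact string matching on the zone name is an assumption; the real method may match differently.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class FakeSolver:
    """Hypothetical stand-in for a Solver; only the fields used here."""

    driver: str
    zones: List[str] = field(default_factory=list)


def get_solver_for_zone(solvers: Dict[str, FakeSolver], zone: str) -> FakeSolver:
    """Return the first solver that lists the zone, or raise ValueError."""
    for solver in solvers.values():
        if zone in solver.zones:
            return solver
    raise ValueError(f"No solver configured for zone '{zone}'.")


solvers = {"dns": FakeSolver(driver="dummy", zones=["example.com"])}
print(get_solver_for_zone(solvers, "example.com").driver)
```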

classmethod __validate_unique_stores(values: List[str]) List[str]#

Validates that all the configured stores are unique.

Raises:

ValueError – Raised if there are duplicate stores.

property time_left: timedelta#

Returns the time remaining until the cert expires as a datetime.timedelta. If no cert is in the state, it returns an empty datetime.timedelta.

Returns:

A datetime.timedelta representing the time left before the cert expires.

property needs_renewal: bool#

Checks whether a cert needs to be renewed: either its time left before expiry is less than renewal_threshold, or its common_name or alt_names changed.

The subject is deliberately not checked, since Let’s Encrypt (LE) apparently strips it out.

Returns:

A bool representing if the cert should be renewed.
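The renewal decision can be sketched with plain datetime values. This is a simplified sketch under stated assumptions: the free functions, their parameters, and the comparison of names as sets are hypothetical, and the real properties read these values from the cert’s state and config.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Set


def time_left(not_valid_after: Optional[datetime], now: datetime) -> timedelta:
    """Time remaining until expiry, or an empty timedelta with no cert."""
    if not_valid_after is None:
        return timedelta()
    return not_valid_after - now


def needs_renewal(
    not_valid_after: Optional[datetime],
    now: datetime,
    renewal_threshold: timedelta,
    issued_names: Set[str],
    configured_names: Set[str],
) -> bool:
    """Renew when expiry is near or the configured names no longer match."""
    if time_left(not_valid_after, now) < renewal_threshold:
        return True
    return issued_names != configured_names


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
threshold = timedelta(days=30)
names = {"example.com", "www.example.com"}

print(needs_renewal(now + timedelta(days=45), now, threshold, names, names))
print(needs_renewal(now + timedelta(days=10), now, threshold, names, names))
```

Returning an empty timedelta when no cert exists means a missing cert always falls below the threshold and is treated as due.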

class certwrangler.models.ReconcilerConfig(*, interval: int = 60)[source]#

Bases: BaseModel

Config for the reconciler loop subsystem.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

interval: int#
class certwrangler.models.MetricsConfig(*, mount: str = '/metrics')[source]#

Bases: BaseModel

Config for the metrics subsystem.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

mount: str#
class certwrangler.models.HttpConfig(*, host: IPvAnyAddress = IPv4Address('127.0.0.1'), port: int = 6377, server_name: str = 'certwrangler', ssl_key_file: Path | None = None, ssl_key_password: str | None = None, ssl_cert_file: Path | None = None, ssl_ca_certs_file: Path | None = None)[source]#

Bases: BaseModel

Config for the HTTP subsystem.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

host: IPvAnyAddress#
port: int#
server_name: str#
ssl_key_file: Path | None#
ssl_key_password: str | None#
ssl_cert_file: Path | None#
ssl_ca_certs_file: Path | None#
classmethod __validate_ssl_files_exist(value: Path | None) Path | None#

Validates that the specified file exists.

Raises:

ValueError – Raised if the file does not exist.

__validate_ssl_options() HttpConfig#

Validates that both ssl_key_file and ssl_cert_file are populated if either is set.

Raises:

ValueError – Raised if either ssl_key_file or ssl_cert_file is not set when the other is set.
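The both-or-neither rule for the SSL options reduces to a single comparison. validate_ssl_options below is a hypothetical free function with an assumed error message; the real check runs as a model-level validator on HttpConfig.

```python
from pathlib import Path
from typing import Optional


def validate_ssl_options(
    ssl_key_file: Optional[Path], ssl_cert_file: Optional[Path]
) -> None:
    """Raise ValueError when exactly one of the two files is configured."""
    if (ssl_key_file is None) != (ssl_cert_file is None):
        raise ValueError("ssl_key_file and ssl_cert_file must be set together.")


# Both unset and both set are valid combinations.
validate_ssl_options(None, None)
validate_ssl_options(Path("server.key"), Path("server.crt"))
```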

class certwrangler.models.DaemonConfig(*, reconciler: ReconcilerConfig = <factory>, metrics: MetricsConfig = <factory>, http: HttpConfig = <factory>, watchdog_interval: int = 30)[source]#

Bases: BaseModel

Config for the daemon.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

reconciler: ReconcilerConfig#
metrics: MetricsConfig#
http: HttpConfig#
watchdog_interval: int#
class certwrangler.models.Config(*, daemon: DaemonConfig = <factory>, state: StateManager, accounts: Dict[str, Account], certs: Dict[str, Cert], solvers: Dict[str, Solver], stores: Dict[str, Store], subjects: Dict[str, Subject])[source]#

Bases: BaseModel

The root config object for the application.

This class is the root of the entire config tree of the application and is responsible for loading any of the plugins specified by sub-members in their configuration as well as triggering any initialization hooks that may be specified by the various plugins.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

daemon: DaemonConfig#
state_manager: StateManager#
accounts: Dict[str, Account]#
certs: Dict[str, Cert]#
solvers: Dict[str, Solver]#
stores: Dict[str, Store]#
subjects: Dict[str, Subject]#
classmethod __load_solver_plugins(values: Dict[str, Any]) Dict[str, Solver]#

Dynamically load solver plugins based on their driver key.

Raises:

ValueError – Raised if the specified plugin can’t be loaded.

classmethod __load_state_manager_plugin(values: Dict[str, Any]) StateManager#

Dynamically load state_manager plugins based on their driver key.

Raises:

ValueError – Raised if the specified plugin can’t be loaded.

classmethod __load_store_plugins(values: Dict[str, Any]) Dict[str, Store]#

Dynamically load store plugins based on their driver key.

Raises:

ValueError – Raised if the specified plugin can’t be loaded.
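Driver-based plugin loading, as the three loaders above describe it, can be sketched with a simple registry. This is only an illustration of the pattern: STORE_PLUGINS, register, LocalStore, and load_store_plugins are all hypothetical, and the source does not say how certwrangler actually discovers its plugins.

```python
from typing import Any, Dict, Type

# Hypothetical registry mapping driver names to plugin classes.
STORE_PLUGINS: Dict[str, Type[Any]] = {}


def register(driver: str):
    """Class decorator that records a plugin under its driver name."""

    def decorator(cls: Type[Any]) -> Type[Any]:
        STORE_PLUGINS[driver] = cls
        return cls

    return decorator


@register("local")
class LocalStore:
    """Hypothetical store plugin used only for this sketch."""

    def __init__(self, driver: str, path: str = "/tmp/certs") -> None:
        self.driver = driver
        self.path = path


def load_store_plugins(values: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Instantiate one plugin per config entry, keyed by the entry's name."""
    loaded: Dict[str, Any] = {}
    for name, options in values.items():
        driver = options.get("driver")
        if driver not in STORE_PLUGINS:
            raise ValueError(f"Unable to load store plugin '{driver}' for '{name}'.")
        loaded[name] = STORE_PLUGINS[driver](**options)
    return loaded


stores = load_store_plugins({"disk": {"driver": "local", "path": "/srv/certs"}})
print(type(stores["disk"]).__name__, stores["disk"].path)
```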

classmethod __pre_populate(values: Dict[str, Any]) Dict[str, Any]#

Pre-populate the config data with some defaults.

__post_populate() Config#

Loops through the certs config and populates the reference to the root config object, which is needed to resolve foreign references.

Also loops through all the objects and populates their name field based on their key.

It then tries to resolve all references to account, subject, and stores on the cert object and will raise a ValueError if any references don’t resolve.

Raises:

ValueError – Raised if any references on sub-objects don’t resolve.
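The reference check described above can be sketched over plain dictionaries. resolve_references and the error wording are hypothetical, and collecting every failure before raising is a design choice of this sketch rather than documented behavior.

```python
from typing import Any, Dict, List


def resolve_references(config: Dict[str, Dict[str, Any]]) -> None:
    """Raise ValueError listing every cert reference that does not resolve."""
    errors: List[str] = []
    for cert_name, cert in config["certs"].items():
        account_name = cert["account_name"]
        if account_name not in config["accounts"]:
            errors.append(f"cert '{cert_name}': unknown account '{account_name}'")
        # subject_name defaults to 'default' on the Cert model.
        subject_name = cert.get("subject_name", "default")
        if subject_name not in config["subjects"]:
            errors.append(f"cert '{cert_name}': unknown subject '{subject_name}'")
        for store_name in cert["store_names"]:
            if store_name not in config["stores"]:
                errors.append(f"cert '{cert_name}': unknown store '{store_name}'")
    if errors:
        raise ValueError("; ".join(errors))


config = {
    "accounts": {"prod": {}},
    "subjects": {"default": {}},
    "stores": {"disk": {}},
    "certs": {"web": {"account_name": "prod", "store_names": ["disk"]}},
}
resolve_references(config)
print("all references resolve")
```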

initialize() None[source]#

Initialize drivers and load state on stateful objects.