"""Set of helpers to generate signed X.509v3 certificates.
This works by shelling out calls to the 'openssl req' and 'openssl ca'
commands, and passing the appropriate command line flags and configuration file
(.cnf).
"""
import base64
import hashlib
import os
import shutil
import subprocess
import sys
from . import openssl_conf
TYPE_CA = 2
TYPE_END_ENTITY = 3
MARCH_1_2015_UTC = '150301120000Z'
MARCH_2_2015_UTC = '150302120000Z'
JANUARY_1_2015_UTC = '150101120000Z'
SEPTEMBER_1_2015_UTC = '150901120000Z'
JANUARY_1_2016_UTC = '160101120000Z'
OCTOBER_5_2021_UTC = '211005120000Z'
OCTOBER_5_2022_UTC = '221005120000Z'
KEY_PURPOSE_ANY = 'anyExtendedKeyUsage'
KEY_PURPOSE_SERVER_AUTH = 'serverAuth'
KEY_PURPOSE_CLIENT_AUTH = 'clientAuth'
DEFAULT_KEY_PURPOSE = KEY_PURPOSE_SERVER_AUTH
g_cur_path_id = {}
g_tmp_dir = None
g_invoking_script_path = None
g_default_start_date = OCTOBER_5_2021_UTC
g_default_end_date = OCTOBER_5_2022_UTC
def set_default_validity_range(start_date, end_date):
"""Sets the validity range that will be used for certificates created with
Certificate"""
global g_default_start_date
global g_default_end_date
g_default_start_date = start_date
g_default_end_date = end_date
def get_unique_path_id(name):
"""Returns a base filename that contains 'name', but is unique to the output
directory"""
lowercase_name = name.lower()
path_id = g_cur_path_id.get(lowercase_name, 0)
g_cur_path_id[lowercase_name] = path_id + 1
if path_id == 0:
return name
return '%s_%d' % (name, path_id)
def get_path_in_tmp_dir(name, suffix):
return os.path.join(g_tmp_dir, '%s%s' % (name, suffix))
class Key(object):
"""Describes a public + private key pair. It is a dumb wrapper around an
on-disk key."""
def __init__(self, path):
self.path = path
def get_path(self):
"""Returns the path to a file that contains the key contents."""
return self.path
def get_or_generate_key(generation_arguments, path):
"""Helper function to either retrieve a key from an existing file |path|, or
generate a new one using the command line |generation_arguments|."""
generation_arguments_str = ' '.join(generation_arguments)
if not os.path.isfile(path):
key_contents = subprocess.check_output(generation_arguments, text=True)
write_string_to_file(generation_arguments_str + '\n' + key_contents,
path)
else:
first_line = read_file_to_string(path).splitlines()[0]
if first_line != generation_arguments_str:
sys.stderr.write(('\nERROR: The existing key file:\n %s\nis not '
'compatible with the requested parameters:\n "%s" vs "%s".\n'
'Delete the file if you want to re-generate it with the new '
'parameters, otherwise pick a new filename\n') % (
path, first_line, generation_arguments_str))
sys.exit(1)
return Key(path)
def get_or_generate_rsa_key(size_bits, path):
"""Retrieves an existing key from a file if the path exists. Otherwise
generates an RSA key with the specified bit size and saves it to the path."""
return get_or_generate_key(['openssl', 'genrsa', str(size_bits)], path)
def get_or_generate_ec_key(named_curve, path):
"""Retrieves an existing key from a file if the path exists. Otherwise
generates an EC key with the specified named curve and saves it to the
path."""
return get_or_generate_key(['openssl', 'ecparam', '-name', named_curve,
'-genkey'], path)
def create_key_path(base_name):
"""Generates a name that contains |base_name| in it, and is relative to the
"keys/" directory. If create_key_path(xxx) is called more than once during
the script run, a suffix will be added."""
keys_dir = 'keys'
if not os.path.exists(keys_dir):
os.makedirs(keys_dir)
return get_unique_path_id(os.path.join(keys_dir, base_name)) + '.key'
class Certificate(object):
"""Helper for building an X.509 certificate."""
def __init__(self, name, cert_type, issuer):
self.name = name
self.path_id = get_unique_path_id(name)
self.key = None
self.issuer = issuer
if issuer is None:
self.issuer = self
self.config = openssl_conf.Config()
self.init_config()
self.validity_flags = []
self.md_flags = []
self.set_validity_range(g_default_start_date, g_default_end_date)
self.set_signature_hash('sha256')
if cert_type == TYPE_END_ENTITY:
self.get_extensions().set_property('keyUsage',
'critical,digitalSignature,keyEncipherment')
self.get_extensions().set_property('extendedKeyUsage',
'serverAuth,clientAuth')
else:
self.get_extensions().set_property('keyUsage',
'critical,keyCertSign,cRLSign')
self.get_extensions().set_property('basicConstraints', 'critical,CA:true')
self.finalized = False
if not os.path.exists(self.get_serial_path()):
write_string_to_file('%s\n' % self.make_serial_number(),
self.get_serial_path())
if not os.path.exists(self.get_database_path()):
write_string_to_file('', self.get_database_path())
def set_validity_range(self, start_date, end_date):
"""Sets the Validity notBefore and notAfter properties for the
certificate"""
self.validity_flags = ['-startdate', start_date, '-enddate', end_date]
def set_signature_hash(self, md):
"""Sets the hash function that will be used when signing this certificate.
Can be sha1, sha256, sha512, md5, etc."""
self.md_flags = ['-md', md]
def get_extensions(self):
return self.config.get_section('req_ext')
def get_subject(self):
"""Returns the configuration section responsible for the subject of the
certificate. This can be used to alter the subject to be more complex."""
return self.config.get_section('req_dn')
def get_path(self, suffix):
"""Forms a path to an output file for this certificate, containing the
indicated suffix. The certificate's name will be used as its basis."""
return os.path.join(g_tmp_dir, '%s%s' % (self.path_id, suffix))
def get_name_path(self, suffix):
"""Forms a path to an output file for this CA, containing the indicated
suffix. If multiple certificates have the same name, they will use the same
path."""
return get_path_in_tmp_dir(self.name, suffix)
def set_key(self, key):
assert self.finalized is False
self.set_key_internal(key)
def set_key_internal(self, key):
self.key = key
section = self.config.get_section('root_ca')
section.set_property('private_key', self.key.get_path())
def get_key(self):
if self.key is None:
self.set_key_internal(
get_or_generate_rsa_key(2048, create_key_path(self.name)))
return self.key
def get_cert_path(self):
return self.get_path('.pem')
def get_serial_path(self):
return self.get_name_path('.serial')
def make_serial_number(self):
"""Returns a hex number that is generated based on the certificate file
path. This serial number will likely be globally unique, which makes it
easier to use the certificates with NSS (which assumes certificate
equivalence based on issuer and serial number)."""
m = hashlib.sha1()
script_path = os.path.realpath(g_invoking_script_path)
script_path = "/".join(script_path.split(os.sep)[-3:])
m.update(script_path.encode('utf-8'))
m.update(self.path_id.encode('utf-8'))
serial_bytes = bytearray(m.digest())
serial_bytes[0] = serial_bytes[0] & 0x7F
return serial_bytes.hex()
def get_csr_path(self):
return self.get_path('.csr')
def get_database_path(self):
return self.get_name_path('.db')
def get_config_path(self):
return self.get_path('.cnf')
def get_cert_pem(self):
self.finalize()
return read_file_to_string(self.get_cert_path())
def finalize(self):
"""Finishes the certificate creation process. This generates any needed
key, creates and signs the CSR. On completion the resulting PEM file can be
found at self.get_cert_path()"""
if self.finalized:
return
self.finalized = True
self.issuer.finalize()
self.get_key()
self.config.write_to_file(self.get_config_path())
subprocess.check_call(
['openssl', 'req', '-new',
'-key', self.key.get_path(),
'-out', self.get_csr_path(),
'-config', self.get_config_path()])
cmd = ['openssl', 'ca', '-batch', '-in',
self.get_csr_path(), '-out', self.get_cert_path(), '-config',
self.issuer.get_config_path()]
if self.issuer == self:
cmd.append('-selfsign')
cmd.extend(self.validity_flags)
cmd.extend(self.md_flags)
subprocess.check_call(cmd)
def init_config(self):
"""Initializes default properties in the certificate .cnf file that are
generic enough to work for all certificates (but can be overridden later).
"""
section = self.config.get_section('req')
section.set_property('encrypt_key', 'no')
section.set_property('utf8', 'yes')
section.set_property('string_mask', 'utf8only')
section.set_property('prompt', 'no')
section.set_property('distinguished_name', 'req_dn')
section.set_property('req_extensions', 'req_ext')
section = self.config.get_section('req_dn')
section.set_property('commonName', '"%s"' % (self.name))
section = self.config.get_section('req_ext')
section.set_property('subjectKeyIdentifier', 'hash')
section = self.config.get_section('ca')
section.set_property('default_ca', 'root_ca')
section = self.config.get_section('root_ca')
section.set_property('certificate', self.get_cert_path())
section.set_property('new_certs_dir', g_tmp_dir)
section.set_property('serial', self.get_serial_path())
section.set_property('database', self.get_database_path())
section.set_property('unique_subject', 'no')
section.set_property('default_days', '365')
section.set_property('default_md', 'sha256')
section.set_property('policy', 'policy_anything')
section.set_property('email_in_dn', 'no')
section.set_property('preserve', 'yes')
section.set_property('name_opt', 'multiline,-esc_msb,utf8')
section.set_property('cert_opt', 'ca_default')
section.set_property('copy_extensions', 'copy')
section.set_property('x509_extensions', 'signing_ca_ext')
section.set_property('default_crl_days', '30')
section.set_property('crl_extensions', 'crl_ext')
section = self.config.get_section('policy_anything')
section.set_property('domainComponent', 'optional')
section.set_property('countryName', 'optional')
section.set_property('stateOrProvinceName', 'optional')
section.set_property('localityName', 'optional')
section.set_property('organizationName', 'optional')
section.set_property('organizationalUnitName', 'optional')
section.set_property('commonName', 'optional')
section.set_property('emailAddress', 'optional')
section = self.config.get_section('signing_ca_ext')
section.set_property('subjectKeyIdentifier', 'hash')
section.set_property('authorityKeyIdentifier', 'keyid:always')
section.set_property('authorityInfoAccess', '@issuer_info')
section.set_property('crlDistributionPoints', '@crl_info')
section = self.config.get_section('issuer_info')
section.set_property('caIssuers;URI.0',
'http://url-for-aia/%s.cer' % (self.name))
section = self.config.get_section('crl_info')
section.set_property('URI.0', 'http://url-for-crl/%s.crl' % (self.name))
section = self.config.get_section('crl_ext')
section.set_property('authorityKeyIdentifier', 'keyid:always')
section.set_property('authorityInfoAccess', '@issuer_info')
def text_data_to_pem(block_header, text_data):
pem_data = base64.b64encode(text_data.encode('utf8')).decode('utf8')
return '%s\n-----BEGIN %s-----\n%s\n-----END %s-----\n' % (
text_data, block_header, pem_data, block_header)
def write_chain(description, chain, out_pem):
"""Writes the chain to a .pem file as a series of CERTIFICATE blocks"""
test_data = '[Created by: %s]\n\n%s\n' % (sys.argv[0], description)
for cert in chain:
test_data += '\n' + cert.get_cert_pem()
write_string_to_file(test_data, out_pem)
def write_string_to_file(data, path):
with open(path, 'w') as f:
f.write(data)
def read_file_to_string(path):
with open(path, 'r') as f:
return f.read()
def init(invoking_script_path):
"""Creates an output directory to contain all the temporary files that may be
created, as well as determining the path for the final output. These paths
are all based off of the name of the calling script.
"""
global g_tmp_dir
global g_invoking_script_path
g_invoking_script_path = invoking_script_path
expected_cwd = os.path.realpath(os.path.dirname(invoking_script_path))
actual_cwd = os.path.realpath(os.getcwd())
if actual_cwd != expected_cwd:
sys.stderr.write(
('Your current working directory must be that containing the python '
'scripts:\n%s\nas the script may reference paths relative to this\n')
% (expected_cwd))
sys.exit(1)
g_tmp_dir = 'out'
sys.stdout.write('Creating output directory: %s\n' % (g_tmp_dir))
shutil.rmtree(g_tmp_dir, True)
os.makedirs(g_tmp_dir)
def create_self_signed_root_certificate(name):
return Certificate(name, TYPE_CA, None)
def create_intermediate_certificate(name, issuer):
return Certificate(name, TYPE_CA, issuer)
def create_self_signed_end_entity_certificate(name):
return Certificate(name, TYPE_END_ENTITY, None)
def create_end_entity_certificate(name, issuer):
return Certificate(name, TYPE_END_ENTITY, issuer)
init(sys.argv[0])