|
| 1 | +#!/usr/bin/env python |
| 2 | +# |
| 3 | +# ESP32 x509 certificate bundle generation utility |
| 4 | +# |
| 5 | +# Converts PEM and DER certificates to a custom bundle format which stores just the |
| 6 | +# subject name and public key to reduce space |
| 7 | +# |
| 8 | +# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; |
| 9 | +# crt 1 subject name; crt 1 public key; crt 2... |
| 10 | +# |
| 11 | +# Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD |
| 12 | +# |
| 13 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 14 | +# you may not use this file except in compliance with the License. |
| 15 | +# You may obtain a copy of the License at |
| 16 | +# |
| 17 | +# http:#www.apache.org/licenses/LICENSE-2.0 |
| 18 | +# |
| 19 | +# Unless required by applicable law or agreed to in writing, software |
| 20 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 21 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 22 | +# See the License for the specific language governing permissions and |
| 23 | +# limitations under the License. |
| 24 | + |
| 25 | +from __future__ import with_statement |
| 26 | + |
| 27 | +import argparse |
| 28 | +import csv |
| 29 | +import os |
| 30 | +import re |
| 31 | +import struct |
| 32 | +import sys |
| 33 | +from io import open |
| 34 | + |
| 35 | +try: |
| 36 | + from cryptography import x509 |
| 37 | + from cryptography.hazmat.backends import default_backend |
| 38 | + from cryptography.hazmat.primitives import serialization |
| 39 | +except ImportError: |
| 40 | + print('The cryptography package is not installed.' |
| 41 | + 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' |
| 42 | + 'setting up the required packages.') |
| 43 | + raise |
| 44 | + |
| 45 | +ca_bundle_bin_file = 'x509_crt_bundle' |
| 46 | + |
| 47 | +quiet = False |
| 48 | + |
| 49 | + |
| 50 | +def status(msg): |
| 51 | + """ Print status message to stderr """ |
| 52 | + if not quiet: |
| 53 | + critical(msg) |
| 54 | + |
| 55 | + |
| 56 | +def critical(msg): |
| 57 | + """ Print critical message to stderr """ |
| 58 | + sys.stderr.write('gen_crt_bundle.py: ') |
| 59 | + sys.stderr.write(msg) |
| 60 | + sys.stderr.write('\n') |
| 61 | + |
| 62 | + |
| 63 | +class CertificateBundle: |
| 64 | + def __init__(self): |
| 65 | + self.certificates = [] |
| 66 | + self.compressed_crts = [] |
| 67 | + |
| 68 | + if os.path.isfile(ca_bundle_bin_file): |
| 69 | + os.remove(ca_bundle_bin_file) |
| 70 | + |
| 71 | + def add_from_path(self, crts_path): |
| 72 | + |
| 73 | + found = False |
| 74 | + for file_path in os.listdir(crts_path): |
| 75 | + found |= self.add_from_file(os.path.join(crts_path, file_path)) |
| 76 | + |
| 77 | + if found is False: |
| 78 | + raise InputError('No valid x509 certificates found in %s' % crts_path) |
| 79 | + |
| 80 | + def add_from_file(self, file_path): |
| 81 | + try: |
| 82 | + if file_path.endswith('.pem'): |
| 83 | + status('Parsing certificates from %s' % file_path) |
| 84 | + with open(file_path, 'r', encoding='utf-8') as f: |
| 85 | + crt_str = f.read() |
| 86 | + self.add_from_pem(crt_str) |
| 87 | + return True |
| 88 | + |
| 89 | + elif file_path.endswith('.der'): |
| 90 | + status('Parsing certificates from %s' % file_path) |
| 91 | + with open(file_path, 'rb') as f: |
| 92 | + crt_str = f.read() |
| 93 | + self.add_from_der(crt_str) |
| 94 | + return True |
| 95 | + |
| 96 | + except ValueError: |
| 97 | + critical('Invalid certificate in %s' % file_path) |
| 98 | + raise InputError('Invalid certificate') |
| 99 | + |
| 100 | + return False |
| 101 | + |
| 102 | + def add_from_pem(self, crt_str): |
| 103 | + """ A single PEM file may have multiple certificates """ |
| 104 | + |
| 105 | + crt = '' |
| 106 | + count = 0 |
| 107 | + start = False |
| 108 | + |
| 109 | + for strg in crt_str.splitlines(True): |
| 110 | + if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: |
| 111 | + crt = '' |
| 112 | + start = True |
| 113 | + elif strg == '-----END CERTIFICATE-----\n' and start is True: |
| 114 | + crt += strg + '\n' |
| 115 | + start = False |
| 116 | + self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) |
| 117 | + count += 1 |
| 118 | + if start is True: |
| 119 | + crt += strg |
| 120 | + |
| 121 | + if(count == 0): |
| 122 | + raise InputError('No certificate found') |
| 123 | + |
| 124 | + status('Successfully added %d certificates' % count) |
| 125 | + |
| 126 | + def add_from_der(self, crt_str): |
| 127 | + self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) |
| 128 | + status('Successfully added 1 certificate') |
| 129 | + |
| 130 | + def create_bundle(self): |
| 131 | + # Sort certificates in order to do binary search when looking up certificates |
| 132 | + self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) |
| 133 | + |
| 134 | + bundle = struct.pack('>H', len(self.certificates)) |
| 135 | + |
| 136 | + for crt in self.certificates: |
| 137 | + """ Read the public key as DER format """ |
| 138 | + pub_key = crt.public_key() |
| 139 | + pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) |
| 140 | + |
| 141 | + """ Read the subject name as DER format """ |
| 142 | + sub_name_der = crt.subject.public_bytes(default_backend()) |
| 143 | + |
| 144 | + name_len = len(sub_name_der) |
| 145 | + key_len = len(pub_key_der) |
| 146 | + len_data = struct.pack('>HH', name_len, key_len) |
| 147 | + |
| 148 | + bundle += len_data |
| 149 | + bundle += sub_name_der |
| 150 | + bundle += pub_key_der |
| 151 | + |
| 152 | + return bundle |
| 153 | + |
| 154 | + def add_with_filter(self, crts_path, filter_path): |
| 155 | + |
| 156 | + filter_set = set() |
| 157 | + with open(filter_path, 'r', encoding='utf-8') as f: |
| 158 | + csv_reader = csv.reader(f, delimiter=',') |
| 159 | + |
| 160 | + # Skip header |
| 161 | + next(csv_reader) |
| 162 | + for row in csv_reader: |
| 163 | + filter_set.add(row[1]) |
| 164 | + |
| 165 | + status('Parsing certificates from %s' % crts_path) |
| 166 | + crt_str = [] |
| 167 | + with open(crts_path, 'r', encoding='utf-8') as f: |
| 168 | + crt_str = f.read() |
| 169 | + |
| 170 | + # Split all certs into a list of (name, certificate string) tuples |
| 171 | + pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) |
| 172 | + |
| 173 | + filtered_crts = '' |
| 174 | + for name, crt in pem_crts: |
| 175 | + if name in filter_set: |
| 176 | + filtered_crts += crt |
| 177 | + |
| 178 | + self.add_from_pem(filtered_crts) |
| 179 | + |
| 180 | + |
| 181 | +class InputError(RuntimeError): |
| 182 | + def __init__(self, e): |
| 183 | + super(InputError, self).__init__(e) |
| 184 | + |
| 185 | + |
| 186 | +def main(): |
| 187 | + global quiet |
| 188 | + |
| 189 | + parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') |
| 190 | + |
| 191 | + parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') |
| 192 | + parser.add_argument('--input', '-i', nargs='+', required=True, |
| 193 | + help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') |
| 194 | + parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \ |
| 195 | + that should be included from cacrt_all.pem') |
| 196 | + |
| 197 | + args = parser.parse_args() |
| 198 | + |
| 199 | + quiet = args.quiet |
| 200 | + |
| 201 | + bundle = CertificateBundle() |
| 202 | + |
| 203 | + for path in args.input: |
| 204 | + if os.path.isfile(path): |
| 205 | + if os.path.basename(path) == 'cacrt_all.pem' and args.filter: |
| 206 | + bundle.add_with_filter(path, args.filter) |
| 207 | + else: |
| 208 | + bundle.add_from_file(path) |
| 209 | + elif os.path.isdir(path): |
| 210 | + bundle.add_from_path(path) |
| 211 | + else: |
| 212 | + raise InputError('Invalid --input=%s, is neither file nor folder' % args.input) |
| 213 | + |
| 214 | + status('Successfully added %d certificates in total' % len(bundle.certificates)) |
| 215 | + |
| 216 | + crt_bundle = bundle.create_bundle() |
| 217 | + |
| 218 | + with open(ca_bundle_bin_file, 'wb') as f: |
| 219 | + f.write(crt_bundle) |
| 220 | + |
| 221 | + |
| 222 | +if __name__ == '__main__': |
| 223 | + try: |
| 224 | + main() |
| 225 | + except InputError as e: |
| 226 | + print(e) |
| 227 | + sys.exit(2) |
0 commit comments