diff -up ./acme_tiny.py.chain ./acme_tiny.py
--- ./acme_tiny.py.chain 2017-05-16 03:57:46.000000000 -0400
+++ ./acme_tiny.py 2017-11-22 15:14:19.270485351 -0500
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
try:
from urllib.request import urlopen # Python 3
@@ -12,7 +12,7 @@ LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
LOGGER.setLevel(logging.INFO)
-def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA):
+def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, chain=False):
# helper function base64 encode for jose spec
def _b64(b):
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "")
@@ -57,9 +57,9 @@ def get_crt(account_key, csr, acme_dir,
})
try:
resp = urlopen(url, data.encode('utf8'))
- return resp.getcode(), resp.read()
+ return resp.getcode(), resp.read(), resp.info()
except IOError as e:
- return getattr(e, "code", None), getattr(e, "read", e.__str__)()
+ return getattr(e, "code", None), getattr(e, "read", e.__str__)(), None
# find domains
log.info("Parsing CSR...")
@@ -80,9 +80,9 @@ def get_crt(account_key, csr, acme_dir,
# get the certificate domains and expiration
log.info("Registering account...")
- code, result = _send_signed_request(CA + "/acme/new-reg", {
+ code, result, headers = _send_signed_request(CA + "/acme/new-reg", {
"resource": "new-reg",
- "agreement": "https://letsencrypt.org/documents/LE-SA-v1.1.1-August-1-2016.pdf",
+ "agreement": "https://letsencrypt.org/documents/LE-SA-v1.2-November-15-2017.pdf",
})
if code == 201:
log.info("Registered!")
@@ -96,7 +96,7 @@ def get_crt(account_key, csr, acme_dir,
log.info("Verifying {0}...".format(domain))
# get new challenge
- code, result = _send_signed_request(CA + "/acme/new-authz", {
+ code, result, headers = _send_signed_request(CA + "/acme/new-authz", {
"resource": "new-authz",
"identifier": {"type": "dns", "value": domain},
})
@@ -123,7 +123,7 @@ def get_crt(account_key, csr, acme_dir,
wellknown_path, wellknown_url))
# notify challenge are met
- code, result = _send_signed_request(challenge['uri'], {
+ code, result, headers = _send_signed_request(challenge['uri'], {
"resource": "challenge",
"keyAuthorization": keyauthorization,
})
@@ -153,17 +153,32 @@ def get_crt(account_key, csr, acme_dir,
proc = subprocess.Popen(["openssl", "req", "-in", csr, "-outform", "DER"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
csr_der, err = proc.communicate()
- code, result = _send_signed_request(CA + "/acme/new-cert", {
+ code, result, headers = _send_signed_request(CA + "/acme/new-cert", {
"resource": "new-cert",
"csr": _b64(csr_der),
})
if code != 201:
raise ValueError("Error signing certificate: {0} {1}".format(code, result))
+ certchain = [result]
+ if chain:
+ def parse_link_header(line):
+ m = re.search(r"^Link:\s*<([^>]*)>(?:\s*;\s*(.*))?\r\n$", line)
+ return (m.group(1), dict([(a[0],a[1].strip('"'))
+ for a in [attr.split("=")
+ for attr in m.group(2).split("\s*;\s*")]]))
+
+ up = [
+ link for link, attr in [
+ parse_link_header(l) for l in headers.getallmatchingheaders("Link")
+ ] if attr['rel'] == 'up'
+ ]
+ certchain += [urlopen(url).read() for url in up]
+
# return signed certificate!
log.info("Certificate signed!")
- return """-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
- "\n".join(textwrap.wrap(base64.b64encode(result).decode('utf8'), 64)))
+ return "".join(["""-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----\n""".format(
+ "\n".join(textwrap.wrap(base64.b64encode(cert).decode('utf8'), 64))) for cert in certchain])
def main(argv):
parser = argparse.ArgumentParser(
@@ -188,11 +203,19 @@ def main(argv):
parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory")
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
parser.add_argument("--ca", default=DEFAULT_CA, help="certificate authority, default is Let's Encrypt")
+ parser.add_argument("--chain", action="store_true",
+ help="fetch and append intermediate certs to output")
args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
- signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca)
- sys.stdout.write(signed_crt)
+ try:
+ signed_crt = get_crt(args.account_key, args.csr, args.acme_dir,
+ log=LOGGER, CA=args.ca, chain=args.chain)
+ sys.stdout.write(signed_crt)
+ except Exception as e:
+ #if not args.quiet: raise e
+ LOGGER.error(e)
+ sys.exit(1)
if __name__ == "__main__": # pragma: no cover
main(sys.argv[1:])