Initial
This commit is contained in:
0
libkeyringctl/__init__.py
Normal file
0
libkeyringctl/__init__.py
Normal file
45
libkeyringctl/ci.py
Normal file
45
libkeyringctl/ci.py
Normal file
@ -0,0 +1,45 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from os import environ
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from .git import git_changed_files
|
||||
from .util import get_parent_cert_paths
|
||||
from .verify import verify
|
||||
|
||||
|
||||
def ci(working_dir: Path, keyring_root: Path, project_root: Path) -> None:
|
||||
"""Verify certificates against modern expectations using sq-keyring-linter and hokey
|
||||
|
||||
Currently only newly added certificates will be checked against the expectations as existing
|
||||
keys are not all fully compatible with those assumptions.
|
||||
New certificates are determined by using $CI_MERGE_REQUEST_DIFF_BASE_SHA as the base,
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: A directory to use for temporary files
|
||||
keyring_root: The keyring root directory to look up username shorthand sources
|
||||
project_root: Path to the root of the git repository
|
||||
"""
|
||||
|
||||
ci_merge_request_diff_base = environ.get("CI_MERGE_REQUEST_DIFF_BASE_SHA")
|
||||
created, deleted, modified = git_changed_files(
|
||||
git_path=project_root, base=ci_merge_request_diff_base, paths=[Path("keyring")]
|
||||
)
|
||||
|
||||
changed_certificates: List[Path] = list(get_parent_cert_paths(paths=created + deleted + modified))
|
||||
|
||||
verify(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=changed_certificates,
|
||||
lint_hokey=False,
|
||||
lint_sq_keyring=False,
|
||||
)
|
||||
|
||||
added_certificates: List[Path] = [
|
||||
path for path in changed_certificates if (path / f"{path.name}.asc").relative_to(project_root) in created
|
||||
]
|
||||
if added_certificates:
|
||||
verify(working_dir=working_dir, keyring_root=keyring_root, sources=added_certificates)
|
229
libkeyringctl/cli.py
Normal file
229
libkeyringctl/cli.py
Normal file
@ -0,0 +1,229 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from logging import DEBUG
|
||||
from logging import basicConfig
|
||||
from logging import debug
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from tempfile import mkdtemp
|
||||
|
||||
from .ci import ci
|
||||
from .keyring import Username
|
||||
from .keyring import build
|
||||
from .keyring import convert
|
||||
from .keyring import export
|
||||
from .keyring import inspect_keyring
|
||||
from .keyring import list_keyring
|
||||
from .types import TrustFilter
|
||||
from .util import absolute_path
|
||||
from .util import cwd
|
||||
from .verify import verify
|
||||
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="store_true", help="Causes to print debugging messages about the progress"
|
||||
)
|
||||
parser.add_argument("--wait", action="store_true", help="Block before cleaning up the temp directory")
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--force",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="force the execution of subcommands (e.g. overwriting of files)",
|
||||
)
|
||||
subcommands = parser.add_subparsers(dest="subcommand")
|
||||
|
||||
convert_parser = subcommands.add_parser(
|
||||
"convert",
|
||||
help="convert one or multiple PGP public keys to a decomposed directory structure",
|
||||
)
|
||||
convert_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directorie to convert")
|
||||
convert_parser.add_argument("--target", type=absolute_path, help="Target directory instead of a random tmpdir")
|
||||
convert_parser.add_argument(
|
||||
"--name",
|
||||
type=Username,
|
||||
default=None,
|
||||
help="override the username to use (only useful when using a single file as source)",
|
||||
)
|
||||
|
||||
import_parser = subcommands.add_parser(
|
||||
"import",
|
||||
help="import one or several PGP keys to the keyring directory structure",
|
||||
)
|
||||
import_parser.add_argument("source", type=absolute_path, nargs="+", help="Files or directories to import")
|
||||
import_parser.add_argument(
|
||||
"--name",
|
||||
type=Username,
|
||||
default=None,
|
||||
help="override the username to use (only useful when using a single file as source)",
|
||||
)
|
||||
import_parser.add_argument("--main", action="store_true", help="Import a main signing key into the keyring")
|
||||
|
||||
export_parser = subcommands.add_parser(
|
||||
"export",
|
||||
help="export a directory structure of PGP packet data to a combined file",
|
||||
)
|
||||
export_parser.add_argument("-o", "--output", type=absolute_path, help="file to write PGP packet data to")
|
||||
export_parser.add_argument(
|
||||
"source",
|
||||
nargs="*",
|
||||
help="username, fingerprint or directories containing certificates",
|
||||
type=absolute_path,
|
||||
)
|
||||
|
||||
build_parser = subcommands.add_parser(
|
||||
"build",
|
||||
help="build keyring PGP artifacts alongside ownertrust and revoked status files",
|
||||
)
|
||||
|
||||
list_parser = subcommands.add_parser(
|
||||
"list",
|
||||
help="list the certificates in the keyring",
|
||||
)
|
||||
list_parser.add_argument("--main", action="store_true", help="List main signing keys instead of packager keys")
|
||||
list_parser.add_argument(
|
||||
"--trust",
|
||||
choices=[e.value for e in TrustFilter],
|
||||
default=TrustFilter.all.value,
|
||||
help="Filter the list based on trust",
|
||||
)
|
||||
list_parser.add_argument(
|
||||
"source",
|
||||
nargs="*",
|
||||
help="username, fingerprint or directories containing certificates",
|
||||
type=absolute_path,
|
||||
)
|
||||
|
||||
inspect_parser = subcommands.add_parser(
|
||||
"inspect",
|
||||
help="inspect certificates in the keyring and pretty print the data",
|
||||
)
|
||||
inspect_parser.add_argument(
|
||||
"source",
|
||||
nargs="*",
|
||||
help="username, fingerprint or directories containing certificates",
|
||||
type=absolute_path,
|
||||
)
|
||||
|
||||
verify_parser = subcommands.add_parser(
|
||||
"verify",
|
||||
help="verify certificates against modern expectations",
|
||||
)
|
||||
verify_parser.add_argument(
|
||||
"source",
|
||||
nargs="*",
|
||||
help="username, fingerprint or directories containing certificates",
|
||||
type=absolute_path,
|
||||
)
|
||||
verify_parser.add_argument("--no-lint-hokey", dest="lint_hokey", action="store_false", help="Do not run hokey lint")
|
||||
verify_parser.add_argument(
|
||||
"--no-lint-sq-keyring", dest="lint_sq_keyring", action="store_false", help="Do not run sq-keyring-linter"
|
||||
)
|
||||
verify_parser.set_defaults(lint_hokey=True, lint_sq_keyring=True)
|
||||
|
||||
check_parser = subcommands.add_parser(
|
||||
"check",
|
||||
help="Run keyring integrity and consistency checks",
|
||||
)
|
||||
|
||||
ci_parser = subcommands.add_parser(
|
||||
"ci",
|
||||
help="ci command to verify certain aspects and expectations in pipelines",
|
||||
)
|
||||
|
||||
|
||||
def main() -> None: # noqa: ignore=C901
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
basicConfig(level=DEBUG)
|
||||
|
||||
# temporary working directory that gets auto cleaned
|
||||
with TemporaryDirectory(prefix="arch-keyringctl-") as tempdir:
|
||||
project_root = Path(".").absolute()
|
||||
keyring_root = Path("keyring").absolute()
|
||||
working_dir = Path(tempdir)
|
||||
debug(f"Working directory: {working_dir}")
|
||||
with cwd(working_dir):
|
||||
if "convert" == args.subcommand:
|
||||
target_dir = args.target or Path(mkdtemp(prefix="arch-keyringctl-")).absolute()
|
||||
print(
|
||||
convert(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
target_dir=target_dir,
|
||||
name_override=args.name,
|
||||
)
|
||||
)
|
||||
elif "import" == args.subcommand:
|
||||
target_dir = "main" if args.main else "packager"
|
||||
print(
|
||||
convert(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
target_dir=keyring_root / target_dir,
|
||||
name_override=args.name,
|
||||
)
|
||||
)
|
||||
elif "export" == args.subcommand:
|
||||
result = export(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
output=args.output,
|
||||
)
|
||||
if result:
|
||||
print(
|
||||
result,
|
||||
end="",
|
||||
)
|
||||
elif "build" == args.subcommand:
|
||||
build(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
target_dir=keyring_root.parent / "build",
|
||||
)
|
||||
elif "list" == args.subcommand:
|
||||
trust_filter = TrustFilter[args.trust]
|
||||
list_keyring(
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
main_keys=args.main,
|
||||
trust_filter=trust_filter,
|
||||
)
|
||||
elif "inspect" == args.subcommand:
|
||||
print(
|
||||
inspect_keyring(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
),
|
||||
end="",
|
||||
)
|
||||
elif "verify" == args.subcommand:
|
||||
verify(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=args.source,
|
||||
lint_hokey=args.lint_hokey,
|
||||
lint_sq_keyring=args.lint_sq_keyring,
|
||||
)
|
||||
elif "ci" == args.subcommand:
|
||||
ci(working_dir=working_dir, keyring_root=keyring_root, project_root=project_root)
|
||||
elif "check" == args.subcommand:
|
||||
verify(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=[keyring_root],
|
||||
lint_hokey=False,
|
||||
lint_sq_keyring=False,
|
||||
)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
if args.wait:
|
||||
print("Press [ENTER] to continue")
|
||||
input()
|
55
libkeyringctl/git.py
Normal file
55
libkeyringctl/git.py
Normal file
@ -0,0 +1,55 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
|
||||
from .util import system
|
||||
|
||||
|
||||
def git_changed_files(
|
||||
git_path: Optional[Path] = None, base: Optional[str] = None, paths: Optional[List[Path]] = None
|
||||
) -> Tuple[List[Path], List[Path], List[Path]]:
|
||||
"""Returns lists of created, deleted and modified files based on diff stats related to a base commit
|
||||
and optional paths.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
git_path: Path to the git repository, current directory by default
|
||||
base: Optional base rev or current index by default
|
||||
paths: Optional list of paths to take into account, unfiltered by default
|
||||
|
||||
Returns
|
||||
-------
|
||||
Lists of created, deleted and modified paths
|
||||
"""
|
||||
cmd = ["git"]
|
||||
if git_path:
|
||||
cmd += ["-C", str(git_path)]
|
||||
cmd += ["--no-pager", "diff", "--color=never", "--summary", "--numstat"]
|
||||
if base:
|
||||
cmd += [base]
|
||||
if paths:
|
||||
cmd += ["--"]
|
||||
cmd += [str(path) for path in paths]
|
||||
|
||||
result: str = system(cmd)
|
||||
|
||||
created: List[Path] = []
|
||||
deleted: List[Path] = []
|
||||
modified: List[Path] = []
|
||||
|
||||
for line in result.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("create"):
|
||||
created.append(Path(line.split(maxsplit=3)[3]))
|
||||
continue
|
||||
if line.startswith("delete"):
|
||||
deleted.append(Path(line.split(maxsplit=3)[3]))
|
||||
continue
|
||||
modified.append(Path(line.split(maxsplit=2)[2]))
|
||||
|
||||
modified = [path for path in modified if path not in created and path not in deleted]
|
||||
|
||||
return created, deleted, modified
|
1262
libkeyringctl/keyring.py
Normal file
1262
libkeyringctl/keyring.py
Normal file
File diff suppressed because it is too large
Load Diff
363
libkeyringctl/sequoia.py
Normal file
363
libkeyringctl/sequoia.py
Normal file
@ -0,0 +1,363 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
from functools import reduce
|
||||
from pathlib import Path
|
||||
from platform import python_version_tuple
|
||||
from re import sub
|
||||
from tempfile import mkdtemp
|
||||
from typing import Dict
|
||||
|
||||
# NOTE: remove after python 3.8.x is no longer supported upstream
|
||||
if int(python_version_tuple()[1]) < 9: # pragma: no cover
|
||||
from typing import Iterable
|
||||
else:
|
||||
from collections.abc import Iterable
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from .types import Fingerprint
|
||||
from .types import PacketKind
|
||||
from .types import Uid
|
||||
from .types import Username
|
||||
from .util import cwd
|
||||
from .util import natural_sort_path
|
||||
from .util import system
|
||||
|
||||
|
||||
def keyring_split(working_dir: Path, keyring: Path, preserve_filename: bool = False) -> Iterable[Path]:
|
||||
"""Split a file containing a PGP keyring into separate certificate files
|
||||
|
||||
The original keyring filename is preserved if the split only yields a single certificate.
|
||||
If preserve_filename is True, all keyrings are placed into separate directories while preserving
|
||||
the filename.
|
||||
|
||||
The file is split using sq.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: The path of the working directory below which to create the output files
|
||||
keyring: The path of a file containing a PGP keyring
|
||||
preserve_filename: If True, all keyrings are placed into separate directories while preserving the filename
|
||||
|
||||
Returns
|
||||
-------
|
||||
An iterable over the naturally sorted list of certificate files derived from a keyring
|
||||
"""
|
||||
|
||||
keyring_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()
|
||||
|
||||
with cwd(keyring_dir):
|
||||
system(["sq", "keyring", "split", "--prefix", "", str(keyring)])
|
||||
|
||||
keyrings: List[Path] = list(natural_sort_path(keyring_dir.iterdir()))
|
||||
|
||||
if 1 == len(keyrings) or preserve_filename:
|
||||
for index, key in enumerate(keyrings):
|
||||
keyring_sub_dir = Path(mkdtemp(dir=keyring_dir, prefix=f"{keyring.name}-")).absolute()
|
||||
keyrings[index] = key.rename(keyring_sub_dir / keyring.name)
|
||||
|
||||
return keyrings
|
||||
|
||||
|
||||
def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
|
||||
"""Merge multiple certificates into a keyring
|
||||
|
||||
Parameters
|
||||
----------
|
||||
certificates: List of paths to certificates to merge into a keyring
|
||||
output: Path to a file which the keyring is written, return the result instead if None
|
||||
force: Whether to force overwriting existing files (defaults to False)
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result if no output file has been used
|
||||
"""
|
||||
|
||||
cmd = ["sq", "keyring", "merge"]
|
||||
if force:
|
||||
cmd.insert(1, "--force")
|
||||
if output:
|
||||
cmd += ["--output", str(output)]
|
||||
cmd += [str(cert) for cert in sorted(certificates)]
|
||||
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
|
||||
"""Split a file containing a PGP certificate into separate packet files
|
||||
|
||||
The files are split using sq
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: The path of the working directory below which to create the output files
|
||||
certificate: The absolute path of a file containing one PGP certificate
|
||||
|
||||
Returns
|
||||
-------
|
||||
An iterable over the naturally sorted list of packet files derived from certificate
|
||||
"""
|
||||
|
||||
packet_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()
|
||||
|
||||
with cwd(packet_dir):
|
||||
system(["sq", "packet", "split", "--prefix", "", str(certificate)])
|
||||
return natural_sort_path(packet_dir.iterdir())
|
||||
|
||||
|
||||
def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
|
||||
"""Join PGP packet data in files to a single output file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packets: A list of paths to files that contain PGP packet data
|
||||
output: Path to a file to which all PGP packet data is written, return the result instead if None
|
||||
force: Whether to force overwriting existing files (defaults to False)
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result if no output file has been used
|
||||
"""
|
||||
|
||||
cmd = ["sq", "packet", "join"]
|
||||
if force:
|
||||
cmd.insert(1, "--force")
|
||||
packets_str = list(map(lambda path: str(path), packets))
|
||||
cmd.extend(packets_str)
|
||||
cmd.extend(["--output", str(output)])
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def inspect(
|
||||
packet: Path, certifications: bool = True, fingerprints: Optional[Dict[Fingerprint, Username]] = None
|
||||
) -> str:
|
||||
"""Inspect PGP packet data and return the result
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: Path to a file that contain PGP data
|
||||
certifications: Whether to print third-party certifications
|
||||
fingerprints: Optional dict of fingerprints to usernames to enrich the output with
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result of the inspection
|
||||
"""
|
||||
|
||||
cmd = ["sq", "inspect"]
|
||||
if certifications:
|
||||
cmd.append("--certifications")
|
||||
cmd.append(str(packet))
|
||||
result: str = system(cmd)
|
||||
|
||||
if fingerprints:
|
||||
for fingerprint, username in fingerprints.items():
|
||||
result = sub(f"{fingerprint}", f"{fingerprint} {username}", result)
|
||||
result = sub(f" {fingerprint[24:]}", f" {fingerprint[24:]} {username}", result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def packet_dump(packet: Path) -> str:
|
||||
"""Dump a PGP packet to string
|
||||
|
||||
The `sq packet dump` command is used to retrieve a dump of information from a PGP packet
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The contents of the packet dump
|
||||
"""
|
||||
|
||||
return system(["sq", "packet", "dump", str(packet)])
|
||||
|
||||
|
||||
def packet_dump_field(packet: Path, query: str) -> str:
|
||||
"""Retrieve the value of a field from a PGP packet
|
||||
|
||||
Field queries are possible with the following notation during tree traversal:
|
||||
- Use '.' to separate the parent section
|
||||
- Use '*' as a wildcard for the current section
|
||||
- Use '|' inside the current level as a logical OR
|
||||
|
||||
Example:
|
||||
- Version
|
||||
- Hashed area|Unhashed area.Issuer
|
||||
- *.Issuer
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
query: The name of the field as a query notation
|
||||
|
||||
Raises
|
||||
------
|
||||
Exception: If the field is not found in the PGP packet
|
||||
|
||||
Returns
|
||||
-------
|
||||
The value of the field found in packet
|
||||
"""
|
||||
|
||||
dump = packet_dump(packet)
|
||||
|
||||
queries = deque(query.split("."))
|
||||
path = [queries.popleft()]
|
||||
depth = 0
|
||||
|
||||
# remove leading 4 space indention
|
||||
lines = list(filter(lambda line: line.startswith(" "), dump.splitlines()))
|
||||
lines = [sub(r"^ {4}", "", line, count=1) for line in lines]
|
||||
# filter empty lines
|
||||
lines = list(filter(lambda line: line.strip(), lines))
|
||||
|
||||
for line in lines:
|
||||
# determine current line depth by counting whitespace pairs
|
||||
depth_line = int((len(line) - len(line.lstrip(" "))) / 2)
|
||||
line = line.lstrip(" ")
|
||||
|
||||
# skip nodes that are deeper as our currently matched path
|
||||
if depth < depth_line:
|
||||
continue
|
||||
|
||||
# unwind the current query path until reaching previous match depth
|
||||
while depth > depth_line:
|
||||
queries.appendleft(path.pop())
|
||||
depth -= 1
|
||||
matcher = path[-1].split("|")
|
||||
|
||||
# check if current field matches the query expression
|
||||
field = line.split(sep=":", maxsplit=1)[0]
|
||||
if field not in matcher and "*" not in matcher:
|
||||
continue
|
||||
|
||||
# next depth is one level deeper as the current line
|
||||
depth = depth_line + 1
|
||||
|
||||
# check if matcher is not the leaf of the query expression
|
||||
if queries:
|
||||
path.append(queries.popleft())
|
||||
continue
|
||||
|
||||
# return final match
|
||||
return line.split(sep=": ", maxsplit=1)[1] if ": " in line else line
|
||||
|
||||
raise Exception(f"Packet '{packet}' did not match the query '{query}'")
|
||||
|
||||
|
||||
def packet_signature_creation_time(packet: Path) -> datetime:
|
||||
"""Retrieve the signature creation time field as datetime
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the value from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The signature creation time as datetime
|
||||
"""
|
||||
field = packet_dump_field(packet, "Hashed area.Signature creation time")
|
||||
field = " ".join(field.split(" ", 3)[0:3])
|
||||
return datetime.strptime(field, "%Y-%m-%d %H:%M:%S %Z")
|
||||
|
||||
|
||||
def packet_kinds(packet: Path) -> List[PacketKind]:
|
||||
"""Retrieve the PGP packet types of a packet path
|
||||
|
||||
Parameters
|
||||
----------
|
||||
packet: The path to the PGP packet to retrieve the kind of
|
||||
|
||||
Returns
|
||||
-------
|
||||
The kind of PGP packet
|
||||
"""
|
||||
|
||||
dump = packet_dump(packet)
|
||||
lines = [line for line in dump.splitlines()]
|
||||
lines = list(
|
||||
filter(lambda line: not line.startswith(" ") and not line.startswith("WARNING") and line.strip(), lines)
|
||||
)
|
||||
return [PacketKind(line.split()[0]) for line in lines]
|
||||
|
||||
|
||||
def latest_certification(certifications: Iterable[Path]) -> Path:
|
||||
"""Returns the latest certification based on the signature creation time from a list of packets.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
certifications: List of certification from which to choose the latest from
|
||||
|
||||
Returns
|
||||
-------
|
||||
The latest certification from a list of packets
|
||||
"""
|
||||
return reduce(
|
||||
lambda a, b: a if packet_signature_creation_time(a) > packet_signature_creation_time(b) else b,
|
||||
certifications,
|
||||
)
|
||||
|
||||
|
||||
def key_generate(uids: List[Uid], outfile: Path) -> str:
|
||||
"""Generate a PGP key with specific uids
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uids: List of uids that the key should have
|
||||
outfile: Path to the file to which the key should be written to
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result of the key generate call
|
||||
"""
|
||||
|
||||
cmd = ["sq", "key", "generate"]
|
||||
for uid in uids:
|
||||
cmd.extend(["--userid", str(uid)])
|
||||
cmd.extend(["--export", str(outfile)])
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def key_extract_certificate(key: Path, output: Optional[Path]) -> str:
|
||||
"""Extracts the non secret part from a key into a certificate
|
||||
|
||||
Parameters
|
||||
----------
|
||||
key: Path to a file that contain secret key material
|
||||
output: Path to the file to which the key should be written to, stdout if None
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result of the extract in case output is None
|
||||
"""
|
||||
|
||||
cmd = ["sq", "key", "extract-cert", str(key)]
|
||||
if output:
|
||||
cmd.extend(["--output", str(output)])
|
||||
return system(cmd)
|
||||
|
||||
|
||||
def certify(key: Path, certificate: Path, uid: Uid, output: Optional[Path]) -> str:
|
||||
"""Inspect PGP packet data and return the result
|
||||
|
||||
Parameters
|
||||
----------
|
||||
key: Path to a file that contain secret key material
|
||||
certificate: Path to a certificate file whose uid should be certified
|
||||
uid: Uid contain in the certificate that should be certified
|
||||
output: Path to the file to which the key should be written to, stdout if None
|
||||
|
||||
Returns
|
||||
-------
|
||||
The result of the certification in case output is None
|
||||
"""
|
||||
|
||||
cmd = ["sq", "certify", str(key), str(certificate), uid]
|
||||
if output:
|
||||
cmd.extend(["--output", str(output)])
|
||||
return system(cmd)
|
273
libkeyringctl/trust.py
Normal file
273
libkeyringctl/trust.py
Normal file
@ -0,0 +1,273 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from logging import debug
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
from typing import Iterable
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
|
||||
from .types import Color
|
||||
from .types import Fingerprint
|
||||
from .types import Trust
|
||||
from .types import TrustFilter
|
||||
from .types import Uid
|
||||
from .util import contains_fingerprint
|
||||
from .util import get_cert_paths
|
||||
from .util import get_fingerprint_from_partial
|
||||
|
||||
|
||||
def certificate_trust_from_paths(
|
||||
sources: Iterable[Path], main_keys: Set[Fingerprint], all_fingerprints: Set[Fingerprint]
|
||||
) -> Dict[Fingerprint, Trust]:
|
||||
"""Get the trust status of all certificates in a list of paths given by main keys.
|
||||
|
||||
Uses `get_get_certificate_trust` to determine the trust status.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sources: Certificates to acquire the trust status from
|
||||
main_keys: Fingerprints of trusted keys used to calculate the trust of the certificates from sources
|
||||
all_fingerprints: Fingerprints of all certificates, packager and main, to look up key-ids to full fingerprints
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary of fingerprints and their trust level
|
||||
"""
|
||||
|
||||
sources = get_cert_paths(sources)
|
||||
certificate_trusts: Dict[Fingerprint, Trust] = {}
|
||||
|
||||
for certificate in sorted(sources):
|
||||
fingerprint = Fingerprint(certificate.name)
|
||||
certificate_trusts[fingerprint] = certificate_trust(
|
||||
certificate=certificate, main_keys=main_keys, all_fingerprints=all_fingerprints
|
||||
)
|
||||
return certificate_trusts
|
||||
|
||||
|
||||
def certificate_trust( # noqa: ignore=C901
|
||||
certificate: Path, main_keys: Set[Fingerprint], all_fingerprints: Set[Fingerprint]
|
||||
) -> Trust:
|
||||
"""Get the trust status of a certificates given by main keys.
|
||||
|
||||
main certificates are:
|
||||
revoked if:
|
||||
- the certificate has been self-revoked (also applies to 3rd party applied revocation certificates)
|
||||
full trust if:
|
||||
- the certificate is not self-revoked
|
||||
|
||||
regular certificates are:
|
||||
full trust if:
|
||||
- the certificate is not self-revoked and:
|
||||
- any uid contains at least 3 non revoked main key signatures
|
||||
marginal trust if:
|
||||
- the certificate is not self-revoked and:
|
||||
- any uid contains at least 1 but less than 3 non revoked main key signatures
|
||||
- no uid contains at least 3 non revoked main key signatures
|
||||
unknown trust if:
|
||||
- the certificate is not self-revoked and:
|
||||
- no uid contains any non revoked main key signature
|
||||
revoked if:
|
||||
- the certificate has been self-revoked, or
|
||||
- no uid contains at least 3 non revoked main key signatures and:
|
||||
- any uid contains at least 1 revoked main key signature
|
||||
|
||||
Parameters
|
||||
----------
|
||||
certificate: Certificate to acquire the trust status from
|
||||
main_keys: Fingerprints of trusted keys used to calculate the trust of the certificates from sources
|
||||
all_fingerprints: Fingerprints of all certificates, packager and main, to look up key-ids to full fingerprints
|
||||
|
||||
Returns
|
||||
-------
|
||||
Trust level of the certificate
|
||||
"""
|
||||
|
||||
fingerprint: Fingerprint = Fingerprint(certificate.name)
|
||||
keyring_root = certificate.parent.parent.parent
|
||||
|
||||
# collect revoked main keys
|
||||
main_keys_revoked: Set[Fingerprint] = set()
|
||||
for main_key in main_keys:
|
||||
for revocation in keyring_root.glob(f"main/*/{main_key}/revocation/*.asc"):
|
||||
if main_key.endswith(revocation.stem):
|
||||
main_keys_revoked.add(main_key)
|
||||
|
||||
revocations: Set[Fingerprint] = set()
|
||||
# TODO: what about direct key revocations/signatures?
|
||||
for revocation in certificate.glob("revocation/*.asc"):
|
||||
issuer: Optional[Fingerprint] = get_fingerprint_from_partial(all_fingerprints, Fingerprint(revocation.stem))
|
||||
if not issuer:
|
||||
raise Exception(f"Unknown issuer: {issuer}")
|
||||
if not fingerprint.endswith(issuer):
|
||||
raise Exception(f"Wrong root revocation issuer: {issuer}, expected: {fingerprint}")
|
||||
debug(f"Revoking {fingerprint} due to self-revocation")
|
||||
revocations.add(fingerprint)
|
||||
|
||||
if revocations:
|
||||
return Trust.revoked
|
||||
|
||||
# main keys are either trusted or revoked
|
||||
is_main_certificate = contains_fingerprint(fingerprints=main_keys, fingerprint=fingerprint)
|
||||
if is_main_certificate:
|
||||
return Trust.full
|
||||
|
||||
uid_trust: Dict[Uid, Trust] = {}
|
||||
self_revoked_uids: Set[Uid] = set()
|
||||
uids = certificate / "uid"
|
||||
for uid_path in uids.iterdir():
|
||||
uid: Uid = Uid(uid_path.name)
|
||||
|
||||
revocations = set()
|
||||
for revocation in uid_path.glob("revocation/*.asc"):
|
||||
issuer = get_fingerprint_from_partial(all_fingerprints, Fingerprint(revocation.stem))
|
||||
if not issuer:
|
||||
raise Exception(f"Unknown issuer: {issuer}")
|
||||
# self revocation
|
||||
if fingerprint.endswith(issuer):
|
||||
self_revoked_uids.add(uid)
|
||||
# main key revocation
|
||||
elif contains_fingerprint(fingerprints=main_keys, fingerprint=issuer):
|
||||
revocations.add(issuer)
|
||||
|
||||
certifications: Set[Fingerprint] = set()
|
||||
for certification in uid_path.glob("certification/*.asc"):
|
||||
issuer = get_fingerprint_from_partial(all_fingerprints, Fingerprint(certification.stem))
|
||||
if not issuer:
|
||||
raise Exception(f"Unknown issuer: {issuer}")
|
||||
# only take main key certifications into account
|
||||
if not contains_fingerprint(fingerprints=main_keys, fingerprint=issuer):
|
||||
continue
|
||||
# do not care about revoked main keys
|
||||
if contains_fingerprint(fingerprints=main_keys_revoked, fingerprint=issuer):
|
||||
continue
|
||||
# do not care about certifications that are revoked
|
||||
if contains_fingerprint(fingerprints=revocations, fingerprint=issuer):
|
||||
continue
|
||||
certifications.add(issuer)
|
||||
|
||||
# self revoked uid
|
||||
if uid in self_revoked_uids:
|
||||
debug(f"Certificate {fingerprint} with uid {uid} is self-revoked")
|
||||
uid_trust[uid] = Trust.revoked
|
||||
continue
|
||||
|
||||
# full trust
|
||||
if len(certifications) >= 3:
|
||||
uid_trust[uid] = Trust.full
|
||||
continue
|
||||
|
||||
# no full trust and contains revocations
|
||||
if revocations:
|
||||
uid_trust[uid] = Trust.revoked
|
||||
continue
|
||||
|
||||
# marginal trust
|
||||
if certifications:
|
||||
uid_trust[uid] = Trust.marginal
|
||||
continue
|
||||
|
||||
# no trust
|
||||
uid_trust[uid] = Trust.unknown
|
||||
|
||||
for uid, uid_trust_status in uid_trust.items():
|
||||
debug(f"Certificate {fingerprint} with uid {uid} has trust level: {uid_trust_status.name}")
|
||||
|
||||
trust: Trust
|
||||
# any uid has full trust
|
||||
if any(map(lambda t: Trust.full == t, uid_trust.values())):
|
||||
trust = Trust.full
|
||||
# no uid has full trust but at least one is revoked
|
||||
elif any(map(lambda e: Trust.revoked == e[1] and e[0] not in self_revoked_uids, uid_trust.items())):
|
||||
trust = Trust.revoked
|
||||
# no uid has full trust or is revoked
|
||||
elif any(map(lambda t: Trust.marginal == t, uid_trust.values())):
|
||||
trust = Trust.marginal
|
||||
else:
|
||||
trust = Trust.unknown
|
||||
|
||||
debug(f"Certificate {fingerprint} has trust level: {trust.name}")
|
||||
return trust
|
||||
|
||||
|
||||
def trust_icon(trust: Trust) -> str:
|
||||
"""Returns a single character icon representing the passed trust status
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trust: The trust to get an icon for
|
||||
|
||||
Returns
|
||||
-------
|
||||
The single character icon representing the passed trust status
|
||||
"""
|
||||
if trust == Trust.revoked:
|
||||
return "✗"
|
||||
if trust == Trust.unknown:
|
||||
return "~"
|
||||
if trust == Trust.marginal:
|
||||
return "~"
|
||||
if trust == Trust.full:
|
||||
return "✓"
|
||||
return "?"
|
||||
|
||||
|
||||
def trust_color(trust: Trust) -> Color:
|
||||
"""Returns a color representing the passed trust status
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trust: The trust to get the color of
|
||||
|
||||
Returns
|
||||
-------
|
||||
The color representing the passed trust status
|
||||
"""
|
||||
color: Color = Color.RED
|
||||
if trust == Trust.revoked:
|
||||
color = Color.RED
|
||||
if trust == Trust.unknown:
|
||||
color = Color.YELLOW
|
||||
if trust == Trust.marginal:
|
||||
color = Color.YELLOW
|
||||
if trust == Trust.full:
|
||||
color = Color.GREEN
|
||||
return color
|
||||
|
||||
|
||||
def format_trust_label(trust: Trust) -> str:
|
||||
"""Formats a given trust status to a text label including color and icon.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trust: The trust to get the label for
|
||||
|
||||
Returns
|
||||
-------
|
||||
Text label representing the trust status as literal and icon with colors
|
||||
"""
|
||||
return f"{trust_color(trust).value}{trust_icon(trust)} {trust.name}{Color.RST.value}"
|
||||
|
||||
|
||||
def filter_by_trust(trust: Trust, trust_filter: TrustFilter) -> bool:
|
||||
"""Filters a trust by a given filter and returns true if within the rules
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trust: Trust to check for being filtered
|
||||
trust_filter: Filter rules to check the trust against
|
||||
|
||||
Returns
|
||||
-------
|
||||
True if the given trust is within the filter rules
|
||||
"""
|
||||
trust_map = {
|
||||
TrustFilter.unknown: [Trust.unknown],
|
||||
TrustFilter.marginal: [Trust.marginal],
|
||||
TrustFilter.full: [Trust.full],
|
||||
TrustFilter.revoked: [Trust.revoked],
|
||||
TrustFilter.unrevoked: [Trust.unknown, Trust.marginal, Trust.full],
|
||||
TrustFilter.all: [Trust.revoked, Trust.unknown, Trust.marginal, Trust.full],
|
||||
}
|
||||
return trust in trust_map[trust_filter]
|
38
libkeyringctl/types.py
Normal file
38
libkeyringctl/types.py
Normal file
@ -0,0 +1,38 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from enum import Enum
|
||||
from enum import auto
|
||||
from typing import NewType
|
||||
|
||||
Fingerprint = NewType("Fingerprint", str)
|
||||
Uid = NewType("Uid", str)
|
||||
Username = NewType("Username", str)
|
||||
PacketKind = NewType("PacketKind", str)
|
||||
|
||||
|
||||
class Trust(Enum):
|
||||
unknown = auto
|
||||
revoked = auto()
|
||||
marginal = auto()
|
||||
full = auto()
|
||||
|
||||
|
||||
class TrustFilter(Enum):
|
||||
unknown = "unknown"
|
||||
revoked = "revoked"
|
||||
marginal = "marginal"
|
||||
full = "full"
|
||||
unrevoked = "unrevoked"
|
||||
all = "all"
|
||||
|
||||
|
||||
TRUST_MAX_LENGTH: int = max([len(e.name) for e in Trust])
|
||||
|
||||
|
||||
class Color(Enum):
|
||||
RED = "\033[31m"
|
||||
GREEN = "\033[32m"
|
||||
YELLOW = "\033[33m"
|
||||
RST = "\033[0m"
|
||||
BOLD = "\033[1m"
|
||||
UNDERLINE = "\033[4m"
|
341
libkeyringctl/util.py
Normal file
341
libkeyringctl/util.py
Normal file
@ -0,0 +1,341 @@
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
from contextlib import contextmanager
|
||||
from hashlib import sha256
|
||||
from os import chdir
|
||||
from os import environ
|
||||
from os import getcwd
|
||||
from pathlib import Path
|
||||
from platform import python_version_tuple
|
||||
from re import escape
|
||||
from re import split
|
||||
from re import sub
|
||||
from string import ascii_letters
|
||||
from string import digits
|
||||
from subprocess import STDOUT
|
||||
from subprocess import CalledProcessError
|
||||
from subprocess import check_output
|
||||
from sys import exit
|
||||
from sys import stderr
|
||||
from tempfile import mkstemp
|
||||
from traceback import print_stack
|
||||
from typing import IO
|
||||
from typing import AnyStr
|
||||
from typing import Dict
|
||||
|
||||
# NOTE: remove after python 3.8.x is no longer supported upstream
|
||||
if int(python_version_tuple()[1]) < 9: # pragma: no cover
|
||||
from typing import Iterable
|
||||
from typing import Iterator
|
||||
else:
|
||||
from collections.abc import Iterable
|
||||
from collections.abc import Iterator
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Union
|
||||
|
||||
from libkeyringctl.types import Fingerprint
|
||||
from libkeyringctl.types import Trust
|
||||
from libkeyringctl.types import Uid
|
||||
|
||||
|
||||
@contextmanager
|
||||
def cwd(new_dir: Path) -> Iterator[None]:
|
||||
"""Change to a new current working directory in a context and go back to the previous dir after the context is done
|
||||
|
||||
Parameters
|
||||
----------
|
||||
new_dir: A path to change to
|
||||
"""
|
||||
|
||||
previous_dir = getcwd()
|
||||
chdir(new_dir)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
chdir(previous_dir)
|
||||
|
||||
|
||||
def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
|
||||
"""Sort an Iterable of Paths naturally
|
||||
|
||||
Parameters
|
||||
----------
|
||||
_list: An iterable containing paths to be sorted
|
||||
|
||||
Return
|
||||
------
|
||||
An Iterable of paths that are naturally sorted
|
||||
"""
|
||||
|
||||
def convert_text_chunk(text: str) -> Union[int, str]:
|
||||
"""Convert input text to int or str
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text: An input string
|
||||
|
||||
Returns
|
||||
-------
|
||||
Either an integer if text is a digit, else text in lower-case representation
|
||||
"""
|
||||
|
||||
return int(text) if text.isdigit() else text.lower()
|
||||
|
||||
def alphanum_key(key: Path) -> List[Union[int, str]]:
|
||||
"""Retrieve an alphanumeric key from a Path, that can be used in sorted()
|
||||
|
||||
Parameters
|
||||
----------
|
||||
key: A path for which to create a key
|
||||
|
||||
Returns
|
||||
-------
|
||||
A list of either int or str objects that may serve as 'key' argument for sorted()
|
||||
"""
|
||||
|
||||
return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]
|
||||
|
||||
return sorted(_list, key=alphanum_key)
|
||||
|
||||
|
||||
def system(
|
||||
cmd: List[str],
|
||||
_stdin: Optional[IO[AnyStr]] = None,
|
||||
exit_on_error: bool = False,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
) -> str:
|
||||
"""Execute a command using check_output
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cmd: A list of strings to be fed to check_output
|
||||
_stdin: input fd used for the spawned process
|
||||
exit_on_error: Whether to exit the script when encountering an error (defaults to False)
|
||||
env: Optional environment vars for the shell invocation
|
||||
|
||||
Raises
|
||||
------
|
||||
CalledProcessError: If not exit_on_error and `check_output()` encounters an error
|
||||
|
||||
Returns
|
||||
-------
|
||||
The output of cmd
|
||||
"""
|
||||
if not env:
|
||||
env = {"HOME": environ["HOME"], "PATH": environ["PATH"], "LANG": "en_US.UTF-8"}
|
||||
|
||||
try:
|
||||
return check_output(cmd, stderr=STDOUT, stdin=_stdin, env=env).decode()
|
||||
except CalledProcessError as e:
|
||||
stderr.buffer.write(e.stdout)
|
||||
print_stack()
|
||||
if exit_on_error:
|
||||
exit(e.returncode)
|
||||
raise e
|
||||
|
||||
|
||||
def absolute_path(path: str) -> Path:
|
||||
"""Return the absolute path of a given str
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path: A string representing a path
|
||||
|
||||
Returns
|
||||
-------
|
||||
The absolute path representation of path
|
||||
"""
|
||||
|
||||
return Path(path).absolute()
|
||||
|
||||
|
||||
def transform_fd_to_tmpfile(working_dir: Path, sources: List[Path]) -> None:
|
||||
"""Transforms an input list of paths from any file descriptor of the current process to a tempfile in working_dir.
|
||||
|
||||
Using this function on fd inputs allow to pass the content to another process while hidepid is active and /proc
|
||||
not visible for the other process.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: A directory to use for temporary files
|
||||
sources: Paths that should be iterated and all fd's transformed to tmpfiles
|
||||
"""
|
||||
for index, source in enumerate(sources):
|
||||
source_str = str(source)
|
||||
if source_str.startswith("/proc/self/fd/") or source_str.startswith("/dev/fd/"):
|
||||
file = mkstemp(dir=working_dir, prefix=f"{source.name}", suffix=".fd")[1]
|
||||
with open(file, mode="wb") as f:
|
||||
f.write(source.read_bytes())
|
||||
f.flush()
|
||||
sources[index] = Path(file)
|
||||
|
||||
|
||||
def get_cert_paths(paths: Iterable[Path]) -> Set[Path]:
|
||||
"""Walks a list of paths and resolves all discovered certificate paths
|
||||
|
||||
Parameters
|
||||
----------
|
||||
paths: A list of paths to walk and resolve to certificate paths.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A set of paths to certificates
|
||||
"""
|
||||
|
||||
# depth first search certificate paths
|
||||
cert_paths: Set[Path] = set()
|
||||
visit: List[Path] = list(paths)
|
||||
while visit:
|
||||
path = visit.pop()
|
||||
# this level contains a certificate, abort depth search
|
||||
if list(path.glob("*.asc")):
|
||||
cert_paths.add(path)
|
||||
continue
|
||||
visit.extend([path for path in path.iterdir() if path.is_dir()])
|
||||
return cert_paths
|
||||
|
||||
|
||||
def get_parent_cert_paths(paths: Iterable[Path]) -> Set[Path]:
|
||||
"""Walks a list of paths upwards and resolves all discovered parent certificate paths
|
||||
|
||||
Parameters
|
||||
----------
|
||||
paths: A list of paths to walk and resolve to certificate paths.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A set of paths to certificates
|
||||
"""
|
||||
|
||||
# depth first search certificate paths
|
||||
cert_paths: Set[Path] = set()
|
||||
visit: List[Path] = list(paths)
|
||||
while visit:
|
||||
node = visit.pop().parent
|
||||
# this level contains a certificate, abort depth search
|
||||
if "keyring" == node.parent.parent.parent.name:
|
||||
cert_paths.add(node)
|
||||
continue
|
||||
visit.append(node)
|
||||
return cert_paths
|
||||
|
||||
|
||||
def contains_fingerprint(fingerprints: Iterable[Fingerprint], fingerprint: Fingerprint) -> bool:
|
||||
"""Returns weather an iterable structure of fingerprints contains a specific fingerprint
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fingerprints: Iteratable structure of fingerprints that should be searched
|
||||
fingerprint: Fingerprint to search for
|
||||
|
||||
Returns
|
||||
-------
|
||||
Weather an iterable structure of fingerprints contains a specific fingerprint
|
||||
"""
|
||||
|
||||
return any(filter(lambda e: str(e).endswith(fingerprint), fingerprints))
|
||||
|
||||
|
||||
def get_fingerprint_from_partial(
|
||||
fingerprints: Iterable[Fingerprint], fingerprint: Fingerprint
|
||||
) -> Optional[Fingerprint]:
|
||||
"""Returns the full fingerprint looked up from a partial fingerprint like a key-id
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fingerprints: Iteratable structure of fingerprints that should be searched
|
||||
fingerprint: Partial fingerprint to search for
|
||||
|
||||
Returns
|
||||
-------
|
||||
The full fingerprint or None
|
||||
"""
|
||||
|
||||
for fingerprint in filter(lambda e: str(e).endswith(fingerprint), fingerprints):
|
||||
return fingerprint
|
||||
return None
|
||||
|
||||
|
||||
def filter_fingerprints_by_trust(trusts: Dict[Fingerprint, Trust], trust: Trust) -> List[Fingerprint]:
|
||||
"""Filters a dict of Fingerprint to Trust by a passed Trust parameter and returns the matching fingerprints.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
trusts: Dict of Fingerprint to Trust that should be filtered based on the trust parameter
|
||||
trust: Trust that should be used to filter the trusts dict
|
||||
|
||||
Returns
|
||||
-------
|
||||
The matching fingerprints of the dict filtered by trust
|
||||
"""
|
||||
|
||||
return list(
|
||||
map(
|
||||
lambda item: item[0],
|
||||
filter(lambda item: trust == item[1], trusts.items()),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
simple_printable: str = ascii_letters + digits + "_-.+@"
|
||||
ascii_mapping: Dict[str, str] = {
|
||||
"àáâãäæąăǎа": "a",
|
||||
"ćçĉċč": "c",
|
||||
"ďđ": "d",
|
||||
"éèêëęēĕėěɇ": "e",
|
||||
"ĝğġģ": "g",
|
||||
"ĥħȟ": "h",
|
||||
"ìíîïĩīĭįıij": "i",
|
||||
"ĵɉ": "j",
|
||||
"ķ": "k",
|
||||
"ł": "l",
|
||||
"ńņň": "n",
|
||||
"òóôõöøŏőðȍǿ": "o",
|
||||
"śș": "s",
|
||||
"ß": "ss",
|
||||
"ț": "t",
|
||||
"úûüȗűȕù": "u",
|
||||
"ýÿ": "y",
|
||||
"źż": "z",
|
||||
}
|
||||
ascii_mapping_lookup: Dict[str, str] = {}
|
||||
for key, value in ascii_mapping.items():
|
||||
for c in key:
|
||||
ascii_mapping_lookup[c] = value
|
||||
ascii_mapping_lookup[c.upper()] = value.upper()
|
||||
|
||||
|
||||
def simplify_ascii(_str: str) -> str:
|
||||
"""Simplify a string to contain more filesystem and printable friendly characters
|
||||
|
||||
Parameters
|
||||
----------
|
||||
_str: A string to simplify (e.g. 'Foobar McFooface <foobar@foo.face>')
|
||||
|
||||
Returns
|
||||
-------
|
||||
The simplified representation of _str
|
||||
"""
|
||||
_str = _str.strip("<")
|
||||
_str = _str.strip(">")
|
||||
_str = "".join([ascii_mapping_lookup.get(char) or char for char in _str])
|
||||
_str = sub("[^" + escape(simple_printable) + "]", "_", _str)
|
||||
return _str
|
||||
|
||||
|
||||
def simplify_uid(uid: Uid, hash_postfix: bool = True) -> str:
|
||||
"""Simplify a uid to contain more filesystem and printable friendly characters with an optional
|
||||
collision resistant hash postfix.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uid: Uid to simplify (e.g. 'Foobar McFooface <foobar@foo.face>')
|
||||
hash_postfix: Whether to add a hash of the uid as postfix
|
||||
|
||||
Returns
|
||||
-------
|
||||
Simplified str representation of uid
|
||||
"""
|
||||
_hash = "" if not hash_postfix else f"_{sha256(uid.encode()).hexdigest()[:8]}"
|
||||
return f"{simplify_ascii(_str=uid)}{_hash}"
|
342
libkeyringctl/verify.py
Normal file
342
libkeyringctl/verify.py
Normal file
@ -0,0 +1,342 @@
|
||||
from logging import debug
|
||||
from pathlib import Path
|
||||
from subprocess import PIPE
|
||||
from subprocess import Popen
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
|
||||
from libkeyringctl.keyring import export
|
||||
from libkeyringctl.keyring import get_fingerprints_from_paths
|
||||
from libkeyringctl.keyring import is_pgp_fingerprint
|
||||
from libkeyringctl.keyring import transform_fingerprint_to_keyring_path
|
||||
from libkeyringctl.keyring import transform_username_to_keyring_path
|
||||
from libkeyringctl.sequoia import packet_dump_field
|
||||
from libkeyringctl.sequoia import packet_kinds
|
||||
from libkeyringctl.types import Fingerprint
|
||||
from libkeyringctl.types import Uid
|
||||
from libkeyringctl.util import get_cert_paths
|
||||
from libkeyringctl.util import get_fingerprint_from_partial
|
||||
from libkeyringctl.util import simplify_uid
|
||||
from libkeyringctl.util import system
|
||||
|
||||
|
||||
def verify( # noqa: ignore=C901
|
||||
working_dir: Path,
|
||||
keyring_root: Path,
|
||||
sources: Optional[List[Path]],
|
||||
lint_hokey: bool = True,
|
||||
lint_sq_keyring: bool = True,
|
||||
) -> None:
|
||||
"""Verify certificates against modern expectations using sq-keyring-linter and hokey
|
||||
|
||||
Parameters
|
||||
----------
|
||||
working_dir: A directory to use for temporary files
|
||||
keyring_root: The keyring root directory to look up username shorthand sources
|
||||
sources: A list of username, fingerprint or directories from which to read PGP packet information
|
||||
(defaults to `keyring_root`)
|
||||
lint_hokey: Whether to run hokey lint
|
||||
lint_sq_keyring: Whether to run sq-keyring-linter
|
||||
"""
|
||||
|
||||
if not sources:
|
||||
sources = [keyring_root]
|
||||
|
||||
# transform shorthand paths to actual keyring paths
|
||||
transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources)
|
||||
transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources)
|
||||
|
||||
cert_paths: Set[Path] = get_cert_paths(sources)
|
||||
all_fingerprints = get_fingerprints_from_paths([keyring_root])
|
||||
|
||||
for certificate in sorted(cert_paths):
|
||||
print(f"Verify {certificate.name} owned by {certificate.parent.name}")
|
||||
|
||||
verify_integrity(certificate=certificate, all_fingerprints=all_fingerprints)
|
||||
|
||||
with NamedTemporaryFile(
|
||||
dir=working_dir, prefix=f"{certificate.parent.name}-{certificate.name}", suffix=".asc"
|
||||
) as keyring:
|
||||
keyring_path = Path(keyring.name)
|
||||
export(
|
||||
working_dir=working_dir,
|
||||
keyring_root=keyring_root,
|
||||
sources=[certificate],
|
||||
output=keyring_path,
|
||||
)
|
||||
|
||||
if lint_hokey:
|
||||
keyring_fd = Popen(("sq", "dearmor", f"{str(keyring_path)}"), stdout=PIPE)
|
||||
print(system(["hokey", "lint"], _stdin=keyring_fd.stdout), end="")
|
||||
if lint_sq_keyring:
|
||||
print(system(["sq-keyring-linter", f"{str(keyring_path)}"]), end="")
|
||||
|
||||
|
||||
def verify_integrity(certificate: Path, all_fingerprints: Set[Fingerprint]) -> None: # noqa: ignore=C901
|
||||
if not is_pgp_fingerprint(certificate.name):
|
||||
raise Exception(f"Unexpected certificate name for certificate {certificate.name}: {str(certificate)}")
|
||||
|
||||
pubkey = certificate / f"{certificate.name}.asc"
|
||||
if not pubkey.is_file():
|
||||
raise Exception(f"Missing certificate pubkey {certificate.name}: {str(pubkey)}")
|
||||
|
||||
if not list(certificate.glob("uid/*/*.asc")):
|
||||
raise Exception(f"Missing at least one UID for {certificate.name}")
|
||||
|
||||
# check packet files
|
||||
for path in certificate.iterdir():
|
||||
if path.is_file():
|
||||
if path.name != f"{certificate.name}.asc":
|
||||
raise Exception(f"Unexpected file in certificate {certificate.name}: {str(path)}")
|
||||
assert_packet_kind(path=path, expected="Public-Key")
|
||||
assert_filename_matches_packet_fingerprint(path=path, check=certificate.name)
|
||||
debug(f"OK: {path}")
|
||||
elif path.is_dir():
|
||||
if "revocation" == path.name:
|
||||
verify_integrity_key_revocations(path=path)
|
||||
elif "directkey" == path.name:
|
||||
for directkey in path.iterdir():
|
||||
assert_is_dir(path=directkey)
|
||||
if "certification" == directkey.name:
|
||||
verify_integrity_direct_key_certifications(path=directkey)
|
||||
elif "revocation" == directkey.name:
|
||||
verify_integrity_direct_key_revocations(path=directkey)
|
||||
else:
|
||||
raise_unexpected_file(path=directkey)
|
||||
elif "uid" == path.name:
|
||||
for uid in path.iterdir():
|
||||
assert_is_dir(path=uid)
|
||||
uid_packet = uid / f"{uid.name}.asc"
|
||||
assert_is_file(path=uid_packet)
|
||||
|
||||
uid_binding_sig = uid / "certification" / f"{certificate.name}.asc"
|
||||
uid_revocation_sig = uid / "revocation" / f"{certificate.name}.asc"
|
||||
if not uid_binding_sig.is_file() and not uid_revocation_sig:
|
||||
raise Exception(f"Missing uid binding/revocation sig for {certificate.name}: {str(uid)}")
|
||||
|
||||
for uid_path in uid.iterdir():
|
||||
if uid_path.is_file():
|
||||
if uid_path.name != f"{uid.name}.asc":
|
||||
raise Exception(f"Unexpected file in certificate {certificate.name}: {str(uid_path)}")
|
||||
|
||||
assert_packet_kind(path=uid_path, expected="User")
|
||||
|
||||
uid_value = simplify_uid(Uid(packet_dump_field(packet=uid_path, query="Value")))
|
||||
if uid_value != uid.name:
|
||||
raise Exception(f"Unexpected uid in file {str(uid_path)}: {uid_value}")
|
||||
elif not uid_path.is_dir():
|
||||
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(uid_path)}")
|
||||
elif "certification" == uid_path.name:
|
||||
for sig in uid_path.iterdir():
|
||||
assert_is_file(path=sig)
|
||||
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
|
||||
assert_has_suffix(path=sig, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=sig, expected="Signature")
|
||||
assert_signature_type_certification(path=sig)
|
||||
|
||||
issuer = get_fingerprint_from_partial(
|
||||
fingerprints=all_fingerprints,
|
||||
fingerprint=Fingerprint(
|
||||
packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer")
|
||||
),
|
||||
)
|
||||
if issuer != sig.stem:
|
||||
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
|
||||
debug(f"OK: {sig}")
|
||||
elif "revocation" == uid_path.name:
|
||||
for sig in uid_path.iterdir():
|
||||
assert_is_file(path=sig)
|
||||
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
|
||||
assert_has_suffix(path=sig, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=sig, expected="Signature")
|
||||
assert_signature_type(path=sig, expected="CertificationRevocation")
|
||||
|
||||
issuer = get_fingerprint_from_partial(
|
||||
fingerprints=all_fingerprints,
|
||||
fingerprint=Fingerprint(
|
||||
packet_dump_field(packet=sig, query="Hashed area|Unhashed area.Issuer")
|
||||
),
|
||||
)
|
||||
if issuer != sig.stem:
|
||||
raise Exception(f"Unexpected issuer in file {str(sig)}: {issuer}")
|
||||
|
||||
certification = uid_path.parent / "certification" / sig.name
|
||||
if certification.exists():
|
||||
raise Exception(f"Certification exists for revocation {str(sig)}: {certification}")
|
||||
|
||||
debug(f"OK: {sig}")
|
||||
else:
|
||||
raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(uid_path)}")
|
||||
debug(f"OK: {uid_path}")
|
||||
debug(f"OK: {uid}")
|
||||
elif "subkey" == path.name:
|
||||
for subkey in path.iterdir():
|
||||
assert_is_dir(path=subkey)
|
||||
assert_is_pgp_fingerprint(path=subkey, _str=subkey.name)
|
||||
|
||||
subkey_packet = subkey / f"{subkey.name}.asc"
|
||||
assert_is_file(path=subkey_packet)
|
||||
|
||||
subkey_binding_sig = subkey / "certification" / f"{certificate.name}.asc"
|
||||
subkey_revocation_sig = subkey / "revocation" / f"{certificate.name}.asc"
|
||||
if not subkey_binding_sig.is_file() and not subkey_revocation_sig:
|
||||
raise Exception(f"Missing subkey binding/revocation sig for {certificate.name}: {str(subkey)}")
|
||||
|
||||
for subkey_path in subkey.iterdir():
|
||||
if subkey_path.is_file():
|
||||
if subkey_path.name != f"{subkey.name}.asc":
|
||||
raise Exception(
|
||||
f"Unexpected file in certificate {certificate.name}: {str(subkey_path)}"
|
||||
)
|
||||
|
||||
assert_packet_kind(path=subkey_path, expected="Public-Subkey")
|
||||
assert_filename_matches_packet_fingerprint(path=subkey_path, check=subkey_path.stem)
|
||||
elif not subkey_path.is_dir():
|
||||
raise Exception(
|
||||
f"Unexpected file type in certificate {certificate.name}: {str(subkey_path)}"
|
||||
)
|
||||
elif "certification" == subkey_path.name:
|
||||
for sig in subkey_path.iterdir():
|
||||
assert_is_file(path=sig)
|
||||
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
|
||||
assert_has_suffix(path=sig, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=sig, expected="Signature")
|
||||
assert_signature_type(path=sig, expected="SubkeyBinding")
|
||||
|
||||
assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name)
|
||||
elif "revocation" == subkey_path.name:
|
||||
for sig in subkey_path.iterdir():
|
||||
assert_is_file(path=sig)
|
||||
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
|
||||
assert_has_suffix(path=sig, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=sig, expected="Signature")
|
||||
assert_signature_type(path=sig, expected="SubkeyRevocation")
|
||||
|
||||
assert_filename_matches_packet_issuer_fingerprint(path=sig, check=certificate.name)
|
||||
else:
|
||||
raise Exception(
|
||||
f"Unexpected directory in certificate {certificate.name}: {str(subkey_path)}"
|
||||
)
|
||||
debug(f"OK: {subkey_path}")
|
||||
else:
|
||||
raise Exception(f"Unexpected directory in certificate {certificate.name}: {str(path)}")
|
||||
else:
|
||||
raise Exception(f"Unexpected file type in certificate {certificate.name}: {str(path)}")
|
||||
|
||||
|
||||
def assert_packet_kind(path: Path, expected: str) -> None:
|
||||
kinds = packet_kinds(packet=path)
|
||||
if not kinds or len(kinds) != 1:
|
||||
raise Exception(f"Unexpected amount of packets in file {str(path)}: {kinds}")
|
||||
kind = kinds[0]
|
||||
if kind != expected:
|
||||
raise Exception(f"Unexpected packet in file {str(path)} kind: {kind} expected: {expected}")
|
||||
|
||||
|
||||
def assert_signature_type(path: Path, expected: str) -> None:
|
||||
sig_type = packet_dump_field(packet=path, query="Type")
|
||||
if sig_type != expected:
|
||||
raise Exception(f"Unexpected packet type in file {str(path)} type: {sig_type} expected: {expected}")
|
||||
|
||||
|
||||
def assert_signature_type_certification(path: Path) -> None:
|
||||
sig_type = packet_dump_field(packet=path, query="Type")
|
||||
if sig_type not in ["GenericCertification", "PersonaCertification", "CasualCertification", "PositiveCertification"]:
|
||||
raise Exception(f"Unexpected packet certification type in file {str(path)} type: {sig_type}")
|
||||
|
||||
|
||||
def assert_is_pgp_fingerprint(path: Path, _str: str) -> None:
|
||||
if not is_pgp_fingerprint(_str):
|
||||
raise Exception(f"Unexpected file name, not a pgp fingerprint: {str(path)}")
|
||||
|
||||
|
||||
def assert_filename_matches_packet_issuer_fingerprint(path: Path, check: str) -> None:
|
||||
fingerprint = packet_dump_field(packet=path, query="Unhashed area|Hashed area.Issuer Fingerprint")
|
||||
if not fingerprint == check:
|
||||
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
|
||||
|
||||
|
||||
def assert_filename_matches_packet_fingerprint(path: Path, check: str) -> None:
|
||||
fingerprint = packet_dump_field(packet=path, query="Fingerprint")
|
||||
if not fingerprint == check:
|
||||
raise Exception(f"Unexpected packet fingerprint in file {str(path)}: {fingerprint}")
|
||||
|
||||
|
||||
def assert_has_suffix(path: Path, suffix: str) -> None:
|
||||
if path.suffix != suffix:
|
||||
raise Exception(f"Unexpected file suffix in {str(path)} expected: {suffix}")
|
||||
|
||||
|
||||
def assert_is_file(path: Path) -> None:
|
||||
if not path.is_file():
|
||||
raise Exception(f"Unexpected type, should be file: {str(path)}")
|
||||
|
||||
|
||||
def assert_is_dir(path: Path) -> None:
|
||||
if not path.is_dir():
|
||||
raise Exception(f"Unexpected type, should be directory: {str(path)}")
|
||||
|
||||
|
||||
def raise_unexpected_file(path: Path) -> None:
|
||||
raise Exception(f"Unexpected file in directory: {str(path)}")
|
||||
|
||||
|
||||
def verify_integrity_key_revocations(path: Path) -> None:
|
||||
assert_is_dir(path=path)
|
||||
for sig in path.iterdir():
|
||||
assert_is_file(path=sig)
|
||||
assert_is_pgp_fingerprint(path=sig, _str=sig.stem)
|
||||
assert_has_suffix(path=sig, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=sig, expected="Signature")
|
||||
assert_signature_type(path=sig, expected="KeyRevocation")
|
||||
|
||||
assert_filename_matches_packet_issuer_fingerprint(path=sig, check=sig.stem)
|
||||
|
||||
debug(f"OK: {sig}")
|
||||
|
||||
|
||||
def verify_integrity_direct_key_certifications(path: Path) -> None:
|
||||
for issuer_dir in path.iterdir():
|
||||
assert_is_dir(path=issuer_dir)
|
||||
assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name)
|
||||
for certification in issuer_dir.iterdir():
|
||||
verify_integrity_direct_key_certification(path=certification)
|
||||
|
||||
|
||||
def verify_integrity_direct_key_revocations(path: Path) -> None:
|
||||
for issuer_dir in path.iterdir():
|
||||
assert_is_dir(path=issuer_dir)
|
||||
assert_is_pgp_fingerprint(path=issuer_dir, _str=issuer_dir.name)
|
||||
for certification in issuer_dir.iterdir():
|
||||
verify_integrity_direct_key_revocation(path=certification)
|
||||
|
||||
|
||||
def verify_integrity_direct_key_certification(path: Path) -> None:
|
||||
assert_is_file(path=path)
|
||||
assert_has_suffix(path=path, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=path, expected="Signature")
|
||||
assert_signature_type(path=path, expected="DirectKey")
|
||||
|
||||
assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name)
|
||||
|
||||
debug(f"OK: {path}")
|
||||
|
||||
|
||||
def verify_integrity_direct_key_revocation(path: Path) -> None:
|
||||
assert_is_file(path=path)
|
||||
assert_has_suffix(path=path, suffix=".asc")
|
||||
|
||||
assert_packet_kind(path=path, expected="Signature")
|
||||
assert_signature_type(path=path, expected="CertificationRevocation")
|
||||
|
||||
assert_filename_matches_packet_issuer_fingerprint(path=path, check=path.parent.name)
|
||||
|
||||
debug(f"OK: {path}")
|
Reference in New Issue
Block a user