Merge pull request #214910 from rnhmjoj/pr-gnupg-test

nixos/tests/gnupg: init
This commit is contained in:
Jacek Galowicz
2023-02-07 09:17:06 +01:00
committed by GitHub
4 changed files with 148 additions and 24 deletions

View File

@@ -1,4 +1,4 @@
from contextlib import _GeneratorContextManager from contextlib import _GeneratorContextManager, nullcontext
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
@@ -406,25 +406,23 @@ class Machine:
return rootlog.nested(msg, my_attrs) return rootlog.nested(msg, my_attrs)
def wait_for_monitor_prompt(self) -> str: def wait_for_monitor_prompt(self) -> str:
with self.nested("waiting for monitor prompt"): assert self.monitor is not None
assert self.monitor is not None answer = ""
answer = "" while True:
while True: undecoded_answer = self.monitor.recv(1024)
undecoded_answer = self.monitor.recv(1024) if not undecoded_answer:
if not undecoded_answer: break
break answer += undecoded_answer.decode()
answer += undecoded_answer.decode() if answer.endswith("(qemu) "):
if answer.endswith("(qemu) "): break
break return answer
return answer
def send_monitor_command(self, command: str) -> str: def send_monitor_command(self, command: str) -> str:
self.run_callbacks() self.run_callbacks()
with self.nested(f"sending monitor command: {command}"): message = f"{command}\n".encode()
message = f"{command}\n".encode() assert self.monitor is not None
assert self.monitor is not None self.monitor.send(message)
self.monitor.send(message) return self.wait_for_monitor_prompt()
return self.wait_for_monitor_prompt()
def wait_for_unit( def wait_for_unit(
self, unit: str, user: Optional[str] = None, timeout: int = 900 self, unit: str, user: Optional[str] = None, timeout: int = 900
@@ -547,7 +545,7 @@ class Machine:
self.shell.send("echo ${PIPESTATUS[0]}\n".encode()) self.shell.send("echo ${PIPESTATUS[0]}\n".encode())
rc = int(self._next_newline_closed_block_from_shell().strip()) rc = int(self._next_newline_closed_block_from_shell().strip())
return (rc, output.decode()) return (rc, output.decode(errors="replace"))
def shell_interact(self, address: Optional[str] = None) -> None: def shell_interact(self, address: Optional[str] = None) -> None:
"""Allows you to interact with the guest shell for debugging purposes. """Allows you to interact with the guest shell for debugging purposes.
@@ -685,9 +683,9 @@ class Machine:
retry(tty_matches) retry(tty_matches)
def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None: def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
with self.nested(f"sending keys '{chars}'"): with self.nested(f"sending keys {repr(chars)}"):
for char in chars: for char in chars:
self.send_key(char, delay) self.send_key(char, delay, log=False)
def wait_for_file(self, filename: str) -> None: def wait_for_file(self, filename: str) -> None:
"""Waits until the file exists in machine's file system.""" """Waits until the file exists in machine's file system."""
@@ -860,11 +858,15 @@ class Machine:
if matches is not None: if matches is not None:
return return
def send_key(self, key: str, delay: Optional[float] = 0.01) -> None: def send_key(
self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
) -> None:
key = CHAR_TO_KEY.get(key, key) key = CHAR_TO_KEY.get(key, key)
self.send_monitor_command(f"sendkey {key}") context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
if delay is not None: with context:
time.sleep(delay) self.send_monitor_command(f"sendkey {key}")
if delay is not None:
time.sleep(delay)
def send_console(self, chars: str) -> None: def send_console(self, chars: str) -> None:
assert self.process assert self.process

View File

@@ -248,6 +248,7 @@ in {
gnome = handleTest ./gnome.nix {}; gnome = handleTest ./gnome.nix {};
gnome-flashback = handleTest ./gnome-flashback.nix {}; gnome-flashback = handleTest ./gnome-flashback.nix {};
gnome-xorg = handleTest ./gnome-xorg.nix {}; gnome-xorg = handleTest ./gnome-xorg.nix {};
gnupg = handleTest ./gnupg.nix {};
go-neb = handleTest ./go-neb.nix {}; go-neb = handleTest ./go-neb.nix {};
gobgpd = handleTest ./gobgpd.nix {}; gobgpd = handleTest ./gobgpd.nix {};
gocd-agent = handleTest ./gocd-agent.nix {}; gocd-agent = handleTest ./gocd-agent.nix {};

118
nixos/tests/gnupg.nix Normal file
View File

@@ -0,0 +1,118 @@
import ./make-test-python.nix ({ pkgs, lib, ...}:
{
name = "gnupg";
meta = with lib.maintainers; {
maintainers = [ rnhmjoj ];
};
# server for testing SSH
nodes.server = { ... }: {
imports = [ ../modules/profiles/minimal.nix ];
users.users.alice.isNormalUser = true;
services.openssh.enable = true;
};
# machine for testing GnuPG
nodes.machine = { pkgs, ... }: {
imports = [ ../modules/profiles/minimal.nix ];
users.users.alice.isNormalUser = true;
services.getty.autologinUser = "alice";
environment.shellInit = ''
# preset a key passphrase in gpg-agent
preset_key() {
# find all keys
case "$1" in
ssh) grips=$(awk '/^[0-9A-F]/{print $1}' "''${GNUPGHOME:-$HOME/.gnupg}/sshcontrol") ;;
pgp) grips=$(gpg --with-keygrip --list-secret-keys | awk '/Keygrip/{print $3}') ;;
esac
# try to preset the passphrase for each key found
for grip in $grips; do
"$(gpgconf --list-dirs libexecdir)/gpg-preset-passphrase" -c -P "$2" "$grip"
done
}
'';
programs.gnupg.agent.enable = true;
programs.gnupg.agent.enableSSHSupport = true;
};
testScript =
''
import shlex
def as_alice(command: str) -> str:
"""
Wraps a command to run it as Alice in a login shell
"""
quoted = shlex.quote(command)
return "su --login alice --command " + quoted
start_all()
with subtest("Wait for the autologin"):
machine.wait_until_tty_matches("1", "alice@machine")
with subtest("Can generate a PGP key"):
# Note: this needs a tty because of pinentry
machine.send_chars("gpg --gen-key\n")
machine.wait_until_tty_matches("1", "Real name:")
machine.send_chars("Alice\n")
machine.wait_until_tty_matches("1", "Email address:")
machine.send_chars("alice@machine\n")
machine.wait_until_tty_matches("1", "Change")
machine.send_chars("O\n")
machine.wait_until_tty_matches("1", "Please enter")
machine.send_chars("pgp_p4ssphrase\n")
machine.wait_until_tty_matches("1", "Please re-enter")
machine.send_chars("pgp_p4ssphrase\n")
machine.wait_until_tty_matches("1", "public and secret key created")
with subtest("Confirm the key is in the keyring"):
machine.wait_until_succeeds(as_alice("gpg --list-secret-keys | grep -q alice@machine"))
with subtest("Can generate and add an SSH key"):
machine.succeed(as_alice("ssh-keygen -t ed25519 -f alice -N ssh_p4ssphrase"))
# Note: apparently this must be run before using the OpenSSH agent
# socket for the first time in a tty. It's not needed for `ssh`
# because there's a hook that calls it automatically (only in NixOS).
machine.send_chars("gpg-connect-agent updatestartuptty /bye\n")
# Note: again, this needs a tty because of pinentry
machine.send_chars("ssh-add alice\n")
machine.wait_until_tty_matches("1", "Enter passphrase")
machine.send_chars("ssh_p4ssphrase\n")
machine.wait_until_tty_matches("1", "Please enter")
machine.send_chars("ssh_agent_p4ssphrase\n")
machine.wait_until_tty_matches("1", "Please re-enter")
machine.send_chars("ssh_agent_p4ssphrase\n")
with subtest("Confirm the SSH key has been registered"):
machine.wait_until_succeeds(as_alice("ssh-add -l | grep -q alice@machine"))
with subtest("Can preset the key passphrases in the agent"):
machine.succeed(as_alice("echo allow-preset-passphrase > .gnupg/gpg-agent.conf"))
machine.succeed(as_alice("pkill gpg-agent"))
machine.succeed(as_alice("preset_key pgp pgp_p4ssphrase"))
machine.succeed(as_alice("preset_key ssh ssh_agent_p4ssphrase"))
with subtest("Can encrypt and decrypt a message"):
machine.succeed(as_alice("echo Hello | gpg -e -r alice | gpg -d | grep -q Hello"))
with subtest("Can log into the server"):
# Install Alice's public key
public_key = machine.succeed(as_alice("cat alice.pub"))
server.succeed("mkdir /etc/ssh/authorized_keys.d")
server.succeed(f"printf '{public_key}' > /etc/ssh/authorized_keys.d/alice")
server.wait_for_open_port(22)
machine.succeed(as_alice("ssh -i alice -o StrictHostKeyChecking=no server exit"))
'';
})

View File

@@ -1,5 +1,6 @@
{ fetchurl, fetchpatch, lib, stdenv, pkg-config, libgcrypt, libassuan, libksba { fetchurl, fetchpatch, lib, stdenv, pkg-config, libgcrypt, libassuan, libksba
, libgpg-error, libiconv, npth, gettext, texinfo, buildPackages , libgpg-error, libiconv, npth, gettext, texinfo, buildPackages
, nixosTests
, guiSupport ? stdenv.isDarwin, enableMinimal ? false , guiSupport ? stdenv.isDarwin, enableMinimal ? false
, adns, bzip2, gnutls, libusb1, openldap , adns, bzip2, gnutls, libusb1, openldap
, pinentry, readline, sqlite, zlib , pinentry, readline, sqlite, zlib
@@ -85,6 +86,8 @@ stdenv.mkDerivation rec {
enableParallelBuilding = true; enableParallelBuilding = true;
passthru.tests.connman = nixosTests.gnupg;
meta = with lib; { meta = with lib; {
homepage = "https://gnupg.org"; homepage = "https://gnupg.org";
description = "Modern release of the GNU Privacy Guard, a GPL OpenPGP implementation"; description = "Modern release of the GNU Privacy Guard, a GPL OpenPGP implementation";