Als Entwickler der
PyGOST- Bibliothek (kryptografische GOST-
Grundelemente in reinem Python) bekomme ich häufig Fragen zur Implementierung des einfachsten sicheren Messaging auf meinem Knie. Viele halten angewandte Kryptografie für eine ziemlich einfache Sache, und ein Aufruf von .encrypt () an eine Blockverschlüsselung reicht aus, um sicher über einen Kommunikationskanal zu senden. Andere glauben, dass angewandte Kryptographie das Schicksal einiger weniger ist, und es ist akzeptabel, dass wohlhabende Unternehmen wie Telegramm mit Mathematikolympiaden
kein sicheres Protokoll implementieren
können .
All dies veranlasste mich, diesen Artikel zu schreiben, um zu zeigen, dass die Implementierung kryptografischer Protokolle und sicherer IM keine so schwierige Aufgabe ist. Es lohnt sich jedoch nicht, eigene Authentifizierungs- und Schlüsselvereinbarungsprotokolle zu erfinden.
Der Artikel wird
Peer-to-Peer- ,
Friend-to-Friend- ,
End-to-End-verschlüsselter Instant Messenger mit
SIGMA-I- Authentifizierung und Schlüsselvereinbarungsprotokoll (basierend auf der Implementierung von
IPsec IKE ) unter Verwendung ausschließlich von GOG-Kryptografiealgorithmen PyGOST-Bibliotheken und geschrieben ASN.1-Codierung von Nachrichten mit der
PyDERASN- Bibliothek (über die ich bereits
zuvor geschrieben habe ). Voraussetzung: Es muss so einfach sein, dass es an einem Abend (oder Arbeitstag) von Grund auf neu geschrieben werden kann, sonst ist es kein einfaches Programm mehr. Es hat wahrscheinlich Fehler, unnötige Schwierigkeiten, Mängel und dies ist mein erstes Programm, das die Asyncio-Bibliothek verwendet.
Design IM
Um loszulegen, müssen Sie verstehen, wie unser IM aussehen wird. Lassen Sie es der Einfachheit halber ein Peer-to-Peer-Netzwerk sein, ohne dass Teilnehmer entdeckt werden. Wir werden persönlich angeben, an welche Adresse: den Port, an den eine Verbindung hergestellt werden soll, um mit dem Gesprächspartner zu kommunizieren.
Ich verstehe, dass die Annahme der Verfügbarkeit einer direkten Kommunikation zwischen zwei beliebigen Computern derzeit eine erhebliche Einschränkung der Anwendbarkeit von IM in der Praxis darstellt. Aber je mehr Entwickler alle Arten von NAT-Traversal-Krücken implementieren, desto länger bleiben wir im IPv4-Internet, wobei die Wahrscheinlichkeit einer Kommunikation zwischen beliebigen Computern abnimmt. Wie viel können Sie den Mangel an IPv6 zu Hause und bei der Arbeit ertragen?
Wir werden ein Freund-zu-Freund-Netzwerk haben: Alle möglichen Gesprächspartner sollten im Voraus bekannt sein. Erstens vereinfacht es alles erheblich: sich vorstellen, einen Namen / Schlüssel finden oder nicht finden, die Verbindung trennen oder weiterarbeiten, den Gesprächspartner kennen. Zweitens ist es im allgemeinen Fall sicher und schließt viele Angriffe aus.
Die IM-Oberfläche wird den klassischen Lösungen von
saugfreien Projekten nahe kommen , die ich wegen ihres Minimalismus und ihrer Unix-Way-Philosophie sehr mag. Ein IM-Programm für jeden Gesprächspartner erstellt ein Verzeichnis mit drei Unix-Domain-Sockets:
- In - Nachrichten, die an den Gesprächspartner gesendet werden, werden darin aufgezeichnet.
- vom Gesprächspartner empfangene Nachrichten werden von ihm gelesen;
- Wenn wir den Status daraus lesen, werden wir herausfinden, ob der Gesprächspartner jetzt verbunden ist, die Verbindungsadresse / den Anschluss.
Außerdem wird ein Conn-Socket erstellt, der schreibt, an welchen Host-Port wir eine Verbindung zu einem Remote-Gesprächspartner herstellen.
| - Alice
| | - in
| | - raus
| `- Zustand
| - bob
| | - in
| | - raus
| `- Zustand
`- conn
Dieser Ansatz ermöglicht es Ihnen, unabhängige Implementierungen des IM-Transports und der Benutzeroberfläche vorzunehmen, da es keinen Freund für den Geschmack und die Farbe gibt, werden Sie nicht jedem gefallen. Mit
tmux und / oder
Multitail können Sie eine Multi-Window-Oberfläche mit Syntaxhervorhebung erhalten. Und mit
rlwrap können Sie eine GNU Readline-kompatible Zeichenfolge für die Eingabe von Nachrichten erhalten.
Tatsächlich verwenden saugfreie Projekte FIFO-Dateien. Persönlich konnte ich nicht verstehen, wie man in Asyncio mit Dateien konkurrenzfähig arbeitet, ohne ein handgemachtes Substrat aus ausgewählten Threads (ich benutze
Go- Sprache für solche Dinge seit langer Zeit). Deshalb habe ich mich entschlossen, mit den Unix-Domain-Sockets auszukommen. Dies macht es leider unmöglich, Echo 2001: 470: dead :: babe 6666> conn zu machen. Ich habe dieses Problem mit
socat : echo 2001: 470: dead :: babe 6666 | gelöst socat - UNIX-CONNECT: conn, socat READLINE UNIX-CONNECT: alice / in.
Anfängliches unsicheres Protokoll
TCP wird als Transportmittel verwendet: Es garantiert die Lieferung und die Bestellung. UDP garantiert weder das eine noch das andere (was nützlich wäre, wenn Kryptografie angewendet wird), und die
SCTP- Unterstützung in Python ist sofort
einsatzbereit .
Leider hat TCP kein Konzept einer Nachricht, sondern nur einen Bytestrom. Daher ist es erforderlich, ein Format für Nachrichten zu erstellen, damit diese in diesem Stream untereinander geteilt werden können. Wir können uns darauf einigen, das Zeilenvorschubzeichen zu verwenden. Für den Anfang ist es jedoch geeignet, wenn wir beginnen, unsere Nachrichten zu verschlüsseln. Dieses Symbol kann an einer beliebigen Stelle im Chiffretext erscheinen. Daher sind Protokolle in Netzwerken beliebt, die zuerst die Länge der Nachricht in Bytes senden. In Python gibt es beispielsweise standardmäßig xdrlib, mit dem Sie mit einem ähnlichen
XDR- Format arbeiten können.
Wir werden beim TCP-Lesen nicht richtig und effizient arbeiten - wir vereinfachen den Code. Wir lesen die Daten aus dem Socket in einer Endlosschleife, bis wir die vollständige Nachricht dekodieren. Sie können JSON auch mit XML als Format für diesen Ansatz verwenden. Wenn jedoch Kryptografie hinzugefügt wird, müssen die Daten signiert und authentifiziert werden - und dies erfordert eine byteweise identische Darstellung von Objekten, die JSON / XML nicht bereitstellt (Dumps können variieren).
XDR ist für eine solche Aufgabe geeignet. Ich wähle jedoch ASN.1 mit DER-Codierung und
PyDERASN- Bibliothek, da wir Objekte auf hoher Ebene zur Hand haben, die oft angenehmer und bequemer zu bearbeiten sind. Im Gegensatz zu schemenlosem
Bencode ,
MessagePack oder
CBOR überprüft ASN.1 Daten automatisch
anhand eines fest codierten Schemas.
Die empfangene Nachricht ist Msg: entweder ein Text MsgText (mit bisher einem Textfeld) oder eine Handshake-Nachricht MsgHandshake (in der der Name des Gesprächspartners übertragen wird). Jetzt sieht es überkompliziert aus, aber es hat die Zukunft berührt.
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│MsgHandshake (IdA) │
│────────────────── >> │
│ │
│MsgHandshake (IdB) │
│ <──────────────────│
│ │
│ MsgText () │
│────────────────── >> │
│ │
│ MsgText () │
│ <──────────────────│
│ │
IM ohne Kryptographie
Wie gesagt, für alle Operationen mit Sockets wird die Asyncio-Bibliothek verwendet. Erklären Sie, was wir beim Start erwarten:
parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--our-name", required=True, help="Our peer name", ) parser.add_argument( "--their-names", required=True, help="Their peer names, comma-separated", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() OUR_NAME = UTF8String(args.our_name) THEIR_NAMES = set(args.their_names.split(","))
Stellen Sie Ihren eigenen Namen ein (--our-name alice). Ein Komma listet alle erwarteten Gesprächspartner auf (- ihre Namen bob, eve). Für jeden der Gesprächspartner wird ein Verzeichnis mit Unix-Sockets sowie eine Coroutine für jeden In-, Out- und Status erstellt:
for peer_name in THEIR_NAMES: makedirs(peer_name, mode=0o700, exist_ok=True) out_queue = asyncio.Queue() OUT_QUEUES[peer_name] = out_queue asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_out_processor, out_queue=out_queue), path.join(peer_name, "out"), )) in_queue = asyncio.Queue() IN_QUEUES[peer_name] = in_queue asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_in_processor, in_queue=in_queue), path.join(peer_name, "in"), )) asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_state_processor, peer_name=peer_name), path.join(peer_name, "state"), )) asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn"))
Nachrichten vom Benutzer vom Eingangssocket werden an die Warteschlange IN_QUEUES gesendet:
async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None: while True: text = await reader.read(MaxTextLen) if text == b"": break await in_queue.put(text.decode("utf-8"))
Nachrichten von Gesprächspartnern werden an die Warteschlange OUT_QUEUES gesendet, von der Daten in den Out-Socket geschrieben werden:
async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None: while True: text = await out_queue.get() writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8")) await writer.drain()
Beim Lesen aus dem Status-Socket sucht das Programm im PEER_ALIVE-Wörterbuch nach der Adresse des Gesprächspartners. Wenn noch keine Verbindung zum Gesprächspartner besteht, wird eine leere Zeile geschrieben.
async def unixsock_state_processor(reader, writer, peer_name: str) -> None: peer_writer = PEER_ALIVES.get(peer_name) writer.write( b"" if peer_writer is None else (" ".join([ str(i) for i in peer_writer.get_extra_info("peername")[:2] ]).encode("utf-8") + b"\n") ) await writer.drain() writer.close()
Wenn eine Adresse in den Conn-Socket geschrieben wird, wird die "Initiator" -Funktion der Verbindung gestartet:
async def unixsock_conn_processor(reader, writer) -> None: data = await reader.read(256) writer.close() host, port = data.decode("utf-8").split(" ") await initiator(host=host, port=int(port))
Betrachten Sie den Initiator. Zunächst öffnet er offensichtlich eine Verbindung zum angegebenen Host / Port und sendet eine Handshake-Nachricht mit seinem Namen:
130 async def initiator(host, port): 131 _id = repr((host, port)) 132 logging.info("%s: dialing", _id) 133 reader, writer = await asyncio.open_connection(host, port) 134
Dann wartet es auf eine Antwort von der entfernten Seite. Versuche, die empfangene Antwort gemäß dem Msg ASN.1-Schema zu dekodieren. Wir gehen davon aus, dass die gesamte Nachricht von einem TCP-Segment gesendet wird und wir sie atomar empfangen, wenn .read () aufgerufen wird. Wir überprüfen, ob wir genau die Handshake-Nachricht erhalten haben.
141
Wir überprüfen, ob der Name der Person, mit der wir sprechen, uns bekannt ist. Wenn nicht, trennen Sie die Verbindung. Wir prüfen, ob wir bereits eine Verbindung zu ihm hergestellt haben (der Gesprächspartner gab erneut den Befehl, sich mit uns zu verbinden) und schließen sie. Python-Zeichenfolgen mit Nachrichtentext werden in die IN_QUEUES-Warteschlange gestellt, es gibt jedoch einen speziellen Wert None, der der Coroutine signalisiert, dass msg_sender nicht mehr funktioniert, damit sie ihren Writer im Zusammenhang mit der veralteten TCP-Verbindung vergisst.
159 msg_handshake = msg.value 160 peer_name = str(msg_handshake["peerName"]) 161 if peer_name not in THEIR_NAMES: 162 logging.warning("unknown peer name: %s", peer_name) 163 writer.close() 164 return 165 logging.info("%s: session established: %s", _id, peer_name) 166
msg_sender akzeptiert ausgehende Nachrichten (von einem In-Socket in die Warteschlange gestellt), serialisiert sie in eine MsgText-Nachricht und sendet sie über eine TCP-Verbindung. Es kann jederzeit abbrechen - wir fangen es eindeutig ab.
async def msg_sender(peer_name: str, writer) -> None: in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break writer.write(Msg(("text", MsgText(( ("text", UTF8String(text)), )))).encode()) try: await writer.drain() except ConnectionResetError: del PEER_ALIVES[peer_name] return logging.info("%s: sent %d characters message", peer_name, len(text))
Am Ende tritt der Initiator in einen endlosen Zyklus des Lesens von Nachrichten aus dem Socket ein. Überprüft, ob es sich um eine Textnachricht handelt, und stellt in die OUT_QUEUES die Warteschlange, aus der sie an den Out-Socket des entsprechenden Gesprächspartners gesendet werden. Warum können Sie nicht einfach .read () ausführen und die Nachricht dekodieren? Weil es möglich ist, dass mehrere Nachrichten des Benutzers im Puffer des Betriebssystems zusammengefasst und von einem TCP-Segment gesendet werden. Wir können den ersten dekodieren, und dann kann ein Teil des nachfolgenden im Puffer verbleiben. In jedem Notfall schließen wir die TCP-Verbindung und stoppen die msg_sender-Coroutine (indem wir None an die OUT_QUEUES-Warteschlange senden).
174 buf = b"" 175
Kehren wir zum Hauptcode zurück. Nachdem wir alle Coroutinen erstellt haben, starten wir zum Zeitpunkt des Starts des Programms den TCP-Server. Für jede hergestellte Verbindung erstellt er eine Responder-Coroutine.
logging.basicConfig( level=logging.INFO, format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s", ) loop = asyncio.get_event_loop() server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port)) logging.info("Listening on: %s", server.sockets[0].getsockname()) loop.run_forever()
Der Responder ähnelt dem Initiator und spiegelt dieselben Aktionen wider. Der Einfachheit halber beginnt jedoch sofort eine endlose Schleife zum Lesen von Nachrichten. Jetzt sendet das Handshake-Protokoll eine Nachricht von jeder Seite, aber in Zukunft wird es zwei vom Initiator der Verbindung geben, wonach Textnachrichten sofort gesendet werden können.
72 async def responder(reader, writer): 73 _id = writer.get_extra_info("peername") 74 logging.info("%s: connected", _id) 75 buf = b"" 76 msg_expected = "handshake" 77 peer_name = None 78 while True: 79
Sicheres Protokoll
Es ist an der Zeit, unsere Kommunikation zu sichern. Was verstehen wir unter Sicherheit und was wollen wir:
- Vertraulichkeit übermittelter Nachrichten;
- Authentizität und Integrität der übertragenen Nachrichten - ihre Änderung muss erkannt werden;
- Schutz vor Wiederholungsangriffen - Die Tatsache, dass Nachrichten verloren gegangen sind oder erneut versucht wurden, sollte erkannt werden (und wir beschließen, die Verbindung zu trennen).
- Identifizierung und Authentifizierung von Gesprächspartnern durch vorgesteuerte öffentliche Schlüssel - wir haben bereits früher entschieden, dass wir ein Freund-zu-Freund-Netzwerk aufbauen. Erst nach der Authentifizierung werden wir verstehen, mit wem wir kommunizieren.
- Das Vorhandensein perfekter Forward Secrecy Properties (PFS) - der Kompromiss unseres langlebigen Signaturschlüssels sollte nicht dazu führen, dass die gesamte vorherige Korrespondenz gelesen werden kann. Das Aufzeichnen von abgefangenem Verkehr wird unbrauchbar.
- Gültigkeit / Gültigkeit von Nachrichten (Transport und Handshakes) nur innerhalb derselben TCP-Sitzung. Das Einfügen korrekt signierter / authentifizierter Nachrichten aus einer anderen Sitzung (auch mit demselben Gesprächspartner) sollte nicht möglich sein.
- Der passive Beobachter sollte keine Benutzerkennungen, langlebigen öffentlichen Schlüssel oder Hashes von diesen sehen. Eine Art Anonymität von einem passiven Beobachter.
Überraschenderweise möchte fast jeder dieses Minimum in jedem Handshake-Protokoll haben, und nur sehr wenige der oben genannten werden letztendlich für selbst entwickelte Protokolle ausgeführt. Jetzt werden wir also keine neuen Dinge erfinden. Ich würde definitiv empfehlen, das
Noise-Framework zum Erstellen von Protokollen zu verwenden, aber lassen Sie uns etwas Einfacheres wählen.
Am beliebtesten sind zwei Protokolle:
- TLS ist ein komplexes Protokoll mit einer langen Geschichte von Fehlern, Schulen, Schwachstellen, schlechtem Durchdenken, Komplexität und Mängeln (dies gilt jedoch nicht viel für TLS 1.3). Aber wir betrachten es wegen der Komplexität nicht.
- IPSec mit IKE - haben keine ernsthaften kryptografischen Probleme, obwohl sie auch nicht einfach sind. Wenn Sie über IKEv1 und IKEv2 lesen, sind die Protokolle STS , ISO / IEC IS 9798-3 und SIGMA (SIGn-and-MAc) - einfach genug, um an einem Abend implementiert zu werden.
Wie ist SIGMA als letztes Glied bei der Entwicklung von STS / ISO-Protokollen gut? Es erfüllt alle unsere Anforderungen (einschließlich des „Versteckens“ der Kennungen von Gesprächspartnern) und weist keine bekannten kryptografischen Probleme auf. Es ist minimalistisch - das Entfernen mindestens eines Elements aus der Protokollnachricht führt zu seiner Unsicherheit.
Gehen wir vom einfachsten hausgemachten Protokoll zu SIGMA. Die grundlegendste Operation, an der wir interessiert sind, ist die
Tastenanpassung : Eine Funktion, an deren Ausgabe beide Teilnehmer denselben Wert erhalten, der als symmetrischer Schlüssel verwendet werden kann. Ohne auf Details einzugehen: Jede der Parteien generiert ein kurzlebiges Schlüsselpaar (nur innerhalb derselben Sitzung verwendet) (öffentliche und private Schlüssel), tauscht öffentliche Schlüssel aus, ruft die Abstimmungsfunktion auf, an deren Eingabe sie ihren privaten Schlüssel und den öffentlichen Schlüssel des Gesprächspartners senden.
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│ IdA, PubA │ │
│────────────── >> │ rPrvA, PubA = DHgen () ║
│ │ ╚═══════════════════╝
B IdB, PubB │ ╔ ═ ═ ╔ ╔ ╔ ╔ ╔ ╔ ╔
│ <────────────────│ ║║rrrrrrrrrPrvB, PubB = DHgen () ║
│ │ ╚═══════════════════╝
────┐ ╔═══════════════════╗
Ey ║ Schlüssel = DH (PrvA, PubB) ║
<───┘ ╚═══════╤═══════════╝
│ │
│ │
Jeder kann in die Mitte eingreifen und die öffentlichen Schlüssel durch eigene ersetzen - in diesem Protokoll gibt es keine Authentifizierung der Gesprächspartner. Fügen Sie eine Signatur mit langlebigen Schlüsseln hinzu.
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│IdA, PubA, Zeichen (SignPrvA, (PubA)) │ ╔═══════════════════════╗
P ─ ─ ─ ─ ign ign ign ign ign ign ign ign ign ign ign ign ign ign ,,,,,,, SignPubA = load () ║
│ │ ║PrvA, PubA = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│IdB, PubB, Zeichen (SignPrvB, (PubB)) │ ╔═══════════════════════╗
│ <────────────────────────────────── ignSignPrvB, SignPubB = load () ║
│ │ ║PrvB, PubB = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
────┐ ╔═════════════════════│ │
│ ║verify (SignPubB, ...) ║ │
<───┘ ║Key = DH (PrvA, PubB) ║ │
│ ╚═════════════════════╝ │
│ │
Eine solche Signatur funktioniert nicht, da sie nicht an eine bestimmte Sitzung gebunden ist. Solche Nachrichten eignen sich auch für Sitzungen mit anderen Teilnehmern. Der gesamte Kontext sollte abonniert werden. Dies erzwingt auch das Hinzufügen einer weiteren Nachricht von A.
Darüber hinaus ist es wichtig, Ihre eigene Kennung als Signatur hinzuzufügen, da wir andernfalls IdXXX ersetzen und die Nachricht mit dem Schlüssel eines anderen bekannten Gesprächspartners neu signieren können. Um
Reflexionsangriffe zu verhindern, müssen sich die Elemente unter der Signatur in ihrer Bedeutung an klar definierten Stellen befinden: Wenn A signiert (PubA, PubB), muss B signieren (PubB, PubA). Dies zeigt auch, wie wichtig es ist, die Struktur und das Format der serialisierten Daten zu wählen. Beispielsweise werden Sätze in der ASN.1 DER-Codierung sortiert: SET OF (PubA, PubB) ist identisch mit SET OF (PubB, PubA).
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│ IdA, PubA │ │
│────────────────────────────────────────────> │ ║SignPrvA, SignPubA = load () ║
│ │ ║PrvA, PubA = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│IdB, PubB, Zeichen (SignPrvB, (IdB, PubA, PubB)) │ ╔═════════════════════╗
„ SignPubB = load () ║
│ │ ║PrvB, PubB = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│ Zeichen (SignPrvA, (IdA, PubB, PubA)) │ ╔═══════════════════╗
If ─ ─ ─ ─> ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify SignPubB, ...) ║
Ey │ ey Schlüssel = DH (PrvA, PubB) ║
│ │ ╚══ ═ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
│ │
Wir haben jedoch immer noch nicht „bewiesen“, dass wir denselben gemeinsamen Schlüssel für diese Sitzung entwickelt haben. Grundsätzlich können Sie auf diesen Schritt verzichten - die erste Transportverbindung ist ungültig, aber wir möchten, dass wir nach Abschluss des Handshakes sicher sind, dass wirklich alles vereinbart ist. Derzeit haben wir das ISO / IEC IS 9798-3-Protokoll in unseren Händen.
Wir könnten den Schlüssel selbst unterschreiben. Dies ist gefährlich, da es möglich ist, dass der verwendete Signaturalgorithmus undicht ist (lassen Sie Bits pro Signatur, aber immer noch Lecks). Sie können einen Hash vom generierten Schlüssel signieren, aber selbst ein Hash-Leck vom generierten Schlüssel kann bei einem Brute-Force-Angriff auf die Generierungsfunktion von Wert sein. SIGMA verwendet eine MAC-Funktion, die die Absender-ID authentifiziert.
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│ IdA, PubA │ │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > │ ║SignPrvA, SignPubA = load () ║
│ │ ║PrvA, PubA = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│IdB, PubB, Zeichen (SignPrvB, (PubA, PubB)), MAC (IdB) │ ╔═════════════════╗
│ <──── ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│ ║SignPrvB, SignPubB = load () ║
│ │ ║PrvB, PubB = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│ │ ╔══ ═ ═ ═ ╗ ╗ ╗ ═ ╗ ╗ ╗
│ Zeichen (SignPrvA, (PubB, PubA)), MAC (IdA) │ Schlüssel = DH (PrvA, PubB) ║
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > │ ║ überprüfen (Schlüssel, IdB) ║
│ │ ║verify (SignPubB, ...) ║
│ │ ╚══ ═ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
│ │
Als Optimierung möchten einige möglicherweise ihre kurzlebigen Schlüssel wiederverwenden (was für PFS natürlich bedauerlich ist). Zum Beispiel haben wir ein Schlüsselpaar generiert, versucht, eine Verbindung herzustellen, aber TCP war nicht verfügbar oder wurde irgendwo in der Mitte des Protokolls abgebrochen. Es ist schade, die Entropie- und Prozessorressourcen für ein neues Paar auszugeben. Aus diesem Grund führen wir das sogenannte Cookie ein - einen Pseudozufallswert, der vor möglichen versehentlichen Wiederholungsangriffen schützt, wenn kurzlebige öffentliche Schlüssel wiederverwendet werden. Aufgrund der Bindung zwischen dem Cookie und dem kurzlebigen öffentlichen Schlüssel kann der öffentliche Schlüssel der Gegenpartei als unnötig aus der Signatur entfernt werden.
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│ IdA, PubA, CookieA │ ╔═════════════════════════
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ───────────────────── >> │ │ SignPrvA, SignPubA = load () ║
│ │ ║PrvA, PubA = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│IdB, PubB, CookieB, Zeichen (SignPrvB, (CookieA, CookieB, PubB)), MAC (IdB) │ ╔═══════════════════════ ═══╗
│ <─── ─ ─ ─ ─ ─ ─ │ │ │ │ │ │ ───────────────────────│ ║║ignignignignignignign│ign SignPrvB, SignPubB = load () ║
│ │ ║PrvB, PubB = DHgen () ║
│ │ ╚══ ═ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝ ╝ ╝
│ │ ╔══ ═ ═ ═ ╗ ╗ ╗ ═ ╗ ╗ ╗
│ Zeichen (SignPrvA, (CookieB, CookieA, PubA)), MAC (IdA) │ Schlüssel = DH (PrvA, PubB) ║
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ify ify ify ify ify ify ify ify ify ify ify ify Verify (Schlüssel, IdB) ║
│ │ ║verify (SignPubB, ...) ║
│ │ ╚══ ═ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
│ │
Schließlich möchten wir die Privatsphäre unserer Gesprächspartner-IDs von einem passiven Beobachter erhalten. Zu diesem Zweck schlägt SIGMA vor, zunächst kurzlebige Schlüssel auszutauschen und einen gemeinsamen Schlüssel für die Authentifizierung von Authentifizierungsnachrichten auszuarbeiten. SIGMA beschreibt zwei Optionen:
- SIGMA-I - schützt den Initiator vor aktiven Angriffen, den Responder vor passiven: Der Initiator authentifiziert den Responder und wenn etwas nicht passt, gibt er seine Identifikation nicht heraus. Der Angeklagte gibt seinen Ausweis, wenn Sie mit ihm ein aktives Protokoll beginnen. Der passive Beobachter wird nichts wissen;
SIGMA-R - schützt den Responder vor aktiven Angriffen, den Initiator vor passiven. Alles ist genau das Gegenteil, aber in diesem Protokoll werden bereits vier Handshake-Nachrichten übertragen.
Wir wählen SIGMA-I als ähnlicher als das, was wir von den üblichen Server-Client-Dingen erwarten: Nur ein authentifizierter Server erkennt den Client, und jeder kennt den Server trotzdem. Außerdem ist die Implementierung aufgrund weniger Handshake-Nachrichten einfacher. Alles, was wir dem Protokoll hinzufügen, ist die Verschlüsselung des Nachrichtenteils und die Übertragung der Kennung A an den verschlüsselten Teil der letzten Nachricht:
┌─────┐ ┌─────┐
EPeerA│ │PeerB│
└──┬──┘ └──┬──┘
│ PubA, CookieA │ ╔══════════════════════════╗
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ign ign ign ign ign ign ign ign ign ign ign ign ,,,,,,, Vorzeichen Vorzeichen Vorzeichen Vorzeichen Last Last Last Last Last Last ((((((((()))))
│ │ ║PrvA, PubA = DHgen () ║
│ │ ╚═══════════════════════════╝
│PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═══════════════════════════╗
│<─────────────────────────────────────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║
│ │ ║PrvB, PubB = DHgen() ║
│ │ ╚═══════════════════════════╝
│ │ ╔═════════════════════╗
│ Enc((IdA, sign(SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║
│─────────────────────────────────────────────────────────────────────────────>│ ║verify(Key, IdB) ║
│ │ ║verify(SignPubB, ...)║
│ │ ╚═════════════════════╝
│ │
- 34.10-2012 256- .
- 34.10-2012 VKO.
- MAC CMAC. , 34.13-2015. — (34.12-2015).
- . -256 (34.11-2012 256 ).
. . : , , (MAC) , . , , . , , ? . , KDF (key derivation function). , - : HKDF , . , Python , hkdf . HKDF HMAC , , , -. Python Wikipedia . 34.10-2012, - -256. , :
kdf = Hkdf(None, key_session, hash=GOST34112012256) kdf.expand(b"handshake1-mac-identity") kdf.expand(b"handshake1-enc") kdf.expand(b"handshake1-mac") kdf.expand(b"handshake2-mac-identity") kdf.expand(b"handshake2-enc") kdf.expand(b"handshake2-mac") kdf.expand(b"transport-initiator-enc") kdf.expand(b"transport-initiator-mac") kdf.expand(b"transport-responder-enc") kdf.expand(b"transport-responder-mac")
/
ASN.1 :
class Msg(Choice): schema = (( ("text", MsgText()), ("handshake0", MsgHandshake0(expl=tag_ctxc(0))), ("handshake1", MsgHandshake1(expl=tag_ctxc(1))), ("handshake2", MsgHandshake2(expl=tag_ctxc(2))), )) class MsgText(Sequence): schema = (( ("payload", MsgTextPayload()), ("payloadMac", MAC()), )) class MsgTextPayload(Sequence): schema = (( ("nonce", Integer(bounds=(0, float("+inf")))), ("ciphertext", OctetString(bounds=(1, MaxTextLen))), )) class MsgHandshake0(Sequence): schema = (( ("cookieInitiator", Cookie()), ("pubKeyInitiator", PubKey()), )) class MsgHandshake1(Sequence): schema = (( ("cookieResponder", Cookie()), ("pubKeyResponder", PubKey()), ("ukm", OctetString(bounds=(8, 8))), ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class MsgHandshake2(Sequence): schema = (( ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class HandshakeTBE(Sequence): schema = (( ("identity", OctetString(bounds=(32, 32))), ("signature", OctetString(bounds=(64, 64))), ("identityMac", MAC()), )) class HandshakeTBS(Sequence): schema = (( ("cookieTheir", Cookie()), ("cookieOur", Cookie()), ("pubKeyOur", PubKey()), )) class Cookie(OctetString): bounds = (16, 16) class PubKey(OctetString): bounds = (64, 64) class MAC(OctetString): bounds = (16, 16)
HandshakeTBS — , (to be signed). HandshakeTBE — , (to be encrypted). ukm MsgHandshake1. 34.10 VKO, , UKM (user keying material) — .
, ( , , ).
, - . JSON :
{ "our": { "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98", "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1" }, "their": { "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce", "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a" } }
our — , . their — . JSON :
from pygost import gost3410 from pygost.gost34112012256 import GOST34112012256 CURVE = gost3410.GOST3410Curve( *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"] ) parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--keys-gen", action="store_true", help="Generate JSON with our new keypair", ) parser.add_argument( "--keys", default="keys.json", required=False, help="JSON with our and their keys", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() if args.keys_gen: prv_raw = urandom(32) pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw)) pub_raw = gost3410.pub_marshal(pub) print(json.dumps({ "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)}, "their": {}, })) exit(0)
34.10 — . 256- 256- . PyGOST , , (urandom(32)) , gost3410.prv_unmarshal(). , gost3410.public_key(). 34.10 — , , gost3410.pub_marshal().
JSON , , , , gost3410.pub_unmarshal(). , . -256 gost34112012256.GOST34112012256(), hashlib -.
? , : cookie (128- ), 34.10, VKO .
395 async def initiator(host, port): 396 _id = repr((host, port)) 397 logging.info("%s: dialing", _id) 398 reader, writer = await asyncio.open_connection(host, port) 399
423 logging.info("%s: got %s message", _id, msg.choice) 424 if msg.choice != "handshake1": 425 logging.warning("%s: unexpected message, disconnecting", _id) 426 writer.close() 427 return 428
UKM 64- (urandom(8)), , gost3410_vko.ukm_unmarshal(). VKO 34.10-2012 256- gost3410_vko.kek_34102012256() (KEK — key encryption key).
256- . HKDF . GOST34112012256 hashlib , Hkdf . ( Hkdf) , - . kdf.expand() 256-, .
TBE TBS :
- MAC ;
- ;
- TBE ;
- ;
- MAC ;
- TBS , cookie . .
441 try: 442 peer_name = validate_tbe( 443 msg_handshake1, 444 key_handshake1_mac_identity, 445 key_handshake1_enc, 446 key_handshake1_mac, 447 cookie_our, 448 cookie_their, 449 pub_their_raw, 450 ) 451 except ValueError as err: 452 logging.warning("%s: %s, disconnecting", _id, err) 453 writer.close() 454 return 455
, 34.13-2015 34.12-2015. , MAC-. PyGOST gost3413.mac(). ( ), , , . hardcode- ? 34.12-2015 128- , 64- — 28147-89, .
gost.3412.GOST3412Kuznechik(key) .encrypt()/.decrypt() , 34.13 . MAC : gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). MAC- (==) , , , , BEAST TLS. Python hmac.compare_digest .
. , , . 34.13-2015 : ECB, CTR, OFB, CBC, CFB. . , ( CCM, OCB, GCM ) — MAC. (CTR): , , , ( CBC, ).
.mac(), .ctr() : ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). , . ( ), . handshake .
gost3410.verify() : ( GOSTIM ), ( , , ), 34.11-2012 .
, handshake2 , , : , .…
456
, ( , , ), MAC-:
499
msg_sender , TCP-. nonce, . .
async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None: nonce = 0 encrypter = GOST3412Kuznechik(key_enc).encrypt macer = GOST3412Kuznechik(key_mac).encrypt in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break ciphertext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, text.encode("utf-8"), long2bytes(nonce, 8), ) payload = MsgTextPayload(( ("nonce", Integer(nonce)), ("ciphertext", OctetString(ciphertext)), )) mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) writer.write(Msg(("text", MsgText(( ("payload", payload), ("payloadMac", MAC(mac_tag)), )))).encode()) nonce += 1
msg_receiver, :
async def msg_receiver( msg_text: MsgText, nonce_expected: int, macer, encrypter, peer_name: str, ) -> None: payload = msg_text["payload"] if int(payload["nonce"]) != nonce_expected: raise ValueError("unexpected nonce value") mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])): raise ValueError("invalid MAC") plaintext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, bytes(payload["ciphertext"]), long2bytes(nonce_expected, 8), ) text = plaintext.decode("utf-8") await OUT_QUEUES[peer_name].put(text)
Fazit
GOSTIM ( , )! (-256 : 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). , GoGOST , PyDERASN , NNCP , GoVPN , GOSTIM , GPLv3+ .
, , , Python/Go-, « „“ .