GOSTIM: P2P F2F E2EE IM em uma noite com criptografia GOST

Como desenvolvedor da biblioteca PyGOST (primitivas criptográficas GOST em Python puro), muitas vezes recebo perguntas sobre como implementar as mensagens seguras mais simples no meu joelho. Muitos consideram a criptografia aplicada uma coisa bastante simples, e uma chamada .encrypt () para uma cifra de bloco será suficiente para enviar com segurança por um canal de comunicação. Outros acreditam que a criptografia aplicada é o destino de poucos, e é aceitável que empresas ricas como o Telegram com olimpíadas matemáticas não possam implementar um protocolo seguro.

Tudo isso me levou a escrever este artigo para mostrar que a implementação de protocolos criptográficos e MI seguro não é uma tarefa tão difícil. No entanto, inventar sua própria autenticação e protocolos de contrato de chave não vale a pena.

Heearing

O artigo será escrito mensageiro instantâneo criptografado ponto a ponto , amigo a amigo , de ponta a ponta com autenticação SIGMA-I e protocolo de acordo de chave (com base no qual o IPsec IKE é implementado) usando exclusivamente algoritmos criptográficos GOST, bibliotecas PyGOST e Codificação ASN.1 de mensagens com a biblioteca PyDERASN (sobre a qual eu já escrevi antes ). Pré-requisito: deve ser tão simples que possa ser escrito do zero em uma noite (ou dia de trabalho); caso contrário, não será mais um programa simples. Provavelmente tem erros, dificuldades desnecessárias, deficiências, e esse é o meu primeiro programa usando a biblioteca assíncrona.

Design IM


Para começar, você precisa entender como será a nossa IM. Por uma questão de simplicidade, seja uma rede ponto a ponto, sem nenhuma descoberta dos participantes. Indicaremos pessoalmente para qual endereço: a porta a ser conectada para se comunicar com o interlocutor.

Entendo que, no momento, a suposição da disponibilidade de comunicação direta entre dois computadores arbitrários é uma limitação significativa da aplicabilidade do IM na prática. Porém, quanto mais desenvolvedores implementarem todos os tipos de muletas transversais à NAT, mais permaneceremos na Internet IPv4, com a probabilidade deprimente de comunicação entre computadores arbitrários. Bem, quanto você pode suportar a falta de IPv6 em casa e no trabalho?

Teremos uma rede amigo a amigo: todos os interlocutores possíveis devem ser conhecidos com antecedência. Em primeiro lugar, simplifica muito tudo: se apresentou, encontrou ou não encontrou um nome / chave, desconectou ou continuou a trabalhar, conhecendo o interlocutor. Em segundo lugar, no caso geral, é seguro e exclui muitos ataques.

A interface do IM estará próxima das soluções clássicas de projetos sem sucção , que eu realmente gosto pelo minimalismo e pela filosofia do Unix-way. Um programa de IM para cada interlocutor cria um diretório com três soquetes de domínio Unix:

  • as mensagens enviadas para o interlocutor são gravadas nele;
  • mensagens de saída recebidas do interlocutor são lidas a partir dele;
  • estado - lendo, descobriremos se o interlocutor está conectado agora, o endereço / porta de conexão.

Além disso, um soquete de conexão é criado, gravando em qual porta do host, iniciamos uma conexão com um interlocutor remoto.

 | - alice
 |  | - em
 |  | - fora
 |  `- estado
 | - bob
 |  | - em
 |  | - fora
 |  `- estado
 `- conn

Essa abordagem permite que você faça implementações independentes do transporte de mensagens instantâneas e da interface do usuário, porque não há amigo para o sabor e a cor, você não agradará a todos. Usando tmux e / ou multitail , você pode obter uma interface de várias janelas com destaque de sintaxe. E com o rlwrap, você pode obter uma string compatível com o GNU Readline para inserir mensagens.

De fato, projetos sem sucção usam arquivos FIFO. Pessoalmente, eu não conseguia entender como trabalhar de forma assíncrona com arquivos de forma competitiva sem um substrato feito à mão a partir de threads selecionados (eu uso a linguagem Go para essas coisas há muito tempo). Portanto, decidi conviver com os soquetes de domínio Unix. Infelizmente, isso torna impossível fazer eco 2001: 470: dead :: babe 6666> conn. Resolvi esse problema usando o socat : echo 2001: 470: dead :: babe 6666 | socat - UNIX-CONNECT: conn, socat READLINE UNIX-CONNECT: alice / in.

Protocolo inseguro inicial


O TCP é usado como transporte: garante a entrega e seu pedido. O UDP não garante nem um nem outro (o que seria útil quando a criptografia é aplicada), e o suporte SCTP no Python está pronto para uso.

Infelizmente, no TCP não há conceito de mensagem, mas apenas um fluxo de bytes. Portanto, é necessário criar um formato para as mensagens para que elas possam ser compartilhadas entre si nesse fluxo. Podemos concordar em usar o caractere de avanço de linha. Para iniciantes, é adequado, no entanto, quando começamos a criptografar nossas mensagens, esse símbolo pode aparecer em qualquer lugar no texto cifrado. Portanto, os protocolos são populares nas redes, enviando primeiro o tamanho da mensagem em bytes. Por exemplo, no Python, fora da caixa, existe o xdrlib, que permite trabalhar com um formato XDR semelhante.

Não trabalharemos corretamente e eficientemente com a leitura TCP - simplificamos o código. Lemos os dados do soquete em um loop infinito até decodificar a mensagem completa. Você também pode usar JSON com XML como um formato para esta abordagem. Mas quando a criptografia é adicionada, os dados terão que ser assinados e autenticados - e isso exigirá uma representação idêntica de objetos, byte a byte, que o JSON / XML não fornece (os dumps podem variar).

O XDR é adequado para essa tarefa, no entanto, escolho o ASN.1 com codificação DER e a biblioteca PyDERASN , pois teremos objetos de alto nível à mão, que geralmente são mais agradáveis ​​e convenientes de se trabalhar. Diferentemente do código de esquema sem esquema , MessagePack ou CBOR , o ASN.1 validará automaticamente os dados em um esquema codificado.

# Msg ::= CHOICE { # text MsgText, # handshake [0] EXPLICIT MsgHandshake } class Msg(Choice): schema = (( ("text", MsgText()), ("handshake", MsgHandshake(expl=tag_ctxc(0))), )) # MsgText ::= SEQUENCE { # text UTF8String (SIZE(1..MaxTextLen))} class MsgText(Sequence): schema = (( ("text", UTF8String(bounds=(1, MaxTextLen))), )) # MsgHandshake ::= SEQUENCE { # peerName UTF8String (SIZE(1..256)) } class MsgHandshake(Sequence): schema = (( ("peerName", UTF8String(bounds=(1, 256))), )) 

A mensagem recebida será Msg: um texto MsgText (com um campo de texto até o momento) ou uma mensagem de handshake MsgHandshake (na qual o nome do interlocutor é transmitido). Agora parece complicado demais, mas é um desafio para o futuro.

      ┌─────┐ ┌─────┐
      │PeerA│ │PeerB│
      └──┬──┘ └──┬──┘
         │MsgHandshake (IdA) │
         │───────────────── >> │
         │ │
         SMsgHandshake (IdB) │
         │ <─────────────────│
         │ │
         │ MsgText () │
         │───────────────── >> │
         │ │
         │ MsgText () │
         │ <─────────────────│
         │ │


IM sem criptografia


Como eu disse, para todas as operações com soquetes, a biblioteca assíncrona será usada. Declare o que esperamos no lançamento:

 parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--our-name", required=True, help="Our peer name", ) parser.add_argument( "--their-names", required=True, help="Their peer names, comma-separated", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() OUR_NAME = UTF8String(args.our_name) THEIR_NAMES = set(args.their_names.split(",")) 

Defina seu próprio nome (--nome-alice). Uma vírgula lista todos os interlocutores esperados (- seus nomes bob, véspera). Para cada um dos interlocutores, é criado um diretório com soquetes Unix, bem como uma rotina para cada estado de entrada, saída:

 for peer_name in THEIR_NAMES: makedirs(peer_name, mode=0o700, exist_ok=True) out_queue = asyncio.Queue() OUT_QUEUES[peer_name] = out_queue asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_out_processor, out_queue=out_queue), path.join(peer_name, "out"), )) in_queue = asyncio.Queue() IN_QUEUES[peer_name] = in_queue asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_in_processor, in_queue=in_queue), path.join(peer_name, "in"), )) asyncio.ensure_future(asyncio.start_unix_server( partial(unixsock_state_processor, peer_name=peer_name), path.join(peer_name, "state"), )) asyncio.ensure_future(asyncio.start_unix_server(unixsock_conn_processor, "conn")) 

As mensagens do soquete de entrada do usuário são enviadas para a fila IN_QUEUES:

 async def unixsock_in_processor(reader, writer, in_queue: asyncio.Queue) -> None: while True: text = await reader.read(MaxTextLen) if text == b"": break await in_queue.put(text.decode("utf-8")) 

As mensagens dos interlocutores são enviadas para a fila OUT_QUEUES, a partir da qual os dados são gravados no soquete de saída:

 async def unixsock_out_processor(reader, writer, out_queue: asyncio.Queue) -> None: while True: text = await out_queue.get() writer.write(("[%s] %s" % (datetime.now(), text)).encode("utf-8")) await writer.drain() 

Ao ler do soquete de estado, o programa procura no dicionário PEER_ALIVE o endereço do interlocutor. Se ainda não houver conexão com o interlocutor, uma linha vazia será gravada.

 async def unixsock_state_processor(reader, writer, peer_name: str) -> None: peer_writer = PEER_ALIVES.get(peer_name) writer.write( b"" if peer_writer is None else (" ".join([ str(i) for i in peer_writer.get_extra_info("peername")[:2] ]).encode("utf-8") + b"\n") ) await writer.drain() writer.close() 

Quando um endereço é gravado no soquete de conexão, a função "iniciador" da conexão é iniciada:

 async def unixsock_conn_processor(reader, writer) -> None: data = await reader.read(256) writer.close() host, port = data.decode("utf-8").split(" ") await initiator(host=host, port=int(port)) 

Considere o iniciador. Primeiro, ele obviamente abre uma conexão com o host / porta especificado e envia uma mensagem de handshake com seu nome:

  130 async def initiator(host, port): 131 _id = repr((host, port)) 132 logging.info("%s: dialing", _id) 133 reader, writer = await asyncio.open_connection(host, port) 134 # Handshake message {{{ 135 writer.write(Msg(("handshake", MsgHandshake(( 136 ("peerName", OUR_NAME), 137 )))).encode()) 138 # }}} 139 await writer.drain() 

Em seguida, aguarda uma resposta do lado remoto. Tenta decodificar a resposta recebida de acordo com o esquema Msg ASN.1. Assumimos que a mensagem inteira será enviada por um segmento TCP e a receberemos atomicamente quando .read () for chamado. Verificamos que recebemos exatamente a mensagem de handshake.

  141 # Wait for Handshake message {{{ 142 data = await reader.read(256) 143 if data == b"": 144 logging.warning("%s: no answer, disconnecting", _id) 145 writer.close() 146 return 147 try: 148 msg, _ = Msg().decode(data) 149 except ASN1Error: 150 logging.warning("%s: undecodable answer, disconnecting", _id) 151 writer.close() 152 return 153 logging.info("%s: got %s message", _id, msg.choice) 154 if msg.choice != "handshake": 155 logging.warning("%s: unexpected message, disconnecting", _id) 156 writer.close() 157 return 158 # }}} 

Verificamos que o nome da pessoa com quem estamos conversando é conhecido por nós. Caso contrário, interrompa a conexão. Verificamos se já estabelecemos uma conexão com ele (o interlocutor novamente deu o comando para conectar-se a nós) e a fechamos. Seqüências de caracteres Python com texto de mensagem são colocadas na fila IN_QUEUES, mas há um valor especial None, que sinaliza msg_sender à corotina para parar de funcionar, para que ela esqueça seu escritor, que está conectado a uma conexão TCP desatualizada.

  159 msg_handshake = msg.value 160 peer_name = str(msg_handshake["peerName"]) 161 if peer_name not in THEIR_NAMES: 162 logging.warning("unknown peer name: %s", peer_name) 163 writer.close() 164 return 165 logging.info("%s: session established: %s", _id, peer_name) 166 # Run text message sender, initialize transport decoder {{{ 167 peer_alive = PEER_ALIVES.pop(peer_name, None) 168 if peer_alive is not None: 169 peer_alive.close() 170 await IN_QUEUES[peer_name].put(None) 171 PEER_ALIVES[peer_name] = writer 172 asyncio.ensure_future(msg_sender(peer_name, writer)) 173 # }}} 

O msg_sender aceita mensagens de saída (enfileiradas de um soquete de entrada), as serializa em uma mensagem MsgText e as envia por uma conexão TCP. Pode interromper a qualquer momento - estamos claramente interceptando.

 async def msg_sender(peer_name: str, writer) -> None: in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break writer.write(Msg(("text", MsgText(( ("text", UTF8String(text)), )))).encode()) try: await writer.drain() except ConnectionResetError: del PEER_ALIVES[peer_name] return logging.info("%s: sent %d characters message", peer_name, len(text)) 

No final, o iniciador entra em um ciclo interminável de leitura de mensagens do soquete. Verifica se esta é uma mensagem de texto e coloca no OUT_QUEUES a fila da qual eles serão enviados para o soquete de saída do interlocutor correspondente. Por que você não pode simplesmente executar .read () e decodificar a mensagem? Porque é possível que várias mensagens do usuário sejam agregadas no buffer do sistema operacional e enviadas por um segmento TCP. Podemos decodificar o primeiro e, em seguida, parte do subsequente pode permanecer no buffer. Em qualquer emergência, fechamos a conexão TCP e interrompemos a rotina msg_sender (enviando Nenhum para a fila OUT_QUEUES).

  174 buf = b"" 175 # Wait for test messages {{{ 176 while True: 177 data = await reader.read(MaxMsgLen) 178 if data == b"": 179 break 180 buf += data 181 if len(buf) > MaxMsgLen: 182 logging.warning("%s: max buffer size exceeded", _id) 183 break 184 try: 185 msg, tail = Msg().decode(buf) 186 except ASN1Error: 187 continue 188 buf = tail 189 if msg.choice != "text": 190 logging.warning("%s: unexpected %s message", _id, msg.choice) 191 break 192 try: 193 await msg_receiver(msg.value, peer_name) 194 except ValueError as err: 195 logging.warning("%s: %s", err) 196 break 197 # }}} 198 logging.info("%s: disconnecting: %s", _id, peer_name) 199 IN_QUEUES[peer_name].put(None) 200 writer.close() 66 async def msg_receiver(msg_text: MsgText, peer_name: str) -> None: 67 text = str(msg_text["text"]) 68 logging.info("%s: received %d characters message", peer_name, len(text)) 69 await OUT_QUEUES[peer_name].put(text) 

Vamos voltar ao código principal. Após criar todas as corotinas, no momento de iniciar o programa, iniciamos o servidor TCP. Para cada conexão estabelecida, ele cria uma resposta de rotina.

 logging.basicConfig( level=logging.INFO, format="%(levelname)s %(asctime)s: %(funcName)s: %(message)s", ) loop = asyncio.get_event_loop() server = loop.run_until_complete(asyncio.start_server(responder, args.bind, args.port)) logging.info("Listening on: %s", server.sockets[0].getsockname()) loop.run_forever() 

O respondedor é semelhante ao iniciador e reflete todas as mesmas ações, mas um loop infinito de leitura de mensagens começa imediatamente, por simplicidade. Agora, o protocolo de handshake envia uma mensagem de cada lado, mas, no futuro, haverá duas do iniciador da conexão, após as quais as mensagens de texto podem ser enviadas imediatamente.

  72 async def responder(reader, writer): 73 _id = writer.get_extra_info("peername") 74 logging.info("%s: connected", _id) 75 buf = b"" 76 msg_expected = "handshake" 77 peer_name = None 78 while True: 79 # Read until we get Msg message {{{ 80 data = await reader.read(MaxMsgLen) 81 if data == b"": 82 logging.info("%s: closed connection", _id) 83 break 84 buf += data 85 if len(buf) > MaxMsgLen: 86 logging.warning("%s: max buffer size exceeded", _id) 87 break 88 try: 89 msg, tail = Msg().decode(buf) 90 except ASN1Error: 91 continue 92 buf = tail 93 # }}} 94 if msg.choice != msg_expected: 95 logging.warning("%s: unexpected %s message", _id, msg.choice) 96 break 97 if msg_expected == "text": 98 try: 99 await msg_receiver(msg.value, peer_name) 100 except ValueError as err: 101 logging.warning("%s: %s", err) 102 break 103 # Process Handshake message {{{ 104 elif msg_expected == "handshake": 105 logging.info("%s: got %s message", _id, msg_expected) 106 msg_handshake = msg.value 107 peer_name = str(msg_handshake["peerName"]) 108 if peer_name not in THEIR_NAMES: 109 logging.warning("unknown peer name: %s", peer_name) 110 break 111 writer.write(Msg(("handshake", MsgHandshake(( 112 ("peerName", OUR_NAME), 113 )))).encode()) 114 await writer.drain() 115 logging.info("%s: session established: %s", _id, peer_name) 116 peer_alive = PEER_ALIVES.pop(peer_name, None) 117 if peer_alive is not None: 118 peer_alive.close() 119 await IN_QUEUES[peer_name].put(None) 120 PEER_ALIVES[peer_name] = writer 121 asyncio.ensure_future(msg_sender(peer_name, writer)) 122 msg_expected = "text" 123 # }}} 124 logging.info("%s: disconnecting", _id) 125 if msg_expected == "text": 126 IN_QUEUES[peer_name].put(None) 127 writer.close() 

Protocolo seguro


Chegou a hora de proteger nossa comunicação. O que queremos dizer com segurança e o que queremos:

  • confidencialidade das mensagens transmitidas;
  • autenticidade e integridade das mensagens transmitidas - suas alterações devem ser detectadas;
  • proteção contra ataques de repetição - o fato de que as mensagens foram perdidas ou tentadas novamente deve ser detectado (e decidimos desconectar);
  • identificação e autenticação de interlocutores por chaves públicas pré-orientadas - já decidimos anteriormente que estamos criando uma rede de amigo para amigo. Somente após a autenticação entenderemos com quem estamos nos comunicando;
  • a presença de propriedades de sigilo direto perfeito (PFS) - o comprometimento de nossa chave de assinatura de longa duração não deve levar à possibilidade de ler toda a correspondência anterior. Gravar tráfego interceptado se torna inútil;
  • validade / validade de mensagens (transporte e handshakes) somente dentro da mesma sessão TCP. A inserção de mensagens assinadas / autenticadas corretamente de outra sessão (mesmo com o mesmo interlocutor) não deve ser possível;
  • o observador passivo não deve ver identificadores de usuário, transmitir chaves públicas de longa duração nem hashes a partir deles. Algum tipo de anonimato de um observador passivo.

Surpreendentemente, quase todo mundo quer ter esse mínimo em qualquer protocolo de handshake, e muito poucas das opções acima são realizadas para protocolos caseiros. Então agora não vamos inventar coisas novas. Definitivamente, eu recomendaria o uso da estrutura Noise para criar protocolos, mas vamos escolher algo mais simples.

Os mais populares são dois protocolos:

  • O TLS é um protocolo complexo com uma longa história de bugs, escolas, vulnerabilidades, problemas de pensamento, complexidade e deficiências (no entanto, isso não se aplica muito ao TLS 1.3). Mas não o consideramos por causa da complexidade.
  • IPsec com IKE - não possui sérios problemas criptográficos, embora também não sejam simples. Se você ler sobre o IKEv1 e o IKEv2, a fonte deles é STS , ISO / IEC IS 9798-3 e SIGMA (SIGn-e-MAc) - simples o suficiente para implementar em uma noite.

Como o SIGMA, como o último elo no desenvolvimento de protocolos STS / ISO, é bom? Satisfaz todos os nossos requisitos (incluindo "ocultar" os identificadores dos interlocutores)), não possui problemas criptográficos conhecidos. É minimalista - remover pelo menos um elemento da mensagem do protocolo levará à sua insegurança.

Vamos do protocolo caseiro mais simples para o SIGMA. A operação mais básica na qual estamos interessados ​​é a correspondência de teclas : uma função na qual ambos os participantes receberão o mesmo valor que pode ser usado como uma chave simétrica. Sem entrar em detalhes: cada uma das partes gera um par de chaves efêmero (usado apenas na mesma sessão) (chaves públicas e privadas), troca chaves públicas, chama a função de correspondência, para a entrada da qual eles transmitem sua chave privada e a chave pública do interlocutor.

 ┌─────┐ ┌─────┐
 │PeerA│ │PeerB│
 └──┬──┘ └──┬──┘
    │ IdA, PubA ╔════════════════════╗ ╔════════════════════╗
    │────────────── >> │ rPrvA, PubA = DHgen () ║
    │ │ ╚═══════════════════╝
    │ IdB, PubB ╔════════════════════╗ ╔════════════════════╗
    <──────────────│ vPrvB, PubB = DHgen () ║
    │ │ ╚═══════════════════╝
    ────┐ ╔═══════════════════╗
        ║ ║ Tecla = DH (PrvA, PubB) ║
    <───┘ ╚═══════╤═══════════╝
    │ │
    │ │


Qualquer um pode intervir no meio e substituir as chaves públicas pelas suas - neste protocolo não há autenticação dos interlocutores. Adicione uma assinatura com chaves de longa duração.

 ┌─────┐ ┌─────┐
 │PeerA│ │PeerB│
 └──┬──┘ └──┬──┘
    DIdA, PubA, sinal (SignPrvA, (PubA)) │ ╔═══════════════════════╗
    Ign ─ ─ ─ ─ ign ign ign ign ign ign ign ign ign ign ign ign ,,,,,,,, SignPubA = load () ║
    V │ vPrvA, PubA = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    DIdB, PubB, sinal (SignPrvB, (PubB)) │ ╔═══════════════════════╗
    <─────────────────────────────── ║SignPrvB, SignPubB = load () ║
    V │ vPrvB, PubB = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    ────┐ ╔═════════════════════│ │
        Verificar (SignPubB, ...)
    <───┘Key = DH (PrvA, PubB) ║ │
    │ ╚═════════════════════╝ │
    │ │


Essa assinatura não funcionará, pois não está vinculada a uma sessão específica. Essas mensagens também são adequadas para sessões com outros participantes. Todo o contexto deve ser inscrito. Isso também força a adição de outra mensagem de A.

Além disso, é fundamental adicionar seu próprio identificador como uma assinatura, pois, caso contrário, podemos substituir o IdXXX e assinar novamente a mensagem com a chave de outro interlocutor conhecido. Para evitar ataques de reflexão , é necessário que os elementos sob a assinatura estejam em locais claramente definidos em seu significado: se A assina (PubA, PubB), então B deve assinar (PubB, PubA). Isso também indica a importância de escolher a estrutura e o formato dos dados serializados. Por exemplo, os conjuntos na codificação ASN.1 DER são classificados: SET OF (PubA, PubB) será idêntico a SET OF (PubB, PubA).

 ┌─────┐ ┌─────┐
 │PeerA│ │PeerB│
 └──┬──┘ └──┬──┘
    │ IdA, PubA ╔══════════════════════════ ╔══════════════════════════
    │─ ─ ─ ─ ─ ─ ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign ign SignPubA = load () ║
    V │ vPrvA, PubA = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    DIdB, PubB, sinal (SignPrvB, (IdB, PubA, PubB)) │ ╔═════════════════════╗
    │ <───────────────────────────────────────────────── ║SignPrvB, SignPubB = load () ║
    V │ vPrvB, PubB = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    │ sinal (SignPrvA, (IdA, PubB, PubA)) │ ╔═══════════════════╗
    │ ─ ─ ─ ─ ─> ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify ify SignPubB, ...) ║
    = │ Tecla = DH (PrvA, PubB) ║
    │ │ ╚══ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
    │ │


No entanto, ainda não “provamos” que desenvolvemos a mesma chave comum para esta sessão. Em princípio, você pode passar sem essa etapa - a primeira conexão de transporte será inválida, mas queremos que, quando o aperto de mão for concluído, tenhamos certeza de que tudo está realmente de acordo. No momento, temos em nossas mãos o protocolo ISO / IEC IS 9798-3.

Poderíamos assinar a chave em si. Isso é perigoso, pois é possível que haja vazamentos no algoritmo de assinatura usado (deixe bits por assinatura, mas ainda vaze). Você pode assinar um hash da chave gerada, mas mesmo um vazamento de hash da chave gerada pode ter valor em um ataque de força bruta à função de geração. O SIGMA usa uma função MAC que autentica o ID do remetente.

 ┌─────┐ ┌─────┐
 │PeerA│ │PeerB│
 └──┬──┘ └──┬──┘
    │ IdA, PubA ╔══════════════════════════ ╔══════════════════════════
    │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > ║ ║SignPrvA, SignPubA = load () ║
    V │ vPrvA, PubA = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    DIdB, PubB, sinal (SignPrvB, (PubA, PubB)), MAC (IdB) │ ╔════════════
    │ <─── ─ ─ ─ ─ ─ │ │ │ │ │. ─│SignPrvB, SignPubB = load ()
    V │ vPrvB, PubB = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    │ │ ╔══ ═ ═ ╗ ╗ ╗ ═ ╗ ╗ ╗
    │ sinal (SignPrvA, (PubB, PubA)), MAC (IdA) │ eyChave = DH (PrvA, PubB) ║
    │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > Verificar (Chave, IdB)
    Verificar (SignPubB, ...)
    │ │ ╚══ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
    │ │


Como otimização, alguns podem querer reutilizar suas chaves efêmeras (o que, é claro, é deplorável para o PFS). Por exemplo, geramos um par de chaves, tentamos conectar, mas o TCP não estava disponível ou interrompemos em algum lugar no meio do protocolo. É uma pena gastar os recursos de entropia e processador gastos em um novo par. Portanto, apresentamos o chamado cookie - um valor pseudo-aleatório que protegerá contra possíveis ataques acidentais de reprodução ao reutilizar chaves públicas efêmeras. Devido à ligação entre o cookie e a chave pública efêmera, a chave pública da parte oposta pode ser removida da assinatura como desnecessária.

 ┌─────┐ ┌─────┐
 │PeerA│ │PeerB│
 └──┬──┘ └──┬──┘
    │ IdA, PubA, CookieA ╔═════════════════════════ ╔═════════════════════════
    │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─────────────────── >> >> SignPrvA, SignPubA = load () ║
    V │ vPrvA, PubA = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    DIdB, PubB, CookieB, sinal (SignPrvB, (CookieA, CookieB, PubB)), MAC (IdB) │ ╔═══════════════════════ ═══╗
    │ <─── ─ ─ ─ ─ ─ │ │ │ │ │. ─────────────────────│ ║SignPrvB, SignPubB = load () ║
    V │ vPrvB, PubB = DHgen () ║
    │ │ ╚══ ═ ═ ═ ╝ ╝ ═ ╝ ╝ ╝ ╝
    │ │ ╔══ ═ ═ ╗ ╗ ╗ ═ ╗ ╗ ╗
    │ sinal (SignPrvA, (CookieB, CookieA, PubA)), MAC (IdA) │ ║Key = DH (PrvA, PubB) ║
    │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ify ify ify ify ify ify ify ify ify ify Verifique (Chave, IdB) ║
    Verificar (SignPubB, ...)
    │ │ ╚══ ═ ═ ╝ ╝ ╝ ═ ╝ ╝ ╝
    │ │


Finalmente, queremos obter a privacidade de nossos identificadores interlocutores de um observador passivo. Para fazer isso, o SIGMA sugere primeiro trocar chaves efêmeras, elaborando uma chave comum na qual autenticar e autenticar mensagens. O SIGMA descreve duas opções:

  • SIGMA-I - protege o iniciador de ataques ativos, o respondente de ataques passivos: o iniciador autentica o respondedor e, se algo não se encaixa, não fornece sua identificação. O réu dá sua identificação se você iniciar um protocolo ativo com ele. O observador passivo não saberá nada;
    SIGMA-R - protege o respondente de ataques ativos, o iniciador de passivo. Tudo é exatamente o contrário, mas neste protocolo já são transmitidas quatro mensagens de handshake.


    Escolhemos o SIGMA-I como mais semelhante ao que esperamos das coisas usuais de servidor-cliente: apenas um servidor autenticado reconhece o cliente e todo mundo conhece o servidor de qualquer maneira. Além disso, é mais fácil de implementar devido a menos mensagens de handshake. Tudo o que adicionamos ao protocolo é a criptografia da parte da mensagem e a transferência do identificador A para a parte criptografada da última mensagem:

     ┌─────┐ ┌─────┐
     │PeerA│ │PeerB│
     └──┬──┘ └──┬──┘
        │ PubA, CookieA │ ╔══════════════════════════╗
        │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ign ign ign ign ign ign ign ign ign ign ,,,,, SignPubA = load () ║
        V │ vPrvA, PubA = DHgen () ║
       │ │ ╚═══════════════════════════╝
       │PubB, CookieB, Enc((IdB, sign(SignPrvB, (CookieA, CookieB, PubB)), MAC(IdB))) │ ╔═══════════════════════════╗
       │<─────────────────────────────────────────────────────────────────────────────│ ║SignPrvB, SignPubB = load()║
       │ │ ║PrvB, PubB = DHgen() ║
       │ │ ╚═══════════════════════════╝
       │ │ ╔═════════════════════╗
       │ Enc((IdA, sign(SignPrvA, (CookieB, CookieA, PubA)), MAC(IdA))) │ ║Key = DH(PrvA, PubB) ║
       │─────────────────────────────────────────────────────────────────────────────>│ ║verify(Key, IdB) ║
       │ │ ║verify(SignPubB, ...)║
       │ │ ╚═════════════════════╝
       │ │
    


    • 34.10-2012 256- .
    • 34.10-2012 VKO.
    • MAC CMAC. , 34.13-2015. — (34.12-2015).
    • . -256 (34.11-2012 256 ).


    . . : , , (MAC) , . , , . , , ? . , KDF (key derivation function). , - : HKDF , . , Python , hkdf . HKDF HMAC , , , -. Python Wikipedia . 34.10-2012, - -256. , :

     kdf = Hkdf(None, key_session, hash=GOST34112012256) kdf.expand(b"handshake1-mac-identity") kdf.expand(b"handshake1-enc") kdf.expand(b"handshake1-mac") kdf.expand(b"handshake2-mac-identity") kdf.expand(b"handshake2-enc") kdf.expand(b"handshake2-mac") kdf.expand(b"transport-initiator-enc") kdf.expand(b"transport-initiator-mac") kdf.expand(b"transport-responder-enc") kdf.expand(b"transport-responder-mac") 

    /


    ASN.1 :

     class Msg(Choice): schema = (( ("text", MsgText()), ("handshake0", MsgHandshake0(expl=tag_ctxc(0))), ("handshake1", MsgHandshake1(expl=tag_ctxc(1))), ("handshake2", MsgHandshake2(expl=tag_ctxc(2))), )) class MsgText(Sequence): schema = (( ("payload", MsgTextPayload()), ("payloadMac", MAC()), )) class MsgTextPayload(Sequence): schema = (( ("nonce", Integer(bounds=(0, float("+inf")))), ("ciphertext", OctetString(bounds=(1, MaxTextLen))), )) class MsgHandshake0(Sequence): schema = (( ("cookieInitiator", Cookie()), ("pubKeyInitiator", PubKey()), )) class MsgHandshake1(Sequence): schema = (( ("cookieResponder", Cookie()), ("pubKeyResponder", PubKey()), ("ukm", OctetString(bounds=(8, 8))), ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class MsgHandshake2(Sequence): schema = (( ("ciphertext", OctetString()), ("ciphertextMac", MAC()), )) class HandshakeTBE(Sequence): schema = (( ("identity", OctetString(bounds=(32, 32))), ("signature", OctetString(bounds=(64, 64))), ("identityMac", MAC()), )) class HandshakeTBS(Sequence): schema = (( ("cookieTheir", Cookie()), ("cookieOur", Cookie()), ("pubKeyOur", PubKey()), )) class Cookie(OctetString): bounds = (16, 16) class PubKey(OctetString): bounds = (64, 64) class MAC(OctetString): bounds = (16, 16) 

    HandshakeTBS — , (to be signed). HandshakeTBE — , (to be encrypted). ukm MsgHandshake1. 34.10 VKO, , UKM (user keying material) — .


    , ( , , ).

    , - . JSON :

     { "our": { "prv": "21254cf66c15e0226ef2669ceee46c87b575f37f9000272f408d0c9283355f98", "pub": "938c87da5c55b27b7f332d91b202dbef2540979d6ceaa4c35f1b5bfca6df47df0bdae0d3d82beac83cec3e353939489d9981b7eb7a3c58b71df2212d556312a1" }, "their": { "alice": "d361a59c25d2ca5a05d21f31168609deeec100570ac98f540416778c93b2c7402fd92640731a707ec67b5410a0feae5b78aeec93c4a455a17570a84f2bc21fce", "bob": "aade1207dd85ecd283272e7b69c078d5fae75b6e141f7649ad21962042d643512c28a2dbdc12c7ba40eb704af920919511180c18f4d17e07d7f5acd49787224a" } } 

    our — , . their — . JSON :

     from pygost import gost3410 from pygost.gost34112012256 import GOST34112012256 CURVE = gost3410.GOST3410Curve( *gost3410.CURVE_PARAMS["GostR3410_2001_CryptoPro_A_ParamSet"] ) parser = argparse.ArgumentParser(description="GOSTIM") parser.add_argument( "--keys-gen", action="store_true", help="Generate JSON with our new keypair", ) parser.add_argument( "--keys", default="keys.json", required=False, help="JSON with our and their keys", ) parser.add_argument( "--bind", default="::1", help="Address to listen on", ) parser.add_argument( "--port", type=int, default=6666, help="Port to listen on", ) args = parser.parse_args() if args.keys_gen: prv_raw = urandom(32) pub = gost3410.public_key(CURVE, gost3410.prv_unmarshal(prv_raw)) pub_raw = gost3410.pub_marshal(pub) print(json.dumps({ "our": {"prv": hexenc(prv_raw), "pub": hexenc(pub_raw)}, "their": {}, })) exit(0) # Parse and unmarshal our and their keys {{{ with open(args.keys, "rb") as fd: _keys = json.loads(fd.read().decode("utf-8")) KEY_OUR_SIGN_PRV = gost3410.prv_unmarshal(hexdec(_keys["our"]["prv"])) _pub = hexdec(_keys["our"]["pub"]) KEY_OUR_SIGN_PUB = gost3410.pub_unmarshal(_pub) KEY_OUR_SIGN_PUB_HASH = OctetString(GOST34112012256(_pub).digest()) for peer_name, pub_raw in _keys["their"].items(): _pub = hexdec(pub_raw) KEYS[GOST34112012256(_pub).digest()] = { "name": peer_name, "pub": gost3410.pub_unmarshal(_pub), } # }}} 

    34.10 — . 256- 256- . PyGOST , , (urandom(32)) , gost3410.prv_unmarshal(). , gost3410.public_key(). 34.10 — , , gost3410.pub_marshal().

    JSON , , , , gost3410.pub_unmarshal(). , . -256 gost34112012256.GOST34112012256(), hashlib -.

    ? , : cookie (128- ), 34.10, VKO .

      395 async def initiator(host, port): 396 _id = repr((host, port)) 397 logging.info("%s: dialing", _id) 398 reader, writer = await asyncio.open_connection(host, port) 399 # Generate our ephemeral public key and cookie, send Handshake 0 message {{{ 400 cookie_our = Cookie(urandom(16)) 401 prv = gost3410.prv_unmarshal(urandom(32)) 402 pub_our = gost3410.public_key(CURVE, prv) 403 pub_our_raw = PubKey(gost3410.pub_marshal(pub_our)) 404 writer.write(Msg(("handshake0", MsgHandshake0(( 405 ("cookieInitiator", cookie_our), 406 ("pubKeyInitiator", pub_our_raw), 407 )))).encode()) 408 # }}} 409 await writer.drain() 

    • Msg ;
    • handshake1;
    • ;
    • TBE .

      423 logging.info("%s: got %s message", _id, msg.choice) 424 if msg.choice != "handshake1": 425 logging.warning("%s: unexpected message, disconnecting", _id) 426 writer.close() 427 return 428 # }}} 429 msg_handshake1 = msg.value 430 # Validate Handshake message {{{ 431 cookie_their = msg_handshake1["cookieResponder"] 432 pub_their_raw = msg_handshake1["pubKeyResponder"] 433 pub_their = gost3410.pub_unmarshal(bytes(pub_their_raw)) 434 ukm_raw = bytes(msg_handshake1["ukm"]) 435 ukm = ukm_unmarshal(ukm_raw) 436 key_session = kek_34102012256(CURVE, prv, pub_their, ukm, mode=2001) 437 kdf = Hkdf(None, key_session, hash=GOST34112012256) 438 key_handshake1_mac_identity = kdf.expand(b"handshake1-mac-identity") 439 key_handshake1_enc = kdf.expand(b"handshake1-enc") 440 key_handshake1_mac = kdf.expand(b"handshake1-mac") 

    UKM 64- (urandom(8)), , gost3410_vko.ukm_unmarshal(). VKO 34.10-2012 256- gost3410_vko.kek_34102012256() (KEK — key encryption key).

    256- . HKDF . GOST34112012256 hashlib , Hkdf . ( Hkdf) , - . kdf.expand() 256-, .

    TBE TBS :

    • MAC ;
    • ;
    • TBE ;
    • ;
    • MAC ;
    • TBS , cookie . .

      441 try: 442 peer_name = validate_tbe( 443 msg_handshake1, 444 key_handshake1_mac_identity, 445 key_handshake1_enc, 446 key_handshake1_mac, 447 cookie_our, 448 cookie_their, 449 pub_their_raw, 450 ) 451 except ValueError as err: 452 logging.warning("%s: %s, disconnecting", _id, err) 453 writer.close() 454 return 455 # }}} 128 def validate_tbe( 129 msg_handshake: Union[MsgHandshake1, MsgHandshake2], 130 key_mac_identity: bytes, 131 key_enc: bytes, 132 key_mac: bytes, 133 cookie_their: Cookie, 134 cookie_our: Cookie, 135 pub_key_our: PubKey, 136 ) -> str: 137 ciphertext = bytes(msg_handshake["ciphertext"]) 138 mac_tag = mac(GOST3412Kuznechik(key_mac).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext) 139 if not compare_digest(mac_tag, bytes(msg_handshake["ciphertextMac"])): 140 raise ValueError("invalid MAC") 141 plaintext = ctr( 142 GOST3412Kuznechik(key_enc).encrypt, 143 KUZNECHIK_BLOCKSIZE, 144 ciphertext, 145 8 * b"\x00", 146 ) 147 try: 148 tbe, _ = HandshakeTBE().decode(plaintext) 149 except ASN1Error: 150 raise ValueError("can not decode TBE") 151 key_sign_pub_hash = bytes(tbe["identity"]) 152 peer = KEYS.get(key_sign_pub_hash) 153 if peer is None: 154 raise ValueError("unknown identity") 155 mac_tag = mac( 156 GOST3412Kuznechik(key_mac_identity).encrypt, 157 KUZNECHIK_BLOCKSIZE, 158 key_sign_pub_hash, 159 ) 160 if not compare_digest(mac_tag, bytes(tbe["identityMac"])): 161 raise ValueError("invalid identity MAC") 162 tbs = HandshakeTBS(( 163 ("cookieTheir", cookie_their), 164 ("cookieOur", cookie_our), 165 ("pubKeyOur", pub_key_our), 166 )) 167 if not gost3410.verify( 168 CURVE, 169 peer["pub"], 170 GOST34112012256(tbs.encode()).digest(), 171 bytes(tbe["signature"]), 172 ): 173 raise ValueError("invalid signature") 174 return peer["name"] 

    , 34.13-2015 34.12-2015. , MAC-. PyGOST gost3413.mac(). ( ), , , . hardcode- ? 34.12-2015 128- , 64- — 28147-89, .

    gost.3412.GOST3412Kuznechik(key) .encrypt()/.decrypt() , 34.13 . MAC : gost3413.mac(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, ciphertext). MAC- (==) , , , , BEAST TLS. Python hmac.compare_digest .

    . , , . 34.13-2015 : ECB, CTR, OFB, CBC, CFB. . , ( CCM, OCB, GCM ) — MAC. (CTR): , , , ( CBC, ).

    .mac(), .ctr() : ciphertext = gost3413.ctr(GOST3412Kuznechik(key).encrypt, KUZNECHIK_BLOCKSIZE, plaintext, iv). , . ( ), . handshake .

    gost3410.verify() : ( GOSTIM ), ( , , ), 34.11-2012 .

    , handshake2 , , : , .…

      456 # Prepare and send Handshake 2 message {{{ 457 tbs = HandshakeTBS(( 458 ("cookieTheir", cookie_their), 459 ("cookieOur", cookie_our), 460 ("pubKeyOur", pub_our_raw), 461 )) 462 signature = gost3410.sign( 463 CURVE, 464 KEY_OUR_SIGN_PRV, 465 GOST34112012256(tbs.encode()).digest(), 466 ) 467 key_handshake2_mac_identity = kdf.expand(b"handshake2-mac-identity") 468 mac_tag = mac( 469 GOST3412Kuznechik(key_handshake2_mac_identity).encrypt, 470 KUZNECHIK_BLOCKSIZE, 471 bytes(KEY_OUR_SIGN_PUB_HASH), 472 ) 473 tbe = HandshakeTBE(( 474 ("identity", KEY_OUR_SIGN_PUB_HASH), 475 ("signature", OctetString(signature)), 476 ("identityMac", MAC(mac_tag)), 477 )) 478 tbe_raw = tbe.encode() 479 key_handshake2_enc = kdf.expand(b"handshake2-enc") 480 key_handshake2_mac = kdf.expand(b"handshake2-mac") 481 ciphertext = ctr( 482 GOST3412Kuznechik(key_handshake2_enc).encrypt, 483 KUZNECHIK_BLOCKSIZE, 484 tbe_raw, 485 8 * b"\x00", 486 ) 487 mac_tag = mac( 488 GOST3412Kuznechik(key_handshake2_mac).encrypt, 489 KUZNECHIK_BLOCKSIZE, 490 ciphertext, 491 ) 492 writer.write(Msg(("handshake2", MsgHandshake2(( 493 ("ciphertext", OctetString(ciphertext)), 494 ("ciphertextMac", MAC(mac_tag)), 495 )))).encode()) 496 # }}} 497 await writer.drain() 498 logging.info("%s: session established: %s", _id, peer_name) 

    , ( , , ), MAC-:

      499 # Run text message sender, initialize transport decoder {{{ 500 key_initiator_enc = kdf.expand(b"transport-initiator-enc") 501 key_initiator_mac = kdf.expand(b"transport-initiator-mac") 502 key_responder_enc = kdf.expand(b"transport-responder-enc") 503 key_responder_mac = kdf.expand(b"transport-responder-mac") ... 509 asyncio.ensure_future(msg_sender( 510 peer_name, 511 key_initiator_enc, 512 key_initiator_mac, 513 writer, 514 )) 515 encrypter = GOST3412Kuznechik(key_responder_enc).encrypt 516 macer = GOST3412Kuznechik(key_responder_mac).encrypt 517 # }}} 519 nonce_expected = 0 520 # Wait for test messages {{{ 521 while True: 522 data = await reader.read(MaxMsgLen) ... 530 msg, tail = Msg().decode(buf) ... 537 try: 538 await msg_receiver( 539 msg.value, 540 nonce_expected, 541 macer, 542 encrypter, 543 peer_name, 544 ) 545 except ValueError as err: 546 logging.warning("%s: %s", err) 547 break 548 nonce_expected += 1 549 # }}} 

    msg_sender , TCP-. nonce, . .

     async def msg_sender(peer_name: str, key_enc: bytes, key_mac: bytes, writer) -> None: nonce = 0 encrypter = GOST3412Kuznechik(key_enc).encrypt macer = GOST3412Kuznechik(key_mac).encrypt in_queue = IN_QUEUES[peer_name] while True: text = await in_queue.get() if text is None: break ciphertext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, text.encode("utf-8"), long2bytes(nonce, 8), ) payload = MsgTextPayload(( ("nonce", Integer(nonce)), ("ciphertext", OctetString(ciphertext)), )) mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) writer.write(Msg(("text", MsgText(( ("payload", payload), ("payloadMac", MAC(mac_tag)), )))).encode()) nonce += 1 

    msg_receiver, :

     async def msg_receiver( msg_text: MsgText, nonce_expected: int, macer, encrypter, peer_name: str, ) -> None: payload = msg_text["payload"] if int(payload["nonce"]) != nonce_expected: raise ValueError("unexpected nonce value") mac_tag = mac(macer, KUZNECHIK_BLOCKSIZE, payload.encode()) if not compare_digest(mac_tag, bytes(msg_text["payloadMac"])): raise ValueError("invalid MAC") plaintext = ctr( encrypter, KUZNECHIK_BLOCKSIZE, bytes(payload["ciphertext"]), long2bytes(nonce_expected, 8), ) text = plaintext.decode("utf-8") await OUT_QUEUES[peer_name].put(text) 

    Conclusão


    GOSTIM ( , )! (-256 : 995bbd368c04e50a481d138c5fa2e43ec7c89bc77743ba8dbabee1fde45de120). , GoGOST , PyDERASN , NNCP , GoVPN , GOSTIM , GPLv3+ .

    , , , Python/Go-, « „“ .

Source: https://habr.com/ru/post/pt452200/


All Articles