切られたしっぽ

産業廃棄物の投下場所

Fakenet-NG 単体で証明書エラー無くHTTPS通信の復号をやらせるまで

TL;DR

  • Fakenet-NG を改造して、証明書のエラーなく任意の通信先に対して HTTPS の通信を行わせるための備忘録です
  • 実質 MitM をさせる Proxy を作るための話になります

はじめに

マルウェア解析の動的解析環境(Sandbox 環境)を構築する際、TLS/SSL を用いた HTTPS 通信をどのようにして取得して分析可能とするかというのは一つの至上命題です。マルウェアがC2サーバと通信する際に TLS/SSL を使う場合はもちろんそのやりとりの中身を記録したいですし、解析環境検知のためにメジャーなWebサービスに対して HTTPS でダミー通信を発する場合もあります。現代だとほぼ必須の要望といっても過言ではありません。

おそらく最も一般的な手法として用いられるのは、Burp などの proxy サーバを立てて通信を MitM (Man-In-The-Middle) させ、それを INetSim などのインターネットサービスシュミレータにリダイレクトさせることで確立したコネクションの中身を分析する方法ではないかと思われます。このようなマルウェアを動かす端末と proxy兼擬似C2サーバの2台を用いた構成は @soji256 氏や @masaomi346 氏のブログでも紹介されているため、記事を参考にすることで円滑に環境を構築することができるでしょう。
soji256.hatenablog.jp

qiita.com


しかしながら、この手法だと Virtual Machine が2台構成になってチョットリソースを多めにとるという点と、INetSim がかれこれ4年更新がないという点もあり現代風にアップデートした環境を用意したいです。筆者は個人PCで環境を用意しているということもあり、1VM構成かつ執筆時もアクティブなインターネットサービスシュミレータ Fakenet-NG を利用した構成を目指しました。

github.com


Fakenet-NG は mandiant のFLARE teamが作成したツールです。以前筆者のブログ記事でも紹介したので、この投稿を見てくれている方の多くはすでに使ったことがあるのではないかと思います。最近の記事を確認すると一目瞭然ですが、このツールはインターネットサービスシュミレータとしての機能だけではなく、DNSサービス, ルーティング, さらには Proxy としての機能も持たせることができる非常に高機能なツールです。おそらくですが、2024年5月現在インターネットサービスシュミレータとして最も多く活用されているツールではないかと、筆者は推測しています。根拠は、身の回りで使用者が多いからという非常に Low な確証度です。
cloud.google.com

これだけ見ると「マルウェアを動かす端末で Fakenet-NG を動かせば通信はすべてフォワーディングされて localhost にリダイレクトされるし、Proxy を挟むことで HTTPS 通信も覗き見できるし万事解決」と思うかもしれませんが、これだと1つ問題があります。 それはサーバ証明書に関する問題です。

Fakenet-NG は Proxy Listener の機能によって TLS/SSL の handshake を検出した場合 MitM を試みようとはしてくれますが、Fakenet-NG 側のサーバ証明書の変更を試みようとはしてくれません。これは該当部分のコードを見てもらえるとわかりますが、証明書が保存されているパスが直接指定されています。

Fakenet-NG のサーバ証明書読み取り部分のコード

当たり前の話ですが、マルウェアがアクセスしようとしたホスト名とサーバ証明書にある SAN (Subject Alternative Name) の値が異なると証明書の検証でエラーとなるため、自作のサーバ証明書を既存の Fakenet-NG に導入しようとすると、マルウェアHTTPS で通信する宛先すべてを知っている状態で証明書を作りコンフィグに設定しなければならなくなります。特に大量にC2サーバの宛先を保有していて次々と通信先を変えるようなマルウェアにとってはこの手法だと面倒です。


この課題を解決するために、筆者はコードに手を加えた Fakenet-NG をせっせこ作っていました。前置きが長くなってしまいましたが、Fakenet-NG 単体で MitM を円滑に成功させるための変更方法を備忘録として残すのが、この投稿の趣旨となります。また、筆者はネットワーク回りが素人なので、もっとスマートな方法を思いつく方がいらっしゃればコメント願います。

環境構築

はじめに、今回目指すものの全体像だけ先に提示します。やることを率直に書くと『TLS/SSL 通信が発生するたびに自作のCA秘密鍵と証明書から通信先を SAN に詰め込んだ自己署名証明書を作成し、それを参照させる』ようにします。概要を図示ししたものが以下です。

概要図

ここからは詳細な手順を書きます。

0. 事前準備

python のコードを直接いじるので、Fakenet-NGGithub からコード一式を clone し、No installation の項目に沿って必要なモジュールをインストールします。

コードが用意できたら、今度は CAの秘密鍵と証明書を作成して、CA証明書をマルウェアを動作させる端末に『信頼されたルート証明機関』としてインポートさせます。CA秘密鍵と証明書の作成については、筆者は以下の記事を参考にさせていただきました。図も大量にあって初心者にもわかりやすかったです。
zenn.dev

以下、ブログから必要な部分のコマンドだけ抜粋します。

# CA 秘密鍵作成
$ openssl genrsa -out localCA.key 2048

# CSR (Certificate Signing Request) 作成
$ openssl req -out localCA.csr -key localCA.key -new

# CA証明書作成
$ openssl x509 -req -days 3650 -signkey localCA.key -in localCA.csr -out localCA.crt

作成したCA秘密鍵と証明書は、Fakenet-NG を clone したフォルダから見て "\fakenet\listeners\ssl_utils\localCA.key""\fakenet\listeners\ssl_utils\localCA.crt" にそれぞれ配置してください。自己署名証明書を作成する際に利用します。また、CA証明書(localCA.crt)は端末でインポートさせることを忘れないでください。やり方がわからない方は、こちらのブログ記事などを参照していただくとうまくいくのではないかと思います。これで一旦の下準備完了です。

1. 通信先情報の取得

さてここから通信先に合わせた自己署名証明書を作るわけですが、何はともあれ通信先の情報をコネクションの起点となる Proxy Listener から抽出する必要があります。抽出に必要なデータを受け取っている Proxy Listener のコードは以下あたりです。

どうやら Proxy Listener が最初に受け取ったデータが、\fakenet\listeners\ssl_utils\ssl_detector.py にある looks_like_ssl関数で識別される場合にサーバ証明書を差し替えるルートにいくようです。looks_like_ssl の実装は tls handshake を捕まえているようなので、最小限のコード追加を考えた場合、Proxy Listener 視点では handshake から通信先に関する情報を見つける必要がありそうです。

ドメイン(ホスト名)

まずはマルウェアがホスト名で通信する場合です。handshake から通信先のホスト名を特定する方法がわからなかったので、TLS Protocol v1.3 の RFC を確認してみると、ホスト名で通信を行う場合 client hello の Extensions structure に server_name (type 0x0) としてその文字列が格納されているらしいです。

datatracker.ietf.org

実際に client hello をパースするスクリプトを公開してくれている方がいたので試してみると、client hello のデータがきれいにパースされました。なので、client hello のパーススクリプトに server_name 部分もパースして抽出させるようにします。 Extensions structure は別に RFC があるようなのでこちらを参考にデータ構造を見て、パースさせます。

datatracker.ietf.org

上記の公開されたスクリプトに server_name パースを付け加えたコードが以下です。

import struct
import binascii


_int16 = struct.Struct(">H")


def int16(b):
    """
    Return first two bytes of b as an unsigned integer.
    """
    return _int16.unpack(b[:2])[0]


def take(data, count):
    prefix = data[:count]
    data = data[count:]
    return prefix, data


def parse_hello(data):
    """
    Parse TLS 1.2 ClientHello from data, return the extensions as binary data
    or None if not found.
    Likely to raise struct.error or IndexError on error.
    """
    try: 
        header, data = take(data, 7)
        messageId, major, minor, l1, l2 = struct.unpack(">BBBHH", header)

        header2, data = take(data, 4)

        # random
        random, data = take(data, 32)
        # session identifier
        slen, data = take(data, 1)
        slen = slen[0]
        session, data = take(data, slen)
        # ciphers list
        clen, data = take(data, 2)
        clen = int16(clen)
        ciphers, data = take(data, clen)
        # compression methods (should always be an array of length 1, with one 0 element)
        compression_length, data = take(data, 1)
        compression_methods, data = take(data, compression_length[0])
        # extensions
        extlen, data = take(data, 2)
        extensions, data = take(data, int16(extlen))
        
        while extensions:
            _type, extensions = take(extensions, 2)
            length, extensions = take(extensions, 2)
            body, extensions = take(extensions, int16(length))
            if not _type == b"\x00\x00":
                continue
            name_type, body = take(body, 1)
            
            if not name_type == b"\x00":
                continue
            
            server_name_len, body = take(body, 2)
            if body[0] == 0:
                server_name_len, body = take(body, 2)
            return body

        return None
    except Exception:
        return None

かなり雑なのですが、これで TLS client hello に該当するデータを Proxy Listener が受け取った場合、通信先のホスト名が識別できるようになりました。今回は、これを \fakenet\listeners\ssl_utils\parse_client_hello.py に保存します。

IPアドレス

次にマルウェアIPアドレスで直接通信する場合です。そんなもん「IP の Dst header を見れば解決では?」と思うかもしれませんが、Proxy Listener が受け取るデータは IP header が落ちているのでその手は使えません。また、Extensions structure の server_name に必ずIPアドレスも入るとも限らないのでホスト名と同様の手法も使えません。

どうしたものかと Fakenet-NG のコードを読んでいると、どうやら Proxy Listener より上流にいる diverter が IP port forwarding をする際に管理している forwarding 用のテーブルを Proxy Listener にも渡していることがわかります。(IP forwarding 回りのコードを参照)

なので、純粋にfowarding 管理テーブルである ip_fwd_table から取得することで解決させます。コードは以下のような感じです。

ip_fw_table = self.server.diverter.ip_fwd_table
for fowarded, original in ip_fw_table.items():
    if f'TCP/{self.client_address[1]}' in fowarded:
        # self.client_address[1] がフォワード元の port 番号なので、
        # 合致する key があればそれに対応する value が forwarding 前の通信先
        create_cert(original)  

なお、ホスト名アクセスの場合は Fakenet-NG 内部の DNSサーバがすべて自身のIPアドレスを返すようになっているので、 forwarding 前のIPアドレスは自身のIPアドレスになっています。なのでホスト名でのアクセスの場合、同様の手法はとれません。全く違うアプローチをしなけばならないのは面倒ですね。

2. 自己署名証明書の作成

通信先の情報が取得できたので、あとはその情報を SAN に設定したサーバ証明書を通信の度に毎回作ります。大量に証明書ができるのですが、いい方法がこれしか思いつきませんでした。しかし解析環境の場合作られる証明書はせいぜい100程度だろうと考え、たいした容量にもならないと思いこの方法で妥協します。サーバ証明書の作り方については、以下の記事を参考にコードを流用させていただきました。

zenn.dev

最終的に作成したコードは、以下のようなシンプルなつくりです。

  • \fakenet\listeners\ssl_utils\certs にすでに一度作成した通信先のための証明書がないかを確認する
  • なかった場合は、引数に与えらえた通信先を SAN に設定して、事前準備で作った CA秘密鍵で署名したサーバ証明書を作る
  • ファイルを \fakenet\listeners\ssl_utils\certs 配下に置き、サーバ証明書秘密鍵と証明書のパスを tuple で返す
import os
import logging

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from datetime import datetime, timedelta, timezone

CERTS_DIR = os.path.join(os.path.dirname(__file__), 'certs')
logger = logging.getLogger("CreateCerts")
logger.setLevel(logging.DEBUG)


def create_new_cert(hostinfo, overwrite=False):
    if not os.path.exists(CERTS_DIR):
        os.makedirs(CERTS_DIR)
    
    if isinstance(hostinfo, bytes):
        hostinfo = hostinfo.decode()
    
    key_path = os.path.join(CERTS_DIR, f'privkey_{hostinfo.replace(".","-")}.pem')
    cert_path = os.path.join(CERTS_DIR, f'server_{hostinfo.replace(".","-")}.pem')


    if (os.path.exists(key_path) and os.path.exists(cert_path)) and not overwrite:
        return cert_path, key_path

    try: 
        with open(os.path.join(os.path.dirname(__file__), 'localCA.crt'), 'rb') as fc:
            ca_cert = load_pem_x509_certificate(fc.read())

        with open(os.path.join(os.path.dirname(__file__), 'localCA.key'), 'rb') as fk:
            ca_key = load_pem_private_key(fk.read(),password=None)
            
        server_private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=4096,
            backend=default_backend()
        )

        now = datetime.now(timezone.utc)
        csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "JP"),
            x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Tokyo"),
            x509.NameAttribute(NameOID.LOCALITY_NAME, "Dokoka"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Fakenet"),
            x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
        ])).add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostinfo)]),
            critical=False
        ).sign(server_private_key, hashes.SHA256(), default_backend())

        server_cert = x509.CertificateBuilder().subject_name(
            csr.subject
        ).issuer_name(
            ca_cert.subject
        ).public_key(
            csr.public_key()
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            now - timedelta(days=365)
        ).not_valid_after(
            now + timedelta(days=365)
        ).add_extension(
            x509.SubjectAlternativeName([x509.DNSName(hostinfo)]),
            critical=False
        ).add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True,
        ).sign(ca_key, hashes.SHA256(), default_backend())
     

        with open(key_path, 'wb') as k:
            k.write(server_private_key.private_bytes(encoding=Encoding.PEM, format=PrivateFormat.TraditionalOpenSSL, encryption_algorithm=NoEncryption()))

        with open(cert_path, 'wb') as c:
            c.write(server_cert.public_bytes(Encoding.PEM))
        
        logger.info(f'Create New certs for {hostinfo}: {key_path}')
            
        return cert_path, key_path
        
    except Exception:
        return None, None
    

これを \fakenet\listeners\ssl_utils\parse_client_hello.py に配置して、Proxy Listener から呼べるようにします。

3. Proxy Listener への組み込み

必要な機能ができたので、あとは Proxy Listener 側から作成した証明書を参照してあげます。\fakenet\listeners\ProxyListener.py から上記2つのpythonファイルを import して、ssl handshake 検出後にホスト名/IPアドレスを抽出して自己署名証明書を作成、パスを上書きするだけです。変更部分のコードは以下になります。

(snip.)
from .ssl_utils import create_certs
from .ssl_utils import parse_client_hello
(snip.)

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
(snip.)
    def handle(self):
        (snip.)
        is_ssl_encrypted = 'No'

        if data:

            if ssl_detector.looks_like_ssl(data):
                is_ssl_encrypted = 'Yes'
                self.server.logger.debug('SSL detected')
                hostinfo = parse_client_hello.parse_hello(data)
                if hostinfo:
                    certfile_path, keyfile_path = create_certs.create_new_cert(hostinfo)

                else:
                    ip_fw_table = self.server.diverter.ip_fwd_table
                    for fowarded, original in ip_fw_table.items():
                        if f'TCP/{self.client_address[1]}' in fowarded:
                            hostinfo = original
                            certfile_path, keyfile_path = create_certs.create_new_cert(hostinfo)
                            break
(snip.)
                listener_sock.start()
                # remote_sock.setblocking(0)

                # ssl has no 'peek' option, so we need to process the first
                # packet that is already consumed from the socket
                if ssl_remote_sock:
                    ssl_remote_sock.setblocking(0)
                    remote_q.put(data)
                else:
                    remote_sock.setblocking(0)
                
                while True:
                    readable, writable, exceptional = select.select(
                            [ssl_remote_sock if ssl_remote_sock else remote_sock], [], [], .001)
(snip.)

ちなみに remote_sock.setblocking(0) 周辺のコード (元の該当コードで言うとこのあたり) を変更しているのは、TLS 検出した場合は ssl.wrap_socket で remote_sock の socket が ssl_remote_sock に wrap されてしまうため、ここでエラーが発生してしまうためでした。今回いろいろ変更を加えてしまったので平常時が動いていたのか定かではないですが、おま環起因ではない場合は HTTPS 通信を Proxy Listener が吸っただけで必ずエラーになるので、改造前の状態だと全く機能していないことになります。。。(まさかね??)

4. テスト

これで必要なパーツがそろったので、証明書エラーなしで通信ができるかテストをしてみます。HTTPS の通信が Proxy Listener を必ず通るように \fakenet\configs\default.ini に書かれた HTTPListener443 の Hidden 設定を True にして Fakenet-NG を起動します。

[HTTPListener443]
Enabled:     True
Port:        443
Protocol:    TCP
Listener:    HTTPListener
UseSSL:      Yes
Webroot:     defaultFiles/
Timeout:     10
DumpHTTPPosts: Yes
DumpHTTPPostsFilePrefix: http
Hidden:      True

Fakenet-NG の起動コマンドは以下です。

$ python -m fakenet.fakenet

起動したら、Webブラウザからホスト名ベースでのアクセスを、powershell からIPアドレスベースでのアクセスを試みてみましょう。エラーなくアクセスできたことが確認できると思います。*1

自己署名証明書の動作テスト


また、Fakenet-NG は終了時にこれまで記録していた通信を pcap ファイルにダンプして Fakenet-NG のホームディレクトリに保存してくれています。pcap を確認してみると、先ほど試行した HTTPS の通信が復号された状態で中身を確認することができました。

ダンプされたpcapデータの確認

これにて、Fakenet-NG 単体でやりたかったことができるようになったので満足です。

おわりに

ほぼほぼ MitM 攻撃の解説みたいな記事になってしました。内容も内容なので、おそらく世界で1, 2人くらいしか参考にならない投稿なような気がしています。

現状だと色々粗があるのでそのままでは PR 書けませんが、もしいろんな人に需要がありそうなら fork したプロジェクトを公開するか頑張ってリファクタして PR 出そうかなと思います。それでは。

*1: curlpython でもCAを参照させるために、環境変数 CURL_CA_BUNDLE に "\fakenet\listeners\ssl_utils\localCA.crt" へのfull pathを入れてあげてください