Articles containing the keyword Crypto

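Problem 1

Link: https://www.coursera.org/learn/crypto/assignment-submission/KZ9js/week-1-programming-assignment-optional/attempt

The challenge gives 10 ciphertexts encrypted with the same stream-cipher key, using bytewise XOR. The goal is to recover the key from these 10 ciphertexts and use it to decrypt an 11th.

Given a message M and a key K, XOR encryption is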

$$ C=M\oplus K $$

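which yields the ciphertext C. XOR is its own inverse, so decryption is the same operation as encryption:

$$ M=C\oplus K $$

For two ciphertexts C_1 and C_2 encrypted with the same key K, XORing them gives

$$ B=C_1\oplus C_2=M_1\oplus K\oplus M_2\oplus K=M_1\oplus M_2 $$

The byte stream B is the bytewise XOR of the two messages.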
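The key cancellation can be checked on toy data. This is a minimal sketch: the two plaintexts and the random key are made up for the demo, and `xor_bytes` is a hypothetical helper name, not part of the assignment.

```python
import os


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """XOR two byte strings; zip() truncates to the shorter one."""
    return bytes(x ^ y for x, y in zip(a, b))


# Demo-only plaintexts of equal length and a random key (assumptions).
m1 = b"attack at dawn"
m2 = b"retreat at ten"
k = os.urandom(len(m1))

c1 = xor_bytes(m1, k)
c2 = xor_bytes(m2, k)

# Decryption is the same operation as encryption.
assert xor_bytes(c1, k) == m1
# XORing the two ciphertexts removes the key completely.
assert xor_bytes(c1, c2) == xor_bytes(m1, m2)
```

Whatever key is drawn, the last assertion holds, which is why reusing a stream-cipher key leaks the XOR of the plaintexts.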

First we load the ciphertexts and define the XOR helper.

msg_1 = bytes.fromhex('315c4eeaa8b5f8aaf9174145bf43e1784b8fa00dc71d885a804e5ee9fa40b16349c146fb778cdf2d3aff021dfff5b403b510d0d0455468aeb98622b137dae857553ccd8883a7bc37520e06e515d22c954eba5025b8cc57ee59418ce7dc6bc41556bdb36bbca3e8774301fbcaa3b83b220809560987815f65286764703de0f3d524400a19b159610b11ef3e')
msg_2 = bytes.fromhex('234c02ecbbfbafa3ed18510abd11fa724fcda2018a1a8342cf064bbde548b12b07df44ba7191d9606ef4081ffde5ad46a5069d9f7f543bedb9c861bf29c7e205132eda9382b0bc2c5c4b45f919cf3a9f1cb74151f6d551f4480c82b2cb24cc5b028aa76eb7b4ab24171ab3cdadb8356f')
msg_3 = bytes.fromhex('32510ba9a7b2bba9b8005d43a304b5714cc0bb0c8a34884dd91304b8ad40b62b07df44ba6e9d8a2368e51d04e0e7b207b70b9b8261112bacb6c866a232dfe257527dc29398f5f3251a0d47e503c66e935de81230b59b7afb5f41afa8d661cb')
msg_4 = bytes.fromhex('32510ba9aab2a8a4fd06414fb517b5605cc0aa0dc91a8908c2064ba8ad5ea06a029056f47a8ad3306ef5021eafe1ac01a81197847a5c68a1b78769a37bc8f4575432c198ccb4ef63590256e305cd3a9544ee4160ead45aef520489e7da7d835402bca670bda8eb775200b8dabbba246b130f040d8ec6447e2c767f3d30ed81ea2e4c1404e1315a1010e7229be6636aaa')
msg_5 = bytes.fromhex('3f561ba9adb4b6ebec54424ba317b564418fac0dd35f8c08d31a1fe9e24fe56808c213f17c81d9607cee021dafe1e001b21ade877a5e68bea88d61b93ac5ee0d562e8e9582f5ef375f0a4ae20ed86e935de81230b59b73fb4302cd95d770c65b40aaa065f2a5e33a5a0bb5dcaba43722130f042f8ec85b7c2070')
msg_6 = bytes.fromhex('32510bfbacfbb9befd54415da243e1695ecabd58c519cd4bd2061bbde24eb76a19d84aba34d8de287be84d07e7e9a30ee714979c7e1123a8bd9822a33ecaf512472e8e8f8db3f9635c1949e640c621854eba0d79eccf52ff111284b4cc61d11902aebc66f2b2e436434eacc0aba938220b084800c2ca4e693522643573b2c4ce35050b0cf774201f0fe52ac9f26d71b6cf61a711cc229f77ace7aa88a2f19983122b11be87a59c355d25f8e4')
msg_7 = bytes.fromhex('32510bfbacfbb9befd54415da243e1695ecabd58c519cd4bd90f1fa6ea5ba47b01c909ba7696cf606ef40c04afe1ac0aa8148dd066592ded9f8774b529c7ea125d298e8883f5e9305f4b44f915cb2bd05af51373fd9b4af511039fa2d96f83414aaaf261bda2e97b170fb5cce2a53e675c154c0d9681596934777e2275b381ce2e40582afe67650b13e72287ff2270abcf73bb028932836fbdecfecee0a3b894473c1bbeb6b4913a536ce4f9b13f1efff71ea313c8661dd9a4ce')
msg_8 = bytes.fromhex('315c4eeaa8b5f8bffd11155ea506b56041c6a00c8a08854dd21a4bbde54ce56801d943ba708b8a3574f40c00fff9e00fa1439fd0654327a3bfc860b92f89ee04132ecb9298f5fd2d5e4b45e40ecc3b9d59e9417df7c95bba410e9aa2ca24c5474da2f276baa3ac325918b2daada43d6712150441c2e04f6565517f317da9d3')
msg_9 = bytes.fromhex('271946f9bbb2aeadec111841a81abc300ecaa01bd8069d5cc91005e9fe4aad6e04d513e96d99de2569bc5e50eeeca709b50a8a987f4264edb6896fb537d0a716132ddc938fb0f836480e06ed0fcd6e9759f40462f9cf57f4564186a2c1778f1543efa270bda5e933421cbe88a4a52222190f471e9bd15f652b653b7071aec59a2705081ffe72651d08f822c9ed6d76e48b63ab15d0208573a7eef027')
msg_10 = bytes.fromhex('466d06ece998b7a2fb1d464fed2ced7641ddaa3cc31c9941cf110abbf409ed39598005b3399ccfafb61d0315fca0a314be138a9f32503bedac8067f03adbf3575c3b8edc9ba7f537530541ab0f9f3cd04ff50d66f1d559ba520e89a2cb2a83')

msg_target = bytes.fromhex('32510ba9babebbbefd001547a810e67149caee11d945cd7fc81a05e9f85aac650e9052ba6a8cd8257bf14d13e6f0a803b54fde9e77472dbff89d71b57bddef121336cb85ccb8f3315f4b52e301d16e9f52f904')

msgs = [msg_1, msg_2, msg_3, msg_4, msg_5, msg_6, msg_7, msg_8, msg_9, msg_10]

def bytesxor(a, b):
    # zip() stops at the shorter input, so the result has the shorter length.
    return bytes(x ^ y for x, y in zip(a, b))

Following the theory above, we first try XORing msg_1 with msg_2. The result is

b'\x12\x10L\x06\x13NW\t\x14\x0f\x10O\x02R\x1b\n\x04B\x02\x0cM\x07\x0b\x18OH\x15T\x1f\x08\x00HN\x1e\x02A\x06\x1d\x06MT\x0b\n\x02\x02\x10\x19E\x10\x16MO:\x00SC\x00NC\x0e\x1e\x1d\nRF\x12\x17\x1b\x01\x17\x00\x1b\x0eEC\x1c\x0c\x1d\x16\nR\r\x11tN\x19\x06\x1a\x11M\x0eU\x17O\x08NT7\x14\x05\x0b\x17CST\x1bH\x07\x0e\x00\x0eM'

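We print the raw bytes rather than hex because we want to spot the uppercase and lowercase English letters in the output. According to the challenge, the plaintexts are ASCII English sentences, so most characters are letters and spaces. One special rule is needed:

$$ \text{ASCII}(\text{upper/lowercase letter})\oplus \text{ASCII}(\text{space})=\text{ASCII}(\text{lower/uppercase letter}) $$

XORing the ASCII value of an English letter with the ASCII value of a space toggles the case of that letter.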
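The rule holds because the uppercase and lowercase forms of an ASCII letter differ only in bit 0x20, which is exactly the value of a space. A small sketch to confirm it (`toggle_with_space` is a name introduced here for illustration):

```python
import string

SPACE = ord(" ")  # 0x20


def toggle_with_space(ch: str) -> str:
    """XOR a single character with the space character."""
    return chr(ord(ch) ^ SPACE)


for ch in "aZqM":
    print(ch, "->", toggle_with_space(ch))

# The rule holds for every ASCII letter.
assert all(toggle_with_space(c) == c.swapcase() for c in string.ascii_letters)
# Space XOR space is 0, so a zero byte says nothing about letters.
assert ord(" ") ^ SPACE == 0
```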

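Following this rule, we take one of the ten ciphertexts, C, and XOR it byte by byte with each of the other nine, giving nine byte streams B_i. We then look at the byte at the same index j in every stream, B_i[j]. If almost all nine of those bytes are English letters, the plaintext behind byte C[j] of the chosen ciphertext is probably a space. In that case we have

$$ C[j]\oplus \text{ASCII}(\text{space})=K[j] $$

and so may recover one byte of the key. With enough sufficiently long ciphertexts, repeating this process can recover the whole key.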

key = [0] * len(msg_7) # `msg_7` is the longest ciphertext.

def isalpha(b):
    return (ord('a') <= b <= ord('z')) or (ord('A') <= b <= ord('Z'))

for i, msg_i in enumerate(msgs):
    may_not_space = [0] * len(msg_i) # Count cases when b_k is not alphabetic.
    for j, msg_j in enumerate(msgs):
        if i != j:
            xored = bytesxor(msg_i, msg_j)
            for k, xb in enumerate(xored):
                if (not isalpha(xb)) and xb != 0:
                    may_not_space[k] += 1
    
    for j, may_not in enumerate(may_not_space):
        if may_not <= 2: # If almost all b_k are alphabetic
            key_byte = msg_i[j] ^ ord(' ')
            if key[j] == 0:
                key[j] = key_byte
                continue
            
            if key[j] != key_byte: # Detect contradiction. Do more checks.
                reliable = True
                for m in msgs:
                    if j >= len(m):
                        continue
                    byte = m[j] ^ key_byte
                    if not isalpha(byte) and byte != ord(' '):
                        reliable = False
                        break
                if reliable:
                    key[j] = key_byte

print(f'Recovered key: {key}')
print(f'Target message: {bytesxor(bytes(key), msg_target)}')
# for msg in msgs:
#     print(bytesxor(msg, bytes(key)))

Problem 2

Link: https://www.cryptopals.com/sets/1

1. Convert hex to base64

from base64 import b64encode

print(b64encode(bytes.fromhex('49276d206b696c6c696e6720796f757220627261696e206c696b65206120706f69736f6e6f7573206d757368726f6f6d')).decode())

2. Fixed XOR

def bytesxor(a, b):
    if len(a) > len(b):
       return bytes([x ^ y for (x, y) in zip(a[:len(b)], b)])
    else:
       return bytes([x ^ y for (x, y) in zip(a, b[:len(a)])])

print(bytesxor(bytes.fromhex('1c0111001f010100061a024b53535009181c'), bytes.fromhex('686974207468652062756c6c277320657965')).hex())

3. Single-byte XOR cipher

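On a whim, I use the letter-scoring algorithm from Animal Crossing here to decide which result is most likely correct. (It actually works fine.)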

def get_score(message: str) -> int:
    """*Animal Crossing* message scoring algorithm."""
    t = set("""abl abo abr abs acc ach acr act add adm adv aer aff afr aft aga age ago ahe air ali all alm alo alr als alt alw am  ame amo and ang ani ano ans any apa app apr are arg arm arr art asi ask asl ate atm att aud aug aut ave avo awa cak cal cam can cap car cas cat cau cen cer cha che chi cho chi chu cir cit cla cle cli clo coa cof coi col com con coo cop cor cos cou cov cow cre cri cro cry cup cur cus cut bab bac bad bag bal ban bas bat be  bea bec bed bee bef beg beh bel bes bet bey bic big bik bil bir bit bla ble blo blu boa bod bon boo bor bot bou box boy bra bre bri bro bui bur bus but buy by  eac ear eas eat edu eff egg eig eit ele els emp end ene eng enj eno ent equ err esp eur eve exa exc exe exp eye dad dai dam dan dar dat dau day dea dec dee def deg del dem den dep des det dev dic did die dif dig din dir dis div do  doc doe dog dol don doo dou dow doz dra dre dri dro dru dry due dur dus dut gai gam gar gas gat gav gen ger get gir giv gla go  god goi gon goo got gov gra gre gro gua gue gui gun fac fai fal fam far fas fat fea feb fed fee fel few fie fif fig fil fin fir fis fiv fix fla fle fli flo fly fol foo for fou fra fre fri fro fru ful fun fut i   ice ide if  ima imm imp in  inc ind inf ins int inv iro is  isl it  its hab had hai hal han hap har has hat hav he  hea hei hel her hi  hid hig hil him hir his hit hol hom hon hop hor hos hot hou how hum hun hur hus kee kep key kic kil kin kit kne kni kno kab kad kai kak kan kar kas kat kau kaw kay kaz kea ked kef keg ken kes ket kev kib kie kif kig kik kim kin kis kit kiv koc kon koo kos kot kou kov kow kun kyi kac kad kag kai kaj kak kan kap kar kat kay ke  kea ked kee kem ken kes ket kid kig kil kin kis kod kom kon koo kor kos kot kou kov kuc kum kus ky  kys kam kar kat kea kec kee kei kev kew kex kic kig kin ko  kob koi kon koo kor kos kot kov kow kum kbj k'c kct kf  kff kft kh  kil kka kld kn  knc kne knl kpe kpi kpp kr  kra krd kth kur kut kve kwn jan jap job joi jud jul jum jun jus qua que qui
pac pag pai pap par pas pat pay pea pen peo per pho pic pie pin pip pla ple poc poi pol poo pop pos pot pou pow pra pre pri pro pub pul pup pur pus put sad saf sai sal sam san sat sav saw say sce sch sci sco sea sec see sel sen sep ser set sev sex sha she shi sho shu sic sid sig sil sim sin sis sit six siz ski sky sle sli slo sma sme smi smo sno so  soa soc sof soi sol som son soo sor sou spa spe spi spo spr squ sta ste sti sto str stu sty sub suc sud suf sug sum sun sup sur swa swe swi swu sys rac rad rai ran rap rat rea rec red ref reg rel rem rep req res ret ric rid rig rin ris riv roa roc rod rol roo ros rou row rul run rus una unc und uni unl unt up  upo us  use usu tab tak tal tas tau tax tea tee tel tem ten ter tes tha the thi tho thr thu tic tie til tim tir tit to  tod tog tol tom ton too top tor tot tou tow tra tre tri tro tru try tue tur tv  twe twi two tyi typ val var veg ver vie vil vis voi vol vot vai vak val van var vas vat vav vay ve  vea ved vee vei vel ven ver ves vet vha vhe vhi vho vhy vid vif vil vin vir vis vit viv vok vom von voo vor vou vri vro vma yar yea yel yen yes yet you zer""".split())
    s = 0
    if message and message[-1] in '.?!':
        s += 20
    for i, c in enumerate(message):
        if c in '.?!':
            for j in range(i+1, min(i+4, len(message))):
                if message[j].isupper():
                    s += 10
                    break
                elif message[j].isalpha():
                    s -= 10
                    break
    s += sum(3 for w in message.split() if len(cw :=
             ''.join(c for c in w if c.isalpha()).lower()) >= 3 and cw[:3] in t)
    for c in message:
        if not c.isspace():
            s += 20 if c.isupper() else -10
            break
    for i in range(len(message)-2):
        if message[i].isalpha() and message[i] == message[i+1] == message[i+2]:
            s -= 50
            break
    sp, nsp = message.count(' '), len(message) - message.count(' ')
    s += -20 if nsp == 0 or (sp * 100 // nsp if nsp else 0) < 20 else 20
    if len(message) > 75:
        c = 0
        for ch in message:
            c = 0 if ch in '.?!' else c + 1
            if c == 75:
                s -= 150
                break
    s -= sum(20 for i in range(0, len(message), 32)
             if ' ' not in message[i:i+32] and len(message[i:i+32]) == 32)
    return s


cipher = bytes.fromhex(
    '1b37373331363f78151b7f2b783431333d78397828372d363c78373e783a393b3736')
results = list()
for c in range(256):
    try:
        res = bytesxor((chr(c) * len(cipher)).encode(), cipher).decode()
        results.append((res, get_score(res)))
    except UnicodeDecodeError:
        pass

best = sorted(results, key=lambda x: x[1])[-1][0]
print(best)

# Cooking MC's like a pound of bacon

4. Detect single-character XOR

with open('4.txt', 'r') as file:
    ciphers = file.readlines()

for original_cipher in ciphers:
    results = list()
    cipher = bytes.fromhex(original_cipher)
    for c in range(256):
        try:
            res = bytesxor((chr(c) * len(cipher)).encode(), cipher).decode()
            results.append((res, get_score(res)))
        except UnicodeDecodeError:
            pass
    if len(results) == 0:
        continue

    best = sorted(results, key=lambda x: x[1])[-1]
    if best[1] > 40:
        print(f'{original_cipher.strip()} -> {best[0]}')

# 7b5a4215415d544115415d5015455447414c155c46155f4058455c5b523f -> Now that the party is jumping

5. Implement repeating-key XOR

def repeating_key_xor_to_hex(msg: bytes, key: bytes) -> str:
    result_chars = []
    keylen = len(key)
    for i, b in enumerate(msg):
        result_chars.append(b ^ key[i % keylen])
    return bytes(result_chars).hex()

print(repeating_key_xor_to_hex(b"Burning 'em, if you ain't quick and nimble", b'ICE'))
print(repeating_key_xor_to_hex(b'I go crazy when I hear a cymbal', b'ICE'))
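Like single-key XOR, repeating-key XOR is its own inverse: applying the same key twice returns the plaintext. A compact sketch using `itertools.cycle` to repeat the key (an alternative to the `i % keylen` indexing above; it returns bytes rather than hex):

```python
from itertools import cycle


def repeating_key_xor(data: bytes, key: bytes) -> bytes:
    """XOR data with the key repeated as often as needed."""
    return bytes(b ^ k for b, k in zip(data, cycle(key)))


plaintext = b"I go crazy when I hear a cymbal"
ciphertext = repeating_key_xor(plaintext, b"ICE")

# Encrypting again with the same key decrypts.
assert repeating_key_xor(ciphertext, b"ICE") == plaintext
print(ciphertext.hex())
```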

6. Break repeating-key XOR

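It did not recover the key correctly at first. On inspection, # was throwing off the decryption score. Since # is very rare in English sentences, each occurrence costs 5 points.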

from base64 import b64decode


def get_score(message: str):
    score = 0
    for c in message:
        if c.islower():
            score += 3
        if c.isupper():
            score += 1
        if c == ' ':
            score += 1
        if c == '#':
            score -= 5
    return score


def get_key(cipher: bytes) -> int:
    results = list()
    for c in range(256):
        try:
            res = bytesxor((chr(c) * len(cipher)).encode(), cipher).decode()
            results.append((c, get_score(res)))
        except UnicodeDecodeError:
            pass
    return sorted(results, key=lambda x: x[1])[-1][0]


def repeating_key_xor(msg: bytes, key: bytes) -> bytes:
    result_chars = []
    keylen = len(key)
    for i, b in enumerate(msg):
        result_chars.append(b ^ key[i % keylen])
    return bytes(result_chars)


def normalized_average_hd(data: bytes, unit_len: int) -> float:
    chunks = [data[i:i + unit_len] for i in range(0, len(data), unit_len)]
    num_chunks = len(chunks)
    total_hd = 0
    count = 0
    for i in range(num_chunks):
        for j in range(i + 1, num_chunks):
            if len(chunks[i]) == unit_len and len(chunks[j]) == unit_len:
                total_hd += sum((byte1 ^ byte2).bit_count()
                                for byte1, byte2 in zip(chunks[i], chunks[j]))
                count += 1
    return total_hd / (count * unit_len)


with open('6.txt', 'r') as file:
    cipher = b64decode(file.read())

results = list()
for l in range(2, 41):
    norm_hd = normalized_average_hd(cipher, l)
    results.append((l, norm_hd))
keylens = sorted(results, key=lambda x: x[1])

for k in range(1):
    keylen = keylens[k][0]
    print(f"Guessed key length: {keylen}")

    key_bytes = []
    for i in range(keylen):
        block = bytes([cipher[j * keylen + i]
                      for j in range(len(cipher) // keylen)])
        key_bytes.append(get_key(block))

    key = bytes(key_bytes)
    # print(repeating_key_xor(cipher, key).decode())
    print(f'Key: {key}')

# Guessed key length: 29
# Key: b'Terminator X: Bring the noise'
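The key-length guess in `normalized_average_hd` relies on the bitwise Hamming distance between chunks. A standalone sketch of that distance, checked against the test pair that the cryptopals challenge text gives (distance 37); it uses `bin(...).count("1")` so it also runs on Python versions without `int.bit_count()`:

```python
def hamming_distance(a: bytes, b: bytes) -> int:
    """Number of differing bits between two equal-length byte strings."""
    if len(a) != len(b):
        raise ValueError("inputs must have the same length")
    return sum(bin(x ^ y).count("1") for x, y in zip(a, b))


assert hamming_distance(b"this is a test", b"wokka wokka!!!") == 37
```

Chunks cut at the true key length are XORed with the same key bytes, so their distance reduces to the distance between English plaintext chunks, which tends to be lower than for a wrong key length.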

7. AES in ECB mode

import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

key = b"YELLOW SUBMARINE"
with open('7.txt', 'r') as f:
    encrypted_b64 = f.read()
encrypted_data = base64.b64decode(encrypted_b64)
# Cipher() has no default mode, so ECB must be requested explicitly.
cipher = Cipher(algorithms.AES(key), modes.ECB())
decryptor = cipher.decryptor()
decrypted_data = decryptor.update(encrypted_data) + decryptor.finalize()

print(decrypted_data.decode())

8. Detect AES in ECB mode

print(max(open('8.txt').read().splitlines(), key=lambda x: len(c:=[x[i:i+32] for i in range(0, len(x), 32)]) - len(set(c))))
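The one-liner above picks the line with the most repeated 32-hex-character (16-byte) blocks, since ECB encrypts identical plaintext blocks to identical ciphertext blocks. A more readable sketch of the same idea on raw bytes, using synthetic data instead of `8.txt` (the function name is introduced here for illustration):

```python
def count_repeated_blocks(data: bytes, block_size: int = 16) -> int:
    """Count how many blocks duplicate an earlier block."""
    blocks = [data[i:i + block_size] for i in range(0, len(data), block_size)]
    return len(blocks) - len(set(blocks))


# Synthetic stand-ins for ciphertexts (not real AES output).
repeated = bytes(range(16)) * 3 + bytes(range(16, 32))  # one block appears 3 times
distinct = bytes(range(64))                             # four different blocks

assert count_repeated_blocks(repeated) == 2
assert count_repeated_blocks(distinct) == 0
```

Applied to the challenge file, each line would first go through `bytes.fromhex(line)`, and the line with the highest count is the ECB candidate.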

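First attempt

Started from the Arch Linux p7zip PKGBUILD and changed the compiler to afl-clang-lto(++).

The build failed with an error about a function pointer type cast, which I suppressed with -Wno-cast-function-type-strict.

Prepared three arbitrary 7z files, put them in the input folder, and started right away. (@@ is replaced with the path of the input file, so the target reads a file instead of stdin.)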

AFL_SKIP_CPUFREQ=1 afl-fuzz -i input -o output -- ./7zr x @@

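Result: this was fuzzing for fun at best; the results were terrible.

Second attempt

Prepared many 7z files of only 100+ bytes each, containing one or a few files, as input.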

AFL_SKIP_CPUFREQ=1 afl-fuzz -i input -o output -- ./7zr x @@ -y

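Result: coverage quickly reached the same level as the first attempt.

Third attempt

Learned that the 7z format has CRC checks, so most fuzz inputs were probably dying at the checksum. Patched the source to remove the checks: (this should really be guarded by the FUZZING_BUILD_MODE_UNSAFE_FOR_PRODUCTION macro)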

static inline bool TestStartCrc(const Byte *p)
{
  (void)p; // suppress -Wunused-parameter
  return true;
}

if (CrcCalc(buffer2, nextHeaderSize_t) != nextHeaderCRC)
    ;

if (CrcCalc(data, unpackSize) != folders.FolderCRCs.Vals[i])
    ;
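To see why unpatched builds reject almost every mutated input, here is a small sketch of the first of these checks. It assumes the 32-byte signature header layout as I understand it from 7-Zip's format notes (6-byte magic, 2-byte version, 4-byte StartHeaderCRC, then a 20-byte start header) and assumes 7-Zip's CRC matches `zlib.crc32`. The header is synthetic demo data, not a real archive.

```python
import struct
import zlib

SIGNATURE = b"7z\xbc\xaf\x27\x1c"  # 6-byte 7z magic


def start_header_crc_ok(archive: bytes) -> bool:
    """Compare the stored StartHeaderCRC with the CRC of bytes 12..31."""
    (stored,) = struct.unpack_from("<I", archive, 8)
    return stored == zlib.crc32(archive[12:32])


def make_signature_header(next_offset: int, next_size: int, next_crc: int) -> bytes:
    """Build a synthetic 32-byte signature header with a valid CRC."""
    start_header = struct.pack("<QQI", next_offset, next_size, next_crc)
    crc = struct.pack("<I", zlib.crc32(start_header))
    return SIGNATURE + b"\x00\x04" + crc + start_header


header = make_signature_header(next_offset=100, next_size=40, next_crc=0)
assert start_header_crc_ok(header)

# Flip one bit in NextHeaderOffset, as a fuzzer mutation might.
mutated = bytearray(header)
mutated[12] ^= 0x01
assert not start_header_crc_ok(bytes(mutated))
```

Any mutation inside the checked region fails unless the fuzzer also happens to fix the four CRC bytes, which is why removing the checks helps coverage.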

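Added the -fsanitize=address -g compile options. Added the -so runtime option so that extracted data goes to stdout instead of littering the directory.

I wanted to add the -si option to read the archive from stdin, which should improve performance a lot. But the 7z format does not support it, because part of a 7z archive's header sits at the end of the file and must be read before extraction. (Not sure what the point of that is...)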
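The trailing header can be located from the fixed-size signature header at the start of the file. A minimal sketch, assuming the layout from 7-Zip's format notes as I understand them (NextHeaderOffset and NextHeaderSize are little-endian 64-bit values at byte 12, with the offset counted from the end of the 32-byte signature header); the demo bytes are synthetic:

```python
import struct

SIGNATURE_HEADER_SIZE = 32


def next_header_span(signature_header: bytes) -> tuple[int, int]:
    """Return (absolute file offset, size) of the header stored near the end."""
    next_offset, next_size = struct.unpack_from("<QQ", signature_header, 12)
    return SIGNATURE_HEADER_SIZE + next_offset, next_size


# Synthetic signature header: magic, version, dummy CRC, then the start header.
demo = (b"7z\xbc\xaf\x27\x1c" + b"\x00\x04" + b"\x00" * 4
        + struct.pack("<QQI", 1000, 64, 0))

start, size = next_header_span(demo)
print(start, size)  # 1032 64
```

A reader consuming stdin would have to buffer everything up to that offset before it knows how to decode the packed streams that precede it.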

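Putting the whole fuzzing project folder on tmpfs should be faster, although the tutorial on the AFL website recommends ext2 + noatime. Since this is mostly an experiment for now, I did not bother setting up an ext2 environment.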

AFL_USE_ASAN=1 AFL_SKIP_CPUFREQ=1 afl-fuzz -i input -o output -- ./7zr x @@ -y -so

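Result: I did not expect much, but after an afternoon nap it had actually collected many crashes. They are all OOM though, so not really meaningful. (Opening one of these inputs with KDE Ark made it coredump right away.)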

==589143==ERROR: AddressSanitizer: requested allocation size 0x207ffffffffffff (0x208000000001000 after adjustments for alignment, red zones etc.) exceeds maximum supported size of 0x10000000000 (thread T0)
    #0 0x6048688567e2 in operator new[](unsigned long) (/tmp/7zfuzz/7zr+0x1fa7e2) (BuildId: 8450a6b1d6712a80c42046efedb6d74eb798c38d)
    #1 0x604868c0e1e0 in CBuffer<unsigned char>::Alloc(unsigned long) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../Archive/7z/../../Common/../../Common/MyBuffer.h:72:18
    #2 0x604868c1f5bd in NArchive::N7z::CInArchive::ReadAndDecodePackedStreams(unsigned long, unsigned long&, CObjectVector<CBuffer<unsigned char>>&, ICryptoGetTextPassword*, bool&, bool&, UString&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../Archive/7z/7zIn.cpp:1187:10
    #3 0x604868c280ce in NArchive::N7z::CInArchive::ReadDatabase2(NArchive::N7z::CDbEx&, ICryptoGetTextPassword*, bool&, bool&, UString&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../Archive/7z/7zIn.cpp:1705:28
    #4 0x604868be4ddd in NArchive::N7z::CInArchive::ReadDatabase(NArchive::N7z::CDbEx&, ICryptoGetTextPassword*, bool&, bool&, UString&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../Archive/7z/7zIn.cpp:1743:25
    #5 0x604868be4ddd in NArchive::N7z::CHandler::Open(IInStream*, unsigned long const*, IArchiveOpenCallback*) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../Archive/7z/7zHandler.cpp:708:30
    #6 0x604868dc6db4 in OpenArchiveSpec(IInArchive*, bool, IInStream*, unsigned long const*, IArchiveOpenCallback*, IArchiveExtractCallback*) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:1599:3
    #7 0x604868dbc2ba in CArc::OpenStream2(COpenOptions const&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:2744:26
    #8 0x604868dc9229 in CArc::OpenStream(COpenOptions const&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:3024:3
    #9 0x604868dcb6c1 in CArc::OpenStreamOrFile(COpenOptions&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:3119:17
    #10 0x604868dcda76 in CArchiveLink::Open(COpenOptions&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:3295:28
    #11 0x604868dd277e in CArchiveLink::Open2(COpenOptions&, IOpenCallbackUI*) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:3419:17
    #12 0x604868d49f6d in CArchiveLink::Open3(COpenOptions&, IOpenCallbackUI*) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/OpenArchive.cpp:3487:17
    #13 0x604868d49f6d in CArchiveLink::Open_Strict(COpenOptions&, IOpenCallbackUI*) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/../Common/OpenArchive.h:437:22
    #14 0x604868d49f6d in Extract(CCodecs*, CObjectVector<COpenType> const&, CRecordVector<int> const&, CObjectVector<UString>&, CObjectVector<UString>&, NWildcard::CCensorNode const&, CExtractOptions const&, IOpenCallbackUI*, IExtractCallbackUI*, IFolderArchiveExtractCallback*, IHashCalc*, UString&, CDecompressStat&) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Common/Extract.cpp:422:30
    #15 0x604868e55d25 in Main2(int, char**) /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Console/Main.cpp:1378:21
    #16 0x604868e68411 in main /usr/src/debug/7zip/CPP/7zip/Bundles/Alone7z/../../UI/Console/MainAr.cpp:132:11
    #17 0x7bc23b33f6b4 in __libc_start_call_main /usr/src/debug/glibc/glibc/csu/../sysdeps/nptl/libc_start_call_main.h:58:16
    #18 0x7bc23b33f768 in __libc_start_main /usr/src/debug/glibc/glibc/csu/../csu/libc-start.c:360:3
    #19 0x604868717a24 in _start (/tmp/7zfuzz/7zr+0xbba24) (BuildId: 8450a6b1d6712a80c42046efedb6d74eb798c38d)

==589143==HINT: if you don't care about these errors you may set allocator_may_return_null=1
SUMMARY: AddressSanitizer: allocation-size-too-big (/tmp/7zfuzz/7zr+0x1fa7e2) (BuildId: 8450a6b1d6712a80c42046efedb6d74eb798c38d) in operator new[](unsigned long)
==589143==ABORTING

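I did not really figure out how this gets triggered, but the 7z file format has quite a few fields whose lengths are directly user-controlled, so this is probably to be expected.

Fourth attempt

Marked the memory-allocation functions in CPP/Common/MyBuffer.h with __attribute__((no_sanitize("address"))) so that ASAN no longer tracks them. Since these allocation functions are hot spots anyway, performance improved quite a bit, the meaningless OOM reports went away, and it probably does not miss any bugs (after all, it is really just new[]).

Just discovered the afl-llvm-cmplog tool, which instruments the comparison operations in the program to help AFL++ generate inputs that reach critical paths. To enable it, set the AFL_LLVM_CMPLOG=1 environment variable at compile time and pass -c 0 when fuzzing. It is noticeably effective when the target requires input in a specific file format (magic headers and the like).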

AFL_USE_ASAN=1 AFL_SKIP_CPUFREQ=1 afl-fuzz -i input -o output -- ./7zr x @@ -y -so

Result: map density doubled!

wget https://www.gstatic.com/webp/gallery/1.webp -O input/1.webp
wget https://www.gstatic.com/webp/gallery/2.webp -O input/2.webp
wget https://www.gstatic.com/webp/gallery/3.webp -O input/3.webp
wget https://www.gstatic.com/webp/gallery/4.webp -O input/4.webp
wget https://www.gstatic.com/webp/gallery/5.webp -O input/5.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_01.webp -O input/test_01.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_02.webp -O input/test_02.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_03.webp -O input/test_03.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_04.webp -O input/test_04.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_05.webp -O input/test_05.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_06_lossless.webp -O input/test_06_lossless.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_06_lossy.webp -O input/test_06_lossy.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_07_lossless.webp -O input/test_07_lossless.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_07_lossy.webp -O input/test_07_lossy.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_08_lossless.webp -O input/test_08_lossless.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_08_lossy.webp -O input/test_08_lossy.webp
wget https://raw.githubusercontent.com/signalapp/Signal-Android/main/glide-webp/app/src/main/assets/test_09_large.webp -O input/test_09_large.webp

Fifth attempt

AFL_QUIET=1

CC=afl-clang-lto CXX=afl-clang-lto++ ./configure --disable-shared
  • 2 Master
  • 4 AFL_USE_ASAN=1 AFL_USE_UBSAN=1 AFL_USE_CFISAN=1
  • 6 AFL_LLVM_CMPLOG=1 -l 2AT
  • 8 AFL_LLVM_LAF_ALL=1
  • 10
cd CPP/7zip/Bundles/Alone2/

export EXEPTOR_CONFIG=$(pwd)/libexeptor.yaml
export EXEPTOR_LOG=$(pwd)/exeptor.log

LD_PRELOAD=/tmp/exeptor/build/libexeptor.so make -j -f ../../cmpl_clang_x64.mak CC=afl-clang-lto CXX=afl-clang-lto++ USE_ASM=1 MY_ASM="uasm"

AFL_QUIET=1 AFL_LLVM_CMPLOG=1 LD_PRELOAD=/tmp/exeptor/build/libexeptor.so make -j -f ../../cmpl_clang_x64.mak CC=afl-clang-lto CXX=afl-clang-lto++ USE_ASM=1 MY_ASM="uasm"

AFL_LLVM_LAF_ALL=1 LD_PRELOAD=/tmp/exeptor/build/libexeptor.so make -j -f ../../cmpl_clang_x64.mak CC=afl-clang-lto CXX=afl-clang-lto++ USE_ASM=1 MY_ASM="uasm"

-f

llvm-ar rcs 7z.a ./b/c_x64/*.o

docker build . -t fuzz-7zip
docker run --rm -it --tmpfs /ramdisk:exec fuzz-7zip
cp -r /root/fuzz/ /ramdisk/ && cd /ramdisk/fuzz
export AFL_TESTCACHE_SIZE=256
AFL_FINAL_SYNC=1 afl-fuzz -i input -o output -M master -a binary -G 1024 -b 2 -- ./7zz_normal x -so -tzip @@
AFL_USE_ASAN=1 afl-fuzz -i input -o output -S sanitizer -a binary -G 1024 -b 4 -- ./7zz_sanitizer x -so -tzip @@
afl-fuzz -i input -o output -S cmplog -a binary -G 1024 -b 6 -c 0 -l 2AT -- ./7zz_cmplog x -so -tzip @@
afl-fuzz -i input -o output -S compcov -a binary -G 1024 -b 8 -- ./7zz_cmpcov x -so -tzip @@
#define Z7_ST
#include "../CPP/7zip/Archive/Common/DummyOutStream.h"
#include "../CPP/7zip/Common/CWrappers.h"
#include "7zAlloc.h"
#include "7zTypes.h"
#include "Alloc.h"
#include "Xz.h"
#include <string.h>
#include <unistd.h>
static const ISzAlloc alloc = {SzAlloc, SzFree};
static int isMT = False;
static CXzStatInfo stat;
class FakeOutStream : public ISequentialOutStream {
  public:
    // IUnknown
    STDMETHOD(QueryInterface)(REFIID, void **) { return S_OK; }
    STDMETHOD_(ULONG, AddRef)() { return 1; }
    STDMETHOD_(ULONG, Release)() { return 1; }
    // ISequentialOutStream
    STDMETHOD(Write)(const void *, UInt32 size, UInt32 *processedSize) {
        if (processedSize) // callers may pass NULL
            *processedSize = size;
        return S_OK;
    }
};
// Dummy input stream for fuzzing; construct it from const uint8_t *Data and
// size_t Size.
#include <algorithm>
class BufInStream : public ISequentialInStream {
  private:
    const uint8_t *_data;
    size_t _size;
    size_t _pos;

  public:
    BufInStream() : _data(nullptr), _size(0), _pos(0) {}
    void SetData(const uint8_t *data, size_t size) {
        _data = data;
        _size = size;
        _pos = 0;
    }
    // IUnknown
    STDMETHOD(QueryInterface)(REFIID, void **) { return S_OK; }
    STDMETHOD_(ULONG, AddRef)() { return 1; }
    STDMETHOD_(ULONG, Release)() { return 1; }
    // ISequentialInStream
    STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize) {
        if (_pos >= _size) {
            if (processedSize)
                *processedSize = 0;
            return S_OK;
        }
        UInt32 toRead = (UInt32)std::min<size_t>(size, _size - _pos);
        memcpy(data, _data + _pos, toRead);
        _pos += toRead;
        if (processedSize)
            *processedSize = toRead;
        return S_OK;
    }
    STDMETHOD(Seek)(Int64 offset, UInt32 seekOrigin, UInt64 *newPosition) {
        if (seekOrigin == STREAM_SEEK_SET)
            _pos = (UInt32)offset;
        else if (seekOrigin == STREAM_SEEK_CUR)
            _pos += (UInt32)offset;
        else if (seekOrigin == STREAM_SEEK_END)
            _pos = (UInt32)(_size + offset);
        if (newPosition)
            *newPosition = _pos;
        return S_OK;
    }
};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) {
    CXzDecMtHandle p = XzDecMt_Create(&alloc, &g_AlignedAlloc);
    CXzDecMtProps props;
    XzDecMtProps_Init(&props);
    BufInStream inStream;
    inStream.SetData(Data, Size);
    FakeOutStream outStream;
    CSeqInStreamWrap inWrap;
    CSeqOutStreamWrap outWrap;
    CCompressProgressWrap progressWrap;
    inWrap.Init(&inStream);
    outWrap.Init(&outStream);
    SRes res = XzDecMt_Decode(p, &props, NULL, CODER_FINISH_ANY, &outWrap.vt,
                              &inWrap.vt, &stat, &isMT, NULL);
    XzDecMt_Destroy(p);
    return 0;
}