Trouble With Pairs - InCTF Internationals 2021

tl;dr

  • Get fake_flag by using Consensus Attack.
  • Get xored_flag by forging individual signatures.
  • Xor them both to get the flag.

Challenge point: 925

No. of solves: 14

Challenge Author: Chandu-Kona

Challenge Description

1
2
We are testing a new Optimised Signature scheme for Authentication in our Voting System.
This might lead us to reduce the time taken for Election Process.

Handout

-    BLS.py
-    signer.py
-    server.py

BLS.py

  • BLS.py is the same file as ciphersuites.py from the py_ecc module, with one function (hash_to_G2) overridden.
1
2
3
4
# from py_ecc.bls.hash_to_curve import hash_to_G2
def hash_to_G2(msg, DST, hash):
    """Map *msg* to a G2 point as ``hash(msg) * G2`` (deliberately weak).

    This replaces py_ecc's real hash-to-curve. The digest of *msg* is read
    as a big-endian integer and used as a scalar on the G2 generator, so
    the discrete log of the resulting point is publicly computable.

    msg:  message bytes to hash.
    DST:  domain separation tag; accepted for signature compatibility but
          ignored by this implementation.
    hash: hash constructor (called as ``hash(msg)``; its result must
          provide ``hexdigest()``).
    """
    m = int(hash(msg).hexdigest(),16)
    return multiply(G2,m)

signer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from BLS import G2ProofOfPossession as bls
from secret import data

# Build the published result: one record per entry of the secret `data`,
# with the private key dropped and the byte fields hex-encoded.
result = []
for i in data:
    d = {
        'Name': i['Name'],
        'Vote': i['Vote'],
        'Count': i['Count'],
        'PK': i['PK'].hex(),
        # The signed message is only the encoded vote, so every entry with
        # the same vote signs exactly the same message.
        'Sign': bls.Sign(i['PrivKey'],i['Vote'].encode()).hex(),
    }
    result.append(d)

# print(result)
'''
[{'Name': 'Nebraska',
'Vote': 'R',
'Count': 5,
'PK': 'aa6fc9c17a1b2de916e5d5453444655e9f6dd3d456b96239f954bc30b80f551c44c1c2423825bc01577e1986098f362b',
'Sign': 'a09538da373b317adf63cacb53799417ba57d79486aeec78d7687b37e72625190741313800a7698beb2659b725ca728a074514b4cc1fc300dee2e2ae74516993f6760f0839cc4d712a108c58955e062bf45100966fca0288f39f9bfc8ab25706'},
.
.
.
{'Name': 'Michigan',
'Vote': 'D',
'Count': 16,
'PK': 'ae29e8f4d3c7b814042d04d12930bfc6f78eb12f3b9233a3338fedf42b784b6de6b5d575a0dee6d14de1a5ab9baaf5d9',
'Sign': 'a977d66e4fabaaa4d79e4d32f6b0d4e2901278d4e0d31e662af8929ac7ca540c377907b7e315b908f5e643e49b4a4fd914d90bb60305595ab6160cfbe0bbabb5c8a98f8ae37fc6af64faf7dbc35a6b55d7e8c3946dda6135a9332f484b818312'}]
'''

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python3
from BLS import G2ProofOfPossession as bls
from secret import data, bytexor, fake_flag, flag
from json import loads
import sys


class Unbuffered(object):
    """Stream wrapper that flushes the underlying stream after every write.

    Any attribute not defined here is looked up on the wrapped stream, so
    the wrapper can stand in for the original stream object.
    """

    def __init__(self, stream):
        # stream: the file-like object to wrap (must offer write/flush).
        self.stream = stream

    def write(self, data):
        """Write *data* to the wrapped stream and flush immediately."""
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        """Write every item of *datas* to the wrapped stream, then flush."""
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        # Only reached when normal lookup fails: delegate to the stream.
        return getattr(self.stream, attr)

# Route all further prints through the flushing wrapper defined above.
sys.stdout = Unbuffered(sys.stdout)


# Banner printed once when the service starts.
header = '''We are testing a new Optimised Signature scheme for Authentication in Voting System.

You can send the Published Result in Specified Format
Json Format : {'Name' : name, 'Vote' : vote, 'Sign' : signature}
'''
# Error messages; each is printed right before the process exits.
invalid_json = "Invalid Json!"
invalid_sign = "Invalid signature!"
# NOTE: the next two lines rebind the names imported from `secret`, and their
# order matters: `flag` must be built first, while `fake_flag` is still the
# raw bytes value. The "real" reward only ever reveals flag XOR fake_flag
# (hex-encoded), never the flag itself.
flag = f"Seems like we could never patch this bug, here is your reward : {bytexor( flag, fake_flag ).hex()}"
fake_flag = f"but, this one is already known, so here is your fake reward : {fake_flag.decode()}"

class Challenge():
    """Collect one submitted vote record per known entry and verify them.

    The constructor reads ``len(data)`` JSON lines from stdin and overwrites
    the "Vote" and "Sign" fields of the matching entries in the shared
    ``data`` list; the Verify_* methods then check the BLS signatures.
    """

    def __init__(self):
        self.data = data
        self.Names = [i["Name"] for i in self.data]
        self.result = []
        # One prompt per known entry. Read() returns None, so `result` only
        # ends up holding placeholders.
        for i in range(len(data)):
            self.result.append(self.Read(input("> ").strip()))


    def Read(self, inp):
        """Parse one JSON submission and store its vote and signature.

        inp: JSON text with "Name", "Vote" ("R" or "D") and hex "Sign".
        Any failure (bad JSON, missing key, bad hex, unknown name or vote)
        prints ``invalid_json`` and exits the process.
        """
        try:
            # NOTE: this local `data` shadows the module-level `data`.
            data = loads(inp)
            Name = data["Name"]
            Vote = data["Vote"]
            Sign = bytes.fromhex(data["Sign"])

            assert Name in self.Names and Vote in ["R","D"]

            self.data[self.Names.index(Name)]["Vote"] = Vote
            self.data[self.Names.index(Name)]["Sign"] = Sign
        except:
            print(invalid_json)
            sys.exit()


    def Verify_aggregate(self):
        """Return True if, for each vote value, the aggregate signature verifies.

        For "D" and then "R", the signatures and public keys of all entries
        with that vote are aggregated and checked against the encoded vote.
        Only the sum of the signatures is checked here, not each signature.
        Any exception prints ``invalid_sign`` and exits the process.
        """
        try:
            for j in ["D", "R"]:
                aggregate_sign = bls.Aggregate([i["Sign"] for i in self.data if i["Vote"] == j])
                aggregate_Pk = bls._AggregatePKs([i["PK"] for i in self.data if i["Vote"] == j])
                if not bls.Verify(aggregate_Pk, j.encode(), aggregate_sign):
                    return False
            return True
        except:
            print(invalid_sign)
            sys.exit()


    def Verify_individual(self):
        """Return True if every entry's own signature verifies for its vote.

        Any exception prints ``invalid_sign`` and exits the process.
        """
        try:
            return all ( bls.Verify(i["PK"], i["Vote"].encode(), i["Sign"]) for i in self.data)
        except:
            print(invalid_sign)
            sys.exit()


    def Get_Majority(self):
        """Return "D" or "R", whichever has the larger total "Count".

        On a tie max() returns the first candidate, i.e. "D".
        """
        return max( ["D","R"] , key = lambda j : sum( [ i["Count"] for i in self.data if i["Vote"] == j ] ) )


if __name__ == "__main__":
    print(header)
    challenge = Challenge()

    # Stage 1: aggregate check. Execution only continues past this stage
    # when the aggregates verify AND the submitted votes give "R" the majority.
    if challenge.Verify_aggregate():

        if challenge.Get_Majority() == "R":
            print("WOW!!! You found the bug.")
        else:
            print("Everything is Verified and Perfect.")
            sys.exit()

    else:
        print("Not Verified!")
        sys.exit()

    # Stage 2: per-signature check. If every signature is individually valid
    # the XORed flag is printed; otherwise the fake flag is printed.
    if challenge.Verify_individual():
        print(flag)
        sys.exit()

    else:
        print(fake_flag)
        sys.exit()

Observation

As you may have noticed, the signing is done using BLS signatures,
so let us talk about how BLS works.

Before we begin, if you are new to ECC pairing, you can refer to this article here: link.

BLS

In 2001, Boneh, Lynn, and Shacham (BLS) invented an elegant signature scheme based on pairing. Let us assume Alice’s private key is x, and her public key is X = x G; H is a function which maps the message to a point on the elliptic curve E. The signature is simply S = x H(m) for message m.

To verify, we check whether e(G,S) is equal to e(X,H(m)),
since we know that:

1
e(G,S) = e(G,x*H(m)) = e(G,H(m))^x = e(x*G,H(m)) = e(X,H(m))

BLS Signature Aggregation

The BLS signature scheme has an attractive property that is used in ETH2: it allows signature aggregation. Let us assume we have n users, each with private key xi and public key Xi = xi * G (the same generator G for all users). Each user signs their message mi as Si = xi * H(mi). During verification, instead of checking the n signatures individually, we want to verify a single aggregate signature.

To achieve this, we compute the aggregate signature S = S1 + S2 + … + Sn.
To verify S, we check whether e(G,S) is equal to e(X1,H(m1)) * e(X2,H(m2)) * … * e(Xn,H(mn)), since we know that:

1
2
3
4
e(G,S)  = e(G, S1 + S2 + ... + Sn)
= e(G, x1 * H(m1)) * e(G, x2 * H(m2)) * ... * e(G, xn * H(mn))
= e(G,H(m1)) ^ x1 * e(G,H(m2)) ^ x2 * ... * e(G,H(mn)) ^ xn
= e(X1,H(m1)) * e(X2,H(m2)) * ... * e(Xn,H(mn))

Consensus Attacks

At a high level, the attacker’s goal is to create a set of invalid individual signatures, but their aggregate signature is valid. Therefore, some users will see valid signatures while others will see invalid signatures, i.e., the views among users are split.

Let us say there are four messages and signatures:

1
2
3
4
m1 = "message1", S1
m2 = "message2", S2
m3 = "message3", S3
m4 = "message4", S4

The attacker’s goal is to create malicious signatures so that users see malicious signatures, but the aggregate verification is valid. The attacker creates the following:

1
2
3
4
S1' = S1 - 2P
S2' = S2 + P
S3' = S3 - P
S4' = S4 + 2P

where P is any valid point in the subgroup. The aggregate signature does not change,
since:

1
2
3
S1' + S2' + S3' + S4' = S1 - 2P + S2 + P + S3 - P + S4 + 2P
                      = S1 + S2 + S3 + S4 + (P - P) + (2P - 2P)
                      = S1 + S2 + S3 + S4

Now that we know how BLS works, let us get back to the script server.py.
The server takes all the votes and first verifies the signatures in aggregate (one aggregate signature per vote value). If the aggregate check passes and the majority has been flipped to "R", it goes on to verify every signature individually. If any individual signature is invalid (i.e. a consensus attack was used), it returns fake_flag. Otherwise every signature is a genuine forgery, which means you have successfully broken the system, and it returns the flag XORed with fake_flag.

Note: we need both flag and fake_flag since the Original flag is XORed with fake_flag.

Exploit Idea

Recovering Fake Flag

Here, we can use Consensus attack to recover fake_flag or swap a few signatures to trick the server.

In detail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def connect(data):
    """Replay *data* to the challenge server and return the reward it prints.

    data: iterable of dicts with "Name", "Vote" and hex-encoded "Sign" keys.
          One JSON line is sent per entry, each after the server's "> " prompt.
    Returns the remainder of the line following "reward : ", stripped, as bytes.
    The connection is always closed, even if the exchange fails part-way.
    """
    Host, Port = b'crypto.challenge.bi0s.in', 1337
    # For local testing, use instead: io = process('./server.py')
    io = remote(Host,Port)
    try:
        for i in data:
            j = dumps({"Name": i["Name"], "Vote": i["Vote"], "Sign": i["Sign"]})
            io.recvuntil(b'> ')
            # Tubes carry bytes; encode explicitly rather than relying on
            # pwntools' implicit (and warned-about) str conversion.
            io.sendline(j.encode())
        io.recvuntil(b'reward : ')
        return io.recvline().strip()
    finally:
        io.close()

# NOTE: this excerpt relies on `idx` and on New York's entry in `result`
# having already been forged to vote "R" (see "Recovering Flag" below).

# Copy each entry: a plain `result[:]` would share the dicts, so tampering
# with the signatures here would also corrupt `result`.
fake_result = [dict(entry) for entry in result]
ny = fake_result[idx]
w = fake_result[idx + 1]

ny_sig = signature_to_G2(unhex(ny["Sign"]))
w_sig = signature_to_G2(unhex(w["Sign"]))

# Consensus attack: S_ny' = S_ny + 2*S_w and S_w' = -S_w. Both signatures
# are now individually invalid, yet S_ny' + S_w' = S_ny + S_w, so the
# aggregate is unchanged.
ny["Sign"] = G2_to_signature(add(ny_sig, multiply(w_sig, 2))).hex()
w["Sign"] = G2_to_signature(neg(w_sig)).hex()

# Both entries must vote "R" for the pair to land in the same aggregate.
assert bls.Verify(
    bls._AggregatePKs([unhex(ny["PK"]), unhex(w["PK"])]),
    b'R',
    bls.Aggregate([unhex(ny["Sign"]), unhex(w["Sign"])]),
)

fake_flag = connect(fake_result)

fake_flag = b'bi0s{7h1s_0n3_1s_n07_7h3_r1gh7_fl4g. :)}'

Recovering Flag

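Here, we can make use of the faulty implementation of the hash_to_G2 function in BLS.py. Since H(m) = sha256(m) * G2 and S = x * H(m), and we know m, we can multiply S by the inverse of sha256(m) modulo the group order to cancel the original hash out, and then multiply by the hash of our desired message dm to forge a signature: S' = sha256(dm) * sha256(m)^-1 * S = x * sha256(dm) * G2 = x * H(dm). Alternatively, we can multiply sha256(dm) with the public key X, since X = x * G and sha256(dm) * X = x * sha256(dm) * G (note, however, that in this ciphersuite public keys are G1 points while signatures are G2 points, so the exploit below uses the first approach).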

In detail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def bytexor(data,pad):
    """Return the byte-wise XOR of two equal-length byte strings.

    data, pad: bytes-like objects of the same length.
    Raises ValueError if the lengths differ (an explicit check, so it is
    not stripped when Python runs with -O as an ``assert`` would be).
    """
    if len(data) != len(pad):
        raise ValueError(
            f"bytexor: length mismatch ({len(data)} != {len(pad)})"
        )
    return bytes(i^j for i,j in zip(data,pad))

# Order of the prime-order subgroup in which the signatures live.
order = 52435875175126190479447740508185965837690552500527637822603658699938581184513

# hash_to_G2 maps a message to sha256(message) * G2, so both scalars are public.
m1, m2 = (int(sha256(vote).hexdigest(), 16) for vote in (b"D", b"R"))

# Locate New York's published entry; `idx` is reused by the consensus step.
idx, ny = next((i, j) for i, j in enumerate(result) if j["Name"] == 'New York')

# The published signature is S = x * m1 * G2 (this assumes New York's
# published vote is "D"). Multiplying by m2 * m1^-1 (mod order) gives
# x * m2 * G2, a valid signature on "R", without knowing the private key x.
scalar = int(m2 * invert(m1, order) % order)
forged = multiply(signature_to_G2(unhex(ny["Sign"])), scalar)

# `ny` is the same dict object as result[idx], so this updates `result`.
ny["Sign"] = G2_to_signature(forged).hex()
ny["Vote"] = "R"
assert bls.Verify(unhex(ny["PK"]), ny["Vote"].encode(), unhex(ny["Sign"]))

# Every signature is individually valid, so the server prints flag XOR fake_flag.
xored_flag = unhex(connect(result).decode())

flag = bytexor(fake_flag,xored_flag)

Exploit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from hashlib import sha256
from os import remove
from BLS import G2ProofOfPossession as bls
from py_ecc.bls.g2_primitives import G2_to_signature, signature_to_G2
from py_ecc.optimized_bls12_381 import add, multiply, neg
from gmpy2 import invert
from json import dumps
from signer import result
from pwn import *

def unhex(i):
    """Decode the hexadecimal string *i* into bytes."""
    return bytes.fromhex(i)

# Set pwntools' global log level to 'debug'.
context.log_level = 'debug'

def bytexor(data,pad):
    """Return the byte-wise XOR of two equal-length byte strings.

    data, pad: bytes-like objects of the same length.
    Raises ValueError if the lengths differ (an explicit check, so it is
    not stripped when Python runs with -O as an ``assert`` would be).
    """
    if len(data) != len(pad):
        raise ValueError(
            f"bytexor: length mismatch ({len(data)} != {len(pad)})"
        )
    return bytes(i^j for i,j in zip(data,pad))

def connect(data):
    """Replay *data* to the challenge server and return the reward it prints.

    data: iterable of dicts with "Name", "Vote" and hex-encoded "Sign" keys.
          One JSON line is sent per entry, each after the server's "> " prompt.
    Returns the remainder of the line following "reward : ", stripped, as bytes.
    The connection is always closed, even if the exchange fails part-way.
    """
    Host, Port = b'crypto.challenge.bi0s.in', 1337
    # For local testing, use instead: io = process('./server.py')
    io = remote(Host,Port)
    try:
        for i in data:
            j = dumps({"Name": i["Name"], "Vote": i["Vote"], "Sign": i["Sign"]})
            io.recvuntil(b'> ')
            # Tubes carry bytes; encode explicitly rather than relying on
            # pwntools' implicit (and warned-about) str conversion.
            io.sendline(j.encode())
        io.recvuntil(b'reward : ')
        return io.recvline().strip()
    finally:
        io.close()



# Order of the prime-order subgroup of BLS12-381 in which the signatures live.
order = 52435875175126190479447740508185965837690552500527637822603658699938581184513

# hash_to_G2 maps a message to sha256(message) * G2, so both scalars are public.
m1, m2 = (int(sha256(vote).hexdigest(), 16) for vote in (b"D", b"R"))

# Locate New York's published entry; `idx` is reused by the consensus step.
idx, ny = next((i, j) for i, j in enumerate(result) if j["Name"] == 'New York')

# The published signature is S = x * m1 * G2 (this assumes New York's
# published vote is "D"). Multiplying by m2 * m1^-1 (mod order) gives
# x * m2 * G2, a valid signature on "R", without knowing the private key x.
scalar = int(m2 * invert(m1, order) % order)
forged = multiply(signature_to_G2(unhex(ny["Sign"])), scalar)

# `ny` is the same dict object as result[idx], so this updates `result`.
ny["Sign"] = G2_to_signature(forged).hex()
ny["Vote"] = "R"
assert bls.Verify(unhex(ny["PK"]), ny["Vote"].encode(), unhex(ny["Sign"]))

# Every signature is individually valid, so the server prints flag XOR fake_flag.
xored_flag = unhex(connect(result).decode())

# Copy each entry: a plain `result[:]` would share the dicts, so tampering
# with the signatures here would also corrupt `result`.
fake_result = [dict(entry) for entry in result]
ny = fake_result[idx]
w = fake_result[idx + 1]

ny_sig = signature_to_G2(unhex(ny["Sign"]))
w_sig = signature_to_G2(unhex(w["Sign"]))

# Consensus attack: S_ny' = S_ny + 2*S_w and S_w' = -S_w. Both signatures
# are now individually invalid, yet S_ny' + S_w' = S_ny + S_w, so the
# aggregate is unchanged.
ny["Sign"] = G2_to_signature(add(ny_sig, multiply(w_sig, 2))).hex()
w["Sign"] = G2_to_signature(neg(w_sig)).hex()

# Both entries must vote "R" for the pair to land in the same aggregate.
assert bls.Verify(
    bls._AggregatePKs([unhex(ny["PK"]), unhex(w["PK"])]),
    b'R',
    bls.Aggregate([unhex(ny["Sign"]), unhex(w["Sign"])]),
)

# Aggregate passes but individual checks fail, so the server prints fake_flag.
fake_flag = connect(fake_result)


# The server's "real" reward is flag XOR fake_flag, so XOR-ing it with the
# recovered fake_flag yields the actual flag.
flag = bytexor(fake_flag,xored_flag)

print(flag)

# Sanity check against the known flag of this challenge.
assert flag == b'inctf{BLS_574nd5_f0r_B0n3h_Lynn_Sh4ch4m}'

Flag

flag = b'inctf{BLS_574nd5_f0r_B0n3h_Lynn_Sh4ch4m}'