Below code covers most of the oAuth 2.0 flows of Salesforce. Tweaking below code helps you
to implement any oAuth 2.0 flow within no time. All the Best !!
import flask
import requests
import tkinter as tk
import json
from json2html import *
from flask import request, jsonify
from base64 import urlsafe_b64encode
from datetime import datetime, timedelta
import Crypto
from flask import (
Flask,
render_template,
jsonify,
)
import os
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
app = flask.Flask(__name__)
app = Flask(__name__)
app.config["DEBUG"] = TrueJWT_HEADER = '{"alg":"RS256"}'JWT_CLAIM_ISS = "3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX"JWT_CLAIM_SUB = "david@lightning601.com"JWT_CLAIM_AUD = "https://login.salesforce.com"JWT_AUTH_EP = "https://login.salesforce.com/services/oauth2/token"'''===========================Home page ============================================='''
@app.route('/', methods=["GET"])
def home():
return "<h1> This blog is a great place to learn advanced concepts </h1>"
##############webserver first step ###########################
@app.route('/sfconnect', methods=["GET"])
def sfconnect():
# return "<a href='https://login.salesforce.com/services/oauth2/authorize?response_type=code&client_id=3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2Fsalesforcedata'> Webserver flow </a>" allflows = "<div style='background-color: #fad0c4;background-image: linear-gradient(315deg, #fad0c4 0%, #f1a7f1 74%);height:570px;width:1260px' ><div style='padding: 1em;position: absolute;'><ul style='list-style-type:disc;color: green;'>" + "<li style='color: green;font-size: 150%'>" + "<a href='https://login.salesforce.com/services/oauth2/authorize?response_type=code&client_id=3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2Fsalesforcedata'> Webserver flow </a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='https://login.salesforce.com/services/oauth2/authorize?response_type=token&client_id=3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX&redirect_uri=http%3A%2F%2Flocalhost%3A5000%2Fsfconnect'> User agent flow </a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/unamepwd'>User name & Password flow </a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/refresh'>Refresh token flow</a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/jwtflow'>JWT Bearer flow</a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/Deviceflow'>Device flow step 1</a>" + "</li>" + \
"<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/samlassertionflow'>Saml Assertion flow</a>" + "</li>"\
"<li style='color: green;font-size: 150%'>" + "<a href='https://asset-tokens.herokuapp.com/' target = '_blank' >Asset Token flow</a>" + "</li>"+ "</ul></div></div>"
return allflows
############################################################################################ Get token ###############################################################################################
@app.route('/salesforcedata', methods=["GET"])
def sfdata():
print(request.args)
if 'code' in request.args:
code = request.args["code"]
API_ENDPOINT = "https://login.salesforce.com/services/oauth2/token" # r = requests.post(url=AUTHORIZE_ENDPOINT, data=dataauthorize)
datafortoken = {'grant_type': 'authorization_code',
'client_id': '3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX',
'client_secret': 'B682A1156FF3A2CCF0E83764552CD1DD0FD2A4510577DDE69C408D80FB9F4A76',
'code': code,
'redirect_uri': 'http://localhost:5000/salesforcedata' }
r = requests.post(url=API_ENDPOINT, data=datafortoken)
sfdcres = r.text
print("The pastebin URL is:%s" % sfdcres)
dicdata = json.loads(sfdcres)
# print('#############',jsondata,'$$$$$$$$$$',dicdata);
acctkn = dicdata["access_token"]
print('parsed access token', acctkn)
# place your own domain name. return getdata(dicdata["access_token"])
############################################################################# User name and password flow####################################################################################
@app.route('/unamepwd', methods=["GET"])
def username_pwd():
API_ENDPOINT = "https://login.salesforce.com/services/oauth2/token"
clientid = '3MVG9G9pzCUSkzZvyptQFNKyeFZLuFEuMqSNrj_gLpwyqgHTmUSLK0qb0EvWiNm6VYzLixRXeULtJZlm3LevO' clientsec = '32B4AFDB1FD3E9B8FD62FD2333ACC9B33A65778C35634F8D618745746A2A6C86' # data to be sent to api data = {'grant_type': 'password',
'client_id': clientid,
'client_secret': clientsec,
'username': 'David@abc.com',
'password': 'pwdsecuritytoken' }
# String reqbody = 'grant_type=password&client_id='+clientid+'&client_secret='+clientsec+'&username='+'David@lightning601.com'+'&password='+'Welcome@18G841Lq4OPaBc3nvtMcGmhC5GH';
# sending post request and saving response as response object r = requests.post(url=API_ENDPOINT, data=data)
# extracting response text # sfdcres = json.dumps(r.text) resp = json.loads(r.text)
# return json2html.convert(json=json.loads(r.text))
return getdata(resp["access_token"])
############################################################################# Refresh token flow#############################################################################################
@app.route('/refresh', methods=["GET"])
def refrehflow():
API_ENDPOINT = "https://login.salesforce.com/services/oauth2/token"
clientid = '3MVG9G9pzCUSkzZvyptQFNKyeFZLuFEuMqSNrj_gLpwyqgHTmUSLK0qb0EvWiNm6VYzLixRXeULtJZlm3LevO' refresh_token = '5Aep8613hy0tHCYdhyHUIj3Fev55mM_VcyWVY6uxHslAKE.Fvf2x7ni.qVWP4HfxXN7ieW.wzJFXROLFe4F8cR9' # data to be sent to api data = {'grant_type': 'refresh_token',
'client_id': clientid,
'refresh_token': refresh_token
}
# String reqbody = 'grant_type=password&client_id='+clientid+'&client_secret='+clientsec+'&username='+'David@lightning601.com'+'&password='+'Welcome@18G841Lq4OPaBc3nvtMcGmhC5GH';
# sending post request and saving response as response object r = requests.post(url=API_ENDPOINT, data=data)
# extracting response text sfdcres = r.text
# return json2html.convert(json=json.loads(r.text)) resp = json.loads(r.text)
# return json2html.convert(json=json.loads(r.text))
return getdata(resp["access_token"])
############re usable method #########################
def getdata(acctkn):
END_POINT = "https://light601dep-dev-ed.my.salesforce.com/services/apexrest/contactsfordatabase/";
data = {"Authorization": "Bearer " + acctkn,
"Content-Type": "application/json" }
r = requests.get(END_POINT, headers=data)
print("contacts data from salesforce:%s" % r.content)
# dict = json.loads(r.content)
# return r.content return json2html.convert(json=json.loads(r.content))
def jwt_claim():
''' Function to package JWT Claim data in a base64 encoded string :return: base64 encoded jwt claims data '''
claim_template = '{{"iss": "{0}", "sub": "{1}", "aud": "{2}", "exp": {3}}}' claim = urlsafe_b64encode(JWT_HEADER.encode()).decode()
claim += "."
# expiration_ts = (datetime.now(tz=timezone.utc) + timedelta(minutes=5)).timestamp() expiration_ts = int((datetime.now() + timedelta(seconds=300)).timestamp())
payload = claim_template.format(JWT_CLAIM_ISS, JWT_CLAIM_SUB, JWT_CLAIM_AUD, expiration_ts)
print(payload)
claim += urlsafe_b64encode(payload.encode()).decode()
return claim
def sign_data(data):
f = open('pykey.pem', 'r')
print('$$$$$$$file$$$$$$$$', f)
rsa1key = RSA.importKey(f.read())
signer = PKCS1_v1_5.new(rsa1key)
digest = SHA256.new()
digest.update(data.encode())
sign = signer.sign(digest)
return urlsafe_b64encode(sign).decode()
def do_auth(endpoint, data):
''' Function to POST JWT claim to SFDC /oauth/token endpoint and receive an access_token :return: access token '''
r = requests.post(endpoint, data=data)
return r
@app.route('/jwtflow', methods=["GET"])
def jwtflow():
# Keeping with JWS spec, we need to remove the padding "=" characters from base64 encoded string claim = jwt_claim().replace("=", "")
# Keeping with JWS spec, we need to remove the padding "=" characters from base64 encoded string signed_claim = sign_data(claim).replace("=", "")
target_payload = claim + "." + signed_claim
auth_payload = {"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": target_payload}
response = do_auth(JWT_AUTH_EP, data=auth_payload)
# convert the text dictionary to data structure so it can be rendered as a json properly response_text = eval(response.text)
response_headers = eval(str(response.headers))
return_dict = {"claim": claim, "signed_claim": signed_claim, "target_payload": target_payload,
"response_text": response_text, "response_headers": response_headers}
# return jsonify(return_dict) sfdcres = response.text
print("The SFDC response:%s" % sfdcres)
dicdata = json.loads(sfdcres)
# print('#############',jsondata,'$$$$$$$$$$',dicdata);
acctkn = dicdata["access_token"]
print('parsed access token', acctkn)
# place your own domain name. return getdata(dicdata["access_token"])
#####################Device flow#######################################
@app.route('/Deviceflow', methods=["GET"])
def deviceflow():
print(request.args)
Deviceflowclientid = '3MVG9d8..z.hDcPKuO0qkcUX0IvjSlDS5xKC7RazmseJKnAbeQxg3zIE55yk7zvsy3NpjLIayWxNoUU3RfYz0';
API_ENDPOINT = "https://login.salesforce.com/services/oauth2/token"
getdevicecode = {'response_type': 'device_code',
'client_id': Deviceflowclientid,
'client_secret': 'B682A1156FF3A2CCF0E83764552CD1DD0FD2A4510577DDE69C408D80FB9F4A76',
'redirect_uri': 'http://localhost:5000/Deviceflowstep2' }
r = requests.post(url=API_ENDPOINT, data=getdevicecode)
sfdcres = r.text
dicdata = json.loads(sfdcres)
devicecode = dicdata["device_code"]
parseddata = '' for i in dicdata:
parseddata = parseddata+"<tr><th>" + str(i) + "</th><td>" + str(dicdata[i]) + "</td></tr>" parseddata = "<table border='1' style='color:blue;background-color:gold;'><tbody>" + parseddata + "</tbody></table>" stepsbeforegettingtoken="Before clicking below link, take the verification uri <b style='color:red'>{}</b> and enter it into another device(Mobile/Deskstop)/n and then enter User code <b style='color:red'>{}</b>".format(str(dicdata['verification_uri']),str(dicdata['user_code']))
step2link = "<br/><br/><b style='color:Green'>"+stepsbeforegettingtoken+"</b><br/><h2><a href = " + "http://127.0.0.1:5000/Deviceflowstep2?devicecode=" + devicecode + "> Device flow step 2 (Get access token) " + "</h2>" parseddata = parseddata + step2link
return parseddata
###################################
#####################Device flow#######################################
@app.route('/Deviceflowstep2', methods=["GET"])
def deviceflowstep2():
print('####################', request.args)
# return request.args.devicecode
dicdata = json.dumps(request.args)
# devicecode = dicdata["devicecode"] print(dicdata)
dicdata = json.loads(dicdata)
Deviceflowclientid = '3MVG9d8..z.hDcPKuO0qkcUX0IvjSlDS5xKC7RazmseJKnAbeQxg3zIE55yk7zvsy3NpjLIayWxNoUU3RfYz0';
API_ENDPOINT = "https://login.salesforce.com/services/oauth2/token" # r = requests.post(url=AUTHORIZE_ENDPOINT, data=dataauthorize)
getdevicecode = {'grant_type': 'device',
'client_id': Deviceflowclientid,
'code': dicdata['devicecode']
}
r = requests.post(url=API_ENDPOINT, data=getdevicecode)
sfdcres = r.text
dicdata = json.loads(sfdcres)
acctkn = dicdata["access_token"]
print('parsed access token', acctkn)
return json2html.convert(json=json.loads(r.content))
#########################################################################################
def samlflow(entries):
assertion=str(entries['assertion'].get())
API_ENDPOINT = "https://light601dep-dev-ed.my.salesforce.com/services/oauth2/token" # r = requests.post(url=AUTHORIZE_ENDPOINT, data=dataauthorize) import base64
data=str(entries['assertion'].get())
getdevicecode = {'grant_type': 'assertion',
'assertion_type': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:browser',
'assertion': data}
r = requests.post(url=API_ENDPOINT, data=getdevicecode)
sfdcres = r.text
print('##################',sfdcres)
dicdata = json.loads(sfdcres)
acctkn = dicdata["access_token"]
print('parsed access token', acctkn)
return json2html.convert(json=json.loads(r.content))
def makeform(root, fields):
entries = {}
for field in fields:
print(field)
row = tk.Frame(root)
lab = tk.Label(row, width=22, text=field+": ", anchor='w')
ent = tk.Entry(row)
ent.insert(0, "0")
row.pack(side=tk.TOP,
fill=tk.X,
padx=5,
pady=5)
lab.pack(side=tk.LEFT)
ent.pack(side=tk.RIGHT,
expand=tk.YES,
fill=tk.X)
entries[field] = ent
return entries
@app.route('/samlassertionflow', methods=["GET"])
def samlassertionflow():
app = flask.Flask(__name__)
app = Flask(__name__)
root = tk.Tk()
fields = ('assertion', 'End point')
ents = makeform(root, fields)
b1 = tk.Button(root, text='Saml assertion flow',
command=(lambda e=ents: samlflow(e)))
b1.pack(side=tk.LEFT, padx=5, pady=5)
root.mainloop()
return 'Proceed with the generated input interface'
#########################################################################################
if __name__ == "__main__":
port = int(os.getenv("PORT", 5000))
# Run the app, listening on all IPs with our chosen port number app.run(host="0.0.0.0", port=port, debug=True, use_reloader=True)
app.run()