Monday, January 13, 2020

Salesforce JWT Flow with Python to retrieve and insert data into SQL.

We can use below code to fetch the data from SF via JWT flow from python and insert it into 
Database.In case if you want to schedule your python program to retrieve the data from SF to
take the backup of your data you can refer this link 
"https://datatofish.com/python-script-windows-scheduler/" to learn how to schedule 
any python program from Windows. 

Note: You need to tweak your code according to your Salesforce connected app settings like 
consumerkey and certificates and username of your SF.



import flask
import requests
import tkinter as tk
import json
from json2html import *
import mysql.connector
from flask import request, jsonify
from base64 import urlsafe_b64encode
from datetime import datetime, timedelta
from mysql.connector import Error
from mysql.connector import errorcode
import Crypto
from flask import (
    Flask,
    render_template,
    jsonify,
)

import os
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5

app = flask.Flask(__name__)
app = Flask(__name__)
app.config["DEBUG"] = TrueJWT_HEADER = '{"alg":"RS256"}'JWT_CLAIM_ISS = "3MVG9d8..z.hDcPKuO0qkcUX0IncIWJdRX90Yc5BHsYUmbsHh8eeRweKXys8392v5gcuiK1_WHF3_zSS9rfoX"JWT_CLAIM_SUB = "david@abc.com"JWT_CLAIM_AUD = "https://login.salesforce.com"JWT_AUTH_EP = "https://login.salesforce.com/services/oauth2/token"
@app.route('/', methods=["GET"])
def home():
    return "<h1> This blog is a great place to learn advanced concepts </h1>"

@app.route('/sfconnect', methods=["GET"])
def sfconnect():
    # return "     allflows = "<div style='background-color: #fad0c4;background-image: linear-gradient(315deg, #fad0c4 0%, #f1a7f1 74%);height:570px;width:1260px' ><div style='padding: 1em;position: absolute;'><ul style='list-style-type:disc;color: green;'>" + \
   "<li style='color: green;font-size: 150%'>" + "<a href='http://127.0.0.1:5000/jwtflow'>JWT Bearer flow</a>" + "</li>" + "</ul></div></div>"     return allflows

############re usable method #########################
def getdata(acctkn):
    END_POINT = "https://light601dep-dev-ed.my.salesforce.com/services/apexrest/contactsfordatabase/";

    data = {"Authorization": "Bearer " + acctkn,
            "Content-Type": "application/json"            }

    r = requests.get(END_POINT, headers=data)
    r2= json.loads(r.content)
    print("###test#### get typeof from salesforce:%s", type(r2))
    sf_data = r.content
    data=r2
    print("###test123####contacts data from salesforce:%s",data)
    mydb = mysql.connector.connect(
        host="127.0.0.1",
        user="root",
        passwd="password",
        database="salesforcedata",auth_plugin='mysql_native_password')
    mycursor = mydb.cursor()
    for x in data:
        sql = "INSERT INTO Contactsf (FirstName, LastName) VALUES (%s, %s)"        val = (x['FirstName'], x['LastName'])
        mycursor.execute(sql, tuple(val))
        mydb.commit()
        continue
    mycursor.close()  ## here all loops done    mydb.close()  ## close db connection
    return json2html.convert(json=json.loads(r.content))


def jwt_claim():
    '''    Function to package JWT Claim data in a base64 encoded string    :return:    base64 encoded jwt claims data    '''
    claim_template = '{{"iss": "{0}", "sub": "{1}", "aud": "{2}", "exp": {3}}}'    claim = urlsafe_b64encode(JWT_HEADER.encode()).decode()
    claim += "."
    # expiration_ts = (datetime.now(tz=timezone.utc) + timedelta(minutes=5)).timestamp()    expiration_ts = int((datetime.now() + timedelta(seconds=300)).timestamp())
    payload = claim_template.format(JWT_CLAIM_ISS, JWT_CLAIM_SUB, JWT_CLAIM_AUD, expiration_ts)
    print(payload)

    claim += urlsafe_b64encode(payload.encode()).decode()
    return claim


def sign_data(data):
    f = open('pykey.pem', 'r')
    print('$$$$$$$file$$$$$$$$', f)
    rsa1key = RSA.importKey(f.read())
    signer = PKCS1_v1_5.new(rsa1key)
    digest = SHA256.new()
    digest.update(data.encode())
    sign = signer.sign(digest)
    return urlsafe_b64encode(sign).decode()


def do_auth(endpoint, data):
    '''    Function to POST JWT claim to SFDC /oauth/token endpoint and receive an access_token    :return:    access token    '''
    r = requests.post(endpoint, data=data)
    return r


@app.route('/jwtflow', methods=["GET"])
def jwtflow():
    # Keeping with JWS spec, we need to remove the padding "=" characters from base64 encoded string    claim = jwt_claim().replace("=", "")

    # Keeping with JWS spec, we need to remove the padding "=" characters from base64 encoded string    signed_claim = sign_data(claim).replace("=", "")

    target_payload = claim + "." + signed_claim

    auth_payload = {"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", "assertion": target_payload}
    response = do_auth(JWT_AUTH_EP, data=auth_payload)

    #  convert the text dictionary to data structure so it can be rendered as a json properly    response_text = eval(response.text)

    response_headers = eval(str(response.headers))

    return_dict = {"claim": claim, "signed_claim": signed_claim, "target_payload": target_payload,
                   "response_text": response_text, "response_headers": response_headers}
    # return jsonify(return_dict)    sfdcres = response.text
    print("The SFDC response:%s" % sfdcres)

    dicdata = json.loads(sfdcres)

    # print('#############',jsondata,'$$$$$$$$$$',dicdata);
    acctkn = dicdata["access_token"]

    print('parsed access token', acctkn)


    return getdata(acctkn)

if __name__ == "__main__":
    port = int(os.getenv("PORT", 8001))
    # Run the app, listening on all IPs with our chosen port number    app.run(host="0.0.0.0", port=port, debug=True, use_reloader=True)

app.run()

No comments:

Post a Comment