Prisjakt Insights API
Snippets
Shop offer inventory report
This is a Python code snippet to download a shop offer inventory report. Use the snippet as-is (just remember to update the credentials), or use it as inspiration.
- Download and install Python 3 (https://www.python.org/downloads/)
- Save the snippet below to a file with the filename download_report.py
- Update your client id (CLIENT_ID) and client secret (CLIENT_SECRET) in the file
- Run with the following command: python3 download_report.py
$ python3 download_report.py
Fetching auth token...
Fetching shop id...
Requesting report for shop {shopId}...
Waiting for report 3a8601c8-a61c-4876-a9ea-8fa1855ce496 to be ready...
....
Generating report links...
Downloading and decompressing 1 file(s)...
Done ['3a8601c8-a61c-4876-a9ea-8fa1855ce496_000000000000.csv']
Python snippet
import gzip
import json
import time
from urllib import request, parse
# Replace both placeholders with your own client credentials before running.
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

# Fixed API configuration -- nothing from here on needs to be edited.
INSIGHTS_BASE_URL = "https://api.schibsted.com/prisjakt/insights-v2"
# Both endpoints live directly under the base URL.
INSIGHTS_AUTH_URL = INSIGHTS_BASE_URL + "/auth"
INSIGHTS_GRAPHQL_URL = INSIGHTS_BASE_URL + "/graphql"
def download_files(urls, timeout=60):
    """Download gzip-compressed report files and store them decompressed.

    Each file is written to the current working directory. Its name is the
    last path segment of the link with a trailing ``.gz`` removed; a name
    without that suffix is kept unchanged.

    Args:
        urls: Iterable of mappings that each carry the download link under
            the ``"url"`` key.
        timeout: Socket timeout in seconds passed to ``urlopen`` so a
            stalled connection cannot hang the script forever.

    Returns:
        List of the local file names, in the same order as ``urls``.
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    files = []
    for url in urls:
        link = url["url"]
        # Only the URL path is used, so query-string parameters on the link
        # never end up in the file name.
        filename = parse.urlparse(link).path.rpartition('/')[2]
        if filename.endswith(".gz"):
            filename = filename[:-3]
        files.append(filename)
        with request.urlopen(link, timeout=timeout) as res, \
                open(filename, 'wb') as out_file:
            with gzip.GzipFile(fileobj=res) as uncompressed:
                # Stream in chunks instead of holding the whole decompressed
                # report in memory.
                shutil.copyfileobj(uncompressed, out_file)
    return files
def generate_report_links(report_id, access_token):
    """Create download links for a report via the GraphQL endpoint.

    Sends the ``CreateReportLinks`` mutation to ``INSIGHTS_GRAPHQL_URL``.

    Args:
        report_id: ID of the report to create links for.
        access_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The ``links`` value of the ``createReportLinks`` response; the
        mutation selects the ``url`` field of every link.

    Raises:
        SystemExit: With status 1, after printing the errors, when the
            response body contains an ``"errors"`` key.
    """
    query = """
mutation CreateReportLinks($reportId: ID!) {
createReportLinks(reportId: $reportId) {
links {
url
}
}
}
"""
    payload = {
        "query": query,
        "variables": {"reportId": report_id},
    }
    headers = get_headers("application/json")
    headers["Authorization"] = f"Bearer {access_token}"
    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=json.dumps(payload).encode('utf-8'),
        headers=headers,
    )
    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
    if "errors" in body:
        print(body["errors"])
        # exit() is a helper the site module adds for interactive shells and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)
    return body["data"]["createReportLinks"]["links"]
def wait_for_report(report_id, shop_id, access_token, *, poll_interval=10,
                    timeout=None):
    """Poll the report state until the report is ready.

    Prints one dot per poll as a progress indicator and a newline once the
    report reaches the ``CREATED`` state.

    Args:
        report_id: ID of the report to wait for.
        shop_id: ID of the shop the report belongs to.
        access_token: Token forwarded to ``fetch_report_status``.
        poll_interval: Seconds to sleep between two status checks.
        timeout: Optional upper bound in seconds for the whole wait;
            ``None`` (the default) waits indefinitely.

    Raises:
        SystemExit: With status 1 when the report state is ``FAILED`` or
            ``DELETED``, or when ``timeout`` elapses first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = fetch_report_status(report_id, shop_id, access_token)
        if state == "CREATED":
            print("")
            return
        if state in ("FAILED", "DELETED"):
            print(f"Report state is {state}, exiting...")
            # exit() is a helper the site module adds for interactive shells
            # and is not guaranteed to exist; raising SystemExit always works.
            raise SystemExit(1)
        if deadline is not None and time.monotonic() >= deadline:
            print("")
            print(f"Report {report_id} was not ready after {timeout} "
                  "seconds, exiting...")
            raise SystemExit(1)
        print('.', end='', flush=True)
        time.sleep(poll_interval)
def fetch_report_status(report_id, shop_id, access_token):
    """Fetch the current state of a single report.

    Sends the ``ShopReports`` query to ``INSIGHTS_GRAPHQL_URL``, filtered to
    the given report ID.

    Args:
        report_id: ID of the report to look up.
        shop_id: ID of the shop the report belongs to.
        access_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The ``state`` value of the first returned report.

    Raises:
        SystemExit: With status 1, after printing a message, when the
            response body contains an ``"errors"`` key or when no report
            matches the filter.
    """
    query = """
query ShopReports($shopId: ID!, $filter: ShopReportFilterOptions) {
shopReports(shopId: $shopId, filter: $filter) {
edges {
node {
state
}
}
}
}
"""
    payload = {
        "query": query,
        "variables": {"shopId": shop_id, "filter": {"ids": [report_id]}},
    }
    headers = get_headers("application/json")
    headers["Authorization"] = f"Bearer {access_token}"
    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=json.dumps(payload).encode('utf-8'),
        headers=headers,
    )
    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
    if "errors" in body:
        print(body["errors"])
        # exit() is a helper the site module adds for interactive shells and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)
    edges = body["data"]["shopReports"]["edges"]
    if not edges:
        # An empty result would otherwise surface as a bare IndexError.
        print(f"Report {report_id} not found for shop {shop_id}")
        raise SystemExit(1)
    return edges[0]["node"]["state"]
def request_report(shop_id, access_token):
    """Request a new shop offer inventory report.

    Sends the ``requestShopOfferInventoryReport`` mutation to
    ``INSIGHTS_GRAPHQL_URL``.

    Args:
        shop_id: ID of the shop to create the report for.
        access_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The ``id`` of the requested report.

    Raises:
        SystemExit: With status 1, after printing the errors, when the
            response body contains an ``"errors"`` key.
    """
    query = """
mutation FeedReport($shopId: ID!) {
requestShopOfferInventoryReport(shopId: $shopId) {
id
}
}
"""
    payload = {
        "query": query,
        "variables": {"shopId": shop_id},
    }
    headers = get_headers("application/json")
    headers["Authorization"] = f"Bearer {access_token}"
    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=json.dumps(payload).encode('utf-8'),
        headers=headers,
    )
    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
    if "errors" in body:
        print(body["errors"])
        # exit() is a helper the site module adds for interactive shells and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)
    return body["data"]["requestShopOfferInventoryReport"]["id"]
def fetch_shop_id(access_token):
    """Look up the ID of the shop the current user has access to.

    Sends the ``CurrentUser`` query to ``INSIGHTS_GRAPHQL_URL``. When the
    user has several shops, a notice is printed and the first one is used.

    Args:
        access_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The ``id`` of the first shop in the response.

    Raises:
        SystemExit: With status 1, after printing a message, when the
            response body contains an ``"errors"`` key or lists no shops.
    """
    query = """
query CurrentUser {
currentUser {
shops {
shop {
id
}
}
}
}
"""
    headers = get_headers("application/json")
    headers["Authorization"] = f"Bearer {access_token}"
    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=json.dumps({"query": query}).encode('utf-8'),
        headers=headers,
    )
    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
    if "errors" in body:
        print(body["errors"])
        # exit() is a helper the site module adds for interactive shells and
        # is not guaranteed to exist; raising SystemExit always works.
        raise SystemExit(1)
    shops = body["data"]["currentUser"]["shops"]
    if not shops:
        print("No shop permissions found")
        raise SystemExit(1)
    if len(shops) > 1:
        print("Multiple shops found, using first one.")
    return shops[0]["shop"]["id"]
def fetch_auth_token():
    """Obtain an access token using the client-credentials grant.

    Posts ``CLIENT_ID`` and ``CLIENT_SECRET`` as a URL-encoded form to
    ``INSIGHTS_AUTH_URL``.

    Returns:
        The ``access_token`` value of the JSON response.
    """
    form = parse.urlencode({
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials",
    }).encode()
    token_request = request.Request(
        INSIGHTS_AUTH_URL,
        headers=get_headers("application/x-www-form-urlencoded"),
        data=form,
    )
    with request.urlopen(token_request, timeout=10) as response:
        return json.loads(response.read())["access_token"]
def get_headers(content_type):
    """Build the HTTP headers shared by every request.

    Args:
        content_type: Value for the ``Content-Type`` header.

    Returns:
        A new dict with the client credentials, the given content type and
        the fixed user agent.
    """
    pairs = (
        ("Client-Id", CLIENT_ID),
        ("Client-Secret", CLIENT_SECRET),
        ("Content-Type", content_type),
        ("User-Agent", "pj-feed-helper/1.0"),
    )
    return dict(pairs)
def main():
    """Run the whole flow: authenticate, request a report, wait, download."""
    print("Fetching auth token...")
    token = fetch_auth_token()

    print("Fetching shop id...")
    shop = fetch_shop_id(token)

    print(f"Requesting report for shop {shop}...")
    report = request_report(shop, token)

    print(f"Waiting for report {report} to be ready...")
    wait_for_report(report, shop, token)

    print("Generating report links...")
    report_links = generate_report_links(report, token)

    print(f"Downloading and decompressing {len(report_links)} file(s)...")
    downloaded = download_files(report_links)
    print("Done", downloaded)


if __name__ == "__main__":
    main()