Prisjakt Insights API icon

Prisjakt Insights API

(0 reviews)

Snippets

Shop offer inventory report

This is a Python code snippet to download a shop offer inventory report. Use the snippet as-is (just remember to update the credentials), or use it as inspiration.

  1. Download and install Python 3 (https://www.python.org/downloads/)
  2. Save the snippet below to a file with the filename download_report.py
  3. Update your client id (CLIENT_ID) and client secret (CLIENT_SECRET) in the file
  4. Run with the following command: python3 download_report.py
$ python3 download_report.py
Fetching auth token...
Fetching shop id...
Requesting report for shop {shopId}...
Waiting for report 3a8601c8-a61c-4876-a9ea-8fa1855ce496 to be ready...
....
Generating report links...
Downloading and decompressing 1 file(s)...
Done ['3a8601c8-a61c-4876-a9ea-8fa1855ce496_000000000000.csv']
Python snippet
import gzip
import json
import time
from urllib import request, parse

# Change these values!
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"

# Don't change anything below this line
INSIGHTS_BASE_URL = "https://api.schibsted.com/prisjakt/insights-v2"
INSIGHTS_AUTH_URL = f"{INSIGHTS_BASE_URL}/auth"
INSIGHTS_GRAPHQL_URL = f"{INSIGHTS_BASE_URL}/graphql"


def download_files(urls):
    files = []
    for url in urls:
        filename = parse.urlparse(url["url"])[2].rpartition('/')[2][:-3]
        files.append(filename)

        with request.urlopen(url["url"]) as res, open(filename, 'wb') as out_file:
            with gzip.GzipFile(fileobj=res) as uncompressed:
                data = uncompressed.read()
                out_file.write(data)

    return files


def generate_report_links(report_id, access_token):
    query = """
    mutation CreateReportLinks($reportId: ID!) {
        createReportLinks(reportId: $reportId) {
            links {
                url
            }
        }
    }
    """
    variables = {"reportId": report_id}
    data = {
        "query": query,
        "variables": variables
    }

    encoded_data = json.dumps(data).encode('utf-8')

    headers = get_headers("application/json")
    headers.update({"Authorization": f"Bearer {access_token}"})

    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=encoded_data,
        headers=headers)

    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
        if "errors" in body:
            print(body["errors"])
            exit(1)

        return body["data"]["createReportLinks"]["links"]


def wait_for_report(report_id, shop_id, access_token):
    while True:
        state = fetch_report_status(report_id, shop_id, access_token)
        if state == "CREATED":
            print("")
            return
        if state in ["FAILED", "DELETED"]:
            print(f"Report state is {state}, exiting...")
            exit(1)

        print('.', end='', flush=True)
        time.sleep(10)


def fetch_report_status(report_id, shop_id, access_token):
    query = """
    query ShopReports($shopId: ID!, $filter: ShopReportFilterOptions) {
        shopReports(shopId: $shopId, filter: $filter) {
            edges {
                node {
                    state
                }
            }
        }
    }
    """
    variables = {"shopId": shop_id, "filter": {"ids": [report_id]}}
    data = {
        "query": query,
        "variables": variables
    }

    encoded_data = json.dumps(data).encode('utf-8')

    headers = get_headers("application/json")
    headers.update({"Authorization": f"Bearer {access_token}"})

    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=encoded_data,
        headers=headers)

    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
        if "errors" in body:
            print(body["errors"])
            exit(1)

        return body["data"]["shopReports"]["edges"][0]["node"]["state"]


def request_report(shop_id, access_token):
    query = """
    mutation FeedReport($shopId: ID!) {
        requestShopOfferInventoryReport(shopId: $shopId) {
            id
        }
    }
    """
    variables = {"shopId": shop_id}
    data = {
        "query": query,
        "variables": variables
    }

    encoded_data = json.dumps(data).encode('utf-8')

    headers = get_headers("application/json")
    headers.update({"Authorization": f"Bearer {access_token}"})

    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=encoded_data,
        headers=headers)

    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
        if "errors" in body:
            print(body["errors"])
            exit(1)

        return body["data"]["requestShopOfferInventoryReport"]["id"]


def fetch_shop_id(access_token):
    query = """
    query CurrentUser {
        currentUser {
            shops {
                shop {
                    id
                }
            }
        }
    }
    """
    data = {
        "query": query
    }

    encoded_data = json.dumps(data).encode('utf-8')

    headers = get_headers("application/json")
    headers.update({"Authorization": f"Bearer {access_token}"})

    req = request.Request(
        INSIGHTS_GRAPHQL_URL,
        data=encoded_data,
        headers=headers)

    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
        if "errors" in body:
            print(body["errors"])
            exit(1)

        shops = body["data"]["currentUser"]["shops"]
        if len(shops) == 0:
            print("No shop permissions found")
            exit(1)

        if len(shops) > 1:
            print("Multiple shops found, using first one.")

        return shops[0]["shop"]["id"]


def fetch_auth_token():
    data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "client_credentials"}
    encoded_data = parse.urlencode(data).encode()
    headers = get_headers("application/x-www-form-urlencoded")
    req = request.Request(
        INSIGHTS_AUTH_URL,
        headers=headers,
        data=encoded_data)

    with request.urlopen(req, timeout=10) as res:
        body = json.loads(res.read())
        return body["access_token"]


def get_headers(content_type):
    return {
        "Client-Id": CLIENT_ID,
        "Client-Secret": CLIENT_SECRET,
        "Content-Type": content_type,
        "User-Agent": "pj-feed-helper/1.0"
    }


if __name__ == "__main__":
    print("Fetching auth token...")
    access_token = fetch_auth_token()
    print("Fetching shop id...")
    shop_id = fetch_shop_id(access_token)
    print(f"Requesting report for shop {shop_id}...")
    report_id = request_report(shop_id, access_token)
    print(f"Waiting for report {report_id} to be ready...")
    wait_for_report(report_id, shop_id, access_token)
    print("Generating report links...")
    links = generate_report_links(report_id, access_token)
    print(f"Downloading and decompressing {len(links)} file(s)...")
    files = download_files(links)
    print("Done", files)

Reviews