#!/usr/bin/env python
"""Collect Europa-Park attraction waiting times as flat records.

Every record yielded by :func:`get_data` is a dict made of

tag fields:
    status  human-readable status text looked up in ``status_codes``
            ("Unknown" when the numeric code has no entry)
    type    always the module-level ``name`` ("waitingtimes")
    name    ``nameGerman`` of the attraction from ``data/attractions.json``

metrics:
    statuscode   raw ``status`` value as delivered by the API
                 (``status_codes`` defines texts for 0-5)
    waitingtime  raw ``time`` value as delivered by the API

unused:
    _ts  UTC time of the poll, ISO 8601 with second precision and "Z" suffix
"""

from datetime import datetime as dt, timezone
import hashlib
import hmac
import json
import sys
from os.path import abspath, dirname, join

import requests

name = "waitingtimes"

# The list index is the numeric status code found in the API response.
# NOTE: "Temporariy Closed" is misspelt, but the text is emitted as a tag
# value, so it is deliberately kept byte-for-byte to stay compatible with
# records that were already stored.
status_codes = [
    "Open",
    "90Plus",
    "Closed",
    "Temporariy Closed",
    "Closed due to Weather",
    "Special Tour",
]

api_url = "https://api.europapark.de/api-5.4/"

# Key used to sign the hourly request token (see gen_token).
# NOTE(review): this secret is hard-coded in the source; consider moving it
# to configuration if it must not be published.
_TOKEN_SECRET = "ZhQCqoZp"

# Seconds to wait for the API before giving up; requests has no default
# timeout and would otherwise block indefinitely.
_REQUEST_TIMEOUT = 10

# Status text used for numeric codes that have no entry in status_codes.
_UNKNOWN_STATUS = "Unknown"


def get_db(id):  # noqa: A002 - parameter name kept for existing callers
    """Load a bundled JSON database.

    Args:
        id: base name of the file; ``data/<id>.json`` next to this module
            is read.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the file does not contain valid JSON.
    """
    path = abspath(join(dirname(__file__), "data", id + ".json"))
    with open(path, "rb") as f:
        return json.load(f)


# Attraction records from data/attractions.json, keyed by their "code".
name_mapping = {a["code"]: a for a in get_db("attractions")}


def gen_token():
    """Return the request token for the current UTC hour.

    The token is the upper-case hex HMAC-SHA256 digest of the current UTC
    time formatted as ``YYYYMMDDHH``, keyed with the module's secret.
    """
    message = dt.now(timezone.utc).strftime("%Y%m%d%H")
    digest = hmac.new(
        _TOKEN_SECRET.encode("utf-8"),
        msg=message.encode("utf-8"),
        digestmod=hashlib.sha256,
    )
    return digest.hexdigest().upper()


def get_live(page, timeout=_REQUEST_TIMEOUT):
    """Fetch ``page`` from the live API and return the decoded JSON body.

    Args:
        page: path appended to ``api_url`` (e.g. ``"waitingtimes"``).
        timeout: seconds passed to ``requests.get`` as its timeout.

    Returns:
        The parsed JSON response.

    Raises:
        requests.RequestException: on connection problems, timeouts or a
            4xx/5xx HTTP status.
        ValueError: if the response body is not valid JSON.
    """
    headers = requests.utils.default_headers()
    # The request identifies itself with an okhttp User-Agent.
    headers.update({"User-Agent": "okhttp/2.7.5"})
    resp = requests.get(
        api_url + page,
        params={"mock": "false", "token": gen_token()},
        headers=headers,
        timeout=timeout,
    )
    # Fail with a clear HTTP error instead of trying to interpret an error
    # page as waiting-time data further down the line.
    resp.raise_for_status()
    return resp.json()


def _status_text(code):
    """Return the text for a numeric status code, or "Unknown" if unmapped."""
    # The explicit range check also rejects negative codes, which plain list
    # indexing would silently wrap around to the end of the list.
    if isinstance(code, int) and 0 <= code < len(status_codes):
        return status_codes[code]
    return _UNKNOWN_STATUS


def get_data(mock=False):
    """Yield one flat record per attraction.

    Args:
        mock: when true, read ``data/waitingtimes.json`` through
            :func:`get_db` instead of querying the live API.

    Yields:
        dict with the keys ``status``, ``statuscode``, ``name``, ``type``,
        ``waitingtime`` and ``_ts`` (see the module docstring). All records
        of one call share the same ``_ts``.
    """
    # Same text as naive-UTC isoformat(timespec="seconds") + "Z", without
    # relying on the deprecated datetime.utcnow().
    timestamp = dt.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    entries = get_db(name) if mock else get_live("waitingtimes")
    for entry in entries:
        code = entry["code"]
        attraction = name_mapping.get(code)
        yield {
            "status": _status_text(entry["status"]),
            "statuscode": entry["status"],
            # An attraction missing from the bundled database is reported
            # under its code rather than aborting the whole poll.
            "name": attraction["nameGerman"] if attraction is not None else str(code),
            "type": name,
            "waitingtime": entry["time"],
            "_ts": timestamp,
        }


def main():
    """Print the live waiting-time records as one JSON array on stdout."""
    print(json.dumps(list(get_data())))


def get_mock_data():
    """Return the record generator backed by the bundled mock data."""
    return get_data(True)


if __name__ == "__main__":
    main()