#!/usr/bin/python import os, json, sys, ipaddress, time, re import requests API_SLEEP = 3 NET_ERR_SLEEP = 300 class PorkbunClient(): def __init__(self): self.secret_key = os.environ["PORKBUN_SECRET_API_KEY"] self.api_key = os.environ["PORKBUN_API_KEY"] self.client = requests.Session() self.porkbun_url_base = "https://porkbun.com/api/json/v3" self.domain = os.environ["DOMAINS"][0] self.ttl = os.environ.get("DNS_TTL", "5184000") self.get_ips() self.update_records() def make_base_auth_body(self): return { "secretapikey": self.secret_key, "apikey": self.api_key } def get_ips(self): ip_naive = self.get_ip() if ipaddress.ip_address(ip_naive).version == 4: self.ipv4 = ip_naive else: self.ipv6 = ip_naive time.sleep(API_SLEEP) self.ipv4 = self.get_ip(True) def get_ip(self, force_v4 = False): try: json_body = self.make_base_auth_body() if force_v4: url = "https://api-ipv4.porkbun.com/api/json/v3/ping" else: url = self.porkbun_url_base + "/ping" resp = self.client.post(url, json = json_body) if resp.status == 200: return json.loads(resp.text)['yourIp'] else: print('API error') sys.exit(1) except: print('Network error') time.sleep(NET_ERR_SLEEP) sys.exit(2) def update_dns_record(self, content, record_type="A"): try: record = self.get_existing_record(record_type) time.sleep(API_SLEEP) json_body = self.make_base_auth_body() if record == None: url = self.porkbun_url_base + "/dns/create/" + self.domain json_body.update({ "type": record_type, "content": content, "ttl": self.ttl }) else: url = self.porkbun_url_base + "/dns/editByNameType/" + self.domain + "/" + record_type json_body.update({ "content": content, "ttl": self.ttl }) resp = self.client.post(url, json = json_body) if resp.status != 200: print('API error') sys.exit(1) except: print("Network error") time.sleep(NET_ERR_SLEEP) sys.exit(2) def update_records(self): if hasattr(self, "ipv4"): self.update_dns_record(self.ipv4) if hasattr(self, "ipv6"): self.update_dns_record(self.ipv6, "AAAA") def main(): porkbun_client = PorkbunClient() if __name__ == "__main__": main()