From 1cf87d9973e5debac4e7fe883cd93b1083eceaf8 Mon Sep 17 00:00:00 2001 From: Ben D Date: Fri, 21 Jan 2022 13:30:35 -0800 Subject: [PATCH] Initial Commit --- e911_helper.py | 1165 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1165 insertions(+) create mode 100644 e911_helper.py diff --git a/e911_helper.py b/e911_helper.py new file mode 100644 index 0000000..bf6e2d8 --- /dev/null +++ b/e911_helper.py @@ -0,0 +1,1165 @@ +#!/usr/bin/env python3 + +# +# (C) 2020 VOICE1 LLC +# + +import datetime +import requests +import json +import sys +import os + +try: + from loguru import logger +except ImportError: + import logging + logger = logging.getLogger(__name__) + +# TODO: Set to config file. + +config_file = "config.json" +if os.path.isfile(config_file): + with open(config_file) as f: + config_data = json.load(f) +else: + logger.critical(f"Unable to load config.json. Aborting.") + # Generate empty config.json + with open('config.json', 'w') as config: + config.write( +""" +[ + { + "regcode": "", + "company": "", + "username": "", + "password": "", + "provider_username": "", + "provider_password": "", + "911_callrule_id": 0, + "out_call_rule_id": 0, + "redirect_ivr_id": "", + "tenant": { + "tenant1": { + "username": "", + "password": "", + "provider_username": "", + "provider_password": "", + "911_callrule_id": 0, + "out_call_rule_id": 0, + "redirect_ivr_id": "0" + } + } + } +] +""" + ) + logger.info(f"Generated empty config.json file. Besure to populate it with required details before proceeding.") + sys.exit() + +# Check and update current file to latest version. +def write_file(filename, content): + # put the content into the file + with open(filename, 'wb') as fp: + fp.write(content) + + + +# End update check + +add_type_list = [ + "", + "Apartment", + "Basement", + "Building", + "Department", + "Floor", + "Front", + "Key", + "Lobby", + "Lot", + "Lower", + "Office", + "Penthouse", + "Pier", + "Rear", + "Residential", + "Room", + "Side", + "Slip", + "Space", + "Stop", + "Suite", + "Trailer", + "Unit", + "Upper", +] + + +def create_helpdeskticket(**kwargs): + """Open helpdesk ticket w/ VOICE1""" + url = "https://api.voice1.me/tickets" + params = { + "email": "", + "subject": "Helpdesk Ticket", + "body": "Please call me", + } + r = requests.post(url, params=params) + return + + +def update_helpdeskticket(**kwargs): + """update existing ticket with voice1""" + params = {"ticket_id": "", "body": "Please call me", "status": "open"} + r = requests.patch(url, params=params) + return + + +def get_nth_key(dictionary, n=0): + if n < 0: + n += len(dictionary) + for i, key in enumerate(dictionary.keys()): + if i == n: + return key + raise IndexError("dictionary index out of range") + + +def set_tenant(**kwargs): + global regcode + global config + global tenant + global swvx_url + global swvx_auth + global nwsip_url + global nwsip_auth + + # Get 1st REGCODE in config. + for i, r in enumerate(config_data): + print(f"{i}) {r['regcode']}") + regcode_i = input(f"Select your 6 digit regcode [{0}]: ") or 0 + config = config_data[int(regcode_i)] + regcode = config["regcode"] + + # Populate Tenant Lists + tenant = "None" + if config.get("tenant"): + tenants = config["tenant"] + for i, t in enumerate(config["tenant"]): + print(f"{i}) {t}") + + tenant = input(f"Select the tenant to modify or enter to skip: ") or tenant + if tenant != "None": + tenant = get_nth_key(config["tenant"], int(tenant)) + config = config["tenant"][tenant] + + redirect_ivr_id = config.get("redirect_ivr_id") + nwsip_url = "https://api.nwsip.com/v1" + nwsip_auth = (config["provider_username"], config["provider_password"]) + + swvx_url = f"https://api.switchvoxuc.com/switchvox/{regcode.upper()}" + swvx_auth = (config["username"], config["password"]) + + +# Get Outbound 911 Caller ID Rule +def get_caller_id_rule(phone_number=None): + """ + Returns cid_rule_id for matching outbound caller id number. + Returns None if not found. + + """ + params = {} + url = swvx_url + f"/caller_ids/{phone_number}" + r = requests.get(url, auth=swvx_auth, params=params) + if r.status_code != 200: + logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") + return None + data = r.json()["data"] + + if data: + for d in data: + logger.info(f"CID Rule: {d['id']}") + cid_rule_id = d["id"] + return cid_rule_id + + +# Create outbound caller id rule + + +def set_caller_id_rule( + caller_id_number, + caller_id_name, + extension, + description="Automated", + title="Outbound Caller ID", + priority=6, + call_rule_id=0, +): + """ + Set an outbound Caller ID Rule, if it exists update the rule. + call_rule_id = 6 for 911 + """ + payload = { + "outgoing_call_rule_id": call_rule_id, + "caller_id_name": caller_id_name[:15], + "caller_id_number": caller_id_number, + "description": description, + "rule_type": "single", + "title": title, + "priority": priority, + "extension": extension, + } + + # Lookup rule by caller ID. + logger.debug(f"Looking up existing caller id rules matching {caller_id_number}") + r = requests.get(swvx_url + f"/caller_ids/{caller_id_number}", auth=swvx_auth) + if r.status_code != 200: + logger.error(f"Error: {r.status_code} {r.reason} {r.text}") + sys.exit() + data = r.json()["data"] + # logger.debug(f"data: {data}") + if not data: + logger.error(f"No existing rule returned") + # Add new record. + r = requests.post(swvx_url + f"/caller_ids", auth=swvx_auth, json=payload) + if r.status_code != 201: + logger.critical(f"Unable to create caller id rule") + logger.debug(f"{r.text}") + return + logger.info(f"{r.json()}") + return r.json()["data"] + # Data exists, a.k.a. Need to update a rule? + logger.warning(f"Rule already exists but no code to update.") + + # Quickly remove any 'dependents' from outgoing call rule so as to slim down the payload + for d in data: + d["outgoing_call_rule"].pop("dependents") + + logger.debug(f"\n{json.dumps(data, indent=4)}") + logger.warning(f"Submit and review entry") + return + + +# Remove Outbound 911 Caller ID Rule + + +def remove_caller_id_rule(cid_rule_id=None): + """ + :param cid_rule_id: The outbound caller id rule to specifiy for removal. + :return: Returns .json() response. + """ + # !! Remove the caller id rule + url = swvx_url + f"/caller_ids/remove/{cid_rule_id}" + logger.critical(f"Removing outbound caller id: {url}") + # logger.debug(json.dumps(d, indent=4)) + params = {} + r = requests.delete(url, auth=swvx_auth, params=params) + if r.status_code != 201: + logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") + return r.json() + + +# Get inbound route of number + + +def inbound_route_by_number(inbound_did): + try: + r = requests.get( + swvx_url + f"/routes/inbound/number/{inbound_did}", auth=swvx_auth + ) + if r.status_code == 200: + return r.json()["data"] + except Exception as e: + logger.error(f"Exception: {e}") + return None + + +# Redirect Inbound DID to new destination + + +def redirect_did(inbound_did, dest_account_id, note="", priority=1, label="Available"): + + route = inbound_route_by_number(inbound_did=inbound_did) + payload = { + "name": label, + "note": note, + "priority": priority, + "type": "route_number", + "number": inbound_did, + "any_provider": 0, + "call_type": 0, + "destination_account_id": dest_account_id, + } + if route: + if not route["incoming_did_route"]: + # If route does not exist we need to add a new route. + logger.warning( + f"No existing route for this number found. Adding a NEW inbound route." + ) + rr = requests.post( + swvx_url + f"/routes/inbound/{inbound_did}/{dest_account_id}", + json=payload, + auth=swvx_auth, + ) + if rr.status_code == 201: + return {"success": "submitted"} + logger.critical(f"{rr.status_code} {rr.reason} {rr.text}") + for r in route["incoming_did_route"]: + note = f"Modified: {datetime.datetime.today()}. {note}" + route_id = r["id"] + logger.info( + f"Modifying inbound route {route_id}, setting new destination to {dest_account_id}@{priority}" + ) + rr = requests.patch( + swvx_url + f"/routes/inbound/{route_id}", json=payload, auth=swvx_auth + ) + if rr.status_code == 201: + return {"success": "submitted"} + logger.warning(f"Be sure to verify the priority order for proper routing.") + return + + +####### +# E911 Address functions (carrier side) +# +# Remove E911 Address associated w/ Number (Carrier side) + + +def remove_e911_address(e911_number, **kwargs): + """ + Removes address association with the number provided. + """ + result = None + endpoint = f"/e911s/{e911_number}" + try: + params = {} + logger.debug(f"{nwsip_url + endpoint}") + r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth, params=params) + result = r.json() + except Exception as e: + logger.error(f"Exception: {e}") + return result + + +# Add Address +def format_address(address, fmt="human"): + """Prettifies address for display.""" + + if fmt == "human": + address_type = ' '.join(part for part in (address.get('address_type'), address.get('address_type_number')) if part) + address_type = ' '.join(filter(None, (address.get('address_type'), address.get('address_type_number')))) + + return f"{address['street_number']} {address['street_name']} {address_type} {address['city']}, {address['state']} {address['zip']} {address['country']}" + + if fmt == "usps": + name = ' '.join(part for part in (address.get('first_name'), address.get('last_name')) if part) + return { + 'name': name, + 'company': '', + 'address_1': f"{address['street_number']} {address['street_name']} {address.get('address_type', '')} {address.get('address_type_number', '')}", + 'address_2': '', + 'city': address['city'], + 'state': address['state'], + 'zipcode': address['zip'], + 'zipcode_ext': '', + 'phone': '' + } + return address + + +def add_e911_address(address, extension, auto_correct=True): + """Add proivded e911 address or present error""" + endpoint = "/e911s" + result = None + params = {} + address["address_type"] = address.get("address_type", "residential") + if address["address_type"].lower() == "residential": + address["address_type"] = "" + address[ + "label" + ] = f"E911 - {address['first_name']} {address['last_name']} - {extension}" + payload = address + + try: + logger.debug(f"Submitting:\n{json.dumps(payload, indent=4)}") + logger.info(f"Submitting: {format_address(address)}") + r = requests.post( + nwsip_url + endpoint, auth=nwsip_auth, json=payload, params=params + ) + if r.status_code == 422: + result = r.json()["data"] + logger.warning(f"Additional action required:") + logger.error(f"{json.dumps(result['errors'][0]['detail'], indent=4)}") + logger.warning( + f"If an address correction is required please submit the updated address. Suggestions shown above." + ) + + if auto_correct: + n_address = result["errors"][0]["detail"] + logger.warning(f"Submitting updated address change:\n{json.dumps(n_address, indent=4)}") + result = add_e911_address(n_address, extension, auto_correct=False) + return result + + if r.status_code != 201: + logger.critical(f"{r.status_code} {r.reason} {r.text}") + result = r.json() + logger.error(f"Error processing request: \n{json.dumps(result, indent=4)}") + return result + if r.status_code == 201: + logger.info(f"Successfully added E911 Address: {r.status_code} {r.reason} | {r.request.body}.") + logger.info(f"Be sure to associate the number with the address.") + except Exception as e: + logger.critical(f"Exception: {e}") + return False + return True + + +# Get E911 id by number. + + +def get_e911id_by_number(number, **kwargs): + """Returns E911 address ID by phone number.""" + result = None + try: + r = requests.get(nwsip_url + f"/numbers/{number}", auth=nwsip_auth) + if r.status_code not in [200]: + logger.warning(f"{r.status_code} {r.reason}\n{r.text}") + + result = r.json()["data"][0] + logger.debug(f"{json.dumps(result, indent=4)}") + # If subkey had e911_id then return. + relationships = result.get("relationships") + e911_address = relationships.get("e911_address") + if e911_address.get("data"): + return e911_address["data"]["id"] + except Exception as e: + logger.error(f"Error: {e}") + return result + + +def get_e911_address(address_id, **kwargs): + """Returns E911 address by address_id""" + params = {} + r = requests.get(nwsip_url + f"/e911s/{address_id}", auth=nwsip_auth, params=params) + if r.status_code != 200: + logger.error(f"Unable to return address: {r.status_code} {r.reason}") + return + data = r.json()["data"] + logger.info(f"{r.status_code} {r.reason}") + return data + + +def e911_associate_address(number, address_id): + endpoint = f"/e911s/{address_id}/associated/{number}" + logger.debug(f"{nwsip_url}{endpoint}") + try: + r = requests.patch(nwsip_url + endpoint, auth=nwsip_auth) + if r.status_code == 204: + return True + else: + logger.error(f"Error: {r.status_code} {r.reason} {r.text}") + except Exception as e: + logger.error(f"Exception: {e}") + return + + +def e911_unassociate(number): + endpoint = f"/e911s/{number}/deactivate" + try: + r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth) + logger.debug(f"{r.status_code} {r.reason} {r.text}") + except Exception as e: + logger.error(f"Exception: {e}") + return r + + +def get_account(extension): + """Get account information on Switchvox extension""" + params = {} + r = requests.get( + swvx_url + f"/extensions/{extension}", auth=swvx_auth, params=params + ) + if r.status_code == 200: + data = r.json()["data"][0] + return data + return + + +def get_e911_info(e911_id, **kwargs): + base_url = nwsip_url + f"/e911s/{e911_id}" + r = requests.get(base_url, auth=nwsip_auth) + if r.status_code == 200: + logger.info(f"{r.json()}") + else: + logger.critical(f"Error: {r.status_code} {r.reason}") + + +def get_911_record(search_keyword, **kwargs): + base_url = nwsip_url + "/e911s/" + search_keyword = str(search_keyword) + r = requests.get(base_url, auth=nwsip_auth) + if r.status_code not in [200, 201, 204]: + logger.critical( + f"Unable to complete request. {r.status_code} {r.reason} {r.text}" + ) + sys.exit() + + # Loop thru to find matching subset. + # Returns single dict on match or array of all on no match + data = r.json()["data"] + for add in data: + if search_keyword in add.values(): + logger.info(f"Found keyword [{search_keyword}] match.") + data = add + break + # Expressly search the 'label' + if search_keyword in add["label"]: + logger.info(f"Fournd keyword [{search_keyword}] match.") + data = add + break + # Exit if its a list of everyone. + if not isinstance(data, dict): + logger.critical( + f"Unable to find single match. Please review the list of entries and specify a correct match." + ) + menu() + return data + + +def update_e911_address(e911_id, address, label=None, auto_correct=True, **kwargs): + """Update existing e911 record with provider + auto_correct will auto matically accept the proposed updated address. + """ + # data.pop('type', None) + # e911_id = data.pop('id', None) + # e911_alias = data.pop('alias', None) # Currently not used. + # curr_address = data.copy() + # new_address = address.copy() + # logger.debug(f"New Address:\n{json.dumps(new_address, indent=4)}") + + if not label: + extension = input(f"Enter the extension to use for the label: ") or "N/A" + label = f"E911 - {address['first_name']} {address['last_name']} - {extension}" + address["label"] = label + + # Validate address first. + r = requests.post(nwsip_url + "/e911s/validate", auth=nwsip_auth, json=address) + logger.info(f"{r.request.method} {r.status_code} {r.reason} - {r.url}") + + if r.status_code == 422: + result = r.json()["data"] + logger.warning(f"Additional action required:") + logger.critical( + f"Proposed address change:\n{json.dumps(result['errors'][0]['detail'], indent=4)}" + ) + logger.warning( + f"If an address correction is required please submit the updated address. Suggestions shown above." + ) + + if auto_correct: + n_address = result["errors"][0]["detail"] + result = update_e911_address( + e911_id, n_address, label=label, auto_correct=False + ) + return result + + if r.status_code not in [200, 201, 204]: + logger.critical( + f"Unable to validate address: {r.status_code} {r.reason}\n{json.dumps(r.json(), indent=4)}" + ) + return False + logger.info(f"Address validation passed.") + + r = requests.patch(nwsip_url + f"/e911s/{e911_id}", auth=nwsip_auth, json=address) + logger.debug(f"{r.status_code} {r.reason} - {r.text}") + return True + + +def remove_did(number, **kwargs): + base_url = nwsip_url + f"/numbers/{number}" + r = requests.delete(base_url, auth=nwsip_auth) + logger.debug(f"{r.request.method} {r.status_code} {r.reason} - {base_url}") + if r.status_code in [200, 201, 204]: + logger.critical(f"{r.json()}") + if r.status_code == 500: + logger.critical(f"{r.text}") + else: + logger.error(f"{r.json()}") + return + + +def remove_inbound_route(**kwargs): + return + + +def remove_outbound_caller_id(**kwargs): + return + + +def prompt_address_type(default_type=15, **kwargs): + """ + Generates numeric list to select from. + returns tuple 'address_type', 'address_type_number' + """ + for i, addy_type in enumerate(add_type_list): + print(f"{i:2}) {addy_type}") + address_type = ( + input( + f"Enter the number of the address type from the list above [{default_type}]: " + ) + or default_type + ) + address_type_number = "" + if int(address_type) not in [0, 15]: + while True: # Loop until address type number provided. + address_type_number = input(f"Enter address type number: ") + if address_type_number: + address_type_number = address_type_number.replace(" ", "_") + break + # Return values to textual response as expected by API. + return add_type_list[int(address_type)], address_type_number + + +def prompt_address(old_address=None, **kwargs): + """Prompt user for address details, shared input for adding/updating records""" + # TODO: If old address provided, prepoulate the 'new' address with the old address details. + + # Collect address info: + address = dict( + first_name=input(f"First Name [{old_address.get('first_name')}]: ") + or old_address.get("first_name"), + last_name=input(f"Last Name [{old_address.get('last_name')}]: ") + or old_address.get("last_name"), + street_number=input( + f"Street Number [{old_address.get('street_number')}]: " + ).upper() + or old_address.get("street_number"), + street_name=input(f"Street Name [{old_address.get('street_name')}]: ").upper() + or old_address.get("street_name"), + city=input(f"Enter City [{old_address.get('city')}]: ").upper() + or old_address.get("city"), + state=input(f"Enter 2 char state [{old_address.get('state')}]: ").upper() + or old_address.get("state"), + zip=input(f"Enter ZIP code [{old_address.get('zip')}]: ").upper() + or old_address.get("zip"), + country=input(f"Enter country: [US] ") or "US", + ) + + # Print list of address types: + # print(json.dumps(add_type_list, indent=4)) + + # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' + # if address_type == 'Residential': + # address_type_number = "" + # logger.warning(f"Address Type set to {address_type} no address_type_number can be added. Please verify your entry.") + # else: + # address_type_number = input(f"Enter address type number: ") or '' + + address_type, address_type_number = prompt_address_type() + address["address_type"] = address_type + address["address_type_number"] = address_type_number + + clean_address = {k: v.strip() for k, v in address.items()} + return clean_address + + +def menu_move(**kwargs): + """ + Generate menu prompts for moving a DID to new destination. + """ + return + + +def menu_caller_id(**kwargs): + return + + +def menu_update_address(**kwargs): + return + + +def menu_new_address(**kwargs): + return + + +def menu_remove_address(**kwargs): + return + + +def menu_assign_did(**kwargs): + return + + +def create_extension(**kwargs): + """Create a new user extension (sip)""" + # Prompt for template + # Prompt for group to assign to. + return + + +def remove_extension(**kwargs): + """Remove a user extension (sip)""" + # Prompt for template + # Prompt for group to remove from. + return + + +def list_templates(**kwargs): + """List templates for adding a new user""" + return + + +def list_groups(**kwargs): + """List all groups on PBX""" + return + + +def phone_location(**kwargs): + """List/Set/Clear phone location""" + # method: "switchvox.digiumPhones.locations.getList" + # method: "switchvox.outgoingCallRules.getList" # To show rule name. + + return + + +def phone_hotdesk(**kwargs): + """List/Set/Clear phone hotdesk""" + params = { + "request": { + "method": "switchvox.digiumPhones.assignments.update", + "parameters": { + "extension": "8105", + "ip": "10.16.1.140", + "model": "D60", + "configured_by": "server", + "account_id": "2852", + "assignment_id": "394", + "phone_type": "deskphone", + "mobile_assignment_tokens": {}, + "type": "assigned", + "mac": "00:0f:d3:0d:4d:6d", + "display": "Jean Knapp", + "location": "- - -", + "locationSortVal": "", + "displaySortVal": "Jean Knapp", + "isHotDesk": False, + "actions": { + "type": "div", + "key": None, + "ref": None, + "props": { + "className": "actions", + "children": { + "key": None, + "ref": None, + "props": { + "icon": "edit", + "disabled": False, + "tooltip": "Modify Phone Settings", + }, + "_owner": None, + }, + }, + "_owner": None, + }, + "location_id": "4", + "hot_desking": "0", + }, + } + } + + return + + +def phone_set_location(**kwargs): + """Set phone location""" + # method: "switchvox.digiumPhones.locations.update" + params = { + "cid_number": "", + "location_id": "", + "name": "", + "outgoing_call_rules": [ + {"id": "6", "state": "allow"}, + {"id": "133", "state": "deny"}, + ], + } + return + + +def menu(): + print("=" * 80) + print( + f"You are performing the following actions on {regcode.upper()} Tenant: {tenant}" + ) + action = input( + f"What action are you performing?\n\---------- PBX Actions ------------------------\n\ + T = Tenant Change \n\ + M = Move (redirect) DID. (from PBX {regcode.upper()})\n\ + C = Caller ID (from PBX {regcode.upper()} )\n\ + \n\ + ---------- SIP Provider actions ----------------\n\ + N = New E911 Address (from provider)\n\ + U = Update existing E911 Address (from provider)\n\ + R = Remove E911 record (from provider).\n\ + A = Assign DID to 911 Address (from provider).\n\ + RN = Release DID from account (from provider). \n\ + Q = Quit.\n\ + \n\ + ---------- Comming soon -------------------------\n\ + ---------- PBX Location/Hotdesking --------------\n\ + L = List/Set device E911 Location/Hotdesking.\n\ + H = Enable/Disable phone Hotdesking.\n\ + E = Create/Remove PBX extension.\n\ + G = Create/Remove PBX group.\n\ + Option: " + ) + if not action: + logger.critical(f"You must specifiy an action.") + sys.exit() + + action = action.upper() + outbound_did = None + inbound_did = None + extension = None + note = "Automated entry" + x_pos = config[ + "911_callrule_id" + ] # The position to place the new callrule in the list. + # Move DID + if action == "Q": + print(f"Good Bye!") + sys.exit() + if action == "T": + """Change tenant""" + set_tenant() + + if action == "M": # Move DID + inbound_did = input(f"Enter inbound DID to redirect: ") + extension = input(f"Enter the destination extension: ") or 7001 + account = get_account(extension) + account_name = account["display"] + account_id = account["account_id"] + if str(extension) == "7001": + x_pos = 736 # TODO: Set to last position / before ranged numbers. + position = input(f"What position to add new record to: [{x_pos}] ") or x_pos + if extension == "7001": + label = "Available" + else: # Real extension + label = f"911 - {account['display']}" + label = input(f"Enter label [{label}]: ") or label + if label.lower() == "sms": + label = f"SMS - {account['display']}" + note = input(f"Additional note: ") + result = redirect_did( + inbound_did=inbound_did, + dest_account_id=account_id, + note=note, + priority=position, + label=label, + ) + logger.info(f"result: {result}") + + # Offer to set Caller ID + set_caller_id = ( + input(f"Do you wish to set the outgoing caller ID information? [Y/n]: ") + or "y" + ) + if set_caller_id[:1].lower() == "n": + logger.warning( + f"You will need to manually set outbound caller ID information." + ) + return + logger.debug(f"Setting action to 'C'") + outbound_did = inbound_did + action = "C" + + if action == "C": # Set/update caller ID. + x_pos = config["911_callrule_id"] + out_call_rule_id = config["out_call_rule_id"] + + outbound_did = ( + input(f"Enter the outbound Caller ID Number [{outbound_did}]: ") + or outbound_did + ) + extension = ( + input( + f"Enter the destination extension (7001 to remove rule) [{extension}]: " + ) + or extension + ) + if extension == "7001": + logger.warning( + f"Removing outbound caller id rule associated with {outbound_did}." + ) + caller_id_rule = get_caller_id_rule(outbound_did) + approve = ( + input( + f"You are about to remove caller id rule {caller_id_rule} are you sure? [Y/n]" + ) + or "y" + ) + if approve.lower() == "y": + r = remove_caller_id_rule(caller_id_rule) + logger.debug(f"{r}") + return + account = get_account(extension) + account_name = account["display"] + account_id = account["account_id"] + caller_id_name = ( + input(f"Enter the Caller ID Name to be used [{account_name}]: ") + or account_name + ) + position = input(f"What position to add new record to: [{x_pos}] ") or x_pos + if extension != "7001": + label = f"911 - {account_name}" + else: # Extension = 7001 + label = f"Avaliable" + label = input(f"Enter label [{label}]: ") or label + note = input(f"Additional note [{note}]: ") or note + call_rule_id = ( + input( + f"If you know the specific outgoing_call_rule_id to apply this rule to [{out_call_rule_id}]: " + ) + or out_call_rule_id + ) + + print( + f""" + You are about to create an Outbound Caller ID Rule as such: 1) Calls made from extension '{extension}' to outgoing call_rule_id '{call_rule_id}' + 2) will be sent with the Caller ID Name as '{caller_id_name}' + 3) and the outgoing number as '{outbound_did}'. + 4) This rule will be placed at priority '{position}'. + """ + ) + approve = input(f"Do you wish to continue? [Y/n]: ") or "y" + if approve[:1].lower() == "n": + logger.warning(f"Caller ID rule aborted.") + return + + result = set_caller_id_rule( + outbound_did, caller_id_name, extension, note, label, position, call_rule_id + ) + + if action == "N": # New 911 Address + logger.info(f"Adding new E911 record.") + # inbound_did = input(f"Enter inbound DID to modify: ") + extension = input(f"Enter users internal extension: ") + + # Collect address info: + # - start info gather / replace w/ function. + address = dict( + first_name=input(f"First Name: "), + last_name=input(f"Last Name: "), + street_number=input(f"Street Number: ").upper(), + street_name=input(f"Street Name: ").upper(), + city=input(f"Enter City: ").upper(), + state=input(f"Enter 2 char state: ").upper(), + zip=input(f"Enter ZIP code: ").upper(), + country=input(f"Enter country: [US] ") or "US", + ) + # if not address['country']: + # address['country'] = 'US' + # # Print list of address types: + # print(json.dumps(add_type_list, indent=4)) + + # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' + # address_type_number = input(f"Enter address type number: ") or '' + # if address_type == 'Residential': + # address_type_number == "" + # logger.warning(f"Address Type set to {address_type} and is not allowed to have an address type number. Please verify your entry.") + + address_type, address_type_number = prompt_address_type() + address["address_type"] = address_type + address["address_type_number"] = address_type_number + + clean_address = {k: v.strip() for k, v in address.items()} + # - End info gather + + logger.info(f"\n{json.dumps(clean_address, indent=4)}") + logger.warning(f"You are about to add the following address information. ") + confirm = input(f"Is this information correct? [Y/n]") or "y" + if confirm[:1].lower() != "y": + logger.critical(f"No records added.") + return + + # Add E911 address and assigne number to address. + logger.info(f"Adding Address") + if not add_e911_address(address=clean_address, extension=extension): + logger.critical(f"Please contact support.") + return + assign = input(f"Do you want to assign this address to a DID? [Y/n]") or "y" + if assign[:1].lower() == "y": + inbound_did = input(f"Enter inbound DID to modify: ") + label = clean_address["label"] + e911_id = get_911_record(label) + if not isinstance(e911_id, list): + e911_id = [e911_id] + for addy in e911_id: + if not e911_associate_address(inbound_did, addy["id"]): + logger.critical(f"Please contact support.") + # Add Outbound Caller ID for E911 to PBX + + return + + if action == "U": # Update 911 address by DID + # logger.critical(f"Updating an existing address not currently supported.") + # sys.exit() + logger.info(f"Updating existing E911 record.") + inbound_did = input(f"Enter inbound DID to modify: ") + + address_id = get_e911id_by_number(inbound_did) + address = get_e911_address(address_id) + if not address: + logger.warning(f"No E911 address found.") + return + if not address["address_type"]: + address["address_type"] = "Residential" + logger.warning(f"{json.dumps(address, indent=4)}") + + proceed = ( + input( + f"Address record {address_id} is currently configured to the above values. Proceed? [Y/n]: " + ) + or "y" + ) + if proceed.lower() != "y": + sys.exit() + + if not inbound_did: + inbound_label = input(f"Enter part of inbound DID label to search for: ") + + if not inbound_did and not inbound_label: + logger.critical( + f"You must specify either an inbound DID or the inbound label." + ) + sys.exit() + + updated_address = prompt_address(address) + logger.debug(f"Updated Address: {updated_address}") + logger.info( + f""" + {updated_address['first_name']} {updated_address['last_name']} + {updated_address['street_number']} {updated_address['street_name']} + {updated_address['address_type']} {updated_address['address_type_number']} + {updated_address['city']}, {updated_address['state']}, {updated_address['zip']} + {updated_address['country']} + """ + ) + # If original address_type is changing to or from 'residential' you have to create a new + # address entry. + # if address['address_type'] == '' or updated_address['address_type'] == '': + if address["address_type"] != updated_address["address_type"]: + logger.error( + f"Address Type has changed. \"{address['address_type']} -> {updated_address['address_type']}\" Create a new address with the new information and then assign the DID to the new address. You can then remove the old address if required." + ) + # sys.exit() + response = update_e911_address(address_id, updated_address) + logger.info(f"{response}") + + if action == "R": # Remove 911 address by DID + logger.info(f"Removing E911 Address from provider.") + inbound_did = input(f"Enter inbound DID to modify: ") + + # TODO: Verify address being removed?? + # extension = input(f"Users extension: ") + + # Lookup the phone number and get the E911 ID + e911_id = get_e911id_by_number(inbound_did) + if isinstance(e911_id, dict): + logger.warning(f"No E911 address associated with the number {inbound_did}.") + # Lookup e911 address by aliass. + exten = input(f"Search for label value: ") + data = get_911_record(exten) + if data.get("id"): + logger.info(f"Address:\n{json.dumps(data, indent=4)}") + e911_id = data.get("id") + logger.info( + f"Attempting to remove address associated wtih e911_id {e911_id}" + ) + result = remove_e911_address(e911_id) + logger.info(result) + return + else: + logger.critical(f"Unable to process request.") + return + + if not e911_id.isnumeric(): + logger.warning(f"Unable to retrieve E911 ID associated with {inbound_did}.") + return + + # Unassociate the e911_id from the user. + logger.info(f"Unassociating e911 id {e911_id} with phone number {inbound_did}.") + r = e911_unassociate(inbound_did) + + # Remove the 911 address by e911_id + logger.info(f"Attempting to remove address associated wtih e911_id {e911_id}") + result = remove_e911_address(e911_id) + logger.info(result) + + # Prompt to remove from PBX. + return + + if action == "A": # Assign number to address + logger.info(f"Assigning DID to E911 Address.") + inbound_did = input(f"Enter inbound DID to modify: ") + e911_label = input(f"Enter e911 label/or part of label: ") + + e911_id = get_911_record(e911_label) + if not isinstance(e911_id, list): + e911_id = [e911_id] + for addy in e911_id: + logger.info(f"E911 Address results: {json.dumps(e911_id, indent=4)}") + confirm = input(f"Is this the record you want to use? [Y/n] ") or "y" + if confirm[:1].lower() == "y": + associate_id = addy["id"] + logger.info(f"Using E911 record {associate_id}.") + break + if not e911_associate_address(inbound_did, associate_id): + logger.critical(f"Please contact support.") + return + + if action == "RN": # Purge and release number + logger.critical( + f""" + This action will remove the phone number from your account. + You might not beable to retrieve the number again. + + This will also remove the number from your PBX: + - Incoming Call Routes + - Outgoing Caller ID Rules + + This action is non-reversable. + """ + ) + confirm = input(f"Do you wish to proceed? [y/N]: ") or "n" + if confirm.lower() == "n": + logger.info(f"aborted.") + return + + inbound_did = input(f"Enter inbound DID to modify: ") + remove_did(inbound_did) + + # Extension Management + if action == "E": + pass + # Prompt for template to use + + # Prompt for Extension data + # + + # Secret menu options. + if action == "L911": # Lookup 911 Address by label + logger.info(f"Lookup action not yet defined.") + pass + if action == "I911": # Lookup address by 911_id + """Lookup address info""" + e911_id = input(f"Enter the e911_id to lookup: ") + get_e911_info(e911_id) + + +if __name__ == "__main__": + try: + set_tenant() + while True: + menu() + except KeyboardInterrupt: + logger.info(f"Good bye!")