#!/usr/bin/env python3 # # (C) 2020 VOICE1 LLC # import datetime import requests import json import sys import os try: from loguru import logger except ImportError: import logging logger = logging.getLogger(__name__) # TODO: Set to config file. config_file = "config.json" if os.path.isfile(config_file): with open(config_file) as f: config_data = json.load(f) else: logger.critical(f"Unable to load config.json. Aborting.") # Generate empty config.json with open('config.json', 'w') as config: config.write( """ [ { "regcode": "", "company": "", "username": "", "password": "", "provider_username": "", "provider_password": "", "911_callrule_id": 0, "out_call_rule_id": 0, "redirect_ivr_id": "", "tenant": { "tenant1": { "username": "", "password": "", "provider_username": "", "provider_password": "", "911_callrule_id": 0, "out_call_rule_id": 0, "redirect_ivr_id": "0" } } } ] """ ) logger.info(f"Generated empty config.json file. Besure to populate it with required details before proceeding.") sys.exit() # Check and update current file to latest version. def write_file(filename, content): # put the content into the file with open(filename, 'wb') as fp: fp.write(content) # End update check add_type_list = [ "", "Apartment", "Basement", "Building", "Department", "Floor", "Front", "Key", "Lobby", "Lot", "Lower", "Office", "Penthouse", "Pier", "Rear", "Residential", "Room", "Side", "Slip", "Space", "Stop", "Suite", "Trailer", "Unit", "Upper", ] def create_helpdeskticket(**kwargs): """Open helpdesk ticket w/ VOICE1""" url = "https://api.voice1.me/tickets" params = { "email": "", "subject": "Helpdesk Ticket", "body": "Please call me", } r = requests.post(url, params=params) return def update_helpdeskticket(**kwargs): """update existing ticket with voice1""" params = {"ticket_id": "", "body": "Please call me", "status": "open"} r = requests.patch(url, params=params) return def get_nth_key(dictionary, n=0): if n < 0: n += len(dictionary) for i, key in enumerate(dictionary.keys()): if i == n: return key raise IndexError("dictionary index out of range") def set_tenant(**kwargs): global regcode global config global tenant global swvx_url global swvx_auth global nwsip_url global nwsip_auth # Get 1st REGCODE in config. for i, r in enumerate(config_data): print(f"{i}) {r['regcode']}") regcode_i = input(f"Select your 6 digit regcode [{0}]: ") or 0 config = config_data[int(regcode_i)] regcode = config["regcode"] # Populate Tenant Lists tenant = "None" if config.get("tenant"): tenants = config["tenant"] for i, t in enumerate(config["tenant"]): print(f"{i}) {t}") tenant = input(f"Select the tenant to modify or enter to skip: ") or tenant if tenant != "None": tenant = get_nth_key(config["tenant"], int(tenant)) config = config["tenant"][tenant] redirect_ivr_id = config.get("redirect_ivr_id") nwsip_url = "https://api.nwsip.com/v1" nwsip_auth = (config["provider_username"], config["provider_password"]) swvx_url = f"https://api.switchvoxuc.com/switchvox/{regcode.upper()}" swvx_auth = (config["username"], config["password"]) # Get Outbound 911 Caller ID Rule def get_caller_id_rule(phone_number=None): """ Returns cid_rule_id for matching outbound caller id number. Returns None if not found. """ params = {} url = swvx_url + f"/caller_ids/{phone_number}" r = requests.get(url, auth=swvx_auth, params=params) if r.status_code != 200: logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") return None data = r.json()["data"] if data: for d in data: logger.info(f"CID Rule: {d['id']}") cid_rule_id = d["id"] return cid_rule_id # Create outbound caller id rule def set_caller_id_rule( caller_id_number, caller_id_name, extension, description="Automated", title="Outbound Caller ID", priority=6, call_rule_id=0, ): """ Set an outbound Caller ID Rule, if it exists update the rule. call_rule_id = 6 for 911 """ payload = { "outgoing_call_rule_id": call_rule_id, "caller_id_name": caller_id_name[:15], "caller_id_number": caller_id_number, "description": description, "rule_type": "single", "title": title, "priority": priority, "extension": extension, } # Lookup rule by caller ID. logger.debug(f"Looking up existing caller id rules matching {caller_id_number}") r = requests.get(swvx_url + f"/caller_ids/{caller_id_number}", auth=swvx_auth) if r.status_code != 200: logger.error(f"Error: {r.status_code} {r.reason} {r.text}") sys.exit() data = r.json()["data"] # logger.debug(f"data: {data}") if not data: logger.error(f"No existing rule returned") # Add new record. r = requests.post(swvx_url + f"/caller_ids", auth=swvx_auth, json=payload) if r.status_code != 201: logger.critical(f"Unable to create caller id rule") logger.debug(f"{r.text}") return logger.info(f"{r.json()}") return r.json()["data"] # Data exists, a.k.a. Need to update a rule? logger.warning(f"Rule already exists but no code to update.") # Quickly remove any 'dependents' from outgoing call rule so as to slim down the payload for d in data: d["outgoing_call_rule"].pop("dependents") logger.debug(f"\n{json.dumps(data, indent=4)}") logger.warning(f"Submit and review entry") return # Remove Outbound 911 Caller ID Rule def remove_caller_id_rule(cid_rule_id=None): """ :param cid_rule_id: The outbound caller id rule to specifiy for removal. :return: Returns .json() response. """ # !! Remove the caller id rule url = swvx_url + f"/caller_ids/remove/{cid_rule_id}" logger.critical(f"Removing outbound caller id: {url}") # logger.debug(json.dumps(d, indent=4)) params = {} r = requests.delete(url, auth=swvx_auth, params=params) if r.status_code != 201: logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") return r.json() # Get inbound route of number def inbound_route_by_number(inbound_did): try: r = requests.get( swvx_url + f"/routes/inbound/number/{inbound_did}", auth=swvx_auth ) if r.status_code == 200: return r.json()["data"] except Exception as e: logger.error(f"Exception: {e}") return None # Redirect Inbound DID to new destination def redirect_did(inbound_did, dest_account_id, note="", priority=1, label="Available"): route = inbound_route_by_number(inbound_did=inbound_did) payload = { "name": label, "note": note, "priority": priority, "type": "route_number", "number": inbound_did, "any_provider": 0, "call_type": 0, "destination_account_id": dest_account_id, } if route: if not route["incoming_did_route"]: # If route does not exist we need to add a new route. logger.warning( f"No existing route for this number found. Adding a NEW inbound route." ) rr = requests.post( swvx_url + f"/routes/inbound/{inbound_did}/{dest_account_id}", json=payload, auth=swvx_auth, ) if rr.status_code == 201: return {"success": "submitted"} logger.critical(f"{rr.status_code} {rr.reason} {rr.text}") for r in route["incoming_did_route"]: note = f"Modified: {datetime.datetime.today()}. {note}" route_id = r["id"] logger.info( f"Modifying inbound route {route_id}, setting new destination to {dest_account_id}@{priority}" ) rr = requests.patch( swvx_url + f"/routes/inbound/{route_id}", json=payload, auth=swvx_auth ) if rr.status_code == 201: return {"success": "submitted"} logger.warning(f"Be sure to verify the priority order for proper routing.") return ####### # E911 Address functions (carrier side) # # Remove E911 Address associated w/ Number (Carrier side) def remove_e911_address(e911_number, **kwargs): """ Removes address association with the number provided. """ result = None endpoint = f"/e911s/{e911_number}" try: params = {} logger.debug(f"{nwsip_url + endpoint}") r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth, params=params) result = r.json() except Exception as e: logger.error(f"Exception: {e}") return result # Add Address def format_address(address, fmt="human"): """Prettifies address for display.""" if fmt == "human": address_type = ' '.join(part for part in (address.get('address_type'), address.get('address_type_number')) if part) address_type = ' '.join(filter(None, (address.get('address_type'), address.get('address_type_number')))) return f"{address['street_number']} {address['street_name']} {address_type} {address['city']}, {address['state']} {address['zip']} {address['country']}" if fmt == "usps": name = ' '.join(part for part in (address.get('first_name'), address.get('last_name')) if part) return { 'name': name, 'company': '', 'address_1': f"{address['street_number']} {address['street_name']} {address.get('address_type', '')} {address.get('address_type_number', '')}", 'address_2': '', 'city': address['city'], 'state': address['state'], 'zipcode': address['zip'], 'zipcode_ext': '', 'phone': '' } return address def add_e911_address(address, extension, auto_correct=True): """Add proivded e911 address or present error""" endpoint = "/e911s" result = None params = {} address["address_type"] = address.get("address_type", "residential") if address["address_type"].lower() == "residential": address["address_type"] = "" address[ "label" ] = f"E911 - {address['first_name']} {address['last_name']} - {extension}" payload = address try: logger.debug(f"Submitting:\n{json.dumps(payload, indent=4)}") logger.info(f"Submitting: {format_address(address)}") r = requests.post( nwsip_url + endpoint, auth=nwsip_auth, json=payload, params=params ) if r.status_code == 422: result = r.json()["data"] logger.warning(f"Additional action required:") logger.error(f"{json.dumps(result['errors'][0]['detail'], indent=4)}") logger.warning( f"If an address correction is required please submit the updated address. Suggestions shown above." ) if auto_correct: n_address = result["errors"][0]["detail"] logger.warning(f"Submitting updated address change:\n{json.dumps(n_address, indent=4)}") result = add_e911_address(n_address, extension, auto_correct=False) return result if r.status_code != 201: logger.critical(f"{r.status_code} {r.reason} {r.text}") result = r.json() logger.error(f"Error processing request: \n{json.dumps(result, indent=4)}") return result if r.status_code == 201: logger.info(f"Successfully added E911 Address: {r.status_code} {r.reason} | {r.request.body}.") logger.info(f"Be sure to associate the number with the address.") except Exception as e: logger.critical(f"Exception: {e}") return False return True # Get E911 id by number. def get_e911id_by_number(number, **kwargs): """Returns E911 address ID by phone number.""" result = None try: r = requests.get(nwsip_url + f"/numbers/{number}", auth=nwsip_auth) if r.status_code not in [200]: logger.warning(f"{r.status_code} {r.reason}\n{r.text}") result = r.json()["data"][0] logger.debug(f"{json.dumps(result, indent=4)}") # If subkey had e911_id then return. relationships = result.get("relationships") e911_address = relationships.get("e911_address") if e911_address.get("data"): return e911_address["data"]["id"] except Exception as e: logger.error(f"Error: {e}") return result def get_e911_address(address_id, **kwargs): """Returns E911 address by address_id""" params = {} r = requests.get(nwsip_url + f"/e911s/{address_id}", auth=nwsip_auth, params=params) if r.status_code != 200: logger.error(f"Unable to return address: {r.status_code} {r.reason}") return data = r.json()["data"] logger.info(f"{r.status_code} {r.reason}") return data def e911_associate_address(number, address_id): endpoint = f"/e911s/{address_id}/associated/{number}" logger.debug(f"{nwsip_url}{endpoint}") try: r = requests.patch(nwsip_url + endpoint, auth=nwsip_auth) if r.status_code == 204: return True else: logger.error(f"Error: {r.status_code} {r.reason} {r.text}") except Exception as e: logger.error(f"Exception: {e}") return def e911_unassociate(number): endpoint = f"/e911s/{number}/deactivate" try: r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth) logger.debug(f"{r.status_code} {r.reason} {r.text}") except Exception as e: logger.error(f"Exception: {e}") return r def get_account(extension): """Get account information on Switchvox extension""" params = {} r = requests.get( swvx_url + f"/extensions/{extension}", auth=swvx_auth, params=params ) if r.status_code == 200: data = r.json()["data"][0] return data return def get_e911_info(e911_id, **kwargs): base_url = nwsip_url + f"/e911s/{e911_id}" r = requests.get(base_url, auth=nwsip_auth) if r.status_code == 200: logger.info(f"{r.json()}") else: logger.critical(f"Error: {r.status_code} {r.reason}") def get_911_record(search_keyword, **kwargs): base_url = nwsip_url + "/e911s/" search_keyword = str(search_keyword) r = requests.get(base_url, auth=nwsip_auth) if r.status_code not in [200, 201, 204]: logger.critical( f"Unable to complete request. {r.status_code} {r.reason} {r.text}" ) sys.exit() # Loop thru to find matching subset. # Returns single dict on match or array of all on no match data = r.json()["data"] for add in data: if search_keyword in add.values(): logger.info(f"Found keyword [{search_keyword}] match.") data = add break # Expressly search the 'label' if search_keyword in add["label"]: logger.info(f"Fournd keyword [{search_keyword}] match.") data = add break # Exit if its a list of everyone. if not isinstance(data, dict): logger.critical( f"Unable to find single match. Please review the list of entries and specify a correct match." ) menu() return data def update_e911_address(e911_id, address, label=None, auto_correct=True, **kwargs): """Update existing e911 record with provider auto_correct will auto matically accept the proposed updated address. """ # data.pop('type', None) # e911_id = data.pop('id', None) # e911_alias = data.pop('alias', None) # Currently not used. # curr_address = data.copy() # new_address = address.copy() # logger.debug(f"New Address:\n{json.dumps(new_address, indent=4)}") if not label: extension = input(f"Enter the extension to use for the label: ") or "N/A" label = f"E911 - {address['first_name']} {address['last_name']} - {extension}" address["label"] = label # Validate address first. r = requests.post(nwsip_url + "/e911s/validate", auth=nwsip_auth, json=address) logger.info(f"{r.request.method} {r.status_code} {r.reason} - {r.url}") if r.status_code == 422: result = r.json()["data"] logger.warning(f"Additional action required:") logger.critical( f"Proposed address change:\n{json.dumps(result['errors'][0]['detail'], indent=4)}" ) logger.warning( f"If an address correction is required please submit the updated address. Suggestions shown above." ) if auto_correct: n_address = result["errors"][0]["detail"] result = update_e911_address( e911_id, n_address, label=label, auto_correct=False ) return result if r.status_code not in [200, 201, 204]: logger.critical( f"Unable to validate address: {r.status_code} {r.reason}\n{json.dumps(r.json(), indent=4)}" ) return False logger.info(f"Address validation passed.") r = requests.patch(nwsip_url + f"/e911s/{e911_id}", auth=nwsip_auth, json=address) logger.debug(f"{r.status_code} {r.reason} - {r.text}") return True def remove_did(number, **kwargs): base_url = nwsip_url + f"/numbers/{number}" r = requests.delete(base_url, auth=nwsip_auth) logger.debug(f"{r.request.method} {r.status_code} {r.reason} - {base_url}") if r.status_code in [200, 201, 204]: logger.critical(f"{r.json()}") if r.status_code == 500: logger.critical(f"{r.text}") else: logger.error(f"{r.json()}") return def remove_inbound_route(**kwargs): return def remove_outbound_caller_id(**kwargs): return def prompt_address_type(default_type=15, **kwargs): """ Generates numeric list to select from. returns tuple 'address_type', 'address_type_number' """ for i, addy_type in enumerate(add_type_list): print(f"{i:2}) {addy_type}") address_type = ( input( f"Enter the number of the address type from the list above [{default_type}]: " ) or default_type ) address_type_number = "" if int(address_type) not in [0, 15]: while True: # Loop until address type number provided. address_type_number = input(f"Enter address type number: ") if address_type_number: address_type_number = address_type_number.replace(" ", "_") break # Return values to textual response as expected by API. return add_type_list[int(address_type)], address_type_number def prompt_address(old_address=None, **kwargs): """Prompt user for address details, shared input for adding/updating records""" # TODO: If old address provided, prepoulate the 'new' address with the old address details. # Collect address info: address = dict( first_name=input(f"First Name [{old_address.get('first_name')}]: ") or old_address.get("first_name"), last_name=input(f"Last Name [{old_address.get('last_name')}]: ") or old_address.get("last_name"), street_number=input( f"Street Number [{old_address.get('street_number')}]: " ).upper() or old_address.get("street_number"), street_name=input(f"Street Name [{old_address.get('street_name')}]: ").upper() or old_address.get("street_name"), city=input(f"Enter City [{old_address.get('city')}]: ").upper() or old_address.get("city"), state=input(f"Enter 2 char state [{old_address.get('state')}]: ").upper() or old_address.get("state"), zip=input(f"Enter ZIP code [{old_address.get('zip')}]: ").upper() or old_address.get("zip"), country=input(f"Enter country: [US] ") or "US", ) # Print list of address types: # print(json.dumps(add_type_list, indent=4)) # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' # if address_type == 'Residential': # address_type_number = "" # logger.warning(f"Address Type set to {address_type} no address_type_number can be added. Please verify your entry.") # else: # address_type_number = input(f"Enter address type number: ") or '' address_type, address_type_number = prompt_address_type() address["address_type"] = address_type address["address_type_number"] = address_type_number clean_address = {k: v.strip() for k, v in address.items()} return clean_address def menu_move(**kwargs): """ Generate menu prompts for moving a DID to new destination. """ return def menu_caller_id(**kwargs): return def menu_update_address(**kwargs): return def menu_new_address(**kwargs): return def menu_remove_address(**kwargs): return def menu_assign_did(**kwargs): return def create_extension(**kwargs): """Create a new user extension (sip)""" # Prompt for template # Prompt for group to assign to. return def remove_extension(**kwargs): """Remove a user extension (sip)""" # Prompt for template # Prompt for group to remove from. return def list_templates(**kwargs): """List templates for adding a new user""" return def list_groups(**kwargs): """List all groups on PBX""" return def phone_location(**kwargs): """List/Set/Clear phone location""" # method: "switchvox.digiumPhones.locations.getList" # method: "switchvox.outgoingCallRules.getList" # To show rule name. return def phone_hotdesk(**kwargs): """List/Set/Clear phone hotdesk""" params = { "request": { "method": "switchvox.digiumPhones.assignments.update", "parameters": { "extension": "8105", "ip": "10.16.1.140", "model": "D60", "configured_by": "server", "account_id": "2852", "assignment_id": "394", "phone_type": "deskphone", "mobile_assignment_tokens": {}, "type": "assigned", "mac": "00:0f:d3:0d:4d:6d", "display": "Jean Knapp", "location": "- - -", "locationSortVal": "", "displaySortVal": "Jean Knapp", "isHotDesk": False, "actions": { "type": "div", "key": None, "ref": None, "props": { "className": "actions", "children": { "key": None, "ref": None, "props": { "icon": "edit", "disabled": False, "tooltip": "Modify Phone Settings", }, "_owner": None, }, }, "_owner": None, }, "location_id": "4", "hot_desking": "0", }, } } return def phone_set_location(**kwargs): """Set phone location""" # method: "switchvox.digiumPhones.locations.update" params = { "cid_number": "", "location_id": "", "name": "", "outgoing_call_rules": [ {"id": "6", "state": "allow"}, {"id": "133", "state": "deny"}, ], } return def menu(): print("=" * 80) print( f"You are performing the following actions on {regcode.upper()} Tenant: {tenant}" ) action = input( f"What action are you performing?\n\---------- PBX Actions ------------------------\n\ T = Tenant Change \n\ M = Move (redirect) DID. (from PBX {regcode.upper()})\n\ C = Caller ID (from PBX {regcode.upper()} )\n\ \n\ ---------- SIP Provider actions ----------------\n\ N = New E911 Address (from provider)\n\ U = Update existing E911 Address (from provider)\n\ R = Remove E911 record (from provider).\n\ A = Assign DID to 911 Address (from provider).\n\ RN = Release DID from account (from provider). \n\ Q = Quit.\n\ \n\ ---------- Comming soon -------------------------\n\ ---------- PBX Location/Hotdesking --------------\n\ L = List/Set device E911 Location/Hotdesking.\n\ H = Enable/Disable phone Hotdesking.\n\ E = Create/Remove PBX extension.\n\ G = Create/Remove PBX group.\n\ Option: " ) if not action: logger.critical(f"You must specifiy an action.") sys.exit() action = action.upper() outbound_did = None inbound_did = None extension = None note = "Automated entry" x_pos = config[ "911_callrule_id" ] # The position to place the new callrule in the list. # Move DID if action == "Q": print(f"Good Bye!") sys.exit() if action == "T": """Change tenant""" set_tenant() if action == "M": # Move DID inbound_did = input(f"Enter inbound DID to redirect: ") extension = input(f"Enter the destination extension: ") or 7001 account = get_account(extension) account_name = account["display"] account_id = account["account_id"] if str(extension) == "7001": x_pos = 736 # TODO: Set to last position / before ranged numbers. position = input(f"What position to add new record to: [{x_pos}] ") or x_pos if extension == "7001": label = "Available" else: # Real extension label = f"911 - {account['display']}" label = input(f"Enter label [{label}]: ") or label if label.lower() == "sms": label = f"SMS - {account['display']}" note = input(f"Additional note: ") result = redirect_did( inbound_did=inbound_did, dest_account_id=account_id, note=note, priority=position, label=label, ) logger.info(f"result: {result}") # Offer to set Caller ID set_caller_id = ( input(f"Do you wish to set the outgoing caller ID information? [Y/n]: ") or "y" ) if set_caller_id[:1].lower() == "n": logger.warning( f"You will need to manually set outbound caller ID information." ) return logger.debug(f"Setting action to 'C'") outbound_did = inbound_did action = "C" if action == "C": # Set/update caller ID. x_pos = config["911_callrule_id"] out_call_rule_id = config["out_call_rule_id"] outbound_did = ( input(f"Enter the outbound Caller ID Number [{outbound_did}]: ") or outbound_did ) extension = ( input( f"Enter the destination extension (7001 to remove rule) [{extension}]: " ) or extension ) if extension == "7001": logger.warning( f"Removing outbound caller id rule associated with {outbound_did}." ) caller_id_rule = get_caller_id_rule(outbound_did) approve = ( input( f"You are about to remove caller id rule {caller_id_rule} are you sure? [Y/n]" ) or "y" ) if approve.lower() == "y": r = remove_caller_id_rule(caller_id_rule) logger.debug(f"{r}") return account = get_account(extension) account_name = account["display"] account_id = account["account_id"] caller_id_name = ( input(f"Enter the Caller ID Name to be used [{account_name}]: ") or account_name ) position = input(f"What position to add new record to: [{x_pos}] ") or x_pos if extension != "7001": label = f"911 - {account_name}" else: # Extension = 7001 label = f"Avaliable" label = input(f"Enter label [{label}]: ") or label note = input(f"Additional note [{note}]: ") or note call_rule_id = ( input( f"If you know the specific outgoing_call_rule_id to apply this rule to [{out_call_rule_id}]: " ) or out_call_rule_id ) print( f""" You are about to create an Outbound Caller ID Rule as such: 1) Calls made from extension '{extension}' to outgoing call_rule_id '{call_rule_id}' 2) will be sent with the Caller ID Name as '{caller_id_name}' 3) and the outgoing number as '{outbound_did}'. 4) This rule will be placed at priority '{position}'. """ ) approve = input(f"Do you wish to continue? [Y/n]: ") or "y" if approve[:1].lower() == "n": logger.warning(f"Caller ID rule aborted.") return result = set_caller_id_rule( outbound_did, caller_id_name, extension, note, label, position, call_rule_id ) if action == "N": # New 911 Address logger.info(f"Adding new E911 record.") # inbound_did = input(f"Enter inbound DID to modify: ") extension = input(f"Enter users internal extension: ") # Collect address info: # - start info gather / replace w/ function. address = dict( first_name=input(f"First Name: "), last_name=input(f"Last Name: "), street_number=input(f"Street Number: ").upper(), street_name=input(f"Street Name: ").upper(), city=input(f"Enter City: ").upper(), state=input(f"Enter 2 char state: ").upper(), zip=input(f"Enter ZIP code: ").upper(), country=input(f"Enter country: [US] ") or "US", ) # if not address['country']: # address['country'] = 'US' # # Print list of address types: # print(json.dumps(add_type_list, indent=4)) # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' # address_type_number = input(f"Enter address type number: ") or '' # if address_type == 'Residential': # address_type_number == "" # logger.warning(f"Address Type set to {address_type} and is not allowed to have an address type number. Please verify your entry.") address_type, address_type_number = prompt_address_type() address["address_type"] = address_type address["address_type_number"] = address_type_number clean_address = {k: v.strip() for k, v in address.items()} # - End info gather logger.info(f"\n{json.dumps(clean_address, indent=4)}") logger.warning(f"You are about to add the following address information. ") confirm = input(f"Is this information correct? [Y/n]") or "y" if confirm[:1].lower() != "y": logger.critical(f"No records added.") return # Add E911 address and assigne number to address. logger.info(f"Adding Address") if not add_e911_address(address=clean_address, extension=extension): logger.critical(f"Please contact support.") return assign = input(f"Do you want to assign this address to a DID? [Y/n]") or "y" if assign[:1].lower() == "y": inbound_did = input(f"Enter inbound DID to modify: ") label = clean_address["label"] e911_id = get_911_record(label) if not isinstance(e911_id, list): e911_id = [e911_id] for addy in e911_id: if not e911_associate_address(inbound_did, addy["id"]): logger.critical(f"Please contact support.") # Add Outbound Caller ID for E911 to PBX return if action == "U": # Update 911 address by DID # logger.critical(f"Updating an existing address not currently supported.") # sys.exit() logger.info(f"Updating existing E911 record.") inbound_did = input(f"Enter inbound DID to modify: ") address_id = get_e911id_by_number(inbound_did) address = get_e911_address(address_id) if not address: logger.warning(f"No E911 address found.") return if not address["address_type"]: address["address_type"] = "Residential" logger.warning(f"{json.dumps(address, indent=4)}") proceed = ( input( f"Address record {address_id} is currently configured to the above values. Proceed? [Y/n]: " ) or "y" ) if proceed.lower() != "y": sys.exit() if not inbound_did: inbound_label = input(f"Enter part of inbound DID label to search for: ") if not inbound_did and not inbound_label: logger.critical( f"You must specify either an inbound DID or the inbound label." ) sys.exit() updated_address = prompt_address(address) logger.debug(f"Updated Address: {updated_address}") logger.info( f""" {updated_address['first_name']} {updated_address['last_name']} {updated_address['street_number']} {updated_address['street_name']} {updated_address['address_type']} {updated_address['address_type_number']} {updated_address['city']}, {updated_address['state']}, {updated_address['zip']} {updated_address['country']} """ ) # If original address_type is changing to or from 'residential' you have to create a new # address entry. # if address['address_type'] == '' or updated_address['address_type'] == '': if address["address_type"] != updated_address["address_type"]: logger.error( f"Address Type has changed. \"{address['address_type']} -> {updated_address['address_type']}\" Create a new address with the new information and then assign the DID to the new address. You can then remove the old address if required." ) # sys.exit() response = update_e911_address(address_id, updated_address) logger.info(f"{response}") if action == "R": # Remove 911 address by DID logger.info(f"Removing E911 Address from provider.") inbound_did = input(f"Enter inbound DID to modify: ") # TODO: Verify address being removed?? # extension = input(f"Users extension: ") # Lookup the phone number and get the E911 ID e911_id = get_e911id_by_number(inbound_did) if isinstance(e911_id, dict): logger.warning(f"No E911 address associated with the number {inbound_did}.") # Lookup e911 address by aliass. exten = input(f"Search for label value: ") data = get_911_record(exten) if data.get("id"): logger.info(f"Address:\n{json.dumps(data, indent=4)}") e911_id = data.get("id") logger.info( f"Attempting to remove address associated wtih e911_id {e911_id}" ) result = remove_e911_address(e911_id) logger.info(result) return else: logger.critical(f"Unable to process request.") return if not e911_id.isnumeric(): logger.warning(f"Unable to retrieve E911 ID associated with {inbound_did}.") return # Unassociate the e911_id from the user. logger.info(f"Unassociating e911 id {e911_id} with phone number {inbound_did}.") r = e911_unassociate(inbound_did) # Remove the 911 address by e911_id logger.info(f"Attempting to remove address associated wtih e911_id {e911_id}") result = remove_e911_address(e911_id) logger.info(result) # Prompt to remove from PBX. return if action == "A": # Assign number to address logger.info(f"Assigning DID to E911 Address.") inbound_did = input(f"Enter inbound DID to modify: ") e911_label = input(f"Enter e911 label/or part of label: ") e911_id = get_911_record(e911_label) if not isinstance(e911_id, list): e911_id = [e911_id] for addy in e911_id: logger.info(f"E911 Address results: {json.dumps(e911_id, indent=4)}") confirm = input(f"Is this the record you want to use? [Y/n] ") or "y" if confirm[:1].lower() == "y": associate_id = addy["id"] logger.info(f"Using E911 record {associate_id}.") break if not e911_associate_address(inbound_did, associate_id): logger.critical(f"Please contact support.") return if action == "RN": # Purge and release number logger.critical( f""" This action will remove the phone number from your account. You might not beable to retrieve the number again. This will also remove the number from your PBX: - Incoming Call Routes - Outgoing Caller ID Rules This action is non-reversable. """ ) confirm = input(f"Do you wish to proceed? [y/N]: ") or "n" if confirm.lower() == "n": logger.info(f"aborted.") return inbound_did = input(f"Enter inbound DID to modify: ") remove_did(inbound_did) # Extension Management if action == "E": pass # Prompt for template to use # Prompt for Extension data # # Secret menu options. if action == "L911": # Lookup 911 Address by label logger.info(f"Lookup action not yet defined.") pass if action == "I911": # Lookup address by 911_id """Lookup address info""" e911_id = input(f"Enter the e911_id to lookup: ") get_e911_info(e911_id) if __name__ == "__main__": try: set_tenant() while True: menu() except KeyboardInterrupt: logger.info(f"Good bye!")