#!/usr/bin/env python3 # # (C) 2020 VOICE1 LLC # import datetime from dis import dis import requests import json import sys import os from getpass import getpass try: from loguru import logger logger.add("app.log", rotation="1 month", level="DEBUG") except ImportError: import logging logger = logging.getLogger(__name__) class Colors: """ ANSI color codes """ BLACK = "\033[0;30m" RED = "\033[0;31m" GREEN = "\033[0;32m" BROWN = "\033[0;33m" BLUE = "\033[0;34m" PURPLE = "\033[0;35m" CYAN = "\033[0;36m" LIGHT_GRAY = "\033[0;37m" DARK_GRAY = "\033[1;30m" LIGHT_RED = "\033[1;31m" LIGHT_GREEN = "\033[1;32m" YELLOW = "\033[1;33m" LIGHT_BLUE = "\033[1;34m" LIGHT_PURPLE = "\033[1;35m" LIGHT_CYAN = "\033[1;36m" LIGHT_WHITE = "\033[1;37m" BOLD = "\033[1m" FAINT = "\033[2m" ITALIC = "\033[3m" UNDERLINE = "\033[4m" BLINK = "\033[5m" NEGATIVE = "\033[7m" CROSSED = "\033[9m" END = "\033[0m" # cancel SGR codes if we don't write to a terminal if not __import__("sys").stdout.isatty(): for _ in dir(): if isinstance(_, str) and _[0] != "_": locals()[_] = "" else: # set Windows console in VT mode if __import__("platform").system() == "Windows": kernel32 = __import__("ctypes").windll.kernel32 kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) del kernel32 def show_colors(): for i in dir(Colors): if i[0:1] != "_" and i != "END": print("{:>16} {}".format(i, getattr(Colors, i) + i + Colors.END)) # Show and confirm disclaimer disclaimer_accepted = False def show_disclaimer(): print( f""" {Colors.BOLD} {Colors.RED}{Colors.BLINK}WARNING {Colors.END}{Colors.LIGHT_WHITE}- This script is provided "AS-IS" for use as a {Colors.ITALIC}DEMO ONLY.{Colors.END}{Colors.LIGHT_WHITE} It is intended to illistrate how to use the Voice1 APIs. By using this script, you agree to the terms of use and acknowledge that you are responsible for any actions taken. Neither VOICE1 nor VOICE1’s resellers, contractors, or associates shall be responsible for providing Services to the extent that the issue is caused by (a) Customer’s misuse, improper use, mis-configuration, alteration, or damage to the Software; (b) Customer’s use of the Software with any hardware or software not supplied or supported by VOICE1; (c) Customer’s failure to install an update to the Software if such update would have resolved the issue; or (d) uses in a manner not in accordance with the Agreement. VOICE1 shall have no responsibility for loss of or damage to Customer’s data or loss of business, regardless of the cause of any such loss or damage. TO THE MAXIMUM EXTENT PERMITTED BY APPLICABLE LAW, VOICE1 AND ITS RESELLERS DISCLAIM ALL WARRANTIES AND CONDITIONS, EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND TITLE AND NON-INFRINGEMENT, WITH REGARD TO THE SOFTWARE. AND SERVICES. VOICE1 AND RESELLERS DO NOT GUARANTEE THAT THE OPERATION OF THE SOFTWARE OR ANY OTHER CODE WILL BE UNINTERRUPTED OR ERROR-FREE, AND CUSTOMER ACKNOWLEDGE THAT IT IS NOT TECHNICALLY PRACTICABLE FOR US TO DO SO. TO THE MAXIMUM EXTENT PERMITTED BY APPLICABLE LAW, IN NO EVENT SHALL VOICE1 OR ITS RESELLERS BE LIABLE UNDER ANY LEGAL OR EQUITABLE THEORY FOR ANY SPECIAL, INCIDENTAL, INDIRECT OR CONSEQUENTIAL DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF BUSINESS INFORMATION OR ANY OTHER PECUNIARY LAW) ARISING OUT OF THE USE OF OR INABILITY TO USE THE SOFTWARE OR THE SERVICES OR ANY OTHER SUBJECT MATTER RELATING TO THIS AGREEMENT, EVEN IF VOICE1 OR RESELLERS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. IN ANY CASE, VOICE 1'S AND RESELLER'S’ ENTIRE LIABILITY WITH RESPECT TO ANY SUBJECT MATTER RELATING TO THIS AGREEMENT SHALL BE LIMITED TO THE AMOUNT ACTUALLY PAID FOR THE SERVICES {Colors.END} """ ) add_type_list = [ "", "Apartment", "Basement", "Building", "Department", "Floor", "Front", "Key", "Lobby", "Lot", "Lower", "Office", "Penthouse", "Pier", "Rear", "Residential", "Room", "Side", "Slip", "Space", "Stop", "Suite", "Trailer", "Unit", "Upper", ] def prompt_credentials(config): """Prompt for credentials""" print(f"{Colors.LIGHT_GREEN}DO NOT SHARE YOUR CREDENTIALS WITH ANYONE.{Colors.END}") config["username"] = input("Enter your api.switchvoxuc.com username: ") config["password"] = getpass("Enter your api.switchvoxuc.com password: ") config["provider_username"] = input("Enter your api.nwsip.com username: ") config["provider_password"] = getpass("Enter your api.nwsip.com password: ") print(f"{Colors.LIGHT_GREEN}Configure additoinal tenant credentials.{Colors.END}") tenant = input("Enter your tenant name or enter for None: ") or None if tenant is not None: tenant = tenant.upper() config["tenant"] = {tenant: {}} config["tenant"][tenant]["username"] = input( f"Enter your {Colors.LIGHT_CYAN}{tenant}{Colors.END} api.switchvoxuc.com username: " ) config["tenant"][tenant]["password"] = getpass( f"Enter your {Colors.LIGHT_CYAN}{tenant}{Colors.END} api.switchvoxuc.com password: " ) config["tenant"][tenant]["provider_username"] = input( f"Enter your {Colors.LIGHT_CYAN}{tenant}{Colors.END} api.nwsip.com username: " ) config["tenant"][tenant]["provider_password"] = getpass( f"Enter your {Colors.LIGHT_CYAN}{tenant}{Colors.END} api.nwsip.com password: " ) return config def create_helpdeskticket(**kwargs): """Open helpdesk ticket w/ VOICE1""" url = "https://api.voice1.me/tickets" params = { "email": "", "subject": "Helpdesk Ticket", "body": "Please call me", } r = requests.post(url, params=params) return def update_helpdeskticket(**kwargs): """update existing ticket with voice1""" params = {"ticket_id": "", "body": "Please call me", "status": "open"} r = requests.patch(url, params=params) return def get_nth_key(dictionary, n=0): if n < 0: n += len(dictionary) for i, key in enumerate(dictionary.keys()): if i == n: return key raise IndexError("dictionary index out of range") def build_config(): config = {} regcode = input(f"Enter your 6 digit regcode: ") or None e911_callrule_id = int(input(f"Enter the default 'call_rule_id' for dialing 911 to use: ")) or 6 priority = int(input(f"Enter the default 'position' to add caller_id_rules: ")) or 0 redirect_extension = int(input(f"Enter the default 'extension number' to redirect calls to: ")) or 7001 return [config] def set_tenant(**kwargs): global regcode global config global tenant global swvx_url global swvx_auth global nwsip_url global nwsip_auth # Empty config.json so build one in memory. if not config_data: config_data = build_config() # Get 1st REGCODE in config. for i, r in enumerate(config_data): print(f"{i}) {r['regcode']}") regcode_i = input(f"Select your 6 digit regcode [{0}]: ") or 0 config = config_data[int(regcode_i)] regcode = config["regcode"] # Populate Tenant Lists tenant = "None" if config.get("tenant"): tenants = config["tenant"] for i, t in enumerate(config["tenant"]): print(f"{i}) {t}") tenant = input(f"Select the tenant to modify or enter to skip: ") or tenant if tenant != "None": tenant = get_nth_key(config["tenant"], int(tenant)) config = config["tenant"][tenant] redirect_ivr_id = config.get("redirect_ivr_id") nwsip_url = "https://api.nwsip.com/v1" nwsip_auth = (config["provider_username"], config["provider_password"]) swvx_url = f"https://api.switchvoxuc.com/switchvox/{regcode.upper()}" swvx_auth = (config["username"], config["password"]) # Get Outbound 911 Caller ID Rule def get_caller_id_rule(phone_number=None): """ Returns cid_rule_id for matching outbound caller id number. Returns None if not found. """ params = {} url = swvx_url + f"/caller_ids/{phone_number}" r = requests.get(url, auth=swvx_auth, params=params) if r.status_code != 200: logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") return None data = r.json()["data"] if data: for d in data: logger.info(f"CID Rule: {d['id']}") cid_rule_id = d["id"] return cid_rule_id # Create outbound caller id rule def set_caller_id_rule( caller_id_number, caller_id_name, extension, description="Automated", title="Outbound Caller ID", priority=6, call_rule_id=0, ): """ Set an outbound Caller ID Rule, if it exists update the rule. call_rule_id = 6 for 911 """ payload = { "outgoing_call_rule_id": call_rule_id, "caller_id_name": caller_id_name[:15], "caller_id_number": caller_id_number, "description": description, "rule_type": "single", "title": title, "priority": priority, "extension": extension, } # Lookup rule by caller ID. logger.debug(f"Looking up existing caller id rules matching {caller_id_number}") r = requests.get(swvx_url + f"/caller_ids/{caller_id_number}", auth=swvx_auth) if r.status_code != 200: logger.error(f"Error: {r.status_code} {r.reason} {r.text}") sys.exit() data = r.json()["data"] # logger.debug(f"data: {data}") if not data: logger.error(f"No existing rule returned") # Add new record. r = requests.post(swvx_url + f"/caller_ids", auth=swvx_auth, json=payload) if r.status_code != 201: logger.critical(f"Unable to create caller id rule") logger.debug(f"{r.text}") return logger.info(f"{r.json()}") return r.json()["data"] # Data exists, a.k.a. Need to update a rule? logger.warning(f"Rule already exists but no code to update.") # Quickly remove any 'dependents' from outgoing call rule so as to slim down the payload for d in data: d["outgoing_call_rule"].pop("dependents") logger.debug(f"\n{json.dumps(data, indent=4)}") logger.warning(f"Submit and review entry") return # Remove Outbound 911 Caller ID Rule def remove_caller_id_rule(cid_rule_id=None): """ :param cid_rule_id: The outbound caller id rule to specifiy for removal. :return: Returns .json() response. """ # !! Remove the caller id rule url = swvx_url + f"/caller_ids/remove/{cid_rule_id}" logger.critical(f"Removing outbound caller id: {url}") params = {} r = requests.delete(url, auth=swvx_auth, params=params) if r.status_code != 201: logger.error(f"Error: {r.status_code} {r.reason} | {r.text}") return r.json() # Get inbound route of number def inbound_route_by_number(inbound_did): try: r = requests.get( swvx_url + f"/routes/inbound/number/{inbound_did}", auth=swvx_auth ) if r.status_code == 200: return r.json()["data"] except Exception as e: logger.error(f"Exception: {e}") return None # Redirect Inbound DID to new destination def redirect_did(inbound_did, dest_account_id, note="", priority=1, label="Available"): route = inbound_route_by_number(inbound_did=inbound_did) payload = { "name": label, "note": note, "priority": priority, "type": "route_number", "number": inbound_did, "any_provider": 0, "call_type": 0, "destination_account_id": dest_account_id, } if route: if not route["incoming_did_route"]: # If route does not exist we need to add a new route. logger.warning( f"No existing route for this number found. Adding a NEW inbound route." ) rr = requests.post( swvx_url + f"/routes/inbound/{inbound_did}/{dest_account_id}", json=payload, auth=swvx_auth, ) if rr.status_code == 201: return {"success": "submitted"} logger.critical(f"{rr.status_code} {rr.reason} {rr.text}") for r in route["incoming_did_route"]: note = f"Modified: {datetime.datetime.today()}. {note}" route_id = r["id"] logger.info( f"Modifying inbound route {route_id}, setting new destination to {dest_account_id}@{priority}" ) rr = requests.patch( swvx_url + f"/routes/inbound/{route_id}", json=payload, auth=swvx_auth ) if rr.status_code == 201: return {"success": "submitted"} logger.warning(f"Be sure to verify the priority order for proper routing.") return ####### # E911 Address functions (carrier side) # # Remove E911 Address associated w/ Number (Carrier side) def remove_e911_address(e911_number, **kwargs): """ Removes address association with the number provided. """ result = None endpoint = f"/e911s/{e911_number}" try: params = {} logger.debug(f"{nwsip_url + endpoint}") r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth, params=params) result = r.json() except Exception as e: logger.error(f"Exception: {e}") return result # Add Address def format_address(address, fmt="human"): """Prettifies address for display.""" if fmt == "human": address_type = " ".join( part for part in ( address.get("address_type"), address.get("address_type_number"), ) if part ) address_type = " ".join( filter( None, (address.get("address_type"), address.get("address_type_number")) ) ) return f"{address['street_number']} {address['street_name']} {address_type} {address['city']}, {address['state']} {address['zip']} {address['country']}" if fmt == "usps": name = " ".join( part for part in (address.get("first_name"), address.get("last_name")) if part ) return { "name": name, "company": "", "address_1": f"{address['street_number']} {address['street_name']} {address.get('address_type', '')} {address.get('address_type_number', '')}", "address_2": "", "city": address["city"], "state": address["state"], "zipcode": address["zip"], "zipcode_ext": "", "phone": "", } return address def add_e911_address(address, extension, auto_correct=True): """Add proivded e911 address or present error""" endpoint = "/e911s" result = None params = {} address["address_type"] = address.get("address_type", "residential") if address["address_type"].lower() == "residential": address["address_type"] = "" address[ "label" ] = f"E911 - {address['first_name']} {address['last_name']} - {extension}" payload = address try: logger.debug(f"Submitting:\n{json.dumps(payload, indent=4)}") logger.info(f"Submitting: {format_address(address)}") r = requests.post( nwsip_url + endpoint, auth=nwsip_auth, json=payload, params=params ) if r.status_code == 422: result = r.json()["data"] logger.warning(f"Additional action required:") logger.error(f"{json.dumps(result['errors'][0]['detail'], indent=4)}") logger.warning( f"If an address correction is required please submit the updated address. Suggestions shown above." ) if auto_correct: n_address = result["errors"][0]["detail"] logger.warning( f"Submitting updated address change:\n{json.dumps(n_address, indent=4)}" ) result = add_e911_address(n_address, extension, auto_correct=False) return result if r.status_code != 201: logger.critical(f"{r.status_code} {r.reason} {r.text}") result = r.json() logger.error(f"Error processing request: \n{json.dumps(result, indent=4)}") return result if r.status_code == 201: logger.info( f"Successfully added E911 Address: {r.status_code} {r.reason} | {r.request.body}." ) logger.info(f"Be sure to associate the number with the address.") except Exception as e: logger.critical(f"Exception: {e}") return False return True # Get E911 id by number. def get_e911id_by_number(number, **kwargs): """Returns E911 address ID by phone number.""" result = None try: r = requests.get(nwsip_url + f"/numbers/{number}", auth=nwsip_auth) if r.status_code not in [200]: logger.warning(f"{r.status_code} {r.reason}\n{r.text}") result = r.json()["data"][0] logger.debug(f"{json.dumps(result, indent=4)}") # If subkey had e911_id then return. relationships = result.get("relationships") e911_address = relationships.get("e911_address") if e911_address.get("data"): return e911_address["data"]["id"] except Exception as e: logger.error(f"Error: {e}") return result def get_e911_address(address_id, **kwargs): """Returns E911 address by address_id""" params = {} r = requests.get(nwsip_url + f"/e911s/{address_id}", auth=nwsip_auth, params=params) if r.status_code != 200: logger.error(f"Unable to return address: {r.status_code} {r.reason}") return data = r.json()["data"] logger.info(f"{r.status_code} {r.reason}") return data def e911_associate_address(number, address_id): endpoint = f"/e911s/{address_id}/associated/{number}" logger.debug(f"{nwsip_url}{endpoint}") try: r = requests.patch(nwsip_url + endpoint, auth=nwsip_auth) if r.status_code == 204: return True else: logger.error(f"Error: {r.status_code} {r.reason} {r.text}") except Exception as e: logger.error(f"Exception: {e}") return def e911_unassociate(number): endpoint = f"/e911s/{number}/deactivate" try: r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth) logger.debug(f"{r.status_code} {r.reason} {r.text}") except Exception as e: logger.error(f"Exception: {e}") return r def get_account(extension): """Get account information on Switchvox extension""" params = {} r = requests.get( swvx_url + f"/extensions/{extension}", auth=swvx_auth, params=params ) if r.status_code == 200: data = r.json()["data"][0] return data return def get_e911_info(e911_id, **kwargs): base_url = nwsip_url + f"/e911s/{e911_id}" r = requests.get(base_url, auth=nwsip_auth) if r.status_code == 200: logger.info(f"{r.json()}") else: logger.critical(f"Error: {r.status_code} {r.reason}") def get_911_record(search_keyword, **kwargs): base_url = nwsip_url + "/e911s/" search_keyword = str(search_keyword) r = requests.get(base_url, auth=nwsip_auth) if r.status_code not in [200, 201, 204]: logger.critical( f"Unable to complete request. {r.status_code} {r.reason} {r.text}" ) sys.exit() # Loop thru to find matching subset. # Returns single dict on match or array of all on no match data = r.json()["data"] for add in data: if search_keyword in add.values(): logger.info(f"Found keyword [{search_keyword}] match.") data = add break # Expressly search the 'label' if search_keyword in add["label"]: logger.info(f"Fournd keyword [{search_keyword}] match.") data = add break # Exit if its a list of everyone. if not isinstance(data, dict): logger.critical( f"Unable to find single match. Please review the list of entries and specify a correct match." ) menu() return data def update_e911_address(e911_id, address, label=None, auto_correct=True, **kwargs): """Update existing e911 record with provider auto_correct will auto matically accept the proposed updated address. """ # data.pop('type', None) # e911_id = data.pop('id', None) # e911_alias = data.pop('alias', None) # Currently not used. # curr_address = data.copy() # new_address = address.copy() # logger.debug(f"New Address:\n{json.dumps(new_address, indent=4)}") if not label: extension = input(f"Enter the extension to use for the label: ") or "N/A" label = f"E911 - {address['first_name']} {address['last_name']} - {extension}" address["label"] = label # Validate address first. r = requests.post(nwsip_url + "/e911s/validate", auth=nwsip_auth, json=address) logger.info(f"{r.request.method} {r.status_code} {r.reason} - {r.url}") if r.status_code == 422: result = r.json()["data"] logger.warning(f"Additional action required:") logger.critical( f"Proposed address change:\n{json.dumps(result['errors'][0]['detail'], indent=4)}" ) logger.warning( f"If an address correction is required please submit the updated address. Suggestions shown above." ) if auto_correct: n_address = result["errors"][0]["detail"] result = update_e911_address( e911_id, n_address, label=label, auto_correct=False ) return result if r.status_code not in [200, 201, 204]: logger.critical( f"Unable to validate address: {r.status_code} {r.reason}\n{json.dumps(r.json(), indent=4)}" ) return False logger.info(f"Address validation passed.") r = requests.patch(nwsip_url + f"/e911s/{e911_id}", auth=nwsip_auth, json=address) logger.debug(f"{r.status_code} {r.reason} - {r.text}") return True def remove_did(number, **kwargs): base_url = nwsip_url + f"/numbers/{number}" r = requests.delete(base_url, auth=nwsip_auth) logger.debug(f"{r.request.method} {r.status_code} {r.reason} - {base_url}") if r.status_code in [200, 201, 204]: logger.critical(f"{r.json()}") if r.status_code == 500: logger.critical(f"{r.text}") else: logger.error(f"{r.json()}") return def remove_inbound_route(**kwargs): return def remove_outbound_caller_id(**kwargs): return def prompt_address_type(default_type=15, **kwargs): """ Generates numeric list to select from. returns tuple 'address_type', 'address_type_number' """ for i, addy_type in enumerate(add_type_list): print(f"{i:2}) {addy_type}") address_type = ( input( f"Enter the number of the address type from the list above [{default_type}]: " ) or default_type ) address_type_number = "" if int(address_type) not in [0, 15]: while True: # Loop until address type number provided. address_type_number = input(f"Enter address type number: ") if address_type_number: address_type_number = address_type_number.replace(" ", "_") break # Return values to textual response as expected by API. return add_type_list[int(address_type)], address_type_number def prompt_address(old_address=None, **kwargs): """Prompt user for address details, shared input for adding/updating records""" # TODO: If old address provided, prepoulate the 'new' address with the old address details. # Collect address info: address = dict( first_name=input(f"First Name [{old_address.get('first_name')}]: ") or old_address.get("first_name"), last_name=input(f"Last Name [{old_address.get('last_name')}]: ") or old_address.get("last_name"), street_number=input( f"Street Number [{old_address.get('street_number')}]: " ).upper() or old_address.get("street_number"), street_name=input(f"Street Name [{old_address.get('street_name')}]: ").upper() or old_address.get("street_name"), city=input(f"Enter City [{old_address.get('city')}]: ").upper() or old_address.get("city"), state=input(f"Enter 2 char state [{old_address.get('state')}]: ").upper() or old_address.get("state"), zip=input(f"Enter ZIP code [{old_address.get('zip')}]: ").upper() or old_address.get("zip"), country=input(f"Enter country: [US] ") or "US", ) # Print list of address types: # print(json.dumps(add_type_list, indent=4)) # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' # if address_type == 'Residential': # address_type_number = "" # logger.warning(f"Address Type set to {address_type} no address_type_number can be added. Please verify your entry.") # else: # address_type_number = input(f"Enter address type number: ") or '' address_type, address_type_number = prompt_address_type() address["address_type"] = address_type address["address_type_number"] = address_type_number clean_address = {k: v.strip() for k, v in address.items()} return clean_address def menu_move(**kwargs): """ Generate menu prompts for moving a DID to new destination. """ return def menu_caller_id(**kwargs): return def menu_update_address(**kwargs): return def menu_new_address(**kwargs): return def menu_remove_address(**kwargs): return def menu_assign_did(**kwargs): return def create_extension(**kwargs): """Create a new user extension (sip)""" # Prompt for template # Prompt for group to assign to. return def remove_extension(**kwargs): """Remove a user extension (sip)""" # Prompt for template # Prompt for group to remove from. return def list_templates(**kwargs): """List templates for adding a new user""" return def list_groups(**kwargs): """List all groups on PBX""" return def phone_location(**kwargs): """List/Set/Clear phone location""" # method: "switchvox.digiumPhones.locations.getList" # method: "switchvox.outgoingCallRules.getList" # To show rule name. return def phone_hotdesk(**kwargs): """List/Set/Clear phone hotdesk""" params = { "request": { "method": "switchvox.digiumPhones.assignments.update", "parameters": { "extension": "8105", "ip": "10.16.1.140", "model": "D60", "configured_by": "server", "account_id": "2852", "assignment_id": "394", "phone_type": "deskphone", "mobile_assignment_tokens": {}, "type": "assigned", "mac": "00:0f:d3:0d:4d:6d", "display": "Jean Knapp", "location": "- - -", "locationSortVal": "", "displaySortVal": "Jean Knapp", "isHotDesk": False, "actions": { "type": "div", "key": None, "ref": None, "props": { "className": "actions", "children": { "key": None, "ref": None, "props": { "icon": "edit", "disabled": False, "tooltip": "Modify Phone Settings", }, "_owner": None, }, }, "_owner": None, }, "location_id": "4", "hot_desking": "0", }, } } return def phone_set_location(**kwargs): """Set phone location""" # method: "switchvox.digiumPhones.locations.update" params = { "cid_number": "", "location_id": "", "name": "", "outgoing_call_rules": [ {"id": "6", "state": "allow"}, {"id": "133", "state": "deny"}, ], } return def menu(): print("=" * 80) print( f"You are performing the following actions on {regcode.upper()} Tenant: {tenant}" ) action = input(f"What action are you performing?\n\ {Colors.YELLOW}---------- PBX Actions ------------------------{Colors.END}\n\ T = Tenant Change \n\ M = Move (redirect) incoming DID. (from PBX {regcode.upper()})\n\ C = Set Outgoing Caller ID Name & Number (from PBX {regcode.upper()} )\n\ \n\ {Colors.LIGHT_CYAN}---------- SIP Provider actions ----------------{Colors.END}\n\ N = New E911 Address (from provider)\n\ U = Update existing E911 Address (from provider)\n\ R = Remove E911 record (from provider).\n\ A = Assign DID to 911 Address (from provider).\n\ RN = Release DID from account (from provider). \n\ Q = Quit.\n\ \n\ {Colors.NEGATIVE}---------- Comming soon -------------------------{Colors.END}\n\ {Colors.LIGHT_RED}---------- PBX Location/Hotdesking --------------{Colors.END}\n\ L = List/Set device E911 Location/Hotdesking.\n\ H = Enable/Disable phone Hotdesking.\n\ E = Create/Remove PBX extension.\n\ G = Create/Remove PBX group.\n\ Option: " ) if not action: logger.critical(f"You must specifiy an action.") sys.exit() action = action.upper() outbound_did = None inbound_did = None extension = None note = "Automated entry" x_pos = config[ "911_callrule_id" ] # The position to place the new callrule in the list. # Move DID if action == "Q": print(f"Good Bye!") sys.exit() if action == "T": """Change tenant""" set_tenant() if action == "M": # Move DID inbound_did = input(f"Enter inbound DID to redirect: ") extension = input(f"Enter the destination extension: ") or 7001 account = get_account(extension) account_name = account["display"] account_id = account["account_id"] if str(extension) == "7001": x_pos = 736 # TODO: Set to last position / before ranged numbers. position = input(f"What position to add new record to: [{x_pos}] ") or x_pos if extension == "7001": label = "Available" else: # Real extension label = f"911 - {account['display']}" label = input(f"Enter label [{label}]: ") or label if label.lower() == "sms": label = f"SMS - {account['display']}" note = input(f"Additional note: ") result = redirect_did( inbound_did=inbound_did, dest_account_id=account_id, note=note, priority=position, label=label, ) logger.info(f"result: {result}") # Offer to set Caller ID set_caller_id = ( input(f"Do you wish to set the outgoing caller ID information? [Y/n]: ") or "y" ) if set_caller_id[:1].lower() == "n": logger.warning( f"You will need to manually set outbound caller ID information." ) return logger.debug(f"Setting action to 'C'") outbound_did = inbound_did action = "C" if action == "C": # Set/update caller ID. x_pos = config["911_callrule_id"] out_call_rule_id = config["out_call_rule_id"] print(f"{Colors.LIGHT_RED}If you attempting to set an outgoing " f"caller ID number for E911 use STOP! You should be using the newer " f"'phone_locations' function in your Switchvox.{Colors.END}") ack_new_911 = input(f"Are you setting an outgoing 911 number?: ") or 'y' if ack_new_911.lower().startswith('y'): print(f"Finish setting up your phone_location rules and hot-desking in your switchvox.") return outbound_did = ( input(f"Enter the outbound Caller ID Number [{outbound_did}]: ") or outbound_did ) extension = ( input( f"Enter the destination extension (7001 to remove rule) [{extension}]: " ) or extension ) if extension == "7001": logger.warning( f"Removing outbound caller id rule associated with {outbound_did}." ) caller_id_rule = get_caller_id_rule(outbound_did) approve = ( input( f"You are about to remove caller id rule {caller_id_rule} are you sure? [Y/n]" ) or "y" ) if approve.lower() == "y": r = remove_caller_id_rule(caller_id_rule) logger.debug(f"{r}") return account = get_account(extension) account_name = account["display"] account_id = account["account_id"] caller_id_name = ( input(f"Enter the Caller ID Name to be used [{account_name}]: ") or account_name ) position = input(f"What position to add new record to: [{x_pos}] ") or x_pos if extension != "7001": label = f"911 - {account_name}" else: # Extension = 7001 label = f"Avaliable" label = input(f"Enter label [{label}]: ") or label note = input(f"Additional note [{note}]: ") or note call_rule_id = ( input( f"If you know the specific outgoing_call_rule_id to apply this rule to [{out_call_rule_id}]: " ) or out_call_rule_id ) print( f""" You are about to create an Outbound Caller ID Rule as such: 1) Calls made from extension '{extension}' to outgoing call_rule_id '{call_rule_id}' 2) will be sent with the Caller ID Name as '{caller_id_name}' 3) and the outgoing number as '{outbound_did}'. 4) This rule will be placed at priority '{position}'. """ ) approve = input(f"Do you wish to continue? [Y/n]: ") or "y" if approve[:1].lower() == "n": logger.warning(f"Caller ID rule aborted.") return result = set_caller_id_rule( outbound_did, caller_id_name, extension, note, label, position, call_rule_id ) if action == "N": # New 911 Address logger.info(f"Adding new E911 record.") # inbound_did = input(f"Enter inbound DID to modify: ") extension = input(f"Enter users internal extension: ") # Collect address info: # - start info gather / replace w/ function. address = dict( first_name=input(f"First Name: "), last_name=input(f"Last Name: "), street_number=input(f"Street Number: ").upper(), street_name=input(f"Street Name: ").upper(), city=input(f"Enter City: ").upper(), state=input(f"Enter 2 char state: ").upper(), zip=input(f"Enter ZIP code: ").upper(), country=input(f"Enter country: [US] ") or "US", ) # if not address['country']: # address['country'] = 'US' # # Print list of address types: # print(json.dumps(add_type_list, indent=4)) # address_type = input(f"Address Type (pick from above list) [Residential]: ") or 'Residential' # address_type_number = input(f"Enter address type number: ") or '' # if address_type == 'Residential': # address_type_number == "" # logger.warning(f"Address Type set to {address_type} and is not allowed to have an address type number. Please verify your entry.") address_type, address_type_number = prompt_address_type() address["address_type"] = address_type address["address_type_number"] = address_type_number clean_address = {k: v.strip() for k, v in address.items()} # - End info gather logger.info(f"\n{json.dumps(clean_address, indent=4)}") logger.warning(f"You are about to add the following address information. ") confirm = input(f"Is this information correct? [Y/n]") or "y" if confirm[:1].lower() != "y": logger.critical(f"No records added.") return # Add E911 address and assigne number to address. logger.info(f"Adding Address") if not add_e911_address(address=clean_address, extension=extension): logger.critical(f"Please contact support.") return assign = input(f"Do you want to assign this address to a DID? [Y/n]") or "y" if assign[:1].lower() == "y": inbound_did = input(f"Enter inbound DID to modify: ") label = clean_address["label"] e911_id = get_911_record(label) if not isinstance(e911_id, list): e911_id = [e911_id] for addy in e911_id: if not e911_associate_address(inbound_did, addy["id"]): logger.critical(f"Please contact support.") # Add Outbound Caller ID for E911 to PBX return if action == "U": # Update 911 address by DID # logger.critical(f"Updating an existing address not currently supported.") # sys.exit() logger.info(f"Updating existing E911 record.") inbound_did = input(f"Enter inbound DID to modify: ") address_id = get_e911id_by_number(inbound_did) address = get_e911_address(address_id) if not address: logger.warning(f"No E911 address found.") return if not address["address_type"]: address["address_type"] = "Residential" logger.warning(f"{json.dumps(address, indent=4)}") proceed = ( input( f"Address record {address_id} is currently configured to the above values. Proceed? [Y/n]: " ) or "y" ) if proceed.lower() != "y": sys.exit() if not inbound_did: inbound_label = input(f"Enter part of inbound DID label to search for: ") if not inbound_did and not inbound_label: logger.critical( f"You must specify either an inbound DID or the inbound label." ) sys.exit() updated_address = prompt_address(address) logger.debug(f"Updated Address: {updated_address}") logger.info( f""" {updated_address['first_name']} {updated_address['last_name']} {updated_address['street_number']} {updated_address['street_name']} {updated_address['address_type']} {updated_address['address_type_number']} {updated_address['city']}, {updated_address['state']}, {updated_address['zip']} {updated_address['country']} """ ) # If original address_type is changing to or from 'residential' you have to create a new # address entry. # if address['address_type'] == '' or updated_address['address_type'] == '': if address["address_type"] != updated_address["address_type"]: logger.error( f"Address Type has changed. \"{address['address_type']} -> {updated_address['address_type']}\" Create a new address with the new information and then assign the DID to the new address. You can then remove the old address if required." ) # sys.exit() response = update_e911_address(address_id, updated_address) logger.info(f"{response}") if action == "R": # Remove 911 address by DID logger.info(f"Removing E911 Address from provider.") inbound_did = input(f"Enter inbound DID to modify: ") # TODO: Verify address being removed?? # extension = input(f"Users extension: ") # Lookup the phone number and get the E911 ID e911_id = get_e911id_by_number(inbound_did) if isinstance(e911_id, dict): logger.warning(f"No E911 address associated with the number {inbound_did}.") # Lookup e911 address by aliass. exten = input(f"Search for label value: ") data = get_911_record(exten) if data.get("id"): logger.info(f"Address:\n{json.dumps(data, indent=4)}") e911_id = data.get("id") logger.info( f"Attempting to remove address associated wtih e911_id {e911_id}" ) result = remove_e911_address(e911_id) logger.info(result) return else: logger.critical(f"Unable to process request.") return if not e911_id.isnumeric(): logger.warning(f"Unable to retrieve E911 ID associated with {inbound_did}.") return # Unassociate the e911_id from the user. logger.info(f"Unassociating e911 id {e911_id} with phone number {inbound_did}.") r = e911_unassociate(inbound_did) # Remove the 911 address by e911_id logger.info(f"Attempting to remove address associated wtih e911_id {e911_id}") result = remove_e911_address(e911_id) logger.info(result) # Prompt to remove from PBX. return if action == "A": # Assign number to address logger.info(f"Assigning DID to E911 Address.") inbound_did = input(f"Enter inbound DID to modify: ") e911_label = input(f"Enter e911 label/or part of label: ") e911_id = get_911_record(e911_label) if not isinstance(e911_id, list): e911_id = [e911_id] for addy in e911_id: logger.info(f"E911 Address results: {json.dumps(e911_id, indent=4)}") confirm = input(f"Is this the record you want to use? [Y/n] ") or "y" if confirm[:1].lower() == "y": associate_id = addy["id"] logger.info(f"Using E911 record {associate_id}.") break if not e911_associate_address(inbound_did, associate_id): logger.critical(f"Please contact support.") return if action == "RN": # Purge and release number logger.critical( f""" This action will remove the phone number from your account. You might not beable to retrieve the number again. This will also remove the number from your PBX: - Incoming Call Routes - Outgoing Caller ID Rules This action is non-reversable. """ ) confirm = input(f"Do you wish to proceed? [y/N]: ") or "n" if confirm.lower() == "n": logger.info(f"aborted.") return inbound_did = input(f"Enter inbound DID to modify: ") remove_did(inbound_did) # Extension Management if action == "E": pass # Prompt for template to use # Prompt for Extension data # # Secret menu options. if action == "L911": # Lookup 911 Address by label logger.info(f"Lookup action not yet defined.") pass if action == "I911": # Lookup address by 911_id """Lookup address info""" e911_id = input(f"Enter the e911_id to lookup: ") get_e911_info(e911_id) if __name__ == "__main__": # TODO: Set to config file. config_file = "config.json" if os.path.isfile(config_file): with open(config_file) as f: config_data = json.load(f) else: logger.critical(f"Unable to load config.json. Aborting.") config_data = build_config() try: config_data = [prompt_credentials(config_data[0])] show_disclaimer() disclaimer_accepted = ( input( f"Do you accept the disclaimer? You must enter {Colors.BOLD}'ACCEPT'{Colors.END} in all capitals: " ) or disclaimer_accepted ) if disclaimer_accepted != "ACCEPT": logger.info( f"The user {config_data[0]['username']} for regcode {config_data[0]['regcode']} did not accept the use and terms your rejections has been logged." ) sys.exit(0) set_tenant() while True: menu() except KeyboardInterrupt: logger.info(f"Good bye!")