Files
E911_helper/e911_helper.py
2022-02-01 02:00:10 -08:00

1265 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
#
# (C) 2020 VOICE1 LLC
#
import datetime
from dis import dis
import requests
import json
import sys
import os
# Prefer loguru (adds a monthly-rotated DEBUG log file); fall back to a plain
# stdlib logger when loguru is not installed.
try:
    from loguru import logger
except ImportError:
    import logging

    logger = logging.getLogger(__name__)
else:
    logger.add("app.log", rotation="1 month", level="DEBUG")
class bcolors:
    """ANSI SGR escape sequences used to highlight console output."""

    # Bright foreground colours (SGR 91-96).
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"

    # Reset and text attributes.
    ENDC = "\033[0m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
class Colors:
    """ANSI color codes.

    The class body itself inspects stdout: when it is not a terminal every
    code defined here is replaced by an empty string, so callers can
    interpolate the attributes unconditionally.
    """

    # Regular and bold foreground colors.
    BLACK = "\033[0;30m"
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    BROWN = "\033[0;33m"
    BLUE = "\033[0;34m"
    PURPLE = "\033[0;35m"
    CYAN = "\033[0;36m"
    LIGHT_GRAY = "\033[0;37m"
    DARK_GRAY = "\033[1;30m"
    LIGHT_RED = "\033[1;31m"
    LIGHT_GREEN = "\033[1;32m"
    YELLOW = "\033[1;33m"
    LIGHT_BLUE = "\033[1;34m"
    LIGHT_PURPLE = "\033[1;35m"
    LIGHT_CYAN = "\033[1;36m"
    LIGHT_WHITE = "\033[1;37m"

    # Text attributes and reset.
    BOLD = "\033[1m"
    FAINT = "\033[2m"
    ITALIC = "\033[3m"
    UNDERLINE = "\033[4m"
    BLINK = "\033[5m"
    NEGATIVE = "\033[7m"
    CROSSED = "\033[9m"
    END = "\033[0m"

    if __import__("sys").stdout.isatty():
        # Writing to a terminal: set Windows console in VT mode so the
        # sequences above are interpreted.
        if __import__("platform").system() == "Windows":
            kernel32 = __import__("ctypes").windll.kernel32
            kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
            del kernel32
    else:
        # Not a terminal: cancel every SGR code. dir() lists the names
        # defined so far in this class body; locals() is the class namespace.
        for _ in dir():
            if isinstance(_, str) and not _.startswith("_"):
                locals()[_] = ""
# Show and confirm disclaimer
disclaimer_accepted = False


def show_disclaimer():
    """Print the colourised demo-only warning and legal disclaimer to stdout."""
    notice = f"""
{Colors.BOLD}{Colors.BLINK}{Colors.RED}
WARNING {Colors.END}{Colors.LIGHT_WHITE}- This script is provided as a {Colors.ITALIC}DEMO ONLY.{Colors.END}{Colors.LIGHT_WHITE} It is intended to illistrate how to use the Voice1 APIs. By using this script, you agree to the terms of use and acknowledge that you are responsible for any actions taken.
Neither VOICE1 nor VOICE1s resellers, contractors, or associates shall be responsible for providing Services to the extent that the issue is caused by (a) Customers misuse, improper use, mis-configuration, alteration, or damage to the Software; (b) Customers use of the Software with any hardware or software not supplied or supported by VOICE1; (c) Customers failure to install an update to the Software if such update would have resolved the issue; or (d) uses in a manner not in accordance with the Agreement. VOICE1 shall have no responsibility for loss of or damage to Customers data or loss of business, regardless of the cause of any such loss or damage.
TO THE MAXIMUM EXTENT PERMITTED BY APPLICABLE LAW, VOICE1 AND ITS RESELLERS DISCLAIM ALL WARRANTIES AND CONDITIONS, EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND TITLE AND NON-INFRINGEMENT, WITH REGARD TO THE SOFTWARE. AND SERVICES. VOICE1 AND RESELLERS DO NOT GUARANTEE THAT THE OPERATION OF THE SOFTWARE OR ANY OTHER CODE WILL BE UNINTERRUPTED OR ERROR-FREE, AND CUSTOMER ACKNOWLEDGE THAT IT IS NOT TECHNICALLY PRACTICABLE FOR US TO DO SO.
TO THE MAXIMUM EXTENT PERMITTED BY APPLICABLE LAW, IN NO EVENT SHALL VOICE1 OR ITS RESELLERS BE LIABLE UNDER ANY LEGAL OR EQUITABLE THEORY FOR ANY SPECIAL, INCIDENTAL, INDIRECT OR CONSEQUENTIAL DAMAGES WHATSOEVER (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF BUSINESS PROFITS, BUSINESS INTERRUPTION, LOSS OF BUSINESS INFORMATION OR ANY OTHER PECUNIARY LAW) ARISING OUT OF THE USE OF OR INABILITY TO USE THE SOFTWARE OR THE SERVICES OR ANY OTHER SUBJECT MATTER RELATING TO THIS AGREEMENT, EVEN IF VOICE1 OR RESELLERS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. IN ANY CASE, VOICE 1'S AND RESELLER'S ENTIRE LIABILITY WITH RESPECT TO ANY SUBJECT MATTER RELATING TO THIS AGREEMENT SHALL BE LIMITED TO THE AMOUNT ACTUALLY PAID FOR THE SERVICES
{Colors.END}
"""
    print(notice)
# TODO: Set to config file.
# Load the JSON configuration from the working directory; the script cannot
# run without it, so a missing file ends the process.
config_file = "config.json"
if not os.path.isfile(config_file):
    logger.critical(f"Unable to load config.json. Aborting.")
    sys.exit()
with open(config_file) as f:
    config_data = json.load(f)
# Address-type choices offered by prompt_address_type(). The list position is
# the number shown to the user: index 0 is the empty type and index 15 is
# "Residential" (both positions are special-cased by the prompt).
add_type_list = [
    "",
    "Apartment", "Basement", "Building", "Department",
    "Floor", "Front", "Key", "Lobby",
    "Lot", "Lower", "Office", "Penthouse",
    "Pier", "Rear", "Residential", "Room",
    "Side", "Slip", "Space", "Stop",
    "Suite", "Trailer", "Unit", "Upper",
]
def create_helpdeskticket(**kwargs):
    """Open helpdesk ticket w/ VOICE1.

    Posts a fixed placeholder ticket as query parameters. ``kwargs`` are
    accepted but not used and the HTTP response is discarded.

    :return: None
    """
    ticket = {
        "email": "",
        "subject": "Helpdesk Ticket",
        "body": "Please call me",
    }
    requests.post("https://api.voice1.me/tickets", params=ticket)
    return None
def update_helpdeskticket(**kwargs):
    """Update an existing helpdesk ticket with VOICE1.

    Sends a PATCH with a fixed placeholder payload as query parameters.
    ``kwargs`` are accepted but not used and the response is discarded.

    :return: None
    """
    # The previous body referenced an undefined ``url`` and raised NameError
    # on every call.
    # NOTE(review): assumes updates go to the same tickets endpoint that
    # create_helpdeskticket() posts to — confirm against the VOICE1 API.
    url = "https://api.voice1.me/tickets"
    params = {"ticket_id": "", "body": "Please call me", "status": "open"}
    requests.patch(url, params=params)
    return None
def get_nth_key(dictionary, n=0):
    """Return the key at position ``n`` of the dictionary's iteration order.

    A negative ``n`` counts from the end (``-1`` is the last key).

    :raises IndexError: when no key exists at that position.
    """
    target = n + len(dictionary) if n < 0 else n
    hits = (
        key for position, key in enumerate(dictionary.keys()) if position == target
    )
    for key in hits:
        return key
    raise IndexError("dictionary index out of range")
def set_tenant(**kwargs):
    """Interactively pick the regcode (and optional tenant) to work on.

    Lists the entries of ``config_data``, prompts for a selection and then
    publishes the chosen settings through module globals: ``regcode``,
    ``config``, ``tenant`` and the URL/auth pairs for both APIs.
    ``kwargs`` are accepted but not used.
    """
    global regcode, config, tenant
    global swvx_url, swvx_auth
    global nwsip_url, nwsip_auth

    # Show every configured regcode; pressing enter selects entry 0.
    for index, entry in enumerate(config_data):
        print(f"{index}) {entry['regcode']}")
    choice = input(f"Select your 6 digit regcode [{0}]: ") or 0
    config = config_data[int(choice)]
    regcode = config["regcode"]

    # Populate Tenant Lists
    tenant = "None"
    tenant_map = config.get("tenant")
    if tenant_map:
        for index, tenant_name in enumerate(tenant_map):
            print(f"{index}) {tenant_name}")
        tenant = input(f"Select the tenant to modify or enter to skip: ") or tenant
        if tenant != "None":
            # The answer is a list position; swap to that tenant's settings.
            tenant = get_nth_key(tenant_map, int(tenant))
            config = tenant_map[tenant]

    nwsip_url = "https://api.nwsip.com/v1"
    nwsip_auth = (config["provider_username"], config["provider_password"])
    swvx_url = f"https://api.switchvoxuc.com/switchvox/{regcode.upper()}"
    swvx_auth = (config["username"], config["password"])
# Get Outbound 911 Caller ID Rule
def get_caller_id_rule(phone_number=None):
    """
    Returns cid_rule_id for matching outbound caller id number.
    Returns None if not found or if the lookup does not answer 200.

    When several rules match, the id of the last one listed is returned.
    """
    url = swvx_url + f"/caller_ids/{phone_number}"
    r = requests.get(url, auth=swvx_auth, params={})
    if r.status_code != 200:
        logger.error(f"Error: {r.status_code} {r.reason} | {r.text}")
        return None
    # Start from None so an empty result honours the documented contract
    # instead of raising UnboundLocalError.
    cid_rule_id = None
    for d in r.json()["data"] or []:
        logger.info(f"CID Rule: {d['id']}")
        cid_rule_id = d["id"]
    return cid_rule_id
# Create outbound caller id rule
def set_caller_id_rule(
    caller_id_number,
    caller_id_name,
    extension,
    description="Automated",
    title="Outbound Caller ID",
    priority=6,
    call_rule_id=0,
):
    """
    Create an outbound Caller ID Rule for ``caller_id_number``.

    If a rule for that number already exists nothing is changed: the
    existing rule(s) are only written to the debug log for manual review
    (updating is not implemented).

    The caller id name is truncated to 15 characters.
    NOTE(review): the original note read "call_rule_id = 6 for 911" —
    confirm which outgoing call rule id is the 911 rule on your PBX.

    :return: the ``data`` of the create response when a rule was created,
        otherwise None. Exits the process when the lookup does not answer 200.
    """
    payload = {
        "outgoing_call_rule_id": call_rule_id,
        "caller_id_name": caller_id_name[:15],
        "caller_id_number": caller_id_number,
        "description": description,
        "rule_type": "single",
        "title": title,
        "priority": priority,
        "extension": extension,
    }
    # Lookup rule by caller ID.
    logger.debug(f"Looking up existing caller id rules matching {caller_id_number}")
    r = requests.get(swvx_url + f"/caller_ids/{caller_id_number}", auth=swvx_auth)
    if r.status_code != 200:
        logger.error(f"Error: {r.status_code} {r.reason} {r.text}")
        sys.exit()
    data = r.json()["data"]
    if not data:
        logger.error(f"No existing rule returned")
        # Add new record.
        r = requests.post(swvx_url + f"/caller_ids", auth=swvx_auth, json=payload)
        if r.status_code != 201:
            logger.critical(f"Unable to create caller id rule")
            logger.debug(f"{r.text}")
            return None
        logger.info(f"{r.json()}")
        return r.json()["data"]
    # Data exists, a.k.a. Need to update a rule?
    logger.warning(f"Rule already exists but no code to update.")
    # Drop the bulky 'dependents' entry before dumping. This is diagnostic
    # output only, so a rule without that key (or without an
    # outgoing_call_rule at all) must not raise.
    for d in data:
        rule = d.get("outgoing_call_rule")
        if isinstance(rule, dict):
            rule.pop("dependents", None)
    logger.debug(f"\n{json.dumps(data, indent=4)}")
    logger.warning(f"Submit and review entry")
    return None
# Remove Outbound 911 Caller ID Rule
def remove_caller_id_rule(cid_rule_id=None):
    """
    Issue the DELETE for an outbound caller id rule.

    Any status other than 201 is logged as an error, but the response body
    is decoded and returned either way.

    :param cid_rule_id: The outbound caller id rule to specify for removal.
    :return: Returns .json() response.
    """
    # !! Remove the caller id rule
    target = swvx_url + f"/caller_ids/remove/{cid_rule_id}"
    logger.critical(f"Removing outbound caller id: {target}")
    response = requests.delete(target, auth=swvx_auth, params={})
    unexpected = response.status_code != 201
    if unexpected:
        logger.error(
            f"Error: {response.status_code} {response.reason} | {response.text}"
        )
    return response.json()
# Get inbound route of number
def inbound_route_by_number(inbound_did):
    """Look up the inbound route data for ``inbound_did`` on the PBX.

    :return: the ``data`` member of the response on HTTP 200; None for any
        other status or when an exception occurs (the exception is logged).
    """
    try:
        response = requests.get(
            swvx_url + f"/routes/inbound/number/{inbound_did}", auth=swvx_auth
        )
        if response.status_code != 200:
            return None
        return response.json()["data"]
    except Exception as e:
        logger.error(f"Exception: {e}")
        return None
# Redirect Inbound DID to new destination
def redirect_did(inbound_did, dest_account_id, note="", priority=1, label="Available"):
    """Point an inbound DID at ``dest_account_id``.

    Looks up the current inbound route for the DID. If none exists a new
    route is created; otherwise existing routes are patched one at a time
    until the PBX answers 201.

    :param inbound_did: the number to redirect.
    :param dest_account_id: account id of the new destination.
    :param note: free-text note stored on the route.
    :param priority: position of the route in the PBX list.
    :param label: route name.
    :return: ``{"success": "submitted"}`` when a request was answered with
        201, otherwise None.
    """
    route = inbound_route_by_number(inbound_did=inbound_did)
    payload = {
        "name": label,
        "note": note,
        "priority": priority,
        "type": "route_number",
        "number": inbound_did,
        "any_provider": 0,
        "call_type": 0,
        "destination_account_id": dest_account_id,
    }
    if route:
        existing_routes = route["incoming_did_route"]
        if not existing_routes:
            # If route does not exist we need to add a new route.
            logger.warning(
                f"No existing route for this number found. Adding a NEW inbound route."
            )
            rr = requests.post(
                swvx_url + f"/routes/inbound/{inbound_did}/{dest_account_id}",
                json=payload,
                auth=swvx_auth,
            )
            if rr.status_code == 201:
                return {"success": "submitted"}
            logger.critical(f"{rr.status_code} {rr.reason} {rr.text}")
        # Stamp the note once and actually send it: previously the stamped
        # text was built after the payload, so it never reached the PBX, and
        # the prefix piled up on every loop pass.
        patch_payload = {
            **payload,
            "note": f"Modified: {datetime.datetime.today()}. {note}",
        }
        for r in existing_routes:
            route_id = r["id"]
            logger.info(
                f"Modifying inbound route {route_id}, setting new destination to {dest_account_id}@{priority}"
            )
            rr = requests.patch(
                swvx_url + f"/routes/inbound/{route_id}",
                json=patch_payload,
                auth=swvx_auth,
            )
            if rr.status_code == 201:
                return {"success": "submitted"}
    logger.warning(f"Be sure to verify the priority order for proper routing.")
    return None
#######
# E911 Address functions (carrier side)
#
# Remove E911 Address associated w/ Number (Carrier side)
def remove_e911_address(e911_number, **kwargs):
    """
    Sends the DELETE for ``/e911s/<e911_number>`` to the provider.

    ``kwargs`` are accepted but not used.

    :return: the decoded JSON response, or None when the request or the
        decoding raised (the exception is logged).
    """
    endpoint = f"/e911s/{e911_number}"
    try:
        logger.debug(f"{nwsip_url + endpoint}")
        response = requests.delete(nwsip_url + endpoint, auth=nwsip_auth, params={})
        return response.json()
    except Exception as e:
        logger.error(f"Exception: {e}")
    return None
# Format an address for display / USPS-style lookup
def format_address(address, fmt="human"):
    """Prettifies address for display.

    :param address: mapping with ``street_number``, ``street_name``,
        ``city``, ``state`` and ``zip`` (plus ``country`` for the human
        form); ``address_type``, ``address_type_number``, ``first_name`` and
        ``last_name`` are optional.
    :param fmt: ``"human"`` returns a single-line string, ``"usps"`` returns
        a dict of USPS-style fields; any other value returns ``address``
        unchanged.
    :raises KeyError: when a required field is missing.
    """
    # Unit designator ("Suite 5"); empty or None parts are skipped so they
    # leave neither stray spaces nor a literal "None" in the output.
    unit = " ".join(
        part
        for part in (address.get("address_type"), address.get("address_type_number"))
        if part
    )
    if fmt == "human":
        street = f"{address['street_number']} {address['street_name']}"
        if unit:
            street = f"{street} {unit}"
        return f"{street} {address['city']}, {address['state']} {address['zip']} {address['country']}"
    if fmt == "usps":
        street = f"{address['street_number']} {address['street_name']}"
        if unit:
            street = f"{street} {unit}"
        name = " ".join(
            part
            for part in (address.get("first_name"), address.get("last_name"))
            if part
        )
        return {
            "name": name,
            "company": "",
            "address_1": street,
            "address_2": "",
            "city": address["city"],
            "state": address["state"],
            "zipcode": address["zip"],
            "zipcode_ext": "",
            "phone": "",
        }
    return address
def add_e911_address(address, extension, auto_correct=True):
    """Add provided e911 address or present error.

    ``address`` is modified in place: a "residential" (or missing/None)
    ``address_type`` is sent as an empty string and a ``label`` built from
    the name and extension is added. Callers read that label back afterwards.

    When the provider answers 422 the proposed correction is logged and,
    if ``auto_correct`` is set, resubmitted exactly once.

    :param address: address mapping; must contain ``first_name`` and
        ``last_name``.
    :param extension: extension used in the generated label.
    :param auto_correct: resubmit the provider's suggested address on 422.
    :return: True only when the provider answered 201. Every failure path
        returns False; earlier versions returned the (truthy) error payload,
        which callers testing ``if not add_e911_address(...)`` mistook for
        success.
    :raises KeyError: when ``first_name`` or ``last_name`` is missing.
    """
    endpoint = "/e911s"
    address_type = address.get("address_type") or ""
    if address_type.lower() == "residential":
        address_type = ""
    address["address_type"] = address_type
    address[
        "label"
    ] = f"E911 - {address['first_name']} {address['last_name']} - {extension}"
    payload = address
    try:
        logger.debug(f"Submitting:\n{json.dumps(payload, indent=4)}")
        logger.info(f"Submitting: {format_address(address)}")
        r = requests.post(nwsip_url + endpoint, auth=nwsip_auth, json=payload, params={})
        if r.status_code == 422:
            suggestion = r.json()["data"]["errors"][0]["detail"]
            logger.warning(f"Additional action required:")
            logger.error(f"{json.dumps(suggestion, indent=4)}")
            logger.warning(
                f"If an address correction is required please submit the updated address. Suggestions shown above."
            )
            if not auto_correct:
                return False
            logger.warning(
                f"Submitting updated address change:\n{json.dumps(suggestion, indent=4)}"
            )
            # Bool-normalised so a nested failure can never look truthy.
            return bool(add_e911_address(suggestion, extension, auto_correct=False))
        if r.status_code != 201:
            logger.critical(f"{r.status_code} {r.reason} {r.text}")
            logger.error(
                f"Error processing request: \n{json.dumps(r.json(), indent=4)}"
            )
            return False
        logger.info(
            f"Successfully added E911 Address: {r.status_code} {r.reason} | {r.request.body}."
        )
        logger.info(f"Be sure to associate the number with the address.")
    except Exception as e:
        logger.critical(f"Exception: {e}")
        return False
    return True
# Get E911 id by number.
def get_e911id_by_number(number, **kwargs):
    """Returns E911 address ID by phone number.

    When the number record carries no linked E911 address, or an exception
    interrupts the lookup, whatever was fetched so far is returned instead:
    the number record itself, or None if it was never decoded. A non-200
    status is only logged. ``kwargs`` are accepted but not used.
    """
    record = None
    try:
        response = requests.get(nwsip_url + f"/numbers/{number}", auth=nwsip_auth)
        if response.status_code != 200:
            logger.warning(
                f"{response.status_code} {response.reason}\n{response.text}"
            )
        record = response.json()["data"][0]
        logger.debug(f"{json.dumps(record, indent=4)}")
        # If subkey had e911_id then return.
        linked = record.get("relationships").get("e911_address")
        if linked.get("data"):
            return linked["data"]["id"]
    except Exception as e:
        logger.error(f"Error: {e}")
    return record
def get_e911_address(address_id, **kwargs):
    """Returns E911 address by address_id.

    :return: the ``data`` member of the response on HTTP 200, otherwise None
        (the failure is logged). ``kwargs`` are accepted but not used.
    """
    response = requests.get(
        nwsip_url + f"/e911s/{address_id}", auth=nwsip_auth, params={}
    )
    if response.status_code == 200:
        address = response.json()["data"]
        logger.info(f"{response.status_code} {response.reason}")
        return address
    logger.error(
        f"Unable to return address: {response.status_code} {response.reason}"
    )
    return None
def e911_associate_address(number, address_id):
    """Associate ``number`` with the E911 address ``address_id``.

    :return: True when the provider answers 204; None for any other status
        or when the request raises (both cases are logged).
    """
    endpoint = f"/e911s/{address_id}/associated/{number}"
    logger.debug(f"{nwsip_url}{endpoint}")
    try:
        response = requests.patch(nwsip_url + endpoint, auth=nwsip_auth)
        if response.status_code != 204:
            logger.error(
                f"Error: {response.status_code} {response.reason} {response.text}"
            )
            return None
        return True
    except Exception as e:
        logger.error(f"Exception: {e}")
    return None
def e911_unassociate(number):
    """Send the DELETE for ``/e911s/<number>/deactivate`` to the provider.

    :return: the response object, or None when the request itself raised
        (the exception is logged).
    """
    endpoint = f"/e911s/{number}/deactivate"
    # Pre-set so a failed request returns None instead of raising
    # UnboundLocalError at the return statement.
    r = None
    try:
        r = requests.delete(nwsip_url + endpoint, auth=nwsip_auth)
        logger.debug(f"{r.status_code} {r.reason} {r.text}")
    except Exception as e:
        logger.error(f"Exception: {e}")
    return r
def get_account(extension):
    """Get account information on Switchvox extension.

    :return: the first entry of the response ``data`` on HTTP 200, else None.
    """
    response = requests.get(
        swvx_url + f"/extensions/{extension}", auth=swvx_auth, params={}
    )
    if response.status_code != 200:
        return None
    return response.json()["data"][0]
def get_e911_info(e911_id, **kwargs):
    """Fetch one E911 record and write it to the log.

    Logs the decoded body on HTTP 200 and the status line otherwise.
    Returns None; ``kwargs`` are accepted but not used.
    """
    response = requests.get(nwsip_url + f"/e911s/{e911_id}", auth=nwsip_auth)
    if response.status_code != 200:
        logger.critical(f"Error: {response.status_code} {response.reason}")
        return
    logger.info(f"{response.json()}")
def get_911_record(search_keyword, **kwargs):
    """Find the first E911 record matching ``search_keyword``.

    All records are fetched and scanned in order. A record matches when the
    keyword equals one of its values or is contained in its ``label``.

    :return: the matching record (dict). When nothing matches, the problem
        is logged, menu() is run once, and the complete list of records is
        returned.
    Exits the process when the listing request fails. ``kwargs`` are
    accepted but not used.
    """
    keyword = str(search_keyword)
    response = requests.get(nwsip_url + "/e911s/", auth=nwsip_auth)
    if response.status_code not in (200, 201, 204):
        logger.critical(
            f"Unable to complete request. {response.status_code} {response.reason} {response.text}"
        )
        sys.exit()
    # Loop thru to find matching subset.
    # Returns single dict on match or array of all on no match
    result = response.json()["data"]
    for record in result:
        if keyword in record.values():
            message = f"Found keyword [{keyword}] match."
        elif keyword in record["label"]:
            # Expressly search the 'label'
            message = f"Fournd keyword [{keyword}] match."
        else:
            continue
        logger.info(message)
        result = record
        break
    # Exit if its a list of everyone.
    if not isinstance(result, dict):
        logger.critical(
            f"Unable to find single match. Please review the list of entries and specify a correct match."
        )
        menu()
    return result
def update_e911_address(e911_id, address, label=None, auto_correct=True, **kwargs):
    """Update existing e911 record with provider.

    The address is validated first. On a 422 the provider's proposed address
    is logged and, when ``auto_correct`` is set, resubmitted exactly once.
    ``address`` is modified in place (its ``label`` is set).

    :param e911_id: id of the record to update.
    :param address: new address mapping.
    :param label: record label; when empty the user is prompted for an
        extension and a label is built from the name fields.
    :param auto_correct: automatically accept the proposed updated address.
    :return: True only when validation passed and the PATCH succeeded,
        otherwise False. Earlier versions returned True whatever the PATCH
        status was, and a truthy error payload for a rejected suggestion.
    """
    if not label:
        extension = input(f"Enter the extension to use for the label: ") or "N/A"
        label = f"E911 - {address['first_name']} {address['last_name']} - {extension}"
    address["label"] = label
    # Validate address first.
    r = requests.post(nwsip_url + "/e911s/validate", auth=nwsip_auth, json=address)
    logger.info(f"{r.request.method} {r.status_code} {r.reason} - {r.url}")
    if r.status_code == 422:
        suggestion = r.json()["data"]["errors"][0]["detail"]
        logger.warning(f"Additional action required:")
        logger.critical(
            f"Proposed address change:\n{json.dumps(suggestion, indent=4)}"
        )
        logger.warning(
            f"If an address correction is required please submit the updated address. Suggestions shown above."
        )
        if not auto_correct:
            return False
        return bool(
            update_e911_address(e911_id, suggestion, label=label, auto_correct=False)
        )
    if r.status_code not in [200, 201, 204]:
        logger.critical(
            f"Unable to validate address: {r.status_code} {r.reason}\n{json.dumps(r.json(), indent=4)}"
        )
        return False
    logger.info(f"Address validation passed.")
    r = requests.patch(nwsip_url + f"/e911s/{e911_id}", auth=nwsip_auth, json=address)
    logger.debug(f"{r.status_code} {r.reason} - {r.text}")
    # Response.ok is False for 4xx/5xx answers.
    if not r.ok:
        logger.error(f"Unable to update address: {r.status_code} {r.reason}")
        return False
    return True
def remove_did(number, **kwargs):
    """Release ``number`` from the provider account and log the outcome.

    A successful removal (200/201/204) is logged at critical level so it
    stands out, a 500 logs the raw body, anything else is logged as an
    error. Returns None; ``kwargs`` are accepted but not used.
    """
    base_url = nwsip_url + f"/numbers/{number}"
    r = requests.delete(base_url, auth=nwsip_auth)
    logger.debug(f"{r.request.method} {r.status_code} {r.reason} - {base_url}")
    if r.status_code == 500:
        logger.critical(f"{r.text}")
        return
    # A body-less answer (e.g. 204) cannot be decoded; fall back to the raw text.
    try:
        body = r.json()
    except ValueError:
        body = r.text
    # Exactly one branch runs now: the old if/if/else logged a successful
    # removal a second time as an error.
    if r.status_code in [200, 201, 204]:
        logger.critical(f"{body}")
    else:
        logger.error(f"{body}")
    return
def remove_inbound_route(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def remove_outbound_caller_id(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def prompt_address_type(default_type=15, **kwargs):
    """
    Prints add_type_list as a numbered list and asks the user to pick one.

    For every choice other than 0 (empty) and 15 (Residential) the user is
    asked repeatedly until a non-empty address type number is given; spaces
    in it are replaced by underscores.

    returns tuple 'address_type', 'address_type_number'
    """
    for position, type_name in enumerate(add_type_list):
        print(f"{position:2}) {type_name}")
    selection = (
        input(
            f"Enter the number of the address type from the list above [{default_type}]: "
        )
        or default_type
    )
    selected_index = int(selection)
    unit_number = ""
    if selected_index not in (0, 15):
        # Loop until address type number provided.
        while not unit_number:
            unit_number = input(f"Enter address type number: ")
        unit_number = unit_number.replace(" ", "_")
    # Return values to textual response as expected by API.
    return add_type_list[selected_index], unit_number
def prompt_address(old_address=None, **kwargs):
    """Prompt user for address details, shared input for adding/updating records.

    Each prompt shows the value from ``old_address`` in brackets; pressing
    enter keeps it. Street, city, state and ZIP answers are upper-cased.

    :param old_address: existing address used for the defaults; may be
        omitted (the prompts then show ``None`` as the current value).
    :return: dict with the entered fields plus ``address_type`` and
        ``address_type_number``; string values are stripped of surrounding
        whitespace.
    """
    # The declared default is None; without this guard every .get() below
    # raised AttributeError when no old address was supplied.
    old_address = old_address or {}

    def _ask(caption, key, upper=False):
        """Prompt for one field, falling back to the old value on empty input."""
        current = old_address.get(key)
        answer = input(f"{caption} [{current}]: ")
        if upper:
            answer = answer.upper()
        return answer or current

    # Collect address info (dict literals evaluate in order, so the prompts
    # appear in this sequence).
    address = {
        "first_name": _ask("First Name", "first_name"),
        "last_name": _ask("Last Name", "last_name"),
        "street_number": _ask("Street Number", "street_number", upper=True),
        "street_name": _ask("Street Name", "street_name", upper=True),
        "city": _ask("Enter City", "city", upper=True),
        "state": _ask("Enter 2 char state", "state", upper=True),
        "zip": _ask("Enter ZIP code", "zip", upper=True),
        "country": input(f"Enter country: [US] ") or "US",
    }
    address_type, address_type_number = prompt_address_type()
    address["address_type"] = address_type
    address["address_type_number"] = address_type_number
    # Old values may be None or non-string; only strings can be stripped.
    clean_address = {
        k: v.strip() if isinstance(v, str) else v for k, v in address.items()
    }
    return clean_address
def menu_move(**kwargs):
    """
    Placeholder for the menu prompts that move a DID to a new destination.

    Not implemented: accepts any keyword arguments and returns None.
    """
    return None
def menu_caller_id(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def menu_update_address(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def menu_new_address(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def menu_remove_address(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def menu_assign_did(**kwargs):
    """Placeholder: accepts any keyword arguments, does nothing, returns None."""
    return None
def create_extension(**kwargs):
    """Create a new user extension (sip).

    Not implemented yet: accepts any keyword arguments and returns None.
    """
    # Planned steps:
    #   - Prompt for template
    #   - Prompt for group to assign to.
    return None
def remove_extension(**kwargs):
    """Remove a user extension (sip).

    Not implemented yet: accepts any keyword arguments and returns None.
    """
    # Planned steps:
    #   - Prompt for template
    #   - Prompt for group to remove from.
    return None
def list_templates(**kwargs):
    """List templates for adding a new user.

    Not implemented yet: accepts any keyword arguments and returns None.
    """
    return None
def list_groups(**kwargs):
    """List all groups on PBX.

    Not implemented yet: accepts any keyword arguments and returns None.
    """
    return None
def phone_location(**kwargs):
    """List/Set/Clear phone location.

    Not implemented yet: accepts any keyword arguments and returns None.
    """
    # API methods noted for the implementation:
    #   method: "switchvox.digiumPhones.locations.getList"
    #   method: "switchvox.outgoingCallRules.getList"  # To show rule name.
    return None
def phone_hotdesk(**kwargs):
    """List/Set/Clear phone hotdesk.

    Not implemented yet: builds a sample request for
    ``switchvox.digiumPhones.assignments.update`` that is never sent, and
    returns None. ``kwargs`` are accepted but not used.
    """
    # "actions" sub-structure of the sample request.
    edit_action = {
        "type": "div",
        "key": None,
        "ref": None,
        "props": {
            "className": "actions",
            "children": {
                "key": None,
                "ref": None,
                "props": {
                    "icon": "edit",
                    "disabled": False,
                    "tooltip": "Modify Phone Settings",
                },
                "_owner": None,
            },
        },
        "_owner": None,
    }
    # Sample request body; kept for reference only.
    params = {
        "request": {
            "method": "switchvox.digiumPhones.assignments.update",
            "parameters": {
                "extension": "8105",
                "ip": "10.16.1.140",
                "model": "D60",
                "configured_by": "server",
                "account_id": "2852",
                "assignment_id": "394",
                "phone_type": "deskphone",
                "mobile_assignment_tokens": {},
                "type": "assigned",
                "mac": "00:0f:d3:0d:4d:6d",
                "display": "Jean Knapp",
                "location": "- - -",
                "locationSortVal": "",
                "displaySortVal": "Jean Knapp",
                "isHotDesk": False,
                "actions": edit_action,
                "location_id": "4",
                "hot_desking": "0",
            },
        }
    }
    return None
def phone_set_location(**kwargs):
    """Set phone location.

    Not implemented yet: builds a sample parameter set that is never sent
    and returns None. ``kwargs`` are accepted but not used.
    """
    # method: "switchvox.digiumPhones.locations.update"
    call_rules = [
        {"id": "6", "state": "allow"},
        {"id": "133", "state": "deny"},
    ]
    params = {
        "cid_number": "",
        "location_id": "",
        "name": "",
        "outgoing_call_rules": call_rules,
    }
    return None
def menu():
    """Show the action menu once and run the selected workflow.

    Reads the module globals ``regcode``, ``tenant`` and ``config`` (set by
    set_tenant()), prompts for an action code and walks the user through
    the matching prompts. "M" (move DID) can continue straight into "C"
    (caller id). Unrecognised codes do nothing.

    Exits the process on "Q", when no action is entered, and when the user
    declines to proceed in "U"; otherwise returns None.
    """
    print("=" * 80)
    print(
        f"You are performing the following actions on {regcode.upper()} Tenant: {tenant}"
    )
    # The stray backslash that used to precede the first divider was an
    # invalid escape sequence and was printed literally; it has been dropped.
    action = input(
        "What action are you performing?\n"
        "---------- PBX Actions ------------------------\n"
        "T = Tenant Change \n"
        f"M = Move (redirect) DID. (from PBX {regcode.upper()})\n"
        f"C = Caller ID (from PBX {regcode.upper()} )\n"
        "\n"
        "---------- SIP Provider actions ----------------\n"
        "N = New E911 Address (from provider)\n"
        "U = Update existing E911 Address (from provider)\n"
        "R = Remove E911 record (from provider).\n"
        "A = Assign DID to 911 Address (from provider).\n"
        "RN = Release DID from account (from provider). \n"
        "Q = Quit.\n"
        "\n"
        "---------- Comming soon -------------------------\n"
        "---------- PBX Location/Hotdesking --------------\n"
        "L = List/Set device E911 Location/Hotdesking.\n"
        "H = Enable/Disable phone Hotdesking.\n"
        "E = Create/Remove PBX extension.\n"
        "G = Create/Remove PBX group.\n"
        "Option: "
    )
    if not action:
        logger.critical(f"You must specifiy an action.")
        sys.exit()
    action = action.upper()
    outbound_did = None
    inbound_did = None
    extension = None
    note = "Automated entry"
    x_pos = config[
        "911_callrule_id"
    ]  # The position to place the new callrule in the list.

    if action == "Q":
        print(f"Good Bye!")
        sys.exit()

    if action == "T":
        # Change tenant
        set_tenant()

    if action == "M":  # Move DID
        inbound_did = input(f"Enter inbound DID to redirect: ")
        # Keep the default a string: the comparisons below (and in the "C"
        # section) are against "7001", which an int default never matched.
        extension = input(f"Enter the destination extension: ") or "7001"
        account = get_account(extension)
        if not account:
            logger.critical(f"Unable to find extension {extension}.")
            return
        account_id = account["account_id"]
        if extension == "7001":
            x_pos = 736  # TODO: Set to last position / before ranged numbers.
            label = "Available"
        else:  # Real extension
            label = f"911 - {account['display']}"
        position = input(f"What position to add new record to: [{x_pos}] ") or x_pos
        label = input(f"Enter label [{label}]: ") or label
        if label.lower() == "sms":
            label = f"SMS - {account['display']}"
        note = input(f"Additional note: ")
        result = redirect_did(
            inbound_did=inbound_did,
            dest_account_id=account_id,
            note=note,
            priority=position,
            label=label,
        )
        logger.info(f"result: {result}")
        # Offer to set Caller ID
        set_caller_id = (
            input(f"Do you wish to set the outgoing caller ID information? [Y/n]: ")
            or "y"
        )
        if set_caller_id[:1].lower() == "n":
            logger.warning(
                f"You will need to manually set outbound caller ID information."
            )
            return
        logger.debug(f"Setting action to 'C'")
        outbound_did = inbound_did
        action = "C"

    if action == "C":  # Set/update caller ID.
        x_pos = config["911_callrule_id"]
        out_call_rule_id = config["out_call_rule_id"]
        outbound_did = (
            input(f"Enter the outbound Caller ID Number [{outbound_did}]: ")
            or outbound_did
        )
        extension = (
            input(
                f"Enter the destination extension (7001 to remove rule) [{extension}]: "
            )
            or extension
        )
        if extension == "7001":
            # 7001 means "remove the rule"; this branch always ends the action.
            logger.warning(
                f"Removing outbound caller id rule associated with {outbound_did}."
            )
            caller_id_rule = get_caller_id_rule(outbound_did)
            if caller_id_rule is None:
                logger.warning(f"No caller id rule found for {outbound_did}.")
                return
            approve = (
                input(
                    f"You are about to remove caller id rule {caller_id_rule} are you sure? [Y/n]"
                )
                or "y"
            )
            if approve.lower() == "y":
                r = remove_caller_id_rule(caller_id_rule)
                logger.debug(f"{r}")
            return
        account = get_account(extension)
        if not account:
            logger.critical(f"Unable to find extension {extension}.")
            return
        account_name = account["display"]
        caller_id_name = (
            input(f"Enter the Caller ID Name to be used [{account_name}]: ")
            or account_name
        )
        position = input(f"What position to add new record to: [{x_pos}] ") or x_pos
        label = f"911 - {account_name}"
        label = input(f"Enter label [{label}]: ") or label
        note = input(f"Additional note [{note}]: ") or note
        call_rule_id = (
            input(
                f"If you know the specific outgoing_call_rule_id to apply this rule to [{out_call_rule_id}]: "
            )
            or out_call_rule_id
        )
        print(
            f"""
You are about to create an Outbound Caller ID Rule as such: 1) Calls made from extension '{extension}' to outgoing call_rule_id '{call_rule_id}'
2) will be sent with the Caller ID Name as '{caller_id_name}'
3) and the outgoing number as '{outbound_did}'.
4) This rule will be placed at priority '{position}'.
"""
        )
        approve = input(f"Do you wish to continue? [Y/n]: ") or "y"
        if approve[:1].lower() == "n":
            logger.warning(f"Caller ID rule aborted.")
            return
        set_caller_id_rule(
            outbound_did, caller_id_name, extension, note, label, position, call_rule_id
        )

    if action == "N":  # New 911 Address
        logger.info(f"Adding new E911 record.")
        extension = input(f"Enter users internal extension: ")
        # Collect address info:
        address = dict(
            first_name=input(f"First Name: "),
            last_name=input(f"Last Name: "),
            street_number=input(f"Street Number: ").upper(),
            street_name=input(f"Street Name: ").upper(),
            city=input(f"Enter City: ").upper(),
            state=input(f"Enter 2 char state: ").upper(),
            zip=input(f"Enter ZIP code: ").upper(),
            country=input(f"Enter country: [US] ") or "US",
        )
        address_type, address_type_number = prompt_address_type()
        address["address_type"] = address_type
        address["address_type_number"] = address_type_number
        clean_address = {k: v.strip() for k, v in address.items()}
        logger.info(f"\n{json.dumps(clean_address, indent=4)}")
        logger.warning(f"You are about to add the following address information. ")
        confirm = input(f"Is this information correct? [Y/n]") or "y"
        if confirm[:1].lower() != "y":
            logger.critical(f"No records added.")
            return
        # Add E911 address and assign number to address.
        logger.info(f"Adding Address")
        if not add_e911_address(address=clean_address, extension=extension):
            logger.critical(f"Please contact support.")
            return
        assign = input(f"Do you want to assign this address to a DID? [Y/n]") or "y"
        if assign[:1].lower() == "y":
            inbound_did = input(f"Enter inbound DID to modify: ")
            # add_e911_address() stored the generated label on the dict.
            label = clean_address["label"]
            record = get_911_record(label)
            # Without a single match the lookup hands back every record on
            # the account; associating the DID with each of them in turn
            # (the old behaviour) is never what the user asked for.
            if not isinstance(record, dict):
                logger.critical(f"Please contact support.")
                return
            if not e911_associate_address(inbound_did, record["id"]):
                logger.critical(f"Please contact support.")
        return

    if action == "U":  # Update 911 address by DID
        logger.info(f"Updating existing E911 record.")
        inbound_did = input(f"Enter inbound DID to modify: ")
        address_id = get_e911id_by_number(inbound_did)
        # The lookup returns the raw number record (dict) or None when no
        # address id could be found; neither is usable as an id.
        if address_id is None or isinstance(address_id, dict):
            logger.warning(f"No E911 address found.")
            return
        address = get_e911_address(address_id)
        if not address:
            logger.warning(f"No E911 address found.")
            return
        if not address["address_type"]:
            address["address_type"] = "Residential"
        logger.warning(f"{json.dumps(address, indent=4)}")
        proceed = (
            input(
                f"Address record {address_id} is currently configured to the above values. Proceed? [Y/n]: "
            )
            or "y"
        )
        if proceed.lower() != "y":
            sys.exit()
        if not inbound_did:
            inbound_label = input(f"Enter part of inbound DID label to search for: ")
            if not inbound_label:
                logger.critical(
                    f"You must specify either an inbound DID or the inbound label."
                )
                sys.exit()
        updated_address = prompt_address(address)
        logger.debug(f"Updated Address: {updated_address}")
        logger.info(
            f"""
{updated_address['first_name']} {updated_address['last_name']}
{updated_address['street_number']} {updated_address['street_name']}
{updated_address['address_type']} {updated_address['address_type_number']}
{updated_address['city']}, {updated_address['state']}, {updated_address['zip']}
{updated_address['country']}
"""
        )
        # If original address_type is changing to or from 'residential' you have to create a new
        # address entry.
        if address["address_type"] != updated_address["address_type"]:
            logger.error(
                f"Address Type has changed. \"{address['address_type']} -> {updated_address['address_type']}\" Create a new address with the new information and then assign the DID to the new address. You can then remove the old address if required."
            )
        response = update_e911_address(address_id, updated_address)
        logger.info(f"{response}")

    if action == "R":  # Remove 911 address by DID
        logger.info(f"Removing E911 Address from provider.")
        inbound_did = input(f"Enter inbound DID to modify: ")
        # TODO: Verify address being removed??
        # Lookup the phone number and get the E911 ID
        e911_id = get_e911id_by_number(inbound_did)
        if isinstance(e911_id, dict):
            logger.warning(f"No E911 address associated with the number {inbound_did}.")
            # Lookup e911 address by alias.
            exten = input(f"Search for label value: ")
            data = get_911_record(exten)
            # A failed search returns a list, which has no .get().
            if isinstance(data, dict) and data.get("id"):
                logger.info(f"Address:\n{json.dumps(data, indent=4)}")
                e911_id = data.get("id")
                logger.info(
                    f"Attempting to remove address associated wtih e911_id {e911_id}"
                )
                result = remove_e911_address(e911_id)
                logger.info(result)
                return
            logger.critical(f"Unable to process request.")
            return
        # The lookup yields None when it failed outright.
        if e911_id is None or not str(e911_id).isnumeric():
            logger.warning(f"Unable to retrieve E911 ID associated with {inbound_did}.")
            return
        # Unassociate the e911_id from the user.
        logger.info(f"Unassociating e911 id {e911_id} with phone number {inbound_did}.")
        e911_unassociate(inbound_did)
        # Remove the 911 address by e911_id
        logger.info(f"Attempting to remove address associated wtih e911_id {e911_id}")
        result = remove_e911_address(e911_id)
        logger.info(result)
        # Prompt to remove from PBX.
        return

    if action == "A":  # Assign number to address
        logger.info(f"Assigning DID to E911 Address.")
        inbound_did = input(f"Enter inbound DID to modify: ")
        e911_label = input(f"Enter e911 label/or part of label: ")
        candidates = get_911_record(e911_label)
        if not isinstance(candidates, list):
            candidates = [candidates]
        associate_id = None
        for addy in candidates:
            # Show the one candidate being confirmed (the whole list used to
            # be dumped on every pass).
            logger.info(f"E911 Address results: {json.dumps(addy, indent=4)}")
            confirm = input(f"Is this the record you want to use? [Y/n] ") or "y"
            if confirm[:1].lower() == "y":
                associate_id = addy["id"]
                logger.info(f"Using E911 record {associate_id}.")
                break
        if associate_id is None:
            # Every candidate was declined; previously this path raised.
            logger.warning(f"No E911 record selected.")
            return
        if not e911_associate_address(inbound_did, associate_id):
            logger.critical(f"Please contact support.")
        return

    if action == "RN":  # Purge and release number
        logger.critical(
            f"""
This action will remove the phone number from your account.
You might not beable to retrieve the number again.
This will also remove the number from your PBX:
- Incoming Call Routes
- Outgoing Caller ID Rules
This action is non-reversable.
"""
        )
        confirm = input(f"Do you wish to proceed? [y/N]: ") or "n"
        # Irreversible: only an explicit yes continues. Before, any answer
        # other than exactly "n" (even "no") released the number.
        if confirm[:1].lower() != "y":
            logger.info(f"aborted.")
            return
        inbound_did = input(f"Enter inbound DID to modify: ")
        remove_did(inbound_did)

    # Extension Management
    if action == "E":
        # Prompt for template to use
        # Prompt for Extension data
        pass

    # Secret menu options.
    if action == "L911":  # Lookup 911 Address by label
        logger.info(f"Lookup action not yet defined.")

    if action == "I911":  # Lookup address by 911_id
        e911_id = input(f"Enter the e911_id to lookup: ")
        get_e911_info(e911_id)
def prompt_credentials(config):
    """Prompt for API credentials and store them on ``config``.

    Asks for the Switchvox and provider usernames and passwords and,
    optionally, one tenant (stored upper-cased) with its own set of
    credentials. Setting a tenant replaces any existing ``config["tenant"]``
    mapping.

    :param config: configuration dict to fill in; modified in place.
    :return: the same ``config`` object.
    """
    # Passwords are read with getpass so they are not echoed to the terminal.
    # Imported here so the block does not depend on a file-level import.
    import getpass

    print(f"{bcolors.HEADER}DO NOT SHARE YOUR CREDENTIALS WITH ANYONE.{bcolors.ENDC}")
    config["username"] = input("Enter your api.switchvoxuc.com username: ")
    config["password"] = getpass.getpass("Enter your api.switchvoxuc.com password: ")
    config["provider_username"] = input("Enter your api.nwsip.com username: ")
    config["provider_password"] = getpass.getpass(
        "Enter your api.nwsip.com password: "
    )
    print(f"{bcolors.HEADER}Configure additoinal tenant credentials.{bcolors.ENDC}")
    tenant = input("Enter your tenant name or enter for None: ") or None
    if tenant is not None:
        tenant = tenant.upper()
        tenant_config = {}
        config["tenant"] = {tenant: tenant_config}
        tenant_config["username"] = input(
            f"Enter your {bcolors.OKBLUE}{tenant}{bcolors.ENDC} api.switchvoxuc.com username: "
        )
        tenant_config["password"] = getpass.getpass(
            f"Enter your {bcolors.OKBLUE}{tenant}{bcolors.ENDC} api.switchvoxuc.com password: "
        )
        tenant_config["provider_username"] = input(
            f"Enter your {bcolors.OKBLUE}{tenant}{bcolors.ENDC} api.nwsip.com username: "
        )
        tenant_config["provider_password"] = getpass.getpass(
            f"Enter your {bcolors.OKBLUE}{tenant}{bcolors.ENDC} api.nwsip.com password: "
        )
    return config
if __name__ == "__main__":
    # Script entry point: collect credentials for the first config entry,
    # require the disclaimer to be accepted, pick a tenant, then serve the
    # menu until the user quits. Ctrl-C ends the session with a log line.
    try:
        first_entry = prompt_credentials(config_data[0])
        config_data = [first_entry]
        show_disclaimer()
        answer = input(
            "Do you accept the disclaimer? You must enter 'ACCEPT' in all capitals: "
        )
        disclaimer_accepted = answer or disclaimer_accepted
        if disclaimer_accepted != "ACCEPT":
            logger.info(
                f"The user {config_data[0]['username']} for regcode {config_data[0]['regcode']} did not accept the use and terms your rejections has been logged."
            )
            sys.exit(0)
        set_tenant()
        while True:
            menu()
    except KeyboardInterrupt:
        logger.info(f"Good bye!")