星号+ AMI + Python

老实说,我考虑了很长时间才出版此材料。 对于那些知道如何使用AMI Asterisk的人来说,这里没有什么有趣的。 对于刚开始做某事的人,他们不太可能理解我的代码(尽管我试图写清楚)。 我喜欢这样的评论:“为什么要使用Habr做笔记?”。 另一方面,剪切下方显示的脚本可能是某人的起点。 该脚本除了将所有事件从AMI发送到控制台并可以对其进行过滤之外,什么也不做。 例如,我在控制台中显示属于“ zadarma-in”或“ sibseti_in”上下文中的所有调用。 如果有兴趣的话,请问下:

实时任务是查看呼叫来自哪个中继线,用户按下ivr中的哪个按钮,接听呼叫的人等等。 很长时间以来,我一直想尝试使用Python的AMI,在此之前,我对Bash有所了解,然后组织了回电。

在翻阅各种现成的库之后,我很快意识到没有一个库适合我。 结果,他以脚本的形式发明了自己的“自行车”,该脚本将所有信息从AMI发送到json。 使用标准的Python库。 优点在于,采用这种形式很容易接收和解析任何事件,并且不会丢失指向特定呼叫的链接。

第一个脚本仅打印属于“ zadarma-in”或“ sibseti_in”上下文中的任何事件。

脚本№1
import telnetlib import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' cd = 0 tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): global string_out string_out_def = '' for mes in string: try: if string[mes]['Context'] == 'zadarma-in' or string[mes]['Context'] == 'sibseti_in' or string[mes]['Context'] == 'IVR': Uniqueid = string[mes]['Uniqueid'] CallerIDNum = string[mes]['CallerIDNum'] Exten = string[mes]['Exten'] CallerIDName = string[mes]['CallerIDName'] try: Digit = string[mes]['Digit'] except KeyError: Digit = '' # if Exten == 's' or Exten == 'h': # Exten = '' # Context = string[mes]['Context'] string_out_def = json.dumps({'Uniqueid': Uniqueid, 'CallerIDNum':CallerIDNum, 'CallerIDName':CallerIDName, 'Exten':Exten, 'Digit':Digit}) # print(string_out_def) except UnboundLocalError: 1+1 except KeyError: 1+1 # print(string_out_def) if string_out_def: if string_out_def != string_out: print(string_out_def) string_out = string_out_def while True: string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() #    AMI string = read_some.decode('utf8', 'replace').replace('\r\n', '#') #        # # print(string) #       if not string.endswith('##'): string_NOW = string_NOW + string # print('1 --->',string_NOW) #   ,      #  ,          $, #       #, -      if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') #      $ string_NOW = string_NOW.replace('\n', '#') #    # string_NOW = string_NOW.replace('\r', '#') #    # string_NOW = string_NOW.replace('"', '') #   string_NOW = string_NOW.replace('\\', '') #    # print('string_NOW -->',string_NOW) # print() #      ..         events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 # print('event ---> ',event) event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) #       for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", '#    # print('element', element) elements_string = elements_string + element #   ,   # event_string = event_string + '\"' + elements_string.split(':')[1].split(',')[0].replace('"','') + '\": ' + '{' + elements_string + '}' # print(elements_string) # print(str(elements_string.split(':')[1].split(',')[0])) #      json: event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') #     event_string = event_string.replace(', }', '}, ') # event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') #     json,       json,      # json,         . try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') # print(event_string) # print(parsed_string['1']) #      "telnet_for_string",       . telnet_for_string(parsed_string) string_NOW = '' #   


第二个脚本将所有事件写入控制台,同时查看这两个脚本,很清楚需要更改哪些内容才能获得所需的结果。 如果不清楚,则需要在函数“ def telnet_for_string(string)”中解析json“ string [mes]”:

脚本编号2
 import telnetlib import time import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): for mes in string: print(string[mes]) while True: # time.sleep(0.1) string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() string = read_some.decode('utf8', 'replace').replace('\r\n', '#') # print(string) if not string.endswith('##'): string_NOW = string_NOW + string if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') string_NOW = string_NOW.replace('\n', '#') string_NOW = string_NOW.replace('\r', '#') string_NOW = string_NOW.replace('"', '') string_NOW = string_NOW.replace('\\', '') events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", ' elements_string = elements_string + element event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') event_string = event_string.replace(', }', '}, ') event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') telnet_for_string(parsed_string) string_NOW = '' 

Source: https://habr.com/ru/post/zh-CN415535/


All Articles