Asterisco + AMI + Python

Honestamente, pensé durante mucho tiempo si publicar este material. Para aquellos que saben cómo trabajar con AMI Asterisk, aquí no hay nada interesante. Para aquellos que recién comienzan a hacer algo, es poco probable que comprendan mi código (aunque intenté escribir con claridad). Wang comenta como: "¿Por qué usar Habr para tus notas?". Por otro lado, el guión que se muestra debajo del corte puede ser el punto de partida de alguien. El script no hace nada excepto que envía todos los eventos desde el AMI a la consola y puede filtrarlos. Por ejemplo, muestro en la consola todas las llamadas que entran en cualquiera de los contextos "zadarma-in" o "sibseti_in". Si está interesado, pregunto debajo del corte:

Hubo una tarea en tiempo real para ver de qué troncal provenía la llamada, qué botones en ivr presionó el usuario, quién respondió la llamada, etc. Durante mucho tiempo quise intentar trabajar con AMI desde Python, antes de eso tuve una pequeña experiencia de Bash y luego organizar una llamada de regreso.

Después de hurgar con varias bibliotecas listas para usar, rápidamente me di cuenta de que ninguna de ellas me conviene. Como resultado, su propia "bicicleta" fue inventada en forma de un guión que envía toda la información de AMI a json. Se utilizan bibliotecas estándar de Python. La ventaja es que de esta forma es fácil recibir y analizar cualquier evento y no perder el enlace a una llamada específica.

El primer script imprime solo aquellos eventos que caen en cualquiera de los contextos "zadarma-in" o "sibseti_in".

Script№1
import telnetlib import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' cd = 0 tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): global string_out string_out_def = '' for mes in string: try: if string[mes]['Context'] == 'zadarma-in' or string[mes]['Context'] == 'sibseti_in' or string[mes]['Context'] == 'IVR': Uniqueid = string[mes]['Uniqueid'] CallerIDNum = string[mes]['CallerIDNum'] Exten = string[mes]['Exten'] CallerIDName = string[mes]['CallerIDName'] try: Digit = string[mes]['Digit'] except KeyError: Digit = '' # if Exten == 's' or Exten == 'h': # Exten = '' # Context = string[mes]['Context'] string_out_def = json.dumps({'Uniqueid': Uniqueid, 'CallerIDNum':CallerIDNum, 'CallerIDName':CallerIDName, 'Exten':Exten, 'Digit':Digit}) # print(string_out_def) except UnboundLocalError: 1+1 except KeyError: 1+1 # print(string_out_def) if string_out_def: if string_out_def != string_out: print(string_out_def) string_out = string_out_def while True: string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() #    AMI string = read_some.decode('utf8', 'replace').replace('\r\n', '#') #        # # print(string) #       if not string.endswith('##'): string_NOW = string_NOW + string # print('1 --->',string_NOW) #   ,      #  ,          $, #       #, -      if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') #      $ string_NOW = string_NOW.replace('\n', '#') #    # string_NOW = string_NOW.replace('\r', '#') #    # string_NOW = string_NOW.replace('"', '') #   string_NOW = string_NOW.replace('\\', '') #    # print('string_NOW -->',string_NOW) # print() #      ..         events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 # print('event ---> ',event) event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) #       for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", '#    # print('element', element) elements_string = elements_string + element #   ,   # event_string = event_string + '\"' + elements_string.split(':')[1].split(',')[0].replace('"','') + '\": ' + '{' + elements_string + '}' # print(elements_string) # print(str(elements_string.split(':')[1].split(',')[0])) #      json: event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') #     event_string = event_string.replace(', }', '}, ') # event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') #     json,       json,      # json,         . try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') # print(event_string) # print(parsed_string['1']) #      "telnet_for_string",       . telnet_for_string(parsed_string) string_NOW = '' #   


Y la segunda secuencia de comandos, que escribe todos los eventos en la consola, observando ambas secuencias de comandos, queda claro qué debe cambiarse para lograr el resultado deseado. Si no está completamente claro, debe analizar json "string [mes]" en la función "def telnet_for_string (string)":

Guión número 2
 import telnetlib import time import json import re ## HOST = "192.168.10.10" PORT = "5038" user = "zabbix" password = "password" ## tn = telnetlib.Telnet(HOST,PORT) tn.write("Action: login".encode('ascii') + b"\n") username = "Username: " + user tn.write(username.encode('ascii') + b"\n") passWord = "Secret: " + password string_NOW = '' string_out = '' tn.write(passWord.encode('ascii') + b"\n\n") def telnet_for_string(string): for mes in string: print(string[mes]) while True: # time.sleep(0.1) string = '' event_string = '' elements_string = '' c = 0 read_some = tn.read_some() string = read_some.decode('utf8', 'replace').replace('\r\n', '#') # print(string) if not string.endswith('##'): string_NOW = string_NOW + string if string.endswith('##'): string_NOW = string_NOW + string string_NOW = string_NOW.replace('##', '$') string_NOW = string_NOW.replace('\n', '#') string_NOW = string_NOW.replace('\r', '#') string_NOW = string_NOW.replace('"', '') string_NOW = string_NOW.replace('\\', '') events = re.findall(r'[AZ][\w]+:\s[^$]+', string_NOW) for event in events: c+=1 event_elements = re.findall(r'[AZ][\w]+:\s[^#]+', event) for element in event_elements: element = '\"' + element.replace(': ', '\": "') + '\", ' elements_string = elements_string + element event_string = event_string + '\"' + str(c) + '\": ' + '{' + elements_string + '}' event_string = event_string.replace('}{', '},{') event_string = event_string.replace(', }', '}, ') event_string = '{' + event_string + '}' event_string = event_string.replace('}, }', '}}') try: parsed_string = json.loads(event_string) except json.decoder.JSONDecodeError: print('#############################################', '\n\n\n') print(event_string, '\n\n\n') print(string_NOW, '\n\n\n') print('#############################################', '\n\n\n') telnet_for_string(parsed_string) string_NOW = '' 

Source: https://habr.com/ru/post/es415535/


All Articles