MongoDB y la investigación de mercado de trabajo de TI

¿Alguna vez has analizado vacantes?

Hicieron la pregunta, ¿en qué tecnologías es la demanda del mercado laboral más actual? Hace un mes? Hace un año

¿Con qué frecuencia se abren nuevas ofertas de trabajo Java en un área específica de su ciudad y qué tan activamente se cierran?

En este artículo, le diré cómo puede lograr el resultado deseado y crear un sistema de informes sobre un tema que nos interese. Vamos!


(Fuente de la imagen)

La elección recayó en Headhunter.ru


Probablemente muchos de ustedes estén familiarizados e incluso hayan usado un recurso como Headhunter.ru . Miles de nuevas vacantes en varios campos se publican diariamente en este sitio. HeadHunter también tiene una API que permite al desarrollador interactuar con los datos de este recurso.

Kit de herramientas


Usando un ejemplo simple, consideramos la construcción del proceso de obtención de datos para el sistema de informes, que se basa en trabajar con el sitio API Headhunter.ru. Como almacenamiento intermedio de información, utilizaremos el DBMS incorporado de SQLite, los datos procesados ​​se almacenarán en la base de datos NoSQL de MongoDB, Python 3.4 como el idioma principal.

API de HH
Las capacidades de la API HeadHunter son bastante amplias y están bien descritas en la documentación oficial de GitHib . En primer lugar, esta es la capacidad de enviar solicitudes anónimas que no requieren autorización para recibir información del trabajo en formato JSON. Recientemente, se han pagado varios métodos (métodos del empleador), pero no serán considerados en esta tarea.

Cada vacante permanece en el sitio durante 30 días, luego de lo cual, si no se renueva, se archivará. Si la vacante se archivó antes del vencimiento de 30 días, el empleador la cerró .

La API HeadHunter (en lo sucesivo, la API HH) le permite recibir una serie de vacantes publicadas para cualquier fecha en los últimos 30 días, que usaremos: recopilaremos las vacantes publicadas para cada día a diario .

Implementación


  • Connect SQLite DB

    import sqlite3 conn_db = sqlite3.connect('hr.db', timeout=10) c = conn_db.cursor() 
  • Tabla para almacenar cambios en el estado del trabajo
    Por conveniencia, guardaremos el historial de cambios de estado de vacante (disponibilidad por fecha) en una tabla especial de la base de datos SQLite. Gracias a la tabla vacancy_history, estaremos al tanto de la disponibilidad de vacantes en el sitio en cualquier fecha de descarga, es decir. en qué fechas estuvo activa.

     c.execute(''' create table if not exists vacancy_history ( id_vacancy integer, date_load text, date_from text, date_to text )''') 
  • Filtrado de vacantes
    Existe una limitación en el hecho de que una solicitud no puede devolver más de 2000 colecciones, y dado que se pueden publicar muchas más vacantes en el sitio en un día, coloque un filtro en el cuerpo de la solicitud, por ejemplo: vacantes solo en San Petersburgo (área = 2) , por especialización de TI (especialización = 1)

     path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}".format(page, per_page, date_from, date_to)) 
  • Condiciones de selección adicionales
    El mercado laboral está creciendo rápidamente e incluso teniendo en cuenta el filtro, el número de vacantes puede exceder 2000, por lo que estableceremos un límite adicional en forma de un lanzamiento separado para cada día: vacantes para la primera mitad del día y vacantes para la segunda mitad del día.

     def get_vacancy_history(): ... count_days = 30 hours = 0 while count_days >= 0: while hours < 24: date_from = (cur_date.replace(hour=hours, minute=0, second=0) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') while count == per_page: path = ("/vacancies?area=2&specialization=1&page={} &per_page={}&date_from={}&date_to={}" .format(page, per_page, date_from, date_to)) conn.request("GET", path, headers=headers) response = conn.getresponse() vacancies = response.read() conn.close() count = len(json.loads(vacancies)['items']) ... #     try: c.executemany('INSERT INTO vacancy_history VALUES (?,?,?,?)', collection_for_ins) except sqlite3.DatabaseError as err: print("Error: ", err) else: conn_db.commit() if collection_for_ins: page = page + 1 total = total + count #   del(collection_for_ins[:]) hours = hours + 12 count_days = count_days - 1 hours = 0 


Primer caso de uso
Supongamos que nos enfrentamos a la tarea de identificar las vacantes que se han cerrado durante un cierto intervalo de tiempo, por ejemplo, para julio de 2018. Esto se resuelve de la siguiente manera: el resultado de una simple consulta SQL a la tabla vacancy_history devolverá los datos que necesitamos, que se pueden pasar al DataFrame para su posterior análisis:

  c.execute(""" select a.id_vacancy, date(a.date_load) as date_last_load, date(a.date_from) as date_publish, ifnull(a.date_next, date(a.date_load, '+1 day')) as date_close from ( select vh1.id_vacancy, vh1.date_load, vh1.date_from, min(vh2.date_load) as date_next from vacancy_history vh1 left join vacancy_history vh2 on vh1.id_vacancy = vh2.id_vacancy and vh1.date_load < vh2.date_load where date(vh1.date_load) between :date_in and :date_out group by vh1.id_vacancy, vh1.date_load, vh1.date_from ) as a where a.date_next is null """, {"date_in" : date_in, "date_out" : date_out}) date_in = dt.datetime(2018, 7, 1) date_out = dt.datetime(2018, 7, 31) closed_vacancies = get_closed_by_period(date_in, date_out) df = pd.DataFrame(closed_vacancies, columns = ['id_vacancy', 'date_last_load', 'date_publish', 'date_close']) df.head() 

Obtenemos el resultado de este tipo:
id_vacancydate_last_loadfecha_publicaciónfecha_close
0 0181266972018-07-092018-07-092018-07-10
1181551212018-07-092018-06-192018-07-10
2188816052018-07-092018-07-022018-07-10
3196207832018-07-092018-06-272018-07-10
4 4196961882018-07-092018-06-152018-07-10
Si queremos analizar usando herramientas de Excel o herramientas de BI de terceros, podemos subir la tabla vacancy_history a un archivo csv para su posterior análisis:

 #       CSV data = c.execute('select * from vacancy_history') with open('vacancy_history.csv','w', newline='') as out_csv_file: csv_out = csv.writer(out_csv_file) csv_out.writerow(d[0] for d in data.description) csv_out.writerows(data.fetchall()) conn_db.close() 

Artillería pesada


Pero, ¿qué pasa si necesitamos hacer un análisis de datos más complejo? Aquí la base de datos NoSQL orientada a documentos MongoDB viene al rescate, lo que le permite almacenar datos en formato JSON.

  • Se implementa una demostración de mi base de datos MongoDB en el servicio en la nube mLab , que le permite crear una base de datos de hasta 500 MB de forma gratuita, que es suficiente para analizar la tarea actual. La base de datos hr_db tiene una colección de vacantes, a la que estableceremos una conexión:

     #    Mongo from pymongo import MongoClient from pymongo import ASCENDING from pymongo import errors client = MongoClient('mongodb://<db_user>:<dbpassword>@ds115219.mlab.com:15219/hr_db') db = client.hr_db VacancyMongo = db.Vacancy 
  • Vale la pena señalar que el nivel salarial no siempre se indica en rublos, por lo tanto, para el análisis es necesario llevar todos los valores al rublo equivalente. Para hacer esto, bombeamos una colección de diccionarios utilizando la API HH, que contiene información sobre el tipo de cambio para la fecha actual:

     #   def get_dictionaries(): conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers) response = conn.getresponse() if response.status != 200: conn.close() conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers) response = conn.getresponse() dictionaries = response.read() dictionaries_json = json.loads(dictionaries) return dictionaries_json 
  • Rellenar el diccionario con monedas con los tipos de cambio actuales:

     hh_dictionary = get_dictionaries() currencies = hh_dictionary['currency'] currency_rates = {} for currency in currencies: currency_rates[currency['code']] = currency['rate'] 

Las acciones mencionadas anteriormente para la recolección de vacantes se lanzan diariamente, por lo que no es necesario ver todas las vacantes cada vez y recibir información detallada para cada una de ellas. Tomaremos solo los que fueron recibidos en los últimos cinco días.
  • Obtener una variedad de vacantes para los últimos 5 días de una base de datos SQLite:

     def get_list_of_vacancies_sql(): conn_db = sqlite3.connect('hr.db', timeout=10) conn_db.row_factory = lambda cursor, row: row[0] c = conn_db.cursor() items = c.execute(""" select distinct id_vacancy from vacancy_history where date(date_load) >= date('now', '-5 day') """).fetchall() conn_db.close() return items 
  • Obteniendo una variedad de trabajos para los últimos cinco días de MongoDB:

     def get_list_of_vacancies_nosql(): date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d') vacancies_from_mongo = [] for item in VacancyMongo.find({"date_load" : {"$gte" : date_load}}, {"id" : 1, "_id" : 0}): vacancies_from_mongo.append(int(item['id'])) return vacancies_from_mongo 
  • Queda por encontrar la diferencia entre las dos matrices, para aquellas vacantes que no están en MongoDB, obtenga información detallada y escríbala en la base de datos:

     sql_list = get_list_of_vacancies_sql() mongo_list = get_list_of_vacancies_nosql() vac_for_pro = [] s = set(mongo_list) vac_for_pro = [x for x in sql_list if x not in s] vac_id_chunks = [vac_for_pro[x: x + 500] for x in range(0, len(vac_for_pro), 500)] 
  • Entonces, tenemos una matriz con nuevas vacantes que aún no están disponibles en MongoDB, recibiremos información detallada para cada uno de ellos utilizando una solicitud en la API de HH, antes de procesarla directamente en MongoDB, procesaremos cada documento:
    1. Traemos la cantidad de salarios al equivalente en rublos;
    2. Agregar una graduación de un nivel de especialista a cada vacante (Junior / Middle / Senior, etc.)

    Todo esto se implementa en la función vacancies_processing:

     from nltk.stem.snowball import SnowballStemmer stemmer = SnowballStemmer("russian") def vacancies_processing(vacancies_list): cur_date = dt.datetime.now().strftime('%Y-%m-%d') for vacancy_id in vacancies_list: conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers) response = conn.getresponse() if response.status != 404: vacancy_txt = response.read() conn.close() vacancy = json.loads(vacancy_txt) # salary salary = None if 'salary' in vacancy: if vacancy['salary'] != None: ... max_salary = 500000 if salary is not None: salary = int(salary) if salary >= max_salary: salary = max_salary # grade grade = None if 'name' in vacancy: p_grade = '' title = re.sub(u'[^a-z-]+', ' ', vacancy['name'].lower(), re.UNICODE) words = re.split(r'\s{1,}', title.strip()) for title_word in words: title_word = stemmer.stem(title_word) if len(title_word.strip()) > 1: p_grade = p_grade + " " + title_word.strip() if re.search('()|(princip)', p_grade): grade = 'principal' elif re.search('()|(senior)|([f|F]ull)', p_grade): grade = 'senior' ... else: grade = 'not specify' vacancy['salary_processed'] = salary vacancy['date_load'] = cur_date vacancy['grade'] = grade vacancy.pop('branded_description', None) try: post_id = VacancyMongo.insert_one(vacancy) except errors.DuplicateKeyError: print ('Cant insert the duplicate vacancy_id:', vacancy['id']) 
  • Obtención de información detallada accediendo a la API de HH, preprocesamiento recibido
    MongoDB llevará a cabo los datos y los insertará en varios flujos, con 500 vacantes en cada uno:

     t_num = 1 threads = [] for vac_id_chunk in vac_id_chunks: print('starting', t_num) t_num = t_num + 1 t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk}) threads.append(t) t.start() for t in threads: t.join() 


La colección poblada en MongoDB se parece a esto:



Algunos mas ejemplos


Con la base de datos recopilada a nuestra disposición, podemos realizar varias muestras analíticas. Entonces, sacaré a la luz las 10 vacantes mejor pagadas de desarrolladores de Python en San Petersburgo:

 cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[pP]ython*"}}) df_mongo = pd.DataFrame(list(cursor_mongo)) del df_mongo['_id'] pd.concat([df_mongo.drop(['employer'], axis=1), df_mongo['employer'].apply(pd.Series)['name']], axis=1)[['grade', 'name', 'salary_processed' ]].sort_values('salary_processed', ascending=False)[:10] 

Los 10 trabajos mejor pagados de Python
gradonombrenombresalario_procesado
seniorJefe de Equipo Web / Arquitecto (Python / Django / React)Investex ltd293901.0
seniorDesarrollador senior de Python en MontenegroBetmaster277141.0
seniorDesarrollador senior de Python en MontenegroBetmaster275289.0
medioDesarrollador web back-end (Python)Soshace250000.0
medioDesarrollador web back-end (Python)Soshace250000.0
seniorIngeniero principal de Python para una startup suizaAssaia International AG250000.0
medioDesarrollador web back-end (Python)Soshace250000.0
medioDesarrollador web back-end (Python)Soshace250000.0
seniorPython teamleadDigitalhr230000.0
seniorDesarrollador principal (Python, PHP, Javascript)GRUPO IK220231.0



Ahora veamos qué estación de metro tiene la mayor concentración de publicaciones vacantes para desarrolladores de Java. Utilizando una expresión regular, filtro por título de trabajo "Java", y también selecciono solo aquellos trabajos donde se especifica la dirección:

 cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[jJ]ava[^sS]"}, "address" : {"$ne" : None}}) df_mongo = pd.DataFrame(list(cursor_mongo)) df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name'] if x['address']['metro'] is not None else None, axis = 1) df_mongo.groupby('metro')['_id'] \ .count() \ .reset_index(name='count') \ .sort_values(['count'], ascending=False) \ [:10] 

Empleos para desarrolladores de Java en estaciones de metro
metrocontar
Vasileostrovskaya87
Petrogradskaya68
Vyborg46
Plaza Lenin45
Gorkovskaya45
Chkalovskaya43
Narva32
Plaza del levantamiento29
Pueblo viejo29
Elizarovskaya27


Resumen


Por lo tanto, las capacidades analíticas del sistema desarrollado son realmente amplias y se pueden usar para planificar un inicio o abrir una nueva dirección de actividad.

Observo que hasta ahora solo se presenta la funcionalidad básica del sistema, en el futuro se planea desarrollar en la dirección del análisis por coordenadas geográficas y predecir la aparición de vacantes en un área particular de la ciudad.

El código fuente completo de este artículo se puede encontrar en el enlace a mi GitHub .

PD: Los comentarios sobre el artículo son bienvenidos, estaré encantado de responder a todas sus preguntas y conocer su opinión. Gracias

Source: https://habr.com/ru/post/es422627/


All Articles