¿Alguna vez has analizado vacantes?
Hicieron la pregunta, ¿en qué tecnologías es la demanda del mercado laboral más actual? Hace un mes? Hace un año
¿Con qué frecuencia se abren nuevas ofertas de trabajo Java en un área específica de su ciudad y qué tan activamente se cierran?
En este artículo, le diré cómo puede lograr el resultado deseado y crear un sistema de informes sobre un tema que nos interese. Vamos!
(Fuente de la imagen)Probablemente muchos de ustedes estén familiarizados e incluso hayan usado un recurso como
Headhunter.ru . Miles de nuevas vacantes en varios campos se publican diariamente en este sitio. HeadHunter también tiene una API que permite al desarrollador interactuar con los datos de este recurso.
Kit de herramientas
Usando un ejemplo simple, consideramos la construcción del proceso de obtención de datos para el sistema de informes, que se basa en trabajar con el sitio API Headhunter.ru. Como almacenamiento intermedio de información, utilizaremos el DBMS incorporado de SQLite, los datos procesados se almacenarán en la base de datos NoSQL de MongoDB, Python 3.4 como el idioma principal.
API de HHLas capacidades de la API HeadHunter son bastante amplias y están bien descritas en la documentación oficial de
GitHib . En primer lugar, esta es la capacidad de enviar solicitudes anónimas que no requieren autorización para recibir información del trabajo en formato JSON. Recientemente, se han pagado varios métodos (métodos del empleador), pero no serán considerados en esta tarea.
Cada vacante permanece en el sitio durante 30 días, luego de lo cual, si no se renueva, se archivará. Si la vacante se archivó antes del vencimiento de 30 días, el empleador la cerró .
La API HeadHunter (en lo sucesivo, la API HH) le permite recibir una serie de vacantes publicadas para cualquier fecha en los últimos 30 días, que usaremos: recopilaremos las vacantes publicadas para cada día a diario .
Implementación
- Connect SQLite DB
import sqlite3 conn_db = sqlite3.connect('hr.db', timeout=10) c = conn_db.cursor()
- Tabla para almacenar cambios en el estado del trabajo
Por conveniencia, guardaremos el historial de cambios de estado de vacante (disponibilidad por fecha) en una tabla especial de la base de datos SQLite. Gracias a la tabla vacancy_history, estaremos al tanto de la disponibilidad de vacantes en el sitio en cualquier fecha de descarga, es decir. en qué fechas estuvo activa.
c.execute(''' create table if not exists vacancy_history ( id_vacancy integer, date_load text, date_from text, date_to text )''')
- Filtrado de vacantes
Existe una limitación en el hecho de que una solicitud no puede devolver más de 2000 colecciones, y dado que se pueden publicar muchas más vacantes en el sitio en un día, coloque un filtro en el cuerpo de la solicitud, por ejemplo: vacantes solo en San Petersburgo (área = 2) , por especialización de TI (especialización = 1)
path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}".format(page, per_page, date_from, date_to))
- Condiciones de selección adicionales
El mercado laboral está creciendo rápidamente e incluso teniendo en cuenta el filtro, el número de vacantes puede exceder 2000, por lo que estableceremos un límite adicional en forma de un lanzamiento separado para cada día: vacantes para la primera mitad del día y vacantes para la segunda mitad del día.
def get_vacancy_history(): ... count_days = 30 hours = 0 while count_days >= 0: while hours < 24: date_from = (cur_date.replace(hour=hours, minute=0, second=0) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') while count == per_page: path = ("/vacancies?area=2&specialization=1&page={} &per_page={}&date_from={}&date_to={}" .format(page, per_page, date_from, date_to)) conn.request("GET", path, headers=headers) response = conn.getresponse() vacancies = response.read() conn.close() count = len(json.loads(vacancies)['items']) ...
Primer caso de usoSupongamos que nos enfrentamos a la tarea de identificar las vacantes que se han cerrado durante un cierto intervalo de tiempo, por ejemplo, para julio de 2018. Esto se resuelve de la siguiente manera: el resultado de una simple consulta SQL a la tabla vacancy_history devolverá los datos que necesitamos, que se pueden pasar al DataFrame para su posterior análisis:
c.execute(""" select a.id_vacancy, date(a.date_load) as date_last_load, date(a.date_from) as date_publish, ifnull(a.date_next, date(a.date_load, '+1 day')) as date_close from ( select vh1.id_vacancy, vh1.date_load, vh1.date_from, min(vh2.date_load) as date_next from vacancy_history vh1 left join vacancy_history vh2 on vh1.id_vacancy = vh2.id_vacancy and vh1.date_load < vh2.date_load where date(vh1.date_load) between :date_in and :date_out group by vh1.id_vacancy, vh1.date_load, vh1.date_from ) as a where a.date_next is null """, {"date_in" : date_in, "date_out" : date_out}) date_in = dt.datetime(2018, 7, 1) date_out = dt.datetime(2018, 7, 31) closed_vacancies = get_closed_by_period(date_in, date_out) df = pd.DataFrame(closed_vacancies, columns = ['id_vacancy', 'date_last_load', 'date_publish', 'date_close']) df.head()
Obtenemos el resultado de este tipo:
Si queremos analizar usando herramientas de Excel o herramientas de BI de terceros, podemos subir la tabla vacancy_history a un archivo csv para su posterior análisis:
Artillería pesada
Pero, ¿qué pasa si necesitamos hacer un análisis de datos más complejo? Aquí la base de datos NoSQL orientada a documentos
MongoDB viene al rescate, lo que le permite almacenar datos en formato JSON.
Las acciones mencionadas anteriormente para la recolección de vacantes se lanzan diariamente, por lo que no es necesario ver todas las vacantes cada vez y recibir información detallada para cada una de ellas. Tomaremos solo los que fueron recibidos en los últimos cinco días.
- Obtener una variedad de vacantes para los últimos 5 días de una base de datos SQLite:
def get_list_of_vacancies_sql(): conn_db = sqlite3.connect('hr.db', timeout=10) conn_db.row_factory = lambda cursor, row: row[0] c = conn_db.cursor() items = c.execute(""" select distinct id_vacancy from vacancy_history where date(date_load) >= date('now', '-5 day') """).fetchall() conn_db.close() return items
- Obteniendo una variedad de trabajos para los últimos cinco días de MongoDB:
def get_list_of_vacancies_nosql(): date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d') vacancies_from_mongo = [] for item in VacancyMongo.find({"date_load" : {"$gte" : date_load}}, {"id" : 1, "_id" : 0}): vacancies_from_mongo.append(int(item['id'])) return vacancies_from_mongo
- Queda por encontrar la diferencia entre las dos matrices, para aquellas vacantes que no están en MongoDB, obtenga información detallada y escríbala en la base de datos:
sql_list = get_list_of_vacancies_sql() mongo_list = get_list_of_vacancies_nosql() vac_for_pro = [] s = set(mongo_list) vac_for_pro = [x for x in sql_list if x not in s] vac_id_chunks = [vac_for_pro[x: x + 500] for x in range(0, len(vac_for_pro), 500)]
- Entonces, tenemos una matriz con nuevas vacantes que aún no están disponibles en MongoDB, recibiremos información detallada para cada uno de ellos utilizando una solicitud en la API de HH, antes de procesarla directamente en MongoDB, procesaremos cada documento:
- Traemos la cantidad de salarios al equivalente en rublos;
- Agregar una graduación de un nivel de especialista a cada vacante (Junior / Middle / Senior, etc.)
Todo esto se implementa en la función vacancies_processing:
from nltk.stem.snowball import SnowballStemmer stemmer = SnowballStemmer("russian") def vacancies_processing(vacancies_list): cur_date = dt.datetime.now().strftime('%Y-%m-%d') for vacancy_id in vacancies_list: conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers) response = conn.getresponse() if response.status != 404: vacancy_txt = response.read() conn.close() vacancy = json.loads(vacancy_txt)
- Obtención de información detallada accediendo a la API de HH, preprocesamiento recibido
MongoDB llevará a cabo los datos y los insertará en varios flujos, con 500 vacantes en cada uno:
t_num = 1 threads = [] for vac_id_chunk in vac_id_chunks: print('starting', t_num) t_num = t_num + 1 t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk}) threads.append(t) t.start() for t in threads: t.join()
La colección poblada en MongoDB se parece a esto:

Algunos mas ejemplos
Con la base de datos recopilada a nuestra disposición, podemos realizar varias muestras analíticas. Entonces, sacaré a la luz las 10 vacantes mejor pagadas de desarrolladores de Python en San Petersburgo:
cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[pP]ython*"}}) df_mongo = pd.DataFrame(list(cursor_mongo)) del df_mongo['_id'] pd.concat([df_mongo.drop(['employer'], axis=1), df_mongo['employer'].apply(pd.Series)['name']], axis=1)[['grade', 'name', 'salary_processed' ]].sort_values('salary_processed', ascending=False)[:10]
Los 10 trabajos mejor pagados de Pythongrado | nombre | nombre | salario_procesado |
---|
senior | Jefe de Equipo Web / Arquitecto (Python / Django / React) | Investex ltd | 293901.0 |
senior | Desarrollador senior de Python en Montenegro | Betmaster | 277141.0 |
senior | Desarrollador senior de Python en Montenegro | Betmaster | 275289.0 |
medio | Desarrollador web back-end (Python) | Soshace | 250000.0 |
medio | Desarrollador web back-end (Python) | Soshace | 250000.0 |
senior | Ingeniero principal de Python para una startup suiza | Assaia International AG | 250000.0 |
medio | Desarrollador web back-end (Python) | Soshace | 250000.0 |
medio | Desarrollador web back-end (Python) | Soshace | 250000.0 |
senior | Python teamlead | Digitalhr | 230000.0 |
senior | Desarrollador principal (Python, PHP, Javascript) | GRUPO IK | 220231.0 |
Ahora veamos qué estación de metro tiene la mayor concentración de publicaciones vacantes para desarrolladores de Java. Utilizando una expresión regular, filtro por título de trabajo "Java", y también selecciono solo aquellos trabajos donde se especifica la dirección:
cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[jJ]ava[^sS]"}, "address" : {"$ne" : None}}) df_mongo = pd.DataFrame(list(cursor_mongo)) df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name'] if x['address']['metro'] is not None else None, axis = 1) df_mongo.groupby('metro')['_id'] \ .count() \ .reset_index(name='count') \ .sort_values(['count'], ascending=False) \ [:10]
Empleos para desarrolladores de Java en estaciones de metrometro | contar |
---|
Vasileostrovskaya | 87 |
Petrogradskaya | 68 |
Vyborg | 46 |
Plaza Lenin | 45 |
Gorkovskaya | 45 |
Chkalovskaya | 43 |
Narva | 32 |
Plaza del levantamiento | 29 |
Pueblo viejo | 29 |
Elizarovskaya | 27 |
Resumen
Por lo tanto, las capacidades analíticas del sistema desarrollado son realmente amplias y se pueden usar para planificar un inicio o abrir una nueva dirección de actividad.
Observo que hasta ahora solo se presenta la funcionalidad básica del sistema, en el futuro se planea desarrollar en la dirección del análisis por coordenadas geográficas y predecir la aparición de vacantes en un área particular de la ciudad.
El código fuente completo de este artículo se puede encontrar en el enlace a mi
GitHub .
PD: Los comentarios sobre el artículo son bienvenidos, estaré encantado de responder a todas sus preguntas y conocer su opinión. Gracias