MongoDB和IT就业市场研究

您是否分析过空缺?

他们问了一个问题,劳动力市场最新需求是哪种技术? 一个月前? 一年前?

新的Java职位空缺多长时间在您所在城市的特定区域开放一次,关闭的积极程度如何?

在本文中,我将告诉您如何获得期望的结果并针对我们感兴趣的主题构建报告系统。 走吧


(图片来源)

选择落在Headhunter.ru上


也许你们中的许多人都很熟悉,甚至使用了Headhunter.ru这样的资源。 在该站点上,每天都会发布数千个各个领域的新职位空缺。 HeadHunter还具有一个API,允许开发人员与该资源的数据进行交互。

工具包


通过一个简单的示例,我们考虑基于API网站Headhunter.ru的工作,为报告系统获取数据的过程的构建。 作为信息的中间存储,我们将使用嵌入式SQLite DBMS,处理后的数据将存储在MongoDB的NoSQL数据库(Python 3.4作为主要语言)中。

HH API
HeadHunter API的功能非常广泛,并且在GitHib的官方文档中对此进行了很好的描述。 首先,这是一种发送匿名请求的功能,该请求不需要授权即可接收JSON格式的工作信息。 最近,许多方法(雇主方法)已付清,但在本任务中将不予考虑。

每个空缺都将在网站上挂起30天,之后,如果不更新,将被存档。 如果空缺是在30天到期之前存档的,那么雇主将其关闭

HeadHunter API(以下称为HH API)使您可以接收过去30天内任何日期的一系列已发布职位空缺,我们将使用这些空缺-我们将每天收集每天发布的职位空缺。

实作


  • 连接SQLite数据库

    import sqlite3 conn_db = sqlite3.connect('hr.db', timeout=10) c = conn_db.cursor() 
  • 用于存储作业状态更改的表
    为了方便起见,我们将空缺状态更改的历史记录(按日期提供)保存在SQLite数据库的特殊表中。 多亏了vacancy_history我们可以知道在任何卸载日期(即 她活跃的日期。

     c.execute(''' create table if not exists vacancy_history ( id_vacancy integer, date_load text, date_from text, date_to text )''') 
  • 空缺过滤
    有一个限制,即一个请求不能返回超过2000个集合,并且由于一天之内网站上可以发布更多的空缺,我们将在请求正文中放置一个过滤器,例如:仅在圣彼得堡的空缺(区域= 2) ,通过IT专业化(专业化= 1)

     path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}".format(page, per_page, date_from, date_to)) 
  • 附加选择条件
    劳动力市场正在快速增长,甚至考虑到筛选条件,职位空缺的数量可能会超过2000,因此我们将以每天单独发布的形式设置一个额外的限制:每天上半年的空缺和一天后半部分的空缺

     def get_vacancy_history(): ... count_days = 30 hours = 0 while count_days >= 0: while hours < 24: date_from = (cur_date.replace(hour=hours, minute=0, second=0) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) - td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S') while count == per_page: path = ("/vacancies?area=2&specialization=1&page={} &per_page={}&date_from={}&date_to={}" .format(page, per_page, date_from, date_to)) conn.request("GET", path, headers=headers) response = conn.getresponse() vacancies = response.read() conn.close() count = len(json.loads(vacancies)['items']) ... #     try: c.executemany('INSERT INTO vacancy_history VALUES (?,?,?,?)', collection_for_ins) except sqlite3.DatabaseError as err: print("Error: ", err) else: conn_db.commit() if collection_for_ins: page = page + 1 total = total + count #   del(collection_for_ins[:]) hours = hours + 12 count_days = count_days - 1 hours = 0 


第一个用例
假设我们面临的任务是确定在特定时间间隔(例如2018年7月)内已关闭的空缺。 可以通过以下方法解决:对vacancy_history表进行简单SQL查询的结果将返回我们需要的数据,可以将其传递到DataFrame进行进一步分析:

  c.execute(""" select a.id_vacancy, date(a.date_load) as date_last_load, date(a.date_from) as date_publish, ifnull(a.date_next, date(a.date_load, '+1 day')) as date_close from ( select vh1.id_vacancy, vh1.date_load, vh1.date_from, min(vh2.date_load) as date_next from vacancy_history vh1 left join vacancy_history vh2 on vh1.id_vacancy = vh2.id_vacancy and vh1.date_load < vh2.date_load where date(vh1.date_load) between :date_in and :date_out group by vh1.id_vacancy, vh1.date_load, vh1.date_from ) as a where a.date_next is null """, {"date_in" : date_in, "date_out" : date_out}) date_in = dt.datetime(2018, 7, 1) date_out = dt.datetime(2018, 7, 31) closed_vacancies = get_closed_by_period(date_in, date_out) df = pd.DataFrame(closed_vacancies, columns = ['id_vacancy', 'date_last_load', 'date_publish', 'date_close']) df.head() 

我们得到这种类型的结果:
id_vacancydate_last_loaddate_publishdate_close
0181266972018-07-092018-07-092018-07-10
1个181551212018-07-092018-06-192018-07-10
2188816052018-07-092018-07-022018-07-10
3196207832018-07-092018-06-272018-07-10
4196961882018-07-092018-06-152018-07-10
如果要使用Excel工具或第三方BI工具进行分析,可以将vacancy_history表上传到csv文件中以进行进一步分析:

 #       CSV data = c.execute('select * from vacancy_history') with open('vacancy_history.csv','w', newline='') as out_csv_file: csv_out = csv.writer(out_csv_file) csv_out.writerow(d[0] for d in data.description) csv_out.writerows(data.fetchall()) conn_db.close() 

重型火炮


但是,如果我们需要进行更复杂的数据分析怎么办? 在这里, MongoDB面向文档的NoSQL数据库得以解决,它允许您以JSON格式存储数据。

  • 我的MongoDB数据库演示已部署在mLab云服务中,该演示使您可以免费创建最大500MB的数据库,足以解析当前任务。 hr_db数据库具有一个Vacancy集合,我们将与之建立连接:

     #    Mongo from pymongo import MongoClient from pymongo import ASCENDING from pymongo import errors client = MongoClient('mongodb://<db_user>:<dbpassword>@ds115219.mlab.com:15219/hr_db') db = client.hr_db VacancyMongo = db.Vacancy 
  • 值得注意的是,薪水水平并不总是以卢布表示,因此,为了进行分析,有必要将所有值都设为卢布等值。 为此,我们使用HH API抽出词典集合,其中包含有关当前日期汇率的信息:

     #   def get_dictionaries(): conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers) response = conn.getresponse() if response.status != 200: conn.close() conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers) response = conn.getresponse() dictionaries = response.read() dictionaries_json = json.loads(dictionaries) return dictionaries_json 
  • 用当前汇率的货币填充字典:

     hh_dictionary = get_dictionaries() currencies = hh_dictionary['currency'] currency_rates = {} for currency in currencies: currency_rates[currency['code']] = currency['rate'] 

上述收集空缺的操作每天都会启动,因此无需每次都查看所有空缺并接收每个空缺的详细信息。 我们将只接受最近五天内收到的邮件。
  • 从SQLite数据库获取最近5天的职位空缺:

     def get_list_of_vacancies_sql(): conn_db = sqlite3.connect('hr.db', timeout=10) conn_db.row_factory = lambda cursor, row: row[0] c = conn_db.cursor() items = c.execute(""" select distinct id_vacancy from vacancy_history where date(date_load) >= date('now', '-5 day') """).fetchall() conn_db.close() return items 
  • 从MongoDB获得最近五天的一系列工作:

     def get_list_of_vacancies_nosql(): date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d') vacancies_from_mongo = [] for item in VacancyMongo.find({"date_load" : {"$gte" : date_load}}, {"id" : 1, "_id" : 0}): vacancies_from_mongo.append(int(item['id'])) return vacancies_from_mongo 
  • 对于两个不在MongoDB中的空缺,要找到两个阵列之间的区别还有待获得详细信息并将其写入数据库:

     sql_list = get_list_of_vacancies_sql() mongo_list = get_list_of_vacancies_nosql() vac_for_pro = [] s = set(mongo_list) vac_for_pro = [x for x in sql_list if x not in s] vac_id_chunks = [vac_for_pro[x: x + 500] for x in range(0, len(vac_for_pro), 500)] 
  • 因此,我们有一个数组,其中包含在MongoDB中尚不可用的新空缺,对于每个空缺,我们将使用HH API中的请求接收详细信息,然后将其直接处理到MongoDB中,我们将处理每个文档:
    1. 我们把工资额提高到卢布的等值;
    2. 为每个职位空缺增加一个专业级别的毕业(初中/中/高级等)

    所有这些都在vacancies_processing函数中实现:

     from nltk.stem.snowball import SnowballStemmer stemmer = SnowballStemmer("russian") def vacancies_processing(vacancies_list): cur_date = dt.datetime.now().strftime('%Y-%m-%d') for vacancy_id in vacancies_list: conn = http.client.HTTPSConnection("api.hh.ru") conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers) response = conn.getresponse() if response.status != 404: vacancy_txt = response.read() conn.close() vacancy = json.loads(vacancy_txt) # salary salary = None if 'salary' in vacancy: if vacancy['salary'] != None: ... max_salary = 500000 if salary is not None: salary = int(salary) if salary >= max_salary: salary = max_salary # grade grade = None if 'name' in vacancy: p_grade = '' title = re.sub(u'[^a-z-]+', ' ', vacancy['name'].lower(), re.UNICODE) words = re.split(r'\s{1,}', title.strip()) for title_word in words: title_word = stemmer.stem(title_word) if len(title_word.strip()) > 1: p_grade = p_grade + " " + title_word.strip() if re.search('()|(princip)', p_grade): grade = 'principal' elif re.search('()|(senior)|([f|F]ull)', p_grade): grade = 'senior' ... else: grade = 'not specify' vacancy['salary_processed'] = salary vacancy['date_load'] = cur_date vacancy['grade'] = grade vacancy.pop('branded_description', None) try: post_id = VacancyMongo.insert_one(vacancy) except errors.DuplicateKeyError: print ('Cant insert the duplicate vacancy_id:', vacancy['id']) 
  • 通过访问HH API获取详细信息,已进行预处理
    MongoDB将执行数据并将其插入到多个流中,每个流中都有500个空缺:

     t_num = 1 threads = [] for vac_id_chunk in vac_id_chunks: print('starting', t_num) t_num = t_num + 1 t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk}) threads.append(t) t.start() for t in threads: t.join() 


MongoDB中填充的集合如下所示:



更多例子


拥有收集的数据库供我们使用时,我们可以执行各种分析样本。 因此,我将介绍圣彼得堡Python开发人员收入最高的10个职位:

 cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[pP]ython*"}}) df_mongo = pd.DataFrame(list(cursor_mongo)) del df_mongo['_id'] pd.concat([df_mongo.drop(['employer'], axis=1), df_mongo['employer'].apply(pd.Series)['name']], axis=1)[['grade', 'name', 'salary_processed' ]].sort_values('salary_processed', ascending=False)[:10] 

Python收入最高的10大工作
等级已处理薪水
资深的网络团队负责人/架构师(Python / Django / React)景顺有限公司293901.0
资深的黑山共和国的高级Python开发人员赌王277141.0
资深的黑山共和国的高级Python开发人员赌王275289.0
中间的后端Web开发人员(Python)Soshace250000.0
中间的后端Web开发人员(Python)Soshace250000.0
资深的瑞士初创公司首席Python工程师亚述国际公司250000.0
中间的后端Web开发人员(Python)Soshace250000.0
中间的后端Web开发人员(Python)Soshace250000.0
资深的Python团队负责人Digitalhr230000.0
资深的首席开发人员(Python,PHP,Javascript)IK集团220231.0



现在,让我们找出哪个地铁站的Java开发人员空缺职位集中度最高。 使用正则表达式,按职位名称“ Java”过滤,还仅选择指定了地址的那些职位:

 cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[jJ]ava[^sS]"}, "address" : {"$ne" : None}}) df_mongo = pd.DataFrame(list(cursor_mongo)) df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name'] if x['address']['metro'] is not None else None, axis = 1) df_mongo.groupby('metro')['_id'] \ .count() \ .reset_index(name='count') \ .sort_values(['count'], ascending=False) \ [:10] 

地铁站中Java开发人员的工作
地铁
Vasileostrovskaya87
彼得格勒68
维堡46
列宁广场45
戈尔科夫斯卡娅45
Chkalovskaya43
纳尔瓦32
起义广场29日
老村落29日
Elizarovskaya27


总结


因此,已开发系统的分析功能确实很广泛,可用于计划启动或打开新的活动方向。

我注意到,到目前为止,仅介绍了该系统的基本功能,将来计划在地理坐标分析和预测城市特定区域的空缺出现的方向上进行开发。

可以在指向我的GitHub的链接上找到本文的完整源代码。

言:欢迎对本文发表评论,我将很高兴回答您的所有问题并提出您的意见。 谢谢你

Source: https://habr.com/ru/post/zh-CN422627/


All Articles