CompanyNameMatch
公司、企业名称模糊匹配,基于词频的公司名主体提取,基于编辑距离的匹配度
公司名称匹配
算法概述:基于jieba分词、数百万家企业名称分词后词频统计,提取公司名称中的各类型信息,并依据业务需求计算匹配分值
流程图示意:
-
分析
a.公司名称主体必须完全一致;
b.地区信息可以有部分省略或缺失,但不可以错误;
c.有分公司(分支机构)信息的,必须全量匹配(完全一致);
d.其余信息可接受微弱差异
-
算法释义
a.基于编辑距离的文本相似度
b.基于jieba分词的文本切分及词性分析
c.单字聚合算法
d.基于词频统计的主体、附加信息提取;根据历史数百万公司名称统计出的词频:下载
e.通过主体、附加、地区、分支信息综合计算匹配度
分析
a.公司名称主体必须完全一致;
b.地区信息可以有部分省略或缺失,但不可以错误;
c.有分公司(分支机构)信息的,必须全量匹配(完全一致);
d.其余信息可接受微弱差异
算法释义
a.基于编辑距离的文本相似度
class StringMatcher:
    """Lazily computed, cached edit-distance comparisons between two strings.

    The methods delegate to the module-level functions ``opcodes``,
    ``editops``, ``matching_blocks`` and ``ratio``, which must already be in
    scope (presumably imported from the ``Levenshtein`` package -- confirm
    against the file's imports).
    """

    def __init__(self, seq1='', seq2=''):
        """Store the two strings to compare and start with an empty cache."""
        self._str1, self._str2 = seq1, seq2
        self._reset_cache()

    def _reset_cache(self):
        """Forget every cached result; called whenever a string changes."""
        self._ratio = self._distance = None
        self._opcodes = self._editops = self._matching_blocks = None

    def set_seqs(self, seq1, seq2):
        """Replace both strings and invalidate the cache."""
        self._str1, self._str2 = seq1, seq2
        self._reset_cache()

    def set_seq1(self, seq1):
        """Replace the first string and invalidate the cache."""
        self._str1 = seq1
        self._reset_cache()

    def set_seq2(self, seq2):
        """Replace the second string and invalidate the cache."""
        self._str2 = seq2
        self._reset_cache()

    def get_opcodes(self):
        """Return the (cached) opcodes that turn the first string into the second."""
        # ``is None`` rather than truthiness: an empty result is a valid
        # cached value and must not trigger a recomputation.
        if self._opcodes is None:
            if self._editops:
                # Cheaper to convert already-known edit operations.
                self._opcodes = opcodes(self._editops, self._str1, self._str2)
            else:
                self._opcodes = opcodes(self._str1, self._str2)
        return self._opcodes

    def get_editops(self):
        """Return the (cached) edit operations between the two strings."""
        if self._editops is None:
            if self._opcodes:
                # Cheaper to convert already-known opcodes.
                self._editops = editops(self._opcodes, self._str1, self._str2)
            else:
                self._editops = editops(self._str1, self._str2)
        return self._editops

    def get_matching_blocks(self):
        """Return the (cached) matching blocks derived from the opcodes."""
        if self._matching_blocks is None:
            self._matching_blocks = matching_blocks(self.get_opcodes(),
                                                    self._str1, self._str2)
        return self._matching_blocks

    def ratio(self):
        """Return the (cached) similarity ratio of the two strings."""
        # A ratio of 0.0 is falsy, so the cache must be tested against None.
        if self._ratio is None:
            self._ratio = ratio(self._str1, self._str2)
        return self._ratio

    def partial_ratio(self, use_length=False, mismatch_length_point=0.2):
        """Return the best similarity of the first string against a window of the second.

        For every matching block a window of ``len(first string)`` characters
        is cut out of the second string, aligned on that block, and compared
        with the first string; the highest ratio wins.  Because the window
        has the length of the first string, callers should pass the shorter
        string first.

        :param use_length: when true, subtract ``mismatch_length_point`` from
            the result if the two strings differ in length.
        :param mismatch_length_point: size of that length-mismatch penalty.
        :return: the best window ratio, minus the penalty when it applies.
        """
        blocks = self.get_matching_blocks()
        len1, len2 = len(self._str1), len(self._str2)
        scores = []
        for block in blocks:
            # block[0] / block[1] are used as offsets into the first / second
            # string (presumably (i, j, n) triples -- confirm).  Shift the
            # window so the block lines up, clamping at the start of str2.
            long_start = max(block[1] - block[0], 0)
            long_end = long_start + len1
            long_substr = self._str2[long_start:long_end]
            scores.append(StringMatcher(self._str1, long_substr).ratio())
        # The penalty is for a length mismatch only; equal-length strings
        # must not be penalised.
        penalty = mismatch_length_point if use_length and len1 != len2 else 0
        return max(scores) - penalty
b.基于jieba分词的文本切分及词性分析
def normalize(name, branch_words, re_chars):
    """Split a company name into body words, region text and branch text.

    :param name: raw company name; a falsy value yields the empty result.
    :param branch_words: collection of words that denote a branch office.
    :param re_chars: characters to strip, interpolated into a regex
        character class ``[...]+``; when empty nothing is stripped.
    :return: ``(words, region, branch)`` where ``words`` is the list produced
        by ``join_char`` and the other two are concatenated strings.  For a
        falsy ``name`` the result is ``([], "", "")``.
    """
    if not name:
        return [], "", ""

    # Strip punctuation and other noise characters.  An empty ``re_chars``
    # would build the invalid pattern "[]+", so skip the substitution then.
    cleaned = re.sub(r"[%s]+" % re_chars, '', name) if re_chars else name

    # jieba part-of-speech segmentation with HMM disabled, so that unknown
    # words are split into single characters.
    tokens = pseg.cut(cleaned, HMM=False)

    words = []
    regions = []
    branches = []
    for x, flag in tokens:
        if x in branch_words:
            # Branch-office wording.
            branches.append(x)
        elif flag == 'ns':
            # Tokens tagged 'ns' are treated as region information.
            regions.append(x)
        else:
            words.append(x)

    return join_char(words), "".join(regions), "".join(branches)
c.单字聚合算法
def join_char(words_array):
    """Merge each run of consecutive single-character words into one word.

    Multi-character words are kept as they are and keep their position.

    :param words_array: iterable of word strings.
    :return: new list of words with single-character runs joined.
    """
    merged = []
    # Single characters collected since the last multi-character word.
    pending = []
    for word in words_array:
        if len(word) > 1:
            # Flush the collected single characters before the longer word.
            if pending:
                merged.append("".join(pending))
                pending = []
            merged.append(word)
        else:
            pending.append(word)
    # Flush a trailing run as well; otherwise single characters at the end
    # of the name would be silently dropped.
    if pending:
        merged.append("".join(pending))
    return merged
d.基于词频统计的主体、附加信息提取
def get_main_sub(string_array, weight_sort_amount):
    """Split a word list into its main word and the remaining text.

    The main word is the first word with the lowest value in the
    module-level ``weight`` mapping (words absent from it count as 0).

    :param string_array: list of words; falsy input yields ``("", "")``.
    :param weight_sort_amount: kept for backward compatibility.  The previous
        implementation took the first of the ``weight_sort_amount`` smallest
        weights, which is always the minimum, so the value never influenced
        the result -- it only raised ``IndexError`` when it was <= 0.
        NOTE(review): confirm whether a different selection was intended.
    :return: ``(main_word, other_words_joined)``.
    """
    if not string_array:
        return "", ""
    weights = [weight.get(x, 0) for x in string_array]
    index = weights.index(min(weights))
    rest = "".join(x for i, x in enumerate(string_array) if i != index)
    return string_array[index], rest
e.通过主体、附加、地区、分支信息综合计算匹配度
def match_branch(branch, compare_branch, error_branch):
    """Return the score deduction for the branch-office part of two names.

    :param branch: branch text of the first name.
    :param compare_branch: branch text of the second name.
    :param error_branch: deduction applied when the two differ.
    :return: ``0`` when both are equal, otherwise ``error_branch``.
    """
    # Branch-office information must match in full; there is no partial credit.
    return 0 if branch == compare_branch else error_branch
def match_area(area, compare_area, null_area, error_area):
    """Return the score deduction for the region part of two names.

    :param area: region text of the first name.
    :param compare_area: region text of the second name.
    :param null_area: deduction when a region is missing or only partial.
    :param error_area: deduction when the regions contradict each other.
    :return: ``0`` for identical regions, ``null_area`` when either region is
        empty or one is a substring of the other, otherwise ``error_area``.
    """
    if area == compare_area:
        return 0
    # A missing region, or one region contained in the other, only costs the
    # smaller "missing field" deduction.
    if not area or not compare_area or area in compare_area or compare_area in area:
        return null_area
    return error_area
def match_info(words_array, compare_array, mismatch_main, miss_field, weight_sort_amount):
    """Score the body (main word plus remaining words) of two names.

    :param words_array: body words of the first name.
    :param compare_array: body words of the second name.
    :param mismatch_main: base score used when the main words differ.
    :param miss_field: deduction when either side has no remaining text.
    :param weight_sort_amount: forwarded to ``get_main_sub``.
    :return: base score (``1`` for equal main words, else ``mismatch_main``)
        minus the deduction for the remaining text.
    """
    main, other = get_main_sub(words_array, weight_sort_amount)
    main_, other_ = get_main_sub(compare_array, weight_sort_amount)
    base = 1 if main == main_ else mismatch_main

    # Remaining text missing on either side: deduct the "missing field" score.
    if not other or not other_:
        return base - miss_field

    # Compare the shorter text against the longer one; on equal length the
    # first name's text goes first.
    if len(other) <= len(other_):
        shorter, longer = other, other_
    else:
        shorter, longer = other_, other
    # partial_ratio(True) turns on the length comparison (use_length).
    return base - (1 - StringMatcher(shorter, longer).partial_ratio(True))
def match(name, compare_array, branch_words=('分公司', '分支公司', '支公司', '分店', '分会', '分院', '分部', '分校'),
          re_chars='~`!#$%^&*()_+-/|\';":/.,?><br~·!@#¥%……&*()——:-=“:’;、。,?\n 》《{}', weight_sort_amount=2,
          mismatch_main=0.3, miss_field=0.1, error_area=0.5, error_branch=0.7):
    """Score ``name`` against every candidate in ``compare_array``.

    :param name: company name to look up.
    :param compare_array: sequence of candidate company names.
    :param branch_words: words that mark branch-office information.
    :param re_chars: characters stripped from names before segmentation.
        NOTE(review): the default contains the literal text ``<br``, so the
        letters ``b`` and ``r`` are stripped too -- confirm this is intended.
    :param weight_sort_amount: forwarded to ``match_info``.
    :param mismatch_main: base score when the main words differ.
    :param miss_field: deduction for a missing or partial field; used both
        for the remaining body text and for the region.
    :param error_area: deduction for contradicting regions.
    :param error_branch: deduction for differing branch information.
    :return: dict mapping each candidate's index to its score; ``1`` for an
        exact string match, otherwise a value floored at ``0``.
    """
    score = {}
    # Body words, region and branch information of the name being looked up.
    name_, area, branch = normalize(name, branch_words, re_chars)
    for i, compare in enumerate(compare_array):
        if name == compare:
            # Identical strings need no further analysis.
            score[i] = 1
            continue
        # Body words, region and branch information of the candidate.
        compare_name, compare_area, compare_branch = normalize(compare, branch_words, re_chars)
        # Body score.
        ratio = match_info(name_, compare_name, mismatch_main, miss_field, weight_sort_amount)
        # Region deduction.
        ratio_area = match_area(area, compare_area, miss_field, error_area)
        # Branch deduction.
        ratio_branch = match_branch(branch, compare_branch, error_branch)
        # Overall score, never below zero.
        score[i] = max(0, ratio - ratio_area - ratio_branch)
    return score
def match_subject(df_slice):
    """Score a record's counterparty name against its candidate account names.

    :param df_slice: object exposing ``k_kmqc_y`` (iterated as strings and
        indexed by position) and ``k_xfdwmc`` (a string).  Presumably one row
        of a pandas DataFrame -- confirm against the caller.
    :return: dict mapping each original ``k_kmqc_y`` entry to its score from
        ``match``; entries with identical text share one key, so the later
        score overwrites the earlier one.
    """
    # Keep only the text after the last "_" (the last level of the account
    # name); entries without "_" become "".
    compare_array = [re.sub(".*_", "", x) if "_" in x else "" for x in df_slice.k_kmqc_y]
    # Drop everything from the first "<br/>" onwards.
    name = re.sub("<br/>.*", "", df_slice.k_xfdwmc)
    ratio = match(name, compare_array)
    return {df_slice.k_kmqc_y[k]: v for k, v in ratio.items()}