import functions_framework from google.cloud import storage, secretmanager import os import hubspot from hubspot.crm.objects.meetings import SimplePublicObjectInputForCreate, ApiException import requests import csv import io import re import jaconv from rapidfuzz import process, fuzz import json CUTOFF = 80 # Fuzzy 閾値 (0-100) LEGAL_SUFFIX = r'(株式会社|(株)|\(株\)|有限会社|合同会社|Inc\.?|Corp\.?|Co\.?Ltd\.?)' cs_client = storage.Client(project=os.getenv("PROJECT_ID")) sm_client = secretmanager.SecretManagerServiceClient() @functions_framework.http def handle_request(request): try: request_json = request.get_json() print(request_json) mode = os.getenv("MODE") # モード(devまたはprod) title = request_json['title'] host_id = request_json['host_id'] if mode == 'prod' else 'ksuenaga@datacom.jp' # ホストユーザーID(開発環境では固定値を使用) starts_at = request_json['starts_at'] ends_at = request_json['ends_at'] minutes = request_json['minutes'] # タイトルから【】を削除 title = title.replace("【", "").replace("】", "") # タイトルから企業名を抽出 company_name = title.split("様")[0].strip() # "様" で分割して企業名を取得 print("抽出した企業名:", company_name) # 会社名から会社IDを取得 matched_company_id, matched_company_name = search_company(company_name) # マッチしたときだけ処理を行う if matched_company_id: # ユーザーIDを取得 by_email = load_owners() user_id = None if host_id in by_email: user_id = by_email[host_id]['id'] print("取得したユーザーID:", user_id) # 改行コードを
タグに変換 minutes_html = minutes.replace("\n", "
") # ミーティングログを作成 create_meeting_log(matched_company_id, title, user_id, starts_at, ends_at, minutes_html) response_data = { "matched_company_id": matched_company_id, # マッチした会社ID "matched_company_name": matched_company_name, # マッチした会社名 } return (json.dumps(response_data, ensure_ascii=False), 200, {"Content-Type": "application/json"}) except ApiException as e: print("Exception when calling basic_api->create: %s\n" % e) def normalize(name: str) -> str: """表記ゆれ吸収用の正規化""" n = jaconv.z2h(name, kana=False, digit=True, ascii=True).lower() n = re.sub(LEGAL_SUFFIX, '', n) return re.sub(r'[\s\-・・,,、\.]', '', n) # GCSから会社一覧取得 def load_componies(): """ 毎回 Cloud Storage から CSV を読み込む。 *応答速度を気にしない* 前提なのでキャッシュしなくても OK。 """ blob = cs_client.bucket(os.getenv("BUCKET")).blob('master/mst_company.csv') raw = blob.download_as_bytes() # bytes recs, by_norm = [], {} with io.StringIO(raw.decode("utf-8")) as f: reader = csv.DictReader(f) for row in reader: row["norm_name"] = normalize(row["company_name"]) recs.append(row) by_norm[row["norm_name"]] = row # 完全一致用ハッシュ return recs, by_norm # (list[dict], dict) # GCSから担当者一覧取得 def load_owners(): """ GCS から担当者一覧 CSV を読み込み、 email -> row 辞書 のマッピングを返す """ blob = cs_client.bucket(os.getenv("BUCKET")).blob('master/mst_owner.csv') raw = blob.download_as_bytes() # bytes by_email = {} with io.StringIO(raw.decode("utf-8")) as f: reader = csv.DictReader(f) for row in reader: # row に "email" と "user_id" フィールドがある前提 email = row["email"].strip().lower() by_email[email] = row return by_email def fuzzy_candidates(norm: str, recs): """ norm : 正規化済み検索語 recs : 会社レコード list[dict] (norm_name 含む) 戻り値 : list[(score:int, idx:int)] """ top = 2 # 上位 2 件を取得 matches = process.extract( norm, [r["norm_name"] for r in recs], scorer=fuzz.WRatio, score_cutoff=CUTOFF, limit=top ) print("ファジーマッチ結果:", matches) if len(matches) == 0: return None # マッチなしの場合は None を返す elif len(matches) == 1: return recs[matches[0][2]] # 上位 1 件のみの場合はそのレコードを返す else: if(matches[0][1] == matches[1][1]): return None # 上位 2 件のスコアが同じ場合は None を返す return recs[matches[0][2]] # 上位 1 件のみの場合はそのレコードを返す def search_company(company_name): # -------------------- マスタ読み込み -------------------- recs, by_norm = load_componies() norm_company_name = normalize(company_name) print("正規化した企業名:", norm_company_name) matched_company_id = None matched_company_name = None # -------------------- 完全一致 -------------------- if norm_company_name in by_norm: matched_company_id = by_norm[norm_company_name]["company_id"] matched_company_name = by_norm[norm_company_name]["company_name"] # -------------------- ファジーマッチ複数 -------------------- else : result = fuzzy_candidates(norm_company_name, recs) if result: matched_company_id = result["company_id"] matched_company_name = result["company_name"] print("マッチした会社ID:", matched_company_id) print("マッチした会社名:", matched_company_name) return matched_company_id, matched_company_name def create_meeting_log(company_id ,title, user_id, starts_at, ends_at, minutes): """ HubSpot API を使ってミーティングログを作成する。 """ access_key = get_access_key() # Secret Manager からアクセストークンを取得 hs_client = hubspot.Client.create(access_token=access_key) properties = { "hs_timestamp": starts_at, "hs_meeting_title": title, "hubspot_owner_id": user_id, "hs_meeting_body": minutes, "hs_meeting_start_time": starts_at, "hs_meeting_end_time": ends_at, } simple_public_object_input_for_create = SimplePublicObjectInputForCreate( associations=[{"types":[{"associationCategory":"HUBSPOT_DEFINED","associationTypeId":188}],"to":{"id":company_id}}], properties=properties ) api_response = hs_client.crm.objects.meetings.basic_api.create(simple_public_object_input_for_create=simple_public_object_input_for_create) print(api_response) # # SecretManagerからアクセストークンを取得 # def get_access_key(): key_path = os.getenv('KEY_PATH') + "/versions/1" # アクセストークン取得 response = sm_client.access_secret_version(name=key_path) # アクセストークンをデコード access_token = response.payload.data.decode("UTF-8") return access_token