sales_tool/functions/export-companies-to-gcs/source/main.py
2025-11-17 14:21:29 +09:00

87 lines
2.8 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functions_framework
from google.cloud import storage, secretmanager
import os
import hubspot
from hubspot.crm.objects.meetings import ApiException
import csv
import io
import json
cs_client = storage.Client()
sm_client = secretmanager.SecretManagerServiceClient()
@functions_framework.http
def handle_request(request):
try:
# 会社一覧取得
companies = fetch_all_companies()
# メモリ上で CSV を生成
csv_buffer = io.StringIO()
writer = csv.writer(csv_buffer)
# ヘッダー行
writer.writerow(["company_id", "company_name"])
# 各行を書き込み
for row in companies:
company_id = row['properties']['hs_object_id']
company_name = row['properties']['name']
writer.writerow([company_id, company_name])
# Cloud Storage にアップロード
upload_to_gcs(csv_buffer)
return 'success', 200
except ApiException as e:
print("Exception when calling basic_api->create: %s\n" % e)
return (json.dumps("", ensure_ascii=False), 500, {"Content-Type": "application/json"})
def fetch_all_companies():
"""
Companies API の get_page をページネーション付きで呼び出し、
全オブジェクトをリストで返す。
"""
access_key = get_access_key() # Secret Manager からアクセストークンを取得
hs_client = hubspot.Client.create(access_token=access_key)
all_companies = []
after = None
limit = 100 # 1 回あたりの取得件数(最大 100
while True:
# get_page の基本呼び出し
response = hs_client.crm.companies.basic_api.get_page(
limit=limit,
archived=False,
after=after
)
# レスポンスから companies の配列を追加
if response.results:
all_companies.extend([c.to_dict() for c in response.results])
# 次ページがない場合はループ終了
paging = response.paging
if not paging or not paging.next or not paging.next.after:
break
# next.after をセットして次ループへ
after = paging.next.after
return all_companies
def upload_to_gcs(data):
"""
メモリ上の CSV データを Cloud Storage にアップロード
"""
bucket = cs_client.bucket(os.getenv("BUCKET"))
blob = bucket.blob(os.getenv("OBJECT"))
blob.upload_from_string(data.getvalue(), content_type='text/csv')
#
# SecretManagerからアクセストークンを取得
#
def get_access_key():
key_path = os.getenv('KEY_PATH') + "/versions/1"
# アクセストークン取得
response = sm_client.access_secret_version(name=key_path)
# アクセストークンをデコード
access_token = response.payload.data.decode("UTF-8")
return access_token