cross_doctor/tool/qcos_func.py

87 lines
3.6 KiB
Python
Raw Permalink Normal View History

2026-01-13 15:44:17 +08:00
# -*- coding=utf-8
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
import os
import logging
# 正常情况日志级别使用 INFO需要定位时可以修改为 DEBUG此时 SDK 会打印和服务端的通信信息
# logging.basicConfig(level=logging.INFO, stream=sys.stdout)
g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
g_cos_bucket = 'xinglu-1324629296'
def get_client():
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId建议使用子账号密钥授权遵循最小权限指引降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey建议使用子账号密钥授权遵循最小权限指引降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
"""
SecretId:AKIDFLFvOfjbFi171mLnEXp370rHZozymN2c
SecretKey:a6RYUDl6EAtQmurb8To4AFRCoDOM6g3D
"""
secret_id = 'AKIDFLFvOfjbFi171mLnEXp370rHZozymN2c'
secret_key = 'a6RYUDl6EAtQmurb8To4AFRCoDOM6g3D'
region = 'ap-beijing' # 替换为用户的 region已创建桶归属的 region 可以在控制台查看https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token如果使用临时密钥需要填入临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS默认为 https可不填
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
return client
def upload_file_to_cos(local_filename: str, cos_path: str):
client = get_client()
# 本地路径 简单上传
response = client.put_object_from_local_file(
Bucket=g_cos_bucket,
LocalFilePath=local_filename,
Key=cos_path
)
print(response['ETag'])
return response
def get_file_from_cos(cos_path: str, local_filename: str):
"""
从cos上下载文件
:param cos_path:
:param local_filename:
:return:
"""
client = get_client()
response = client.get_object(
Bucket=g_cos_bucket,
Key=cos_path
)
response['Body'].get_stream_to_file(local_filename)
def upload_order_images_to_cos(idx_to_localfile: dict, orderid: str):
"""
将一组本地图片上传到对应的cos的订单目录下
:param idx_to_localfile:
:param orderid:
:return: idx_to_cosfile
"""
xlsyn_cos_host = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
cos_order_path = '/user/wave_survey/%s' % orderid
idx_to_cosfile = {}
for idx, local_filepath in idx_to_localfile.items():
image_filename = local_filepath.split('/')[-1]
cos_path = '%s/%s' % (cos_order_path, image_filename)
upload_file_to_cos(local_filepath, cos_path)
cos_file = '%s/%s' % (xlsyn_cos_host, cos_path)
idx_to_cosfile[idx] = cos_file
return idx_to_cosfile
if __name__ == '__main__':
idx_to_localfile = {0:'D:/slgwork/slgcode/wave_survey/1739430848000.jpg',
2:'D:/slgwork/slgcode/wave_survey/1739430851000.jpg'}
idx_to_cosfile = upload_order_images_to_cos(idx_to_localfile, 'slg_orderid')
print(idx_to_cosfile)