cross_doctor/tool/qcos_func.py

87 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding=utf-8
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
import sys
import os
import logging
# 正常情况日志级别使用 INFO需要定位时可以修改为 DEBUG此时 SDK 会打印和服务端的通信信息
# logging.basicConfig(level=logging.INFO, stream=sys.stdout)
g_cos_root = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
g_cos_bucket = 'xinglu-1324629296'
def get_client():
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId建议使用子账号密钥授权遵循最小权限指引降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey建议使用子账号密钥授权遵循最小权限指引降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
"""
SecretId:AKIDFLFvOfjbFi171mLnEXp370rHZozymN2c
SecretKey:a6RYUDl6EAtQmurb8To4AFRCoDOM6g3D
"""
secret_id = 'AKIDFLFvOfjbFi171mLnEXp370rHZozymN2c'
secret_key = 'a6RYUDl6EAtQmurb8To4AFRCoDOM6g3D'
region = 'ap-beijing' # 替换为用户的 region已创建桶归属的 region 可以在控制台查看https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token如果使用临时密钥需要填入临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS默认为 https可不填
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
return client
def upload_file_to_cos(local_filename: str, cos_path: str):
client = get_client()
# 本地路径 简单上传
response = client.put_object_from_local_file(
Bucket=g_cos_bucket,
LocalFilePath=local_filename,
Key=cos_path
)
print(response['ETag'])
return response
def get_file_from_cos(cos_path: str, local_filename: str):
"""
从cos上下载文件
:param cos_path:
:param local_filename:
:return:
"""
client = get_client()
response = client.get_object(
Bucket=g_cos_bucket,
Key=cos_path
)
response['Body'].get_stream_to_file(local_filename)
def upload_order_images_to_cos(idx_to_localfile: dict, orderid: str):
"""
将一组本地图片上传到对应的cos的订单目录下
:param idx_to_localfile:
:param orderid:
:return: idx_to_cosfile
"""
xlsyn_cos_host = 'https://xinglu-1324629296.cos.ap-beijing.myqcloud.com'
cos_order_path = '/user/wave_survey/%s' % orderid
idx_to_cosfile = {}
for idx, local_filepath in idx_to_localfile.items():
image_filename = local_filepath.split('/')[-1]
cos_path = '%s/%s' % (cos_order_path, image_filename)
upload_file_to_cos(local_filepath, cos_path)
cos_file = '%s/%s' % (xlsyn_cos_host, cos_path)
idx_to_cosfile[idx] = cos_file
return idx_to_cosfile
if __name__ == '__main__':
idx_to_localfile = {0:'D:/slgwork/slgcode/wave_survey/1739430848000.jpg',
2:'D:/slgwork/slgcode/wave_survey/1739430851000.jpg'}
idx_to_cosfile = upload_order_images_to_cos(idx_to_localfile, 'slg_orderid')
print(idx_to_cosfile)