diff --git a/AIMeiSheng/docker_demo/falcon_helper.py b/AIMeiSheng/docker_demo/falcon_helper.py new file mode 100644 index 0000000..1d513ed --- /dev/null +++ b/AIMeiSheng/docker_demo/falcon_helper.py @@ -0,0 +1,60 @@ +import json +import random +import socket +import time +from apscheduler.schedulers.background import BackgroundScheduler +import requests + + +class FalconHelper(object): + def __init__(self, metric_name): + self.metric_name = metric_name + self.endpoint_name = socket.gethostname() + self.counter_type = "GAUGE" + self.headers = {"content-type": "application/json"} + self.post_url = "http://127.0.0.1:1988/v1/push" + self.cache = {} + self.scheduler = BackgroundScheduler() + self.scheduler.add_job(self.do, 'cron', hour='*', minute="*") + self.scheduler.start() + + def do(self): + try: + msg = [] + for k in self.cache: + msg += [{ + "metric": self.metric_name, + "tags": "type=%s.qps" % k, + "endpoint": self.endpoint_name, + "value": int(self.cache[k]['count']) / 60, + "counterType": self.counter_type, + "timestamp": int(time.time()), + "step": 60 + }, + { + "metric": self.metric_name, + "tags": "type=%s.response_microsecond" % k, + "endpoint": self.endpoint_name, + "value": int(int(self.cache[k]['time_ms']) / int(self.cache[k]['count'])), + "counterType": self.counter_type, + "timestamp": int(time.time()), + "step": 60 + }] + # j_msg = json.dumps(msg) + # print("jmsg", j_msg) + requests.post(self.post_url, json=msg, headers=self.headers) + except Exception as e: + print("error:", e) + finally: + self.cache = {} + + def add(self, tag, time_t, count=1): + cost_ms = (int(round(time.time() * 1000)) - int(round(time_t * 1000))) + if tag in self.cache: + self.cache[tag]['time_ms'] += cost_ms + self.cache[tag]['count'] += count + else: + self.cache[tag] = { + "time_ms": cost_ms, + "count": count, + } diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py index ef40068..406b43c 100644 --- a/AIMeiSheng/docker_demo/http_server.py +++ b/AIMeiSheng/docker_demo/http_server.py @@ -1,91 +1,98 @@ # -*- coding: UTF-8 -*- """ SVC处理逻辑 1. 根据跟定的vocal_url 判别男女 2. 根据男女信息选择适合的男女url 3. 模型推理 """ import gc import os import sys sys.path.append(os.path.dirname(__file__)) sys.path.append(os.path.join(os.path.dirname(__file__), "../")) import json import time import socket import logging import hashlib from flask import Flask, jsonify, request, abort from redis_helper import RedisHelper from feishu_helper import feishu_send +from falcon_helper import FalconHelper from common import * # 全局设置 logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S', level=logging.INFO) app = Flask(__name__) class HttpServer: def __init__(self, redis_conf, server_conf): st = time.time() self.redis_helper = RedisHelper(redis_conf) + self.falcon_helper = FalconHelper("av_online_worker") + self.worker_name = "AiMeisheng" self.server_conf = server_conf self.last_send_time = time.time() self.time_interval = 60 logging.info(f"HttpServer init conf={redis_conf}, {server_conf} sp={time.time() - st}") def process(self, in_data): + st = time.time() msg = { "status": gs_err_code_params, "schedule": 100, "gender": "unknown", "target_song_url": "", } if not check_input(in_data): + self.falcon_helper.add(self.worker_name, st) return msg num = self.redis_helper.llen(self.server_conf["producer"]) if num > 10: msg["status"] = gs_err_code_too_many_connections + self.falcon_helper.add(self.worker_name, st) if time.time() - self.last_send_time > 60: err_msg = f"ai meisheng process_async too many connections:{num}" self.last_send_time = time.time() feishu_send(gs_feishu_conf["url"], err_msg, gs_feishu_conf["users"]) return msg distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest() distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id if not self.redis_helper.exists(distinct_key): msg["status"] = gs_err_code_pending self.redis_helper.set(distinct_key, json.dumps(msg)) self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data)) self.redis_helper.expire(distinct_key, 15) msg = self.redis_helper.get(distinct_key) + self.falcon_helper.add(self.worker_name, st) return json.loads(msg) gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf) @app.route("/ai_meisheng", methods=["POST"]) def ai_meisheng(): data = request.json st = time.time() logging.info(f"ai_meisheng:in:{data}") msg = gs_http_server.process(data) json_msg = jsonify(msg) logging.info(f"ai_meisheng:out:{data}-{msg}, sp={time.time() - st}") return json_msg if __name__ == "__main__": app.run(host='0.0.0.0', port=5000, threaded=False)