Page MenuHomePhabricator

No OneTemporary

diff --git a/AIMeiSheng/docker_demo/falcon_helper.py b/AIMeiSheng/docker_demo/falcon_helper.py
new file mode 100644
index 0000000..1d513ed
--- /dev/null
+++ b/AIMeiSheng/docker_demo/falcon_helper.py
@@ -0,0 +1,60 @@
+import json
+import random
+import socket
+import time
+from apscheduler.schedulers.background import BackgroundScheduler
+import requests
+
+
class FalconHelper(object):
    """Accumulate per-tag call counts and latency, and push them periodically.

    Callers record samples with :meth:`add`. A background scheduler job
    (:meth:`do`) turns the accumulated samples into two gauges per tag
    (``<tag>.qps`` and ``<tag>.response_microsecond``) and POSTs them as JSON
    to ``self.post_url``, then starts a fresh accumulation window.

    No lock is used: ``add`` and ``do`` may run on different threads, so the
    code only guarantees that neither of them raises because of the other;
    a sample recorded exactly while a flush happens may be lost.
    """

    # Reporting window in seconds; used both as the Falcon "step" and as the
    # divisor that converts a per-window count into a per-second rate.
    STEP_SECONDS = 60
    # Upper bound for the push request so a stuck agent cannot block the
    # scheduler job forever.
    POST_TIMEOUT_SECONDS = 5

    def __init__(self, metric_name):
        """Store the metric name and start the background flush job.

        Args:
            metric_name: value sent in the ``metric`` field of every pushed
                data point.
        """
        self.metric_name = metric_name
        self.endpoint_name = socket.gethostname()
        self.counter_type = "GAUGE"
        self.headers = {"content-type": "application/json"}
        self.post_url = "http://127.0.0.1:1988/v1/push"
        # tag -> {"time_ms": accumulated latency, "count": accumulated calls}
        self.cache = {}
        self.scheduler = BackgroundScheduler()
        # Cron trigger matching every hour and every minute.
        self.scheduler.add_job(self.do, 'cron', hour='*', minute="*")
        self.scheduler.start()

    def _build_messages(self, stats, timestamp):
        """Convert accumulated stats into the list of data points to push.

        Args:
            stats: mapping ``tag -> {"time_ms": int, "count": int}``.
            timestamp: integer timestamp put on every data point.

        Returns:
            A list with two dicts per tag: the qps gauge followed by the
            average-latency gauge.
        """
        messages = []
        # Iterate over a copy so a late insert into ``stats`` from another
        # thread cannot raise "dictionary changed size during iteration".
        for tag, entry in list(stats.items()):
            count = int(entry['count'])
            total_ms = int(entry['time_ms'])
            # A tag recorded only with count=0 used to raise
            # ZeroDivisionError and discard the whole batch; report 0.
            average_ms = int(total_ms / count) if count else 0
            messages.append({
                "metric": self.metric_name,
                "tags": "type=%s.qps" % tag,
                "endpoint": self.endpoint_name,
                "value": count / self.STEP_SECONDS,
                "counterType": self.counter_type,
                "timestamp": timestamp,
                "step": self.STEP_SECONDS,
            })
            messages.append({
                "metric": self.metric_name,
                # NOTE(review): the tag says "microsecond" but the value is
                # derived from millisecond samples; the tag text is kept
                # unchanged because consumers may depend on it.
                "tags": "type=%s.response_microsecond" % tag,
                "endpoint": self.endpoint_name,
                "value": average_ms,
                "counterType": self.counter_type,
                "timestamp": timestamp,
                "step": self.STEP_SECONDS,
            })
        return messages

    def do(self):
        """Flush the current window: push its metrics and start a new one.

        Errors while building or sending the payload are printed and
        swallowed so the scheduler job keeps running; the flushed window is
        discarded either way.
        """
        # Swap before doing any I/O so samples added while the HTTP request
        # is in flight land in the next window instead of being wiped.
        stats, self.cache = self.cache, {}
        if not stats:
            # Nothing was recorded in this window; skip the empty push.
            return
        try:
            msg = self._build_messages(stats, int(time.time()))
            requests.post(
                self.post_url,
                json=msg,
                headers=self.headers,
                timeout=self.POST_TIMEOUT_SECONDS,
            )
        except Exception as e:
            print("error:", e)

    def add(self, tag, time_t, count=1):
        """Record ``count`` calls for ``tag`` that started at ``time_t``.

        Args:
            tag: key under which the sample is accumulated.
            time_t: start time as returned by ``time.time()``; the elapsed
                time until now, in milliseconds, is added to the tag total.
            count: number of calls this sample represents.
        """
        cost_ms = int(round(time.time() * 1000)) - int(round(time_t * 1000))
        # Bind the dict once: a flush may replace ``self.cache`` between the
        # lookup and the update, which previously could raise KeyError.
        cache = self.cache
        entry = cache.setdefault(tag, {"time_ms": 0, "count": 0})
        entry['time_ms'] += cost_ms
        entry['count'] += count
diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py
index ef40068..406b43c 100644
--- a/AIMeiSheng/docker_demo/http_server.py
+++ b/AIMeiSheng/docker_demo/http_server.py
@@ -1,91 +1,98 @@
# -*- coding: UTF-8 -*-
"""
SVC processing logic
1. Determine the singer's gender from the given vocal_url
2. Select the url that matches that gender
3. Run model inference
"""
import gc
import os
import sys
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
import json
import time
import socket
import logging
import hashlib
from flask import Flask, jsonify, request, abort
from redis_helper import RedisHelper
from feishu_helper import feishu_send
+from falcon_helper import FalconHelper
from common import *
# Global settings
# datefmt uses %H (24-hour clock): the previous %I is the 12-hour clock and,
# without %p, made morning and afternoon timestamps indistinguishable.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO)
app = Flask(__name__)
class HttpServer:
    """Front-end that queues requests in Redis and returns their cached state.

    ``process`` validates the request, rejects it when the producer queue is
    too long, enqueues it once per distinct ``record_song_url`` and returns
    whatever is currently stored under that request's key. Every exit path
    reports its elapsed time to ``self.falcon_helper``.
    """

    # Queue length above which new requests are rejected.
    MAX_PENDING_JOBS = 10
    # Value passed to ``redis_helper.expire`` for a freshly created key
    # (presumably seconds, as in Redis EXPIRE; confirm against RedisHelper).
    PENDING_KEY_TTL = 15

    def __init__(self, redis_conf, server_conf):
        """Create the Redis and Falcon helpers and remember the server config.

        Args:
            redis_conf: configuration passed to ``RedisHelper``.
            server_conf: mapping read for the ``"producer"`` and
                ``"ai_meisheng_key_prefix"`` entries.
        """
        st = time.time()
        self.redis_helper = RedisHelper(redis_conf)
        self.falcon_helper = FalconHelper("av_online_worker")
        self.worker_name = "AiMeisheng"
        self.server_conf = server_conf
        self.last_send_time = time.time()
        # Minimum number of seconds between two overload alerts.
        self.time_interval = 60
        logging.info(f"HttpServer init conf={redis_conf}, {server_conf} sp={time.time() - st}")

    def process(self, in_data):
        """Handle one request and return its status message as a dict.

        Args:
            in_data: request payload; must pass ``check_input`` and contain
                ``"record_song_url"``.

        Returns:
            The default message with ``gs_err_code_params`` when validation
            fails, the same message with ``gs_err_code_too_many_connections``
            when the queue is over the limit, otherwise the JSON-decoded
            value stored in Redis for this request.
        """
        st = time.time()
        msg = {
            "status": gs_err_code_params,
            "schedule": 100,
            "gender": "unknown",
            "target_song_url": "",
        }
        if not check_input(in_data):
            self.falcon_helper.add(self.worker_name, st)
            return msg

        num = self.redis_helper.llen(self.server_conf["producer"])
        if num > self.MAX_PENDING_JOBS:
            msg["status"] = gs_err_code_too_many_connections
            self.falcon_helper.add(self.worker_name, st)
            # Throttle alerts with the configured interval; the attribute was
            # previously set but ignored in favour of a hard-coded 60.
            if time.time() - self.last_send_time > self.time_interval:
                err_msg = f"ai meisheng process_async too many connections:{num}"
                self.last_send_time = time.time()
                feishu_send(gs_feishu_conf["url"], err_msg, gs_feishu_conf["users"])
            return msg

        # One Redis key per distinct recording url, so repeated requests for
        # the same recording are enqueued only once while the key exists.
        distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest()
        distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id
        if not self.redis_helper.exists(distinct_key):
            msg["status"] = gs_err_code_pending
            self.redis_helper.set(distinct_key, json.dumps(msg))
            self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data))
            self.redis_helper.expire(distinct_key, self.PENDING_KEY_TTL)
        # NOTE(review): if the key expires between exists() and get(), get()
        # presumably returns None and json.loads raises TypeError; confirm
        # RedisHelper.get semantics before adding a fallback.
        msg = self.redis_helper.get(distinct_key)
        self.falcon_helper.add(self.worker_name, st)
        return json.loads(msg)
# Module-level HttpServer instance, created once when this module is loaded
# and used by the ai_meisheng route handler. gs_redis_conf and
# gs_server_redis_conf are presumably provided by `from common import *`
# (confirm).
gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf)
@app.route("/ai_meisheng", methods=["POST"])
def ai_meisheng():
    """POST /ai_meisheng: pass the JSON body to the shared HttpServer.

    Logs the incoming payload, then the payload together with the result and
    the elapsed time, and returns the result as a JSON response.
    """
    payload = request.json
    started_at = time.time()
    logging.info(f"ai_meisheng:in:{payload}")

    result = gs_http_server.process(payload)
    response = jsonify(result)

    logging.info(f"ai_meisheng:out:{payload}-{result}, sp={time.time() - started_at}")
    return response
if __name__ == "__main__":
    # Script entry point: start the Flask app on all interfaces, port 5000,
    # with request threading switched off.
    run_options = {"host": '0.0.0.0', "port": 5000, "threaded": False}
    app.run(**run_options)

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:35 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347217
Default Alt Text
(5 KB)

Event Timeline