Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880352
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
5 KB
Subscribers
None
View Options
diff --git a/AIMeiSheng/docker_demo/falcon_helper.py b/AIMeiSheng/docker_demo/falcon_helper.py
new file mode 100644
index 0000000..1d513ed
--- /dev/null
+++ b/AIMeiSheng/docker_demo/falcon_helper.py
@@ -0,0 +1,60 @@
+import json
+import random
+import socket
+import time
+from apscheduler.schedulers.background import BackgroundScheduler
+import requests
+
+
+class FalconHelper(object):
+ def __init__(self, metric_name):
+ self.metric_name = metric_name
+ self.endpoint_name = socket.gethostname()
+ self.counter_type = "GAUGE"
+ self.headers = {"content-type": "application/json"}
+ self.post_url = "http://127.0.0.1:1988/v1/push"
+ self.cache = {}
+ self.scheduler = BackgroundScheduler()
+ self.scheduler.add_job(self.do, 'cron', hour='*', minute="*")
+ self.scheduler.start()
+
+ def do(self):
+ try:
+ msg = []
+ for k in self.cache:
+ msg += [{
+ "metric": self.metric_name,
+ "tags": "type=%s.qps" % k,
+ "endpoint": self.endpoint_name,
+ "value": int(self.cache[k]['count']) / 60,
+ "counterType": self.counter_type,
+ "timestamp": int(time.time()),
+ "step": 60
+ },
+ {
+ "metric": self.metric_name,
+ "tags": "type=%s.response_microsecond" % k,
+ "endpoint": self.endpoint_name,
+ "value": int(int(self.cache[k]['time_ms']) / int(self.cache[k]['count'])),
+ "counterType": self.counter_type,
+ "timestamp": int(time.time()),
+ "step": 60
+ }]
+ # j_msg = json.dumps(msg)
+ # print("jmsg", j_msg)
+ requests.post(self.post_url, json=msg, headers=self.headers)
+ except Exception as e:
+ print("error:", e)
+ finally:
+ self.cache = {}
+
+ def add(self, tag, time_t, count=1):
+ cost_ms = (int(round(time.time() * 1000)) - int(round(time_t * 1000)))
+ if tag in self.cache:
+ self.cache[tag]['time_ms'] += cost_ms
+ self.cache[tag]['count'] += count
+ else:
+ self.cache[tag] = {
+ "time_ms": cost_ms,
+ "count": count,
+ }
diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py
index ef40068..406b43c 100644
--- a/AIMeiSheng/docker_demo/http_server.py
+++ b/AIMeiSheng/docker_demo/http_server.py
@@ -1,91 +1,98 @@
# -*- coding: UTF-8 -*-
"""
SVC处理逻辑
1. 根据跟定的vocal_url 判别男女
2. 根据男女信息选择适合的男女url
3. 模型推理
"""
import gc
import os
import sys
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
import json
import time
import socket
import logging
import hashlib
from flask import Flask, jsonify, request, abort
from redis_helper import RedisHelper
from feishu_helper import feishu_send
+from falcon_helper import FalconHelper
from common import *
# 全局设置
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S', level=logging.INFO)
app = Flask(__name__)
class HttpServer:
def __init__(self, redis_conf, server_conf):
st = time.time()
self.redis_helper = RedisHelper(redis_conf)
+ self.falcon_helper = FalconHelper("av_online_worker")
+ self.worker_name = "AiMeisheng"
self.server_conf = server_conf
self.last_send_time = time.time()
self.time_interval = 60
logging.info(f"HttpServer init conf={redis_conf}, {server_conf} sp={time.time() - st}")
def process(self, in_data):
+ st = time.time()
msg = {
"status": gs_err_code_params,
"schedule": 100,
"gender": "unknown",
"target_song_url": "",
}
if not check_input(in_data):
+ self.falcon_helper.add(self.worker_name, st)
return msg
num = self.redis_helper.llen(self.server_conf["producer"])
if num > 10:
msg["status"] = gs_err_code_too_many_connections
+ self.falcon_helper.add(self.worker_name, st)
if time.time() - self.last_send_time > 60:
err_msg = f"ai meisheng process_async too many connections:{num}"
self.last_send_time = time.time()
feishu_send(gs_feishu_conf["url"], err_msg, gs_feishu_conf["users"])
return msg
distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest()
distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id
if not self.redis_helper.exists(distinct_key):
msg["status"] = gs_err_code_pending
self.redis_helper.set(distinct_key, json.dumps(msg))
self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data))
self.redis_helper.expire(distinct_key, 15)
msg = self.redis_helper.get(distinct_key)
+ self.falcon_helper.add(self.worker_name, st)
return json.loads(msg)
gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf)
@app.route("/ai_meisheng", methods=["POST"])
def ai_meisheng():
data = request.json
st = time.time()
logging.info(f"ai_meisheng:in:{data}")
msg = gs_http_server.process(data)
json_msg = jsonify(msg)
logging.info(f"ai_meisheng:out:{data}-{msg}, sp={time.time() - st}")
return json_msg
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, threaded=False)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:35 (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347217
Default Alt Text
(5 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment