feat: python发布消息到nats-server

This commit is contained in:
mozzie 2023-09-14 13:59:50 +08:00
parent 743d2ef1d0
commit fe13c5840b
2 changed files with 12 additions and 15 deletions

View File

@ -1,22 +1,20 @@
from flask import Flask, jsonify
from pynats2 import NATSClient from pynats2 import NATSClient
from datetime import datetime,timedelta
import json import json
app = Flask(__name__)
@app.route('/publish', methods=['POST'])
def publish_to_nats(): def publish_to_nats():
data_dict = { standardLog = {
"key": "value", "timestamp": (datetime.utcnow()+timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"number": 123, "level": 'INFO', # DEBUG、INFO、WARNING、ERROR、CRITICAL等级别
"list": [1, 2, 3] "name": 'alg.slice',# 日志生产的模块、服务
"message": "算法版本:$1,分割结果:$2",# 日志记录模板
"customFields": {
"$1": "1.0.0",
"$2": True
},
} }
data_str = json.dumps(data_dict)
# 将字符串转换为bytes
data_bytes = data_str.encode('utf-8')
with NATSClient() as client: with NATSClient() as client:
client.publish("alg.test", payload=data_bytes) client.publish("alg.test", payload=json.dumps(standardLog).encode('utf-8'))
return jsonify({'message': 'Message sent successfully!'})
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True) publish_to_nats()

View File

@ -1,2 +1 @@
flask
pynats2 pynats2