分享

Python巡检OGG并推送钉钉消息

 xfxyxh 2022-07-31 发布于北京

下面脚本主要根据网络WSN最初OGG检测脚本修改而来,增加了推送钉钉以及判断故障等情况。

主要功能如下:

  1. 监测ssh是否能正常连接,无法连接,推送告警。
  2. 监测OGG链路延迟,进程状态,异常或延迟达到阈值则推送消息。
  3. 可以巡检Linux、AIX、Solaris平台OGG,并将异常链路推送钉钉机器人消息。

如需使用,需要根据自己环境修改下面部分即可:

  1. hostlist中信息,如果有多个主机需要监测,则复制多个花括号中部分,逗号隔开即可。
  2. 钉钉webhook地址,根据自己申请钉钉机器人webhook地址修改即可。
  3. 钉钉机器人密钥,根据钉钉机器人配置界面启用密钥填写即可。
  4. 脚本自身监测以及多久监测一次,根据自己需要,修改second变量以及exec_cnt即可。
  5. 如果是内网环境,需要取消send_message函数中互联网代理设置,填写自己内网代理服务器地址。
  1. #!/usr/bin/env python
  2. # -*-coding: utf-8 -*-
  3. # @Time:2020/7/13 9:47
  4. # @Author: WSN
  5. # @Time:2022/06/07 17:45
  6. # @Modified CZH
  7. import paramiko
  8. import re
  9. import datetime
  10. import time
  11. import os
  12. import hmac
  13. import hashlib
  14. import urllib
  15. import base64
  16. import requests
  17. import json
  18. hostlist=[
  19. {'host': 'IP地址', 'port': 'ssh端口', 'user': 'ogg操作系统用户', 'pwd': 'xxxx', 'oggdir': 'ogg安装目录','orahome': 'oracle_home目录'}
  20. ]
  21. # 钉钉发送函数,输入markdown格式消息文本即可发送
  22. def send_message(send_msg,warn_type):
  23. ### 钉钉配置 begin ###
  24. # 如果是内网,需要有互联网代理配置
  25. # proxies = {'http':'http://IP:PORT','https':'http://IP:PORT'}
  26. # os.environ["https_proxy"] = "http://IP:PORT"
  27. webhook="钉钉机器人webhook地址"
  28. timestamp = str(round(time.time() * 1000))
  29. secret = '钉钉机器人加密密钥'
  30. secret_enc = secret.encode('utf-8')
  31. string_to_sign = '{}\n{}'.format(timestamp, secret)
  32. string_to_sign_enc = string_to_sign.encode('utf-8')
  33. hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
  34. sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
  35. webhook = webhook + '×tamp=' + timestamp + '&sign=' + sign
  36. # timestamp 与系统当前时间戳如果相差1小时以上,则认为是非法的请求,所以需要注意发送消息服务器时间不能与标准时间相差超过一小时
  37. headers = {'Content-Type': 'application/json'}
  38. user = "all"
  39. ### 钉钉配置 end ###
  40. data={
  41. "msgtype": "markdown",
  42. "markdown": {
  43. "title": "数据库告警",
  44. "text": f"# <font face=\"华云彩绘\" color=\"#FF0000\" size=\"10\">{warn_type}</font> \n " + send_msg
  45. },
  46. "at": {
  47. "atMobiles": [
  48. user
  49. ],
  50. "isAtAll": False
  51. }
  52. }
  53. #print(data)
  54. x=requests.post(url=webhook,data=json.dumps(data),headers=headers)
  55. # 打印发送钉钉结果
  56. print('钉钉推送结果为: ' + str(x.json()) + '\n')
  57. def curtime():
  58. timenow = str(datetime.datetime.now()) + ' '
  59. return timenow
  60. # 定义一个休眠循环,定期执行脚本
  61. def sleeptime(hour, min, sec):
  62. return hour * 3600 + min * 60 + sec
  63. class SshOgg:
  64. def __init__(self, host, port, username, pwd):
  65. self.host = host
  66. self.port = port
  67. self.username = username
  68. self.pwd = pwd
  69. self.orahome = orahome
  70. self.ssh = paramiko.SSHClient()
  71. self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  72. try:
  73. self.ssh.connect(self.host, self.port, self.username, self.pwd, timeout=5)
  74. self.sshinvalid = False
  75. print(self.sshinvalid)
  76. except:
  77. self.sshinvalid = True
  78. print(self.sshinvalid)
  79. def check_ogg_status(self,orahome,oggdir):
  80. if self.sshinvalid:
  81. send_msg = ('**告警时间:** ' + curtime() + ' \\\n **告警主机:** ' + self.host )
  82. send_message(send_msg,warn_type='主机SSH无法连接')
  83. else:
  84. # 考虑AIX与Linux不同平台library path不同,所以一起设置,防止需要多判断处理不同平台
  85. cmd = 'export LD_LIBRARY_PATH=%s/lib:%s && export LD_LIBRARY_PATH=%s/lib:%s && echo "info all" |%s/ggsci' % (orahome,oggdir,orahome,oggdir,oggdir)
  86. # print(cmd)
  87. stdin, stdout, stderr = self.ssh.exec_command(cmd)
  88. # print(stdout.readlines)
  89. for i in stderr.readlines():
  90. send_msg = ('**告警时间:** ' + curtime() + ' \\\n **告警主机:** ' + self.host + ' \\\n **告警内容:** ' + i )
  91. send_message(send_msg,warn_type='无法检测OGG')
  92. print(i)
  93. # print(stdout.readlines)
  94. for i in stdout.readlines():
  95. # 计数进程和,不为0再进行处理
  96. v_proc_cnt = i.count('MANAGER')
  97. v_proc_cnt = v_proc_cnt + i.count('EXTRACT')
  98. v_proc_cnt = v_proc_cnt + i.count('REPLICAT')
  99. while v_proc_cnt >= 1:
  100. # 确定取到进程行再进行切片,方便判断取值
  101. v_i = i.split()
  102. print(v_i)
  103. # 将进程与状态赋予变量,用于后面判断
  104. if len(v_i) == 2:
  105. v_proc_type = v_i[0]
  106. v_proc_status = v_i[1]
  107. elif len(v_i) >= 3:
  108. # 进程类型
  109. v_proc_type = v_i[0]
  110. v_proc_status = v_i[1]
  111. # 进程具体名称
  112. v_proc_name = v_i[2]
  113. # lag at chkpt,读取trail检查点延迟
  114. v_lac = str(v_i[3]).split(':')
  115. if len(v_lac) == 1:
  116. lag_at_ckpt = v_lac[0]
  117. else:
  118. lag_at_ckpt = int(v_lac[0])*3600 + int(v_lac[1])*60 + int(v_lac[2])
  119. print(lag_at_ckpt)
  120. print(v_lac)
  121. # 切片time since ckpt,然后计算延迟秒数
  122. v_tsc = str(v_i[4]).split(':')
  123. print(v_tsc)
  124. time_since_ckpt = int(v_tsc[0])*3600 + int(v_tsc[1])*60 + int(v_tsc[2])
  125. print(time_since_ckpt)
  126. if v_proc_type == 'MANAGER':
  127. print(curtime())
  128. print(self.host)
  129. print(v_proc_type)
  130. print(v_proc_status)
  131. send_msg = ('**告警时间:** ' + curtime() + ' \\\n **告警主机:** ' + self.host + ' \\\n **OGG进程:** ' +
  132. v_proc_type + ' \\\n **当前状态 :** ' + v_proc_status)
  133. else:
  134. send_msg = ('**告警时间:** ' + curtime() + ' \\\n **告警主机:** ' + self.host + ' \\\n **进程类型:** ' +
  135. v_proc_type + ' \\\n **进程名称 :** ' + v_proc_name + ' \\\n **当前状态 :** ' + v_proc_status + ' \\\n **Lag at Chkpt:** ' + str(lag_at_ckpt) + 's' + ' \\\n **Time Since Chkpt:** ' + str(time_since_ckpt) + 's')
  136. """
  137. 1.
  138. """
  139. if v_proc_type == 'MANAGER' and v_proc_status == 'RUNING':
  140. send_message(send_msg,warn_type='OGG MGR正常')
  141. time.sleep(3)
  142. elif v_proc_type == 'MANAGER' and v_proc_status != 'RUNNING':
  143. send_message(send_msg,warn_type='OGG MGR异常')
  144. time.sleep(3)
  145. # 进程lag at chkpt如果状态为unknown则,len(v_lac)为1
  146. elif len(v_lac) == 1:
  147. send_message(send_msg,warn_type='OGG状态异常')
  148. time.sleep(3)
  149. elif len(v_lac) > 1 and v_proc_status == 'ABENDED':
  150. send_message(send_msg,warn_type='OGG故障')
  151. time.sleep(3)
  152. elif len(v_lac) > 1 and v_proc_status == 'STOPPED':
  153. send_message(send_msg,warn_type='OGG停止')
  154. time.sleep(3)
  155. elif len(v_lac) > 1 and lag_at_ckpt >= 1800:
  156. send_message(send_msg,warn_type='OGG队列读延迟')
  157. time.sleep(3)
  158. elif time_since_ckpt >= 1800:
  159. send_message(send_msg,warn_type='OGG应用延迟')
  160. time.sleep(3)
  161. elif v_proc_type == 'EXTRACT' and v_proc_status != 'RUNNING':
  162. send_message(send_msg,warn_type='OGG EXTRACT异常')
  163. time.sleep(3)
  164. elif v_proc_type == 'REPLICAT' and v_proc_status != 'RUNNING':
  165. send_message(send_msg,warn_type='OGG应用异常')
  166. time.sleep(3)
  167. else:
  168. send_message(send_msg,warn_type='其他情况')
  169. time.sleep(3)
  170. # while只执行一次即重新获取下一行进程信息判断,避免陷入死循环
  171. break
  172. def closed(self):
  173. self.ssh.close()
  174. # 定时格式为时,分,秒,间隔5分钟执行一次
  175. second = sleeptime(0, 0, 10)
  176. # 用于监测脚本本身,定时发送脚本运行邮件,防止脚本自身异常
  177. exec_cnt = 0
  178. while 1==1 :
  179. for con_info in hostlist:
  180. host = con_info['host']
  181. port = con_info['port']
  182. username = con_info['user']
  183. pwd = con_info['pwd']
  184. oggdir = con_info['oggdir']
  185. orahome = con_info['orahome']
  186. ssh = SshOgg(host, port, username, pwd)
  187. print('\n------------%s上ogg进程运行状态信息------------' % host)
  188. print(curtime() + '\n')
  189. ssh.check_ogg_status(orahome,oggdir)
  190. ssh.closed()
  191. print(curtime())
  192. exec_cnt = exec_cnt + 1
  193. print('Current execute count is :' + str(exec_cnt) + '\n')
  194. if exec_cnt == 10:
  195. send_message('This scripts is running!',warn_type='推送服务监测')
  196. exec_cnt = 0
  197. time.sleep(second)

来自 “ ITPUB博客 ” ,链接:http://blog./31439444/viewspace-2899126/,如需转载,请注明出处,否则将追究法律责任。                                             

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多