Python 脚本部分实例:企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图; Shell 脚本部分实例:SVN 完整备份、Zabbix 监控用户密码过期、构建本地 YUM 以及文章中有读者的需求(负载高时,查出占用比较高的进程脚本并存储或推送通知); Python 脚本部分企业微信告警import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = 'https://qyapi.weixin.qq.com/cgi-bin' self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): ''' 获取企业微信API接口的access_token :return: ''' token_url = self.url + '/gettoken?corpid=%s&corpsecret=%s' %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res['access_token'] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + '/media/upload?access_token={}&type=file'.format(self._token) data = {'media': file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()['media_id'] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + '/message/send?access_token=%s' % (self._token) send_data = { 'touser': touser, 'toparty': toparty, 'msgtype': 'text', 'agentid': agentid, 'text': { 'content': content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + '/message/send?access_token=%s' % (self._token) send_data = { 'touser': touser, 'toparty': toparty, 'msgtype': 'image', 'agentid': agentid, 'image': { 'media_id': media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) FTP 客户端
SSH 客户端import paramiko class SSHClient: def __init__(self, host, port, user, pkey): self.ssh_host = host self.ssh_port = port self.ssh_user = user self.private_key = paramiko.RSAKey.from_private_key_file(pkey) self.ssh = None self._connect() def _connect(self): self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10) except: return 'ssh connect fail' def execute_command(self, command): stdin, stdout, stderr = self.ssh.exec_command(command) out = stdout.read() err = stderr.read() return out, err def close(self): self.ssh.close() Saltstack 客户端
vCenter 客户端from pyVmomi import vim from asset import models import atexit class Vmware: def __init__(self, ip, user, password, port, idc, vcenter_id): self.ip = ip self.user = user self.password = password self.port = port self.idc_id = idc self.vcenter_id = vcenter_id def get_obj(self, content, vimtype, name=None): ''' 列表返回,name 可以指定匹配的对象 ''' container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True) obj = [ view for view in container.view ] return obj def get_esxi_info(self): # 宿主机信息 esxi_host = {} res = {'connect_status': True, 'msg': None} try: # connect this thing si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60) except Exception as e: res['connect_status'] = False try: res['msg'] = ('%s Caught vmodl fault : ' + e.msg) % (self.ip) except Exception as e: res['msg'] = '%s: connection error' % (self.ip) return res # disconnect this thing atexit.register(Disconnect, si) content = si.RetrieveContent() esxi_obj = self.get_obj(content, [vim.HostSystem]) for esxi in esxi_obj: esxi_host[esxi.name] = {} esxi_host[esxi.name]['idc_id'] = self.idc_id esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id esxi_host[esxi.name]['server_ip'] = esxi.name esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model for i in esxi.summary.hardware.otherIdentifyingInfo: if isinstance(i, vim.host.SystemIdentificationInfo): esxi_host[esxi.name]['server_sn'] = i.identifierValue # 系统名称 esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName # cpu总核数 esxi_cpu_total = esxi.summary.hardware.numCpuThreads # 内存总量 GB esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024 # 获取硬盘总量 GB esxi_disk_total = 0 for ds in esxi.datastore: esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024 # 默认配置4核8G100G,根据这个配置计算剩余可分配虚拟机 default_configure = { 'cpu': 4, 'memory': 8, 'disk': 100 } esxi_host[esxi.name]['vm_host'] = [] vm_usage_total_cpu = 0 vm_usage_total_memory = 0 vm_usage_total_disk = 0 # 虚拟机信息 for vm in esxi.vm: host_info = {} host_info['vm_name'] = vm.name host_info['power_status'] = vm.runtime.powerState host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核' host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB' host_info['system_info'] = vm.config.guestFullName disk_info = '' disk_total = 0 for d in vm.config.hardware.device: if isinstance(d, vim.vm.device.VirtualDisk): disk_total += d.capacityInKB / 1024 / 1024 disk_info += d.deviceInfo.label + ': ' + str((d.capacityInKB) / 1024 / 1024) + ' GB' + ',' host_info['disk_info'] = disk_info esxi_host[esxi.name]['vm_host'].append(host_info) # 计算当前宿主机可用容量:总量 - 已分配的 if host_info['power_status'] == 'poweredOn': vm_usage_total_cpu += vm.config.hardware.numCPU vm_usage_total_disk += disk_total vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024) esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu esxi_memory_free = esxi_memory_total - vm_usage_total_memory esxi_disk_free = esxi_disk_total - vm_usage_total_disk esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free) esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free) esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free) # 计算cpu 内存 磁盘按照默认资源分配的最小值,即为当前可分配资源 if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100: free_allocation_vm_host = 0 else: free_allocation_vm_host = int(min( [ esxi_cpu_free / default_configure['cpu'], esxi_memory_free / default_configure['memory'], esxi_disk_free / default_configure['disk'] ] )) esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host esxi_host['connect_status'] = True return esxi_host def write_to_db(self): esxi_host = self.get_esxi_info() # 连接失败 if not esxi_host['connect_status']: return esxi_host del esxi_host['connect_status'] for machine_ip in esxi_host: # 物理机信息 esxi_host_dict = esxi_host[machine_ip] # 虚拟机信息 virtual_host = esxi_host[machine_ip]['vm_host'] del esxi_host[machine_ip]['vm_host'] obj = models.EsxiHost.objects.create(**esxi_host_dict) obj.save() for host_info in virtual_host: host_info['management_host_id'] = obj.id obj2 = models.virtualHost.objects.create(**host_info) obj2.save() 获取域名 ssl 证书过期时间
发送今天的天气预报以及未来的天气趋势图![]() import requests import json import datetime def weather(city): url = 'http://wthrcdn./weather_mini?city=%s' % city try: data = requests.get(url).json()['data'] city = data['city'] ganmao = data['ganmao'] today_weather = data['forecast'][0] res = '老婆今天是{}\n今天天气概况\n城市: {:<10}\n时间: {:<10}\n高温: {:<10}\n低温: {:<10}\n风力: {:<10}\n风向: {:<10}\n天气: {:<10}\n\n稍后会发送近期温度趋势图,请注意查看。\ '.format( ganmao, city, datetime.datetime.now().strftime('%Y-%m-%d'), today_weather['high'].split()[1], today_weather['low'].split()[1], today_weather['fengli'].split('[')[2].split(']')[0], today_weather['fengxiang'],today_weather['type'], ) return {'source_data': data, 'res': res} except Exception as e: return str(e) ``` + 获取天气预报趋势图 ```python # -*- coding: utf-8 -*- import matplotlib.pyplot as plt import re import datetime def Future_weather_states(forecast, save_path, day_num=5): ''' 展示未来的天气预报趋势图 :param forecast: 天气预报预测的数据 :param day_num: 未来几天 :return: 趋势图 ''' future_forecast = forecast dict={} for i in range(day_num): data = [] date = future_forecast[i]['date'] date = int(re.findall('\d+',date)[0]) data.append(int(re.findall('\d+', future_forecast[i]['high'])[0])) data.append(int(re.findall('\d+', future_forecast[i]['low'])[0])) data.append(future_forecast[i]['type']) dict[date] = data data_list = sorted(dict.items()) date=[] high_temperature = [] low_temperature = [] for each in data_list: date.append(each[0]) high_temperature.append(each[1][0]) low_temperature.append(each[1][1]) fig = plt.plot(date,high_temperature,'r',date,low_temperature,'b') current_date = datetime.datetime.now().strftime('%Y-%m') plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False plt.xlabel(current_date) plt.ylabel('℃') plt.legend(['高温', '低温']) plt.xticks(date) plt.title('最近几天温度变化趋势') plt.savefig(save_path) ``` + 发送到企业微信 ```python # -*- coding: utf-8 -*- import requests import json class DLF: def __init__(self, corpid, corpsecret): self.url = 'https://qyapi.weixin.qq.com/cgi-bin' self.corpid = corpid self.corpsecret = corpsecret self._token = self._get_token() def _get_token(self): ''' 获取企业微信API接口的access_token :return: ''' token_url = self.url + '/gettoken?corpid=%s&corpsecret=%s' %(self.corpid, self.corpsecret) try: res = requests.get(token_url).json() token = res['access_token'] return token except Exception as e: return str(e) def _get_media_id(self, file_obj): get_media_url = self.url + '/media/upload?access_token={}&type=file'.format(self._token) data = {'media': file_obj} try: res = requests.post(url=get_media_url, files=data) media_id = res.json()['media_id'] return media_id except Exception as e: return str(e) def send_text(self, agentid, content, touser=None, toparty=None): send_msg_url = self.url + '/message/send?access_token=%s' % (self._token) send_data = { 'touser': touser, 'toparty': toparty, 'msgtype': 'text', 'agentid': agentid, 'text': { 'content': content } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) def send_image(self, agentid, file_obj, touser=None, toparty=None): media_id = self._get_media_id(file_obj) send_msg_url = self.url + '/message/send?access_token=%s' % (self._token) send_data = { 'touser': touser, 'toparty': toparty, 'msgtype': 'image', 'agentid': agentid, 'image': { 'media_id': media_id } } try: res = requests.post(send_msg_url, data=json.dumps(send_data)) except Exception as e: return str(e) + main脚本 # -*- coding: utf-8 -*- from plugins.weather_forecast import weather from plugins.trend_chart import Future_weather_states from plugins.send_wechat import DLF import os # 企业微信相关信息 corpid = 'xxx' corpsecret = 'xxx' agentid = 'xxx' # 天气预报趋势图保存路径 _path = os.path.dirname(os.path.abspath(__file__)) save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg') # 获取天气预报信息 content = weather('大兴') # 发送文字消息 dlf = DLF(corpid, corpsecret) dlf.send_text(agentid=agentid, content=content['res'], toparty='1') # 生成天气预报趋势图 Future_weather_states(content['source_data']['forecast'], save_path) # 发送图片消息 file_obj = open(save_path, 'rb') dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj) Shell 脚本部分SVN 完整备份
zabbix 监控用户密码过期diskarray=(`awk -F':' '$NF ~ /\/bin\/bash/||/\/bin\/sh/{print $1}' /etc/passwd`) length=${#diskarray[@]} printf '{\n' printf '\t''\'data\':[' for ((i=0;i<$length;i++)) do printf '\n\t\t{' printf '\'{#USER_NAME}\':\'${diskarray[$i]}\'}' if [ $i -lt $[$length-1] ];then printf ',' fi done printf '\n\t]\n' printf '}\n' 检查用户密码过期 #!/bin/bash export LANG=en_US.UTF-8 SEVEN_DAYS_AGO=$(date -d '-7 day' +'%s') user='$1' # 将Sep 09, 2018格式的时间转换成unix时间 expires_date=$(sudo chage -l $user | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p') if [[ '$expires_date' != 'never' ]];then expires_date=$(date -d '$expires_date' +'%s') if [ '$expires_date' -le '$SEVEN_DAYS_AGO' ];then echo '1' else echo '0' fi else echo '0' fi 构建本地YUM
读者需求解答负载高时,查出占用比较高的进程脚本并存储或推送通知# 物理cpu个数 physical_cpu_count=$(egrep 'physical id' /proc/cpuinfo | sort | uniq | wc -l) # 单个物理cpu核数 physical_cpu_cores=$(egrep 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}') # 总核数 total_cpu_cores=$((physical_cpu_count*physical_cpu_cores)) # 分别是一分钟、五分钟、十五分钟负载的阈值,其中有一项超过阈值才会触发 one_min_load_threshold='$total_cpu_cores' five_min_load_threshold=$(awk 'BEGIN {print ''$total_cpu_cores'' * '0.8'}') fifteen_min_load_threshold=$(awk 'BEGIN {print ''$total_cpu_cores'' * '0.7'}') # 分别是分钟、五分钟、十五分钟负载平均值 one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',') five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',') fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',') # 获取当前cpu 内存 磁盘io信息,并写入日志文件 # 如果需要发送消息或者调用其他,请自行编写函数即可 get_info(){ log_dir='cpu_high_script_log' test -d '$log_dir' || mkdir '$log_dir' ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > '$log_dir'/cpu_top10.log ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > '$log_dir'/mem_top10.log iostat -dx 1 10 > '$log_dir'/disk_io_10.log } export -f get_info echo '$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold' | \ awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system('get_info') }' |
|
来自: 阿明哥哥资料区 > 《94.数学.数据.python》