Contents
5. 工作中的Python脚本分享¶
5.1. 1.多线程爬取图片的例子(centos6.8 x86_64下通过)¶
# !/usr/bin/env python
# -*- coding:UTF-8 -*-
import re
import os
import urllib
import threading
import time
import Queue
def getHtml(url):
html_page = urllib.urlopen(url).read()
return html_page
# 提取网页中图片的URL
def getUrl(html):
pattern = r'src="(.+?\.jpg)" pic_ext' # 正则表达式匹配图片
imgre = re.compile(pattern)
imglist = re.findall(imgre, html)
# re.findall(pattern,string)在string中寻找所有匹配成功的字符串,以列表形式返回值
return imglist
class getImg(threading.Thread):
def __init__(self, queue):
# 进程间通过队列通信,所以每个进程需要用到同一个队列初始化
threading.Thread.__init__(self)
self.queue = queue
self.start() # 启动线程
# 使用队列实现进程间通信
def run(self):
global count
while (True):
imgurl = self.queue.get()
print self.getName()
# urllib.urlretrieve(url,filname) 将url的内容提取出来,并存入filename中
urllib.urlretrieve(imgurl, '/root/python/images/%s.jpg' % count)
print "%s.jpg done" % count
count += 1
if self.queue.empty():
break
self.queue.task_done()
# 当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。
def main():
global count
url = "http://tieba.baidu.com/p/2460150866" # 爬虫程序要抓取内容的网页地址
html = getHtml(url)
imglist = getUrl(html)
threads = []
count = 0
queue = Queue.Queue()
# 将所有任务加入队列
for i in range(len(imglist)):
queue.put(imglist[i])
# 多线程爬取图片
for i in range(8):
thread = getImg(queue)
threads.append(thread)
# 合并进程,当子进程结束时,主进程才可以执行
for thread in threads:
thread.join()
if __name__ == '__main__':
if not os.path.exists("/root/python/images"):
os.makedirs("/root/python/images")
main()
print "多线程爬取图片任务已完成!"
5.2. 2.利用Python脚本发送工作邮件¶
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import smtplib
from email.mime.text import MIMEText
import string
import os
mail_host = "mail.example.com.cn"
mail_subject = "hostname名字不规则的机器列表"
# mail_reciver = ["yuhc@example.com.cn"]
mail_reciver = ["devops@example.com.cn", "admin@example.com.cn", "sa@example.com.cn"]
# mail_cc=["wangmiao@example.com.cn","nocdev@example.com"]
# mail_reliver以列表的形式存在,如果是单个收件地址,建议也以方式,即mail_reciver = ["yuhc@example.com.cn"]
mail_from = "yhc@example.com.cn"
text = open('/data/report/hostname_report.txt', 'r')
# body = string.join((text.read().strip()), "\r\n")
body = "ALL:\r\n" + " 你好,下面是我们全网内hostname名字不规范的列表,已经依次列出,麻烦将其改正并修正至 CMDB 系统,谢谢,列表如下所示:\r\n" + "\r\n" + text.read() + "\r\n" + "-------" + "\r\n" + "运维开发 | 余洪春"
text.close()
# body = str(body)
msg = MIMEText(body, format, 'utf-8')
msg['Subject'] = mail_subject
msg['From'] = mail_from
msg['To'] = ",".join(mail_reciver)
# msg['Cc'] = ",".join(mail_cc)
# 以下两行代码加上前面的MIMEText中的'utf-8'都是为了解决邮件正文乱码问题.
msg["Accept-Language"] = "zh-CN"
msg["Accept-Charset"] = "ISO-8859-1,utf-8"
# 发送邮件至相关人员
try:
server = smtplib.SMTP()
server.connect(mail_host, '25')
# 注意这里用到了starttls
server.starttls()
server.login("yhc@example.com.cn", "yhc123456")
server.sendmail(mail_from, mail_reciver, msg.as_string())
server.quit()
except Exception, e:
print "发送邮件失败" + str(e)
5.3. 3.监测IP地址是否占用¶
#!/usr/bin/env python
from threading import Thread
import subprocess
from Queue import Queue
num_threads = 8
list = []
for host in range(1,254):
ip = "192.168.185." + str(host)
list.append(ip)
q=Queue()
def pingme(i,queue):
while True:
ip=queue.get()
print 'Thread %s pinging %s' %(i,ip)
ret=subprocess.call('ping -c 1 %s' % ip,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
if ret==0:
print '%s is alive!' %ip
elif ret==1:
print '%s is down...'%ip
queue.task_done()
#start num_threads threads
for i in range(num_threads):
t=Thread(target=pingme,args=(i,q))
t.setDaemon(True)
t.start()
for ip in list:
q.put(ip)
print 'main thread waiting...'
q.join();
print 'Done'
5.4. 4.监测redis是否正常运行¶
#!/usr/bin/env python
import redis
import sys
STATUS_OK = 0
STATUS_WARNING = 1
STATUS_CRITICAL = 2
HOST = sys.argv[1]
PORT = int(sys.argv[2])
WARNING = float(sys.argv[3])
CRITICAL = float(sys.argv[4])
def connect_redis(host, port):
r = redis.Redis(host, port, socket_timeout = 5, socket_connect_timeout = 5)
return r
def main():
r = connect_redis(HOST, PORT)
try:
r.ping()
except:
print HOST,PORT,'down'
sys.exit(STATUS_CRITICAL)
redis_info = r.info()
used_mem = redis_info['used_memory']/1024/1024/1024.0
used_mem_human = redis_info['used_memory_human']
if WARNING <= used_mem < CRITICAL:
print HOST,PORT,'use memory warning',used_mem_human
sys.exit(STATUS_WARNING)
elif used_mem >= CRITICAL:
print HOST,PORT,'use memory critical',used_mem_human
sys.exit(STATUS_CRITICAL)
else:
print HOST,PORT,'use memory ok',used_mem_human
sys.exit(STATUS_OK)
if __name__ == '__main__':
main()
5.5. 5.调用有道词典的API翻译英文¶
#!/usr/bin/env python
#-*- encoding=utf-8 -*-
import urllib
import json
url='http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule&smartresult=ugc&sessionFrom=dict2.index'
#建立一个字典
data = {}
data['i'] = '余洪春是帅哥'
data['from'] = 'AUTO'
data['to'] = 'AUTO'
data['smartresult'] = 'dict'
data['client'] = 'fanyideskwe'
data['salt'] = '1506219252440'
data['sign'] = '0b8cd8f9b8b14'
data['doctype'] = 'json'
data['version'] = '2.1'
data['keyfrom'] = 'fanyi.web'
data['action'] = 'FY_BY_CLICK'
data['typoResult'] = 'true'
#在这里还不能直接将data作为参数,需要进行一下数据的解析才可以
#encode是将Unicode的编码转换成utf-8编码
#data=urllib.urlencode(data).encode('utf-8')
#另一种写法,urlencode将字典转换成url参数
data = urllib.urlencode(data)
response=urllib.urlopen(url,data)
#decode作用是将其他形式的编码转换成python使用的Unicode编码
#html=response.read().decode('utf-8')
#另一种写法
html = response.read()
target=json.loads(html)
print(target['translateResult'][0][0]['tgt'])
5.6. 6.监测Nginx活动连接数¶
import urllib2, base64, sys
import getopt
import re
def Usage():
print "Usage: getWowzaInfo.py -a [active|accepted|handled|request|reading|writting|waiting]"
sys.exit(2)
def main():
if len(sys.argv) < 2:
Usage()
try:
opts, args = getopt.getopt(sys.argv[1:], "a:")
except getopt.GetoptError:
Usage()
# Assign parameters as variables
for opt, arg in opts :
if opt == "-a" :
getInfo = arg
url="http://127.0.0.1:80/ngserver_status"
request = urllib2.Request(url)
result = urllib2.urlopen(request)
buffer = re.findall(r'\d+', result.read())
if ( getInfo == "active"):
print buffer[0]
elif ( getInfo == "accepted"):
print buffer[1]
elif ( getInfo == "handled"):
print buffer[2]
elif ( getInfo == "requests"):
print buffer[3]
elif ( getInfo == "reading"):
print buffer[4]
elif ( getInfo == "writting"):
print buffer[5]
elif ( getInfo == "waiting"):
print buffer[6]
else:
print "unknown"
sys.exit(1)
if __name__ == "__main__":
main()