python 怎么水土保持监测招标文件一个文件

> 博客详情
摘要: python 下使用watchdog 来监视文件目录,触发相应的操作。
该例子为用watchdog来监视新文件,当新文件来时候,调用相应的解析脚本,进行解析入库。
&&&&--脚本1.py
&&&&--脚本2.py
&&&&--已处理的文件1,
&&&&--已处理的文件2
watchdog.py
watchdog.py 文件,该业务只监听新文件创建的事件:
#&coding=utf8
import&sys
import&time
import&logging
import&imp
import&scripts.CONFIG
from&watchdog.observers&import&Observer
from&watchdog.events&import&LoggingEventHandler
from&watchdog.events&import&FileSystemEventHandler
class&CreatedEventHandler(FileSystemEventHandler):
&&&&def&__init__(self):
&&&&&&&&FileSystemEventHandler.__init__(self)
&&&&def&on_created(handler,event):
&&&&&&&&file_name&=&event.src_path[2:]
&&&&&&&&print&'--'+file_name
&&&&&&&&moduleName&=&''
&&&&&&&&for&key&in&parse_map.keys():
&&&&&&&&&&&&if(re.match(key,file_name)):
&&&&&&&&&&&&&&&&moduleName&=&parse_map[key]
&&&&&&&&&&&&&&&&break
&&&&&&&&if(moduleName&!=&''):
&&&&&&&&&&&&try:
&&&&&&&&&&&&&&&&#动态加载相应的module
&&&&&&&&&&&&&&&&parseModule&=&&imp.load_module(moduleName,*imp.find_module(moduleName,['./scripts/']))
&&&&&&&&&&&&&&&&print&'&&load&module:&'&+&moduleName
&&&&&&&&&&&&&&&&parseModule.parse(file_name)
&&&&&&&&&&&&except&Exception,e:
&&&&&&&&&&&&&&&&print&e
#正则匹配,将文件match到相应的解析脚本上
parse_map={
&&&&&&&&&&&&'^test.xlsx$':'test',
&&&&&&&&&&&&'^emt_finance.*\.xlsx':'emt_finance'
if&__name__&==&"__main__":
&&&&logging.basicConfig(,&format='%(asctime)s&-&%(message)s',datefmt='%Y-%m-%d&%H:%M:%S')
&&&&path&=&sys.argv[1]&if&len(sys.argv)&&&1&else&'.'
&&&&event_handler&=&CreatedEventHandler()
&&&&observer&=&Observer()
&&&&observer.schedule(event_handler,&path,&recursive=False)
&&&&observer.start()
&&&&print&'Watching...'
&&&&&&&&while&True:
&&&&&&&&&&&&time.sleep(1)
&&&&except&KeyboardInterrupt:
&&&&&&&&observer.stop()
&&&&observer.join()
解析脚本test.py
#&FileName:&test.py
#&a&simple&test&code&
import&xlrd
import&MySQLdb
import&datetime
import&stat
import&shutil
def&parse(file):
&&&&values&=&[]
&&&&if(file.split('.')[-1]&!=&'xlsx'):
&&&&&&&&print&'---skip'&+&file
&&&&&&&&return
&&&&&&&&data&=&xlrd.open_workbook(file)
&&&&&&&&table&=&data.sheets()[0]
&&&&&&&&for&i&in&range(1,table.nrows):
&&&&&&&&&&&&row&=&table.row_values(i)
&&&&&&&&&&&&#excel&date&is&the&days&from&
&&&&&&&&&&&&row[0]&=&datetime.date()&+&datetime.timedelta(row[0])
&&&&&&&&&&&&values.append(row)
&&&&except&Exception,e:
&&&&&&&&print&e
&&&&#print&values
&&&&&&&&conn&=&MySQLdb.connect(config.mysql_host,config.mysql_user,config.mysql_passwd,'test',config.mysql_port)
&&&&&&&&cur&=&conn.cursor()
&&&&&&&&#values[0][5]=4
&&&&&&&&for&v&in&values:
&&&&&&&&&&&&count&=&cur.execute('replace&into&testtable&values(%s,%s,%s,%s,%s,%s)',v)
&&&&&&&&mit()
&&&&&&&&print&'&&parse&complete.'
&&&&&&&&'''
&&&&&&&&results=cur.fetchmany(5)
&&&&&&&&for&r&in&results:
&&&&&&&&&&&&print&r
&&&&&&&&'''
&&&&&&&&cur.close()
&&&&&&&&conn.close()
&&&&&&&&#os.remove('tmp/test.pyc')
&&&&&&&&if(os.path.exists('tmp/'+file)):
&&&&&&&&&&&&os.chmod('tmp/'+file,stat.S_IWRITE)#去掉只读属性
&&&&&&&&&&&&os.remove('tmp/'+file)&#删除它
&&&&&&&&shutil.move(file,'tmp/')
&&&&&&&&print&'&&move&filt&to&temp:&'&+&file
&&&&&&&&print&'&&success!'
&&&&&&&&&&&&
&&&&except&MySQLdb.Error,e:
&&&&&&&&print&"Mysql&Error&%d:&%s"&%&(e.args[0],&e.args[1])
if&__name__&==&'__main__':
&&&&exec&"import&CONFIG&as&config"
&&&&print&'=='*10
&&&&#os.remove('../tmp/test.xlsx')
&&&&#shutil.move('../test.xlsx','tmp/')
&&&&#parse('../test.xlsx')
&&&&exec&"import&scripts.CONFIG&as&config"
人打赏支持
码字总数 46226
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥
& 开源中国(OSChina.NET) |
开源中国社区(OSChina.net)是工信部
指定的官方社区Sponsered by
python 监控文件或目录变化
我们经常会遇到监控一个文件或目录的变化,如果有变化,把文件上传备份至备份主机,并且我们还要监控上传过程是否有问题等,根据此需求,查阅了相关的一些材料,编写如下脚本实现这个功能:
#!/usr/bin/env python
#coding=utf-8
#######################
#Status wd gs/ccs sql file changed
#文件有变化上传至备份主机,上传之后验证文件是否正确
#######################
import paramiko,os,sys,datetime,time,MySQLdb
from pyinotify import WatchManager, Notifier, ProcessEvent, IN_DELETE, IN_CREATE,IN_MODIFY
CREATE TABLE `wddel_log.status_sql` (
`ip` varchar(16) NOT NULL COMMENT '机器IP',
`tar_name` varchar(50) NOT NULL COMMENT '备份文件名字',
`md5` varchar(50) NOT NULL COMMENT '备份文件 MD5',
`flag` int(2) NOT NULL COMMENT '0:成功;1:失败',
`error_log` varchar(100) NOT NULL COMMENT '错误日志',
`uptime` datetime NOT NULL COMMENT '更新时间',
KEY `ip` (`ip`),
KEY `uptime` (`uptime`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8'''#日志表创建脚本
GM_path='/home/asktao/'
center_hostname='192.168.1.100'
center_username='root'
center_password='123456'
center_port=63008
def log2db(ip,tar_name,md5,flag,error='0'):#删除日志入库
tar_name = os.path.split(tar_name)[1]
= time.strftime("%Y-%m-%d %H:%M:%S")
conn = MySQLdb.connect(host = '192.168.1.104',user = 'root',passwd = '1q2w3e4r',charset='utf8',connect_timeout=20)
cursor = conn.cursor()
sql = "SELECT ip FROM wddel_log.status_sql WHERE ip='%s'" % ip
cursor.execute(sql)
res = cursor.fetchall()
if len(res)==0:
inster_sql = "insert into wddel_log.status_sql VALUES('%s','%s','%s',%s,'%s','%s')" % (ip,tar_name,md5,flag,error,now)
cursor.execute(inster_sql)
update_sql = "UPDATE wddel_log.status_sql SET md5='%s',flag='%s',error_log='%s',uptime='%s' WHERE ip='%s'" % (md5,flag,error,now,ip)
cursor.execute(update_sql)
cursor.close()
conn.close()
except Exception,e:
def find_ip():#获取本地 eth0 的 IP 地址
ip = os.popen("/sbin/ip a|grep 'global eth0'").readlines()[0].split()[1].split("/")[0]
if "192.168." in ip:
ip = os.popen("/sbin/ip a|grep 'global eth1'").readlines()[0].split()[1].split("/")[0]
def md5sum(file_name):#验证 sql 打包文件的 MD5
if os.path.isfile(file_name):
f = open(file_name,'rb')
py_ver = sys.version[:3]
if py_ver == "2.4":
import md5 as hashlib
import hashlib
md5 = hashlib.md5(f.read()).hexdigest()
return md5
def center_md5(file_name):#上传至备份中心的文件的 MD5
s=paramiko.SSHClient()
s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
s.connect(hostname = center_hostname,port=center_port,username=center_username, password=center_password)
conm = "/usr/bin/md5sum %s" % file_name
stdin,stdout,stderr=s.exec_command(conm)
result = stdout.readlines()[0].split()[0].strip()
return result
except Exception,e:
def back_file(ip,tar_name,tar_md5):#上传文件到备份中心
remote_dir='/data/sql'
file_name=os.path.join(remote_dir,os.path.split(tar_name)[1])
t=paramiko.Transport((center_hostname,center_port))
t.connect(username=center_username,password=center_password)
sftp=paramiko.SFTPClient.from_transport(t)
sftp.put(tar_name,file_name)
#print "%s back_file OK" % tar_name
os.remove(tar_name)
remot_md5=center_md5(file_name)
if remot_md5 == tar_md5:
log2db(ip,tar_name,tar_md5,0)
log2db(ip,tar_name,tar_md5,1,'remot_md5!=tar_md5')
except Exception,e:
#print "connect error!"
log2db(ip,tar_name,tar_md5,1,e)
os.remove(tar_name)
def back_sql():#执行备份
ip = find_ip()
tar_name = "/tmp/%s.tar.gz" % ip
sql_conn = "/usr/bin/find %s -type f
-name '*.sql'|/usr/bin/xargs /bin/tar zcvPf %s" % (GM_path,tar_name)
sql_tar = os.popen(sql_conn).readlines()
tar_md5 = md5sum(tar_name)
if tar_md5 != 0:
back_file(ip,tar_name,tar_md5)
error_log =
"%s not find" % tar_name
log2db(ip,tar_name,tar_md5,0,error_log)
class PFilePath(ProcessEvent):#文件变化的触发
def process_IN_CREATE(self, event):
if os.path.splitext(event.name)[1] == ".sql":
text = "Create file: %s " % os.path.join(event.path, event.name)
#print text
back_sql()
def process_IN_MODIFY(self, event):
if os.path.splitext(event.name)[1] == ".sql":
text = "Modify file: %s " % os.path.join(event.path, event.name)
#print text
back_sql()
def FSMonitor():#主监控函数
back_sql()#运行脚本先备份 sql 文件
wm = WatchManager()
mask = IN_CREATE |IN_MODIFY
notifier = Notifier(wm, PFilePath())
wdd = wm.add_watch(GM_path, mask, rec=True)
print 'now starting monitor %s' % (GM_path)
while True:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
notifier.stop()
if __name__ == "__main__":
FSMonitor()
此脚本中主要用到 paramiko 和 pyinotify 模块,关于 paramiko 的讲解可以参见:一文,pyinotify 的用法可以参见官方文档:如何通过python实现实时监控文件? _ 路由器设置|192.168.1.1|无线路由器设置|192.168.0.1 - 路饭网
您的位置: >
> 阅读资讯:如何通过python实现实时监控文件?
如何通过python实现实时监控文件?
如何通过python实现实时监控文件?
在业务稳定性要求比较高的情况下,运维为能及时发现问题,有时需要对应用程序的日志进行实时分析,当符合某个条件时就立刻报警,而不是被动等待出问题后去解决,比如要监控nginx的$request_time和$upstream_response_time时间,分析出最耗时的请求,然后去改进代码,这时就要对日志进行实时分析了,发现时间长的语句就要报警出来,提醒开发人员要关注,当然这是其中一个应用场景,通过这种监控方式还可以应用到任何需要判断或分析文件的地方,所以今天我们就来看看如何用python实现实时监控文件,我给三个方法实例::
这个是最简单的和容易理解的,因为大家都知道linux下有tail命令,所以你可以直接用Popen()函数去调用这个命令来执行获取输出,代码如下:
logfile='access.log'
command='tail -f &+logfile+'|grep &timeout&&
popen=subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
while True:
line=popen.stdout.readline().strip()
print line
采用python对文件的操作来实现,用文件对象的tell(), seek()方法分别得到当前文件位置和要移动到的位置,代码如下:
import time
file = open(&access.log')
where = file.tell()
line = file.readline()
if not line:
time.sleep(1)
file.seek(where)
print line,
利用python的 yield来实现一个生成器函数,然后调用这个生成器函数,这样当日志文件有变化时就打印新的行,代码如下:
import time
def follow(thefile):
thefile.seek(0,2)
while True:
line = thefile.readline()
if not line:
time.sleep(0.1)
yield line
if __name__ == &__main__':
logfile = open(&access-log&,&r&)
loglines = follow(logfile)
for line in loglines:
print line,
最后解释下seek()函数的用法,这个函数接收2个参数:file.seek(off, whence=0),从文件中移动off个操作标记(文件指针),正数往结束方向移动,负数往开始方向移动。如果设定了whence参数,就以whence设定的起始位为准,0代表从头开始,1代表当前位置,2代表文件最末尾位置。
以上就是三个常用方法,具体日志分析的代码大家可以根据自己的业务逻辑去实现,完毕。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持路饭。
本文地址:
相关文章列表酷勤网 C 程序员的那点事!
当前位置: >
浏览次数:次
最近使用python写了个小监控,为什么使用python?简单、方便、好管理。
首先我也是个python小白,就现学现用了。
首先你得用过C/C++、java、Javascript等的一种,编程小白估计比较艰难,有一定编程经验的python小白相对简单些。笔者是参考的这篇文章,写的相当不错,但是我看完用了30分钟,哈哈。
1.1 Hello World!
Python安装比较简单,到官网上下载安装包,一路下一步就可以了。因为我的服务器上安装的是2.6.6,所以我也下了这个版本。话说2.x的差别不是很大,如果想用3.x,可能下面的代码直接运行不过,不过也差不多,稍微改改即可。
新建一个文件,命名为hello.py。使用python的IDLE打开hello.py,写入以下代码:
print &Hello World!&
按F5,就可以看见输出结果了。
1.2 基本语法
每一行是一条语句。C语言是通过分号&;&;
通过缩进来组织代码块。C语言是通过大括号&{}&;
注释使用井号&#&。
1.3 数据类型、运算符、数据结构
运算符和C语言差不多,C语言有的基本上直接用就可以。
数据类型有数值型,字符串。数据结构有 list, tuple, dict, set。介绍一下tuple, 不能修改,通过索引进行查找。dict类似于map,存放键值对。来看例子,看看tuple使用:
&&& t=(1,2,[1,2])&&& t[2][1, 2]
1.4 流程控制
Python中可以使用if elif else、for和 while 来实现流程控制。同样有 break 和 continue。有一点和C不同,如果有一个分支什么都不做,要使用 pass。例如
list=[0, 1, 2, 3, 4, 5]for item in list:
if item == 1:
print item
elif item in (2, 3, 4, 5):
print &aha & + str(item)
运行结果是:
1.5 模块组织
有方法和类。
方法这样定义
def func(var):
some code here
类和C++等有些不同
class MyClass(object):
common = 1
def __init__(self):
self.myvariable = 5
def myfunction(self, arg1, arg2):
return self.myvariable
common变量相当于C++中用 static 修饰的变量,所有类通用;继承也非常简单,可以看看开始推荐的那篇文章。
1.6 异常处理
异常处理非常简单,直接贴代码了:
def some_function():
# Division by zero raises an exception
except ZeroDivisionError:
print &Oops, invalid.&
# Exception didn't occur, we're good.
# This is executed after the code block is run
# and all exceptions have been handled, even
# if a new exception is raised while handling.
print &We're done with that.&
1.7 工程组织
直接引用库,或者从库中引入某一个方法或变量。
import randomfrom time import clock
2. 数据库查询
既然是监控,免不了和数据库打交道。我使用的是PostgreSQL,所以就介绍一下python怎么调用postgres。其他数据库类似,自行google就好了。
连接postgres首先要安装一个库psycopg2,Windows下直接下载安装即可,注意选对版本。我的服务器是CentOS,安装直接运行
yum install python-psycopg2
2.1 首先创建数据库连接
#get database connectdef get_con():
host = '127.0.0.1'
port = &5432&
database = 'platform'
user = 'postgres'
password = 'postgres'
conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
return conn
2.2 执行SQL语句
#执行sql查询def query(conn, sql):
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()&
#close cursor
cursor.close()
return results
2.3 然后就可以写具体业务了
def getUsers():
conn = get_con()#open connect
sql = &&&select *
from t_user
order by intime DESC
limit 5&&&
items = query(conn , sql)
print str(items)
conn.close()
#close connect
注意3个引号&&&,就是普通字符串,不过可以换行。
3. 发送邮件
查询到数据之后不能及时通知管理员的话监控就没有意义了。所以我们通过邮件来通知,直接使用python的标准库 smtplib 就可以了。写个发送邮件的函数:
#发送邮件def send_email(subject, content):
sender = &yourmail@***.com&
password = &******& #密码是看不见的哦
receivers = [] #本人真实邮箱,欢迎发邮件讨论技术问题
host = &smtp.&
port = 465
msg = MIMEText(content,'html','utf-8')
msg['From'] = sender
msg['To'] = &,&.join(receivers)
msg['Subject'] = Header(subject, 'utf-8')
smtp = smtplib.SMTP_SSL(host, port)
smtp.login(sender, password)
smtp.sendmail(sender, receivers, msg.as_string())
except Exception, e:
logger.error(e)
发送邮件时我们使用了logger,这个logger是怎么来的呢?新建一个log.py,代码如下
# coding=utf-8import loggingimport logging.handlers&logger = logging.getLogger('monitor')logger.setLevel(logging.DEBUG)&filehandler = logging.handlers.TimedRotatingFileHandler(
&/mnt/log/monitor/monitor_log&, 'midnight', 1, 7)&# 设置文件后缀名称filehandler.suffix = &%Y%m%d.log&formatter = logging.Formatter('%(asctime)s-%(name)s-%(levelname)s: %(message)s')filehandler.setFormatter(formatter)&logger.addHandler(filehandler)
通过logging.getLogger(&monitor&)生成一个logger,然后配置一个文件处理器。
然后在我们监控程序中引用即可:
from log import logger
5. 把可配置信息放到配置文件中
如果我们添加一个管理员怎么办?如果我们的邮箱密码变了怎么办?直接修改python文件啊,哈哈。python不用编译直接改代码就好了,可是我们的程序以后要打包呢,所以最好写个配置文件,python的配置文件读取非常简单,使用python库 ConfigParser 即可:
config = None#get configdef getConfig():
global config
if config is None:
config = ConfigParser.ConfigParser()
config.read(&monitor.ini&)
return config
然后这样使用:
#get database connectdef get_con():
host = getConfig().get('db', 'host')
port = getConfig().get('db', 'port')
database = getConfig().get('db', 'database')
user = getConfig().get('db', 'user')
password = getConfig().get('db', 'password')
conn = psycopg2.connect(database=database, user=user, password=password, host=host, port=port)
return conn
#发送邮件def send_email(subject, content):
sender = getConfig().get('mail', 'sender')
password = getConfig().get('mail', 'password')
receivers = getConfig().get('mail', 'receivers').split(&,&)
host = getConfig().get('mail', 'host')
port = getConfig().getint('mail', 'port')
msg = MIMEText(content,'html','utf-8')
msg['From'] = sender
msg['To'] = &,&.join(receivers)
msg['Subject'] = Header(subject, 'utf-8')
smtp = smtplib.SMTP_SSL(host, port)
smtp.login(sender, password)
smtp.sendmail(sender, receivers, msg.as_string())
logger.exception(&Exception: &)
配置文件是monitor.ini,内容如下:
#数据库配置[db]host = 127.0.0.1port = 5432database = platformuser = postgrespassword = postgres#邮件配置[mail]sender = password = ******#多个联系人用英文逗号隔开receivers = host = smtp.port = 465
6. 加点控制
我们每5分钟查一下数据,可是业务sql只能查询最近的几条,所以要加个时间段限制,弄个开始、结束时间。
start_time = & 16:24:24&end_time = None&#update end_time, invoke before get new datadef update_end_time():
global end_time
now = time.mktime(datetime.now().timetuple())
end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(now))
return end_time#update end_time, invoke after get new datadef update_start_time():
global start_time
global end_time
start_time = end_time
return start_time
getUsers可以改写成:
def getUsers (conn):
global start_time
global end_time
sql = &&&select *
from t_user
intime&=&&& +&'&+start_time+&' and intime&&+&'&+end_time+&';&
items = query(conn, sql)
if items is not None and len(items)&0:
count = len(items)
tip = &又有&+str(count)+&个用户已经注册了。&+end_time
send_email(tip, tip+&n&+str(items))
然后写个统一的调度:
def task():
#init end_time and start_time, must init end_time first!!!
end_time = update_end_time()
start_time = update_start_time()
#init config
getConfig()&
while True:
conn = get_con()
#open connect
end_time = update_end_time()&
编程编程编程编程## process 编程编程编程编程##
(&query: &+end_time)
getUsers (conn)
#do some task else here&
update_start_time()
conn.close()#close connect
time.sleep(5*60)
#end of while&def run_monitor():
monitor = threading.Thread(target=task)
monitor.start()if __name__ == &__main__&:
run_monitor()
在task这个函数的while中,首先更新end_time,也就是当前时间;执行完再把start_time更新成刚刚的end_time,这样就不会有漏网之鱼了。还有一个需要注意的地方,关键字global。 在python中,使用全局变量是需要global关键字进行声明的,否则会出问题。
打开linux 控制台,直接运行python monitor.py是可以运行的,可是shell一旦退出,任务也就停止了。于是我就选择了一个进程管理工具:Supervisor。Supervisor 在进程中断时还能自动重启。
7.1. 安装supervisor
首先安装python-setuptools
yum install python-setuptools
安装supervisor
easy_install supervisor
生成supervisor配置文件
echo_supervisord_conf & /etc/supervisord.conf
然后在/etc/supervisord.conf添加:
[program:monitor]command = python /usr/monitor/monitor.pydirectory = /usr/monitoruser = root
7.2. 运行监控
然后在终端中运行supervisord启动supervisor。
在终端中运行supervisorctl,进入shell,运行status查看脚本的运行状态。
7.3. 关闭监控 以及常用命令
以下命令全部在supervisorctl的shell中执行。
shutdown停止Supervisor(子进程也会被停止) ;
start monitor开启monitor进程服务(一旦monitor进程退出,会自启动) ;
stop monitor关闭monitor进程服务 ;
restart monitor关闭正在运行的monitor进程,并且重新启动monitor进程服务 ;
reload重新加载supervisor配置文件 ;
exit退出supervisorctl的shell。
程序基本上就写完了,也可以跑起来了。中间借鉴过好多网络上的博客,但是记不住了,在这谢谢博主们。希望我的这篇文者可以帮到读友们,也欢迎大家给我发邮件讨论。其中很多英文注释还是很蹩脚的,哈哈,写代码有时比较懒,不想切换中文输入法了,大家凑合着看吧。
& 相关主题:

我要回帖

更多关于 python 监测文件变化 的文章

 

随机推荐