如何申请网站com域名,qq网页版在线登录官网,郑州小程序制作流程及费用,企业管理公司是干嘛的错误、调试和测试
程序运行中#xff0c;可能会遇到BUG、用户输入异常数据以及其它环境的异常#xff0c;这些都需要程序猿进行处理。Python提供了一套内置的异常处理机制#xff0c;供程序猿使用#xff0c;同时PDB提供了调试代码的功能#xff0c;除此之外#xff0c;…错误、调试和测试
程序运行中可能会遇到BUG、用户输入异常数据以及其它环境的异常这些都需要程序猿进行处理。Python提供了一套内置的异常处理机制供程序猿使用同时PDB提供了调试代码的功能除此之外程序猿还应该掌握测试的编写确保程序的运行符合预期。
错误处理
在一般程序处理中可以对函数的返回值进行检查是否返回了约定的错误码。例如系统程序调用的错误码一般都是-1成功返回0。但是这种方式必须用大量的代码来判断是否出错所以高级语言内置了try...except...finally的错误机制。 try:print(try...)r 10 / int(2)print(result:, r)
except ValueError as e:print(ValueError:, e)
except ZeroDivisionError as e:print(ZeroDivisionError:, e)
else:print(no error!)
finally:print(finally...)
print(‘END’
当我们认为某些代码可能会出错时就可以用try来运行这段代码如果执行出错则后续代码不会继续执行而是直接跳转至错误处理代码即except语句块执行完except后如果有finally语句块则执行finally语句块至此执行完毕。如果发生了不同的错误类型可以由不同的except语句块处理可以没有finally语句块。
Python的错误也是类所有的错误类型都继承自BaseException常见的错误类型和继承关系参考 官方文档 使用try...except捕获错误还有一个巨大的好处就是可以跨越多层调用比如函数main()调用foo()foo()调用bar()结果bar()出错了这时只要main()捕获到了就可以处理。 调用堆栈
如果错误没有被捕获它就会一直往上抛最后被Python解释器捕获打印一个错误信息然后程序退出。出错并不可怕可怕的是不知道哪里出错了。解读错误信息是定位错误的关键。
记录错误
在C语言中如果发生错误想要记录必须自己编写错误记录的程序。Python内置的logging模块可以非常容易地记录错误信息。通过配置logging还可以把错误记录到日志文件里方便事后排查。 # err_logging.pyimport loggingdef foo(s):return 10 / int(s)def bar(s):return foo(s) * 2def main():try:bar(0)except Exception as e:logging.exception(e)main()
print(END)
抛出错误
抛出错误首先需要定义一个错误 Class选择好继承关系然后用raise语句抛出一个错误实例。如果可以尽量使用Python内置的错误类型仅在非常必要的时候自己定义错误类。 # err_raise.py
class FooError(ValueError):passdef foo(s):n int(s)if n0:raise FooError(invalid value: %s % s)return 10 / nfoo(0)
调试 Debug
调试最简单的办法就是print()这个方法最简单但是在发布的时候需要把所有的调试信息注释掉。
断言 assert
凡是用print()来辅助查看的地方都可以用断言assert来替代。 def foo(s):n int(s)assert n ! 0, n is zero!return 10 / ndef main():foo(0)
assert的意思是表达式n ! 0应该是True否则根据程序运行的逻辑后面的代码肯定会出错。如果断言失败assert语句本身就会抛出AssertionError。
启动Python解释器时可以用-O参数来关闭assert。
logging
使用 logging 不仅可以抛出错误还可以输出到文件。 import logging
logging.basicConfig(levellogging.INFO)s 0
n int(s)
logging.info(n %d % n)
print(10 / n)
这就是logging的好处它允许你指定记录信息的级别有debuginfowarningerror等几个级别当我们指定levelINFO时logging.debug就不起作用了。同理指定levelWARNING后debug和info就不起作用了。这样一来你可以放心地输出不同级别的信息也不用删除最后统一控制输出哪个级别的信息。
logging的另一个好处是通过简单的配置一条语句可以同时输出到不同的地方比如console和文件。
pdb
可以在命令行下使用pdb,启动Python的调试器pdb让程序以单步方式运行可以随时查看运行状态。 # err.py
s 0
n int(s)
print(10 / n)$ python3 -m pdb err.py/Users/michael/Github/learn-python3/samples/debug/err.py(2)module()
- s ‘0
输入1可以查看代码输入n可以单步执行代码。使用p来查看变量使用q退出调试。
pdb.set_trace()
这个方法也是用pdb但是不需要单步执行我们只需要import pdb然后在可能出错的地方放一个pdb.set_trace()就可以设置一个断点。运行代码程序会自动在pdb.set_trace()暂停并进入pdb调试环境可以用命令p查看变量或者用命令c继续运行。
单元测试
单元测试是用来对一个模块、一个函数或者一个类来进行正确性检验的测试工作。
文档测试
doctest非常有用不但可以用来测试还可以直接作为示例代码。通过某些文档生成工具就可以自动把包含doctest的注释提取出来。用户看文档的时候同时也看到了doctest。
IO编程
IO就是Input / Output 也就是输入和输出。IO编程中Stream流是一个很重要的概念可以把流想象成一个水管数据就是水管里的水但是只能单向流动。
由于计算机各个部件之间的速度不一致所以处理IO问题时有两种办法同步IO、异步IO。同步和异步的区别就在于是否等待IO执行的结果。
文件读写
读写文件是最常见的IO操作。Python内置了读写文件的函数用法和C是兼容的。在磁盘上读写文件的功能都是由操作系统提供的现代操作系统不允许普通的程序直接操作磁盘所以读写文件就是请求操作系统打开一个文件对象通常称为文件描述符然后通过操作系统提供的接口从这个文件对象中读取数据读文件或者把数据写入这个文件对象写文件。
读文件 try:f open(/path/to/file, r)print(f.read())
finally:if f:f.close()with open(/path/to/file, r) as f:print(f.read())
类似于c语言open函数默认接收一个文件名、一个打开模式参数r、w默认对应文本文件rb对应二进制文件。默认打开的是UTF-8编码的文件如果需要打开其它编码的需要传入encoding参数如果文本的编码不一致可能导致读取出错可以传入错误处理参数errors。read方法一次将文件的所有内容读入内存可以通过参数指定读入的长度read(size)也可以使用readline方法每次读入一行使用readlines一次读入所有的行。文件使用后注意要进行关闭。
写文件 f open(/Users/michael/test.txt, w)f.write(Hello, world!)f.close()with open(/Users/michael/test.txt, w) as f:f.write(Hello, world!’)
写文件和读文件是一样的唯一区别是调用open()函数时传入标识符’w’或者’wb’表示写文本文件或写二进制文件。当我们写文件时操作系统往往不会立刻把数据写入磁盘而是放到内存缓存起来空闲的时候再慢慢写入。只有调用close()方法时操作系统才保证把没有写入的数据全部写入磁盘。
StringIO 和 BytesIO
很多时候数据读写不一定是文件也可以在内存中读写。StringIO顾名思义就是在内存中读写str。 from io import StringIOf StringIO()f.write(hello)
5f.write( )
1f.write(world!)
6print(f.getvalue())
hello world! from io import StringIOf StringIO(Hello!\nHi!\nGoodbye!)while True:
... s f.readline()
... if s :
... break
... print(s.strip())
...
Hello!
Hi!
Goodbye!
StringIO操作的只能是str如果要操作二进制数据就需要使用BytesIO。BytesIO实现了在内存中读写bytes。
操作文件和目录
Python内置的os模块也可以直接调用操作系统提供的接口函数。import os模块后就可以调用一些系统命令。 import osos.name # 操作系统类型
posixos.uname()
posix.uname_result(sysnameDarwin, nodenameRousseaudeMacBook-Pro.local, release15.6.0, versionDarwin Kernel Version 15.6.0: Mon Jan 9 23:07:29 PST 2017; root:xnu-3248.60.11.2.1~1/RELEASE_X86_64, machinex86_64)os.environ
environ({TERM_PROGRAM: Apple_Terminal, SHELL: /bin/bash, TERM: xterm-256color, TMPDIR: /var/folders/95/zrdts1md6j942mpyd7kd875h0000gn/T/, Apple_PubSub_Socket_Render: /private/tmp/com.apple.launchd.fhDfjTsyk6/Render, TERM_PROGRAM_VERSION: 361.1, OLDPWD: /Users/rousseau/Projects/python.my, TERM_SESSION_ID: 5A1B275C-3BE5-4673-B163-29DFF5C19C77, USER: rousseau, SSH_AUTH_SOCK: /private/tmp/com.apple.launchd.mLtAPJeOFm/Listeners, __CF_USER_TEXT_ENCODING: 0x1F5:0x0:0x0, PATH: /usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin, PWD: /Users/rousseau/Projects/python.my/mypython, XPC_FLAGS: 0x0, XPC_SERVICE_NAME: 0, SHLVL: 1, HOME: /Users/rousseau, LOGNAME: rousseau, LC_CTYPE: UTF-8, _: /usr/local/bin/python3, __PYVENV_LAUNCHER__: /usr/local/bin/python3’})os.path.abspath(.) # 查看当前目录的绝对路径:
/Users/rousseau/Projects/python.my/mypython’
# 在某个目录下创建一个新目录首先把新目录的完整路径表示出来:os.path.join(/Users/michael, testdir)
/Users/michael/testdir
# 然后创建一个目录:os.mkdir(/Users/michael/testdir)
# 删掉一个目录:os.rmdir(/Users/michael/testdir’)
因为Windows和Unix的路径表达方式不一样所以在处理路径时尽量使用Python提供的os.path.join()和os.path.split()避免处理发生问题。其它的文件处理函数os.rename、os.remove。
序列化
序列号我理解的就是将内存中变量的状态和值转换为文本以方便进行持久化的存储也可能不进行存储但是序列话之后方便进行传输。我们把变量从内存中变成可存储或传输的过程称之为序列化在Python中叫pickling在其他语言中也被称之为serializationmarshallingflattening等等都是一个意思。反过来把变量内容从序列化的对象重新读到内存里称之为反序列化即unpickling。
Python提供了pickle模块来实现序列化。 import pickled dict(nameBob, age20, score88)pickle.dumps(d)
b\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.’#也可以将序列化的内容写入文本f open(dump.txt, wb)pickle.dump(d, f)f.close()#读取的过程f open(dump.txt, rb)d pickle.load(f)f.close()d
{age: 20, score: 88, name: ‘Bob’}
JSON
我们要在不同的编程语言之间传递对象就必须把对象序列化为标准格式比如XML但更好的方法是序列化为JSON因为JSON表示出来就是一个字符串可以被所有语言读取也可以方便地存储到磁盘或者通过网络传输。JSON不仅是标准格式并且比XML更快而且可以直接在Web页面中读取非常方便。
JSON表示的对象就是标准的JavaScript语言的对象JSON和Python内置的数据类型对应如下:
JSON类型Python类型{}dict[]list“string”str1234.56Int或Floattrue/falseTrue/FalsenullNone
Python内置的json模块提供了非常完善的Python对象到JSON格式的转换。 json_str {age: 20, score: 88, name: Bob}json.loads(json_str)
{age: 20, score: 88, name: ‘Bob’}
进程和线程
进程是程序运行的最小单位线程是进程内部的子任务。多任务的实现模式多进程、多线程、多进程多线程。
多进程
Unix/Linux操作系统提供了一个fork()系统调用它非常特殊。普通的函数调用调用一次返回一次但是fork()调用一次返回两次因为操作系统自动把当前进程称为父进程复制了一份称为子进程然后分别在父进程和子进程内返回。子进程永远返回0而父进程返回子进程的ID。这样做的理由是一个父进程可以fork出很多子进程所以父进程要记下每个子进程的ID而子进程只需要调用getppid()就可以拿到父进程的ID。 import osprint(Process (%s) start... % os.getpid())
# Only works on Unix/Linux/Mac:
pid os.fork()
if pid 0:print(I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid()))
else:print(I (%s) just created a child process (%s). % (os.getpid(), pid))
由于Windows环境没有fork调用为了编写具备跨平台能力的代码建议使用Python提供的multiprocessing模块。
multiprocessing
multiprocessing模块提供了一个Process类来代表一个进程对象。创建子进程时只需要传入一个执行函数和函数的参数创建一个Process实例用start()方法启动这样创建进程比fork()还要简单。join()方法可以等待子进程结束后再继续往下运行通常用于进程间的同步。 from multiprocessing import Process
import os# 子进程要执行的代码
def run_proc(name):print(Run child process %s (%s)... % (name, os.getpid()))if __name____main__:print(Parent process %s. % os.getpid())p Process(targetrun_proc, args(test,))print(Child process will start.)p.start()p.join()print(Child process end.’)
Pool
可以使用进程池的方式创建大量的子进程。对Pool对象调用join()方法会等待所有子进程执行完毕调用join()之前必须先调用close()调用close()之后就不能继续添加新的Process了。 from multiprocessing import Pool
import os, time, randomdef long_time_task(name):print(Run task %s (%s)... % (name, os.getpid()))start time.time()time.sleep(random.random() * 3)end time.time()print(Task %s runs %0.2f seconds. % (name, (end - start)))if __name____main__:print(Parent process %s. % os.getpid())p Pool(4)for i in range(5):p.apply_async(long_time_task, args(i,))print(Waiting for all subprocesses done...)p.close()p.join()print(All subprocesses done.)
进程间通信
Python的multiprocessing模块包装了底层的机制提供了Queue、Pipes等多种方式来交换数据。 from multiprocessing import Process, Queue
import os, time, random# 写数据进程执行的代码:
def write(q):print(Process to write: %s % os.getpid())for value in [A, B, C]:print(Put %s to queue... % value)q.put(value)time.sleep(random.random())# 读数据进程执行的代码:
def read(q):print(Process to read: %s % os.getpid())while True:value q.get(True)print(Get %s from queue. % value)if __name____main__:# 父进程创建Queue并传给各个子进程q Queue()pw Process(targetwrite, args(q,))pr Process(targetread, args(q,))# 启动子进程pw写入:pw.start()# 启动子进程pr读取:pr.start()# 等待pw结束:pw.join()# pr进程里是死循环无法等待其结束只能强行终止:pr.terminate()
多线程
多任务可以由多进程完成也可以由一个进程内的多线程完成。一个进程至少有一个线程。由于线程是操作系统直接支持的执行单元因此高级语言通常都内置多线程的支持Python也不例外并且Python的线程是真正的Posix Thread而不是模拟出来的线程。
Python的标准库提供了两个模块_thread和threading_thread是低级模块threading是高级模块对_thread进行了封装。绝大多数情况下我们只需要使用threading这个高级模块。
由于任何进程默认就会启动一个线程我们把该线程称为主线程主线程又可以启动新的线程Python的threading模块有个current_thread()函数它永远返回当前线程的实例。主线程实例的名字叫MainThread子线程的名字在创建时指定如果不起名字Python就自动给线程命名为Thread-1Thread-2……
Lock
多线程和多进程最大的不同在于多进程中同一个变量各自有一份拷贝存在于每个进程中互不影响而多线程中所有变量都由所有线程共享所以任何一个变量都可以被任何一个线程修改因此线程之间共享数据最大的危险在于多个线程同时改一个变量把内容给改乱了。
要解决上述问题需要通过加锁来解决。创建一个锁就是通过threading.Lock()来实现当多个线程同时执行lock.acquire()时只有一个线程能成功地获取锁然后继续执行代码其他线程就继续等待直到获得锁为止。
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行坏处当然也很多首先是阻止了多线程并发执行包含锁的某段代码实际上只能以单线程模式执行效率就大大地下降了。其次由于可以存在多个锁不同的线程持有不同的锁并试图获取对方持有的锁时可能会造成死锁导致多个线程全部挂起既不能执行也无法结束只能靠操作系统强制终止。 import time, threading# 假定这是你的银行存款:
balance 0
lock threading.Lock()def run_thread(n):for i in range(100000):# 先要获取锁:lock.acquire()try:# 放心地改吧:change_it(n)finally:# 改完了一定要释放锁:lock.release()def run_thread(n):for i in range(100000):change_it(n)t1 threading.Thread(targetrun_thread, args(5,))
t2 threading.Thread(targetrun_thread, args(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
多核CPU
Python的线程虽然是真正的线程但解释器执行代码时有一个GIL锁Global Interpreter Lock任何Python线程执行前必须先获得GIL锁然后每执行100条字节码解释器就自动释放GIL锁让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁所以多线程在Python中只能交替执行即使100个线程跑在100核CPU上也只能用到1个核。
GIL是Python解释器设计的历史遗留问题通常我们用的解释器是官方实现的CPython要真正利用多核除非重写一个不带GIL的解释器。
ThreadLocal
在多线程环境中每个线程处理数据最好使用局部变量但是需要在不同线程间传递参数的时候会变的很麻烦。ThreadLocal提供了创建与线程名称关联的局部变量功能能。ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接HTTP请求用户身份信息等这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。 import threading# 创建全局ThreadLocal对象:
local_school threading.local()def process_student():# 获取当前线程关联的student:std local_school.studentprint(Hello, %s (in %s) % (std, threading.current_thread().name))def process_thread(name):# 绑定ThreadLocal的student:local_school.student nameprocess_student()t1 threading.Thread(target process_thread, args(Alice,), nameThread-A)
t2 threading.Thread(target process_thread, args(Bob,), nameThread-B)
t1.start()
t2.start()
t1.join()
t2.join()
进程 VS 线程
要实现多任务通常我们会设计Master-Worker模式Master负责分配任务Worker负责执行任务因此多任务环境下通常是一个Master多个Worker。
如果用多进程实现Master-Worker主进程就是Master其他进程就是Worker。
如果用多线程实现Master-Worker主线程就是Master其他线程就是Worker。
多进程模式最大的优点就是稳定性高因为一个子进程崩溃了不会影响主进程和其他子进程。当然主进程挂了所有进程就全挂了但是Master进程只负责分配任务挂掉的概率低著名的Apache最早就是采用多进程模式。
多进程模式的缺点是创建进程的代价大在Unix/Linux系统下用fork调用还行在Windows下创建进程开销巨大。另外操作系统能同时运行的进程数也是有限的在内存和CPU的限制下如果有几千个进程同时运行操作系统连调度都会成问题。
多线程模式通常比多进程快一点但是也快不到哪去而且多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃因为所有线程共享进程的内存。在Windows上如果一个线程执行的代码出了问题你经常可以看到这样的提示“该程序执行了非法操作即将关闭”其实往往是某个线程出了问题但是操作系统会强制结束整个进程。
计算密集型 vs. IO密集型
是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行大量的计算消耗CPU资源比如计算圆周率、对视频进行高清解码等等全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成但是任务越多花在任务切换的时间就越多CPU执行任务的效率就越低所以要最高效地利用CPU计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源因此代码运行效率至关重要。Python这样的脚本语言运行效率很低完全不适合计算密集型任务。对于计算密集型任务最好用C语言编写。
第二种任务的类型是IO密集型涉及到网络、磁盘IO的任务都是IO密集型任务这类任务的特点是CPU消耗很少任务的大部分时间都在等待IO操作完成因为IO的速度远远低于CPU和内存的速度。对于IO密集型任务任务越多CPU效率越高但也有一个限度。常见的大部分任务都是IO密集型任务比如Web应用。
IO密集型任务执行期间99%的时间都花在IO上花在CPU上的时间很少因此用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言完全无法提升运行效率。对于IO密集型任务最合适的语言就是开发效率最高代码量最少的语言脚本语言是首选C语言最差。
异步IO
考虑到CPU和IO之间巨大的速度差异一个任务在执行的过程中大部分时间都在等待IO操作单进程单线程模型会导致别的任务无法并行执行因此我们才需要多进程模型或者多线程模型来支持多任务并发执行。
现代操作系统对IO操作已经做了巨大的改进最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持就可以用单进程单线程模型来执行多任务这种全新的模型称为事件驱动模型Nginx就是支持异步IO的Web服务器它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上可以运行多个进程数量与CPU核心数相同充分利用多核CPU。由于系统总的进程数量十分有限因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。
对应到Python语言单进程的异步编程模型称为协程。
分布式进程
Python的multiprocessing模块不但支持多进程其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者将任务分布到其他多个进程中依靠网络通信。由于managers模块封装很好不必了解网络通信的细节就可以很容易地编写分布式多进程程序。 # task_master.pyimport random, time, queue
from multiprocessing.managers import BaseManager# 发送任务的队列:
task_queue queue.Queue()
# 接收结果的队列:
result_queue queue.Queue()# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):pass# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register(get_task_queue, callablelambda: task_queue)
QueueManager.register(get_result_queue, callablelambda: result_queue)
# 绑定端口5000, 设置验证码abc:
manager QueueManager(address(, 5000), authkeybabc)
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task manager.get_task_queue()
result manager.get_result_queue()
# 放几个任务进去:
for i in range(10):n random.randint(0, 10000)print(Put task %d... % n)task.put(n)
# 从result队列读取结果:
print(Try get results...)
for i in range(10):r result.get(timeout10)print(Result: %s % r)
# 关闭:
manager.shutdown()
print(master exit.’) # task_worker.pyimport time, sys, queue
from multiprocessing.managers import BaseManager# 创建类似的QueueManager:
class QueueManager(BaseManager):pass# 由于这个QueueManager只从网络上获取Queue所以注册时只提供名字:
QueueManager.register(get_task_queue)
QueueManager.register(get_result_queue)# 连接到服务器也就是运行task_master.py的机器:
server_addr 127.0.0.1
print(Connect to server %s... % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m QueueManager(address(server_addr, 5000), authkeybabc)
# 从网络连接:
m.connect()
# 获取Queue的对象:
task m.get_task_queue()
result m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):try:n task.get(timeout1)print(run task %d * %d... % (n, n))r %d * %d %d % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print(task queue is empty.)
# 处理结束:
print(worker exit.’)