Loss-prevention projects frequently need to collect images for training. These scripts make it easy to download images from the database: supply a SQL statement and run a script to pull the images to the local machine, either into a single folder or sorted into separate folders by product code.
The toolkit consists of three scripts and one text file. To use it, write the query into sql.txt, then run whichever of the two download scripts fits your needs: 分类下载图片.py (download sorted by product code) or 不分类下载图片.py (download into a single folder).
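For example, sql.txt might contain a query along these lines. The table and column names here are hypothetical; what matters is that the first selected column is the image URL and the second, if present, is the product code (or the JSON label for the variant described later):

SELECT image_url, product_code
FROM product_images
WHERE created_at >= '2023-01-01';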
About download_urls.py: this file is imported as a module by the other two scripts. Run on its own, it creates a text file urls.txt and writes the SQL query results into it; you normally do not need to execute it separately.
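The query results land in urls.txt one row per line, columns separated by tabs, with blank and non-URL lines filtered out. The file ends up looking something like this (the URLs and codes below are made up for illustration):

http://example.com/img/0001.jpg	6901234567890
http://example.com/img/0002.jpg	6901234567891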
download_urls.py

import pymysql

url_file = 'urls.txt'

# Read the SQL statement from sql.txt.
with open("sql.txt", 'r') as s:
    query = s.read()

def connect_to_mysql():
    """Open a connection to the MySQL database."""
    try:
        conn = pymysql.connect(
            host='xxxx',
            database='xxxx',
            user='xxxx',
            password='xxxx'
        )
        print("Connected to the MySQL database")
        return conn
    except pymysql.Error as error:
        print("Failed to connect to MySQL: {}".format(error))
        return None

def execute_query_write_to_file(conn, query, filename):
    """Run the query and write each result row to the file, tab-separated."""
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        rows = cursor.fetchall()
        with open(filename, 'w') as file:
            for row in rows:
                line = '\t'.join(str(col) for col in row) + '\n'
                file.write(line)
        print("Query results written to file: {}".format(filename))
    except pymysql.Error as error:
        print("Failed to execute the query: {}".format(error))

def delete_empty_lines(url_file):
    """Drop blank lines and any line that does not start with an http URL."""
    with open(url_file, "r") as file:
        lines = file.readlines()
    lines = [line for line in lines if line.strip() and line.startswith("http")]
    with open(url_file, "w") as file:
        file.write("".join(lines))

def main():
    conn = connect_to_mysql()
    if not conn:
        return
    execute_query_write_to_file(conn, query, url_file)
    conn.close()
    delete_empty_lines(url_file)

if __name__ == '__main__':
    main()
分类下载图片.py

import os
import asyncio
import aiohttp
import time
import download_urls

start = time.time()
url_file = download_urls.url_file
save_dir = './images/'
os.makedirs(save_dir, exist_ok=True)

async def download_image(session, url, code):
    """Download one image into a subfolder named after its product code."""
    try:
        async with session.get(url) as resp:
            # exist_ok=True avoids a race when several concurrent tasks
            # try to create the same product-code folder.
            os.makedirs(os.path.join(save_dir, code), exist_ok=True)
            dir_path, filename = os.path.split(url)
            download_path = os.path.join(save_dir, code, filename)
            with open(download_path, 'wb') as f:
                # Stream the response to disk in 1 MB chunks.
                while True:
                    chunk = await resp.content.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
        print(f'{url} downloaded!')
    except Exception as e:
        print(f'{url} download failed! Error: {e}')

async def main():
    async with aiohttp.ClientSession() as session:
        with open(url_file, 'r') as f:
            lines = f.readlines()
        # Each line is "<url>\t<product code>".
        tasks = [download_image(session, line.strip().split("\t")[0], line.strip().split("\t")[1])
                 for line in lines]
        await asyncio.gather(*tasks)

# Refresh urls.txt from the database, then download everything.
download_urls.main()
asyncio.run(main())
print('Script finished, total time: {} seconds. You may close the window now!'.format(time.time() - start))
# Keep the console window open for 30 seconds so the message can be read.
time.sleep(30)
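Both download scripts launch one task per URL and hand them all to asyncio.gather at once, which can overwhelm the image server or exhaust local file handles when urls.txt is large. A minimal sketch of capping concurrency with an asyncio.Semaphore, assuming a limit of 50 (an arbitrary number, not something the original scripts use):

import asyncio
import aiohttp

async def download_with_limit(semaphore, session, url):
    # Only as many downloads as the semaphore allows run at once;
    # the remaining tasks wait here until a slot frees up.
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.read()

async def main(urls):
    semaphore = asyncio.Semaphore(50)  # hypothetical cap; tune to your environment
    async with aiohttp.ClientSession() as session:
        tasks = [download_with_limit(semaphore, session, url) for url in urls]
        return await asyncio.gather(*tasks)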
不分类下载图片.py

import asyncio
import os
import time
import aiohttp
import download_urls

start = time.time()
url_file = download_urls.url_file
save_dir = './images/'
os.makedirs(save_dir, exist_ok=True)

async def download_image(session, url):
    """Download one image straight into save_dir."""
    try:
        async with session.get(url) as resp:
            dir_path, filename = os.path.split(url)
            save_path = os.path.join(save_dir, filename)
            with open(save_path, 'wb') as f:
                # Stream the response to disk in 1 MB chunks.
                while True:
                    chunk = await resp.content.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
        print(f'{url} downloaded!')
    except Exception as e:
        print(f'{url} download failed! Error: {e}')

async def main():
    async with aiohttp.ClientSession() as session:
        with open(url_file, 'r') as f:
            lines = f.readlines()
        # Only the first tab-separated column (the URL) is needed here.
        tasks = [download_image(session, line.strip().split("\t")[0]) for line in lines]
        await asyncio.gather(*tasks)

# Refresh urls.txt from the database, then download everything.
download_urls.main()
asyncio.run(main())
print('Script finished, total time: {} seconds. You may close the window now!'.format(time.time() - start))
# Keep the console window open for 30 seconds so the message can be read.
time.sleep(30)
New requirement: download the images without classification, but each image comes with a corresponding JSON document; write each image's JSON into a txt file whose name matches the image's.
import os
import asyncio
import aiohttp
import time
import download_urls

start = time.time()
url_file = download_urls.url_file
save_dir = './images/'
os.makedirs(save_dir, exist_ok=True)
os.makedirs('./labels/', exist_ok=True)

async def download_image(session, url, json_text):
    """Download one image and write its JSON label to ./labels/<name>.txt."""
    try:
        async with session.get(url) as resp:
            dir_path, file_name = os.path.split(url)
            save_path = os.path.join(save_dir, file_name)
            # Strip the image extension so the label file name
            # matches the image name, as the requirement asks.
            label_name = os.path.splitext(file_name)[0]
            with open("./labels/{}.txt".format(label_name), "w") as file:
                file.write(json_text)
            print("JSON written to file")
            with open(save_path, 'wb') as f:
                # Stream the response to disk in 1 MB chunks.
                while True:
                    chunk = await resp.content.read(1024 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
            print('Image downloaded')
    except Exception as e:
        print(f'{url} download failed! Error: {e}')

async def main():
    async with aiohttp.ClientSession() as session:
        with open(url_file, 'r') as f:
            lines = f.readlines()
        # Each line is "<url>\t<json>"; split only on the first tab
        # so any tabs inside the JSON text survive.
        tasks = [download_image(session, line.strip().split("\t", 1)[0], line.strip().split("\t", 1)[1])
                 for line in lines]
        await asyncio.gather(*tasks)

# Refresh urls.txt from the database, then download everything.
download_urls.main()
asyncio.run(main())
print('Script finished, total time: {} seconds. You may close the window now!'.format(time.time() - start))
# Keep the console window open for 30 seconds so the message can be read.
time.sleep(30)
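The script writes whatever string comes back from the database straight into the label file. If malformed rows are possible, it may be worth parsing the JSON before writing so that bad labels fail loudly; a small optional hardening sketch using the standard json module (not part of the original script):

import json

def write_label(path, json_text):
    # json.loads raises a ValueError on malformed input, so a bad
    # label aborts here instead of silently producing a broken file.
    data = json.loads(json_text)
    with open(path, "w") as f:
        json.dump(data, f, ensure_ascii=False)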