Browse Source

download status, exception handling

master
valerio 5 years ago
parent
commit
da975628eb
  1. 80
      TestScraper.py
  2. 235
      scraper.py

80
TestScraper.py

@ -1,70 +1,38 @@
#!/usr/bin/env python
# TestCalculatorFunctions.py
# TestScraper.py
import unittest
from scraper import Scraper
from pprint import pprint
class KnownValues(unittest.TestCase):
scraper = Scraper();
scraper = Scraper()
def test_instance(self):
self.assertIsInstance(self.scraper, Scraper)
def test_get_stations(self):
fname = 'rickandmorty.conf'
with open(fname) as fopen:
self.assertTrue(True)
def test_parse_stations(self):
source = '''BARI CENTRALE|S11119
BARI TORRE QUETTA|S11004
BOLOGNA C.LE|S05043'''
stations = self.scraper.parse_stations(source)
self.assertListEqual(stations, [
{'name': 'BARI CENTRALE', 'code': 'S11119'},
{'name': 'BARI TORRE QUETTA', 'code': 'S11004'},
{'name': 'BOLOGNA C.LE', 'code': 'S05043'},
])
for station in stations:
self.assertTrue('name' in station)
self.assertTrue('code' in station)
def test_parse_station(self):
station = 'SAN LEONARDO DI CUTRO|S11827'
expected = {'name': 'SAN LEONARDO DI CUTRO', 'code': 'S11827'}
self.assertDictEqual(self.scraper.parse_station(station), expected)
# def test_can_connect(self):
# scraper = Scraper()
# self.assertEqual(scraper.touch('http://ddg.gg'), 200)
#
# def test_get_page(self):
# scraper = Scraper()
# self.assertEqual(scraper.get_page().status_code, 200)
#
# def test_format_hackerspace(self):
# scraper = Scraper()
# hackerspace = {'name':'pippo'}
# formatted = scraper.format_hackerspace(hackerspace)
# self.assertTrue('url' in formatted)
#
#
# def test_get_hackerspaces(self):
# scraper = Scraper()
# hackerspaces = scraper.get_hackerspaces()
# self.assertGreater(len(hackerspaces), 0)
#
# for hackerspace in hackerspaces:
# self.assertTrue('url' in hackerspace)
#
# def test_convert_text_field_to_hs_url(self):
# scraper = Scraper()
# textfield = '<b><a href="/Freaknet" title="Freaknet">Freaknet</a></b>'
# self.assertEqual(scraper.convert_text_field_to_hs_url(textfield), 'https://wiki.hackerspaces.org/Freaknet')
def test_bytes_to_multiples(self):
self.assertEqual(Scraper.bytes_to_multiples(8302.2049371), '8.3kb')
self.assertEqual(Scraper.bytes_to_multiples(8302.2049371), '8.3kb')
self.assertEqual(Scraper.bytes_to_multiples(10), '10b')
self.assertEqual(Scraper.bytes_to_multiples(10000), '10kb')
self.assertEqual(Scraper.bytes_to_multiples(20000), '20kb')
self.assertEqual(Scraper.bytes_to_multiples(10000000), '10mb')
self.assertEqual(Scraper.bytes_to_multiples(1000000000), '1gb')
self.assertEqual(Scraper.bytes_to_multiples(1250000000), '1.25gb')
self.assertEqual(Scraper.bytes_to_multiples(160000000), '160mb')
def test_get_speed_string(self):
self.assertEqual(Scraper.get_speed_string(768000), '768kb/s')
def test_get_completion(self):
self.assertEqual(Scraper.get_completion(10, 100), '10% - 10b/100b')
self.assertEqual(Scraper.get_completion(3, 10), '30% - 3b/10b')
self.assertEqual(Scraper.get_completion(24, 289), '8.3% - 24b/289b')
def test_calc_res(self):
self.assertEqual(Scraper.calc_res('800x600'), 800 * 600)
self.assertEqual(Scraper.calc_res('1920x1024'), 1920 * 1024)
if __name__ == '__main__':

235
scraper.py

@ -1,108 +1,177 @@
import requests, json, ConfigParser, os
from bs4 import BeautifulSoup
from pprint import pprint
#!/usr/bin/env python
import requests
import ConfigParser
import time
from bs4 import BeautifulSoup
class Scraper():
# url = 'http://www.viaggiatreno.it/viaggiatrenonew/resteasy/viaggiatreno/partenze/S01480/Tue%20Oct%2011%202017%2008:30:00%20GMT+0200%20(CEST)'
def __init__(self):
config = ConfigParser.RawConfigParser()
config.read('rickandmorty.conf')
class Scraper:
name = 'Rick and Morty py'
author = 'NotIsSet'
license = 'GPLv2'
year = '2017'
version = '0.0.1'
out_buffer = ''
out_buffer_time = 0
out_buffer_refresh_threshold = 0
@staticmethod
def bytes_to_multiples(b):
divisor = 1
unit = 'b'
if b >= 1000000000:
unit = 'gb'
divisor = 1000000000
elif b >= 1000000:
unit = 'mb'
divisor = 1000000
elif b >= 1000:
unit = 'kb'
divisor = 1000
_ret = round((float(b) / divisor), 2)
if _ret - int(_ret) == 0.0:
_ret = int(_ret)
_ret = str(_ret) + unit
return _ret
# getfloat() raises an exception if the value is not a float
# getint() and getboolean() also do this for their respective types
self.url = config.get('Main', 'url')
@staticmethod
def get_speed_string(bps):
return Scraper.bytes_to_multiples(bps) + '/s'
pprint(self.url)
@staticmethod
def get_completion(written, total):
out_percentage = round(((float(written * 100)) / total), 2)
if out_percentage - int(out_percentage) == 0.0:
out_percentage = int(out_percentage)
# config.read(['site.cfg', os.path.expanduser('~/.rickandmorty.conf')])
self.load_page()
pass
return '%s%% - %s/%s' % (
out_percentage,
Scraper.bytes_to_multiples(written),
Scraper.bytes_to_multiples(total)
)
def calc_res(self, resolution):
@staticmethod
def calc_res(resolution):
if resolution is None:
return 0
vals = resolution.split('x')
if (len(vals) < 2):
factors = resolution.split('x')
if len(factors) < 2:
return 0
pprint(vals)
_ret = int(vals[0]) * int(vals[1])
_ret = int(factors[0]) * int(factors[1])
return _ret
def load_page(self):
url = self.url % (1, 1)
r = requests.get(url)
# pprint(content)
pprint(url)
@staticmethod
def get_top_resolution(resolutions):
top_res = 0
_ret = resolutions[0][0]
for video in resolutions:
if Scraper.calc_res(video[1]) > top_res:
top_res = Scraper.calc_res(video[1])
_ret = video[0]
return _ret
def buffer_out(self, string, concatenate=True):
current_time = int(round(time.time() * 1000))
out = self.out_buffer + '\n' + string
if concatenate:
self.out_buffer = out
if current_time - self.out_buffer_time > self.out_buffer_refresh_threshold:
self.out_buffer_time = current_time
print '\x1b[2J' + out
def __init__(self):
self.info_header = '\n'.join([
'name: ' + self.name,
'author: ' + self.author,
'license: ' + self.license,
'year: ' + self.year,
'version: ' + self.version,
])
config = ConfigParser.RawConfigParser()
config.read('rickandmorty.conf')
self.url = config.get('Main', 'url')
self.headers = requests.utils.default_headers()
self.headers.update({"User-Agent": "Mozilla/5.0"})
def get_episode_url(self, season, episode):
url = self.url % (season, episode)
r = requests.get(url, headers=self.headers)
# self.buffer_out(url)
soup = BeautifulSoup(r.text, 'html.parser')
player = soup.find(id="player")
frameUrl = player.get('data-src').strip()
r = requests.get(frameUrl)
# pprint(content)
pprint(url)
if player is None:
return None
frame_url = player.get('data-src')
if player is None:
return None
frame_url = frame_url.strip()
# self.buffer_out(frame_url)
r = requests.get(frame_url, headers=self.headers)
soup = BeautifulSoup(r.text, 'html.parser')
videoResolutions = [x.get('res') for x in soup.find_all(name="source")]
videoSources = [x.get('src') for x in soup.find_all(name="source")]
pprint(videoSources)
pprint(videoResolutions)
videoUrls = zip(videoSources, videoResolutions)
topRes = 0
curTop = videoUrls[0][0]
for video in videoUrls:
if (self.calc_res(video[1]) > topRes):
topRes = self.calc_res(video[1])
curTop = video[0]
url = curTop
# frameUrl = player.get('data-src').strip()
print "downloading with requests"
local_filename = "s01e01.mp4"
# NOTE the stream=True parameter
r = requests.get(url, stream=True)
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
print('writing chunk...\n')
f.write(chunk)
# f.flush() commented by recommendation from J.F.Sebastian
# r = requests.get(url)
# with open("s01e01.mp4", "wb") as code:
# code.write(r.content)
# pprint(url)
# pprint(frameUrl)
# iframe#player
video_resolutions = [x.get('res') for x in soup.find_all(name="source")]
video_sources = [x.get('src') for x in soup.find_all(name="source")]
def parse_stations(self, stations):
_ret = []
for station in stations.split('\n'):
if len(station) > 0:
_ret.append(self.parse_station(station))
return _ret
video_urls = zip(video_sources, video_resolutions)
return Scraper.get_top_resolution(video_urls)
def parse_station(self, station):
stat = station.split('|')
return {
'name': stat[0].strip(),
'code': stat[1].strip()
}
def download_file(self, url, destination):
r = requests.get(url, stream=True, headers=self.headers)
written_chunks = 0
last_time = time.time()
def find_stations(self, station_name, stations):
_ret = []
for station in stations:
if station_name.lower() in station['name'].lower():
_ret.append(station)
return _ret
with open(destination, 'wb') as f:
total_length = int(r.headers['Content-Length'])
speed_buffer = []
speed_buffer_size = 1000
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
time_diff = time.time() - last_time
last_time = time.time()
speed_buffer.append(float(1024 / time_diff / 10))
if len(speed_buffer) > speed_buffer_size:
speed_buffer.pop(0)
f.write(chunk)
written_chunks += 1
completion = Scraper.get_completion(written_chunks * 1000, total_length / 1024 * 1000)
speed = Scraper.get_speed_string(float(sum(speed_buffer)) / speed_buffer_size)
if written_chunks % 20 == 0:
self.buffer_out(
'Download in progress...\n%s - %s\n' % (
completion, speed),
False)
self.buffer_out('Download complete')
def run(self, download=False):
self.buffer_out(self.info_header)
self.buffer_out(self.url)
for season in range(1, 4):
for episode in range(1, 12):
ep_url = self.get_episode_url(season, episode)
if ep_url is None:
self.buffer_out('season %s episode %s - WARN could not retrieve url' % (season, episode))
continue
self.buffer_out('season %s episode %s - url: %s' % (season, episode, ep_url))
if download:
destination = "s%se%s.mp4" % (season, episode)
self.download_file(ep_url, destination)
pass
if __name__ == '__main__':
scraper = Scraper()
stations = scraper.get_stations('elenco_stazioni.txt')
pprint(stations)
scraper.run()

Loading…
Cancel
Save