Code source de pywws.Tasks

#!/usr/bin/env python

# pywws - Python software for USB Wireless Weather Stations
# Copyright (C) 2008-13  Jim Easterbrook

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Routines to perform common tasks such as plotting gaphs or uploading files."""

from collections import deque
from datetime import datetime, timedelta
import logging
import os
import Queue
import shutil
import threading

from pywws.calib import Calib
from pywws import Plot
from pywws import Template
from pywws.TimeZone import Local
from pywws.toservice import ToService
from pywws import Upload
from pywws import WindRose
from pywws import YoWindow

[docs]class RegularTasks(object): def __init__(self, params, status, calib_data, hourly_data, daily_data, monthly_data, asynch=False): self.logger = logging.getLogger('pywws.Tasks.RegularTasks') self.params = params self.status = status self.calib_data = calib_data self.hourly_data = hourly_data self.daily_data = daily_data self.monthly_data = monthly_data self.asynch = asynch # get directories self.work_dir = self.params.get('paths', 'work', '/tmp/weather') self.template_dir = self.params.get( 'paths', 'templates', os.path.expanduser('~/weather/templates/')) self.graph_template_dir = self.params.get( 'paths', 'graph_templates', os.path.expanduser('~/weather/graph_templates/')) self.local_dir = self.params.get( 'paths', 'local_files', os.path.expanduser('~/weather/results/')) # create calibration object self.calibrator = Calib(self.params, self.status) # create templater object self.templater = Template.Template( self.params, self.status, self.calib_data, self.hourly_data, self.daily_data, self.monthly_data) # create plotter objects self.plotter = Plot.GraphPlotter( self.params, self.status, self.calib_data, self.hourly_data, self.daily_data, self.monthly_data, self.work_dir) self.roseplotter = WindRose.RosePlotter( self.params, self.status, self.calib_data, self.hourly_data, self.daily_data, self.monthly_data, self.work_dir) # create FTP uploader object self.uploader = Upload.Upload(self.params) self.uploads_directory = os.path.join(self.work_dir, 'uploads') if not os.path.isdir(self.uploads_directory): os.makedirs(self.uploads_directory) # delay creation of a Twitter object until we know it's needed self.twitter = None # create a YoWindow object self.yowindow = YoWindow.YoWindow(self.calib_data) # get local time's offset from UTC, without DST now = self.calib_data.before(datetime.max) if not now: now = datetime.utcnow() time_offset = Local.utcoffset(now) - Local.dst(now) # get daytime end hour, in UTC self.day_end_hour = eval(params.get('config', 'day end hour', '21')) self.day_end_hour = (self.day_end_hour - (time_offset.seconds // 3600)) % 24 # create service uploader objects = {} for section in ('live', 'logged', 'hourly', '12 hourly', 'daily'): for name in eval(self.params.get(section, 'services', '[]')): if name not in[name] = ToService( self.params, self.status, self.calib_data, service_name=name) if self.asynch: self.service_start = {} self.service_queued = {} for name in self.service_start[name] =[name].catchup_start() self.service_queued[name] = 0 # start asynchronous thread to do uploads if self.asynch:'Starting asynchronous thread') self.uploads_lock = threading.Lock() self.to_thread = Queue.Queue() self.from_thread = Queue.Queue() self.thread = threading.Thread(target=self._asynch_thread) self.thread.start()
[docs] def stop_thread(self): if not self.asynch: return self.to_thread.put(('shutdown',)) self.thread.join()
def _asynch_thread(self): service_queue = {} uploads_pending = True tweet_queue = deque() running = True try: while running: timeout = 600 while True: try: command = self.to_thread.get(timeout=timeout) except Queue.Empty: break if command[0] == 'shutdown': running = False break elif command[0] == 'upload': uploads_pending = True elif command[0] == 'twitter': tweet_queue.append(command[1]) elif command[0] == 'service': name, timestamp, coded_data = command[1:] if name not in service_queue: service_queue[name] = deque() service_queue[name].append((timestamp, coded_data)) timeout = 5 self.logger.debug('Doing asynchronous tasks') while tweet_queue: tweet = tweet_queue[0]"Tweeting") if self.twitter.Upload(tweet): tweet_queue.popleft() for name in service_queue: count = 0 while service_queue[name]: timestamp, coded_data = service_queue[name][0] if (len(service_queue[name]) > 1 and[name].catchup <= 0): # don't send 'catchup' records pass elif[name].send_data(coded_data): count += 1 else: break self.from_thread.put(('service', name, timestamp)) service_queue[name].popleft() if count > 0:[name]'%d records sent', count) if uploads_pending: self.uploads_lock.acquire() if self._do_uploads(): uploads_pending = False self.uploads_lock.release() except Exception, ex: self.logger.exception(ex)
[docs] def has_live_tasks(self): yowindow_file = self.params.get('live', 'yowindow', '') if yowindow_file: return True for template in eval(self.params.get('live', 'twitter', '[]')): return True for name in eval(self.params.get('live', 'services', '[]')): return True for template in eval(self.params.get('live', 'plot', '[]')): return True for template in eval(self.params.get('live', 'text', '[]')): return True return False
def _parse_templates(self, section, option): for template in eval(self.params.get(section, option, '[]')): if isinstance(template, (list, tuple)): yield template else: yield template, '' def _do_common(self, sections, live_data=None): OK = True for section in sections: yowindow_file = self.params.get(section, 'yowindow', '') if yowindow_file: self.yowindow.write_file(yowindow_file) break for section in sections: for template, flags in self._parse_templates(section, 'twitter'): if not self.do_twitter(template): OK = False uploads = [] local_files = [] service_done = [] for section in sections: for name in eval(self.params.get(section, 'services', '[]')): if name not in service_done: self._do_service(name, live_data) service_done.append(name) for template, flags in self._parse_templates(section, 'text'): if 'T' in flags: if not self.do_twitter(template, live_data): OK = False continue upload = self.do_template(template, live_data) if 'L' in flags: if upload not in local_files: local_files.append(upload) elif upload not in uploads: uploads.append(upload) for template, flags in self._parse_templates(section, 'plot'): upload = self.do_plot(template) if not upload: continue if 'L' in flags: if upload not in local_files: local_files.append(upload) elif upload not in uploads: uploads.append(upload) if local_files: if not os.path.isdir(self.local_dir): os.makedirs(self.local_dir) for file in local_files: shutil.move(file, self.local_dir) if uploads: if self.asynch: if not self.thread.isAlive(): raise RuntimeError('upload thread has terminated') self.uploads_lock.acquire() for file in uploads: shutil.move(file, self.uploads_directory) if self.asynch: self.uploads_lock.release() self.to_thread.put(('upload',)) if not self.asynch: self._do_uploads() if self.asynch: # process replies from thread while True: try: message = self.from_thread.get_nowait() except Queue.Empty: break if message[0] == 'service': name, timestamp = message[1:] self.status.set('last update', name, timestamp) self.service_queued[name] -= 1 return OK
[docs] def do_live(self, data): return self._do_common(['live'], self.calibrator.calib(data))
[docs] def do_tasks(self): sections = ['logged'] self.params.unset('logged', 'last update') now = self.calib_data.before(datetime.max) if now: now += timedelta(minutes=self.calib_data[now]['delay']) else: now = datetime.utcnow() threshold = now.replace(minute=0, second=0, microsecond=0) last_update = self.params.get_datetime('hourly', 'last update') if last_update: self.params.unset('hourly', 'last update') self.status.set('last update', 'hourly', last_update.isoformat(' ')) last_update = self.status.get_datetime('last update', 'hourly') if (not last_update) or (last_update < threshold): # time to do hourly tasks sections.append('hourly') # set 12 hourly threshold threshold -= timedelta(hours=(threshold.hour - self.day_end_hour) % 12) last_update = self.params.get_datetime('12 hourly', 'last update') if last_update: self.params.unset('12 hourly', 'last update') self.status.set('last update', '12 hourly', last_update.isoformat(' ')) last_update = self.status.get_datetime('last update', '12 hourly') if (not last_update) or (last_update < threshold): # time to do 12 hourly tasks sections.append('12 hourly') # set daily threshold threshold -= timedelta(hours=(threshold.hour - self.day_end_hour) % 24) last_update = self.params.get_datetime('daily', 'last update') if last_update: self.params.unset('daily', 'last update') self.status.set('last update', 'daily', last_update.isoformat(' ')) last_update = self.status.get_datetime('last update', 'daily') if (not last_update) or (last_update < threshold): # time to do daily tasks sections.append('daily') OK = self._do_common(sections) if OK: for section in sections: self.status.set('last update', section, now.isoformat(' ')) if 'hourly' in sections: # save any unsaved data self.params.flush() self.status.flush() self.calib_data.flush() self.hourly_data.flush() self.daily_data.flush() self.monthly_data.flush() return OK
def _do_uploads(self): uploads = [] for name in os.listdir(self.uploads_directory): path = os.path.join(self.uploads_directory, name) if os.path.isfile(path): uploads.append(path) if not uploads: return True OK = True self.uploader.connect() for path in uploads: if self.uploader.upload_file(path): os.unlink(path) else: OK = False self.uploader.disconnect() return OK def _do_service(self, name, live_data): service =[name] if self.asynch: for data in service.next_data(self.service_start[name], live_data): if self.service_queued[name] >= 50: break coded_data = service.encode_data(data) if not coded_data: continue self.to_thread.put( ('service', name, data['idx'].isoformat(' '), coded_data)) self.service_start[name] = data['idx'] + timedelta(seconds=30) self.service_queued[name] += 1 else: service.Upload(live_data is None, live_data)
[docs] def do_twitter(self, template, data=None): if not self.twitter: from pywws import ToTwitter self.twitter = ToTwitter.ToTwitter(self.params)"Templating %s", template) input_file = os.path.join(self.template_dir, template) tweet = self.templater.make_text(input_file, live_data=data)[:140] if self.asynch: self.to_thread.put(('twitter', tweet)) return True else:"Tweeting") return self.twitter.Upload(tweet)
[docs] def do_plot(self, template):"Graphing %s", template) input_file = os.path.join(self.graph_template_dir, template) output_file = os.path.join(self.work_dir, os.path.splitext(template)[0]) if self.plotter.DoPlot(input_file, output_file) == 0: return output_file elif self.roseplotter.DoPlot(input_file, output_file) == 0: return output_file return None
[docs] def do_template(self, template, data=None):"Templating %s", template) input_file = os.path.join(self.template_dir, template) output_file = os.path.join(self.work_dir, template) self.templater.make_file(input_file, output_file, live_data=data) return output_file