Projet: Utiliser blitzortung.org pour annoncer l’arrivée d’un orage.
1.Paquets
1 |
sudo apt-get install python python-websocket python-requests |
Pour utiliser pico2wave
1 |
sudo apt-get install libttspico-utils |
Pour utiliser espeak
1 |
sudo apt-get install espeak |
2.Coordonnées de la zone à surveiller
Pour définir la zone: http://www.mapcoordinates.net/fr
1 |
AreaBlitz = '{"west":longitude_gauche,"east":longitude_droite,"north":latitude_haut,"south":latitude_bas}' |
3.Le script
1 2 3 |
sudo mkdir /etc/rfr sudo nano /etc/rfr/blitz.py sudo chmod 755 /etc/rfr/blitz.py |
Ajouter dans le fichier:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import unicode_literals #http://maps.googleapis.com/maps/api/geocode/json?latlng=48.31334123,7.59962082&sensor=true #AreaBlitz = '{"west":-12,"east":20,"north":56,"south":33.6}' #europe AreaBlitz = '{"west":6.5,"east":8.3,"north":49,"south":47.3}' #alsace import os import random import websocket import thread import threading import time import json import requests import logging logging.getLogger('requests').setLevel(logging.CRITICAL) import sys reload(sys) sys.setdefaultencoding('utf-8') #********Variables à adapter************** interval_signalisation = 120 #interval de signalisation des lieux d'impact alsadevice = "dmix:CARD=GWloop,DEV=0" #device sur lequel lire le son #**************************************** loctospeech = [] def rreplace(s, old, new, occurrence): li = s.rsplit(old, occurrence) return new.join(li) def MyTimer(tempo = interval_signalisation): threading.Timer(tempo, MyTimer, [tempo]).start() global loctospeech if len(loctospeech) != 0: listevi = ", ".join(loctospeech) listevi = rreplace(listevi, ', ', ' et ', 1) print "Orage détécté à " + listevi #os.system("espeak '\n \n Attention, Orage détécté à "+listevi+"' --stdout | aplay -D 'dmix:CARD=TWebsdr630m,DEV=1'") #voir aplay -L os.system("pico2wave -l fr-FR -w /dev/shm/blitz.wav '\n \n Attention, Orage détecté à " + listevi + "';") os.system("sox /dev/shm/blitz.wav -G -b 16 -s -c 1 -r 48k -t raw - gain +3 | aplay -B 500000 -q -N -f S16_LE -r 48000 -c 1 -D "+alsadevice+" - ") #os.system("aplay --device="+alsadevice+" /dev/shm/test.wav &") loctospeech = [] MyTimer(interval_signalisation) def on_message(ws, message): #print message #json_str = json.dumps(message) data = json.loads(message) print(data['time']) print(str(data['lat'])+","+str(data['lon'])) town = get_town(data['lon'],data['lat']) print(town) if town['nomvilage']: try: loctospeech.index(town['nomvilage']) except ValueError: loctospeech.append(town['nomvilage']) def 
get_town(longitude, latitude): response = requests.get('http://maps.googleapis.com/maps/api/geocode/json?language=en&latlng='+str(latitude)+','+str(longitude)+'&sensor=false') geodata = json.loads(response.text.encode("utf-8")) loc = {'nomvilage':None,'nomroute':None} if geodata['status'] == 'OK': for itera in geodata['results'][0]['address_components']: if itera['types'][0] == 'route': if itera['long_name'] != "Unnamed Road": loc['nomroute'] = itera['long_name'].encode('utf-8') if itera['types'][0] == 'locality': loc['nomvilage'] = itera['long_name'].encode('utf-8') return loc def on_error(ws, error): print error def on_close(ws): print "### closed ###" def on_open(ws): def run(*args): time.sleep(1) ws.send(AreaBlitz) thread.start_new_thread(run, ()) if __name__ == "__main__": websocket.enableTrace(True) port = random.randint(8050,8090) ws = websocket.WebSocketApp("ws://ws.blitzortung.org:" + str(port), on_message = on_message, on_error = on_error, on_close = on_close) ws.on_open = on_open ws.run_forever() |
4.Lancement au démarrage
1 |
sudo nano /etc/rc.local |
Ajouter:
1 |
/etc/rfr/blitz.py & |