Posts Tagged ‘python’
Automate build and load of Hugo sites to Amazon S3 using Rclone with Python
This was the build and load flow for my Hugo site before I gave up on Hugo and moved back to WordPress. The site was generated using Hugo and pushed to Amazon s3 using rclone.
The code assumes the hugo and rclone executables, and the hugo root directory are in the same folder.
In this example the hugo directory is called codepearls.xyzio.com.
import os, shutil # s3 auth info s3SecretAccessKey = 's3_secret_key' s3AccessKeyId = 's3_access_key' # Hugo folder and bucket name. sitepath is path to the hugo public folder. path = 'codepearls.xyzio.com' sitepath = os.path.join(path, 'public') #Remove the files from the previous build by deleting the public folder if os.path.exists(sitepath): shutil.rmtree(sitepath) # Run hugo from root directory cmd = 'hugo.exe -s ' + path os.system(cmd) # Run rclone from root directory to sync /public to s3 and the appropriate args to encrypt the files and enable public read cmd = 'rclone.exe sync ' + sitepath + ' s3:' + path cmd += ' -v --s3-secret-access-key ' + s3SecretAccessKey cmd += ' --s3-access-key-id ' + s3AccessKeyId cmd += ' --s3-acl public-read' cmd += ' --s3-server-side-encryption AES256' cmd += ' --s3-provider AWS' cmd += ' --s3-region us-west-2' os.system(cmd)
Open a zip archive and iterate through its files with Python
Open a file compressed as .zip and iterate through its files line by line with Python.
import zipfile #Path to the zip file zipfilepath = 'path_to_zip_file' #Read in zip file zip = zipfile.ZipFile(zipfilepath) #Iterate through files in zip file for zipfilename in zip.filelist: #Read contents of the file filecontents = zip.read(zipfilename) #Break up contents into list and process for line in filecontents.replace('\r\n', '\n').split('\n'): print line
Open and print a .gz.bz2 file with Python
Open and print a file that is compressed with gzip and then with bzip2, i.e a gz.bz2 file, with Python.
import sys import bz2 import gzip from cStringIO import StringIO # .gz.bz2 File is given as commandline argument filename = sys.argv[1] gzbzfilename = filename #Read in the bz2 data o = open(gzbzfilename, 'rb') gzbzdata = o.read() #Decompress the bz2 data gzdata = bz2.decompress(gzbzdata) #Next, gunzip the gzip file and read out the file pointer f = gzip.GzipFile(fileobj=StringIO(gzdata)) file_content = f.read() f.close() #Print out the file contents print file_content
Convert magnet link to torrent file with Python and add additional trackers
With rtorrent it is easier to have it pick up torrent files from a directory. This code converts a magnet link into a torrent file with Python and then inserts additional trackers to the torrent file.
import urllib import HTMLParser #This is the magnet link to modify magnetLink = 'magnet:?xt=urn:btih:b54a3ba68fd398ed019e21290beecc9dda64a858&dn=wikipedia_en_all_novid_2018-06.zim&tr=udp%3a%2f%2ftracker.mg64.net' #Here is a list of trackers to insert into the torrent trackers = {'list','of','trackers'} #Convert HTML special characters to escaped normal characters magnetLink = urllib.unquote(magnetLink).decode('utf8') magnetLink = HTMLParser.HTMLParser().unescape(magnetLink) #Split out the trackers from the magnet link and save for later magnetsplit = magnetLink.split('&tr=') base = magnetsplit[0] magnetTrackers = magnetsplit[1::] #Add the trackers from the magnet link to our list of trackers for magnetTracker in magnetTrackers: trackers.add(magnetTracker) #Add the trackers magnetLink = base for tracker in trackers: magnetLink += '&tr=' + tracker #Create the torrent file name - it is named after the magnet hash magnetName = magnetLink[magnetLink.find("btih:") + 1:magnetLink.find("&")] magnetName = magnetName.replace('tih:','') torrentfilename = 'meta-' + magnetName + '.torrent' #Write the magnet link to the torrent file with open(torrentfilename, 'w') as o: linkstr = u'd10:magnet-uri' + str(len(magnetLink)) + u':' + magnetLink + u'e' linkstr = linkstr.encode('utf8') o.write(linkstr)
Authenticate with BackBlaze B2 and list buckets with Python
Authenticate with BackBlaze B2 and list buckets with Python.
First to authenticate with BackBlaze B2:
import requests from requests.auth import HTTPBasicAuth #Credentials from BackBlaze B2 manager key_id = 'b2_key_id' application_key = 'b2_application_key' #Contact authorization server and check response path = 'https://api.backblazeb2.com/b2api/v1/b2_authorize_account' result = requests.get(path, auth=HTTPBasicAuth(key_id, application_key)) if result.status_code != 200: print 'Error - Could not connect to BackBlaze B2' exit() #Get results and add authorization token to headers result_json = result.json() account_id = result_json['accountId'] auth_token = result_json['authorizationToken'] api_url = result_json['apiUrl'] + '/b2api/v1' api_session = requests.Session() api_session.headers.update({ 'Authorization': auth_token })
Now use api_session to interact with B2. For example, download the list of buckets like this:
#Construct the API url url = api_url + '/b2_list_buckets' #Use the api_session parameter to get a list of buckets bucketInfo = api_session.post(url, json={'accountId': account_id}) #Convert datastream to json jsonBucketInfo = bucketInfo.json() #List id and name for bucket in jsonBucketInfo['buckets']: bucketId = bucket['bucketId'] bucketName = bucket['bucketName'] print bucketId, bucketName
More details about tokens in the json are on BackBlaze’s list_buckets API page.
Authenticate with BackBlaze B2 and get file URLs using Python
Authenticate with BackBlaze B2 and get the URLs of files in a bucket.
First to authenticate with BackBlaze B2:
import requests from requests.auth import HTTPBasicAuth #Auth information from Backblaze key_id = 'key_id' application_key = 'application_key' #Authenticate path = 'https://api.backblazeb2.com/b2api/v1/b2_authorize_account' result = requests.get(path, auth=HTTPBasicAuth(key_id, application_key)) if result.status_code != 200: print 'Error - Could not connect to BackBlaze B2' exit() #Read response result_json = result.json() account_id = result_json['accountId'] auth_token = result_json['authorizationToken'] api_url = result_json['apiUrl'] + '/b2api/v1' download_url = result_json['downloadUrl'] + '/file/' api_session = requests.Session() api_session.headers.update({ 'Authorization': auth_token })
Now get bucket contents and assemble URL. Sample code for bucket ID and bucket name is here.
#Initialize bucketId = 'bucket_id_from_b2' bucketName = 'name_of_bucket' params = {'bucketId': bucketId} urls = set() #Loop for as long as a nextFile exists while True: #Construct api call, execute, and read back information url = api_url + '/' + 'b2_list_file_names' fileList = api_session.post(url, json=params) jFileList = fileList.json() #Loop through files and construct url for file in jFileList['files']: urls.add(download_url + bucketName + '/' + file['filename']) #Check for next file and break if it doesn't exist startFileName = jFileList['nextFileName'] if startFileName == None: break else: #continue If next file exists params['startFileName'] = startFileName
Hacking Planet Atwood with Python and AWS
Peter Atwood is a hobbyist who creates limited edition pocket tools and puts them up for sale on his site at Planet Pocket Tool. His tools are fairly popular and being limited are hard to acquire.
Peter posts the sales randomly and the tools generally sell out within a few minutes of being listed. The best way to capture a sale is to periodically check his site and get alerted when a sale is in progress. This write-up is about automating the check and sending an alert via SMS and email using Python.
Blogspot publishes a RSS feed for their blogs. I wrote a simple function to use feedparser to grab the datetime of the first item in the feed and compare it against the previous version. I send out an alert if the datetime of the first item is different from the one I have saved.
#Get the blog entry feed = feedparser.parse('http://atwoodknives.blogspot.com/feeds/posts/default') #Figure out the publish time of the first entry firstEntryPubTime = time.strftime('%Y%m%d%H%M%S', feed.entries[0].published_parsed) #The newest post time did not match the previously saved post time #We have a new post if firstEntryPubTime != currentUpdateTime: #Save the new first blog entry time setCurrentUpdateTime(firstEntryPubTime) url = feed.entries[0].link subject = 'Atwood: ' + feed.entries[0].title body = url + feed.entries[0].summary #Send an alert send_alert(subject, body, url) else: print "No Update"
Now, to send the alert we leverage Amazon Web Services and BOTO the AWS Python interface. Amazon has a service called Simple Notification Service or SNS. SNS is a push service that lets users push messages in various formats like SMS and email.
Getting started is simple. First create a topic to which people can subscribe using create_topic. Then subscribe your phone number, email address, and any other form of communication using subscribe. Now you are all set.
def send_alert(message_title, message_body, message_url): #Connect with boto using the AWS token and secret key c = boto.connect_sns('token','secret_key') topicarn = "arn:aws:sns:us-east-1:TopicName" #Publish or send out the URL of the blog post for quick clicking publication = c.publish(topicarn, url, subject=url[:110]) #Close connection c.close()
I set up this script on a server at DigitalOcean and ran it periodically using cron. I was able to get to the buy link for most of Atwood’s sales with this methodology and eventually bought a Fancy Ti Atwrench. While nice and well made, it is definitely not worth what Peter Atwood charges for it.
Driving Adafruit’s RGB Matrix with Python on the BeagleBone Black
I wrote this code to drive the Adafruit 32×64 RGB Matrix using a Beaglebone Black and Adafruit’s IO Python library. The code is super simple – it writes a static color background to the top and bottom halves of the display through the shift registers and triggers the latch. It then iterates through the addresses so the LEDs for that row turn on and then goes to the next address. This should create a background color through persistence of vision.
Along the way I discovered:
The Python Library isn’t fast enough to drive the RGB matrix to provide a persistent display. There is flicker and when updating colors, you see each row turn on and off. You literally see the delay. There are ways to speed up the writes by using the PRU or direct IO writes. But I should not have to do that on a 1GHz processor.
The RGB matrix doesn’t light up unless you continually change the address. This was really annoying when I was figuring things out. I did not find this documented anywhere on the web except on rayslogic’s RGB writeup.
It appears the BeagleBone Black’s IO pins cannot push enough current to turn on the input schmitt triggers. I spent a lot of time being frustrated because I couldn’t get the green and blue LEDs to light up on my board.
Finally I noticed the green LEDs were barely on and had a brightness gradient from low to high address. My guess is the address pin was barely turning on which meant 0 was the most common address – hence the higher brightness on the lower rows. I got around this by plugging the G(reen) input directly into the 5V pin on the BeagleBone Black. Of course this now meant the green LEDs are always on!
Adafruit charges too much for the RGB panels. For example, the seller kbellenterprises on ebay has a 16×32 RGB matrix for $16 including shipping. Adafruit has the same panel for $25 and you pay for shipping.
Adafruit charges too much for male-to-male jumper wires. The seller funny-diy on ebay sells 40 for $1.80 with shipping while Adafruit charges $4+shipping for their so-called “premium” wires. Adafruit needs to rename them to “premium-priced“.
My Python Code:
https://bitbucket.org/xyzio/rgbmatrix/src/master/bb_python/logic.py
Sources:
http://www.rayslogic.com/propeller/programming/AdafruitRGB/AdafruitRGB.htm
https://learn.adafruit.com/setting-up-io-python-library-on-beaglebone-black/overview
Batch converting videos to play on the Nexus 7 using FFMPEG
How to batch convert videos to play on the Nexus 7 using FFMPEG in Python.
mypath = r'path_to_video_files' from os import listdir from os.path import isfile, join onlyfiles = [ f for f in listdir(mypath) if isfile(join(mypath,f)) ] for file in onlyfiles: infile = '"' + mypath + file + '"' outfile = '"' + mypath + file + '.mp4' + '"' ffmpegpath = r'ffmpeg.exe' cmd = r' -i ' + infile + ' -acodec libmp3lame -vcodec libx264 -aspect 16:9 -s 800x480 -ab 128k -b 512k -f mp4 -strict experimental ' + outfile import os os.system(ffmpegpath + cmd)