EjectABed Version 2 – Now Using the Raspberry Pi (Part 1)

I recently entered a hackster.io competition that centered around using Windows 10 on the Raspberry Pi.  I entered the ejectabed and it was accepted to the semi-final round.  My thought was to take the existing ejectabed controller from a Netduino and move it to a Raspberry Pi.  While doing that, I could open the ejectabed from my local area network to the internet so anyone could eject Sloan.
My first step was to hook my Raspberry Pi up to my home network and deploy to it from Visual Studio. Turns out, it was pretty straightforward.
I took an old Asus Portable Wireless Router and plugged it into my home workstation. I then configured the router to act as an Access Point so that it would pass through all traffic from the router to which my developer workstation is attached. I then attached the router to the Pi and powered it through the Pi’s USB port. I then plugged the Pi’s HDMI out into a spare monitor of mine.

20150822_112947

With all of the hardware plugged in, I headed over to Windows On Devices and followed the instructions on how to set up a Raspberry Pi. After installing the correct software on my developer workstation, flashing the SD card with Windows 10, plugging the SD card into the Pi, turning the Pi on, and then remoting into the Pi via PowerShell, I could see the Pi on my local workstation via the Windows IoT Core Watcher, and the Pi showed its friendly welcome screen via HDMI.

Capture

20150822_101235

I then headed over to Visual Studio, copy/pasted the requisite “Hello IoT World” Blinky project, deployed it to the Pi, and watched the light go on and off.

20150822_104535

With that out of the way, I decided to look at controlling the light via Twitter and Azure. The thought was to have the Pi monitor a message queue on Azure and, whenever there was a message, blink the light (simulating the ejectabed being activated). To that end, I went into Azure and created a basic storage account. One of the nice things about Azure is that you get a queue out of the box when you create a storage account:

image

One of the not so nice things about Azure is that there is no way to control said queue via their UI. You have to create, push, and pull from the queue in code. I went back to Visual Studio and added the Azure Storage NuGet package:

image

I then created a method to monitor the queue:

    internal async Task<Boolean> IsMessageOnQueue()
    {
        var storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=ejectabed;AccountKey=xxx";
        var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
        var client = storageAccount.CreateCloudQueueClient();
        var queue = client.GetQueueReference("sloan");
        var queueExists = await queue.ExistsAsync();
        if (!queueExists)
        {
            GpioStatus.Text = "Queue does not exist or is unreachable.";
            return false;
        }
        // Take the next message (if any) and delete it so it is only handled once
        var message = await queue.GetMessageAsync();
        if (message != null)
        {
            await queue.DeleteMessageAsync(message);
            return true;
        }
        GpioStatus.Text = "No message for the EjectABed.";
        return false;
    }

Then, if there is a message, the Pi runs the ejection sequence (in this case, blinking the light):

    internal void RunEjectionSequence()
    {
        bedCommand.Eject();
        // Start a timer that fires after ejectionLength seconds
        bedTimer = new DispatcherTimer();
        bedTimer.Interval = TimeSpan.FromSeconds(ejectionLength);
        bedTimer.Tick += LightTimer_Tick;
        bedTimer.Start();
    }

 

I deployed the code to the Pi without a problem. I then created a basic console application to push messages to the queue that the Pi could drain:

    using System;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Queue;

    class Program
    {
        static String storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=ejectabed;AccountKey=xxx";

        static void Main(string[] args)
        {
            Console.WriteLine("Start");
            Console.WriteLine("Press The 'E' Key To Eject. Press 'Q' to quit...");

            var keyInfo = ConsoleKey.S;
            do
            {
                keyInfo = Console.ReadKey().Key;
                if (keyInfo == ConsoleKey.E)
                {
                    CreateQueue();
                    WriteToQueue();
                    //ReadFromQueue();
                }

            } while (keyInfo != ConsoleKey.Q);

            Console.WriteLine("End");
            Console.ReadKey();
        }

        private static void CreateQueue()
        {
            var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
            var client = storageAccount.CreateCloudQueueClient();
            var queue = client.GetQueueReference("sloan");
            queue.CreateIfNotExists();
            Console.WriteLine("Created Queue");
        }

        private static void WriteToQueue()
        {
            var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
            var client = storageAccount.CreateCloudQueueClient();
            var queue = client.GetQueueReference("sloan");
            var message = new CloudQueueMessage("Eject!");
            queue.AddMessage(message);
            Console.WriteLine("Wrote To Queue");
        }

        private static void ReadFromQueue()
        {
            var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
            var client = storageAccount.CreateCloudQueueClient();
            var queue = client.GetQueueReference("sloan");
            var queueExists = queue.Exists();
            if (!queueExists)
            {
                Console.WriteLine("Queue does not exist");
                return; // nothing to read, so do not call GetMessage on a missing queue
            }
            var message = queue.GetMessage();
            if (message != null)
            {
                queue.DeleteMessage(message);
                Console.WriteLine("Message Found and Deleted");
            }
            else
            {
                Console.WriteLine("No messages");
            }
        }
    }

I could then write to the queue and the Pi would read and react. You can see it in action here:

image

With the queue up and running, I was ready to add the ability for someone to Tweet to the queue. I created a cloud service project and pointed it to a new project that monitors Twitter and then pushes to the queue:

image

image

The Twitter project uses the Tweetinvi NuGet package and is a worker project. It makes a call to Twitter every 15 seconds and, if there is a tweet to “ejectabed” with a person’s name, it writes to the queue (right now, only Sloan’s name is available):

    type TwitterWorker() =
        inherit RoleEntryPoint()

        let storageConnectionString = RoleEnvironment.GetConfigurationSettingValue("storageConnectionString")

        let createQueue(queueName) =
            let storageAccount = CloudStorageAccount.Parse(storageConnectionString)
            let client = storageAccount.CreateCloudQueueClient()
            let queue = client.GetQueueReference(queueName)
            queue.CreateIfNotExists() |> ignore

        let writeToQueue(queueName) =
            let storageAccount = CloudStorageAccount.Parse(storageConnectionString)
            let client = storageAccount.CreateCloudQueueClient()
            let queue = client.GetQueueReference(queueName)
            let message = new CloudQueueMessage("Eject!")
            queue.AddMessage(message) |> ignore

        let writeTweetToQueue(queueName) =
            createQueue(queueName)
            writeToQueue(queueName)

        // A tweet counts only if it names Sloan and has not been favourited yet;
        // favouriting is how a tweet gets marked as already handled.
        let getKeywordFromTweet(tweet: ITweet) =
            let keyword = "sloan"
            let hasKeyword = tweet.Text.Contains(keyword)
            let isFavourited = tweet.FavouriteCount > 0
            match hasKeyword, isFavourited with
            | true,false -> Some (keyword,tweet)
            | _,_ -> None

        override this.Run() =
            while(true) do
                let consumerKey = RoleEnvironment.GetConfigurationSettingValue("consumerKey")
                let consumerSecret = RoleEnvironment.GetConfigurationSettingValue("consumerSecret")
                let accessToken = RoleEnvironment.GetConfigurationSettingValue("accessToken")
                let accessTokenSecret = RoleEnvironment.GetConfigurationSettingValue("accessTokenSecret")

                let creds = Credentials.TwitterCredentials(consumerKey, consumerSecret, accessToken, accessTokenSecret)
                Tweetinvi.Auth.SetCredentials(creds)
                let matchingTweets = Tweetinvi.Search.SearchTweets("@ejectabed")
                let matchingTweets' =
                    matchingTweets
                    |> Seq.map(fun t -> getKeywordFromTweet(t))
                    |> Seq.filter(fun t -> t.IsSome)
                    |> Seq.map (fun t -> t.Value)
                matchingTweets' |> Seq.iter(fun (k,t) -> writeTweetToQueue(k))
                matchingTweets' |> Seq.iter(fun (k,t) -> t.Favourite())

                Thread.Sleep(15000)

        override this.OnStart() =
            ServicePointManager.DefaultConnectionLimit <- 12
            base.OnStart()
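The map/filter/map chain that picks out unhandled tweets can also be written with Seq.choose. The following is only a sketch of that idea: the Tweet record is a hypothetical stand-in for Tweetinvi's ITweet so that the snippet runs without the package, and tryGetKeyword is an illustrative name, not something from the worker above.

```fsharp
// Hypothetical stand-in for Tweetinvi's ITweet (assumption, for illustration only).
type Tweet = { Text: string; FavouriteCount: int }

// Same rule as getKeywordFromTweet: keep tweets that contain the keyword
// and have not been favourited yet.
let tryGetKeyword (keyword: string) (tweet: Tweet) =
    if tweet.Text.Contains(keyword) && not (tweet.FavouriteCount > 0)
    then Some (keyword, tweet)
    else None

let sampleTweets =
    [ { Text = "@ejectabed sloan"; FavouriteCount = 0 }   // new request
      { Text = "@ejectabed sloan"; FavouriteCount = 1 }   // already handled
      { Text = "@ejectabed hello"; FavouriteCount = 0 } ] // no known name

// Seq.choose maps and drops the None cases in one step; Seq.toList forces
// the result so the filter runs once even if the list is walked twice.
let pending =
    sampleTweets
    |> Seq.choose (tryGetKeyword "sloan")
    |> Seq.toList
```

Materialising the result with Seq.toList matters when the same sequence feeds two Seq.iter calls, since a lazy sequence would otherwise be re-evaluated for each pass.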

Deploying to Azure was a snap:
image
And now when I Tweet,
image
the Pi reacts. Since Twitter does not allow the same Tweet to be sent again, I deleted it every time I wanted to send a new message to the queue.

Aggregation of WCPSS Tax Records with School Assignment

For the next part of my WCPSS hit parade, I needed a way of combining the screen scrape that I did from the Wake County Tax Records as described here and the screen scrape of the Wake County Public School Assignments as found here. Getting data from DocumentDB is straightforward as long as you don’t ask too much from the query syntax.

I created two functions that pull the tax record and the school assignment via the index number:

    let getAssignment (id:int) =
        let collection = client.CreateDocumentCollectionQuery(database.CollectionsLink).Where(fun dc -> dc.Id = "houseassignment").ToArray().FirstOrDefault()
        let documentLink = collection.SelfLink
        let queryString = "SELECT * FROM houseassignment WHERE houseassignment.houseIndex = " + id.ToString()
        let query = client.CreateDocumentQuery(documentLink,queryString)
        match query |> Seq.length with
        | 0 -> None
        | _ -> 
            let assignmentValue = query |> Seq.head
            let assignment = HouseAssignment.Parse(assignmentValue.ToString())
            Some assignment

    let getValuation (id:int) =
        let collection = client.CreateDocumentCollectionQuery(database.CollectionsLink).Where(fun dc -> dc.Id = "taxinformation").ToArray().FirstOrDefault()
        let documentLink = collection.SelfLink
        // Use the id argument here; the query previously hard-coded index = 1
        let queryString = "SELECT * FROM taxinformation WHERE taxinformation.index = " + id.ToString()
        let query = client.CreateDocumentQuery(documentLink,queryString)
        match query |> Seq.length with
        | 0 -> None
        | _ -> 
            let valuationValue = query |> Seq.head
            let valuation = HouseValuation.Parse(valuationValue.ToString())
            Some valuation

Note that option types are being used because there are many index values where there is not a corresponding record. Also, there might be a situation where the assignment has a record but the valuation does not, and vice versa, so I created a function that only puts the records together where both records exist:

    let assignSchoolTaxBase (id:int) =
        let assignment = getAssignment(id)
        let valuation = getValuation(id)
        match assignment.IsSome,valuation.IsSome with
        | true, true -> assignment.Value.Schools 
                        |> Seq.map(fun s -> s, valuation.Value.AssessedValue)
                        |> Some
        | _ -> None
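The "only when both records exist" rule can also be expressed by pattern matching on the two options directly, which avoids the IsSome and Value calls. This is a minimal sketch under assumptions: the Assignment and Valuation records and the pairSchoolsWithValue name are hypothetical stand-ins for the parsed DocumentDB documents, and the school names and value are made up.

```fsharp
// Hypothetical record shapes (assumption); the real ones come from parsing the documents.
type Assignment = { Schools: string list }
type Valuation = { AssessedValue: int }

// Pair every assigned school with the assessed value, but only when
// both the assignment and the valuation are present.
let pairSchoolsWithValue (assignment: Assignment option) (valuation: Valuation option) =
    match assignment, valuation with
    | Some a, Some v -> a.Schools |> List.map (fun s -> s, v.AssessedValue) |> Some
    | _ -> None

// Both records present: each school gets the house's assessed value.
let both =
    pairSchoolsWithValue
        (Some { Schools = [ "School A"; "School B" ] })
        (Some { AssessedValue = 250000 })

// Valuation missing: the house is skipped.
let missing = pairSchoolsWithValue (Some { Schools = [ "School A" ] }) None
```

Matching on the tuple of options lets the compiler bind the unwrapped values, so there is no way to reach for a value that is not there.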

Running this on the first record gives the expected result:

image

Running it on an index where there is not a record also gives the expected result:

image

With the matching working, we need a way of bringing all of the school arrays together and then aggregating the tax valuation. I decided to take a step-by-step approach to this, even though there might be a more terse way to write it.

    #time
    indexes |> Seq.map(fun i -> assignSchoolTaxBase(i))
            |> Seq.filter(fun s -> s.IsSome)
            |> Seq.collect(fun s -> s.Value)
            |> Seq.groupBy(fun (s,av) -> s)
            |> Seq.map(fun (s,ss) -> s,ss |> Seq.sumBy(fun (s,av)-> av))
            |> Seq.toArray
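For reference, here is one possible terser form of the same pipeline, run on made-up in-memory data so the grouping and summing are easy to check by hand. The results list is an assumption that mimics what assignSchoolTaxBase returns; the school names and values are invented.

```fsharp
// Made-up sample data: one entry per house index, None where no record matched.
let results : (string * int) list option list =
    [ Some [ ("School A", 100); ("School B", 100) ]
      None
      Some [ ("School A", 250) ] ]

let totals =
    results
    |> Seq.choose id       // drop the None entries and unwrap the rest
    |> Seq.collect id      // flatten into one sequence of (school, value) pairs
    |> Seq.groupBy fst     // group the pairs by school
    |> Seq.map (fun (school, pairs) -> school, pairs |> Seq.sumBy snd)
    |> Seq.toArray
```

Here School A totals 350 (100 + 250) and School B totals 100. Seq.choose id replaces the filter on IsSome followed by the Value lookup.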

When I run it on the first 10 records, the values come back as expected:

image

So the last step is to run it on all 350,000 indexes (let indexes = [|1..350000|]). The problem is that, even after a long period of time, nothing was returning. This is where the power of Azure comes in: there is no problem so large I can’t throw more cores at it. I went to the management portal and increased the VM to 8 cores:

Capture

I then went into the code base and added PSeq for the database calls (which I assumed were taking the longest time):

    #time
    let indexes = [|1..350000|]
    let assignedValues = indexes |> PSeq.map(fun i -> assignSchoolTaxBase(i)) |> Seq.toArray

    let filePath = @"C:\Git\WakeCountySchoolScores\SchoolValuation.csv"

    assignedValues
        |> Seq.filter(fun s -> s.IsSome)
        |> Seq.collect(fun s -> s.Value)
        |> Seq.groupBy(fun (s,av) -> s)
        |> Seq.map(fun (s,ss) -> s,ss |> Seq.sumBy(fun (s,av)-> av))
        |> Seq.map(fun (s,v) -> s + "," + v.ToString() + Environment.NewLine)
        |> Seq.iter(fun (s) -> File.AppendAllText(filePath, s))
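As a rough sketch of the same shape without the PSeq package, the lookups can be fanned out with Array.Parallel from FSharp.Core (a swapped-in technique, not what the script above uses), and the CSV can be written in one call instead of one append per row. The lookup function, the tiny index range, and the temp-file path are all assumptions for illustration; lookup stands in for assignSchoolTaxBase.

```fsharp
open System.IO

// Hypothetical stand-in for assignSchoolTaxBase: only even indexes have a record.
let lookup (i: int) =
    if i % 2 = 0 then Some [ ("School A", i); ("School B", 1) ] else None

let lines =
    [| 1 .. 10 |]
    |> Array.Parallel.choose lookup   // run the lookups across cores, dropping the None cases
    |> Seq.collect id                 // flatten into (school, value) pairs
    |> Seq.groupBy fst
    |> Seq.map (fun (school, pairs) -> sprintf "%s,%d" school (pairs |> Seq.sumBy snd))
    |> Seq.toArray

// One write for the whole file; re-running overwrites instead of appending duplicates.
let path = Path.Combine(Path.GetTempPath(), "SchoolValuation-sketch.csv")
File.WriteAllLines(path, lines)
```

Writing once with File.WriteAllLines also means a second run replaces the file rather than adding a second copy of every row, which is what repeated File.AppendAllText calls would do.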

and after 2 hours:

image

Moving Files Between Azure Blob Storage Using F#

Dear Future Jamie:

In case you forget (again) about how to move files from one container to another on Azure Blob Storage, here is the code:

    //http://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/#configure-access

    #r "../packages/WindowsAzure.Storage.4.3.0/lib/net40/Microsoft.WindowsAzure.Storage.dll"

    open Microsoft.WindowsAzure.Storage
    open Microsoft.WindowsAzure.Storage.Auth
    open Microsoft.WindowsAzure.Storage.Blob
    open System.IO

    let connectionString = "yourConnectionStringHere"
    let storageAccount = CloudStorageAccount.Parse(connectionString)
    let blobClient = storageAccount.CreateCloudBlobClient()

    let sourceContainer = blobClient.GetContainerReference("source")
    let targetContainer = blobClient.GetContainerReference("target")

    // Download the whole blob into memory, then upload it under the same name
    let copyBlob (sourceBlob:CloudBlockBlob) =
        sourceBlob.FetchAttributes()
        let blobName = sourceBlob.Name
        let arrayLength = int sourceBlob.Properties.Length
        let byteArray = Array.zeroCreate(arrayLength)
        sourceBlob.DownloadToByteArray(byteArray,0) |> ignore

        let targetBlob = targetContainer.GetBlockBlobReference(blobName)
        targetBlob.UploadFromByteArray(byteArray,0,arrayLength)

    let sourceBlobs = sourceContainer.ListBlobs()
    sourceBlobs |> Seq.map(fun b -> b :?> CloudBlockBlob)
                |> Seq.iter(fun b -> copyBlob b)

    // Count the blobs in the target container to confirm the copy
    let result = targetContainer.ListBlobs()
    Seq.length result

Love,

Jamie from Dec 2014

PS: You really should exercise more….