Came back to the software side of my night vision security camera project after taking some time to learn about electronics and I found it a bit hard to retrace the logic of my program.
I have a bunch of sensors that communicate their state to different workers and I think I fudged the design big time.
Here is a simple graphic of how my gophers go about their business, communicating via channels.
Camera----------------StreamServer-------------Robot------------LightSensor
StreamFrames --------->
LocalVideo ---------------------------------->
BusyOrIdle--------------->
<------------------------BusyOrIdle
<-----------------------------------------LightLevel
<----------------LightLevel
And the code.
main.go
package main
import (
"apps/picam/camera"
"apps/picam/hardware"
"apps/picam/server"
"flag"
"log"
"os"
"os/signal"
"time"
rpio "github.com/stianeikeland/go-rpio"
)
var light int
func init() {
if err := rpio.Open(); err != nil {
log.Fatal(err)
}
hardware.PinsPullDown()
hardware.SetPinMode()
hardware.SetPinState()
}
func main() {
var port = flag.String("port", ":5000", "The port on which the camera listens for clients")
var password = flag.String("pwd", "", "Camera password")
flag.Parse()
defer rpio.Close()
stream := make(chan []byte, 1) //chan for streaming video to client
localFeed := make(chan []byte, 1) //chan for local video recording
robotState := make(chan string, 1)
serverState := make(chan string, 1)
l1 := make(chan int, 1)
l2 := make(chan int, 1)
done := make(chan bool, 1)
videoFeedListeners := []chan []byte{stream, localFeed}
videoPipe, err := camera.Start("/home/pi/src/picam/v4l2")
checkErr(err)
defer videoPipe.Close()
security := hardware.SecurityRobot{
VideoFeed: localFeed, //used to record video locally
Done: done,
RobotState: robotState, //refers to, if motion sensor detected movement and camera is recording
ServerState: serverState, //refers to, if server is streaming
}
cam := camera.Camera{
Done: done,
Reader: videoPipe,
Forwarder: videoFeedListeners,
}
server := server.NewServer(robotState, serverState)
light := hardware.ReadLightLevels(done)
motion := hardware.DetectMovement(done)
go cam.ReadForwardFrame()
go cam.Statistics()
go func(done chan bool) {
for {
select {
case <-done:
log.Println("Light sensor communicator exit - √")
return
case level := <-light:
l1 <- level
l2 <- level
}
}
}(done)
go server.Run(port, password, stream, l2, done)
go security.Video(l1, motion)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
for range c {
hardware.PinsPullDown()
close(done)
break
}
log.Println("Interrupt signal received. Shutting camera down")
time.Sleep(2 * time.Second)
}
func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}
camera.go
package camera
import (
"bytes"
"fmt"
"io"
"log"
"os/exec"
"time"
)
type Camera struct {
Done chan bool
Reader io.ReadCloser
Forwarder []chan []byte
FPS int
FrameSize int
}
func Start(command string) (io.ReadCloser, error) {
var stderr bytes.Buffer
cmd := exec.Command(command)
cmd.Stderr = &stderr
pipe, _ := cmd.StdoutPipe()
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf(fmt.Sprint(err) + ": " + stderr.String())
}
return pipe, nil
}
func (c *Camera) ReadForwardFrame() {
for {
frame := make([]byte, 65507)
n, err := c.Reader.Read(frame)
if err != nil {
log.Println(err)
log.Println("Stopping reader")
return
}
if n > 0 {
frame = frame[0:n]
c.FrameSize = n
for _, listener := range c.Forwarder {
listener <- frame
}
}
c.FPS++
}
}
func (c *Camera) Statistics() {
ticker := time.NewTicker(time.Second)
for range ticker.C {
select {
case <-c.Done:
log.Println("Stopping statistics")
return
default:
fmt.Printf("\rfps: %v, size: %v ", c.FPS, c.FrameSize)
c.FPS = 0
}
}
}
server.go
package server
import (
"apps/picam/hardware"
"fmt"
"log"
"net"
"strconv"
"time"
)
type session struct {
Conn *net.UDPConn
Message chan string
Password string
Address chan *net.UDPAddr
ClientAdress *net.UDPAddr
}
type Server struct {
SecurityRobotState chan string
ServerState chan string
}
type monitor struct {
Conn *net.UDPConn
Kill chan bool
}
func newMonitor(port string) *monitor {
return &monitor{
Conn: bindAddress(port),
Kill: make(chan bool),
}
}
func NewServer(robotState chan string, serverState chan string) *Server {
return &Server{
SecurityRobotState: robotState,
ServerState: serverState,
}
}
func (m *monitor) detectTimeOut(delay time.Duration, done <-chan bool) {
buffer := make([]byte, 10)
m.Conn.SetReadDeadline(time.Now().Add(delay))
for {
select {
case <-done:
log.Println("Shutting down monitor")
return
default:
n, err := m.Conn.Read(buffer)
if err != nil {
log.Println(err)
}
if n > 0 {
m.Conn.SetReadDeadline(time.Now().Add(delay))
}
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
log.Println("No response")
m.Kill <- true
return
}
}
}
}
func (s *Server) Run(port, password *string, frame <-chan []byte, lightLvL <-chan int, done <-chan bool) {
var robotState string
light := hardware.High
session := newSession(port, password)
go session.listenForClients(done)
log.Println("Listening for clients on: ", session.Conn.LocalAddr())
defer session.Conn.Close()
heartBeatPort := stringPortToInt(*port)
monitor := newMonitor(fmt.Sprintf(":%v", heartBeatPort+1))
defer monitor.Conn.Close()
for {
select {
case <-done:
log.Println("Stopping server")
return
case address := <-session.Address:
if robotState != "recording" && light == hardware.Low {
hardware.BaseHigh()
}
log.Println("New client authenticated from address: ", address)
go monitor.detectTimeOut(time.Second*5, done)
session.ClientAdress = address
s.ServerState <- "streaming"
case f := <-frame:
switch session.ClientAdress {
case nil:
s.ServerState <- "idle"
continue
default:
_, err := session.Conn.WriteToUDP(f, session.ClientAdress)
if err != nil {
log.Println(err)
}
}
case <-monitor.Kill:
log.Println("Client timed out")
if robotState != "recording" {
hardware.BaseLow()
}
session.ClientAdress = nil
case robotState = <-s.SecurityRobotState:
continue
case light = <-lightLvL:
continue
//using a default case to force the loop to keep running. Is there a better way?
default:
continue
}
}
}
func newSession(port, password *string) *session {
return &session{
Conn: bindAddress(*port),
Message: make(chan string),
Password: *password,
Address: make(chan *net.UDPAddr),
}
}
func (s *session) listenForClients(done <-chan bool) {
for {
select {
case <-done:
log.Println("Stopping listener")
return
default:
buf := make([]byte, 1024)
n, addr, err := s.Conn.ReadFromUDP(buf)
if err != nil {
log.Println("Read error. ", err)
}
m := buf[0:n]
if s.Password == "" {
s.Address <- addr
continue
}
if s.authenticate(string(m), addr) {
s.Address <- addr
}
}
}
}
drivers.go
package hardware
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"time"
"apps/picam/storage"
rpio "github.com/stianeikeland/go-rpio"
"golang.org/x/exp/io/spi"
)
const (
Low = iota
High
moving
still
)
type SecurityRobot struct {
VideoFeed chan []byte
Done chan bool
recording bool
RobotState chan string
ServerState chan string
}
func (r *SecurityRobot) Video(lightLevel <-chan int, motion <-chan int) {
var serverState string
var light int
var m int
frame := make(chan []byte, 1)
stop := make(chan bool, 1)
for {
select {
case <-r.Done:
log.Println("Local recording exit - ")
return
case light = <-lightLevel:
continue
case m = <-motion:
continue
case f := <-r.VideoFeed:
switch m {
case still:
if r.recording {
stop <- true
r.recording = false
r.RobotState <- "idle"
if serverState != "streaming" {
BaseLow() //light switch OFF
}
}
case moving:
if !r.recording {
if light == Low {
BaseHigh() //light switch ON
}
r.recording = true
r.RobotState <- "recording"
now := time.Now()
fname := fmt.Sprintf("%s.h264", now.Format("02-01-2006T15:04:05"))
fname = strings.Replace(fname, ":", "-", 3)
go writeToFile(fname, frame, stop)
}
frame <- f
}
case serverState = <-r.ServerState:
continue
}
}
}
func writeToFile(fname string, frame <-chan []byte, stop chan bool) {
file, err := os.Create(fname)
if err != nil {
log.Fatal(err)
}
fw := bufio.NewWriter(file)
log.Println("Recording...")
for {
select {
case <-stop:
fw.Flush()
file.Close()
//upload to google drive
log.Println("Recording stopped")
log.Println("Uploading to google drive")
go googleDrive(fname)
return
case f := <-frame:
_, err = fw.Write(f)
if err != nil {
log.Println(err)
}
}
}
}
func googleDrive(filename string) {
id := drive.DriveUpload(filename)
log.Println("upload success")
drive.ShareFile(id)
log.Println("sharing video successful")
if err := os.Remove(filename); err == nil {
log.Println("local copy removed")
}
}
//driver for chip mcp3008
//converts analog data to digitial
func ReadLightLevels(done chan bool) chan int {
lightLvL := make(chan int, 1)
go func() {
var l int //setting default light lvl to dark
dev, err := spi.Open(&spi.Devfs{
Dev: "/dev/spidev0.0", Mode: spi.Mode0, MaxSpeed: 1000000})
if err != nil {
log.Println("SPI error", err)
log.Println("Did you forget to enable SPI interface?")
return
}
defer dev.Close()
tick := time.Tick(time.Second * 1)
write := []byte{1, 8 << 4, 0}
for {
select {
case <-done:
log.Println("Light sensor goroutine exit - √")
return
case <-tick:
read := make([]byte, 3)
if err := dev.Tx(write, read); err != nil {
fmt.Println("SPI read/write error. ", err)
return
}
code := int(read[1]&3)<<8 + int(read[2])
voltage := (float32(code) * 3.3) / 1024
if voltage <= 0.750 && l == High {
l = Low
lightLvL <- Low
}
if voltage >= 0.750 && l == Low {
l = High
lightLvL <- High
}
}
}
}()
return lightLvL
}
func DetectMovement(done chan bool) chan int {
motion := make(chan int, 1)
tick := time.Tick(time.Microsecond * 10)
go func() {
for {
select {
case <-done:
log.Println("PIR detector goroutine exit - √")
return
case <-tick:
switch rpio.ReadPin(pirInput) {
case rpio.High:
motion <- moving
case rpio.Low:
motion <- still
}
}
}
}()
return motion
}
func BaseHigh() {
transistorBase.High()
}
func BaseLow() {
transistorBase.Low()
}