Single-scheduler, multiple-worker architecture with gRPC and Go

Single scheduler (conductor) and multiple workers

Part 4 - The scheduler

Full code: github.com/KorayGocmen/scheduler-worker-grpc
The entry point of the scheduler is the scheduler/scheduler.go file, which contains the init and main functions. The main function starts two functions asynchronously (as goroutines): one starts the HTTP server (the API) and the other starts the gRPC server.
scheduler/scheduler.go
package main

import (
  "log"
  "os"
  "os/signal"
  "syscall"
)

// init loads the scheduler configuration before main runs, so the
// configuration is already populated when main starts the servers
// (api, for example, reads config.HTTPServer.Addr).
func init() {
  loadConfig()
}

// Entry point of the scheduler application.
//
// main starts the HTTP API and the gRPC server, each in its own goroutine,
// then blocks until SIGINT or SIGTERM is received. When that happens it logs
// the signal and exits through log.Fatalf (exit status 1).
func main() {
  go api()
  go startGRPCServer()

  // signal.Notify never blocks when delivering a signal. With an unbuffered
  // channel, a signal that arrives while nobody is receiving is dropped.
  // The os/signal docs require a buffer of at least one.
  sig := make(chan os.Signal, 1)
  signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

  s := <-sig
  log.Fatalf("Signal (%d) received, stopping\n", s)
}
The API uses Julien Schmidt's HTTP router: github.com/julienschmidt/httprouter
The API exposes three requests to start, query, and stop jobs on a worker. These handlers are pretty standard and not very important. The important parts are the three functions that do all the work: "startJobOnWorker", "stopJobOnWorker", and "queryJobOnWorker".
scheduler/api.go
package main

import (
  "encoding/json"
  "io/ioutil"
  "log"
  "net/http"

  "github.com/julienschmidt/httprouter"
)

// contentTypeHeader is the response header that every API handler sets.
const contentTypeHeader = "Content-Type"

// applicationJSONHeader is the value written under contentTypeHeader,
// because every API response body is JSON-encoded.
const applicationJSONHeader = "application/json"

// apiStartJob handles POST /start. It decodes an apiStartJobReq from the
// JSON request body, asks the selected worker to start the job via
// startJobOnWorker, and replies with the new job's ID.
//
// Every response is JSON:
//   - 201 Created with apiStartJobRes on success.
//   - 400 Bad Request with apiError if the body is not valid JSON or does not
//     match the request shape.
//   - 500 Internal Server Error with apiError if reading the body or starting
//     the job fails.
func apiStartJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  startJobReq := apiStartJobReq{}

  // Headers must be set before WriteHeader is called below.
  w.Header().Set(contentTypeHeader, applicationJSONHeader)

  body, err := ioutil.ReadAll(r.Body)
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  // An undecodable body is the client's mistake, not a server failure.
  if err := json.Unmarshal(body, &startJobReq); err != nil {
    w.WriteHeader(http.StatusBadRequest)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  jobID, err := startJobOnWorker(startJobReq)
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  w.WriteHeader(http.StatusCreated)
  json.NewEncoder(w).Encode(apiStartJobRes{JobID: jobID})
}

// apiStopJob handles POST /stop. It decodes an apiStopJobReq from the JSON
// request body and asks the selected worker to stop the job via
// stopJobOnWorker.
//
// Every response is JSON:
//   - 200 OK with apiStopJobRes{Success: true} on success.
//   - 400 Bad Request with apiError if the body is not valid JSON or does not
//     match the request shape.
//   - 500 Internal Server Error with apiError if reading the body or stopping
//     the job fails.
func apiStopJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  stopJobReq := apiStopJobReq{}

  // Headers must be set before WriteHeader is called below.
  w.Header().Set(contentTypeHeader, applicationJSONHeader)

  body, err := ioutil.ReadAll(r.Body)
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  // An undecodable body is the client's mistake, not a server failure.
  if err := json.Unmarshal(body, &stopJobReq); err != nil {
    w.WriteHeader(http.StatusBadRequest)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  if err := stopJobOnWorker(stopJobReq); err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  w.WriteHeader(http.StatusOK)
  json.NewEncoder(w).Encode(apiStopJobRes{Success: true})
}

// apiQueryJob handles POST /query. It decodes an apiQueryJobReq from the
// JSON request body, asks the selected worker for the job's status via
// queryJobOnWorker, and replies with that status.
//
// Every response is JSON:
//   - 200 OK with apiQueryJobRes (Done, Error, ErrorText) on success.
//   - 400 Bad Request with apiError if the body is not valid JSON or does not
//     match the request shape.
//   - 500 Internal Server Error with apiError if reading the body or querying
//     the worker fails.
func apiQueryJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
  queryJobReq := apiQueryJobReq{}

  // Headers must be set before WriteHeader is called below.
  w.Header().Set(contentTypeHeader, applicationJSONHeader)

  body, err := ioutil.ReadAll(r.Body)
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  // An undecodable body is the client's mistake, not a server failure.
  if err := json.Unmarshal(body, &queryJobReq); err != nil {
    w.WriteHeader(http.StatusBadRequest)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  jobDone, jobError, jobErrorText, err := queryJobOnWorker(queryJobReq)
  if err != nil {
    w.WriteHeader(http.StatusInternalServerError)
    json.NewEncoder(w).Encode(apiError{Error: err.Error()})
    return
  }

  queryJobRes := apiQueryJobRes{
    Done:      jobDone,
    Error:     jobError,
    ErrorText: jobErrorText,
  }

  w.WriteHeader(http.StatusOK)
  json.NewEncoder(w).Encode(queryJobRes)
}

// createRouter builds the scheduler's HTTP router. Every endpoint is a POST
// that carries a JSON request body.
func createRouter() *httprouter.Router {
  routes := []struct {
    path    string
    handler httprouter.Handle
  }{
    {"/start", apiStartJob},
    {"/stop", apiStopJob},
    {"/query", apiQueryJob},
  }

  r := httprouter.New()
  for _, rt := range routes {
    r.POST(rt.path, rt.handler)
  }
  return r
}

// api serves the scheduler's HTTP API on config.HTTPServer.Addr, using the
// router from createRouter. It blocks while serving. ListenAndServe only
// returns with a non-nil error, so any return ends the process via log.Fatal.
func api() {
  addr := config.HTTPServer.Addr
  server := &http.Server{Addr: addr, Handler: createRouter()}

  log.Println("HTTP Server listening on", addr)
  log.Fatal(server.ListenAndServe())
}
These three functions are located in the grpc_translator.go file; they translate HTTP requests into gRPC requests on the specified workers.
scheduler/grpc_translator.go
package main

import (
  "context"
  "errors"
  "time"

  pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
  "google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
// 		- string: job id
// 		- error: nil if no error ("worker not found" for an unknown WorkerID)
func startJobOnWorker(req apiStartJobReq) (string, error) {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return "", errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return "", err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  startJobReq := pb.StartJobReq{
    Command: req.Command,
    Path:    req.Path,
  }

  r, err := c.StartJob(ctx, &startJobReq)
  if err != nil {
    return "", err
  }

  return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
// 		- error: nil if no error ("worker not found" for an unknown WorkerID)
func stopJobOnWorker(req apiStopJobReq) error {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  stopJobReq := pb.StopJobReq{
    JobID: req.JobID,
  }

  if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
    return err
  }

  return nil
}

// queryJobOnWorker translates the http query request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
//		- bool: job status (true if job is done)
//		- bool: job error (true if job had an error)
// 		- string: job error text ("" if job error is false)
//		- error: nil if no error ("worker not found" for an unknown WorkerID)
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return false, false, "", errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return false, false, "", err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  queryJobReq := pb.QueryJobReq{
    JobID: req.JobID,
  }

  r, err := c.QueryJob(ctx, &queryJobReq)
  if err != nil {
    return false, false, "", err
  }

  return r.Done, r.Error, r.ErrorText, nil
}
The scheduler also has worker functions to create and delete workers. These two functions just assign an ID to a new worker and add it to the workers map, or remove it from the map. They are pretty standard, so I've left them out here.
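Finally, the last important bit is the scheduler's gRPC server, which the workers use to register and deregister with the scheduler. (Note: the listing below repeats the contents of grpc_translator.go; the actual server code is in scheduler/grpc_server.go in the repository linked above.)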
scheduler/grpc_server.go
package main

import (
  "context"
  "errors"
  "time"

  pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
  "google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
// 		- string: job id
// 		- error: nil if no error ("worker not found" for an unknown WorkerID)
func startJobOnWorker(req apiStartJobReq) (string, error) {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return "", errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return "", err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  startJobReq := pb.StartJobReq{
    Command: req.Command,
    Path:    req.Path,
  }

  r, err := c.StartJob(ctx, &startJobReq)
  if err != nil {
    return "", err
  }

  return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
// 		- error: nil if no error ("worker not found" for an unknown WorkerID)
func stopJobOnWorker(req apiStopJobReq) error {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  stopJobReq := pb.StopJobReq{
    JobID: req.JobID,
  }

  if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
    return err
  }

  return nil
}

// queryJobOnWorker translates the http query request to grpc
// request on the workers.
//
// workersMutex is held only long enough to look up the worker and copy its
// address. The dial and the RPC (bounded by a one-second timeout) run
// without the lock. Otherwise one slow or unreachable worker would stall
// every other caller of workersMutex, including the other API requests.
//
// Returns:
//		- bool: job status (true if job is done)
//		- bool: job error (true if job had an error)
// 		- string: job error text ("" if job error is false)
//		- error: nil if no error ("worker not found" for an unknown WorkerID)
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
  workersMutex.Lock()
  worker, ok := workers[req.WorkerID]
  var addr string
  if ok {
    addr = worker.addr
  }
  workersMutex.Unlock()

  if !ok {
    return false, false, "", errors.New("worker not found")
  }

  conn, err := grpc.Dial(addr, grpc.WithInsecure())
  if err != nil {
    return false, false, "", err
  }
  defer conn.Close()
  c := pb.NewWorkerClient(conn)

  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()

  queryJobReq := pb.QueryJobReq{
    JobID: req.JobID,
  }

  r, err := c.QueryJob(ctx, &queryJobReq)
  if err != nil {
    return false, false, "", err
  }

  return r.Done, r.Error, r.ErrorText, nil
}
Alright, the worker is going to be slightly more complicated, but hopefully it will be the last piece.
Koray Gocmen

University of Toronto, Computer Engineering.

Architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.