Single scheduler - multiple worker architecture with GRPC and Go

Single scheduler (conductor) and multiple workers

Part 5 - The worker

Full code: github.com/KorayGocmen/scheduler-worker-grpc
The entry point of the worker is worker/worker.go, which contains the init and main functions. main launches two goroutines, one to start the GRPC server and one to register the worker on the scheduler, and then blocks until it receives SIGINT or SIGTERM, at which point it deregisters the worker and exits.
worker/worker.go
package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"
)

var (
  // workerID is the id assigned by the scheduler
  // after registering on scheduler.
  workerID string
)

func init() {
  loadConfig()
}

// Entry point of the worker application.
func main() {

  go startGRPCServer()
  go registerWorker()

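  // signal.Notify does not block when sending, so the channel must be
  // buffered or a signal can be missed.
  sig := make(chan os.Signal, 1)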
  signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

  for {
    select {
    case s := <-sig:
      fatal(fmt.Sprintf("Signal (%d) received, stopping\n", s))
    }
  }
}

func fatal(message string) {
  deregisterWorker()
  log.Fatalln(message)
}
The worker's GRPC server exposes three handlers, StartJob, StopJob and QueryJob, which call the startScript, stopScript and queryScript functions. These functions run the script files (the "jobs") with the provided command and record the standard output of those scripts to a file on the worker.
worker/grpc_server.go
package main

import (
  "context"
  "fmt"
  "log"
  "net"

  pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials"
)

// server holds the GRPC worker server instance.
type server struct{}

// StartJob starts a new job with the given command and path.
// Command can be any executable command on the worker and the path
// is the relative path of the script.
func (s *server) StartJob(ctx context.Context, r *pb.StartJobReq) (*pb.StartJobRes, error) {
  jobID, err := startScript(r.Command, r.Path)
  if err != nil {
    return nil, err
  }

  res := pb.StartJobRes{
    JobID: jobID,
  }

  return &res, nil
}

// StopJob stops a running job with the given job id.
func (s *server) StopJob(ctx context.Context, r *pb.StopJobReq) (*pb.StopJobRes, error) {
  if err := stopScript(r.JobID); err != nil {
    return nil, err
  }

  return &pb.StopJobRes{}, nil
}

// QueryJob returns the status of the job with the given job id.
// The `Done` field in the response is true once the job has finished
// and false while it is still running.
func (s *server) QueryJob(ctx context.Context, r *pb.QueryJobReq) (*pb.QueryJobRes, error) {
  jobDone, jobError, jobErrorText, err := queryScript(r.JobID)
  if err != nil {
    return nil, err
  }

  res := pb.QueryJobRes{
    Done:      jobDone,
    Error:     jobError,
    ErrorText: jobErrorText,
  }
  return &res, nil
}

// startGRPCServer starts the GRPC server for the worker.
// Scheduler can make grpc requests to this server to start,
// stop, query status of jobs etc.
func startGRPCServer() {
  lis, err := net.Listen("tcp", config.GRPCServer.Addr)
  if err != nil {
    fatal(fmt.Sprintf("failed to listen: %v", err))
  }

  var opts []grpc.ServerOption
  if config.GRPCServer.UseTLS {
    creds, err := credentials.NewServerTLSFromFile(
      config.GRPCServer.CrtFile,
      config.GRPCServer.KeyFile,
    )
    if err != nil {
      fatal(fmt.Sprintf("failed to generate credentials: %v", err))
    }
    opts = []grpc.ServerOption{grpc.Creds(creds)}
  }

  log.Println("GRPC Server listening on", config.GRPCServer.Addr)

  grpcServer := grpc.NewServer(opts...)
  pb.RegisterWorkerServer(grpcServer, &server{})
  if err := grpcServer.Serve(lis); err != nil {
    fatal(fmt.Sprintf("failed to serve: %v", err))
  }
}
worker/jobs.go contains the three functions that start, query and stop scripts. Every job is stored in the jobs map, keyed by its job ID, and access to the map is guarded by jobsMutex.
worker/jobs.go
package main

import (
  "errors"
  "fmt"
  "os"
  "os/exec"
  "sync"

  "github.com/google/uuid"
)

// jobsMutex is the lock to access jobs map.
// jobs is the map that holds current/past jobs.
// 		- key: job id
// 		- value: pointer to the created job object.
var (
  jobsMutex = &sync.Mutex{}
  jobs      = make(map[string]*job)
)

// job holds information about the ongoing or past jobs
// that were triggered by the scheduler.
// 		- id: UUID assigned by the worker and sent back to the scheduler.
// 		- command: command the scheduler asked the job to be run with.
// 		- path: path to the job file/executable sent by the scheduler.
// 		- outFilePath: file path to where the output of the job will be piped.
// 		- cmd: pointer to the exec.Cmd used to wait on or kill the job.
// 		- done: whether the job is done (default false)
// 		- err: error while running the job (default nil)
type job struct {
  id          string
  command     string
  path        string
  outFilePath string
  cmd         *exec.Cmd
  done        bool
  err         error
}

// startScript starts a new job.
// Returns:
//		- string: job id
//		- error: nil if no error
func startScript(command, path string) (string, error) {
  jobsMutex.Lock()
  defer jobsMutex.Unlock()

  jobID := uuid.New().String()
  outFilePath := fmt.Sprintf("%s.out", jobID)

  outfile, err := os.Create(outFilePath)
  if err != nil {
    return "", err
  }
  defer outfile.Close()

  cmd := exec.Command(command, path)
  cmd.Stdout = outfile

  if err = cmd.Start(); err != nil {
    return "", err
  }

  newJob := job{
    id:          jobID,
    command:     command,
    path:        path,
    outFilePath: outFilePath,
    cmd:         cmd,
    done:        false,
    err:         nil,
  }
  jobs[jobID] = &newJob

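  // Wait for the job to finish in the background. The goroutine closes
  // over newJob and takes jobsMutex before writing, so the updates do
  // not race with stopScript and queryScript.
  go func() {
    err := cmd.Wait()

    jobsMutex.Lock()
    defer jobsMutex.Unlock()
    newJob.err = err
    newJob.done = true
  }()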

  return jobID, nil
}

// stopScript stops a running job.
// Returns:
//		- error: nil if no error
func stopScript(jobID string) error {
  jobsMutex.Lock()
  defer jobsMutex.Unlock()

  job, found := jobs[jobID]
  if !found {
    return errors.New("job not found")
  }

  if job.done {
    return nil
  }

  if err := job.cmd.Process.Kill(); err != nil {
    return err
  }

  return nil
}

// queryScript checks whether a job is done.
// Returns:
//		- bool: job status (true if job is done)
//		- bool: job error (true if job had an error)
// 		- string: job error text ("" if job error is false)
//		- error: nil if no error
func queryScript(jobID string) (bool, bool, string, error) {
  jobsMutex.Lock()
  defer jobsMutex.Unlock()

  job, found := jobs[jobID]
  if !found {
    return false, false, "", errors.New("job not found")
  }

  var (
    jobDone      = job.done
    jobError     = false
    jobErrorText = ""
  )

  if job.err != nil {
    jobError = true
    jobErrorText = job.err.Error()
  }

  return jobDone, jobError, jobErrorText, nil
}
The most important piece of this whole project is how the worker keeps track of a running script. startScript starts the script and then waits for it inside an anonymous goroutine. Because that goroutine is a closure, it still has access to the job object it was started for. This way, when the job finishes or an error occurs, I don't need to look the job up by its key or find a way to pass it around; I can update the job object directly.
I left out quite a bit, such as the proto file and how it is compiled into the Go file shared by the worker and the scheduler, the config files for both, and the Makefile. All of these can be found in my GitHub repo. The parts covered here are the most important ones.
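The closure pattern described above can be sketched in isolation. This is a minimal sketch, not the repository's code: record, watch and status are hypothetical names, and a plain function stands in for cmd.Wait so the example runs without an external script. The shared fields are guarded by a mutex because the goroutine and the caller touch them concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record is a hypothetical, trimmed-down stand-in for the job struct.
type record struct {
	mu   sync.Mutex
	done bool
	err  error
}

// status returns a consistent snapshot of the record.
func (r *record) status() (bool, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.done, r.err
}

// watch runs wait in a new goroutine and stores its result on the returned
// record. The anonymous function closes over r, so it needs no map lookup
// or extra argument to find the record it belongs to.
func watch(wait func() error) (*record, <-chan struct{}) {
	r := &record{}
	finished := make(chan struct{})

	go func() {
		err := wait() // blocks until the "script" exits

		r.mu.Lock()
		r.err = err
		r.done = true
		r.mu.Unlock()

		close(finished) // signal that the result has been recorded
	}()

	return r, finished
}

func main() {
	// Simulate a script that fails; in the worker this would be cmd.Wait.
	r, finished := watch(func() error {
		return errors.New("exit status 1")
	})

	<-finished
	done, err := r.status()
	fmt.Println(done, err) // prints: true exit status 1
}
```

The finished channel is an addition for the demo; it lets the caller block until the goroutine has recorded the result instead of polling.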
Koray Gocmen

University of Toronto, Computer Engineering.

Architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.