Commit 9079da3f authored by Pasan Mallawaarachchi's avatar Pasan Mallawaarachchi 💻

email worker with build instructions

parent e695b4f5
Pipeline #1945 failed with stages
couchdb:
port: '5984'
host: 0.0.0.0
username: root
password: root
curl:
check_mail: http://root:root@0.0.0.0:5984/outbox/_all_docs/?limit=1 #http://root:root@0.0.0.0:5984/outbox/_all_docs/?limit=1
get_outbox_mail: http://root:root@0.0.0.0:5984/outbox/
delete_outbox_mail: http://root:root@0.0.0.0:5984/outbox/
add_sent_mail: http://root:root@0.0.0.0:5984/sentitems/
add_mail_log: http://root:root@0.0.0.0:5984/mail_logs/
protocols:
method: http://
requests:
get: GET
post: POST
put: PUT
delete: DELETE
header_type: Content-Type
header_json: application/json
mailgun:
domain: sandbox69b93297d9f045b5ae2383ee3b5d234c.mailgun.org
apikey: 0b6326227e0797740c93e5cfffc901c1-49a2671e-e9e05eca
error_report:
error_sender: qa@parkwaylabs.com
error_head: Email Worker Crashed
error_recipients: pasan@parkwaylabs.com
app_mode:
production: "0" # 1, 0
package main
/**
* TODO : Add correct CouchDB URL with username and password
* TODO : Add Mailgun API_KEY and Domain
* Compile App :
* - run "go build" this will create a new binary file
* - chmod +x<file_name>
* - ./<file_name>
*/
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
"time"
mailgun "github.com/mailgun/mailgun-go"
yaml "gopkg.in/yaml.v2"
)
/**
* worker configuration structure
*/
type Config struct {
Couchdb struct {
Port string `yaml:"port"`
Host string `yaml:"host"`
Username string `yaml:"username"`
Password string `yaml:"password"`
} `yaml:"couchdb"`
Curl struct {
CheckMail string `yaml:"check_mail"`
GetOutboxMail string `yaml:"get_outbox_mail"`
DeleteOutboxMail string `yaml:"delete_outbox_mail"`
AddSentMail string `yaml:"add_sent_mail"`
AddMailLog string `yaml:"add_mail_log"`
} `yaml:"curl"`
Protocols struct {
Request string `yaml:"method"`
} `yaml:"protocols"`
Requests struct {
Get string `yaml:"get"`
Post string `yaml:"post"`
Put string `yaml:"put"`
Delete string `yaml:"delete"`
HeaderType string `yaml:"header_type"`
HeaderJson string `yaml:"header_json"`
} `yaml:"requests"`
Mailgun struct {
Domain string `yaml:"domain"`
Apikey string `yaml:"apikey"`
} `yaml:"mailgun"`
ErrorReport struct {
ErrorSender string `yaml:"error_sender"`
ErrorHead string `yaml:"error_head"`
ErrorRecipients string `yaml:"error_recipients"`
} `yaml:"error_report"`
AppMod struct {
Production string `yaml:"production"`
} `yaml:"app_mode"`
}
/**
* couchDB outbox document structure
*/
type Couchdb struct {
TotalRows int `json:"total_rows"`
Offset int `json:"offset"`
Rows []struct {
ID string `json:"id"`
Key string `json:"key"`
Value struct {
Rev string `json:"rev"`
} `json:"value"`
} `json:"rows"`
}
/**
* email document structure
*/
type MailBody struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
From string `json:"from"`
To string `json:"to"`
Cc string `json:"cc"`
Subject string `json:"subject"`
Body string `json:"body"`
}
/**
* Global VAriables
*/
var childs = 0 // No of subchilds / workers
var waitGroup sync.WaitGroup
var mailData chan string
var outboxReadTime chan string
var outboxDeleteTime chan string
func main() {
// get configurations
var config Config
source, sourceErr := ioutil.ReadFile("config.yaml")
if sourceErr != nil {
panic(sourceErr)
}
configErr := yaml.Unmarshal(source, &config)
if configErr != nil {
panic(configErr)
}
if config.AppMod.Production == "0" {
fmt.Println("Starting the Email Manager...")
}
time.Sleep(time.Millisecond * 300)
mailData = make(chan string)
outboxReadTime = make(chan string)
outboxDeleteTime = make(chan string)
for i := 1; i < 4; i++ {
waitGroup.Add(1)
go worker(i)
}
for 1 == 1 {
if childs < 4 {
childs++
//time.Sleep(time.Second * 1)
/**
* make cURL request to read "OUTBOX" DB for documents
* the following cURL request returns the oldest document of the DB
*
*/
req, reqErr := http.NewRequest(config.Requests.Get, config.Curl.CheckMail, nil)
//req, reqErr := http.NewRequest("GET", "http://root:root@0.0.0.0:5984/outbox/_all_docs/?limit=1", nil)
req.Header.Set(config.Requests.HeaderType, config.Requests.HeaderJson)
resp, respErr := http.DefaultClient.Do(req)
if reqErr != nil {
alert := " Error creating request to OUTBOX DB \n" + reqErr.Error()
alertAdmin(alert)
}
if respErr != nil {
alert := " Error in getting email from the OUTBOX DB \n" + respErr.Error()
alertAdmin(alert)
}
defer resp.Body.Close()
respbody, _ := ioutil.ReadAll(resp.Body)
//fmt.Println(string(respbody))
/**
* struct the response of the of the cURL request
* after "Unmarshal", the "newDoc" holds the data of the cURL request
*/
var newDoc Couchdb
err1 := json.Unmarshal(respbody, &newDoc)
if err1 != nil {
alert := " Error converting the OUTBOX response to JSON String \n" + err1.Error()
alertAdmin(alert)
}
/**
* if: the "OUTBOX" DB has new records, it returns the number of Rows
* so there are new documents the value is always greaterthan 0
* and we send newEmail Document as JSONstring through the channel
*
* else: send "ko"
*/
if newDoc.TotalRows > 0 {
/**
* get Document ID and Revision ID
* @type {[type]}
*/
newMailID := newDoc.Rows[0].ID
newMailRev := newDoc.Rows[0].Value.Rev
/**
* Now we get the documents' email data from the "OUTBOX" DB
* cURL reuest to get the document
*/
req, reqErr := http.NewRequest(config.Requests.Get, config.Curl.GetOutboxMail+newMailID, nil)
req.Header.Set(config.Requests.HeaderType, config.Requests.HeaderJson)
resp, respErr := http.DefaultClient.Do(req)
if reqErr != nil {
alert := " Error requesting the OUTBOX mail data \n" + reqErr.Error()
alertAdmin(alert)
}
if respErr != nil {
alert := " Error getting the OUTBOX mail data \n" + respErr.Error()
alertAdmin(alert)
}
defer resp.Body.Close()
respbody, _ := ioutil.ReadAll(resp.Body)
newMail := string(respbody)
readTimeOutbox := time.Now().String()
// mailReadTime := mailReadTime.String()
/**
* structuring the email document
*
*/
var mail MailBody
mailErr := json.Unmarshal([]byte(newMail), &mail)
if mailErr != nil {
alert := " Error converting OUTBOX DB mail data into JSON string \n" + mailErr.Error()
alertAdmin(alert)
}
/**
* Send email document as JSONstring
*/
mailData <- (newMail)
outboxReadTime <- (readTimeOutbox)
// data <- ("Worker " + strconv.Itoa(childs))
/**
* Delete the document from the "OUTBOX"
*/
delReq, delReqerr := http.NewRequest(config.Requests.Delete, config.Curl.DeleteOutboxMail+newMailID+"?rev="+newMailRev, nil)
if delReqerr != nil {
alert := " Error requesting to delete mail from OUTBOX \n" + delReqerr.Error()
alertAdmin(alert)
}
delReq.Header.Set(config.Requests.HeaderType, config.Requests.HeaderJson)
delResp, delResperr := http.DefaultClient.Do(delReq)
if delResperr != nil {
alert := " Error deleting mail from OUTBOX DB \n" + delResperr.Error()
alertAdmin(alert)
}
defer delResp.Body.Close()
ioutil.ReadAll(delResp.Body)
deleteTimeOutbox := time.Now().String()
outboxDeleteTime <- (deleteTimeOutbox)
} else {
mailData <- "ko"
outboxReadTime <- ""
outboxDeleteTime <- ""
}
}
}
close(mailData)
close(outboxReadTime)
close(outboxDeleteTime)
waitGroup.Wait()
}
func worker(i int) {
// get configurations
var config Config
source, sourceErr := ioutil.ReadFile("config.yaml")
if sourceErr != nil {
panic(sourceErr)
}
configErr := yaml.Unmarshal(source, &config)
if configErr != nil {
panic(configErr)
}
//fmt.Println("Worker ", i, " strats working")
defer func() {
fmt.Println("Destroying the worker...")
waitGroup.Done()
}()
for {
newMail, ok := <-mailData
readTimeOutbox := <-outboxReadTime
deleteTimeOutbox := <-outboxDeleteTime
if !ok {
fmt.Println("The channel is closed!")
alertAdmin(" Error passing mail data into the channel")
break
}
if newMail == "ko" {
fmt.Println("Waiting for new mail")
} else {
/**
* structuring the email document
*
*/
var mail MailBody
mailErr := json.Unmarshal([]byte(newMail), &mail)
if mailErr != nil {
alert := " Error structuring mail data \n" + mailErr.Error()
alertAdmin(alert)
}
//fmt.Println("worker ", i, " - ", newMail)
//newMailID := mail.ID
//newMailRev := mail.Rev
//workerNo := strconv.Itoa(i)
sender := mail.From
subject := mail.Subject
body := mail.Body
recipient := mail.To
cc := mail.Cc
// sending mail via Mailgun
mg := mailgun.NewMailgun(config.Mailgun.Domain, config.Mailgun.Apikey)
message := mg.NewMessage(sender, subject, body, recipient)
/**
* Add CC if cc recipiants are available
*/
if len(cc) > 0 {
message.AddCC(cc)
}
MailgunResp, id, err := mg.Send(message)
if err != nil {
log.Fatal(err)
//alertAdmin(" MAilgun Error \n" + err.Error())
}
//fmt.Println(resp)
if config.AppMod.Production == "0" {
fmt.Printf("Email sent ID: %s Resp: %s\n", id, MailgunResp)
}
//sendMessage(mg, sender, subject, body, recipient, cc)
mailSentTimeMailgun := time.Now().String()
/**
* add the email as new document in "SENT ITEMS" DB
*/
mailBody := strings.NewReader(`{"from":"` + sender + `","to":"` + recipient + `","subject":"` + subject + `","body":"` + body + `"}`)
req, mailBodyerr := http.NewRequest(config.Requests.Put, config.Curl.AddSentMail+mail.ID, mailBody)
if mailBodyerr != nil {
alert := " Error requesting newmail to add in SENTITEMS DB \n" + mailBodyerr.Error()
alertAdmin(alert)
}
req.Header.Set(config.Requests.HeaderType, config.Requests.HeaderJson)
resp, respErr := http.DefaultClient.Do(req)
if respErr != nil {
alert := " Error adding newmail to SENTITEMS DB \n" + respErr.Error()
alertAdmin(alert)
}
defer resp.Body.Close()
ioutil.ReadAll(resp.Body)
mailAddedSentitems := time.Now().String()
logmail(subject, sender, recipient, readTimeOutbox, mailSentTimeMailgun, id, MailgunResp, mailAddedSentitems, deleteTimeOutbox, mail.ID)
}
childs--
time.Sleep(time.Millisecond * 800)
}
}
/**
* Mailgun email sending function
*/
func sendMessage(mg mailgun.Mailgun, sender, subject, body, recipient, cc string) {
message := mg.NewMessage(sender, subject, body, recipient)
/**
* Add CC if cc recipiants are available
*/
if len(cc) > 0 {
message.AddCC(cc)
}
resp, id, err := mg.Send(message)
if err != nil {
log.Fatal(err)
//alertAdmin(" MAilgun Error \n" + err.Error())
}
//fmt.Println(resp)
// get configurations
var config Config
source, sourceErr := ioutil.ReadFile("config.yaml")
if sourceErr != nil {
panic(sourceErr)
}
configErr := yaml.Unmarshal(source, &config)
if configErr != nil {
panic(configErr)
}
if config.AppMod.Production == "0" {
fmt.Printf("Email sent ID: %s Resp: %s\n", id, resp)
}
}
/**
* this function is fired when the systems' functions thows errors
*/
func alertAdmin(err string) {
// get configurations
var config Config
source, sourceErr := ioutil.ReadFile("config.yaml")
if sourceErr != nil {
panic(sourceErr)
}
configErr := yaml.Unmarshal(source, &config)
if configErr != nil {
panic(configErr)
}
// sending mail via Mailgun
mg := mailgun.NewMailgun(config.Mailgun.Domain, config.Mailgun.Apikey)
body := "Email worker crashed : " + err
sendMessage(mg, config.ErrorReport.ErrorSender, config.ErrorReport.ErrorHead, body, config.ErrorReport.ErrorRecipients, "")
}
/**
* log worker tasks
*/
func logmail(subject, sender, recipient, readTimeOutbox, mailSentTimeMailgun, id, MailgunResp, mailAddedSentitems, deleteTimeOutbox, mailID string) {
// get configurations
var config Config
source, sourceErr := ioutil.ReadFile("config.yaml")
if sourceErr != nil {
panic(sourceErr)
}
configErr := yaml.Unmarshal(source, &config)
if configErr != nil {
panic(configErr)
}
logBody := strings.NewReader(`{"Mail Subject":"` + subject +
`","Mail sender":"` + sender +
`","Mail recipiants":"` + recipient +
`","OUTBOX read time":"` + readTimeOutbox +
`","Mail sent to mailgun":"` + mailSentTimeMailgun +
`","Mailgun ID":"` + id +
`","Mailgun Message":"` + MailgunResp +
`","Mail added to SENTITEMS":"` + mailAddedSentitems +
`","OUTBOX delete time":"` + deleteTimeOutbox + `"}`)
logReq, logBodyerr := http.NewRequest(config.Requests.Put, config.Curl.AddMailLog+mailID, logBody)
if logBodyerr != nil {
alert := " Error requesting mail_logs \n" + logBodyerr.Error()
alertAdmin(alert)
}
logReq.Header.Set(config.Requests.HeaderType, config.Requests.HeaderJson)
logResp, respErr := http.DefaultClient.Do(logReq)
if respErr != nil {
alert := " Error adding logs to mail_logs DB \n" + respErr.Error()
alertAdmin(alert)
}
defer logResp.Body.Close()
ioutil.ReadAll(logResp.Body)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment