package main import ( "encoding/csv" "fmt" "io" "log" "net/http" "os" "strconv" "strings" "github.com/gocql/gocql" ) type CsvLine struct { StationId string DatapointId int AlarmId int EventTime string Value float64 ValueThreshold float64 IsActive bool } var SESSION *gocql.Session // 1. Read a CSV file line-by-line (from local file) func readFromFile(filepath string) (err error) { file, err := os.Open(filepath) if err != nil { return err } defer file.Close() reader := csv.NewReader(file) for { record, err := reader.Read() if err == io.EOF { break } if err != nil { fmt.Printf("%s\n", err) continue } processRecord(record) } return nil } // 1. Read a CSV file line-by-line (from remote file) func readFromUrl(url string) (err error) { resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() reader := csv.NewReader(resp.Body) // note: this part is exactly the same as in the readFromFile function above // so it could be refactored to reuse the same code // but then the code in the blog post would have been a lot less readable for { record, err := reader.Read() if err == io.EOF { break } if err != nil { fmt.Printf("%s", err) continue } processRecord(record) } return nil } // 2. Process the records in each line func processRecord(line []string) { if len(line) < 7 { fmt.Println("Invalid length, discarding line...") return } if line[0] == "station_id" { // ignore first line return } // note: error checking omitted for brevity stationId := line[0] datapointId, _ := strconv.Atoi(line[1]) alarmId, _ := strconv.Atoi(line[2]) eventTime := line[3] value, _ := strconv.ParseFloat(line[4], 64) valueThreshold, _ := strconv.ParseFloat(line[5], 64) active, _ := strconv.ParseBool(line[6]) buf := CsvLine{ StationId: stationId, DatapointId: datapointId, AlarmId: alarmId, EventTime: eventTime, Value: value, ValueThreshold: valueThreshold, IsActive: active, } insertIntoDb(buf) } // 3. Insert the values into the database func insertIntoDb(record CsvLine) { err := SESSION.Query(`INSERT INTO events (stationid, datapointid, alarmid, eventtime, value, valuethreshold, active) VALUES (?, ?, ?, ?, ?, ?, ?)`, record.StationId, record.DatapointId, record.AlarmId, record.EventTime, record.Value, record.ValueThreshold, record.IsActive, ).Exec() if err != nil { fmt.Printf("Insert failed: %s\n", err) } } func main() { // 1. get configuration from command line if len(os.Args) < 3 { fmt.Println("Usage: ./ingest URI CASSANDRASERVER") return } datasource := os.Args[1] cassandraUri := os.Args[2] // 2. connect to database cluster := gocql.NewCluster(cassandraUri) cluster.Keyspace = "exampleCSV" cluster.Consistency = gocql.Quorum session, err := cluster.CreateSession() if err != nil { log.Fatal(err) } defer session.Close() // make global SESSION = session // 3. read data from appropriate source if strings.HasPrefix(datasource, "http") { err = readFromUrl(datasource) } else { err = readFromFile(datasource) } if err != nil { log.Fatal(err) } return }