Reading a 16 GB File in 25 Seconds with Go


Every computer system today generates a huge volume of logs or data every day. As a system grows, storing this debugging data in a database becomes infeasible: the data is immutable and is only ever used for analysis and troubleshooting. So most companies prefer to store logs in files, and those files usually sit on local disk.


We are going to use Go to extract logs from a 16 GB .txt or .log file.

Let's start coding…

First, we open the file. For any file I/O we will use the standard Go os.File.

f, err := os.Open(fileName)
if err != nil {
    fmt.Println("cannot read the file", err)
    return
}
defer f.Close() // do not forget to close the file; defer only after checking the error

After opening the file, we have two options:

Read the file line by line. This keeps memory pressure low, but costs much more time.
Read the entire file into memory at once and process it there. This consumes far more memory but dramatically cuts the time.

Since the file is 16 GB, we cannot load it into memory as a whole. But the first option is not viable for us either, because we want to process the file within seconds. For contrast, a sketch of the first option follows.

But guess what, there is a third option. Voilà… Instead of loading the whole file into memory, in Go we can use bufio.NewReader() to load the file in chunks.

r := bufio.NewReader(f)

for {
    buf := make([]byte, 4*1024) // the chunk size
    n, err := r.Read(buf)       // load a chunk into the buffer
    buf = buf[:n]

    if n == 0 {
        if err == io.EOF {
            break // reached the end of the file
        }
        if err != nil {
            fmt.Println(err)
            break
        }
    }
}

Once we have the file in chunks, we can fork a thread, i.e. a goroutine, to process multiple chunks concurrently. Note that a fixed-size chunk will usually end in the middle of a line, so we also read up to the next newline and append it to the chunk; that way every chunk contains only complete log lines. The code above then becomes:

// sync.Pools to reuse memory and decrease the pressure on the garbage collector
linesPool := sync.Pool{New: func() interface{} {
    lines := make([]byte, 500*1024)
    return lines
}}

stringPool := sync.Pool{New: func() interface{} {
    lines := ""
    return lines
}}

slicePool := sync.Pool{New: func() interface{} {
    lines := make([]string, 100)
    return lines
}}

r := bufio.NewReader(f)

var wg sync.WaitGroup // wait group to keep track of all spawned goroutines

for {
    buf := linesPool.Get().([]byte)

    n, err := r.Read(buf)
    buf = buf[:n]

    if n == 0 {
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
    }

    nextUntilNewline, err := r.ReadBytes('\n') // read to the end of the current line

    if err != io.EOF {
        buf = append(buf, nextUntilNewline...)
    }

    wg.Add(1)
    go func() {
        // process each chunk concurrently
        // start -> log start time, end -> log end time
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
        wg.Done()
    }()
}

wg.Wait()

The code above introduces two optimizations:

sync.Pool is a powerful object pool that reuses objects to relieve pressure on the garbage collector. We reuse the memory allocated for each chunk, which lowers memory consumption and speeds the work up considerably (see the standalone sketch after this list).
Goroutines let us process the buffered chunks concurrently, which dramatically increases processing speed.
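
As a standalone illustration of the Get/Put cycle of sync.Pool (a minimal sketch, not part of the extractor; the buffer size is arbitrary):

package main

import (
    "fmt"
    "sync"
)

func main() {
    // a pool that hands out reusable 4 KB buffers
    bufPool := sync.Pool{New: func() interface{} {
        return make([]byte, 4*1024)
    }}

    buf := bufPool.Get().([]byte) // newly allocated, or recycled from a previous Put
    copy(buf, "some log data")    // ...use the buffer...
    bufPool.Put(buf)              // hand it back so the next Get can reuse it

    fmt.Println(len(buf)) // 4096
}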

Now let's implement the ProcessChunk function, which will process log lines of the following format:

      2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
      

We will extract the logs that fall between the two timestamps supplied on the command line. Every line starts with a timestamp in a fixed layout, and Go parses such layouts against its reference time, as the sketch below shows.
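
Go expresses time layouts with the fixed reference time Mon Jan 2 15:04:05 MST 2006 rather than printf-style verbs. A minimal, self-contained sketch of the per-line timestamp parsing that ProcessChunk performs (the sample line is made up):

package main

import (
    "fmt"
    "strings"
    "time"
)

func main() {
    line := "2020-01-31T20:12:38.1234Z, Some Field, Other Field"

    // only the first comma matters, so SplitN keeps the split cheap
    timestamp := strings.SplitN(line, ",", 2)[0]

    // the layout is the reference time written in our log's format
    t, err := time.Parse("2006-01-02T15:04:05.0000Z", timestamp)
    if err != nil {
        fmt.Println("could not parse the time:", err)
        return
    }
    fmt.Println(t) // 2020-01-31 20:12:38.1234 +0000 UTC
}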

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {

    // another wait group to process every chunk further
    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk) // put the chunk back into the pool

    // split the string by "\n", so that we have a slice of log lines
    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs) // put the string back into the pool

    chunkSize := 100 // process batches of 100 log lines per goroutine
    n := len(logsSlice)
    noOfThread := n / chunkSize
    if n%chunkSize != 0 { // round up for the last partial batch
        noOfThread++
    }

    // traverse the chunk, one batch of lines per goroutine
    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)

        // process each batch in a separate goroutine
        go func(s int, e int) {
            defer wg2.Done() // defer, so an early return cannot deadlock the wait group
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }

                logParts := strings.SplitN(text, ",", 2)
                logCreationTimeString := logParts[0]
                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                // check if the log's timestamp falls within our desired period
                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    fmt.Println(text)
                }
            }
        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
        // passing the start and end indexes of the batch
    }

    wg2.Wait() // wait for all batches of this chunk to finish
    logsSlice = nil
}

Benchmarking the code above: with a 16 GB log file, extracting the matching logs takes about 25 seconds.

The complete code is shown below:

package main

import (
    "bufio"
    "fmt"
    "io"
    "math"
    "os"
    "strings"
    "sync"
    "time"
)

func main() {

    s := time.Now()
    args := os.Args[1:]
    if len(args) != 6 { // expected format: LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
        fmt.Println("Please give proper command line arguments")
        return
    }
    startTimeArg := args[1]
    finishTimeArg := args[3]
    fileName := args[5]

    file, err := os.Open(fileName)

    if err != nil {
        fmt.Println("cannot read the file", err)
        return
    }

    defer file.Close() // close after checking err

    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    if err != nil {
        fmt.Println("Could not parse the start time", startTimeArg)
        return
    }

    queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    if err != nil {
        fmt.Println("Could not parse the finish time", finishTimeArg)
        return
    }

    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not get the file stat")
        return
    }

    fileSize := filestat.Size()
    offset := fileSize - 1
    lastLineSize := 0

    // scan backwards from the end of the file to find the start of the last line
    for {
        b := make([]byte, 1)
        n, err := file.ReadAt(b, offset)
        if err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char := string(b[0])
        if char == "\n" {
            break
        }
        offset--
        lastLineSize += n
    }

    lastLine := make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset+1)

    if err != nil {
        fmt.Println("Could not read the last line with offset", offset, "and lastline size", lastLineSize)
        return
    }

    logSlice := strings.SplitN(string(lastLine), ",", 2)
    logCreationTimeString := logSlice[0]

    lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
        fmt.Println("cannot parse time : ", err)
    }

    // only scan the file if the last log could still fall inside the query window
    if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
        Process(file, queryStartTime, queryFinishTime)
    }

    fmt.Println("\nTime taken - ", time.Since(s))
}
      
func Process(f *os.File, start time.Time, end time.Time) error {

    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}

    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}

    r := bufio.NewReader(f)

    var wg sync.WaitGroup

    for {
        buf := linesPool.Get().([]byte)

        n, err := r.Read(buf)
        buf = buf[:n]

        if n == 0 {
            if err == io.EOF {
                break
            }
            if err != nil {
                fmt.Println(err)
                return err
            }
        }

        // extend the chunk to the end of the current line, so no line is split across chunks
        nextUntilNewline, err := r.ReadBytes('\n')

        if err != io.EOF {
            buf = append(buf, nextUntilNewline...)
        }

        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool, start, end)
            wg.Done()
        }()
    }

    wg.Wait()
    return nil
}
      
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk)

    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs)

    chunkSize := 300
    n := len(logsSlice)
    noOfThread := n / chunkSize

    if n%chunkSize != 0 {
        noOfThread++
    }

    for i := 0; i < noOfThread; i++ {

        wg2.Add(1)
        go func(s int, e int) {
            defer wg2.Done() // to avoid deadlocks
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }
                logSlice := strings.SplitN(text, ",", 2)
                logCreationTimeString := logSlice[0]

                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    //fmt.Println(text)
                }
            }

        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
    }

    wg2.Wait()
    logsSlice = nil
}
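
An example run (the binary name follows the comment in main and is hypothetical, as are the file name and the query window; timestamps must use the 2006-01-02T15:04:05.0000Z layout):

LogExtractor.exe -f "2020-01-31T20:12:38.1234Z" -t "2020-01-31T21:12:38.1234Z" -i "logs.log"

Note that the arguments are read by position (args[1], args[3], args[5]), so the three flags must appear in exactly this order.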
      

Original article: https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2

