/* Syntax: dagtweets Bloknummer is een integer. Geldige waarden zijn 0 t/m - 1. De waarde voor is in het programma gedefinieerd. Dit programma moet elke dag voor alle blokwaardes worden aangeroepen, in de juiste volgorde. Als er vier blokken gebruikt worden, dan zo: dagtweets 0 dagtweets 1 dagtweets 2 dagtweets 3 */ package main import ( "bufio" "compress/gzip" "encoding/json" "fmt" "github.com/pebbe/textcat" "html" "io" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" "os" "regexp" "runtime" "strconv" "strings" "time" ) var ( servers = "127.0.0.1:27017" data = make([]byte, 0, 100000) intervalSize = 5000 tijdlijst = make([]float64, 0) countlijst = make([]int, 0) intervalmap = make(map[string]int) dagmap = make(map[string][]counter) blocks = 4 // 24/blocks moet een integer zijn. Geldige waarden: 1 2 3 4 6 8 12 24 reWord = regexp.MustCompile("(" + // url "[hH][tT][tT][pP][sS]?:([-A-Za-z0-9\\._~:/?#\\[\\]@!$&'\\(\\)\\*\\+,;=]|%[0-9a-fA-f][0-9a-fA-f])*" + "|" + // hashtag "#[\\p{L}0-9]+" + "|" + // mention "@[a-zA-Z0-9_]+" + "|" + // word "\\p{L}-?\\p{L}+(-\\p{L}+)*" + ")") ) type stTweetScan struct { Id_str string Created_at string Text string User stUserScan } type stUserScan struct { Lang string } type counter struct { interval int fract float64 } func main() { var count, interval int var tsum float64 checkErr(os.Chdir("/net/corpora/twitter2/Tweets")) location, err := time.LoadLocation("Europe/Amsterdam") checkErr(err) tc := textcat.NewTextCat() tc.EnableAllUtf8Languages() tc.DisableLanguages("af.utf8", "fy.utf8") gisteren := time.Now().In(location).AddDate(0, 0, -1) dag := gisteren.Day() datum := fmt.Sprintf("%04d%02d%02d", gisteren.Year(), gisteren.Month(), dag) session, err := mgo.Dial(servers) checkErr(err) defer session.Close() //session.SetMode(mgo.Monotonic, true) //session.SetSafe(&mgo.Safe{}) cCounts := session.DB("twitter" + datum).C("counts") cTimes := session.DB("twitter" + datum).C("times") cTweets := session.DB("twitter" + datum).C("tweets") cStatus := session.DB("twitter" + datum).C("status") b, err := strconv.ParseInt(os.Args[1], 10, 0) checkErr(err) block := int(b) if block == 0 { checkErr(session.DB("twitter" + datum).DropDatabase()) } blocksize := 24 / blocks for uur := block * blocksize; uur < (block+1)*blocksize; uur++ { filename := fmt.Sprintf("%04d/%02d/%s:%02d.out.gz", gisteren.Year(), gisteren.Month(), datum, uur) _, e := os.Stat(filename) if e != nil { continue } f, err := os.Open(filename) checkErr(err) r, err := gzip.NewReader(f) checkErr(err) rd := bufio.NewReaderSize(r, 100000) zone := "" lineno := 0 for { eof := getline(rd) if eof { r.Close() f.Close() break } lineno += 1 tweetScan := stTweetScan{} err := json.Unmarshal(data, &tweetScan) if err != nil { // fmt.Fprintf(os.Stderr, "%v:%v: %v: %v\n", file.name, lineno, err, string(data)) continue } if tweetScan.Id_str == "" { continue } // Sla niet-Nederlandse tweets over. // Gebruikers die Nederlandse tweets posten hebben gewoonlijk hun // taalvoorkeur op "nl" gezet, of niet ingesteld, en dan is het "en". if tweetScan.User.Lang[:2] != "nl" && tweetScan.User.Lang[:2] != "en" { continue } // sanitise tweetScan.Text = strings.Join(strings.Fields(html.UnescapeString(tweetScan.Text)), " ") // tokenizer words := strings.Fields(reWord.ReplaceAllString(tweetScan.Text, " $1 ")) // taalrader w := []string{} for _, ww := range words { if !(ww == "RT" || strings.HasPrefix(ww, "@") || strings.HasPrefix(ww, "#") || strings.HasPrefix(ww, "http:") || strings.HasPrefix(ww, "https:")) { w = append(w, ww) } } language, _ := tc.Classify(strings.Join(w, " ")) if len(language) != 1 || language[0] != "nl.utf8" { continue } // parsed date and time t, err := time.Parse("Mon Jan 02 15:04:05 -0700 2006", tweetScan.Created_at) checkErr(err) t = t.In(location) if zone == "" { zone, _ = t.Zone() } z, _ := t.Zone() if t.Hour() != uur || t.Day() != dag || z != zone { continue } for i, w := range words { words[i] = strings.ToLower(w) } checkErr(cTweets.Insert(bson.M{"text": tweetScan.Text, "words": words, "hour": uur, "block": block, "interval": interval})) tsum += float64(t.Hour()) + float64(t.Minute())/60.0 + float64(t.Second())/3600 seen := make(map[string]bool) for _, w := range words { _, found := seen[w] if !found { intervalmap[w] += 1 seen[w] = true } } count++ if count == intervalSize { doInterval(interval, count, tsum) count = 0 tsum = 0.0 interval++ } } // range line in file } // range uur in dag if count > intervalSize/2 { doInterval(interval, count, tsum) interval++ } else { _, e := cTweets.RemoveAll(bson.M{"block": block, "interval": interval}) checkErr(e) } for w, cntrs := range dagmap { tellers := make([]float64, interval) for _, cntr := range cntrs { tellers[cntr.interval] = cntr.fract } checkErr(cCounts.Insert(bson.M{"word": w, "fracts": tellers, "block": block})) } checkErr(cTimes.Insert(bson.M{"times": tijdlijst, "counts": countlijst, "block": block})) if block == blocks-1 { checkErr(cCounts.EnsureIndexKey("word")) checkErr(cTweets.EnsureIndexKey("hour")) checkErr(cTweets.EnsureIndexKey("words")) checkErr(cStatus.Insert(bson.M{"ready": true})) } } func doInterval(interval, count int, tsum float64) { for wrd, n := range intervalmap { if n > 0 { _, ok := dagmap[wrd] if !ok { dagmap[wrd] = make([]counter, 0) } dagmap[wrd] = append(dagmap[wrd], counter{interval, float64(n) / float64(count)}) } intervalmap[wrd] = 0 } tijdlijst = append(tijdlijst, tsum/float64(count)) countlijst = append(countlijst, count) } func getline(r *bufio.Reader) (eof bool) { data = data[0:0] for { line, isP, err := r.ReadLine() if err == io.EOF { eof = true break } checkErr(err) data = append(data, line...) if !isP { break } } return } func checkErr(err error) { if err != nil { _, filename, lineno, ok := runtime.Caller(1) if ok { fmt.Fprintf(os.Stderr, "%v:%v: %v\n", filename, lineno, err) } panic(err) } }