/* Put tweets from gzip'ed json (one tweet per line) into MongoDB. Usage: compleet *.gz */ package main /* #cgo CFLAGS: -I./tokenizer #cgo LDFLAGS: -L./tokenizer -ltokenizer #include #include #include #include */ import "C" import ( "bufio" "compress/gzip" "encoding/json" "fmt" "github.com/pebbe/libtextcat" "html" "io" "labix.org/v2/mgo" "labix.org/v2/mgo/bson" "os" "os/exec" "runtime" "strconv" "strings" "time" "unsafe" ) var ( servers = "127.0.0.1:27017" data = make([]byte, 0, 100000) datum = "20120905" ) func main() { setLocale() /* required for tokenizer */ location, err := time.LoadLocation("Europe/Amsterdam") checkErr(err) fmt.Println("Using server: ", servers) session, err := mgo.Dial(servers) checkErr(err) defer session.Close() //session.SetMode(mgo.Monotonic, true) //session.SetSafe(&mgo.Safe{}) cTweets := session.DB("compleet" + datum).C("tweets") index := mgo.Index{ Key: []string{"id_str"}, Unique: true, } checkErr(cTweets.EnsureIndex(index)) tc, err := libtextcat.NewTextcat("/net/aistaff/kleiweg/Twitter/textcat-conf.txt") checkErr(err) d, err := strconv.ParseInt(datum[6:], 10, 0) checkErr(err) dag := int(d) for uur := 0; uur < 24; uur++ { filename := fmt.Sprintf("/net/corpora/twitter/000RAW/%s/%s:%02d.out.gz", datum[:6], datum, uur) _, e := os.Stat(filename) if e != nil { continue } fmt.Fprintln(os.Stderr, filename) file, err := os.Open(filename) checkErr(err) r, err := gzip.NewReader(file) checkErr(err) rd := bufio.NewReaderSize(r, 100000) zone := "" lineno := 0 for { eof := getline(rd) if eof { r.Close() file.Close() break } lineno += 1 tweet := map[string]interface{}{} err := json.Unmarshal(data, &tweet) if err != nil { fmt.Fprintf(os.Stderr, "%v:%v: %v: %v\n", filename, lineno, err, string(data)) continue } tweet_id_str, ok1 := tweet["id_str"] _, ok2 := tweet["user"] _, ok3 := tweet["created_at"] if !(ok1 && ok2 && ok3) { if _, ok := tweet["limit"]; !ok { fmt.Fprintf(os.Stderr, "%v:%v: Not a tweet: %v\n", filename, lineno, string(data)) } continue } // parsed date and time t, err := time.Parse("Mon Jan 02 15:04:05 -0700 2006", tweet["created_at"].(string)) checkErr(err) t = t.In(location) if zone == "" { zone, _ = t.Zone() } z, _ := t.Zone() if t.Hour() != uur || t.Day() != dag || z != zone { continue } tweet["TIME"] = t.Hour() + t.Minute()/60.0 + t.Second()/3600.0 // tokenizer text := html.UnescapeString(tweet["text"].(string)) cs := C.CString(text) text2 := C.GoString(C.tokenize(cs)) C.free(unsafe.Pointer(cs)) if C.tok_error != C.TOK_OK { fmt.Fprintf(os.Stderr, "%v:%v: %v: %v\n", filename, lineno, text2, text) text2 = text } words := strings.Fields(text2) tweet["TOKENS"] = words // taalrader w := []string{} for _, ww := range words { if !(ww == "RT" || strings.HasPrefix(ww, "@") || strings.HasPrefix(ww, "#") || strings.HasPrefix(ww, "http:") || strings.HasPrefix(ww, "https:")) { w = append(w, ww) } } language := tc.Classify(strings.Join(w, " ")) tlist := strings.SplitAfter(language, "]") if tlist[len(tlist) - 1] == "" { tlist = tlist[:len(tlist) - 1] } tweet["TEXTCAT"] = tlist _, e2 := cTweets.Upsert(bson.M{"id_str": tweet_id_str}, tweet) if e2 != nil { fmt.Fprintf(os.Stderr, "%s:%s:Insert tweet errror: %s\n", filename, lineno, e2) } } } } func getline(r *bufio.Reader) (eof bool) { data = data[0:0] for { line, isP, err := r.ReadLine() if err == io.EOF { eof = true break } checkErr(err) data = append(data, line...) if !isP { break } } return } func checkErr(err error) { if err != nil { _, filename, lineno, ok := runtime.Caller(1) if ok { fmt.Fprintf(os.Stderr, "%v:%v: %v\n", filename, lineno, err) } panic(err) } } func setLocale() { loc := "" out, err := exec.Command("locale", "locale", "-a").Output() if err != nil { panic(err) } ll := strings.Fields(string(out)) for _, l := range ll { if strings.HasSuffix(strings.ToLower(l), ".utf8") { loc = l break } } if loc == "" { panic("command 'locale -a' returned nothing with utf8") } cs := C.CString(loc) C.setlocale(C.LC_CTYPE, cs) C.free(unsafe.Pointer(cs)) }