/* Put tweets from gzip'ed json (one tweet per line) into MongoDB. Usage: inserttweets *.gz */ package main import ( "encoding/json" "fmt" "github.com/pebbe/textcat" "github.com/pebbe/util" "html" "labix.org/v2/mgo" "os" "regexp" "strings" "time" ) var ( servers = "127.0.0.1:27017" reWord = regexp.MustCompile("(" + // url "[hH][tT][tT][pP][sS]?:([-A-Za-z0-9\\._~:/?#\\[\\]@!$&'\\(\\)\\*\\+,;=]|%[0-9a-fA-f][0-9a-fA-f])*" + "|" + // hashtag "#[\\p{L}0-9]+" + "|" + // mention "@[a-zA-Z0-9_]+" + "|" + // word "\\p{L}-?\\p{L}+(-\\p{L}+)*" + ")") tweetDel = strings.Fields(` annotations contributors current_user_retweet favorited geo id in_reply_to_status_id in_reply_to_user_id possibly_sensitive scopes retweet_count retweeted withheld_copyright withheld_in_countries withheld_scope`) userDel = strings.Fields(` contributors_enabled created_at default_profile default_profile_image description entities favourites_count follow_request_sent following followers_count friends_count geo_enabled id is_translator listed_count notifications profile_background_color profile_background_image_url profile_background_image_url_https profile_background_tile profile_banner_url profile_image_url profile_image_url_https profile_link_color profile_sidebar_border_color profile_sidebar_fill_color profile_text_color profile_use_background_image protected show_all_inline_media status statuses_count time_zone url utc_offset verified withheld_in_countries withheld_scope`) ) func main() { var cTweets *mgo.Collection var currentDB string if len(os.Args) > 1 && os.Args[1] == "-p" { servers = "127.0.0.1:27018" os.Args = append(os.Args[:1], os.Args[2:]...) } fmt.Println("Using server: ", servers) session, err := mgo.Dial(servers) util.CheckErr(err) defer session.Close() //session.SetMode(mgo.Monotonic, true) //session.SetSafe(&mgo.Safe{}) tc := textcat.NewTextCat() tc.EnableAllUtf8Languages() tc.DisableLanguages("af.utf8", "fy.utf8") tc.SetMinDocSize(10) // dat is *erg* kort (default is 25) location, err := time.LoadLocation("Europe/Amsterdam") util.CheckErr(err) for _, filename := range os.Args[1:] { fmt.Fprintln(os.Stderr, filename) r, err := util.NewLinesReaderFromFile(filename) util.CheckErr(err) lineno := 0 for line := range r.ReadLinesBytes() { lineno += 1 tweet := make(map[string]interface{}) meta := make(map[string]interface{}) err := json.Unmarshal(line, &tweet) if err != nil { fmt.Fprintf(os.Stderr, "%v:%v: %v: %v\n", filename, lineno, err, string(line)) continue } id_str := asStr(get(tweet, "id_str")) if id_str == "" { if get(tweet, "limit") == nil { fmt.Fprintf(os.Stderr, "%v:%v: Not a tweet: %v\n", filename, lineno, string(line)) } continue } user := get(tweet, "user").(map[string]interface{}) // Sla niet-Nederlandse tweets over. // Gebruikers die Nederlandse tweets posten hebben gewoonlijk hun // taalvoorkeur op "nl" gezet, of niet ingesteld, en dan is het "en". lang := asStr(get(user, "lang")) if len(lang) < 2 || (lang[:2] != "nl" && lang[:2] != "en") { continue } // tokenizer text := html.UnescapeString(asStr(get(tweet, "text"))) words := strings.Fields(reWord.ReplaceAllString(text, " $1 ")) // taalrader w := []string{} for _, ww := range words { if !(ww == "RT" || strings.HasPrefix(ww, "@") || strings.HasPrefix(ww, "#") || strings.HasPrefix(ww, "http:") || strings.HasPrefix(ww, "https:")) { w = append(w, ww) } } language, err := tc.Classify(strings.Join(w, " ")) if err != nil { continue } found := false for i, l := range language { language[i] = strings.Replace(l, ".utf8", "", -1) if l == "nl.utf8" { found = true } } if !found { continue } meta["langs"] = language meta["n_lang"] = len(language) meta["lang_1"] = language[0] for i, w := range words { words[i] = strings.ToLower(w) } meta["words"] = words // "retweeted_status" staat niet vermeld op https://dev.twitter.com/docs/platform-objects/tweets // maar is wel aanwezig in onze data. rs := get(tweet, "retweeted_status", "id_str") if rs != nil { meta["retweeted_id"] = rs meta["retweeted_user_id"] = get(tweet, "retweeted_status", "user", "id_str") meta["retweeted_user_screen_name"] = get(tweet, "retweeted_status", "user", "screen_name") delete(tweet, "retweeted_status") } // parsed date and time t, err := time.Parse("Mon Jan 02 15:04:05 -0700 2006", asStr(get(tweet, "created_at"))) util.CheckErr(err) t = t.In(location) zone, _ := t.Zone() meta["date"] = fmt.Sprintf("%04d-%02d-%02d %02d %s %02d:%02d", t.Year(), t.Month(), t.Day(), t.Hour(), zone, t.Minute(), t.Second()) meta["weekday"] = t.Weekday().String()[:3] meta["hour"] = t.Hour() tweet["META"] = meta for _, d := range tweetDel { delete(tweet, d) } for _, d := range userDel { delete(user, d) } db := fmt.Sprintf("twitter-%04d-%02d", t.Year(), t.Month()) if db != currentDB { if currentDB != "" { indexen(cTweets) } currentDB = db cTweets = session.DB(db).C("tweets") } err = cTweets.Insert(tweet) if err != nil { fmt.Fprintf(os.Stderr, "%v:%v:Insert tweet errror: %v\n", filename, lineno, err) } } } if currentDB != "" { indexen(cTweets) } } func indexen(c *mgo.Collection) { index := mgo.Index{ Key: []string{"id_str"}, DropDups: true, Unique: true} coord := mgo.Index{ Key: []string{"@coordinates.coordinates"}, Sparse: true} util.CheckErr(c.EnsureIndex(index)) util.CheckErr(c.EnsureIndex(coord)) util.CheckErr(c.EnsureIndexKey("META.date")) util.CheckErr(c.EnsureIndexKey("META.hour")) util.CheckErr(c.EnsureIndexKey("META.lang_1")) util.CheckErr(c.EnsureIndexKey("META.n_lang")) util.CheckErr(c.EnsureIndexKey("META.weekday")) util.CheckErr(c.EnsureIndexKey("META.words")) } func get(st interface{}, path ...string) interface{} { var ok bool s := st for _, p := range path { switch i := s.(type) { case map[string]interface{}: s, ok = i[p] if !ok { return nil } default: return nil } } return s } func asStr(a interface{}) string { switch s := a.(type) { case string: return s case nil: return "" } return fmt.Sprintf("%v", a) }