Archive for the ‘twitter’ Category
COLLABORATE Social Media Hour – Tue 4/24 1-2p Exhibit Hall (IOUG Booth)
April 19th, 2012
Twitter, Facebook MySQL trees online – pushing MySQL forward
April 10th, 2012
Just yesterday, I’m sure many of you saw Twitter open-sourcing its MySQL implementation. It is based on MySQL 5.5, and the code is on GitHub.
For reference, the database team at Facebook has always blogged actively and kept its code available on Launchpad. It’s worth noting that the implementation there is based on MySQL 5.0.84 and 5.1.
At Twitter, almost everything persistent is stored in MySQL: interest graphs, timelines, user data and those precious tweets themselves! At Facebook, it’s pretty similar: all user interactions like likes, shares, status updates, requests, etc. are stored in MySQL (ref).
The media has picked up on it too: a fairly misinformed piece on GigaOm (framing MySQL’s problems around Stonebraker’s “fate worse than death” remark? Pfft. Facebook wants to move its code to GitHub? Read the reasoning: it’s about spam handling on Launchpad.) and a shorter piece on CNET.
Both the Twitter and Facebook code trees state that this is what they use in their environments, but that it’s not supported in any way, shape or form. Facebook recommends Percona Server or MariaDB. The Facebook repository also includes tools like online schema change, along with others such as prefetching tools written in Python.
I haven’t had the chance to play with the Twitter release yet, but it looks like this can only push Percona Server and MariaDB forward. Since it is based on 5.5, some of these BSD-licensed features can make it into those servers, and I’m sure some already have. And whatever pushes these servers forward will push MySQL forward (see the many new features in MySQL 5.6).
On a personal note, it is amazing to see some MySQL alumni push this forward. At Twitter, there’s Jeremy Cole and Davi Arnaut. At Facebook, the team includes Domas Mituzas, Harrison Fisk, Yoshinori Matsunobu and Lachlan Mulcahy. No list would be complete without Mark Callaghan (not a MySQL alumnus, but an active MySQL community member), who led a MySQL team at Google and is now at Facebook.
Twitter bug found!
March 3rd, 2012
A few days ago, while I was looking for what people were saying about the mysql.com server being down, I found a Twitter bug:
It’s not a big deal. To reproduce this bug, follow these steps:
1. Search for any term, in this case “mysql.com”. Then look in the results for a word that contains the search term as part of it (e.g. dev.mysql.com) and select part or all of that word:
2. Press Ctrl + C. Some HTML code appears out of nowhere:
3. Repeat this several times and you will see a strange tweet like this:
I’m using Firefox 8.0 on Ubuntu, but you will get the same result with Chrome and possibly other browsers.
What have I been up to lately?
September 2nd, 2011
Despite my best intentions, I haven't posted on this blog for a while, which is a shame! I've been busy writing in so many other places since I moved into my new role in the Oracle Linux product management team in April. I've learned a lot and I am feeling quite at home here! The team is excellent and very nice to work with, and I am slowly getting the "Big Picture".
But even though I've been neglecting this blog, a lot of publicly visible things document some of my activities:
I've created two episodes of the Oracle Linux Podcast. In addition to working the @ORCL_Linux Twitter account and Facebook page, I've been blogging on the Oracle Linux blog:
- Linux Filesystem, Storage and Memory Summit
- Follow us on YouTube and Facebook, too!
- Oracle Linux 6.1 has been released
- Oracle Linux 6.1 ISO images now available for download
- First episode of the Oracle Linux Podcast: Sunil Mushran on OCFS2
- New episode of the Oracle Linux Podcast: Chris Mason on Btrfs
- OTN Sys Admin Days - Oracle Solaris and Oracle Linux (Sep. 8th, Sacramento, CA)
- Pimp my Ride - Installing Additional Packages on Oracle Linux
- Save disk space on Linux by cloning files on Btrfs and OCFS2
- A new technical white paper about Oracle Linux and the Unbreakable Enterprise Kernel
- An article, "How to Extract Files from an Oracle Linux RPM Package Without Installing the Package"
- Added a new page about our Oracle Linux support offering
- Updated the OCFS2 product page
- Updated the Oracle Linux technical information page.
- MySQL High Availability at COLLABORATE 11, Orlando (FL), USA
- An introduction to Oracle Linux at the HEPiX Spring 2011 Workshop, Darmstadt, Germany
- What's new in MySQL 5.5 at LinuxTag 2011, Berlin Germany
- Updates on Oracle Linux and Oracle VM at fisl12, Porto Alegre, Brazil
Check out my followup blog post on what I'm up to in the coming weeks and months!
Real-time streaming data aggregation
July 28th, 2011
Dear Kettle users,
Most of you use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s less well known that our beloved transformation engine can also stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes called “streaming”, “real-time”, “near real-time”, “continuous” and so on. Typical situations with a never-ending supply of data that must be processed the instant it becomes available include JMS (Java Message Service), RDBMS log sniffing, on-line fraud analysis, web or application log sniffing or, of course … Twitter! Since Twitter is easily accessed, examples using it pop up all the time. In this blog post we too will use this service to demo the Pentaho Data Integration capabilities for processing streaming data.
Here’s what we want to do:
- Continuously read all the tweets that are being sent on Twitter.
- Extract all the hash-tags used
- Count the number of hash-tags used in a one-minute time-window
- Report on all the tags that are being used more than once
- Put the output in a browser window, continuously update every minute.
This is a very generic example, but the same logic can be applied to other fields such as JMS, HL7, log sniffing and so on. It differs in two ways from the excellent work that Vincent from Open-BI described on his blog earlier this week. First, his Talend job finishes, whereas ours never ends. Second, ours does time-based aggregation rather than aggregation over a finite data set.
Also note that for Kettle to fully support multiple streaming data sources, we would have to implement “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently need full streaming data support, have a look at SQLStream; they can help you. SQLStream was co-founded by Pentaho’s Julian Hyde of Mondrian fame.
OK, let’s see how we can solve our little problem with Kettle instead…
1. Continuously read all the tweets that are being sent on Twitter.
For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:
http://stream.twitter.com/1/statuses/sample.json?delimited=length
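The `delimited=length` parameter asks for length-prefixed framing. As I understand Twitter's (since retired) streaming API, each message was preceded by its size in bytes, written as a decimal number on its own line, and blank "keep-alive" lines could appear between messages. Here is a minimal sketch of reading that framing in plain Java, assuming exactly that format. `LengthDelimitedReader` and its methods are hypothetical names, not part of Kettle or Twitter's API:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthDelimitedReader {

    /** Reads a decimal length prefix terminated by '\n'; returns -1 at end of stream. */
    static int readLength(InputStream in) throws IOException {
        StringBuilder digits = new StringBuilder();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                if (digits.length() > 0) {
                    return Integer.parseInt(digits.toString());
                }
                // A blank (keep-alive) line: keep waiting for the next length
            } else if (b != '\r') {
                digits.append((char) b);
            }
        }
        return -1;
    }

    /** Splits a length-delimited stream into messages, decoding each payload as UTF-8. */
    static List<String> readAll(InputStream raw) throws IOException {
        DataInputStream in = new DataInputStream(raw);
        List<String> messages = new ArrayList<>();
        int length;
        while ((length = readLength(in)) >= 0) {
            byte[] payload = new byte[length];
            in.readFully(payload); // blocks until exactly `length` bytes have arrived
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) throws IOException {
        String first = "{\"text\":\"a } inside a string\"}";
        String second = "{\"text\":\"ol\u00e1 #mysql\"}";
        String wire = "\r\n" // keep-alive newline
                + first.getBytes(StandardCharsets.UTF_8).length + "\r\n" + first + "\r\n"
                + second.getBytes(StandardCharsets.UTF_8).length + "\r\n" + second + "\r\n";
        for (String message : readAll(new ByteArrayInputStream(wire.getBytes(StandardCharsets.UTF_8)))) {
            System.out.println(message);
        }
    }
}
```

This parser never looks inside the payload, so braces or quotes in tweet text cannot confuse it. That makes it an alternative to scanning for matching braces. The trade-off is that a single wrong length prefix desynchronizes the rest of the stream.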
Since the output never ends and has a specific format, I wrote a small “User Defined Java Class” (UDJC) script:
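import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  // HTTP client with 10-second timeouts, authenticating with the
  // USERNAME and PASSWORD parameters of the transformation
  HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
  client.setTimeout(10000);
  client.setConnectionTimeout(10000);
  Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
  client.getState().setCredentials(AuthScope.ANY, creds);
  client.getParams().setAuthenticationPreemptive(true);
  HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

  BufferedReader reader = null;
  try {
    // Execute the request and fail fast on anything other than 200 OK
    int result = client.executeMethod(method);
    if (result != HttpStatus.SC_OK) {
      throw new KettleException("Twitter returned HTTP status " + result);
    }
    InputStream inputStream = method.getResponseBodyAsStream();
    // Decode as UTF-8 so multi-byte characters are not split into separate chars
    reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
    StringBuilder bodyBuffer = new StringBuilder();
    int opened = 0;           // depth of nested { } outside of string literals
    boolean inString = false; // inside "...", where braces must not be counted
    boolean escaped = false;  // previous character was a backslash inside a string
    int c;
    while ((c = reader.read()) != -1 && !isStopped()) {
      char ch = (char) c;
      bodyBuffer.append(ch);
      if (inString) {
        if (escaped) escaped = false;
        else if (ch == '\\') escaped = true;
        else if (ch == '"') inString = false;
        continue;
      }
      if (ch == '"') {
        inString = true;
      } else if (ch == '{') {
        opened++;
      } else if (ch == '}' && --opened == 0) {
        // One complete JSON message: drop the length prefix before the first '{' and pass it on
        String jsonString = bodyBuffer.toString();
        int startIndex = Math.max(jsonString.indexOf('{'), 0);
        Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
        r[0] = jsonString.substring(startIndex);
        putRow(data.outputRowMeta, r);
        bodyBuffer.setLength(0);
      }
    }
  } catch (KettleException e) {
    throw e;
  } catch (Exception e) {
    throw new KettleException("Unable to get tweets", e);
  } finally {
    // Close quietly (reader is null if the request failed) and release the connection
    if (reader != null) {
      try { reader.close(); } catch (IOException ignored) { }
    }
    method.releaseConnection();
  }
  setOutputDone();
  return false;
}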
As the experienced UDJC writers among you will notice, this step never ends as long as the Twitter service keeps sending data. Depending on the stability and popularity of Twitter, that can be “a very long time”.
You could improve the code further so that it reconnects to the service every time the connection drops. Personally, I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and restart the transformation in a loop in a job. That way you have a trace in case Twitter goes down for a few hours.
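The restart-in-a-job approach boils down to a simple supervisor loop. The sketch below expresses that control flow in plain Java, purely as an illustration. In Kettle itself you would build it from job entries rather than code. `RestartLoop`, `Task` and `Alert` are hypothetical names, and the attempt cap is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

class RestartLoop {

    /** A unit of work that may fail, like one run of the streaming transformation. */
    interface Task {
        void run() throws Exception;
    }

    /** Receives one message per failure, like an e-mail, database or SNMP alert. */
    interface Alert {
        void send(String message);
    }

    /**
     * Runs the task until it finishes without an error or maxAttempts runs have failed.
     * Every failure is reported through the alert before the next attempt.
     * Returns the number of attempts made.
     */
    static int runWithRestarts(Task task, Alert alert, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        int attempt = 0;
        while (attempt < maxAttempts) {
            attempt++;
            try {
                task.run();
                return attempt;
            } catch (Exception e) {
                alert.send("Run " + attempt + " failed: " + e.getMessage());
                Thread.sleep(pauseMillis); // back off before reconnecting
            }
        }
        return attempt;
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        final List<String> alerts = new ArrayList<>();
        int attempts = runWithRestarts(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("stream dropped");
            }
        }, alerts::add, 10, 0L);
        System.out.println(attempts + " attempts, alerts: " + alerts);
    }
}
```

A loop that never gives up is just as valid. What matters is that every failure leaves a trace through the alert before the stream is reopened.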
2. Extract all the hash-tags used
First we’ll parse the JSON returned by the Twitter service, extract the first 5 hash-tags from each message, split these into separate rows and count the tags…
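In the transformation this is done with regular Kettle steps. To make the logic concrete, here is a minimal sketch that approximates it in plain Java. It scans the tweet text with a regular expression instead of parsing the JSON. The pattern (Unicode letters, digits and underscores after a `#`) is my assumption, not Twitter's exact hashtag definition, and `HashtagExtractor` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HashtagExtractor {

    // Assumption: a hashtag is '#' followed by Unicode letters, digits or underscores.
    private static final Pattern HASHTAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");

    /** Returns up to maxTags hashtags (without the '#') in order of appearance, one "row" per tag. */
    static List<String> firstHashtags(String tweetText, int maxTags) {
        List<String> rows = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (rows.size() < maxTags && m.find()) {
            rows.add(m.group(1));
        }
        return rows;
    }

    public static void main(String[] args) {
        String text = "Assistindo #malha\u00e7\u00e3o com #NP #dnd #swag #OOMF #follow";
        for (String tag : firstHashtags(text, 5)) {
            System.out.println(tag); // one output row per hash-tag
        }
    }
}
```

A regex over the raw text will also match things like URL fragments (`example.com/#top`), which is one reason to prefer parsing the JSON as the transformation does.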

3. Count the number of hash-tags used in a one-minute time-window
The counting is easy: you can simply use a “Group by” step. But how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step, which has an option to aggregate in a time-based manner, so we might as well use it:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation. This is a special execution method that doesn’t use the typical parallel engine. Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.
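Conceptually, this is a tumbling (non-overlapping) window. Below is a minimal sketch of the idea in plain Java. It does not show how the Single Threader step is implemented. The class name is hypothetical, and as a simplifying assumption a window is closed only when a row arrives after it has expired:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowCounter {

    private final long windowMillis;
    private long windowStart = -1; // start time of the open window, -1 before the first row
    private Map<String, Integer> counts = new HashMap<>();

    TumblingWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Counts one occurrence of tag at timestampMillis. If this row arrives after the
     * open window has expired, the finished window's counts are returned and a new
     * window starts with this row; otherwise returns null.
     */
    Map<String, Integer> add(String tag, long timestampMillis) {
        Map<String, Integer> finished = null;
        if (windowStart < 0) {
            windowStart = timestampMillis;
        } else if (timestampMillis - windowStart >= windowMillis) {
            finished = counts;
            counts = new HashMap<>();
            windowStart = timestampMillis;
        }
        counts.merge(tag, 1, Integer::sum);
        return finished;
    }

    public static void main(String[] args) {
        TumblingWindowCounter counter = new TumblingWindowCounter(60_000);
        counter.add("ska", 0);
        counter.add("NP", 10_000);
        counter.add("ska", 59_999);
        Map<String, Integer> minute = counter.add("swag", 60_000); // closes the first window
        System.out.println(new TreeMap<>(minute));
    }
}
```

Under that assumption, an idle stream keeps the last window open indefinitely, and a window can stretch past 60 seconds when rows arrive late. A production version would also need a timer to close windows on its own.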
4. Report on all the tags that are being used more than once
The filtering is done with a simple “Filter Rows” step. In addition, thanks to the magic of the “Single Threader” step, we can sort the rows by tag occurrence count, in descending order, within that one-minute time-window. It’s also interesting to note that if you have huge amounts of data, you can easily parallelize your work by starting multiple copies of the single threader step and/or with some clever data partitioning. In our case, we could partition by hash-tag or re-aggregate the aggregated data.
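The filter and sort can be sketched in plain Java as well. Two choices here are my assumptions: the tie-break (tags with equal counts ordered alphabetically, ignoring case) and using the tag's hash code to pick a partition. The point of partitioning by hash-tag is that every occurrence of a given tag lands in the same copy, so each copy produces complete counts for its own subset of tags. `WindowReport` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowReport {

    /** Keeps tags used more than once, sorted by count (descending), then by tag ignoring case. */
    static List<Map.Entry<String, Integer>> popularTags(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                rows.add(e);
            }
        }
        rows.sort(Comparator
                .comparing((Map.Entry<String, Integer> e) -> e.getValue())
                .reversed()
                .thenComparing(e -> e.getKey(), String.CASE_INSENSITIVE_ORDER));
        return rows;
    }

    /** Picks which of nCopies parallel counters handles a tag; the same tag always maps to the same copy. */
    static int partitionFor(String tag, int nCopies) {
        return Math.floorMod(tag.hashCode(), nCopies);
    }

    public static void main(String[] args) {
        Map<String, Integer> minute = new HashMap<>();
        minute.put("ska", 5);
        minute.put("NP", 3);
        minute.put("chistede3pesos", 3);
        minute.put("once", 1);
        for (Map.Entry<String, Integer> row : popularTags(minute)) {
            System.out.println(row.getKey() + ";" + row.getValue()
                    + " (copy " + partitionFor(row.getKey(), 4) + ")");
        }
    }
}
```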
5. Put the output in a browser window, continuously update every minute.
As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to put a small header and a separator between the data from every minute so we end up with a transformation that looks like this:

The script to print the header looks like this:
var out;
if (out==null) {
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}
The separator between each minute is simple too:
if (nr==1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}
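Outside of Kettle, the same output logic can be sketched in plain Java: a title once, a separator at the start of each minute, then numbered rows. This only illustrates what the two scripts and the “Text File Output” step do together. The class and method names are hypothetical, and the `from`/`to` values are passed in as already formatted strings:

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.List;

class MinuteReportPrinter {

    private static final String SEPARATOR = "============================================";
    private final PrintWriter out;
    private boolean headerPrinted = false;

    MinuteReportPrinter(PrintWriter out) {
        this.out = out;
    }

    /** Prints one window's rows; tags and counts are parallel lists, already filtered and sorted. */
    void printWindow(List<String> tags, List<Integer> counts, String from, String to) {
        if (tags.isEmpty()) {
            return; // like the scripts, print nothing until there is a first row (nr == 1)
        }
        if (!headerPrinted) {
            out.println("'Real-time' twitter hashtag report, minute based");
        }
        out.println(SEPARATOR); // separator before row nr == 1 of every window
        out.println();
        if (!headerPrinted) {
            out.println("nr;hashtag;count;from;to"); // column header, written only once
            headerPrinted = true;
        }
        for (int i = 0; i < tags.size(); i++) {
            out.println((i + 1) + ";" + tags.get(i) + ";" + counts.get(i) + ";" + from + ";" + to);
        }
        out.flush(); // push this minute's rows to the client immediately
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        MinuteReportPrinter printer = new MinuteReportPrinter(new PrintWriter(buffer));
        printer.printWindow(Arrays.asList("ska", "NP"), Arrays.asList(5, 3),
                "2011/07/27 22:53:35.000", "2011/07/27 22:54:47.000");
        System.out.print(buffer);
    }
}
```

Flushing after every window matters when the writer is an HTTP response. Without it, the browser may not see a minute's rows until the buffer fills.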
You can execute this transformation on a Carte instance (4.2.0) and see the following output:
'Real-time' twitter hashtag report, minute based
=================================================
nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================
1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
...
For reference, I used the following URL to start the streaming report:
http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
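The URL above contains unencoded spaces, which stricter HTTP clients may reject. Here is a small sketch of building it with every query value encoded. It assumes the `executeTrans` path and parameter names exactly as they appear in the URL above. `CarteUrl` is a hypothetical name, and the Carte login (`cluster:cluster`) is not part of the query string, so it is left to the HTTP client:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class CarteUrl {

    /** Builds an executeTrans URL; each query value is form-encoded (space becomes '+', '/' becomes %2F). */
    static String executeTransUrl(String hostAndPort, String ktrPath, String user, String password)
            throws UnsupportedEncodingException {
        return "http://" + hostAndPort + "/kettle/executeTrans/"
                + "?trans=" + URLEncoder.encode(ktrPath, "UTF-8")
                + "&USERNAME=" + URLEncoder.encode(user, "UTF-8")
                + "&PASSWORD=" + URLEncoder.encode(password, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println(executeTransUrl("127.0.0.1:8282",
                "/home/matt/test-stuff/Twitter Stream/Read a twitter stream.ktr",
                "MyTwitterAccount", "MyPassword"));
    }
}
```

Keep in mind that anything passed in the query string, including the Twitter password, can end up in server and proxy logs.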
I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag is used more than 15 times in a given minute. That sort of alerting gives you easy access to emerging trends, events and memes.
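The alerting idea amounts to a threshold check on each finished one-minute window. A minimal sketch, using the 15-per-minute figure mentioned above; the class name is hypothetical, and actually sending the e-mail is left out:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TrendAlert {

    /** Returns, in alphabetical order, the tags counted more than `threshold` times in one window. */
    static List<String> tagsOverThreshold(Map<String, Integer> windowCounts, int threshold) {
        List<String> hot = new ArrayList<>();
        for (Map.Entry<String, Integer> e : windowCounts.entrySet()) {
            if (e.getValue() > threshold) {
                hot.add(e.getKey());
            }
        }
        Collections.sort(hot);
        return hot;
    }

    public static void main(String[] args) {
        Map<String, Integer> minute = new HashMap<>();
        minute.put("ska", 16);
        minute.put("swag", 15); // exactly 15 is not "more than 15"
        minute.put("NP", 40);
        for (String tag : tagsOverThreshold(minute, 15)) {
            // Hypothetical hook: this is where the e-mail would be sent
            System.out.println("ALERT: #" + tag + " is trending");
        }
    }
}
```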
For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.
Until next time,
Matt
PlanetMySQL Voting: Vote UP / Vote DOWN
Real-time streaming data aggregation
Июль 28th, 2011Dear Kettle users,
Most of you usually use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly, hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s lesser known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes referred to as being “streaming“, “real-time“, “near real-time“, “continuous” and so on. Typical examples of situations where you have a never-ending supply of data that needs to be processed the instance it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analyses, web or application log sniffing or of-course … Twitter! Since Twitter is easily accessed it’s common for examples to pop up regarding it’s usage and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities wrt to processing streaming data.
Here’s what we want to do:
- Continuously read all the tweets that are being sent on Twitter.
- Extract all the hash-tags used
- Count the number of hash-tags used in a one-minute time-window
- Report on all the tags that are being used more than once
- Put the output in a browser window, continuously update every minute.
This is a very generic example but the logic of this can be applied to different fields like JMS, HL7, log sniffing and so on. It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in the sense that his Talend job finishes where ours will never end and where ours will do time-based aggregation in contrast to aggregation over a finite data set.
Also note that in order for Kettle to fully support multiple streaming data sources we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently are in need of full streaming data support, have a look at SQLStream, they can help you. SQLStream is co-founded by Pentaho’s Julian Hyde of Mondrian fame.
OK, let’s see how we can solve our little problem with Kettle instead…
1. Continuously read all the tweets that are being sent on Twitter.
For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:
http://stream.twitter.com/1/statuses/sample.json?delimited=length
Since the format of the output is never-ending and specific in nature I wrote a small “User Defined Java Class” script:
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
client.setTimeout(10000);
client.setConnectionTimeout(10000);
Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
client.getState().setCredentials(AuthScope.ANY, creds);
client.getParams().setAuthenticationPreemptive(true);
HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");
// Execute request
//
InputStream inputStream=null;
BufferedInputStream bufferedInputStream=null;
try {
int result = client.executeMethod(method);
// the response
//
inputStream = method.getResponseBodyAsStream();
bufferedInputStream = new BufferedInputStream(inputStream, 1000);
StringBuffer bodyBuffer = new StringBuffer();
int opened=0;
int c;
while ( (c=bufferedInputStream.read())!=-1 && !isStopped()) {
char ch = (char)c;
bodyBuffer.append(ch);
if (ch=='{') opened++; else if (ch=='}') opened--;
if (ch=='}' && opened==0) {
// one JSON block, pass it on!
//
Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
String jsonString = bodyBuffer.toString();
int startIndex = jsonString.indexOf("{");
if (startIndex<0) startIndex=0;
// System.out.print("index="+startIndex+" json="+jsonString.substring(startIndex));
r[0] = jsonString.substring(startIndex);
putRow(data.outputRowMeta, r);
bodyBuffer.setLength(0);
}
}
} catch(Exception e) {
throw new KettleException("Unable to get tweets", e);
} finally {
bufferedInputStream.reset();
bufferedInputStream.close();
}
setOutputDone();
return false;
}
As the experienced UDJC writers among you will notice: this step never ends as long as the twitter service keeps on sending more data. Depending on the stability and popularity of twitter that can be “a very long time“.
You could improve the code even further to re-connect to the service every time it drops away. Personally I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and re-start the transformation in a loop in a job. That way you have a trace in case twitter dies for a few hours.
2. Extract all the hash-tags used
First we’ll parse the JSON returned by the twitter service, extract the first 5 hash-tags from the message, split this up into 5 rows and count the tags…

3. Count the number of hash-tags used in a one-minute time-window
The counting is easy as you can simply use a “Group by” step. However, how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step which has the option to aggregate in a time-based manner so we might as well use this option:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation. This is a special execution method that doesn’t use the typical parallel engine. Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.
4. Report on all the tags that are being used more than once
The filtering is done with a simple “Filter Rows” step. However, thanks to the magic of the “Single Threader” step we can sort the rows descending by the tag occurrence count in that one-minute time-window. It’s also interesting to note that if you have huge amounts of data, that you can easily parallelize your work by starting multiple copies of the single threader step and/or with some clever data partitioning. In our case we could partition by hash-tag or re-aggregate the aggregated data.
5. Put the output in a browser window, continuously update every minute.
As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to put a small header and a separator between the data from every minute so we end up with a transformation that looks like this:

The script to print the header looks like this:
var out;
if (out==null) {
out = _step_.getTrans().getServletPrintWriter();
out.println("'Real-time' twitter hashtag report, minute based");
out.flush();
}
The separator between each minute is simple too:
if (nr==1) { var out = _step_.getTrans().getServletPrintWriter();out.println("============================================"); out.println();out.flush(); }
You can execute this transformation on a Carte instance (4.2.0) and see the following output:
'Real-time' twitter hashtag report, minute based ================================================= nr;hashtag;count;from;to 1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000 ================================================= 1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000 ...
For reference, I used the following URL to start the streaming report:
http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. Things you can add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag gets used more than 15 times in a given minute. That sort of alerting support for example gives you easy access to emerging new trends, events and memes.
For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.
Until next time,
Matt
PlanetMySQL Voting: Vote UP / Vote DOWN
Real-time streaming data aggregation
Июль 28th, 2011Dear Kettle users,
Most of you usually use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly, hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s lesser known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes referred to as being “streaming“, “real-time“, “near real-time“, “continuous” and so on. Typical examples of situations where you have a never-ending supply of data that needs to be processed the instance it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analyses, web or application log sniffing or of-course … Twitter! Since Twitter is easily accessed it’s common for examples to pop up regarding it’s usage and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities wrt to processing streaming data.
Here’s what we want to do:
- Continuously read all the tweets that are being sent on Twitter.
- Extract all the hash-tags used
- Count the number of hash-tags used in a one-minute time-window
- Report on all the tags that are being used more than once
- Put the output in a browser window, continuously update every minute.
This is a very generic example but the logic of this can be applied to different fields like JMS, HL7, log sniffing and so on. It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in the sense that his Talend job finishes where ours will never end and where ours will do time-based aggregation in contrast to aggregation over a finite data set.
Also note that in order for Kettle to fully support multiple streaming data sources we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently are in need of full streaming data support, have a look at SQLStream, they can help you. SQLStream is co-founded by Pentaho’s Julian Hyde of Mondrian fame.
OK, let’s see how we can solve our little problem with Kettle instead…
1. Continuously read all the tweets that are being sent on Twitter.
For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:
http://stream.twitter.com/1/statuses/sample.json?delimited=length
Since the format of the output is never-ending and specific in nature I wrote a small “User Defined Java Class” script:
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
client.setTimeout(10000);
client.setConnectionTimeout(10000);
Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
client.getState().setCredentials(AuthScope.ANY, creds);
client.getParams().setAuthenticationPreemptive(true);
HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");
// Execute request
//
InputStream inputStream=null;
BufferedInputStream bufferedInputStream=null;
try {
int result = client.executeMethod(method);
// the response
//
inputStream = method.getResponseBodyAsStream();
bufferedInputStream = new BufferedInputStream(inputStream, 1000);
StringBuffer bodyBuffer = new StringBuffer();
int opened=0;
int c;
while ( (c=bufferedInputStream.read())!=-1 && !isStopped()) {
char ch = (char)c;
bodyBuffer.append(ch);
if (ch=='{') opened++; else if (ch=='}') opened--;
if (ch=='}' && opened==0) {
// one JSON block, pass it on!
//
Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
String jsonString = bodyBuffer.toString();
int startIndex = jsonString.indexOf("{");
if (startIndex<0) startIndex=0;
// System.out.print("index="+startIndex+" json="+jsonString.substring(startIndex));
r[0] = jsonString.substring(startIndex);
putRow(data.outputRowMeta, r);
bodyBuffer.setLength(0);
}
}
} catch(Exception e) {
throw new KettleException("Unable to get tweets", e);
} finally {
bufferedInputStream.reset();
bufferedInputStream.close();
}
setOutputDone();
return false;
}
As the experienced UDJC writers among you will notice: this step never ends as long as the twitter service keeps on sending more data. Depending on the stability and popularity of twitter that can be “a very long time“.
You could improve the code even further to re-connect to the service every time it drops away. Personally I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and re-start the transformation in a loop in a job. That way you have a trace in case twitter dies for a few hours.
2. Extract all the hash-tags used
First we’ll parse the JSON returned by the twitter service, extract the first 5 hash-tags from the message, split this up into 5 rows and count the tags…

3. Count the number of hash-tags used in a one-minute time-window
The counting is easy as you can simply use a “Group by” step. However, how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step which has the option to aggregate in a time-based manner so we might as well use this option:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation. This is a special execution method that doesn’t use the typical parallel engine. Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.
4. Report on all the tags that are being used more than once
The filtering is done with a simple “Filter Rows” step. However, thanks to the magic of the “Single Threader” step we can sort the rows descending by the tag occurrence count in that one-minute time-window. It’s also interesting to note that if you have huge amounts of data, that you can easily parallelize your work by starting multiple copies of the single threader step and/or with some clever data partitioning. In our case we could partition by hash-tag or re-aggregate the aggregated data.
5. Put the output in a browser window, continuously update every minute.
As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to put a small header and a separator between the data from every minute so we end up with a transformation that looks like this:

The script to print the header looks like this:
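// 'out' is declared without an initializer, so (because the script scope is
// reused from row to row) it keeps its value: the header is printed only once.
var out;
if (out==null) {
  // When run on Carte via executeTrans, this writer sends text to the browser.
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}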
The separator between each minute is simple too:
// nr is the row's rank within the minute, so rank 1 starts a new minute's block.
if (nr==1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}
You can execute this transformation on a Carte instance (4.2.0) and see the following output:
'Real-time' twitter hashtag report, minute based
=================================================
nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================
1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
...
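To make the row layout explicit, here is a small plain-JavaScript sketch (Node, not Kettle). It renders one minute's ranked rows in the same nr;hashtag;count;from;to shape, with timestamps formatted as yyyy/MM/dd HH:mm:ss.SSS. The function names are hypothetical. UTC keeps the result independent of the machine's time zone, and the separator string is simply passed in.

```javascript
// Format a millisecond timestamp as yyyy/MM/dd HH:mm:ss.SSS (in UTC).
function fmt(ms) {
  const d = new Date(ms);
  const p = (n, w = 2) => String(n).padStart(w, "0");
  return (
    `${d.getUTCFullYear()}/${p(d.getUTCMonth() + 1)}/${p(d.getUTCDate())} ` +
    `${p(d.getUTCHours())}:${p(d.getUTCMinutes())}:${p(d.getUTCSeconds())}.` +
    p(d.getUTCMilliseconds(), 3)
  );
}

// One minute's block: separator, blank line, then one ';'-joined line per row.
function renderMinute(rows, separator = "=".repeat(40)) {
  const lines = [separator, ""];
  for (const r of rows) {
    lines.push([r.nr, r.hashtag, r.count, fmt(r.from), fmt(r.to)].join(";"));
  }
  return lines.join("\n");
}
```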
For reference, I used the following URL to start the streaming report:
http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag is used more than 15 times in a given minute. That sort of alerting gives you easy access to emerging trends, events and memes.
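One way such an alert rule could look, as a plain-JavaScript sketch. TrendAlerter, its threshold of 15 (count greater than 15, as in the text) and the cooldown are illustrative assumptions. I added the cooldown so that a tag which stays hot does not trigger an e-mail every single minute. Actually sending the mail is left to whichever Kettle step or job entry you prefer.

```javascript
// Hypothetical alert rule over one minute's counted rows.
class TrendAlerter {
  constructor(threshold = 15, cooldownMinutes = 10) {
    this.threshold = threshold;
    this.cooldown = cooldownMinutes;
    this.lastAlert = new Map(); // hashtag -> minute of the last alert
  }

  // rows: [{ hashtag, count }]; minute: a running minute number.
  // Returns the tags that should trigger an e-mail this minute.
  check(rows, minute) {
    const hits = [];
    for (const { hashtag, count } of rows) {
      if (count <= this.threshold) continue; // "more than 15 times"
      const last = this.lastAlert.get(hashtag);
      if (last !== undefined && minute - last < this.cooldown) continue;
      this.lastAlert.set(hashtag, minute);
      hits.push(hashtag);
    }
    return hits;
  }
}
```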
For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.
Until next time,
Matt
Open Query on Twitter/Identi.ca
August 27th, 2010
Open Query now has its own @openquery account on Twitter and Identi.ca, so you can conveniently follow us there for announcements and tips, and also ask us questions! All OQ engineers can post and reply. The front page of the OQ site also tracks this feed.
Previously I was posting from my personal @arjenlentz account with the #openquery hashtag, but that’s obviously less practical.
OpenSQL Camp Europe: Time to cast your votes!
July 15th, 2010
If you're wondering why there hasn't been an update from me for quite a while: I just returned from two months of paternity leave, during which I actually managed to stay away from the PC most of the time. In the meantime, I've officially become an Oracle employee, and there are a lot of administrative things to take care of... But it feels good to be back!
During my absence, Giuseppe and Felix kicked off the Call for Papers for this year's European OpenSQL Camp, which will again take place in parallel to FrOSCon in St. Augustin (Germany) on August 21st/22nd. We've received a number of great submissions, and now we would like to ask you, our community, for your favourites!
Basically it's "one vote per person per session", and you can cast your votes in two ways: either by tweeting to @opensqlcamp or via the opensqlcamp mailing list. The procedure is outlined in more detail on this wiki page.
As we need to finalize the schedule and inform the speakers, the voting period will close this coming Sunday, 18th of July. So don't hesitate, cast your votes now! Based on your feedback we will compile the session schedule for this year's camp. Thanks for your help!
Not Only NoSQL!! Uber Scaling-Out with SPIDER storage engine
March 30th, 2010
However, when a site grows large, the amount of traffic may exceed what replication can handle. In such a case, people may use memcached. It's a very fast, well-known in-memory KVS (key-value store), and its read throughput is far better than MySQL's. It has been used as a cache for web applications to store 'hot' data, with MySQL as the back-end storage, since it can reduce read requests to MySQL dramatically.
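The read path described here is usually called cache-aside. Below is a minimal JavaScript sketch in which two in-memory Maps stand in for memcached and MySQL; the class name and the dbReads counter are hypothetical, and expiry is not modelled. Reads are served from the cache when possible, while every write still goes to the database and only invalidates the cached copy. That is exactly why this setup does not relieve write load.

```javascript
// Cache-aside over two stand-in stores: `cache` plays memcached, `db` plays MySQL.
class CacheAside {
  constructor(db) {
    this.db = db;           // Map standing in for the MySQL back-end
    this.cache = new Map(); // Map standing in for memcached (no expiry here)
    this.dbReads = 0;       // how many reads actually reached the database
  }

  get(key) {
    if (this.cache.has(key)) return this.cache.get(key); // hit: no DB read
    this.dbReads++;
    const value = this.db.get(key);                      // miss: read the DB
    if (value !== undefined) this.cache.set(key, value); // populate the cache
    return value;
  }

  put(key, value) {
    this.db.set(key, value); // every write still hits the database...
    this.cache.delete(key);  // ...and just invalidates the cached copy
  }
}
```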
While 1:N replication can scale read workload and memcached can reduce read requests, neither eases write load much. So write traffic keeps growing as a web site becomes huge. On such web sites, a technique called "Sharding" has been used: the application itself chooses the appropriate MySQL server out of several servers.
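Application-level sharding in its simplest form can be sketched in a few lines of JavaScript. The host names, the choice of user id as the shard key and the modulo rule are illustrative assumptions. Real deployments often use a lookup table or consistent hashing instead, because with plain modulo, changing the number of servers remaps most keys.

```javascript
// Hypothetical shard map: the application, not the database, picks the server.
const SHARDS = ["db0.example", "db1.example", "db2.example", "db3.example"];

// Route all rows of one user to the same MySQL server (user id modulo shard count).
function shardFor(userId, shards = SHARDS) {
  if (!Number.isInteger(userId) || userId < 0) {
    throw new RangeError("userId must be a non-negative integer");
  }
  return shards[userId % shards.length];
}
```

Every query for a user (reads and writes alike) then goes to shardFor(userId). Spreading writes this way is what a single replicated master cannot do.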
In that way, MySQL+memcached has been the de facto standard data store on huge web sites for a long time.
Since web applications are getting larger still, especially social media sites, write load keeps rising as people communicate in real time. In this area yet another technique is required to handle the write load, and some people have chosen NoSQL solutions instead of MySQL+memcached. NoSQL is something of a buzzword, IMHO, standing for non-relational databases that don't require SQL access. Despite the lack of SQL access, some NoSQL software, such as Cassandra, is suitable for huge-scale web applications. You cannot JOIN records on a NoSQL system, but you cannot JOIN across shards on an RDBMS either. In other words, on such web applications MySQL isn't used as an RDBMS in the first place, but as a data store without joins.
For more on this line of thinking, I recommend reading Mark Callaghan's post: http://mysqlha.blogspot.com/2010/03/plays-well-with-others.html
and this post: http://nosql.mypopescu.com/post/407159447/cassandra-twitter-an-interview-with-ryan-king
Technically, it is possible to handle huge amounts of traffic using MySQL, but the running cost gets expensive, Twitter says. Because these three techniques (replication, memcached and sharding) are separate, the people who implement the application on top of them and manage them have to spend time learning all three. Cassandra, on the other hand, can handle more traffic as a single database management system, so people only have to learn one system instead of three. Sounds great? But is it really a good choice?
No! They're not aware of yet another solution: the SPIDER storage engine!
SPIDER for MySQL
http://spiderformysql.com/
SPIDER is a storage engine developed by a Japanese MySQL hacker, Mr. Kentoku Shiba. It makes use of MySQL's partitioning functionality and stores the partitioned data on remote servers; you could call it a sharding storage engine. The flexibility of MySQL's storage engine API is what makes such an engine possible, but I value Kentoku's design a lot.
The following picture (taken from the site above) depicts how the SPIDER storage engine works.
In this entry, I won't explain how to use the SPIDER storage engine; instead, I'll show you how capable it is. If you want to try it out, please refer to Giuseppe Maxia's post.
Please look at the following graph, which shows INSERT performance for a single MySQL server (InnoDB), 2 SPIDER nodes + 2 backend MySQL servers, and 4 SPIDER nodes + 4 backend MySQL servers. You can see how well it scales.
The next graph shows SELECT performance. Reads scale pretty well too.
Red circles indicate where the working set exceeds the available memory. Performance drops at that point, but SPIDER effectively expands the memory so that the working set fits again: it can use the memory of all the remote servers, as if they formed one huge buffer pool.
For more on SPIDER's performance tests, please refer to Kentoku's slides. The results are surprising.
The most significant problem for Twitter is scaling out read/write load at a lower running cost. Unfortunately, they chose a NoSQL solution because "MySQL replication + memcached + sharding" cannot handle write-intensive workloads well. However, that problem can be solved by using the SPIDER storage engine with MySQL!
Generally, a KVS cannot handle operations like the following:
- JOIN
- Sort (ORDER BY)
- Aggregation (GROUP BY)
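Over shards, sort and aggregation need a scatter-gather step: each shard does part of the work and a coordinator merges the partial results. A sharding engine has to do this on the application's behalf. The JavaScript below sketches only that generic technique for GROUP BY ... SUM and ORDER BY ... LIMIT, with arrays standing in for backend servers. It is not SPIDER's actual implementation, and JOIN is left out.

```javascript
// GROUP BY keyCol, SUM(valCol): aggregate on each shard, then merge partials.
function groupBySum(shards, keyCol, valCol) {
  const total = new Map();
  for (const rows of shards) {
    const partial = new Map(); // the per-shard part a backend could compute
    for (const r of rows) {
      partial.set(r[keyCol], (partial.get(r[keyCol]) || 0) + r[valCol]);
    }
    for (const [k, v] of partial) total.set(k, (total.get(k) || 0) + v);
  }
  return total;
}

// ORDER BY col ASC LIMIT n: the global top n is always contained in the union
// of each shard's top n, so fetch n per shard and merge.
function orderByLimit(shards, col, n) {
  const candidates = [];
  for (const rows of shards) {
    candidates.push(...[...rows].sort((a, b) => a[col] - b[col]).slice(0, n));
  }
  return candidates.sort((a, b) => a[col] - b[col]).slice(0, n);
}
```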
As Kentoku puts it: "I think that the most significant benefit of using an RDB is its usefulness and flexibility. It is a very important characteristic for developers in order to keep the application competitive, especially for those developers who have to add new features/functionalities day by day, like web services. I develop the SPIDER storage engine in order to provide developers with such useful and flexible RDB characteristics, even in environments where the traffic and data are huge and thus sharding is required." I 100% agree with him. If you are facing problems caused by high traffic and huge data, just like Twitter, please consider using the SPIDER storage engine before migrating to NoSQL solutions.





