Archive for the ‘twitter’ Category

What have I been up to lately?

September 2nd, 2011

Despite my best intentions, I haven't posted on this blog for a while, which is a shame! I've been busy writing in so many other places since I moved into my new role on the Oracle Linux product management team in April. I've learned a lot and feel quite at home here! The team is excellent and very nice to work with, and I am slowly getting the "Big Picture".

But even though I've been neglecting this blog, a lot of publicly visible material documents some of my activities:

I've created two podcasts for the Oracle Linux podcast series.

In addition to running the @ORCL_Linux Twitter account and Facebook page, I've been blogging on the Oracle Linux blog.

From time to time, I'm a guest blogger on the OTN Garage blog.

I also created new content and updated pages on the main Oracle web site and the Oracle Technology Network (OTN).

I've been traveling a bit as well and attended a few conferences where I spoke about Oracle Linux (and MySQL).

I probably forgot a few things in looking back over the past few months, but these were some of the highlights.

Check out my followup blog post on what I'm up to in the coming weeks and months!



Real-time streaming data aggregation

July 28th, 2011

Dear Kettle users,

Most of you use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s less well known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes referred to as “streaming”, “real-time”, “near real-time”, “continuous” and so on. Typical examples of a never-ending supply of data that needs to be processed the instant it becomes available are JMS (Java Message Service), RDBMS log sniffing, online fraud analysis, web or application log sniffing or, of course… Twitter! Since Twitter is easily accessed, examples using it pop up regularly, and in this blog post we too will use this service to demo the Pentaho Data Integration capabilities for processing streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously updated every minute.

This is a very generic example, but the same logic can be applied to other fields such as JMS, HL7, log sniffing and so on. It differs from the excellent work that Vincent from Open-BI described on his blog earlier this week in two ways: his Talend job finishes whereas ours never ends, and ours does time-based aggregation rather than aggregation over a finite data set.

Also note that in order for Kettle to fully support multiple streaming data sources, we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently need full streaming data support, have a look at SQLStream; they can help you. SQLStream was co-founded by Pentaho’s Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length
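The `delimited=length` parameter asks Twitter to frame the stream: as Twitter's streaming documentation described it at the time, each status is preceded by its length in bytes on a line of its own, and blank keep-alive lines may appear between messages. The UDJC script below finds message boundaries by counting braces instead; as an alternative, here is a minimal plain-Java sketch that uses the length prefix directly. `LengthDelimitedReader` and `next()` are hypothetical names, and the sketch assumes the prefix is ASCII digits ending in `\n` (optionally `\r\n`) and that message bytes are UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical reader for a length-prefixed message stream (sketch, not Kettle code). */
class LengthDelimitedReader {
    private final DataInputStream in;

    LengthDelimitedReader(InputStream in) {
        this.in = new DataInputStream(in);
    }

    /** Reads one ASCII line without its trailing \r\n or \n; null at end of stream. */
    private String readLine() throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            if (b != '\r') line.write(b);
        }
        if (b == -1 && line.size() == 0) return null;
        return line.toString("US-ASCII");
    }

    /** Returns the next message, or null when the stream ends cleanly. */
    String next() throws IOException {
        String header;
        do {
            header = readLine();
            if (header == null) return null;
            header = header.trim();
        } while (header.isEmpty()); // skip keep-alive blank lines

        int length = Integer.parseInt(header);
        byte[] body = new byte[length];
        in.readFully(body); // throws EOFException if the stream is cut short
        return new String(body, StandardCharsets.UTF_8).trim();
    }
}
```

Trimming each message and skipping blank lines keeps the sketch tolerant of whether the length counts the line break after the JSON or not, and braces inside tweet text cannot confuse it.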

Since the output never ends and has a specific format, I wrote a small “User Defined Java Class” script:

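import java.io.*;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
  client.setTimeout(10000);
  client.setConnectionTimeout(10000);

  // Preemptive basic authentication with the USERNAME and PASSWORD parameters
  Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
  client.getState().setCredentials(AuthScope.ANY, creds);
  client.getParams().setAuthenticationPreemptive(true);

  HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

  Reader reader = null;
  try {
    // Execute the request; anything other than HTTP 200 is an error
    int result = client.executeMethod(method);
    if (result != HttpStatus.SC_OK) {
      throw new KettleException("Twitter returned HTTP status " + result);
    }

    // Decode the body as UTF-8 rather than casting single bytes to char,
    // so multi-byte characters in tweets survive
    reader = new BufferedReader(new InputStreamReader(method.getResponseBodyAsStream(), "UTF-8"), 1000);

    StringBuffer bodyBuffer = new StringBuffer();
    int opened = 0;            // nesting depth of { }
    boolean inString = false;  // currently inside a JSON string?
    boolean escaped = false;   // previous character was a backslash inside a string?
    int c;
    while ((c = reader.read()) != -1 && !isStopped()) {
      char ch = (char) c;
      bodyBuffer.append(ch);

      if (inString) {
        // Braces inside string values (e.g. tweet text) must not change the depth
        if (escaped) {
          escaped = false;
        } else if (ch == '\\') {
          escaped = true;
        } else if (ch == '"') {
          inString = false;
        }
      } else if (ch == '"') {
        inString = true;
      } else if (ch == '{') {
        opened++;
      } else if (ch == '}') {
        opened--;
        if (opened == 0) {
          // One complete JSON block: drop the length prefix in front of it and pass it on
          String jsonString = bodyBuffer.toString();
          int startIndex = Math.max(jsonString.indexOf('{'), 0);

          Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
          r[0] = jsonString.substring(startIndex);
          putRow(data.outputRowMeta, r);

          bodyBuffer.setLength(0);
        } else if (opened < 0) {
          // Stray closing brace outside any object: resynchronize
          opened = 0;
          bodyBuffer.setLength(0);
        }
      }
    }
  } catch (Exception e) {
    throw new KettleException("Unable to get tweets", e);
  } finally {
    // Close the stream (if it was opened) and release the HTTP connection
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException ignored) {
        // nothing useful to do while cleaning up
      }
    }
    method.releaseConnection();
  }

  setOutputDone();
  return false;
}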

As the experienced UDJC writers among you will notice, this step never ends as long as the Twitter service keeps sending data. Depending on the stability and popularity of Twitter, that can be “a very long time”.

You could improve the code further to reconnect to the service every time the connection drops. Personally, I would not do this. I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and restart the transformation in a loop in a job. That way you have a trace in case Twitter is down for a few hours.
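In PDI this terminate, alert and restart cycle is built as a job, as described above. Purely to illustrate the control flow outside Kettle, here is a minimal Java sketch; `RestartLoop`, `runWithRestarts` and the failure callback are hypothetical names, and the pause between attempts is an assumption meant to keep an outage from turning into a tight retry loop.

```java
import java.util.function.Consumer;

/** Hypothetical supervisor: run a task, alert on each failure, restart after a pause. */
class RestartLoop {
    /**
     * Runs {@code task} until it completes normally or {@code maxAttempts} runs have failed.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int runWithRestarts(Runnable task, int maxAttempts, long pauseMillis,
                               Consumer<RuntimeException> onFailure) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.run();
                return attempt;                 // finished without error
            } catch (RuntimeException failure) {
                onFailure.accept(failure);      // e.g. e-mail, database row, SNMP trap
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis);  // back off before restarting
                }
            }
        }
        return -1;
    }
}
```

Every failure passes through the alert callback before the restart, which is what leaves the trace the paragraph above asks for.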

2. Extract all the hash-tags used

First we’ll parse the JSON returned by the Twitter service, extract the first 5 hash-tags from each message, split these up into 5 rows and count the tags…
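The transformation does this by parsing the JSON. As a stand-alone illustration of the "first 5 hash-tags, one row each" logic, here is a simplified Java sketch that swaps in a regular expression over the tweet text instead of reading hashtags out of the JSON. `HashtagExtractor` is a hypothetical name, and the pattern only approximates Twitter's own hashtag rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pull up to N hash-tags out of a tweet's text. */
class HashtagExtractor {
    // '#' followed by letters, digits or underscores in any script (so accented tags match)
    private static final Pattern HASHTAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");

    /** Returns at most {@code max} tags, without the '#', in order of appearance. */
    static List<String> extract(String text, int max) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(text);
        while (tags.size() < max && m.find()) {
            tags.add(m.group(1));
        }
        return tags;
    }
}
```

Each returned tag then becomes one output row, which corresponds to the "split into rows" part of the step.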

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy, as you can simply use a “Group by” step. However, how can we aggregate in a time-based fashion without too much tinkering? Well, we now have the “Single Threader” step, which has an option to aggregate in a time-based manner, so we might as well use it.

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single-threaded execution of the specified transformation. This is a special execution method that doesn’t use the typical parallel engine. Another cool thing about this engine is that the records that enter it during the time window can be grouped and sorted without the transformation being restarted every minute.
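The Single Threader step takes care of the windowing for you. To make the idea concrete, here is a minimal Java sketch of the same kind of one-minute tumbling window: counts accumulate per tag and are handed back and cleared once the window has elapsed. `TumblingWindowCounter` and its methods are hypothetical (this is not the Single Threader implementation), and timestamps are passed in explicitly rather than read from the clock so the behaviour is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tumbling-window counter (sketch, not the Single Threader implementation). */
class TumblingWindowCounter {
    private final long windowMillis;
    private long windowStart = -1;                  // -1 until the first record arrives
    private Map<String, Integer> counts = new HashMap<>();

    TumblingWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Adds one occurrence of {@code tag} seen at {@code timeMillis}. If that time lies beyond
     * the current window, the finished window's counts are returned and a new window starts
     * with this record; otherwise returns null.
     */
    Map<String, Integer> add(String tag, long timeMillis) {
        Map<String, Integer> finished = null;
        if (windowStart < 0) {
            windowStart = timeMillis;
        } else if (timeMillis - windowStart >= windowMillis) {
            finished = flush();
            windowStart = timeMillis;
        }
        counts.merge(tag, 1, Integer::sum);
        return finished;
    }

    /** Returns the counts collected so far and starts an empty window. */
    Map<String, Integer> flush() {
        Map<String, Integer> result = counts;
        counts = new HashMap<>();
        return result;
    }
}
```

Only one window's worth of counts is held in memory at a time, which is why this style of aggregation can run indefinitely.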

4. Report on all the tags that are being used more than once

The filtering is done with a simple “Filter Rows” step. However, thanks to the magic of the “Single Threader” step, we can also sort the rows in descending order of tag occurrence count within that one-minute time window. It’s also interesting to note that if you have huge amounts of data, you can easily parallelize your work by starting multiple copies of the Single Threader step and/or with some clever data partitioning. In our case we could partition by hash-tag or re-aggregate the aggregated data.
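Filtering and sorting one window's counts takes very little code. The minimal Java sketch below keeps tags used more than once, sorts them by count in descending order and numbers them from 1 like the `nr` column of the report. `WindowReport` is a hypothetical name, and breaking ties alphabetically while ignoring case is an assumption, although it matches the ordering of the sample output below.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turn one window's counts into numbered "nr;hashtag;count" lines. */
class WindowReport {
    static List<String> topTags(Map<String, Integer> counts, int minCount) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() >= minCount) rows.add(e);   // the "Filter Rows" part
        }
        // Sort by count descending, then by tag ignoring case
        rows.sort(Comparator.comparing((Map.Entry<String, Integer> e) -> e.getValue()).reversed()
                .thenComparing(e -> e.getKey(), String.CASE_INSENSITIVE_ORDER));

        List<String> lines = new ArrayList<>();
        int nr = 1;
        for (Map.Entry<String, Integer> e : rows) {
            lines.add(nr++ + ";" + e.getKey() + ";" + e.getValue());
        }
        return lines;
    }
}
```

For "used more than once", call it with `minCount` set to 2.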

5. Put the output in a browser window, continuously updated every minute.

As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step. However, we also want to print a small header, plus a separator between the data from each minute.

The script to print the header looks like this:

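// 'var out;' does not reset an existing value, and the script scope persists
// between rows, so the header is printed only once
var out;
if (out == null) {
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}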

The separator between each minute is simple too:

// nr restarts at 1 for every minute's result, so nr == 1 marks a new window
if (nr == 1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}
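For readers who want to see the same output logic outside Kettle, here is a minimal plain-Java sketch of the header-once, separator-per-minute behaviour. `ReportPrinter` is hypothetical; it expects the counts already ranked (for example in a `LinkedHashMap`), formats `from` and `to` with the `yyyy/MM/dd HH:mm:ss.SSS` pattern the sample output uses, pins the time zone to UTC so the result does not depend on the machine, and leaves out the `nr;hashtag;count;from;to` column header line.

```java
import java.io.PrintWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;

/** Hypothetical plain-Java version of the header and separator scripts. */
class ReportPrinter {
    private static final String SEPARATOR = "=================================================";
    private final PrintWriter out;
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
    private boolean headerPrinted = false;

    ReportPrinter(PrintWriter out) {
        this.out = out;
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    /** Prints the header (first call only), a separator, then one "nr;hashtag;count;from;to" row per tag. */
    void printWindow(Map<String, Integer> rankedCounts, long fromMillis, long toMillis) {
        if (!headerPrinted) {
            out.println("'Real-time' twitter hashtag report, minute based");
            headerPrinted = true;
        }
        out.println(SEPARATOR);
        out.println();
        String from = format.format(new Date(fromMillis));
        String to = format.format(new Date(toMillis));
        int nr = 1;
        for (Map.Entry<String, Integer> e : rankedCounts.entrySet()) {
            out.println(nr++ + ";" + e.getKey() + ";" + e.getValue() + ";" + from + ";" + to);
        }
        out.flush(); // push each window to the client right away
    }
}
```

Flushing after every window is what makes the browser show each minute as soon as it is ready instead of waiting for a buffer to fill.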

You can execute this transformation on a Carte instance (4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
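The transformation path and the parameters travel in the query string, so they need URL encoding (the path above contains spaces). Here is a minimal Java sketch that builds such a URL with `java.net.URLEncoder`. The `/kettle/executeTrans/` path, the `trans` parameter and the `USERNAME`/`PASSWORD` parameter names come from the URL above; the class and method names are hypothetical, and the Carte login (`cluster:cluster` above) is HTTP basic authentication, so it is left out of the query string.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

/** Hypothetical helper that builds a Carte executeTrans URL with encoded parameters. */
class CarteUrl {
    static String executeTrans(String baseUrl, String transPath, Map<String, String> params) {
        StringBuilder url = new StringBuilder(baseUrl)
                .append("/kettle/executeTrans/?trans=").append(encode(transPath));
        for (Map.Entry<String, String> p : params.entrySet()) {
            url.append('&').append(encode(p.getKey())).append('=').append(encode(p.getValue()));
        }
        return url.toString();
    }

    private static String encode(String s) {
        try {
            // application/x-www-form-urlencoded: '/' becomes %2F and ' ' becomes '+'
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Servlet containers decode `+` in a query string back to a space, so Carte should receive the original path.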

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag is used more than 15 times in a given minute. That sort of alerting gives you easy access to emerging trends, events and memes.
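That alerting idea comes down to a threshold check on each window's counts. Here is a minimal Java sketch; `TrendAlerter` is a hypothetical name, actually sending the e-mail is left to a mail step or similar, and remembering which tags were already above the threshold in the previous window (so a tag that stays hot does not trigger an e-mail every minute) is a design choice of this sketch rather than part of the example.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical threshold alerter over per-minute hash-tag counts. */
class TrendAlerter {
    private final int threshold;
    private Set<String> previouslyHot = new HashSet<>();

    TrendAlerter(int threshold) {
        this.threshold = threshold;
    }

    /** Returns the tags above the threshold in this window that were not above it in the previous one. */
    Set<String> newlyTrending(Map<String, Integer> windowCounts) {
        Set<String> hot = new HashSet<>();
        for (Map.Entry<String, Integer> e : windowCounts.entrySet()) {
            if (e.getValue() > threshold) hot.add(e.getKey());  // "more than 15 times"
        }
        Set<String> alerts = new TreeSet<>(hot);
        alerts.removeAll(previouslyHot);
        previouslyHot = hot;
        return alerts;
    }
}
```

A tag that cools down and later crosses the threshold again will trigger a new alert.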

For reference, take a look at this earlier blog post of mine, where I describe the internal cleanup mechanisms inside Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt



Open Query on Twitter/Identi.ca

August 27th, 2010

Open Query now has its own @openquery account on Twitter and Identi.ca so you can conveniently follow us there for announcements and tips – and also ask us questions! All OQ engineers can post/reply. The OQ site front page also tracks this feed.

Previously I was posting from my personal @arjenlentz account with the #openquery hashtag, but that’s obviously less practical.



OpenSQL Camp Europe: Time to cast your votes!

July 15th, 2010

If you're wondering why there hasn't been an update from me for quite a while: I just returned from two months of paternity leave, during which I actually managed to stay away from the PC most of the time. In the meantime, I've officially become an Oracle employee, and there are a lot of administrative things to take care of... But it feels good to be back!

During my absence, Giuseppe and Felix kicked off the Call for Papers for this year's European OpenSQL Camp, which will again take place in parallel with FrOSCon in St. Augustin (Germany) on August 21st/22nd. We've received a number of great submissions, and now we would like to ask the community for your favourites!

Basically it's "one vote per person per session" and you can cast your votes in two ways, either by twittering @opensqlcamp or via the opensqlcamp mailing list. The procedure is outlined in more detail on this wiki page.

As we need to finalize the schedule and inform the speakers, the voting period will close this coming Sunday, 18th of July. So don't hesitate, cast your votes now! Based on your feedback we will compile the session schedule for this year's camp. Thanks for your help!



Not Only NoSQL!! Uber Scaling-Out with SPIDER storage engine

March 30th, 2010

History shows that a single RDBMS node cannot handle the huge amount of traffic that web systems receive from all over the world, no matter how well the database is tuned. MySQL has had built-in master/slave replication for a long time, and it has enabled web applications to handle traffic using a scale-out strategy. Having many slaves suits web sites where most of the traffic is reads. Thus, MySQL's master/slave replication has been used on many web sites, and it is still in use.

However, when a site grows large, the amount of traffic may exceed what replication can handle. In that case, people may use memcached, a well-known, very fast, in-memory KVS (key-value store) whose read throughput is far better than MySQL's. It has been used as a cache for web applications to store 'hot' data, with MySQL as the back-end storage, because it can reduce read requests to MySQL dramatically.
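The usual way to put memcached in front of MySQL is the cache-aside pattern. The application reads from the cache first, falls back to the database on a miss, and then stores the value in the cache. The Python sketch below shows that pattern under clear simplifications. Plain dictionaries stand in for memcached and MySQL, no real client library is used, and the `db_reads` counter only exists to show how many reads reach the database.

```python
class CacheAsideStore:
    """Cache-aside reads; plain dicts stand in for memcached and MySQL."""

    def __init__(self, database):
        self.database = database   # stand-in for MySQL (the authoritative store)
        self.cache = {}            # stand-in for memcached
        self.db_reads = 0          # how many reads actually reached the database

    def get(self, key):
        if key in self.cache:          # cache hit: the database is not touched
            return self.cache[key]
        self.db_reads += 1             # cache miss: read from the database...
        value = self.database.get(key)
        if value is not None:
            self.cache[key] = value    # ...and keep the 'hot' value for later reads
        return value

    def put(self, key, value):
        self.database[key] = value     # every write still goes to the database
        self.cache.pop(key, None)      # invalidate so the next read refetches

store = CacheAsideStore({"user:1": "alice", "user:2": "bob"})
for _ in range(1000):
    store.get("user:1")
print(store.db_reads)
```

A thousand reads of the same key hit the database only once. Note, however, that `put` still writes to the database every time: the cache relieves reads, not writes.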

While 1:N replication can scale the read workload and memcached can reduce read requests, neither eases the write load much, and write traffic keeps growing as a web site becomes huge. Such sites have used a technique called "sharding", in which the data is split across several MySQL servers and the application chooses the appropriate server for each request.
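In application-side sharding, the application has to map each key to the server that owns it. The Python sketch below shows one common way to do that, hash-modulo routing. The shard names, the user-ID key and the in-memory "connections" are all hypothetical stand-ins for real MySQL connections.

```python
import hashlib

# Hypothetical shard list; a real deployment would hold MySQL connection settings.
SHARDS = ["mysql-shard-0", "mysql-shard-1", "mysql-shard-2", "mysql-shard-3"]

def shard_for(user_id: int, shards=SHARDS) -> str:
    """Pick the server that owns this user's rows (hash of the key modulo shard count)."""
    digest = hashlib.md5(str(user_id).encode()).digest()
    return shards[int.from_bytes(digest[:8], "big") % len(shards)]

def route_write(user_id, row, connections):
    """Send a write only to the owning shard, so the write load is spread out."""
    connections[shard_for(user_id)].append((user_id, row))

connections = {name: [] for name in SHARDS}
for uid in range(1000):
    route_write(uid, {"status": "hello"}, connections)
print({name: len(rows) for name, rows in connections.items()})
```

Each write lands on exactly one server, which is what lets writes scale. The trade-off is that a query touching users on different shards, such as a JOIN, must be stitched together by the application. With plain modulo hashing, adding a shard also remaps most keys.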

In that way, MySQL + memcached has been the de facto standard data store on huge web sites for a long time.

As web applications keep growing, especially social media sites where people communicate in real time, the write load keeps increasing, and yet another technique is needed to handle it. So some people have chosen NoSQL solutions instead of MySQL + memcached. NoSQL is something of a buzzword, IMHO; it refers to non-relational databases that don't require SQL access. Despite the lack of SQL, some NoSQL software, such as Cassandra, is well suited to huge web applications. Although you cannot JOIN records in a NoSQL system, you cannot JOIN across shards in an RDBMS either. In other words, on such web applications MySQL isn't really being used as an RDBMS in the first place, but as a data store without joins.

For more on this line of thinking, I recommend reading Mark Callaghan's post: http://mysqlha.blogspot.com/2010/03/plays-well-with-others.html
and this post: http://nosql.mypopescu.com/post/407159447/cassandra-twitter-an-interview-with-ryan-king

Technically, it is possible to handle a huge amount of traffic using MySQL, but, Twitter says, the running cost gets expensive. Because these are three separate techniques, the people who implement the application on top of them and manage them have to spend time learning all three. Cassandra, on the other hand, can handle more traffic as a single database management system, so people only have to learn one system instead of three. Sounds great? But is it really a good choice?

No! They aren't aware of yet another solution: the SPIDER storage engine!

SPIDER for MySQL
http://spiderformysql.com/

SPIDER is a storage engine developed by a Japanese MySQL hacker, Mr. Kentoku Shiba. It makes use of MySQL's partitioning functionality and stores the partitioned data on remote servers; you could call it a sharding storage engine. The flexibility of MySQL's storage engine API makes such an engine possible, but I value Kentoku's design a lot.

The following picture depicts how the SPIDER storage engine works. (This is a snippet from the site above.)

In this entry, I won't explain how to use the SPIDER storage engine; instead, I'll show you how capable it is. If you want to try it out, please refer to Giuseppe Maxia's post.

Please look at the following graph, which compares INSERT performance for a single MySQL server (InnoDB), 2 SPIDER nodes + 2 backend MySQL servers, and 4 SPIDER nodes + 4 backend MySQL servers. You can see how well it scales.

The next graph shows SELECT performance. Reads scale well too.

Red circles indicate where the working set size exceeds the memory size. Performance drops when the working set exceeds the available memory, but SPIDER can effectively expand the memory so that the working set fits in it: it makes use of the memory on all the remote servers, as if they added up to one huge buffer pool.

For more details on SPIDER's performance tests, please refer to Kentoku's slides. The results are surprising.

The most significant problem for Twitter is scaling out the read/write load at a lower running cost. Unfortunately, they chose a NoSQL solution because "MySQL replication + memcached + sharding" cannot handle a write-intensive workload well. However, that problem can be solved by using the SPIDER storage engine with MySQL!

In general, a KVS cannot handle operations like the following:
  • JOIN
  • Sort (ORDER BY)
  • Aggregation (GROUP BY)
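The Python sketch below makes the contrast with the list above concrete by doing the same aggregation two ways. The first version is a single SQL statement that uses JOIN, GROUP BY and ORDER BY, run through the standard-library `sqlite3` module as a stand-in for MySQL. The second version builds the same result by hand in a map/shuffle/reduce style over key-value records, roughly the way it would have to be done on top of a KVS. The `tweets` and `users` tables and their rows are made-up sample data.

```python
import sqlite3
from collections import defaultdict

# Made-up sample data: tweets (id, user_id, hashtag) and users (id, country)
tweets = [(1, 10, "ska"), (2, 11, "ska"), (3, 10, "NP"), (4, 12, "ska"), (5, 12, "NP")]
users = [(10, "NL"), (11, "JP"), (12, "JP")]

# 1) SQL: JOIN, GROUP BY and ORDER BY in one statement
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE tweets (id INTEGER PRIMARY KEY, user_id INTEGER, hashtag TEXT)")
db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, country TEXT)")
db.executemany("INSERT INTO tweets VALUES (?, ?, ?)", tweets)
db.executemany("INSERT INTO users VALUES (?, ?)", users)
sql_result = db.execute(
    """SELECT u.country, t.hashtag, COUNT(*) AS uses
         FROM tweets t JOIN users u ON u.id = t.user_id
        GROUP BY u.country, t.hashtag
        ORDER BY uses DESC, u.country, t.hashtag"""
).fetchall()

# 2) The same result by hand, as you would over key-value records
user_country = dict(users)                                          # "join" via lookup
mapped = [((user_country[uid], tag), 1) for _, uid, tag in tweets]  # map
grouped = defaultdict(list)
for key, one in mapped:                                             # shuffle
    grouped[key].append(one)
reduced = [(country, tag, sum(ones)) for (country, tag), ones in grouped.items()]  # reduce
mr_result = sorted(reduced, key=lambda r: (-r[2], r[0], r[1]))      # sort

print(sql_result)
assert sql_result == mr_result
```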
When using a KVS, these operations can be handled with MapReduce, but in general the same task can be done with a very simple SQL statement. SQL therefore lets us develop complex logic very efficiently. When I asked Kentoku for permission to write an article about his storage engine, he described his philosophy as follows:
I think that the most significant benefit of using an RDB is its usefulness and flexibility. This is a very important characteristic for developers who want to keep their application competitive, especially developers who have to add new features day by day, such as those building web services. I develop the SPIDER storage engine to give developers these useful and flexible RDB characteristics, even in environments where the traffic and data are so huge that sharding is required.
I agree with him 100%. If you are facing problems caused by high traffic and huge data, just like Twitter, please consider using the SPIDER storage engine before migrating to a NoSQL solution.


Getting started with Cassandra

February 24th, 2010

Motivated by today’s public news of Twitter’s move from MySQL to Cassandra, by my own desire to build skills following in-depth discussions of Cassandra at last November’s Open SQL Camp, and by yesterday’s discussion with a new client about persistent key-value store products, today I downloaded, installed and configured Cassandra for the first time. Not that today’s news was unexpected: if you follow the Twitter Engineering open source projects, you would have seen Cassandra, as well as other products, being used or evaluated by Twitter.

So I went from nothing to a working Cassandra node in under 5 minutes. This is what I did.

  1. Although I knew this was an Apache project, a Google search gave me The Apache Cassandra Project at http://incubator.apache.org/cassandra/ as the 3rd link. Congratulations to Cassandra on now being a top-level Apache project; this URL will change soon.
  2. Download Cassandra. It’s hard to miss, with a big green button on the home page. The current version is 0.5.
  3. I read Getting Started, which is the 3rd top-level link on the menu after Home and Download. Step 1 is picking a version, which I’d already done; Step 2 is Running a single node.
  4. Getting Started pointed out a problem with the required minimum Java version on Mac OS X. I was installing on Mac OS X 10.5 and CentOS 5.4, and I’ve run into this Java 6 default path issue before. I set my JAVA_HOME and PATH accordingly (after updating the wiki with the correct value).
  5. I extracted the tar file, changed to the directory and took a look at the README.txt file. Yes, I always check this first with any software, and here it was relevant because it includes valuable instructions on creating the default data and log directories.
  6. Start with bin/cassandra -f. No problems!
  7. I then followed the instructions from the link in Step 2 to use the CassandraCli, which tests and confirms that the installation is operational.

OK, a working environment. I’ve now installed and tested on a second machine; however, I now need to configure the cluster, and the documentation for that is not as straightforward. Time to try out Google again.

On a side note, this is one reason why I love Open Source. I followed the instructions online and found a mistake in the Mac OS X path, so I simply registered and corrected it, passing the benefit of my experience on to the next reader(s).



When it Comes to Tweets, the Key is Location, Location, Location!

February 24th, 2010

When you only have 140 characters to get your message across, you have to depend a lot on context. For Twitter, a big part of that context has become location. Knowing where someone is tweeting from can add a lot of value to the experience, and it's Raffi Krikorian's job to integrate location into Twitter. Raffi will be talking about this and other location-related topics at the upcoming Where 2.0 conference. We began by asking him how Twitter determines location, and whether it will always be an opt-in option.

Raffi Krikorian: I think part of it is based around the philosophy of Twitter itself. We only publish information that you've explicitly given to us on a tweet-by-tweet basis. So for location on your tweets, it's all opt-in. You have to give us that location information, and we'll put it out. There are other things we do behind the scenes, like our local trends information, that doesn't actually tie to an individual person. We might do some IP look-ups. We look at your user location field. But for anything that's tied to an individual, it's all opt-in.

James Turner: 140 characters is a restriction that Twitter's famous for. Location is fairly high bandwidth information. Have you considered carrying location data out of band from the 140 characters?

Raffi Krikorian: We do that right now. Originally, when people used to tweet location, they put a URL in their text field which linked to a map or linked to a service which might show where they are. But ever since we launched our geotagging API in November, we store the latitude and longitude for your tweet out of band. It's completely metadata on top of the tweet. A bunch of clients implement it, such as Tweetie and Seesmic Web; they can read that metadata, and will show you either a map or attempt a reverse geocode and give you an actual name.
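Carrying location "out of band" means the coordinates sit next to the text instead of inside it, so they do not use up any of the 140 characters. The hypothetical Python model below illustrates that difference. It is not Twitter's actual data format, and the map URL and coordinates are made-up sample values.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

MAX_TEXT = 140  # the character limit discussed in the interview

@dataclass
class Tweet:
    """Hypothetical model: location travels next to the text, not inside it."""
    text: str
    coordinates: Optional[Tuple[float, float]] = None  # (latitude, longitude), opt-in

    def __post_init__(self):
        if len(self.text) > MAX_TEXT:
            raise ValueError(f"text is {len(self.text)} characters; limit is {MAX_TEXT}")

# Old style: a map link eats into the character budget (made-up URL)
old_style = Tweet("At the show tonight http://maps.example.com/?q=37.8044,-122.2712")
# Geotagged style: the same location as metadata, the text stays short
new_style = Tweet("At the show tonight", coordinates=(37.8044, -122.2712))
print(len(old_style.text), len(new_style.text))
```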

James Turner: What value do you see location bringing to social networking? Usually, if someone is talking about a location, it's explicit in the message or in the blog, "I'm at so-and-so and the show is really nice tonight." If you imagine that people are pervasively providing their geolocation, how does that aid social networking?

Raffi Krikorian: I think that one, it helps people like us at Twitter to be able to give more relevant context information to other people. Especially in our 140 character constrained lifestyle, you can't necessarily put fully structured information of where you might be or what you might be talking about. But since we're now trying to expand the dimensionality of our platform to include place, we can now store that structured data, and, therefore, we can analyze it better. We can deliver it to the right people better, and we can do more interesting high-level analytics. Therefore, we can deliver relevant search or relevant information to people who are wanting it.

I think one of the dreams would be, not necessarily for Twitter but for someone out there, to be able to look at status update streams with geotagging on top of it and try to figure out what are the hot bars out there tonight, or be able to see cross-referencing with my foursquare check-in, for example. I want to be able to ask the service, "What bar should I go to right now that my friends have liked that I think I'll probably like and have no line?" And you'd only do something like that kind of high-level query if you actually have some really good way, either to analyze data or to get structured data out of the system. Analysis of that is going to be hard, especially in a world where you only have 140 characters to express yourself. With these metadata, or meta ways to include structured information, it becomes a UI problem to get that information into the system, and it should become a lot easier for other people to build applications on top of it. So I think that's where geo-type stuff would go for networking, with better recommendations or better information delivery, better stuff within social networks.

James Turner: It sounds, in some ways, like you could mine the data the same way that, for example, GPS data in people who have phones in cars or cell phone data can be used to infer traffic patterns.

Raffi Krikorian: That's actually an excellent way to think about it. In the same way, you can watch how people are moving on freeways and try to figure out what's going on. That's a very passive interaction, because people are looking at something and data's being sent out of band, whereas with something like Twitter, you're trying to express emotion or you're trying to express sentiment through Twitter. And that sentiment with latitude and longitude attached to it inherently talks about not just a feeling, but a feeling associated with a place. If we could start tying those places not just to latitude and longitude, but to their context, then you could start really understanding what's going on in the world and be able to deliver it to a lot of people.

It's the foursquare world, right? It's really important for me to find out that my friends really like that bar down the street. It's also really important to me to know that my friends are down the street right now and if I'm not busy, I can go there. So I think that's the direction we're trying to take things.

James Turner: One of the things we saw, especially in Iran this year, was Twitter as a breaking news source, simply because, along with taking photos with your cell phone, it's just a very fast way of saying, "This thing is happening right now." When you add geolocation in on top of that, do you see the ability to kind of infer news just based on what people are tweeting and where they're tweeting it?

Raffi Krikorian: I think the question of what defines news is always going to be up for debate, but most certainly. We saw this a bunch of times. Right now, there's a Twitter account that watches a USGS website and tweets with geotagging exactly where an earthquake happens within a few seconds of it hitting the USGS website. We've seen examples of people uploading photos via Flickr of traffic accidents on the web. And Flickr has implemented our geotagging API, so if you upload a photo to them with geotagging, they'll pass it through to Twitter. And then on Twitter's side, we allow you to ask either for tweets within a certain location or connect to our geohost and get a stream of tweets subscribed to location. So I could see all of the events that are occurring that are geotagged within the San Francisco area, for example, in real-time or in the Bay area. And then in the Bay area, I'm going to start to see the breaking news events, like I mentioned before, traffic accidents. I'll see people just talking about random stuff. And I'll see the earthquakes pop up here and there as the USGS stuff comes out.
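At its simplest, asking for tweets "within a certain location" is a spatial filter. The Python sketch below shows only that filtering idea, not Twitter's API. It assumes that each geotagged tweet carries a `(latitude, longitude)` pair and that the area of interest is a plain latitude/longitude bounding box. The box is a rough, illustrative one around San Francisco, and the sample tweets are made up.

```python
from typing import Iterable, Iterator, NamedTuple

class BoundingBox(NamedTuple):
    south: float  # minimum latitude
    west: float   # minimum longitude
    north: float  # maximum latitude
    east: float   # maximum longitude

    def contains(self, lat: float, lon: float) -> bool:
        return self.south <= lat <= self.north and self.west <= lon <= self.east

# Rough, illustrative box around San Francisco (not an official boundary)
SF_AREA = BoundingBox(south=37.70, west=-122.52, north=37.83, east=-122.35)

def within(tweets: Iterable[dict], box: BoundingBox) -> Iterator[dict]:
    """Yield only geotagged tweets whose coordinates fall inside the box."""
    for tweet in tweets:
        coords = tweet.get("coordinates")  # None for tweets without location (opt-in)
        if coords is not None and box.contains(*coords):
            yield tweet

stream = [
    {"text": "Earthquake felt downtown", "coordinates": (37.7793, -122.4193)},
    {"text": "Traffic in Oakland", "coordinates": (37.8044, -122.2712)},
    {"text": "No location attached", "coordinates": None},
]
for tweet in within(stream, SF_AREA):
    print(tweet["text"])
```

Because `within` is a generator, it works the same on an endless stream as on this short list. A plain box like this does not handle areas that cross the 180° meridian.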

James Turner: Does that alleviate some of the need for some of the function-based or event-based hashtagging, where if I'm in the San Jose Convention Center, then I'm probably attending the event that's there, so I really don't need to tag it?

Raffi Krikorian: I think yes and no. I think that the hashtag system allows for really good context, so people can at a later date understand what went on at the time without necessarily having to figure out how to cross-reference all of the geo-tweets and then cluster them with other geo-tweets in the area trying to infer high-level stuff. I think tagging, explicit tagging, still provides a nice human-readable way to understand that information. And then the geotagging provides a really good machine-readable way to dissect and also provide that out of band type information. I think they're two different use cases. I, for one, still apply hashtags whenever I talk about stuff to imply other things or imply with a different type of context than just the location alone might provide.

James Turner: We've seen a couple of interesting uses of social media that are apart from their obvious use. Someone was able to prove they weren't able to commit a crime because they were updating their Facebook page when the crime was committed. Similarly, we just saw a news item on a criminal who was caught because he was updating his Facebook page and the cops were able to figure out where he was. Certainly when you've got something like tweets that are geolocated, you can see that type of thing. Do you think that this is going to become more of a privacy issue? Do you think this kind of openness right now is a fad? Or are we seeing a real paradigm change about what people feel comfortable letting other people know about them?

Raffi Krikorian: Well, I think you have two points there, and I just want to hit the first one first, which is being able to find out where people are; being able to know that someone updated a Facebook item from a certain location. What we're doing at Twitter is not necessarily authenticated location; a lot of it's still implied. There's a lot of trust on either the application that updated Twitter or the person that's updating Twitter. We provide no guarantees that the location that's being reported by us is factual, except by the fact that someone posted that into a system. So just like you could totally lie about what you're doing on Twitter (I could tweet right now and say I'm sleeping, but I'm on the phone with you), I could send a tweet right now that says I'm in New York when in reality, I'm in my home right now in Oakland, California. A statement like that, of just revealing where my home is, sort of touches upon your second point. I think privacy in this type of world is a very tricky thing that needs to be maneuvered very carefully. Something like foursquare has privacy down because you need to have a bidirectional relationship with other people in foursquare to get a notification of where I am. I need to request to be friends with you, and you need to approve that request in order for me to get notification of where you are. I think that has privacy, or at least it has a privacy model which implies certain levels of control. For Twitter, since we have our asymmetric following method, I can follow you and you don't necessarily have to approve me. The onus is on us to make sure that the user's privacy is under control. It's definitely something that services like us need to take into account; whether it means that we're fuzzing the data, or whether it means we're going to be storing data with a different level of precision than you've given it to us, are all questions up for debate.
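Storing location "with a different level of precision" than the user supplied can be as simple as rounding coordinates before storing or publishing them. The Python sketch below illustrates that idea. It is not a description of what Twitter actually does, and the number of decimal places is an assumption chosen for illustration. As a rule of thumb, one degree of latitude is about 111 km, so two decimals corresponds to roughly 1 km.

```python
KM_PER_DEGREE_LAT = 111.0  # approximate length of one degree of latitude

def coarsen(lat: float, lon: float, decimals: int = 2) -> tuple:
    """Round coordinates before storing or publishing them, discarding precision."""
    return (round(lat, decimals), round(lon, decimals))

def approx_resolution_km(decimals: int) -> float:
    """Rough north-south size of one rounding step at the given number of decimals."""
    return KM_PER_DEGREE_LAT * 10 ** -decimals

exact = (37.80443, -122.27114)        # made-up precise fix, e.g. someone's home
print(coarsen(*exact, decimals=2))    # roughly neighbourhood-level
print(coarsen(*exact, decimals=1))    # roughly city-level
print(f"{approx_resolution_km(2):.2f} km")
```

Rounding hides fine detail, but on its own it is not a complete privacy scheme. For example, repeated posts from the same rounded cell can still reveal a pattern.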

If we don't do that, then location-based services won't take off at all, actually. A lot of people will be really concerned about their privacy, no matter how much of a fad there is, or how much uptake there is in the alpha nerd population. But I think if you can provide good methods of privacy control, that can be explained to everyone, so everyone understands what they're doing in a very user-friendly way, then I think there'll be a huge upsurge, because of the value of the data that can come back to people.

James Turner: We're seeing news that the FBI wants ISPs to retain two years of email and two years of surfing records. I don't know how much you could talk about it because of the wonderful government restrictions on this stuff, but is that the kind of thing you guys worry about at all, that you're going to become a source for that kind of intelligence?

Raffi Krikorian: I'm not sure how much I can talk about it. What I will say is that we default to only displaying whatever data you gave us. So we don't hide any data that you give us, unless you're a protected user. Whatever data you give us, we publish back out again. So I guess the answer is yes and no on that point.

James Turner: What do you see as the technical side of geolocation, in terms of what's going to be the new interesting technologies coming along, and how they're going to be used?

Raffi Krikorian: From Twitter's standpoint, it's how do you accept all of this real-time data, index and analyze it and spread it throughout our system in almost real-time. People have traditionally built a bunch of GIS-like systems on top of PostgreSQL or on top of MySQL, and that's fine, but it doesn't scale after a while. After you throw a couple million or a couple hundred million entries at it, the amount of time it takes for one of those databases to process that, to insert it and then to select against it becomes, as you can understand, untenable for real-time operation. And by real-time, I mean sub-second operation.

So the stuff that we're doing is more geared towards how can you accept tweets that are coming in at what you can imagine to be an incredibly fast rate. Tweets are coming in, figure out their location, attach appropriate metadata to it. Store it in our database. Fan it out to anyone who wants to look at it. Run research and analytics on it and index it in their search index, and do this all within a couple of seconds on the way through the system. I think there's a lot of interesting stuff being done out there on how things are being stored, how things are being indexed. But I think our personal contribution will be how do you do it at that kind of speed?
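One common way to make "which recent tweets are near here?" cheap to answer while tweets stream in is to put each tweet into a coarse grid cell when it arrives. A lookup then scans only a few cells instead of a whole table. The Python sketch below illustrates that general technique with an in-memory dictionary. It is not a description of Twitter's system. The cell size is an arbitrary assumption, and the sample points are made up.

```python
import math
from collections import defaultdict

CELL_DEG = 0.1  # assumed cell size in degrees (about 11 km north-south)

def cell_of(lat: float, lon: float) -> tuple:
    """Map a coordinate to the integer grid cell that contains it."""
    return (math.floor(lat / CELL_DEG), math.floor(lon / CELL_DEG))

class GridIndex:
    def __init__(self):
        self.cells = defaultdict(list)

    def insert(self, tweet_id, lat, lon):
        # Constant work per incoming tweet: append to a single cell
        self.cells[cell_of(lat, lon)].append((tweet_id, lat, lon))

    def nearby(self, lat, lon):
        """Return tweets in the query cell and its 8 neighbouring cells."""
        row, col = cell_of(lat, lon)
        hits = []
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                hits.extend(self.cells.get((row + dr, col + dc), []))
        return hits

index = GridIndex()
index.insert(1, 37.7793, -122.4193)   # San Francisco (made-up sample)
index.insert(2, 37.8044, -122.2712)   # Oakland
index.insert(3, 40.7128, -74.0060)    # New York
print(sorted(tweet_id for tweet_id, _, _ in index.nearby(37.78, -122.42)))
```

Checking the neighbouring cells catches points just across a cell edge. Cells defined in degrees shrink east-west away from the equator, which is one reason real systems use more refined schemes.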



#songsincode on Twitter, SongsInCodeDB

August 21st, 2009

Looking at #songsincode on Twitter (just search for the #songsincode tag), it appears a large chunk of the geeky/nerdy world has come to a halt, spending the day expressing song titles in code. So far we’ve seen most programming languages, as well as CSS and SQL, come by. I think it’s a nice example of how “the collective” can become very creative. My favourite SQL one so far (by @john_chr): SELECT * FROM walk WHERE gait LIKE '%EGYPTIAN%'

Update: a good friend of mine, Steve Thorne (@Jerub), wanted to set up a site for this, so we hacked one up: SongsInCodeDB.

