Archive for the ‘Pentaho’ Category

451 CAOS Links 2011.10.07

October 7th, 2011

OpenStack Foundation. New Pentaho CEO. And more.

# Rackspace announced its intention to form an independent OpenStack Foundation.

# HP has chosen Ubuntu as the lead host and guest operating system for its Public Cloud.

# Pentaho appointed Quentin Gallivan as its new CEO.

# Hortonworks continued the discussion about contributions to Apache Hadoop.

# Bob Bickel explained why CloudBees is not, itself, open source.

# Google announced the limited preview release of Google Cloud SQL.

# Eucalyptus Systems, Nebula and Virtual Bridges joined the Linux Foundation.

# Dave Neary discussed the different types of community in relation to the Tizen project.

# Akamai joined the OpenStack community.

# Daniel Abadi provided his perspective on Oracle’s NoSQL Database.

# One more thing…
Apple’s relationship with open source may be somewhat tenuous – Paul Rooney provides some background – but given the impact Steve Jobs has made on the industry as a whole it seems wrong not to mark his passing in some way. We’ll leave the words to the company he created.



Proposals for Codebits.EU

August 15th, 2011
Codebits is an annual 3-day conference about software and, well, code. It's organized by SAPO and this year's edition is to be held on November 10 thru 12 at the Pavilhão Atlântico, Sala Tejo in Lisbon, Portugal.

I've never attended SAPO Codebits before, but I heard good things about it from Datacharmer Giuseppe Maxia. The interesting thing about the way this conference is organized is that all proposals are available to the public, which can also vote for the proposals. This year's proposals are looking very interesting already, with high quality proposals from Giuseppe about database replication with Tungsten replicator, Pentaho's chief of data integration Matt Casters about Kettle (aka Pentaho data integration), and Pedro Alves from webdetails who will be talking about "Big Data" analysis and dashboarding work he did for the Mozilla team.

There are many more interesting talks, and you should simply check out the proposals for yourself and give a thumbs up or a thumbs down according to whether you'd like to see a particular proposal at the conference. I decided to send in a few proposals as well. So, if you like what you see here, take a minute to vote and shape this Codebits conference. I'm hoping to meet you there!


451 CAOS Links 2011.08.09

August 9th, 2011

Opscode appoints a new CEO. SugarCRM gains a new CFO. And more.

# Opscode named Mitch Hill as CEO, with Jesse Robbins becoming Chief Community Officer.

# SugarCRM claimed billings up 58% in Q2 and appointed a new CFO.

# Tasktop released Tasktop Dev 2.1 and announced Tasktop Sync 1.0.

# Pentaho delivered improved support for Hadoop and various NoSQL database projects.

# The openSUSE community approved its strategy document.

# Dustin Kirkland described the Ubuntu Orchestra Project.



Real-time streaming data aggregation

July 28th, 2011

Dear Kettle users,

Most of you usually use a data integration engine to process data in a batch-oriented way.  Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads.  Sometimes folks run micro-batches of work every minute or so.  However, it’s less well known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target.  This sort of data integration is sometimes referred to as “streaming”, “real-time”, “near real-time”, “continuous” and so on.  Typical examples of situations where you have a never-ending supply of data that needs to be processed the instant it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analysis, web or application log sniffing or of course … Twitter!  Since Twitter is easily accessed, it’s common for examples to pop up regarding its usage, and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities with respect to processing streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously update every minute.

This is a very generic example, but the logic can be applied to different fields like JMS, HL7, log sniffing and so on.  It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in two ways: his Talend job finishes where ours will never end, and ours does time-based aggregation in contrast to aggregation over a finite data set.

Also note that in order for Kettle to fully support multiple streaming data sources we would have to implement support for “windowed” (time-based) joins and other nifty things.  We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle.  In any case, if you currently are in need of full streaming data support, have a look at SQLStream; they can help you. SQLStream was co-founded by Pentaho’s Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length

Since the format of the output is never-ending and specific in nature I wrote a small “User Defined Java Class” script:

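// Imports needed at the top of the UDJC class code
import java.io.BufferedInputStream;
import java.io.InputStream;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.PostMethod;
import org.pentaho.di.cluster.SlaveConnectionManager;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
    client.setTimeout(10000);
    client.setConnectionTimeout(10000);

    // Authenticate with the Twitter account passed in as transformation parameters
    Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
    client.getState().setCredentials(AuthScope.ANY, creds);
    client.getParams().setAuthenticationPreemptive(true);

    HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

    // Execute request
    //
    InputStream inputStream=null;
    BufferedInputStream bufferedInputStream=null;
    try {
        int result = client.executeMethod(method);

        // the response
        //
        inputStream = method.getResponseBodyAsStream();
        bufferedInputStream = new BufferedInputStream(inputStream, 1000);

        StringBuffer bodyBuffer = new StringBuffer();
        int opened=0;
        int c;
        while ( (c=bufferedInputStream.read())!=-1  && !isStopped()) {
            char ch = (char)c;
            bodyBuffer.append(ch);
            // Track brace depth; braces inside JSON string values are counted too
            if (ch=='{') opened++; else if (ch=='}') opened--;
            if (ch=='}' && opened==0) {
                // one JSON block, pass it on!
                //
                Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
                String jsonString = bodyBuffer.toString();

                // Skip whatever precedes the first '{' (the length prefix)
                int startIndex = jsonString.indexOf("{");
                if (startIndex<0) startIndex=0;

                // System.out.print("index="+startIndex+" json="+jsonString.substring(startIndex));

                r[0] = jsonString.substring(startIndex);
                putRow(data.outputRowMeta, r);

                bodyBuffer.setLength(0);
            }
        }
    } catch(Exception e) {
        throw new KettleException("Unable to get tweets", e);
    } finally {
        // The stream is null if the request failed; reset() is dropped because no mark was ever set
        if (bufferedInputStream!=null) {
            try { bufferedInputStream.close(); } catch(Exception e) { /* ignore on shutdown */ }
        }
        method.releaseConnection();
    }

    setOutputDone();
    return false;
}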

As the experienced UDJC writers among you will notice: this step never ends as long as the Twitter service keeps on sending more data.  Depending on the stability and popularity of Twitter that can be “a very long time”.
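The brace counting in the step above can be tried outside of Kettle. The following is a minimal sketch, not the code from the transformation: `JsonObjectSplitter` and `split` are made-up names, and unlike the step it also tracks string literals so that a `}` inside a tweet text does not end a message early. It assumes the stream is a sequence of JSON objects with arbitrary filler (such as length lines) in between.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kettle: cuts a character stream into top-level JSON objects.
class JsonObjectSplitter {

    static List<String> split(Reader reader) {
        List<String> objects = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;            // number of currently open '{'
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string
        try {
            int c;
            while ((c = reader.read()) != -1) {
                char ch = (char) c;
                if (depth == 0 && ch != '{') {
                    continue; // skip filler between objects (length lines, newlines)
                }
                current.append(ch);
                if (inString) {
                    if (escaped) escaped = false;
                    else if (ch == '\\') escaped = true;
                    else if (ch == '"') inString = false;
                } else if (ch == '"') {
                    inString = true;
                } else if (ch == '{') {
                    depth++;
                } else if (ch == '}') {
                    depth--;
                    if (depth == 0) { // one complete JSON block
                        objects.add(current.toString());
                        current.setLength(0);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return objects;
    }

    public static void main(String[] args) {
        // Made-up sample input: two objects separated by filler lines
        String stream = "x\r\n{\"text\":\"a } in a string\"}\r\ny\r\n{\"a\":{\"b\":1}}\r\n";
        for (String json : split(new StringReader(stream))) {
            System.out.println(json);
        }
    }
}
```

In a real step you would emit each object as soon as it is complete instead of collecting them in a list, since the stream never ends.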

You could improve the code even further to re-connect to the service every time it drops away.  Personally I would not do this.  I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and re-start the transformation in a loop in a job.  That way you have a trace in case Twitter dies for a few hours.
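In Kettle this restart logic belongs in a job, not in code. Purely to illustrate the control flow (fail, alert, restart), here is a small sketch in plain Java. The names `RestartLoop`, `Task` and `runWithRestarts` are hypothetical, and the alert is just a callback where a real job would send an e-mail or write to a database.

```java
import java.util.function.Consumer;

// Hypothetical illustration of "terminate with an error, alert, restart in a loop".
class RestartLoop {

    interface Task {
        void run() throws Exception;
    }

    // Runs the task until it ends normally or has failed maxFailures times.
    // Returns the number of failures that were seen.
    static int runWithRestarts(Task task, Consumer<Exception> alert, int maxFailures, long pauseMillis) {
        int failures = 0;
        while (failures < maxFailures) {
            try {
                task.run();
                return failures; // the task ended normally, no restart needed
            } catch (Exception e) {
                failures++;
                alert.accept(e); // leave a trace of every outage
            }
            try {
                Thread.sleep(pauseMillis); // wait a little before reconnecting
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return failures;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        int failures = runWithRestarts(
            () -> { if (++attempts[0] < 3) throw new IllegalStateException("stream dropped"); },
            e -> System.out.println("ALERT: " + e.getMessage()),
            5, 100L);
        System.out.println("failures before success: " + failures);
    }
}
```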

2. Extract all the hash-tags used

First we’ll parse the JSON returned by the Twitter service, extract the first 5 hash-tags from the message, split these up into 5 rows and count the tags…
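The transformation does this with regular Kettle steps. As a rough stand-alone illustration of "take at most the first 5 hash-tags", the sketch below pulls tags out of a tweet text with a regular expression. That is an assumption made for the example (the transformation reads them from the parsed JSON), and `HashtagExtractor` and `firstTags` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: returns up to 'limit' hash-tags found in a tweet text, without the '#'.
class HashtagExtractor {

    // '#' followed by letters, digits or underscores; \p{L} keeps accented tags intact
    private static final Pattern HASHTAG = Pattern.compile("#([\\p{L}\\p{N}_]+)");

    static List<String> firstTags(String tweetText, int limit) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (tags.size() < limit && m.find()) {
            tags.add(m.group(1)); // one tag per output row in the transformation
        }
        return tags;
    }

    public static void main(String[] args) {
        String text = "Listening to #ska with #teamfollowback #NP #swag #dnd #follow";
        for (String tag : firstTags(text, 5)) {
            System.out.println(tag);
        }
    }
}
```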

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy as you can simply use a “Group by” step.  However, how can we aggregate in a time-based fashion without too much tinkering?  Well, we now have the “Single Threader” step which has the option to aggregate in a time-based manner so we might as well use this option:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation.  This is a special execution method that doesn’t use the typical parallel engine.  Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.
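To make the idea of a time window concrete, here is a minimal sketch of a one-minute counting window in plain Java. It is not how the “Single Threader” step is implemented: `WindowedTagCounter` is a made-up class, timestamps are passed in by the caller, and the window is only closed when the next tag arrives after the window has elapsed, whereas a real implementation would flush on a timer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of counting tags per fixed time window.
class WindowedTagCounter {
    private final long windowMillis;
    private long windowStart = -1; // -1 until the first tag arrives
    private Map<String, Integer> counts = new LinkedHashMap<>();

    WindowedTagCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds one tag. Returns the counts of the window that just ended,
    // or null while the current window is still open.
    Map<String, Integer> add(String tag, long nowMillis) {
        Map<String, Integer> finished = null;
        if (windowStart < 0) {
            windowStart = nowMillis;
        }
        if (nowMillis - windowStart >= windowMillis) {
            finished = counts;              // hand over the completed window
            counts = new LinkedHashMap<>(); // and start a fresh one
            windowStart = nowMillis;
        }
        counts.merge(tag, 1, Integer::sum); // the "Group by" count
        return finished;
    }

    public static void main(String[] args) {
        WindowedTagCounter counter = new WindowedTagCounter(60_000L);
        counter.add("ska", 0L);
        counter.add("ska", 20_000L);
        counter.add("swag", 40_000L);
        Map<String, Integer> minute = counter.add("dnd", 61_000L);
        System.out.println(minute);
    }
}
```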

4. Report on all the tags that are being used more than once

The filtering is done with a simple “Filter Rows” step.  However, thanks to the magic of the “Single Threader” step we can sort the rows descending by the tag occurrence count in that one-minute time-window.  It’s also interesting to note that if you have huge amounts of data, you can easily parallelize your work by starting multiple copies of the Single Threader step and/or with some clever data partitioning.  In our case we could partition by hash-tag or re-aggregate the aggregated data.
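The filter-and-sort logic for one window is small enough to sketch in plain Java. `TagReport` and `report` are made-up names, and breaking ties by tag name is a choice made for this example only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration: keep tags used more than once, most frequent first.
class TagReport {

    static List<Map.Entry<String, Integer>> report(Map<String, Integer> counts) {
        return counts.entrySet().stream()
            .filter(e -> e.getValue() > 1) // "used more than once"
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Integer>comparingByKey()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("ska", 5);
        counts.put("swag", 2);
        counts.put("NP", 3);
        counts.put("seenOnce", 1);
        int nr = 1;
        for (Map.Entry<String, Integer> e : report(counts)) {
            System.out.println(nr++ + ";" + e.getKey() + ";" + e.getValue());
        }
    }
}
```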

5. Put the output in a browser window, continuously update every minute.

As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step.  However, we also want to put a small header and a separator between the data from every minute so we end up with a transformation that looks like this:

The script to print the header looks like this:

var out;
if (out==null) {
  out = _step_.getTrans().getServletPrintWriter();
  out.println("'Real-time' twitter hashtag report, minute based");
  out.flush();
}

The separator between each minute is simple too:

if (nr==1) {
  var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
  out.println();
  out.flush();
}

You can execute this transformation on a Carte instance (4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you can add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag gets used more than 15 times in a given minute.  That sort of alerting support gives you easy access to emerging new trends, events and memes.
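The alerting rule itself is a one-line check per window. A minimal sketch, assuming the per-minute counts are available as a map; `TagAlert` and `shouldAlert` are hypothetical names, and actually sending the e-mail is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the "more than 15 times in a given minute" rule.
class TagAlert {

    // True when the watched tag was used more than 'threshold' times in this window.
    static boolean shouldAlert(Map<String, Integer> minuteCounts, String watchedTag, int threshold) {
        return minuteCounts.getOrDefault(watchedTag, 0) > threshold;
    }

    public static void main(String[] args) {
        Map<String, Integer> minuteCounts = new HashMap<>();
        minuteCounts.put("ska", 16);
        minuteCounts.put("swag", 15);
        System.out.println("ska: " + shouldAlert(minuteCounts, "ska", 15));
        System.out.println("swag: " + shouldAlert(minuteCounts, "swag", 15));
    }
}
```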

For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt



Real-time streaming data aggregation

Июль 28th, 2011

Dear Kettle users,

Most of you usually use a data integration engine to process data in a batch-oriented way.  Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly, hourly workloads.  Sometimes folks run micro-batches of work every minute or so.  However, it’s lesser known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target.  This sort of data integration is sometimes referred to as being “streaming“, “real-time“, “near real-time“, “continuous” and so on.  Typical examples of situations where you have a never-ending supply of data that needs to be processed the instance it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analyses, web or application log sniffing or of-course … Twitter!  Since Twitter is easily accessed it’s common for examples to pop up regarding it’s usage and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities wrt to processing streaming data.

Here’s what we want to do:

  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used
  3. Count the number of hash-tags used in a one-minute time-window
  4. Report on all the tags that are being used more than once
  5. Put the output in a browser window, continuously update every minute.

This is a very generic example but the logic of this can be applied to different fields like JMS, HL7, log sniffing and so on.  It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in the sense that his Talend job finishes where ours will never end and where ours will do time-based aggregation in contrast to aggregation over a finite data set.

Also note that in order for Kettle to fully support multiple streaming data sources we would have to implement support for “windowed” (time-based) joins and other nifty things.  We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle.  In any case, if you currently are in need of full streaming data support, have a look at SQLStream, they can help you. SQLStream is co-founded by Pentaho’s Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:

http://stream.twitter.com/1/statuses/sample.json?delimited=length

Since the format of the output is never-ending and specific in nature I wrote a small “User Defined Java Class” script:

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
client.setTimeout(10000);
client.setConnectionTimeout(10000);

Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
client.getState().setCredentials(AuthScope.ANY, creds);
client.getParams().setAuthenticationPreemptive(true);

HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

// Execute request
//
InputStream inputStream=null;
BufferedInputStream bufferedInputStream=null;
try {
int result = client.executeMethod(method);

// the response
//
inputStream = method.getResponseBodyAsStream();
bufferedInputStream = new BufferedInputStream(inputStream, 1000);

StringBuffer bodyBuffer = new StringBuffer();
int opened=0;
int c;
while ( (c=bufferedInputStream.read())!=-1  && !isStopped()) {
char ch = (char)c;
bodyBuffer.append(ch);
if (ch=='{') opened++; else if (ch=='}') opened--;
if (ch=='}' && opened==0) {
// one JSON block, pass it on!
//
Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
String jsonString = bodyBuffer.toString();

int startIndex = jsonString.indexOf("{");
if (startIndex<0) startIndex=0;

// System.out.print("index="+startIndex+" json="+jsonString.substring(startIndex));

r[0] = jsonString.substring(startIndex);
putRow(data.outputRowMeta, r);

bodyBuffer.setLength(0);
}
}
} catch(Exception e) {
throw new KettleException("Unable to get tweets", e);
} finally {
bufferedInputStream.reset();
bufferedInputStream.close();
}

setOutputDone();
return false;
}

As the experienced UDJC writers among you will notice: this step never ends as long as the twitter service keeps on sending more data.  Depending on the stability and popularity of twitter that can be “a very long time“.

You could improve the code even further to re-connect to the service every time it drops away.  Personally I would not do this.  I would rather have the transformation terminate with an error (as it is implemented now), send an alert (e-mail, database, SNMP) and re-start the transformation in a loop in a job.  That way you have a trace in case twitter dies for a few hours.

2. Extract all the hash-tags used

First we’ll parse the JSON returned by the twitter service, extract the first 5 hash-tags from the message, split this up into 5 rows and count the tags…

3. Count the number of hash-tags used in a one-minute time-window

The counting is easy as you can simply use a “Group by”  step.  However, how can we aggregate in a time-based fashion without too much tinkering?   Well, we now have the “Single Threader” step which has the option to aggregate in a time-based manner so we might as well use this option:

This step simply accumulates all records in memory until 60 seconds have passed and then performs one iteration of the single threaded execution of the specified transformation.  This is a special execution method that doesn’t use the typical parallel engine.  Another cool thing about this engine is that the records that go into the engine in the time-window can be grouped and sorted without the transformation being restarted every minute.

4. Report on all the tags that are being used more than once

The filtering is done with a simple “Filter Rows” step.  However, thanks to the magic of the “Single Threader” step we can sort the rows descending by the tag occurrence count in that one-minute time-window.  It’s also interesting to note that if you have huge amounts of data, that you can easily parallelize your work by starting multiple copies of the single threader step and/or with some clever data partitioning.  In our case we could partition by hash-tag or re-aggregate the aggregated data.

5. Put the output in a browser window, continuously update every minute.

As shown in an earlier blog post, we can do this quite easily with a “Text File Output” step.  However, we also want to put a small header and a separator between the data from every minute so we end up with a transformation that looks like this:

The script to print the header looks like this:

var out;
if (out==null) {
out = _step_.getTrans().getServletPrintWriter();
out.println("'Real-time' twitter hashtag report, minute based");
out.flush();
}

The separator between each minute is simple too:

if (nr==1) {
var out = _step_.getTrans().getServletPrintWriter();
  out.println("============================================");
out.println();
  out.flush();
}

You can execute this transformation on a Carte instance (4.2.0) and see the following output:

'Real-time' twitter hashtag report, minute based
=================================================

nr;hashtag;count;from;to
1;tatilmayonezi;5;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
2;AUGUST6THBUZZNIGHTCLUB;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
3;teamfollowback;3;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
4;ayamzaman;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
5;dnd;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
6;follow;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
7;malhação;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
8;rappernames;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
9;thingswelearnedontwitter;2;2011/07/27 22:52:43.000;2011/07/27 22:53:32.000
=================================================

1;ska;5;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
2;followplanetjedward;4;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
3;chistede3pesos;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
4;NP;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
5;rappernames;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
6;tatilmayonezi;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
7;teamfollowback;3;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
8;AvrilBeatsVolcano;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
9;CM6;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
10;followme;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
11;Leão;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
12;NewArtists;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
13;OOMF;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
14;RETWEET;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
15;sougofollow;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
16;swag;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000
17;thingswelearnedontwitter;2;2011/07/27 22:53:35.000;2011/07/27 22:54:47.000

...

For reference, I used the following URL to start the streaming report:

http://cluster:cluster@127.0.0.1:8282/kettle/executeTrans/?trans=%2Fhome%2Fmatt%2Ftest-stuff%2FTwitter Stream%2FRead a twitter stream.ktr&USERNAME=MyTwitterAccount&PASSWORD=MyPassword
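The URL above contains literal spaces in the transformation path. Browsers usually encode these for you, but if you call the URL from a script it may be safer to percent-encode every value yourself. The following is a small sketch of that, assuming only the `trans`, `USERNAME` and `PASSWORD` parameters shown above; the helper name `buildExecuteTransUrl` is made up for this example.

```javascript
// Hypothetical helper: build an executeTrans URL with every value percent-encoded.
function buildExecuteTransUrl(base, transPath, params) {
  const pairs = [["trans", transPath]].concat(Object.entries(params));
  const query = pairs
    .map(([key, value]) => encodeURIComponent(key) + "=" + encodeURIComponent(value))
    .join("&");
  return base + "/kettle/executeTrans/?" + query;
}

// Host, port, credentials, path and parameter values mirror the example URL above.
const url = buildExecuteTransUrl(
  "http://cluster:cluster@127.0.0.1:8282",
  "/home/matt/test-stuff/Twitter Stream/Read a twitter stream.ktr",
  { USERNAME: "MyTwitterAccount", PASSWORD: "MyPassword" }
);
console.log(url);
```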

I placed the complete example over here in case you want to try this yourself on PDI/Kettle version 4.2.0-RC1 or later. One thing you could add to make it even cooler is to have this transformation send an e-mail every time a certain hash-tag gets used more than 15 times in a given minute.  That sort of alerting gives you easy access to emerging trends, events and memes.
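The alerting condition itself is simple. As a rough sketch (in plain JavaScript, not a Kettle step), the check could look like the code below; the names `findAlerts` and `formatAlert` are hypothetical, and actually sending the e-mail is left out.

```javascript
// Hypothetical check: keep the rows of one window whose count exceeds the threshold.
// "More than 15 times" translates to a strict comparison.
function findAlerts(rows, threshold = 15) {
  return rows.filter((row) => row.count > threshold);
}

// Hypothetical formatter for the subject line of an alert message.
function formatAlert(row) {
  return "Hashtag #" + row.hashtag + " was used " + row.count + " times in one minute";
}
```

In the transformation, the equivalent would presumably be another “Filter Rows” step on the count field followed by a step that sends the mail.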

For reference, take a look at this earlier blog post of mine where I describe the internal cleanup mechanisms inside of Kettle that prevent our transformation from ever running out of memory or resources.

Until next time,

Matt



PDI Loading into LucidDB

June 30th, 2011

By far, the most popular way for PDI users to load data into LucidDB is to use the PDI Streaming Loader. The streaming loader is a native PDI step that:

  • Enables high performance loading, directly over the network without the need for intermediate IO and shipping of data files.
  • Lets users choose more interesting (from a DW perspective) loading types for tables. In particular, in addition to simple INSERTs it allows for MERGE (aka UPSERT) and also UPDATE, all in the same bulk loader.
  • Enables the metadata for the load to be managed, scheduled, and run in PDI.

201106292256.jpg

However, we’ve had some known issues. In fact, until PDI 4.2 GA and LucidDB 0.9.4 GA it’s pretty problematic unless you run through the process of patching LucidDB outlined on this page: Known Issues.

In some ways, we have to admit that we released this piece of software too soon. Releasing early and often comes with some risk, and many have felt the pain of the issues that have been discovered with the streaming loader.

In some ways, we’ve built an unnatural approach to loading for PDI: PDI wants to PUSH data into a database. LucidDB wants to PULL data from remote sources, with its integrated ELT and DML based approach (with connectors to databases, salesforce, etc).   Our streaming loader “fakes” a pull data source, and allows PDI to “push” into it.

There are multiple threads involved, and when exceptions happen users have received cruddy error messages such as “Broken Pipe” that are unhelpful at best and frustrating at worst. Almost all of these contortions will have sorted themselves out, and by the time 4.2 GA of PDI and 0.9.4 GA of LucidDB are released the streaming loader should be working A-OK. Some users would just as soon avoid the patch instructions above and have posed the question: in a general sense, if not the streaming loader, how would I load data into LucidDB?

Again, LucidDB likes to “pull” data from remote sources. One of those is CSV files. Here’s a nice, easy, quick (30k r/s on my MacBook) method to load a million rows using PDI and LucidDB:

201106292249.jpg

This transformation outputs 1 million rows to a text file, waits for that to complete, and then proceeds to load that data into a new table in LucidDB. Step by step, the LucidDB statements are:

-- Point LucidDB to the directory with the just-generated flat file
-- LucidDB has some defaults, and we can "guess" the datatypes by scanning the file
CREATE OR REPLACE SERVER csv_file_server FOREIGN DATA WRAPPER SYS_FILE_WRAPPER OPTIONS ( DIRECTORY '?' );
-- Create a foreign table for the data file ("DATA.txt") that was output by PDI
CREATE FOREIGN TABLE applib.data SERVER csv_file_server;
-- Create a staging table and load the data from the flat file (select * from applib.data)
CALL APPLIB.CREATE_TABLE_AS ('APPLIB', 'STAGING_TABLE', 'select * from applib.data', true);

We hope to have the streaming loader ready to go in 0.9.4 (LucidDB) and 4.2 (PDI). Until then, consider this easy, straightforward method, which is high performance, proven, and stable for loading data from PDI into LucidDB.

Example file: csv_luciddb_load.ktr



HPCC vs Hadoop at a glance

June 18th, 2011
Yesterday I noticed a tweet by Andrei Savu. It prompted me to read the related GigaOM article and then check out the HPCC Systems website.

If you're too lazy to read the article or visit that website:
HPCC (High Performance Computing Cluster) is a massive parallel-processing computing platform that solves Big Data problems. The platform is now Open Source!


HPCC Systems compares itself to Hadoop, which I think is completely justified in terms of functionality. Its product originated as a homegrown solution of LexisNexis Risk Solutions allowing its customers (banks, insurance companies, law enforcement and federal government) to quickly analyze billions of records, and as such it has been in use for a decade or so. It is now open sourced, and I already heard an announcement that Pentaho is its major Business Intelligence Partner.

Based on the limited information I made a quick analysis, which I emailed to the HPCC Systems CTO, Armando Escalante. My friend Jos van Dongen said it was a good analysis and told me I should post it. Now, I don't really have time to make a nice blog post out of it, but I figured it can't hurt to just repeat what I said in my emails. So here goes:

Just going by the documentation, I see two real unique selling points in HPCC Systems as compared to Hadoop:

  • Real-time query performance (as opposed to only analytic jobs). HPCC offers two different setups, labelled Thor and Roxie. Functionality-wise, Thor should be compared to a Map/Reduce cluster like Hadoop: it's good for doing fairly long running analyses on large volumes of data. Roxie is a different beast, designed to offer fast data access and to support ad-hoc real-time queries.
  • Integrated toolset (as opposed to a hodgepodge of third party tools). We're talking about an IDE, job monitoring, code repository, scheduler, configuration manager, and whatnot. These really look like big productivity boosters, which may make Big Data processing a lot more accessible to companies that don't have the kind of development teams required to work with Hadoop.

(there may be many more benefits, but these are just the ones I could clearly distill from the press release and the website)

Especially for Business Intelligence, Roxie may be a big thing, because real-time Big Data queries could then be integrated with Business Intelligence OLAP and reporting tools. I can't disclose the details but I have trustworthy information that integration with Pentaho's Analysis Engine, the Mondrian ROLAP engine, is underway and will be available as an Enterprise feature.

A few things that look different but which may not matter too much when looking at HPCC and Hadoop from a distance:
  • ECL, the "Enterprise Control Language", which is a declarative query language (as opposed to just Map/Reduce). This initially seems like a big difference but Hadoop has tools like pig and sqoop and hive. Now, it could be that ECL is vastly superior to these hadoop tools, but my hunch is you'd have to be careful in how you position that. If you choose a head-on strategy in promoting ECL as opposed to pig, then the chances are that people will just spend their energy in discovering the things that pig can do and ECL cannot (not sure if those features actually exist, but that is what hadoop fanboys will look for), and in addition, the pig developers might simply clone the unique ECL features and the leveling of that playing field will just be a matter of time. This does not mean you shouldn't promote ECL - on the contrary, if you feel it is a more productive language than pig or any other hadoop tool, then by all means let your customers and prospects know. Just be careful and avoid downplaying the hadoop equivalents because that strategy could backfire.

  • Windows support. It's really nice that HPCC Systems is available for Microsoft Windows; it makes adoption a lot easier for Microsoft shops (and there are a lot of them). That said, customers that really have a big-data problem will solve it no matter what their internal software policies are. So they'd happily start running hadoop on linux if that solves their problems.
  • Maturity. On paper HPCC looks more mature than hadoop. It's hard to tell how much that matters though, because hadoop has all the momentum. People might choose hadoop because they anticipate that the maturity will come thanks to the sheer number of developers committing to that platform.


The only areas I can think of where HPCC looks like it has a disadvantage as compared to Hadoop are adoption rate and licensing. I hope these will prove not to be significant hurdles for HPCC, but I think that these might be bigger problems than they seem. Especially the AGPL licensing seems problematic to me.

The AGPL is not well regarded by anyone I know - not in the open source world. The general idea seems to be that even more than plain GPL3 it restricts how the software may be used. If the goal of open sourcing HPCC is to gain mindshare and a developer community (something that hadoop has done and is doing extremely well) then a more permissive license is really the way to go.

If you look at products like MySQL but also Pentaho - they are both very strongly corporately led products. They have a good number of users, but few contributions from outside the company, and this is probably due to a combination of GPL licensing and the additional requirement for handing over the copyright of any contributions to the company. Hence these products don't really benefit from an open source development model (or at least not as much as they could). For these companies, open source may help initially to gain a lot of users, but the majority of those are users that just want a free ride: conversion rates to enterprise edition customers are quite low. It might be enough to make a decent buck, but eventually you'll hit a cap on how far you can grow. I'm not saying this is bad - you only need to grow as much as you have to, but it is something to be aware of.

Contrast this to Hadoop. It has a permissive Apache 2.0 license, and this results in many individuals but also companies contributing to the project. And there are still companies like Cloudera that manage to make a good living off of the services around their distribution of Hadoop. You don't lose the ability to develop add-ons either with this model - apache 2.0 allows all that. The difference with GPL (and AGPL) of course is that it allows this also to other users and companies. So the trick to stay on top in this model is to simply offer the best product (as opposed to being the sole holder of the copyright to the code).

Anyway - that is it for now - I hope this is helpful.


Kettle vs Oracle REF CURSOR

November 17th, 2010

Dear Kettle fans,

PDI-200 has been out there for a while now.  Jens created the feature request a little over 3 years ago.  I guess the main thing blocking this issue was not so much a technical problem as a licensing and dependency one (Oracle JDBC dependency and distribution license).

However, now that we have the User Defined Java Class step we can work around those pesky problems. That is because the Java code in there only gets compiled and executed at runtime so it’s perfectly fine to create any sort of dependency in there you like.

The following transformation reads a set of rows from a stored procedure as described on this web page.

In short, our UDJC step executes the following code:

begin ? := sp_get_stocks(?); end;

The result is a set of rows and the parameter is in this case a single numeric value.

The step contains mostly Java code but thanks to configuration options you only need to do 2 things to make this work for your own REF CURSOR returning procedures…

First you need to specify the output fields of the rows…

And then you need to specify the parameters:

The source code for this sample transformation is over here and runs on Pentaho Data Integration version 4.x (or higher).  All in all it only took a few hours to write these 150 lines of Java so perhaps it can serve as inspiration for other similar problems you might have with Oracle or other databases.

Until next time,

Matt



How Real is the Data Deluge?

October 11th, 2010


It seems obvious that given the decreasing cost of storage and computation, there's going to be a significant increase in the volume of data that organizations accumulate over the next 10 years.  But the type of data being accumulated may be different from the areas where traditional DBMSs dominated.  It's not just about transactions; it's search patterns, on-line behavior, click-thru data, events fired off by smartphones, messages over Twitter & Facebook, log data of various kinds.

If an organization can figure out a better way to identify prospects, deliver more targeted ads, or optimize pricing decisions by analyzing terabytes of data, they'd be crazy not to. Over the long term, companies that don't develop these capabilities will be at a competitive disadvantage.

As to what the implications are from a technological perspective, that's a whole different can of worms. I'm starting to see adoption of Big Data technologies like Hadoop, HDFS, Cassandra, MongoDB, XML databases, analysis with R, Pentaho, and loads of other technologies.  And MySQL continues to play a role here as do other traditional relational databases.  Over the next few months, I'm going to dig down deeper with people using these technologies to try and discern the emerging customer patterns.

If you're in this space or using some of these technologies, let me know your thoughts. What volume of data are you dealing with?  How many nodes or servers are you using?  Are you running on a public cloud, private cloud or hybrid? What technologies did you evaluate?  What about traditional DBMSs didn't work for this scenario? 

