From 52ae6bb113c0d945d89065a88ef5b5b85259f14e Mon Sep 17 00:00:00 2001 From: SynLz Date: Thu, 1 Dec 2016 18:16:54 +0800 Subject: [PATCH] commited by ldong for extend the gobblin json format input. --- .../example/simplejson/TestJsonFieldGet.java | 60 +++++++++++++ .../TimeBasedJsonWriterPartitioner.java | 89 +++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 gobblin-example/src/main/java/gobblin/example/simplejson/TestJsonFieldGet.java create mode 100644 gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java diff --git a/gobblin-example/src/main/java/gobblin/example/simplejson/TestJsonFieldGet.java b/gobblin-example/src/main/java/gobblin/example/simplejson/TestJsonFieldGet.java new file mode 100644 index 00000000000..d93e24caa00 --- /dev/null +++ b/gobblin-example/src/main/java/gobblin/example/simplejson/TestJsonFieldGet.java @@ -0,0 +1,60 @@ +package gobblin.example.simplejson; + +import com.google.common.base.Optional; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class TestJsonFieldGet { + private static final Logger logger = LoggerFactory.getLogger(TestJsonFieldGet.class); + + private long getRecordTimestamp(Optional writerPartitionColumnValue) { + return writerPartitionColumnValue.orNull() instanceof Long ? (Long) writerPartitionColumnValue.get() + : System.currentTimeMillis(); + } + + private long getRecordTimestamp(byte[] record) { + return getRecordTimestamp(getWriterPartitionColumnValue(record)); + } + + private Optional getWriterPartitionColumnValue(byte[] record) { + + Optional fieldValue = Optional.absent(); + + JSONObject jsonObject; + try { + jsonObject = new JSONObject(new String(record)); + + if (jsonObject.get("time") != null) { + logger.info("the special time partition field value is " + jsonObject.get("time")); + SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd/HH"); + Date date; + try { + date = format.parse((String) jsonObject.get("time")); + long finalTime = date.getTime(); + fieldValue = Optional.of(finalTime); + } catch (ParseException e) { + logger.info(e.getMessage()); + } + } + if (fieldValue.isPresent()) { + return fieldValue; + } + } catch (JSONException e) { + e.printStackTrace(); + } + return fieldValue; + } + + public static void main(String[] args) { + String jsonString = "{\"time\":\"2016/09/18/02\",\"dataquantity\":4096}"; + TestJsonFieldGet tjf = new TestJsonFieldGet(); + long time = tjf.getRecordTimestamp(jsonString.getBytes()); + System.out.println(time); + } +} diff --git a/gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java b/gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java new file mode 100644 index 00000000000..0060e6e58e7 --- /dev/null +++ b/gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java @@ -0,0 +1,89 @@ +package gobblin.example.simplejson; + +import com.google.common.base.Optional; +import gobblin.configuration.ConfigurationKeys; +import gobblin.configuration.State; +import gobblin.util.ForkOperatorUtils; +import gobblin.writer.partitioner.TimeBasedWriterPartitioner; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; + +/** + * @author cssdongl@gmail.com + * @version V1.0 + */ +public class TimeBasedJsonWriterPartitioner extends TimeBasedWriterPartitioner { + + private static final Logger logger = LoggerFactory.getLogger(TimeBasedJsonWriterPartitioner.class); + + public static final String WRITER_PARTITION_COLUMNS = ConfigurationKeys.WRITER_PREFIX + ".partition.columns"; + + private final Optional> partitionColumns; + + public TimeBasedJsonWriterPartitioner(State state) { + this(state, 1, 0); + } + + public TimeBasedJsonWriterPartitioner(State state, int numBranches, int branchId) { + super(state, numBranches, branchId); + this.partitionColumns = getWriterPartitionColumns(state, numBranches, branchId); + } + + private static Optional> getWriterPartitionColumns(State state, int numBranches, int branchId) { + String propName = ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_COLUMNS, numBranches, branchId); + return state.contains(propName) ? Optional.of(state.getPropAsList(propName)) : Optional.> absent(); + } + + private long getRecordTimestamp(Optional writerPartitionColumnValue) { + return writerPartitionColumnValue.orNull() instanceof Long ? (Long) writerPartitionColumnValue.get() + : System.currentTimeMillis(); + } + + @Override + public long getRecordTimestamp(byte[] record) { + return getRecordTimestamp(getWriterPartitionColumnValue(record)); + } + + + /** + * get the timestamp field in the json record that partition the hdfs dirs. + */ + private Optional getWriterPartitionColumnValue(byte[] record) { + logger.info("Get the json field begin"); + if (!this.partitionColumns.isPresent()) { + return Optional.absent(); + } + + Optional fieldValue = Optional.absent(); + + for (String partitionColumn : this.partitionColumns.get()) { + JSONObject jsonObject; + try { + jsonObject = new JSONObject(new String(record)); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Date date; + try { + date = format.parse((String) jsonObject.get(partitionColumn)); + long finalTime = date.getTime(); + fieldValue = Optional.of(finalTime); + } catch (ParseException e) { + logger.info(e.getMessage()); + } + + if (fieldValue.isPresent()) { + return fieldValue; + } + } catch (JSONException e) { + e.printStackTrace(); + } + } + return fieldValue; + } +}